From 5d7772e823fe66fefe9455848bce0389f8b8ab56 Mon Sep 17 00:00:00 2001
From: Théo de la Hogue
Date: Wed, 27 Mar 2024 09:26:03 +0100
Subject: Defining new ArContext class.
---
src/argaze/ArFeatures.py | 265 ++++-
src/argaze/ArUcoMarkers/ArUcoCamera.py | 21 +-
src/argaze/DataFeatures.py | 322 +++---
src/argaze/utils/Providers/TobiiProGlasses2.py | 1063 --------------------
src/argaze/utils/Providers/__init__.py | 4 -
src/argaze/utils/demo/A3_demo.pdf | Bin 0 -> 125304 bytes
src/argaze/utils/demo/aoi_2d_scene.json | 18 +
src/argaze/utils/demo/aoi_3d_scene.obj | 7 +
src/argaze/utils/demo/aruco_markers_group.json | 25 +
src/argaze/utils/demo/aruco_markers_group.obj | 32 +
src/argaze/utils/demo/aruco_markers_pipeline.json | 64 ++
src/argaze/utils/demo/demo.mov | Bin 0 -> 13345258 bytes
src/argaze/utils/demo/eyetracker_setup.json | 21 +
src/argaze/utils/demo/frame_background.jpg | Bin 0 -> 19108 bytes
src/argaze/utils/demo/gaze_analysis_pipeline.json | 135 +++
src/argaze/utils/demo/loggers.py | 84 ++
src/argaze/utils/demo_data/A3_demo.pdf | Bin 125304 -> 0 bytes
src/argaze/utils/demo_data/aoi_2d_scene.json | 18 -
src/argaze/utils/demo_data/aoi_3d_scene.obj | 7 -
.../utils/demo_data/aruco_markers_group.json | 25 -
src/argaze/utils/demo_data/aruco_markers_group.obj | 32 -
src/argaze/utils/demo_data/demo.mov | Bin 13345258 -> 0 bytes
.../utils/demo_data/demo_aruco_markers_setup.json | 167 ---
.../utils/demo_data/demo_gaze_analysis_setup.json | 133 ---
src/argaze/utils/demo_data/demo_loggers.py | 84 --
src/argaze/utils/demo_data/frame_background.jpg | Bin 19108 -> 0 bytes
src/argaze/utils/demo_data/provider_setup.json | 16 -
src/argaze/utils/eyetrackers/TobiiProGlasses2.py | 1046 +++++++++++++++++++
src/argaze/utils/eyetrackers/__init__.py | 4 +
src/argaze/utils/pipeline_run.py | 83 ++
src/argaze/utils/worn_device_stream.py | 107 --
31 files changed, 1912 insertions(+), 1871 deletions(-)
delete mode 100644 src/argaze/utils/Providers/TobiiProGlasses2.py
delete mode 100644 src/argaze/utils/Providers/__init__.py
create mode 100644 src/argaze/utils/demo/A3_demo.pdf
create mode 100644 src/argaze/utils/demo/aoi_2d_scene.json
create mode 100644 src/argaze/utils/demo/aoi_3d_scene.obj
create mode 100644 src/argaze/utils/demo/aruco_markers_group.json
create mode 100644 src/argaze/utils/demo/aruco_markers_group.obj
create mode 100644 src/argaze/utils/demo/aruco_markers_pipeline.json
create mode 100644 src/argaze/utils/demo/demo.mov
create mode 100644 src/argaze/utils/demo/eyetracker_setup.json
create mode 100644 src/argaze/utils/demo/frame_background.jpg
create mode 100644 src/argaze/utils/demo/gaze_analysis_pipeline.json
create mode 100644 src/argaze/utils/demo/loggers.py
delete mode 100644 src/argaze/utils/demo_data/A3_demo.pdf
delete mode 100644 src/argaze/utils/demo_data/aoi_2d_scene.json
delete mode 100644 src/argaze/utils/demo_data/aoi_3d_scene.obj
delete mode 100644 src/argaze/utils/demo_data/aruco_markers_group.json
delete mode 100644 src/argaze/utils/demo_data/aruco_markers_group.obj
delete mode 100644 src/argaze/utils/demo_data/demo.mov
delete mode 100644 src/argaze/utils/demo_data/demo_aruco_markers_setup.json
delete mode 100644 src/argaze/utils/demo_data/demo_gaze_analysis_setup.json
delete mode 100644 src/argaze/utils/demo_data/demo_loggers.py
delete mode 100644 src/argaze/utils/demo_data/frame_background.jpg
delete mode 100644 src/argaze/utils/demo_data/provider_setup.json
create mode 100644 src/argaze/utils/eyetrackers/TobiiProGlasses2.py
create mode 100644 src/argaze/utils/eyetrackers/__init__.py
create mode 100644 src/argaze/utils/pipeline_run.py
delete mode 100644 src/argaze/utils/worn_device_stream.py
(limited to 'src')
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 5e219ff..6b4b182 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -28,7 +28,6 @@ import time
from argaze import DataFeatures, GazeFeatures
from argaze.AreaOfInterest import *
from argaze.GazeAnalysis import *
-from argaze.utils import Providers
import numpy
import cv2
@@ -53,15 +52,6 @@ class SceneProjectionFailed(Exception):
super().__init__(message)
-class LoadingFailed(Exception):
- """
- Exception raised when attributes loading fails.
- """
-
- def __init__(self, message):
-
- super().__init__(message)
-
class DrawingFailed(Exception):
"""
Exception raised when drawing fails.
@@ -263,7 +253,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if not found:
- raise LoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
+ raise DataFeatures.PipelineStepLoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None:
@@ -473,7 +463,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Init private attributes
self.__size = (1, 1)
- self.__provider = None
self.__gaze_position_calibrator = None
self.__gaze_movement_identifier = None
self.__filter_in_progress_identification = True
@@ -497,24 +486,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@size.setter
def size(self, size: tuple[int]):
self.__size = size
-
- @property
- def provider(self) -> DataFeatures.PipelineInputProvider:
- """Provider object related to this frame."""
- return self.__provider
-
- @provider.setter
- @DataFeatures.PipelineStepAttributeSetter
- def provider(self, provider: DataFeatures.PipelineInputProvider):
-
- assert(issubclass(type(provider), DataFeatures.PipelineInputProvider))
-
- self.__provider = provider
-
- # Edit parent
- if self.__provider is not None:
-
- self.__provider.parent = self
@property
def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator:
@@ -625,7 +596,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if not found:
- raise LoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
+ raise DataFeatures.PipelineStepLoadingFaile(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
if len(self.__scan_path_analyzers) > 0 and self.scan_path == None:
@@ -734,7 +705,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
d = {
**DataFeatures.PipelineStepObject.as_dict(self),
"size": self.__size,
- "provider": self.__provider,
"gaze_position_calibrator": self.__gaze_position_calibrator,
"gaze_movement_identifier": self.__gaze_movement_identifier,
"filter_in_progress_identification": self.__filter_in_progress_identification,
@@ -963,7 +933,17 @@ class ArScene(DataFeatures.PipelineStepObject):
for layer_name, layer_data in layers.items():
- self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+ if type(layer_data) == dict:
+
+ self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+
+ # str: relative path to JSON file
+ elif type(layer_data) == str:
+
+ self._layers[layer_name] = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), layer_data))
+
+ # Loaded layer name have to be equals to dictionary key
+ assert(self._layers[layer_name].name == frame_name)
# Edit parent
for name, layer in self._layers.items():
@@ -984,7 +964,17 @@ class ArScene(DataFeatures.PipelineStepObject):
for frame_name, frame_data in frames.items():
- new_frame = ArFrame(name = frame_name, **frame_data)
+ if type(frame_data) == dict:
+
+ new_frame = ArFrame(name = frame_name, **frame_data)
+
+ # str: relative path to JSON file
+ elif type(frame_data) == str:
+
+ new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data))
+
+ # Loaded frame name have to be equals to dictionary key
+ assert(new_frame.name == frame_name)
# Look for a scene layer with an AOI named like the frame
for scene_layer_name, scene_layer in self.layers.items():
@@ -1295,27 +1285,29 @@ class ArCamera(ArFrame):
# Is there an AOI inside camera frame layers projection which its name equals to a scene frame name?
for camera_layer_name, camera_layer in self.layers.items():
- try:
+ if camera_layer.aoi_scene:
- aoi_2d = camera_layer.aoi_scene[scene_frame.name]
+ try:
- if timestamped_gaze_position:
+ aoi_2d = camera_layer.aoi_scene[scene_frame.name]
- # TODO?: Should we prefer to use camera frame AOIMatcher object?
- if aoi_2d.contains_point(timestamped_gaze_position):
+ if timestamped_gaze_position:
- inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)
+ # TODO?: Should we prefer to use camera frame AOIMatcher object?
+ if aoi_2d.contains_point(timestamped_gaze_position):
- # QUESTION: How to project gaze precision?
- inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)
+ inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)
- # Project inner gaze position into scene frame
- scene_frame.look(inner_gaze_position * scene_frame.size)
+ # QUESTION: How to project gaze precision?
+ inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)
- # Ignore missing aoi in camera frame layer projection
- except KeyError as e:
+ # Project inner gaze position into scene frame
+ scene_frame.look(inner_gaze_position * scene_frame.size)
- pass
+ # Ignore missing aoi in camera frame layer projection
+ except KeyError as e:
+
+ pass
@DataFeatures.PipelineStepMethod
def map(self):
@@ -1348,3 +1340,182 @@ class ArCamera(ArFrame):
except KeyError:
pass
+
+
+# Define default ArContext image parameters
+DEFAULT_ARCONTEXT_IMAGE_PARAMETERS = {
+ "draw_exceptions": True
+}
+
+class ArContext(DataFeatures.PipelineStepObject):
+ """
+ Define class to ...
+ """
+
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+
+ logging.debug('ArContext.__init__')
+
+ DataFeatures.PipelineStepObject.__init__(self)
+
+ # Init private attributes
+ self.__pipeline = None
+ self.__exceptions = DataFeatures.TimeStampedExceptions()
+
+ # Init protected attributes
+ self._image_parameters = DEFAULT_ARCONTEXT_IMAGE_PARAMETERS
+
+ @property
+ def pipeline(self) -> DataFeatures.PipelineStepObject:
+ """ArFrame used to process gaze data or ArCamera used to process gaze data and video of environment."""
+ return self.__pipeline
+
+ @pipeline.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def pipeline(self, pipeline: DataFeatures.PipelineStepObject):
+
+ assert(issubclass(type(pipeline), DataFeatures.PipelineStepObject))
+
+ self.__pipeline = pipeline
+
+ @property
+ def image_parameters(self) -> dict:
+ """Default image method parameters dictionary."""
+ return self._image_parameters
+
+ @image_parameters.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def image_parameters(self, image_parameters: dict):
+
+ self._image_parameters = image_parameters
+
+ def exceptions(self) -> DataFeatures.TimestampedException:
+ """Get exceptions list"""
+ return self.__exceptions
+
+ def as_dict(self) -> dict:
+ """Export Arcontext properties as dictionary."""
+
+ return {
+ **DataFeatures.PipelineStepObject.as_dict(self),
+ "pipeline": self.__pipeline,
+ "image_parameters": self._image_parameters
+ }
+
+ def __enter__(self):
+ """
+ Define abstract __enter__ method to use device as a context.
+
+ !!! warning
+ This method is called provided that the PipelineInput is created as a context using a with statement.
+ """
+ return self
+
+ def __exit__(self, type, value, traceback):
+ """
+ Define abstract __exit__ method to use device as a context.
+
+ !!! warning
+ This method is called provided that the PipelineInput is created as a context using a with statement.
+ """
+ pass
+
+ def _process_gaze_position(self, timestamp: int|float, x: int|float = None, y: int|float = None, precision: int|float = None):
+ """Request pipeline to process new gaze position at a timestamp."""
+
+ logging.debug('%s._process_gaze_position timestamp: %f', type(self).__name__, timestamp)
+
+ if issubclass(type(self.__pipeline), ArFrame):
+
+ try:
+
+ if x is None and y is None:
+
+ # Edit empty gaze position
+ self.__pipeline.look( GazeFeatures.GazePosition( timestamp = timestamp) )
+
+ else:
+
+ # Edit gaze position
+ self.__pipeline.look( GazeFeatures.GazePosition( (x, y), precision = precision, timestamp = timestamp) )
+
+ except DataFeatures.TimestampedException as e:
+
+ self.__exceptions.append(e)
+
+ else:
+
+ raise(TypeError('Pipeline is not ArFrame instance.'))
+
+ def _process_camera_image(self, timestamp: int|float, image: numpy.ndarray):
+ """Request pipeline to process new camera image at a timestamp."""
+
+ logging.debug('%s._process_camera_image timestamp: %f', type(self).__name__, timestamp)
+
+ if issubclass(type(self.__pipeline), ArCamera):
+
+ height, width, _ = image.shape
+
+ # Compare image size with ArCamera frame size
+ if width != self.__pipeline.size[0] or height != self.__pipeline.size[1]:
+
+ logging.warning('image size (%i x %i) is different of ArCamera frame size (%i x %i)', width, height, self.__pipeline.size[0], self.__pipeline.size[1])
+ return
+
+ try:
+
+ logging.debug('\t> watch image (%i x %i)', width, height)
+
+ self.__pipeline.watch( image, timestamp = timestamp)
+
+ except DataFeatures.TimestampedException as e:
+
+ logging.warning('%s', e)
+
+ self.__exceptions.append(e)
+
+ else:
+
+ raise(TypeError('Pipeline is not ArCamera instance.'))
+
+ def __image(self, draw_exceptions: bool):
+ """
+ Get pipeline image with execution informations.
+
+ Parameters:
+ draw_exceptions: ...
+ """
+ logging.debug('%s.__image', type(self).__name__)
+
+ image = self.__pipeline.image()
+ height, width, _ = image.shape
+
+ logging.debug('\t> get image (%i x %i)', width, height)
+
+ if draw_exceptions:
+
+ # Write exceptions
+ while self.__exceptions:
+
+ e = self.__exceptions.pop()
+ i = len(self.__exceptions)
+
+ cv2.rectangle(image, (0, height-(i+1)*50), (width, height-(i)*50), (0, 0, 127), -1)
+ cv2.putText(image, f'error: {e}', (20, height-(i+1)*50+25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+
+ return image
+
+ def image(self, **kwargs: dict) -> numpy.array:
+ """
+ Get pipeline image.
+
+ Parameters:
+ kwargs: PipelineInput.__image parameters
+ """
+ # Use image_parameters attribute if no kwargs
+ if kwargs:
+
+ return self.__image(**kwargs)
+
+ return self.__image(**self._image_parameters)
diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py
index df1362a..dda55be 100644
--- a/src/argaze/ArUcoMarkers/ArUcoCamera.py
+++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py
@@ -52,6 +52,7 @@ class ArUcoCamera(ArFeatures.ArCamera):
# Init private attribute
self.__aruco_detector = None
+ self.__sides_mask = 0
self._image_parameters = {**ArFeatures.DEFAULT_ARFRAME_IMAGE_PARAMETERS, **DEFAULT_ARUCOCAMERA_IMAGE_PARAMETERS}
@property
@@ -71,7 +72,7 @@ class ArUcoCamera(ArFeatures.ArCamera):
# Optic parameters dimensions should be equal to camera frame size
if self.__aruco_detector.optic_parameters.dimensions != self.size:
- raise ArFeatures.LoadingFailed('ArUcoCamera: aruco_detector.optic_parameters.dimensions have to be equal to size.')
+ raise DataFeatures.PipelineStepLoadingFaile('ArUcoCamera: aruco_detector.optic_parameters.dimensions have to be equal to size.')
# No optic parameters loaded
else:
@@ -85,6 +86,16 @@ class ArUcoCamera(ArFeatures.ArCamera):
self.__aruco_detector.parent = self
+ @property
+ def sides_mask(self) -> int:
+ """Size of mask (pixel) to hide video left and right sides."""
+ return self.__sides_mask
+
+ @sides_mask.setter
+ def sides_mask(self, size: int):
+
+ self.__sides_mask = size
+
@ArFeatures.ArCamera.scenes.setter
@DataFeatures.PipelineStepAttributeSetter
def scenes(self, scenes: dict):
@@ -124,6 +135,14 @@ class ArUcoCamera(ArFeatures.ArCamera):
# Use camera frame locker feature
with self._lock:
+ # Draw black rectangles to mask sides
+ if self.__sides_mask > 0:
+
+ height, width, _ = image.shape
+
+ cv2.rectangle(image, (0, 0), (self.__sides_mask, height), (0, 0, 0), -1)
+ cv2.rectangle(image, (width - self.__sides_mask, 0), (width, height), (0, 0, 0), -1)
+
# Detect aruco markers
self.__aruco_detector.detect_markers(image, timestamp=self.timestamp)
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index e24ecf1..f573f1c 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -41,12 +41,6 @@ WORKING_DIRECTORY = [None]
def get_working_directory() -> str:
"""Get global working directory."""
-
- # Check global working directory
- if WORKING_DIRECTORY[0] is None:
-
- raise(ValueError(f'No working directory.'))
-
return WORKING_DIRECTORY[0]
def set_working_directory(working_directory: str):
@@ -62,15 +56,6 @@ def set_working_directory(working_directory: str):
WORKING_DIRECTORY[0] = working_directory
-def module_path(obj) -> str:
- """
- Get object module path.
-
- Returns:
- module path
- """
- return obj.__class__.__module__
-
def get_class(class_path: str) -> object:
"""Get class object from 'path.to.class' string.
@@ -105,6 +90,113 @@ def properties(cls) -> list:
return properties
+def from_json(configuration_filepath: str, patch_filepath: str = None) -> object:
+ """
+ Load object instance from .json file.
+
+ !!! note
+ The directory where configuration file is will be the global working directory.
+
+ Parameters:
+ configuration_filepath: path to json configuration file
+ patch_filepath: path to json patch file to modify any configuration entries
+ """
+
+ logging.debug('DataFeatures.from_json')
+
+ # Edit working directory once
+ if get_working_directory() is None:
+
+ set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath)))
+
+ logging.debug('\t> set global working directory as %s', get_working_directory())
+
+ # Load configuration from JSON file
+ with open(configuration_filepath) as configuration_file:
+
+ object_data = json.load(configuration_file)
+
+ # Apply patch to configuration if required
+ if patch_filepath is not None:
+
+ with open(patch_filepath) as patch_file:
+
+ patch_data = json.load(patch_file)
+
+ import collections.abc
+
+ def update(d, u):
+
+ for k, v in u.items():
+
+ if isinstance(v, collections.abc.Mapping):
+
+ d[k] = update(d.get(k, {}), v)
+
+ elif v is None:
+
+ del d[k]
+
+ else:
+
+ d[k] = v
+
+ return d
+
+ objects_data = update(object_data, patch_data)
+
+ # Load unique object
+ object_class, object_data = object_data.popitem()
+
+ # Instanciate class
+ logging.debug('\t+ create %s object', object_class)
+
+ return get_class(object_class)(**object_data)
+
+def from_dict(expected_value_type: type, data: dict) -> any:
+ """Load expected type instance(s) from dict values."""
+
+ logging.debug('\t> load %s from dict', expected_value_type.__name__)
+
+ # Check if json keys are PipelineStepObject class and store them in a list
+ new_objects_list = []
+
+ for key, value in data.items():
+
+ try:
+
+ new_class = get_class(key)
+
+ except ValueError as e:
+
+ # Keys are not class name
+ if str(e) == 'Empty module name':
+
+ break
+
+ else:
+
+ raise(e)
+
+ logging.debug('\t+ create %s object from key using value as argument', key)
+
+ new_objects_list.append( new_class(**value) )
+
+ # Only one object have been loaded: pass the object if it is a subclass of expected type
+ if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type):
+
+ return new_objects_list[0]
+
+ # Pass non empty objects list
+ elif len(new_objects_list) > 0:
+
+ return new_objects_list
+
+ # Otherwise, data are parameters of the expected class
+ logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__)
+
+ return expected_value_type(**data)
+
def as_dict(obj, filter: bool=True) -> dict:
"""Export object as dictionary.
@@ -229,7 +321,7 @@ class TimestampedObjectsList(list):
def append(self, ts_object: TimestampedObject|dict):
"""Append timestamped object."""
- # Convert dict into GazePosition
+ # Convert dict into object
if type(ts_object) == dict:
ts_object = self.__object_type.from_dict(ts_object)
@@ -472,6 +564,33 @@ class SharedObject(TimestampedObject):
self._execution_times = {}
self._exceptions = {}
+class TimestampedException(Exception, TimestampedObject):
+ """Wrap exception to keep track of raising timestamp."""
+
+ def __init__(self, exception = Exception, timestamp: int|float = math.nan):
+
+ Exception.__init__(self, exception)
+ TimestampedObject.__init__(self, timestamp)
+
+class TimeStampedExceptions(TimestampedObjectsList):
+ """Handle timestamped exceptions into a list."""
+
+ def __init__(self, exceptions: list = []):
+
+ TimestampedObjectsList.__init__(self, TimestampedException, exceptions)
+
+ def values(self) -> list[str]:
+ """Get all timestamped exception values as list of messages."""
+ return [ts_exception.message for ts_exception in self]
+
+class PipelineStepLoadingFailed(Exception):
+ """
+ Exception raised when pipeline step object loading fails.
+ """
+ def __init__(self, message):
+
+ super().__init__(message)
+
def PipelineStepInit(method):
"""Define a decorator use into PipelineStepObject class to declare pipeline step init method."""
@@ -512,53 +631,10 @@ def PipelineStepAttributeSetter(method):
except KeyError:
- raise(ValueError(f'Annotations are missing for {method.__name__}: {method.__annotations__}'))
-
- logging.debug('@PipelineStepAttributeSetter %s.%s.setter(%s) with %s', type(self).__name__, method.__name__, expected_value_type.__name__, new_value_type.__name__)
+ raise(PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}'))
- # Define function to load dict values
- def load_dict(data: dict) -> any:
-
- logging.debug('\t> load %s from %s', expected_value_type.__name__, new_value_type.__name__)
-
- # Check if json keys are PipelineStepObject class and store them in a list
- new_objects_list = []
-
- for key, value in data.items():
-
- try:
-
- new_class = get_class(key)
-
- except ValueError as e:
-
- # Keys are not class name
- if str(e) == 'Empty module name':
-
- break
-
- else:
-
- raise(e)
-
- logging.debug('\t+ create %s object from key using value as argument', key)
-
- new_objects_list.append( new_class(**value) )
-
- # Only one object have been loaded: pass the object if it is a subclass of expected type
- if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type):
-
- return new_objects_list[0]
-
- # Pass non empty objects list
- elif len(new_objects_list) > 0:
-
- return new_objects_list
-
- # Otherwise, data are parameters of the expected class
- logging.debug('\t+ create %s object using %s as argument', expected_value_type.__name__, new_value_type.__name__)
-
- return expected_value_type(**data)
+ logging.debug('%s@%s.setter', type(self).__name__, method.__name__)
+ logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__)
# String not expected: load value from file
if new_value_type == str and new_value_type != expected_value_type:
@@ -568,28 +644,28 @@ def PipelineStepAttributeSetter(method):
# String have a dot inside: file path with format
if len(split_point) > 1:
- file_format = split_point[-1]
+ file_format = split_point[-1].upper()
- logging.debug('\t> %s is a path to a %s file', new_value, file_format.upper())
+ logging.debug('\t> %s is a path to a %s file', new_value, file_format)
filepath = os.path.join(get_working_directory(), new_value)
# Load image from JPG and PNG formats
- if file_format == 'jpg' or file_format == 'png':
+ if file_format == 'JPG' or file_format == 'PNG':
return method(self, cv2.imread(filepath))
# Load image from OBJ formats
- elif file_format == 'obj':
+ elif file_format == 'OBJ':
return method(self, expected_value_type.from_obj(filepath))
# Load object from JSON file
- elif file_format == 'json':
+ elif file_format == 'JSON':
with open(filepath) as file:
- return method(self, load_dict(json.load(file)))
+ return method(self, from_dict(expected_value_type, json.load(file)))
# No point inside string: identifier name
else:
@@ -602,7 +678,7 @@ def PipelineStepAttributeSetter(method):
# Dict not expected: load value from dict
if new_value_type == dict and expected_value_type != dict:
- return method(self, load_dict(new_value))
+ return method(self, from_dict(expected_value_type, new_value))
# Otherwise, pass new value to setter method
logging.debug('\t> use %s value as passed', new_value_type.__name__)
@@ -620,7 +696,7 @@ class PipelineStepObject():
def __init__(self, **kwargs):
"""Initialize PipelineStepObject."""
- logging.debug('PipelineStepObject.__init__ %s', type(self).__name__)
+ logging.debug('%s.__init__', type(self).__name__)
# Init private attribute
self.__name = None
@@ -633,6 +709,8 @@ class PipelineStepObject():
def __enter__(self):
"""At with statement start."""
+ logging.debug('%s.__enter__', type(self).__name__)
+
# Start children pipeline step objects
for child in self.children:
@@ -648,6 +726,8 @@ class PipelineStepObject():
def __exit__(self, exception_type, exception_value, exception_traceback):
"""At with statement end."""
+ logging.debug('%s.__exit__', type(self).__name__)
+
# End observers
for observer in self.__observers:
@@ -665,7 +745,7 @@ class PipelineStepObject():
if hasattr(self, key):
- logging.debug('PipelineStepObject.update_attributes %s.%s with %s value', type(self).__name__, key, type(value).__name__)
+ logging.debug('%s.update_attributes > update %s with %s value', type(self).__name__, key, type(value).__name__)
setattr(self, key, value)
@@ -721,65 +801,6 @@ class PipelineStepObject():
"observers": self.__observers
}
- @classmethod
- def from_json(cls, configuration_filepath: str, patch_filepath: str = None) -> object:
- """
- Load instance from .json file.
-
- !!! note
- The directory where configuration file is will be the global working directory.
-
- Parameters:
- configuration_filepath: path to json configuration file
- patch_filepath: path to json patch file to modify any configuration entries
- """
-
- logging.debug('%s.from_json', cls.__name__)
-
- # Edit working directory
- set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath)))
-
- logging.debug('\t> set global working directory as %s', get_working_directory())
-
- # Load configuration from JSON file
- with open(configuration_filepath) as configuration_file:
-
- object_data = json.load(configuration_file)
-
- # Apply patch to configuration if required
- if patch_filepath is not None:
-
- with open(patch_filepath) as patch_file:
-
- patch_data = json.load(patch_file)
-
- import collections.abc
-
- def update(d, u):
-
- for k, v in u.items():
-
- if isinstance(v, collections.abc.Mapping):
-
- d[k] = update(d.get(k, {}), v)
-
- elif v is None:
-
- del d[k]
-
- else:
-
- d[k] = v
-
- return d
-
- object_data = update(object_data, patch_data)
-
- # Instanciate class
- logging.debug('\t+ create %s object from configuration updated by patch', cls.__name__)
-
- return cls(**object_data)
-
def to_json(self, json_filepath: str = None):
"""Save pipeline step object into .json file."""
@@ -791,10 +812,10 @@ class PipelineStepObject():
# Open file
with open(self.__json_filepath, 'w', encoding='utf-8') as object_file:
- json.dump({module_path(self):as_dict(self)}, object_file, ensure_ascii=False, indent=4)
+ json.dump({self.__class__.__module__:as_dict(self)}, object_file, ensure_ascii=False, indent=4)
# QUESTION: maybe we need two saving mode?
- #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder)
+ #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder)
def __str__(self) -> str:
"""
@@ -973,10 +994,10 @@ def PipelineStepMethod(method):
# Call subscription
subscription(timestamp, self, exception)
- # Raise exception
+ # Raise timestamped exception
if exception is not None:
- raise exception
+ raise TimestampedException(exception, timestamp)
return result
@@ -1006,36 +1027,3 @@ class PipelineStepObserver():
This method is called provided that the observed PipelineStepObject is created as a context using a with statement.
"""
pass
-
-class PipelineInputProvider(PipelineStepObject):
- """
- Define class to ...
- """
- @PipelineStepInit
- def __init__(self, **kwargs):
-
- logging.debug('PipelineInputProvider.__init__')
-
- PipelineStepObject.__init__(self)
-
- def attach(self, method):
-
- logging.debug('PipelineInputProvider.attach', method)
-
- def __enter__(self):
- """
- Define abstract __enter__ method to use device as a context.
-
- !!! warning
- This method is called provided that the PipelineInputProvider is created as a context using a with statement.
- """
- return self
-
- def __exit__(self, type, value, traceback):
- """
- Define abstract __exit__ method to use device as a context.
-
- !!! warning
- This method is called provided that the PipelineInputProvider is created as a context using a with statement.
- """
- pass
\ No newline at end of file
diff --git a/src/argaze/utils/Providers/TobiiProGlasses2.py b/src/argaze/utils/Providers/TobiiProGlasses2.py
deleted file mode 100644
index 8ab7417..0000000
--- a/src/argaze/utils/Providers/TobiiProGlasses2.py
+++ /dev/null
@@ -1,1063 +0,0 @@
-""" Handle network connection to Tobii Pro Glasses 2 device.
- It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py).
-
-This program is free software: you can redistribute it and/or modify it under
-the terms of the GNU General Public License as published by the Free Software
-Foundation, either version 3 of the License, or (at your option) any later
-version.
-This program is distributed in the hope that it will be useful, but WITHOUT
-ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-You should have received a copy of the GNU General Public License along with
-this program. If not, see .
-"""
-
-__author__ = "Théo de la Hogue"
-__credits__ = []
-__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
-__license__ = "GPLv3"
-
-import sys
-import logging
-import socket
-import threading
-import json
-import time
-import datetime
-import uuid
-from dataclasses import dataclass
-
-try:
- from urllib.parse import urlparse, urlencode
- from urllib.request import urlopen, Request
- from urllib.error import URLError, HTTPError
-
-except ImportError:
- from urlparse import urlparse
- from urllib import urlencode
- from urllib2 import urlopen, Request, HTTPError, URLError
-
-from argaze import DataFeatures, GazeFeatures
-from argaze.utils import UtilsFeatures
-
-import numpy
-import av
-
-socket.IPPROTO_IPV6 = 41
-
-TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'
-TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S'
-
-DEFAULT_PROJECT_NAME = 'DefaultProject'
-DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant'
-DEFAULT_RECORD_NAME = 'DefaultRecord'
-
-# Define extra classes to support Tobii data parsing
-@dataclass
-class DirSig():
- """Define dir sig data (dir sig)."""
-
- dir: int # meaning ?
- sig: int # meaning ?
-
-@dataclass
-class PresentationTimeStamp():
- """Define presentation time stamp (pts) data."""
-
- value: int
- """Pts value."""
-
-@dataclass
-class VideoTimeStamp():
- """Define video time stamp (vts) data."""
-
- value: int
- """Vts value."""
-
- offset: int
- """Primary time stamp value."""
-
-@dataclass
-class EventSynch():
- """Define event synch (evts) data."""
-
- value: int # meaning ?
- """Evts value."""
-
-@dataclass
-class Event():
- """Define event data (ets type tag)."""
-
- ets: int # meaning ?
- type: str
- tag: str # dict ?
-
-@dataclass
-class Accelerometer():
- """Define accelerometer data (ac)."""
-
- value: numpy.array
- """Accelerometer value"""
-
-@dataclass
-class Gyroscope():
- """Define gyroscope data (gy)."""
-
- value: numpy.array
- """Gyroscope value"""
-
-@dataclass
-class PupillCenter():
- """Define pupill center data (gidx pc eye)."""
-
- validity: int
- index: int
- value: tuple((float, float, float))
- eye: str # 'right' or 'left'
-
-@dataclass
-class PupillDiameter():
- """Define pupill diameter data (gidx pd eye)."""
-
- validity: int
- index: int
- value: float
- eye: str # 'right' or 'left'
-
-@dataclass
-class GazeDirection():
- """Define gaze direction data (gidx gd eye)."""
-
- validity: int
- index: int
- value: tuple((float, float, float))
- eye: str # 'right' or 'left'
-
-@dataclass
-class GazePosition():
- """Define gaze position data (gidx l gp)."""
-
- validity: int
- index: int
- l: str # ?
- value: tuple((float, float))
-
-@dataclass
-class GazePosition3D():
- """Define gaze position 3D data (gidx gp3)."""
-
- validity: int
- index: int
- value: tuple((float, float))
-
-@dataclass
-class MarkerPosition():
- """Define marker data (marker3d marker2d)."""
-
- value_3d: tuple((float, float, float))
- value_2d: tuple((float, float))
-
-class TobiiJsonDataParser():
-
- def __init__(self):
-
- self.__first_ts = 0
-
- self.__parse_data_map = {
- 'dir': self.__parse_dir_sig,
- 'pts': self.__parse_pts,
- 'vts': self.__parse_vts,
- 'evts': self.__parse_event_synch,
- 'ets': self.__parse_event,
- 'ac': self.__parse_accelerometer,
- 'gy': self.__parse_gyroscope,
- 'gidx': self.__parse_pupill_or_gaze,
- 'marker3d': self.__parse_marker_position
- }
-
- self.__parse_pupill_or_gaze_map = {
- 'pc': self.__parse_pupill_center,
- 'pd': self.__parse_pupill_diameter,
- 'gd': self.__parse_gaze_direction,
- 'l': self.__parse_gaze_position,
- 'gp3': self.__parse_gaze_position_3d
- }
-
- def parse(self, data):
-
- json_data = json.loads(data.decode('utf-8'))
-
- # Parse data status
- status = json_data.pop('s', -1)
-
- # Parse timestamp
- data_ts = json_data.pop('ts')
-
- # Parse data depending first json key
- first_key = next(iter(json_data))
-
- # Convert json data into data object
- data_object = self.__parse_data_map[first_key](status, json_data)
- data_object_type = type(data_object).__name__
-
- # Keep first timestamp to offset all timestamps
- if self.__first_ts == 0:
- self.__first_ts = data_ts
-
- data_ts -= self.__first_ts
-
- return data_ts, data_object, data_object_type
-
- def __parse_pupill_or_gaze(self, status, json_data):
-
- gaze_index = json_data.pop('gidx')
-
- # parse pupill or gaze data depending second json key
- second_key = next(iter(json_data))
-
- return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, json_data)
-
- def __parse_dir_sig(self, status, json_data):
-
- return DirSig(json_data['dir'], json_data['sig'])
-
- def __parse_pts(self, status, json_data):
-
- return PresentationTimeStamp(json_data['pts'])
-
- def __parse_vts(self, status, json_data):
-
- # ts is not sent when recording
- try:
-
- ts = json_data['ts']
-
- except KeyError:
-
- ts = -1
-
- return VideoTimeStamp(json_data['vts'], ts)
-
- def __parse_event_synch(self, status, json_data):
-
- return EventSynch(json_data['evts'])
-
- def __parse_event(self, status, json_data):
-
- return Event(json_data['ets'], json_data['type'], json_data['tag'])
-
- def __parse_accelerometer(self, status, json_data):
-
- return Accelerometer(json_data['ac'])
-
- def __parse_gyroscope(self, status, json_data):
-
- return Gyroscope(json_data['gy'])
-
- def __parse_pupill_center(self, status, gaze_index, json_data):
-
- return PupillCenter(status, gaze_index, json_data['pc'], json_data['eye'])
-
- def __parse_pupill_diameter(self, status, gaze_index, json_data):
-
- return PupillDiameter(status, gaze_index, json_data['pd'], json_data['eye'])
-
- def __parse_gaze_direction(self, status, gaze_index, json_data):
-
- return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye'])
-
- def __parse_gaze_position(self, status, gaze_index, json_data):
-
- return GazePosition(status, gaze_index, json_data['l'], json_data['gp'])
-
- def __parse_gaze_position_3d(self, status, gaze_index, json_data):
-
- return GazePosition3D(status, gaze_index, json_data['gp3'])
-
- def __parse_marker_position(self, status, json_data):
-
- return MarkerPosition(json_data['marker3d'], json_data['marker2d'])
-
-class Provider(DataFeatures.PipelineInputProvider):
-
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
-
- # Init parent classes
- super().__init__()
-
- # Init private attributes
- self.__address = None
- self.__udpport = 49152
-
- self.__project_name = None
- self.__project_id = None
-
- self.__participant_name = None
- self.__participant_id = None
-
- self.__configuration = {}
-
- self.__parser = TobiiJsonDataParser()
-
- @property
- def address(self) -> str:
- """Network address where to find the device."""
- return self.__address
-
- @address.setter
- def address(self, address:str):
-
- self.__address = address
-
- # Remove part after % on under Windows
- if "%" in self.__address:
-
- if sys.platform == "win32":
-
- self.__address = self.__address.split("%")[0]
-
- # Define base url
- if ':' in self.__address:
-
- self.__base_url = f'http://[{self.__address}]'
-
- else:
-
- self.__base_url = 'http://' + self.__address
-
- @property
- def configuration(self)-> dict:
- """Patch system configuration dictionary."""
- return self.__configuration
-
- @configuration.setter
- def configuration(self, configuration:dict):
-
- self.__configuration = configuration
-
- @property
- def project(self) -> str:
- """Project name."""
- return self.__project_name
-
- @project.setter
- def project(self, project:str):
-
- self.__project_name = project
-
- def __bind_project(self):
- """Bind to a project or create one if it doesn't exist."""
-
- if self.__project_name is None:
-
- raise Exception(f'Project binding fails: setup project before.')
-
- self.__project_id = None
-
- # Check if project exist
- projects = self.__get_request('/api/projects')
-
- for project in projects:
-
- try:
-
- if project['pr_info']['Name'] == self.__project_name:
-
- self.__project_id = project['pr_id']
-
- logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id)
-
- except:
-
- pass
-
- # The project doesn't exist, create one
- if self.__project_id is None:
-
- logging.debug('> %s project doesn\'t exist', self.__project_name)
-
- data = {
- 'pr_info' : {
- 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)),
- 'Name': self.__project_name
- },
- 'pr_created': self.__get_current_datetime()
- }
-
- json_data = self.__post_request('/api/projects', data)
-
- self.__project_id = json_data['pr_id']
-
- logging.debug('> new %s project created: %s', self.__project_name, self.__project_id)
-
- @property
- def participant(self)-> str:
- """Participant name"""
- return self.__participant_name
-
- @participant.setter
- def participant(self, participant:str):
-
- self.__participant_name = participant
-
- def __bind_participant(self):
- """Bind to a participant or create one if it doesn't exist.
-
- !!! warning
- Bind to a project before.
- """
-
- if self.__participant_name is None:
-
- raise Exception(f'Participant binding fails: setup participant before.')
-
- if self.__project_id is None :
-
- raise Exception(f'Participant binding fails: bind to a project before')
-
- self.__participant_id = None
-
- # Check if participant exist
- participants = self.__get_request('/api/participants')
-
- for participant in participants:
-
- try:
-
- if participant['pa_info']['Name'] == self.__participant_name:
-
- self.__participant_id = participant['pa_id']
-
- logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id)
-
- except:
-
- pass
-
- # The participant doesn't exist, create one
- if self.__participant_id is None:
-
- logging.debug('> %s participant doesn\'t exist', self.__participant_name)
-
- data = {
- 'pa_project': self.__project_id,
- 'pa_info': {
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)),
- 'Name': self.__participant_name,
- 'Notes': '' # TODO: set participant notes
- },
- 'pa_created': self.__get_current_datetime()
- }
-
- json_data = self.__post_request('/api/participants', data)
-
- self.__participant_id = json_data['pa_id']
-
- logging.debug('> new %s participant created: %s', self.__participant_name, self.__participant_id)
-
- def __enter__(self):
-
- logging.info('Tobii Pro Glasses 2 connexion starts...')
- logging.debug('TobiiProGlasses2.Provider.__enter__')
-
- # Update current configuration with configuration patch
- logging.debug('> updating configuration')
-
- configuration = self.__get_request('/api/system/conf')
-
- if self.__configuration:
-
- configuration.update(self.__configuration)
- configuration = self.__post_request('/api/system/conf', configuration)
-
- # Log current configuration
- logging.info('Tobii Pro Glasses 2 configuration:')
-
- for key, value in configuration.items():
-
- logging.info('%s: %s', key, str(value))
-
- # Store video stream dimension
- self.__video_width = configuration['sys_sc_width']
- self.__video_height = configuration['sys_sc_height']
-
- # Bind to project if required
- if self.__project_name is not None:
-
- logging.debug('> binding project %s', self.__project_name)
-
- self.__bind_project()
-
- logging.info('Tobii Pro Glasses 2 project id: %s', self.__project_id)
-
- # Bind to participant if required
- if self.__participant_name is not None:
-
- logging.debug('> binding participant %s', self.__participant_name)
-
- self.__bind_participant()
-
- logging.info('Tobii Pro Glasses 2 participant id: %s', self.__participant_id)
-
- # Create stop event
- self.__stop_event = threading.Event()
-
- # Open data stream
- self.__data_socket = self.__make_socket()
- self.__data_thread = threading.Thread(target = self.__stream_data)
- self.__data_thread.daemon = True
-
- logging.debug('> starting data thread...')
- self.__data_thread.start()
-
- # Open video stream
- self.__video_socket = self.__make_socket()
- self.__video_thread = threading.Thread(target = self.__stream_video)
- self.__video_thread.daemon = True
-
- logging.debug('> starting video thread...')
- self.__video_thread.start()
-
- # Keep connection alive
- self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
- self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
- self.__keep_alive_thread.daemon = True
-
- logging.debug('> starting keep alive thread...')
- self.__keep_alive_thread.start()
-
- return self
-
- def __exit__(self, exception_type, exception_value, exception_traceback):
-
- logging.debug('TobiiProGlasses2.Provider.__exit__')
-
- # Close data stream
- self.__stop_event.set()
-
- # Stop keeping connection alive
- threading.Thread.join(self.__keep_alive_thread)
- self.__keep_alive_thread = None
-
- # Stop data streaming
- threading.Thread.join(self.__data_thread)
- self.__data_thread = None
-
- def __make_socket(self):
- """Create a socket to enable network communication."""
-
- iptype = socket.AF_INET
-
- if ':' in self.__address:
-
- iptype = socket.AF_INET6
-
- res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)
- family, socktype, proto, canonname, sockaddr = res[0]
- new_socket = socket.socket(family, socktype, proto)
-
- new_socket.settimeout(5.0)
-
- try:
-
- if iptype == socket.AF_INET6:
-
- new_socket.setsockopt(socket.SOL_SOCKET, 25, 1)
-
- except socket.error as e:
-
- if e.errno == 1:
-
- logging.error('Binding to a network interface is permitted only for root users.')
-
- return new_socket
-
- def __stream_data(self):
- """Stream data from dedicated socket."""
-
- logging.debug('TobiiProGlasses2.Provider.__stream_data')
-
- while not self.__stop_event.is_set():
-
- try:
-
- data, _ = self.__data_socket.recvfrom(1024)
-
- except TimeoutError:
-
- logging.error('> timeout occurred while receiving data')
-
- if data is not None:
-
- # Parse json into timestamped data object
- data_ts, data_object, data_object_type = self.__parser.parse(data)
-
- # Edit millisecond timestamp
- timestamp = int(data_ts * 1e-3)
-
- match data_object_type:
-
- case 'GazePosition':
-
- # When gaze position is valid
- if data_object.validity == 0:
-
- # Edit timestamped gaze position
- timestamped_gaze_position = GazeFeatures.GazePosition((int(data_object.value[0] * self.__video_width), int(data_object.value[1] * self.__video_height)), timestamp=timestamp)
-
- # DEBUG
- print('TobiiProGlasses2.__stream_data', timestamped_gaze_position)
- #self.gaze_position_callback(timestamped_gaze_position)
-
- else:
-
- # Edit empty gaze position
- empty_gaze_position = GazeFeatures.GazePosition(timestamp=timestamp)
-
- # DEBUG
- print('TobiiProGlasses2.__stream_data', empty_gaze_position)
- #self.gaze_position_callback(empty_gaze_position)
-
- def __stream_video(self):
- """Stream video from dedicated socket."""
-
- logging.debug('TobiiProGlasses2.Provider.__stream_video')
-
- container = av.open(f'rtsp://{self.__address}:8554/live/scene', options={'rtsp_transport': 'tcp'})
- self.__stream = container.streams.video[0]
- #self.__buffer = collections.OrderedDict()
-
- for image in container.decode(self.__stream):
-
- logging.debug('> new image decoded')
-
- # Quit if the video acquisition thread have been stopped
- if self.__stop_event.is_set():
-
- logging.debug('> stop event is set')
- break
-
- if image is not None:
-
- timestamp = int(image.time * 1e6)
-
- logging.debug('> image timestamp: %d', image.time)
- '''
- # Select callback reading mode
- if len(self.reading_callbacks) > 0:
-
- # Lock data subcription
- self.__subcription_lock.acquire()
-
- # Share incoming data to all subscribers
- for callback in self.reading_callbacks:
-
- callback(timestamp, image.to_ndarray(format='bgr24'))
-
- # Unlock data subscription
- self.__subcription_lock.release()
-
- # Select buffer reading mode
- else:
-
- # Lock buffer access
- self.__buffer_lock.acquire()
-
- # Decoding image and storing at time index
- self.__buffer[timestamp] = image.to_ndarray(format='bgr24')
-
- # Unlock buffer access
- self.__buffer_lock.release()
- '''
- def __keep_alive(self):
- """Maintain network connection."""
-
- logging.debug('TobiiProGlasses2.Provider.__keep_alive')
-
- while not self.__stop_event.is_set():
-
- self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
- self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
-
- time.sleep(1)
-
- def __get_request(self, api_action) -> str:
- """Send a GET request and get data back."""
-
- url = self.__base_url + api_action
-
- logging.debug('TobiiProGlasses2.Provider.__get_request %s', url)
-
- res = urlopen(url).read()
-
- try:
-
- data = json.loads(res.decode('utf-8'))
-
- except json.JSONDecodeError:
-
- data = None
-
- logging.debug('TobiiProGlasses2.Provider.__get_request received: %s', data)
-
- return data
-
- def __post_request(self, api_action, data = None, wait_for_response = True) -> str:
- """Send a POST request and get result back."""
-
- logging.debug('TobiiProGlasses2.Provider.__post_request %s', api_action)
-
- url = self.__base_url + api_action
- req = Request(url)
- req.add_header('Content-Type', 'application/json')
- data = json.dumps(data)
-
- if wait_for_response is False:
-
- threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
-
- return None
-
- response = urlopen(req, data.encode('utf-8'))
- res = response.read()
-
- try:
-
- res = json.loads(res.decode('utf-8'))
-
- except:
-
- pass
-
- return res
-
- def __wait_for_status(self, api_action, key, values, timeout = None) -> any:
- """Wait until a status matches given values."""
-
- url = self.__base_url + api_action
- running = True
-
- while running:
-
- req = Request(url)
- req.add_header('Content-Type', 'application/json')
-
- try:
-
- response = urlopen(req, None, timeout = timeout)
-
- except URLError as e:
-
- logging.error(e.reason)
- return -1
-
- data = response.read()
- json_data = json.loads(data.decode('utf-8'))
-
- if json_data[key] in values:
- running = False
-
- time.sleep(1)
-
- return json_data[key]
-
- def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT):
-
- return datetime.datetime.now().replace(microsecond=0).strftime(timeformat)
-
- # CALIBRATION
-
- def calibration_start(self, project_name, participant_name):
- """Start calibration process for project and participant."""
-
- project_id = self.__get_project_id(project_name)
- participant_id = self.get_participant_id(participant_name)
-
- # Init calibration id
- self.__calibration_id = None
-
- # Calibration have to be done for a project and a participant
- if project_id is None or participant_id is None:
-
- raise Exception(f'Setup project and participant before')
-
- data = {
- 'ca_project': project_id,
- 'ca_type': 'default',
- 'ca_participant': participant_id,
- 'ca_created': self.__get_current_datetime()
- }
-
- # Request calibration
- json_data = self.__post_request('/api/calibrations', data)
- self.__calibration_id = json_data['ca_id']
-
- # Start calibration
- self.__post_request('/api/calibrations/' + self.__calibration_id + '/start')
-
- def calibration_status(self) -> str:
- """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed."""
-
- if self.__calibration_id is not None:
-
- status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
-
- # Forget calibration id
- if status != 'calibrating':
-
- self.__calibration_id = None
-
- return status
-
- else:
-
- raise Exception(f'Start calibration before')
-
- def calibrate(self, project_name, participant_name):
- """Handle whole Tobii glasses calibration process."""
-
- # Start calibration
- self.calibration_start(project_name, participant_name)
-
- # While calibrating...
- status = self.calibration_status()
-
- while status == 'calibrating':
-
- time.sleep(1)
- status = self.calibration_status()
-
- if status == 'uncalibrated' or status == 'stale' or status == 'failed':
-
- raise Exception(f'Calibration {status}')
-
- # CALIBRATION
-
- def calibration_start(self, project_name, participant_name):
- """Start calibration process for project and participant."""
-
- project_id = self.__get_project_id(project_name)
- participant_id = self.get_participant_id(participant_name)
-
- # Init calibration id
- self.__calibration_id = None
-
- # Calibration have to be done for a project and a participant
- if project_id is None or participant_id is None:
-
- raise Exception(f'Setup project and participant before')
-
- data = {
- 'ca_project': project_id,
- 'ca_type': 'default',
- 'ca_participant': participant_id,
- 'ca_created': self.__get_current_datetime()
- }
-
- # Request calibration
- json_data = self.__post_request('/api/calibrations', data)
- self.__calibration_id = json_data['ca_id']
-
- # Start calibration
- self.__post_request('/api/calibrations/' + self.__calibration_id + '/start')
-
- def calibration_status(self) -> str:
- """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed."""
-
- if self.__calibration_id is not None:
-
- status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
-
- # Forget calibration id
- if status != 'calibrating':
-
- self.__calibration_id = None
-
- return status
-
- else:
-
- raise Exception(f'Start calibration before')
-
- def calibrate(self, project_name, participant_name):
- """Handle whole Tobii glasses calibration process."""
-
- # Start calibration
- self.calibration_start(project_name, participant_name)
-
- # While calibrating...
- status = self.calibration_status()
-
- while status == 'calibrating':
-
- time.sleep(1)
- status = self.calibration_status()
-
- if status == 'uncalibrated' or status == 'stale' or status == 'failed':
-
- raise Exception(f'Calibration {status}')
-
- # RECORDING FEATURES
-
- def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
- return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
-
- def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str:
- """Create a new recording.
-
- Returns:
- recording id
- """
-
- participant_id = self.get_participant_id(participant_name)
-
- if participant_id is None:
- raise NameError(f'{participant_name} participant doesn\'t exist')
-
- data = {
- 'rec_participant': participant_id,
- 'rec_info': {
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)),
- 'Name': recording_name,
- 'Notes': recording_notes
- },
- 'rec_created': self.__get_current_datetime()
- }
-
- json_data = self.__post_request('/api/recordings', data)
-
- return json_data['rec_id']
-
- def start_recording(self, recording_id) -> bool:
- """Start recording on the Tobii interface's SD Card."""
-
- self.__post_request('/api/recordings/' + recording_id + '/start')
- return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
-
- def stop_recording(self, recording_id) -> bool:
- """Stop recording on the Tobii interface's SD Card."""
-
- self.__post_request('/api/recordings/' + recording_id + '/stop')
- return self.__wait_for_recording_status(recording_id, ['done']) == "done"
-
- def pause_recording(self, recording_id) -> bool:
- """Pause recording on the Tobii interface's SD Card."""
-
- self.__post_request('/api/recordings/' + recording_id + '/pause')
- return self.__wait_for_recording_status(recording_id, ['paused']) == "paused"
-
- def __get_recording_status(self):
- return self.get_status()['sys_recording']
-
- def get_current_recording_id(self) -> str:
- """Get current recording id."""
-
- return self.__get_recording_status()['rec_id']
-
- @property
- def recording(self) -> bool:
- """Is it recording?"""
-
- rec_status = self.__get_recording_status()
-
- if rec_status != {}:
- if rec_status['rec_state'] == "recording":
- return True
-
- return False
-
- def get_recordings(self) -> str:
- """Get all recordings id."""
-
- return self.__get_request('/api/recordings')
-
- # EVENTS AND EXPERIMENTAL VARIABLES
-
- def __post_recording_data(self, event_type: str, event_tag = ''):
- data = {'type': event_type, 'tag': event_tag}
- self.__post_request('/api/events', data, wait_for_response=False)
-
- def send_event(self, event_type: str, event_value = None):
- self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
-
- def send_variable(self, variable_name: str, variable_value = None):
- self.__post_recording_data(str(variable_name), str(variable_value))
-
- # MISC
-
- def eject_sd(self):
- self.__get_request('/api/eject')
-
- def get_battery_info(self):
- return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
-
- def get_battery_level(self):
- return self.get_battery_status()['level']
-
- def get_battery_remaining_time(self):
- return self.get_battery_status()['remaining_time']
-
- def get_battery_status(self):
- return self.get_status()['sys_battery']
-
- def get_et_freq(self):
- return self.get_configuration()['sys_et_freq']
-
- def get_et_frequencies(self):
- return self.get_status()['sys_et']['frequencies']
-
- def identify(self):
- self.__get_request('/api/identify')
-
- def get_configuration(self):
- return self.__get_request('/api/system/conf')
-
- def get_status(self):
- return self.__get_request('/api/system/status')
-
- def get_storage_info(self):
- return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) )
-
- def get_storage_remaining_time(self):
- return self.get_storage_status()['remaining_time']
-
- def get_storage_status(self):
- return self.get_status()['sys_storage']
-
- def get_scene_camera_freq(self):
- return self.get_configuration()['sys_sc_fps']
-
- def set_et_freq_50(self):
- data = {'sys_et_freq': 50}
- json_data = self.__post_request('/api/system/conf', data)
-
- def set_et_freq_100(self):
- # May not be available. Check get_et_frequencies() first.
- data = {'sys_et_freq': 100}
- json_data = self.__post_request('/api/system/conf', data)
-
- def set_eye_camera_indoor_preset(self) -> str:
- data = {'sys_ec_preset': 'Indoor'}
- return self.__post_request('/api/system/conf', data)
-
- def set_eye_camera_outdoor_preset(self) -> str:
- data = {'sys_ec_preset': 'ClearWeather'}
- return self.__post_request('/api/system/conf', data)
-
- def set_scene_camera_auto_preset(self):
- data = {'sys_sc_preset': 'Auto'}
- json_data = self.__post_request('/api/system/conf', data)
-
- def set_scene_camera_gaze_preset(self):
- data = {'sys_sc_preset': 'GazeBasedExposure'}
- json_data = self.__post_request('/api/system/conf', data)
-
- def set_scene_camera_freq_25(self):
- data = {'sys_sc_fps': 25}
- json_data = self.__post_request('/api/system/conf/', data)
-
- def set_scene_camera_freq_50(self):
- data = {'sys_sc_fps': 50}
- json_data = self.__post_request('/api/system/conf/', data)
diff --git a/src/argaze/utils/Providers/__init__.py b/src/argaze/utils/Providers/__init__.py
deleted file mode 100644
index b76cd8b..0000000
--- a/src/argaze/utils/Providers/__init__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-"""
-Collection of device interfaces.
-"""
-__all__ = ['TobiiProGlasses2']
\ No newline at end of file
diff --git a/src/argaze/utils/demo/A3_demo.pdf b/src/argaze/utils/demo/A3_demo.pdf
new file mode 100644
index 0000000..cc51bc2
Binary files /dev/null and b/src/argaze/utils/demo/A3_demo.pdf differ
diff --git a/src/argaze/utils/demo/aoi_2d_scene.json b/src/argaze/utils/demo/aoi_2d_scene.json
new file mode 100644
index 0000000..ac58b63
--- /dev/null
+++ b/src/argaze/utils/demo/aoi_2d_scene.json
@@ -0,0 +1,18 @@
+{
+ "BlueTriangle":[[960, 664], [1113, 971], [806, 971]],
+ "RedSquare": {
+ "Rectangle": {
+ "x": 268,
+ "y": 203,
+ "width": 308,
+ "height": 308
+ }
+ },
+ "GreenCircle": {
+ "Circle": {
+ "cx": 1497,
+ "cy": 356,
+ "radius": 153
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/argaze/utils/demo/aoi_3d_scene.obj b/src/argaze/utils/demo/aoi_3d_scene.obj
new file mode 100644
index 0000000..0ce97de
--- /dev/null
+++ b/src/argaze/utils/demo/aoi_3d_scene.obj
@@ -0,0 +1,7 @@
+o GrayRectangle
+v 0.000000 0.000000 0.000000
+v 25.000000 0.000000 0.000000
+v 0.000000 14.960000 0.000000
+v 25.000000 14.960000 0.000000
+s off
+f 1 2 4 3
diff --git a/src/argaze/utils/demo/aruco_markers_group.json b/src/argaze/utils/demo/aruco_markers_group.json
new file mode 100644
index 0000000..e103d14
--- /dev/null
+++ b/src/argaze/utils/demo/aruco_markers_group.json
@@ -0,0 +1,25 @@
+{
+ "dictionary": "DICT_APRILTAG_16h5",
+ "places": {
+ "0": {
+ "translation": [-2.5, 17.5, 0],
+ "rotation": [0.0, 0.0, 0.0],
+ "size": 5
+ },
+ "1": {
+ "translation": [27.5, 17.5, 0],
+ "rotation": [0.0, 0.0, 0.0],
+ "size": 5
+ },
+ "2": {
+ "translation": [-2.5, -2.5, 0],
+ "rotation": [0.0, 0.0, 0.0],
+ "size": 5
+ },
+ "3": {
+ "translation": [27.5, -2.5, 0],
+ "rotation": [0.0, 0.0, 0.0],
+ "size": 5
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/argaze/utils/demo/aruco_markers_group.obj b/src/argaze/utils/demo/aruco_markers_group.obj
new file mode 100644
index 0000000..83935ef
--- /dev/null
+++ b/src/argaze/utils/demo/aruco_markers_group.obj
@@ -0,0 +1,32 @@
+o DICT_APRILTAG_16h5#0_Marker
+v -5.000000 14.960000 0.000000
+v 0.000000 14.960000 0.000000
+v -5.000000 19.959999 0.000000
+v 0.000000 19.959999 0.000000
+vn 0.0000 0.0000 1.0000
+s off
+f 1//1 2//1 4//1 3//1
+o DICT_APRILTAG_16h5#1_Marker
+v 25.000000 14.960000 0.000000
+v 30.000000 14.960000 0.000000
+v 25.000000 19.959999 0.000000
+v 30.000000 19.959999 0.000000
+vn 0.0000 0.0000 1.0000
+s off
+f 5//2 6//2 8//2 7//2
+o DICT_APRILTAG_16h5#2_Marker
+v -5.000000 -5.000000 0.000000
+v 0.000000 -5.000000 0.000000
+v -5.000000 0.000000 0.000000
+v 0.000000 0.000000 0.000000
+vn 0.0000 0.0000 1.0000
+s off
+f 9//3 10//3 12//3 11//3
+o DICT_APRILTAG_16h5#3_Marker
+v 25.000000 -5.000000 0.000000
+v 30.000000 -5.000000 0.000000
+v 25.000000 0.000000 0.000000
+v 30.000000 0.000000 0.000000
+vn 0.0000 0.0000 1.0000
+s off
+f 13//4 14//4 16//4 15//4
diff --git a/src/argaze/utils/demo/aruco_markers_pipeline.json b/src/argaze/utils/demo/aruco_markers_pipeline.json
new file mode 100644
index 0000000..a4fe400
--- /dev/null
+++ b/src/argaze/utils/demo/aruco_markers_pipeline.json
@@ -0,0 +1,64 @@
+{
+ "argaze.ArUcoMarkers.ArUcoCamera.ArUcoCamera": {
+ "name": "demo_camera",
+ "size": [1920, 1080],
+ "aruco_detector": {
+ "dictionary": "DICT_APRILTAG_16h5",
+ "parameters": {
+ "useAruco3Detection": 1
+ }
+ },
+ "sides_mask": 420,
+ "layers": {
+ "demo_layer": {}
+ },
+ "image_parameters": {
+ "background_weight": 1,
+ "draw_layers": {
+ "demo_layer": {
+ "draw_aoi_scene": {
+ "draw_aoi": {
+ "color": [255, 255, 255],
+ "border_size": 1
+ }
+ }
+ }
+ },
+ "draw_gaze_positions": {
+ "color": [0, 255, 255],
+ "size": 4
+ },
+ "draw_detected_markers": {
+ "color": [0, 255, 0],
+ "draw_axes": {
+ "thickness": 3
+ }
+ },
+ "draw_scenes": {
+ "demo_scene": {
+ "draw_aruco_markers_group": {
+ "draw_axes": {
+ "thickness": 3,
+ "length": 10
+ }
+ }
+ }
+ }
+ },
+ "scenes": {
+ "demo_scene" : {
+ "aruco_markers_group": "aruco_markers_group.json",
+ "layers": {
+ "demo_layer" : {
+ "aoi_scene": "aoi_3d_scene.obj"
+ }
+ },
+ "frames": {
+ "GrayRectangle": "gaze_analysis_pipeline.json"
+ },
+ "angle_tolerance": 15.0,
+ "distance_tolerance": 2.54
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/argaze/utils/demo/demo.mov b/src/argaze/utils/demo/demo.mov
new file mode 100644
index 0000000..bba7999
Binary files /dev/null and b/src/argaze/utils/demo/demo.mov differ
diff --git a/src/argaze/utils/demo/eyetracker_setup.json b/src/argaze/utils/demo/eyetracker_setup.json
new file mode 100644
index 0000000..70f85e4
--- /dev/null
+++ b/src/argaze/utils/demo/eyetracker_setup.json
@@ -0,0 +1,21 @@
+{
+ "argaze.utils.eyetrackers.TobiiProGlasses2.LiveStream" : {
+ "name": "Tobii Pro Glasses 2 live stream",
+ "address": "10.34.0.17",
+ "project": "MyProject",
+ "participant": "NewParticipant",
+ "configuration": {
+ "sys_ec_preset": "Indoor",
+ "sys_sc_width": 1920,
+ "sys_sc_height": 1080,
+ "sys_sc_fps": 25,
+ "sys_sc_preset": "Auto",
+ "sys_et_freq": 50,
+ "sys_mems_freq": 100
+ },
+ "pipeline": "aruco_markers_pipeline.json",
+ "image_parameters": {
+ "draw_exceptions": true
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/argaze/utils/demo/frame_background.jpg b/src/argaze/utils/demo/frame_background.jpg
new file mode 100644
index 0000000..7aabe63
Binary files /dev/null and b/src/argaze/utils/demo/frame_background.jpg differ
diff --git a/src/argaze/utils/demo/gaze_analysis_pipeline.json b/src/argaze/utils/demo/gaze_analysis_pipeline.json
new file mode 100644
index 0000000..07b7e78
--- /dev/null
+++ b/src/argaze/utils/demo/gaze_analysis_pipeline.json
@@ -0,0 +1,135 @@
+{
+ "argaze.ArFeatures.ArFrame": {
+ "name": "GrayRectangle",
+ "size": [1920, 1149],
+ "background": "frame_background.jpg",
+ "gaze_movement_identifier": {
+ "argaze.GazeAnalysis.DispersionThresholdIdentification.GazeMovementIdentifier": {
+ "deviation_max_threshold": 50,
+ "duration_min_threshold": 200
+ }
+ },
+ "filter_in_progress_identification": false,
+ "scan_path": {
+ "duration_max": 10000
+ },
+ "scan_path_analyzers": {
+ "argaze.GazeAnalysis.Basic.ScanPathAnalyzer": {},
+ "argaze.GazeAnalysis.KCoefficient.ScanPathAnalyzer": {},
+ "argaze.GazeAnalysis.NearestNeighborIndex.ScanPathAnalyzer": {
+ "size": [1920, 1149]
+ },
+ "argaze.GazeAnalysis.ExploreExploitRatio.ScanPathAnalyzer": {
+ "short_fixation_duration_threshold": 0
+ }
+ },
+ "heatmap": {
+ "size": [320, 240]
+ },
+ "layers": {
+ "demo_layer": {
+ "aoi_scene": "aoi_2d_scene.json",
+ "aoi_matcher": {
+ "argaze.GazeAnalysis.DeviationCircleCoverage.AOIMatcher": {
+ "coverage_threshold": 0.5
+ }
+ },
+ "aoi_scan_path": {
+ "duration_max": 30000
+ },
+ "aoi_scan_path_analyzers": {
+ "argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer": {},
+ "argaze.GazeAnalysis.TransitionMatrix.AOIScanPathAnalyzer": {},
+ "argaze.GazeAnalysis.KCoefficient.AOIScanPathAnalyzer": {},
+ "argaze.GazeAnalysis.LempelZivComplexity.AOIScanPathAnalyzer": {},
+ "argaze.GazeAnalysis.NGram.AOIScanPathAnalyzer": {
+ "n_min": 3,
+ "n_max": 3
+ },
+ "argaze.GazeAnalysis.Entropy.AOIScanPathAnalyzer":{}
+ },
+ "observers": {
+ "loggers.AOIScanPathAnalysisLogger": {
+ "path": "_export/logs/aoi_scan_path_metrics.csv",
+ "header": ["Timestamp (ms)", "Duration (ms)", "Step", "K", "LZC"]
+ }
+ }
+ }
+ },
+ "image_parameters": {
+ "background_weight": 1,
+ "heatmap_weight": 0.5,
+ "draw_scan_path": {
+ "draw_fixations": {
+ "deviation_circle_color": [255, 0, 255],
+ "duration_border_color": [127, 0, 127],
+ "duration_factor": 1e-2
+ },
+ "draw_saccades": {
+ "line_color": [255, 0, 255]
+ }
+ },
+ "draw_layers": {
+ "demo_layer": {
+ "draw_aoi_scene": {
+ "draw_aoi": {
+ "color": [255, 255, 255],
+ "border_size": 1
+ }
+ },
+ "draw_aoi_matching": {
+ "draw_matched_fixation": {
+ "deviation_circle_color": [255, 255, 255],
+ "draw_positions": {
+ "position_color": [0, 255, 0],
+ "line_color": [0, 0, 0]
+ }
+ },
+ "draw_matched_region": {
+ "color": [0, 255, 0],
+ "border_size": 4
+ },
+ "draw_looked_aoi": {
+ "color": [0, 255, 0],
+ "border_size": 2
+ },
+ "looked_aoi_name_color": [255, 255, 255],
+ "looked_aoi_name_offset": [0, -10]
+ }
+ }
+ },
+ "draw_fixations": {
+ "deviation_circle_color": [255, 255, 255],
+ "duration_border_color": [127, 0, 127],
+ "duration_factor": 1e-2,
+ "draw_positions": {
+ "position_color": [0, 255, 255],
+ "line_color": [0, 0, 0]
+ }
+ },
+ "draw_saccades": {
+ "line_color": [255, 0, 255]
+ },
+ "draw_gaze_positions": {
+ "color": [0, 255, 255],
+ "size": 2
+ }
+ },
+ "observers": {
+ "loggers.FixationLogger": {
+ "path": "_export/logs/fixations.csv",
+ "header": ["Timestamp (ms)", "Focus (px)", "Duration (ms)", "AOI"]
+ },
+ "loggers.ScanPathAnalysisLogger": {
+ "path": "_export/logs/scan_path_metrics.csv",
+ "header": ["Timestamp (ms)", "Duration (ms)", "Step", "K", "NNI", "XXR"]
+ },
+ "loggers.VideoRecorder": {
+ "path": "_export/logs/video.mp4",
+ "width": 1920,
+ "height": 1080,
+ "fps": 15
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/argaze/utils/demo/loggers.py b/src/argaze/utils/demo/loggers.py
new file mode 100644
index 0000000..5f1986e
--- /dev/null
+++ b/src/argaze/utils/demo/loggers.py
@@ -0,0 +1,84 @@
+"""
+
+This program is free software: you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation, either version 3 of the License, or (at your option) any later
+version.
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+You should have received a copy of the GNU General Public License along with
+this program. If not, see .
+"""
+
+__author__ = "Théo de la Hogue"
+__credits__ = []
+__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
+__license__ = "GPLv3"
+
+from argaze import DataFeatures, GazeFeatures
+from argaze.GazeAnalysis import *
+from argaze.utils import UtilsFeatures
+
+class FixationLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter):
+
+ def on_look(self, timestamp, frame, exception):
+ """Log frame fixations."""
+
+ # Log fixations
+ if GazeFeatures.is_fixation(frame.last_gaze_movement()) and frame.last_gaze_movement().is_finished():
+
+ log = (
+ timestamp,
+ frame.last_gaze_movement().focus,
+ frame.last_gaze_movement().duration,
+ frame.layers['demo_layer'].last_looked_aoi_name()
+ )
+
+ self.write(log)
+
+class ScanPathAnalysisLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter):
+
+ def on_look(self, timestamp, frame, exception):
+ """Log frame scan path metrics."""
+
+ if frame.is_analysis_available():
+
+ analysis = frame.analysis()
+
+ log = (
+ timestamp,
+ analysis[Basic.ScanPathAnalyzer].path_duration,
+ analysis[Basic.ScanPathAnalyzer].steps_number,
+ analysis[KCoefficient.ScanPathAnalyzer].K,
+ analysis[NearestNeighborIndex.ScanPathAnalyzer].nearest_neighbor_index,
+ analysis[ExploreExploitRatio.ScanPathAnalyzer].explore_exploit_ratio
+ )
+
+ self.write(log)
+
+class VideoRecorder(DataFeatures.PipelineStepObserver, UtilsFeatures.VideoWriter):
+
+ def on_look(self, timestamp, frame, exception):
+ """Write frame image."""
+
+ self.write(frame.image())
+
+class AOIScanPathAnalysisLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter):
+
+ def on_look(self, timestamp, layer, exception):
+ """Log layer aoi scan path metrics"""
+
+ if layer.is_analysis_available():
+
+ analysis = layer.analysis()
+
+ log = (
+ timestamp,
+ analysis[Basic.AOIScanPathAnalyzer].path_duration,
+ analysis[Basic.AOIScanPathAnalyzer].steps_number,
+ analysis[KCoefficient.AOIScanPathAnalyzer].K,
+ analysis[LempelZivComplexity.AOIScanPathAnalyzer].lempel_ziv_complexity
+ )
+
+ self.write(log)
diff --git a/src/argaze/utils/demo_data/A3_demo.pdf b/src/argaze/utils/demo_data/A3_demo.pdf
deleted file mode 100644
index cc51bc2..0000000
Binary files a/src/argaze/utils/demo_data/A3_demo.pdf and /dev/null differ
diff --git a/src/argaze/utils/demo_data/aoi_2d_scene.json b/src/argaze/utils/demo_data/aoi_2d_scene.json
deleted file mode 100644
index ac58b63..0000000
--- a/src/argaze/utils/demo_data/aoi_2d_scene.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
- "BlueTriangle":[[960, 664], [1113, 971], [806, 971]],
- "RedSquare": {
- "Rectangle": {
- "x": 268,
- "y": 203,
- "width": 308,
- "height": 308
- }
- },
- "GreenCircle": {
- "Circle": {
- "cx": 1497,
- "cy": 356,
- "radius": 153
- }
- }
-}
\ No newline at end of file
diff --git a/src/argaze/utils/demo_data/aoi_3d_scene.obj b/src/argaze/utils/demo_data/aoi_3d_scene.obj
deleted file mode 100644
index 0ce97de..0000000
--- a/src/argaze/utils/demo_data/aoi_3d_scene.obj
+++ /dev/null
@@ -1,7 +0,0 @@
-o GrayRectangle
-v 0.000000 0.000000 0.000000
-v 25.000000 0.000000 0.000000
-v 0.000000 14.960000 0.000000
-v 25.000000 14.960000 0.000000
-s off
-f 1 2 4 3
diff --git a/src/argaze/utils/demo_data/aruco_markers_group.json b/src/argaze/utils/demo_data/aruco_markers_group.json
deleted file mode 100644
index e103d14..0000000
--- a/src/argaze/utils/demo_data/aruco_markers_group.json
+++ /dev/null
@@ -1,25 +0,0 @@
-{
- "dictionary": "DICT_APRILTAG_16h5",
- "places": {
- "0": {
- "translation": [-2.5, 17.5, 0],
- "rotation": [0.0, 0.0, 0.0],
- "size": 5
- },
- "1": {
- "translation": [27.5, 17.5, 0],
- "rotation": [0.0, 0.0, 0.0],
- "size": 5
- },
- "2": {
- "translation": [-2.5, -2.5, 0],
- "rotation": [0.0, 0.0, 0.0],
- "size": 5
- },
- "3": {
- "translation": [27.5, -2.5, 0],
- "rotation": [0.0, 0.0, 0.0],
- "size": 5
- }
- }
-}
\ No newline at end of file
diff --git a/src/argaze/utils/demo_data/aruco_markers_group.obj b/src/argaze/utils/demo_data/aruco_markers_group.obj
deleted file mode 100644
index 83935ef..0000000
--- a/src/argaze/utils/demo_data/aruco_markers_group.obj
+++ /dev/null
@@ -1,32 +0,0 @@
-o DICT_APRILTAG_16h5#0_Marker
-v -5.000000 14.960000 0.000000
-v 0.000000 14.960000 0.000000
-v -5.000000 19.959999 0.000000
-v 0.000000 19.959999 0.000000
-vn 0.0000 0.0000 1.0000
-s off
-f 1//1 2//1 4//1 3//1
-o DICT_APRILTAG_16h5#1_Marker
-v 25.000000 14.960000 0.000000
-v 30.000000 14.960000 0.000000
-v 25.000000 19.959999 0.000000
-v 30.000000 19.959999 0.000000
-vn 0.0000 0.0000 1.0000
-s off
-f 5//2 6//2 8//2 7//2
-o DICT_APRILTAG_16h5#2_Marker
-v -5.000000 -5.000000 0.000000
-v 0.000000 -5.000000 0.000000
-v -5.000000 0.000000 0.000000
-v 0.000000 0.000000 0.000000
-vn 0.0000 0.0000 1.0000
-s off
-f 9//3 10//3 12//3 11//3
-o DICT_APRILTAG_16h5#3_Marker
-v 25.000000 -5.000000 0.000000
-v 30.000000 -5.000000 0.000000
-v 25.000000 0.000000 0.000000
-v 30.000000 0.000000 0.000000
-vn 0.0000 0.0000 1.0000
-s off
-f 13//4 14//4 16//4 15//4
diff --git a/src/argaze/utils/demo_data/demo.mov b/src/argaze/utils/demo_data/demo.mov
deleted file mode 100644
index bba7999..0000000
Binary files a/src/argaze/utils/demo_data/demo.mov and /dev/null differ
diff --git a/src/argaze/utils/demo_data/demo_aruco_markers_setup.json b/src/argaze/utils/demo_data/demo_aruco_markers_setup.json
deleted file mode 100644
index 9a95524..0000000
--- a/src/argaze/utils/demo_data/demo_aruco_markers_setup.json
+++ /dev/null
@@ -1,167 +0,0 @@
-{
- "name": "demo_camera",
- "size": [1280, 720],
- "aruco_detector": {
- "dictionary": "DICT_APRILTAG_16h5",
- "parameters": {
- "useAruco3Detection": 1
- }
- },
- "layers": {
- "demo_layer": {}
- },
- "image_parameters": {
- "background_weight": 1,
- "draw_layers": {
- "demo_layer": {
- "draw_aoi_scene": {
- "draw_aoi": {
- "color": [255, 255, 255],
- "border_size": 1
- }
- }
- }
- },
- "draw_gaze_positions": {
- "color": [0, 255, 255],
- "size": 4
- },
- "draw_detected_markers": {
- "color": [0, 255, 0],
- "draw_axes": {
- "thickness": 3
- }
- },
- "draw_scenes": {
- "demo_scene": {
- "draw_aruco_markers_group": {
- "draw_axes": {
- "thickness": 3,
- "length": 10
- }
- }
- }
- }
- },
- "scenes": {
- "demo_scene" : {
- "aruco_markers_group": "aruco_markers_group.json",
- "layers": {
- "demo_layer" : {
- "aoi_scene": "aoi_3d_scene.obj"
- }
- },
- "frames": {
- "GrayRectangle": {
- "size": [1920, 1149],
- "background": "frame_background.jpg",
- "gaze_movement_identifier": {
- "argaze.GazeAnalysis.DispersionThresholdIdentification.GazeMovementIdentifier": {
- "deviation_max_threshold": 50,
- "duration_min_threshold": 200
- }
- },
- "scan_path": {
- "duration_max": 10000
- },
- "scan_path_analyzers": {
- "argaze.GazeAnalysis.Basic.ScanPathAnalyzer": {},
- "argaze.GazeAnalysis.KCoefficient.ScanPathAnalyzer": {},
- "argaze.GazeAnalysis.NearestNeighborIndex.ScanPathAnalyzer": {
- "size": [1920, 1149]
- },
- "argaze.GazeAnalysis.ExploreExploitRatio.ScanPathAnalyzer": {
- "short_fixation_duration_threshold": 0
- }
- },
- "layers": {
- "demo_layer": {
- "aoi_scene": "aoi_2d_scene.json",
- "aoi_matcher": {
- "argaze.GazeAnalysis.FocusPointInside.AOIMatcher": {}
- },
- "aoi_scan_path": {
- "duration_max": 30000
- },
- "aoi_scan_path_analyzers": {
- "argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer": {},
- "argaze.GazeAnalysis.TransitionMatrix.AOIScanPathAnalyzer": {},
- "argaze.GazeAnalysis.KCoefficient.AOIScanPathAnalyzer": {},
- "argaze.GazeAnalysis.LempelZivComplexity.AOIScanPathAnalyzer": {},
- "argaze.GazeAnalysis.NGram.AOIScanPathAnalyzer": {
- "n_min": 3,
- "n_max": 3
- },
- "argaze.GazeAnalysis.Entropy.AOIScanPathAnalyzer":{}
- },
- "observers": {
- "demo_loggers.AOIScanPathAnalysisLogger": {
- "path": "_export/logs/aoi_scan_path_metrics.csv",
- "header": ["Timestamp (ms)", "Duration (ms)", "Step", "K", "LZC"]
- }
- }
- }
- },
- "image_parameters": {
- "background_weight": 1,
- "draw_scan_path": {
- "draw_fixations": {
- "deviation_circle_color": [255, 0, 255],
- "duration_border_color": [127, 0, 127],
- "duration_factor": 1e-2
- },
- "draw_saccades": {
- "line_color": [255, 0, 255]
- }
- },
- "draw_layers": {
- "demo_layer": {
- "draw_aoi_scene": {
- "draw_aoi": {
- "color": [255, 255, 255],
- "border_size": 1
- }
- },
- "draw_aoi_matching": {
- "draw_looked_aoi": {
- "color": [0, 255, 255],
- "border_size": 10
- },
- "looked_aoi_name_color": [255, 255, 255],
- "looked_aoi_name_offset": [10, 10]
- }
- }
- },
- "draw_fixations": {
- "deviation_circle_color": [255, 255, 255],
- "duration_border_color": [127, 0, 127],
- "duration_factor": 1e-2
- },
- "draw_gaze_positions": {
- "color": [0, 255, 255],
- "size": 2
- }
- },
- "observers": {
- "demo_loggers.FixationLogger": {
- "path": "_export/logs/fixations.csv",
- "header": ["Timestamp (ms)", "Focus (px)", "Duration (ms)", "AOI"]
- },
- "demo_loggers.ScanPathAnalysisLogger": {
- "path": "_export/logs/scan_path_metrics.csv",
- "header": ["Timestamp (ms)", "Duration (ms)", "Step", "K", "NNI", "XXR"]
- },
- "demo_loggers.VideoRecorder": {
- "path": "_export/logs/video.mp4",
- "width": 1920,
- "height": 1080,
- "fps": 15
- }
- }
- }
- },
- "angle_tolerance": 15.0,
- "distance_tolerance": 2.54
- }
- }
-}
\ No newline at end of file
diff --git a/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json b/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json
deleted file mode 100644
index c897fa0..0000000
--- a/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json
+++ /dev/null
@@ -1,133 +0,0 @@
-{
- "name": "demo_frame",
- "size": [1920, 1149],
- "background": "frame_background.jpg",
- "gaze_movement_identifier": {
- "argaze.GazeAnalysis.DispersionThresholdIdentification.GazeMovementIdentifier": {
- "deviation_max_threshold": 50,
- "duration_min_threshold": 200
- }
- },
- "filter_in_progress_identification": false,
- "scan_path": {
- "duration_max": 10000
- },
- "scan_path_analyzers": {
- "argaze.GazeAnalysis.Basic.ScanPathAnalyzer": {},
- "argaze.GazeAnalysis.KCoefficient.ScanPathAnalyzer": {},
- "argaze.GazeAnalysis.NearestNeighborIndex.ScanPathAnalyzer": {
- "size": [1920, 1149]
- },
- "argaze.GazeAnalysis.ExploreExploitRatio.ScanPathAnalyzer": {
- "short_fixation_duration_threshold": 0
- }
- },
- "heatmap": {
- "size": [320, 240]
- },
- "layers": {
- "demo_layer": {
- "aoi_scene": "aoi_2d_scene.json",
- "aoi_matcher": {
- "argaze.GazeAnalysis.DeviationCircleCoverage.AOIMatcher": {
- "coverage_threshold": 0.5
- }
- },
- "aoi_scan_path": {
- "duration_max": 30000
- },
- "aoi_scan_path_analyzers": {
- "argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer": {},
- "argaze.GazeAnalysis.TransitionMatrix.AOIScanPathAnalyzer": {},
- "argaze.GazeAnalysis.KCoefficient.AOIScanPathAnalyzer": {},
- "argaze.GazeAnalysis.LempelZivComplexity.AOIScanPathAnalyzer": {},
- "argaze.GazeAnalysis.NGram.AOIScanPathAnalyzer": {
- "n_min": 3,
- "n_max": 3
- },
- "argaze.GazeAnalysis.Entropy.AOIScanPathAnalyzer":{}
- },
- "observers": {
- "demo_loggers.AOIScanPathAnalysisLogger": {
- "path": "_export/logs/aoi_scan_path_metrics.csv",
- "header": ["Timestamp (ms)", "Duration (ms)", "Step", "K", "LZC"]
- }
- }
- }
- },
- "image_parameters": {
- "background_weight": 1,
- "heatmap_weight": 0.5,
- "draw_scan_path": {
- "draw_fixations": {
- "deviation_circle_color": [255, 0, 255],
- "duration_border_color": [127, 0, 127],
- "duration_factor": 1e-2
- },
- "draw_saccades": {
- "line_color": [255, 0, 255]
- }
- },
- "draw_layers": {
- "demo_layer": {
- "draw_aoi_scene": {
- "draw_aoi": {
- "color": [255, 255, 255],
- "border_size": 1
- }
- },
- "draw_aoi_matching": {
- "draw_matched_fixation": {
- "deviation_circle_color": [255, 255, 255],
- "draw_positions": {
- "position_color": [0, 255, 0],
- "line_color": [0, 0, 0]
- }
- },
- "draw_matched_region": {
- "color": [0, 255, 0],
- "border_size": 4
- },
- "draw_looked_aoi": {
- "color": [0, 255, 0],
- "border_size": 2
- },
- "looked_aoi_name_color": [255, 255, 255],
- "looked_aoi_name_offset": [0, -10]
- }
- }
- },
- "draw_fixations": {
- "deviation_circle_color": [255, 255, 255],
- "duration_border_color": [127, 0, 127],
- "duration_factor": 1e-2,
- "draw_positions": {
- "position_color": [0, 255, 255],
- "line_color": [0, 0, 0]
- }
- },
- "draw_saccades": {
- "line_color": [255, 0, 255]
- },
- "draw_gaze_positions": {
- "color": [0, 255, 255],
- "size": 2
- }
- },
- "observers": {
- "demo_loggers.FixationLogger": {
- "path": "_export/logs/fixations.csv",
- "header": ["Timestamp (ms)", "Focus (px)", "Duration (ms)", "AOI"]
- },
- "demo_loggers.ScanPathAnalysisLogger": {
- "path": "_export/logs/scan_path_metrics.csv",
- "header": ["Timestamp (ms)", "Duration (ms)", "Step", "K", "NNI", "XXR"]
- },
- "demo_loggers.VideoRecorder": {
- "path": "_export/logs/video.mp4",
- "width": 1920,
- "height": 1080,
- "fps": 15
- }
- }
-}
\ No newline at end of file
diff --git a/src/argaze/utils/demo_data/demo_loggers.py b/src/argaze/utils/demo_data/demo_loggers.py
deleted file mode 100644
index 5f1986e..0000000
--- a/src/argaze/utils/demo_data/demo_loggers.py
+++ /dev/null
@@ -1,84 +0,0 @@
-"""
-
-This program is free software: you can redistribute it and/or modify it under
-the terms of the GNU General Public License as published by the Free Software
-Foundation, either version 3 of the License, or (at your option) any later
-version.
-This program is distributed in the hope that it will be useful, but WITHOUT
-ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-You should have received a copy of the GNU General Public License along with
-this program. If not, see .
-"""
-
-__author__ = "Théo de la Hogue"
-__credits__ = []
-__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
-__license__ = "GPLv3"
-
-from argaze import DataFeatures, GazeFeatures
-from argaze.GazeAnalysis import *
-from argaze.utils import UtilsFeatures
-
-class FixationLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter):
-
- def on_look(self, timestamp, frame, exception):
- """Log frame fixations."""
-
- # Log fixations
- if GazeFeatures.is_fixation(frame.last_gaze_movement()) and frame.last_gaze_movement().is_finished():
-
- log = (
- timestamp,
- frame.last_gaze_movement().focus,
- frame.last_gaze_movement().duration,
- frame.layers['demo_layer'].last_looked_aoi_name()
- )
-
- self.write(log)
-
-class ScanPathAnalysisLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter):
-
- def on_look(self, timestamp, frame, exception):
- """Log frame scan path metrics."""
-
- if frame.is_analysis_available():
-
- analysis = frame.analysis()
-
- log = (
- timestamp,
- analysis[Basic.ScanPathAnalyzer].path_duration,
- analysis[Basic.ScanPathAnalyzer].steps_number,
- analysis[KCoefficient.ScanPathAnalyzer].K,
- analysis[NearestNeighborIndex.ScanPathAnalyzer].nearest_neighbor_index,
- analysis[ExploreExploitRatio.ScanPathAnalyzer].explore_exploit_ratio
- )
-
- self.write(log)
-
-class VideoRecorder(DataFeatures.PipelineStepObserver, UtilsFeatures.VideoWriter):
-
- def on_look(self, timestamp, frame, exception):
- """Write frame image."""
-
- self.write(frame.image())
-
-class AOIScanPathAnalysisLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter):
-
- def on_look(self, timestamp, layer, exception):
- """Log layer aoi scan path metrics"""
-
- if layer.is_analysis_available():
-
- analysis = layer.analysis()
-
- log = (
- timestamp,
- analysis[Basic.AOIScanPathAnalyzer].path_duration,
- analysis[Basic.AOIScanPathAnalyzer].steps_number,
- analysis[KCoefficient.AOIScanPathAnalyzer].K,
- analysis[LempelZivComplexity.AOIScanPathAnalyzer].lempel_ziv_complexity
- )
-
- self.write(log)
diff --git a/src/argaze/utils/demo_data/frame_background.jpg b/src/argaze/utils/demo_data/frame_background.jpg
deleted file mode 100644
index 7aabe63..0000000
Binary files a/src/argaze/utils/demo_data/frame_background.jpg and /dev/null differ
diff --git a/src/argaze/utils/demo_data/provider_setup.json b/src/argaze/utils/demo_data/provider_setup.json
deleted file mode 100644
index eac909c..0000000
--- a/src/argaze/utils/demo_data/provider_setup.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
- "argaze.utils.Providers.TobiiProGlasses2.Provider" : {
- "address": "10.34.0.12",
- "project": "MyProject",
- "participant": "NewParticipant",
- "configuration": {
- "sys_ec_preset": "Indoor",
- "sys_sc_width": 1920,
- "sys_sc_height": 1080,
- "sys_sc_fps": 25,
- "sys_sc_preset": "Auto",
- "sys_et_freq": 50,
- "sys_mems_freq": 100
- }
- }
-}
\ No newline at end of file
diff --git a/src/argaze/utils/eyetrackers/TobiiProGlasses2.py b/src/argaze/utils/eyetrackers/TobiiProGlasses2.py
new file mode 100644
index 0000000..94f31a7
--- /dev/null
+++ b/src/argaze/utils/eyetrackers/TobiiProGlasses2.py
@@ -0,0 +1,1046 @@
+""" Handle network connection to Tobii Pro Glasses 2 device.
+ It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py).
+
+This program is free software: you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation, either version 3 of the License, or (at your option) any later
+version.
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+You should have received a copy of the GNU General Public License along with
+this program. If not, see .
+"""
+
+__author__ = "Théo de la Hogue"
+__credits__ = []
+__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
+__license__ = "GPLv3"
+
+import sys
+import logging
+import socket
+import threading
+import collections
+import json
+import time
+import datetime
+import uuid
+from dataclasses import dataclass
+
+try:
+ from urllib.parse import urlparse, urlencode
+ from urllib.request import urlopen, Request
+ from urllib.error import URLError, HTTPError
+
+except ImportError:
+ from urlparse import urlparse
+ from urllib import urlencode
+ from urllib2 import urlopen, Request, HTTPError, URLError
+
+from argaze import ArFeatures, DataFeatures, GazeFeatures
+from argaze.utils import UtilsFeatures
+
+import numpy
+import cv2
+import av
+
+socket.IPPROTO_IPV6 = 41
+
+TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'
+TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S'
+
+DEFAULT_PROJECT_NAME = 'DefaultProject'
+DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant'
+DEFAULT_RECORD_NAME = 'DefaultRecord'
+
+# Define extra classes to support Tobii data parsing
+@dataclass
+class DirSig():
+ """Define dir sig data (dir sig)."""
+
+ dir: int # meaning ?
+ sig: int # meaning ?
+
+@dataclass
+class PresentationTimeStamp():
+ """Define presentation time stamp (pts) data."""
+
+ value: int
+ """Pts value."""
+
+@dataclass
+class VideoTimeStamp():
+ """Define video time stamp (vts) data."""
+
+ value: int
+ """Vts value."""
+
+ offset: int
+ """Primary time stamp value."""
+
+@dataclass
+class EventSynch():
+ """Define event synch (evts) data."""
+
+ value: int # meaning ?
+ """Evts value."""
+
+@dataclass
+class Event():
+ """Define event data (ets type tag)."""
+
+ ets: int # meaning ?
+ type: str
+ tag: str # dict ?
+
+@dataclass
+class Accelerometer():
+ """Define accelerometer data (ac)."""
+
+ value: numpy.array
+ """Accelerometer value"""
+
+@dataclass
+class Gyroscope():
+ """Define gyroscope data (gy)."""
+
+ value: numpy.array
+ """Gyroscope value"""
+
+@dataclass
+class PupillCenter():
+ """Define pupill center data (gidx pc eye)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float, float))
+ eye: str # 'right' or 'left'
+
+@dataclass
+class PupillDiameter():
+ """Define pupill diameter data (gidx pd eye)."""
+
+ validity: int
+ index: int
+ value: float
+ eye: str # 'right' or 'left'
+
+@dataclass
+class GazeDirection():
+ """Define gaze direction data (gidx gd eye)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float, float))
+ eye: str # 'right' or 'left'
+
+@dataclass
+class GazePosition():
+ """Define gaze position data (gidx l gp)."""
+
+ validity: int
+ index: int
+ l: str # ?
+ value: tuple((float, float))
+
+@dataclass
+class GazePosition3D():
+ """Define gaze position 3D data (gidx gp3)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float))
+
+@dataclass
+class MarkerPosition():
+ """Define marker data (marker3d marker2d)."""
+
+ value_3d: tuple((float, float, float))
+ value_2d: tuple((float, float))
+
+class TobiiJsonDataParser():
+
+ def __init__(self):
+
+ self.__first_ts = 0
+
+ self.__parse_data_map = {
+ 'dir': self.__parse_dir_sig,
+ 'pts': self.__parse_pts,
+ 'vts': self.__parse_vts,
+ 'evts': self.__parse_event_synch,
+ 'ets': self.__parse_event,
+ 'ac': self.__parse_accelerometer,
+ 'gy': self.__parse_gyroscope,
+ 'gidx': self.__parse_pupill_or_gaze,
+ 'marker3d': self.__parse_marker_position
+ }
+
+ self.__parse_pupill_or_gaze_map = {
+ 'pc': self.__parse_pupill_center,
+ 'pd': self.__parse_pupill_diameter,
+ 'gd': self.__parse_gaze_direction,
+ 'l': self.__parse_gaze_position,
+ 'gp3': self.__parse_gaze_position_3d
+ }
+
+ def parse(self, data):
+
+ json_data = json.loads(data.decode('utf-8'))
+
+ # Parse data status
+ status = json_data.pop('s', -1)
+
+ # Parse timestamp
+ data_ts = json_data.pop('ts')
+
+ # Parse data depending first json key
+ first_key = next(iter(json_data))
+
+ # Convert json data into data object
+ data_object = self.__parse_data_map[first_key](status, json_data)
+ data_object_type = type(data_object).__name__
+
+ # Keep first timestamp to offset all timestamps
+ if self.__first_ts == 0:
+ self.__first_ts = data_ts
+
+ data_ts -= self.__first_ts
+
+ return data_ts, data_object, data_object_type
+
+ def __parse_pupill_or_gaze(self, status, json_data):
+
+ gaze_index = json_data.pop('gidx')
+
+ # parse pupill or gaze data depending second json key
+ second_key = next(iter(json_data))
+
+ return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, json_data)
+
+ def __parse_dir_sig(self, status, json_data):
+
+ return DirSig(json_data['dir'], json_data['sig'])
+
+ def __parse_pts(self, status, json_data):
+
+ return PresentationTimeStamp(json_data['pts'])
+
+ def __parse_vts(self, status, json_data):
+
+ # ts is not sent when recording
+ try:
+
+ ts = json_data['ts']
+
+ except KeyError:
+
+ ts = -1
+
+ return VideoTimeStamp(json_data['vts'], ts)
+
+ def __parse_event_synch(self, status, json_data):
+
+ return EventSynch(json_data['evts'])
+
+ def __parse_event(self, status, json_data):
+
+ return Event(json_data['ets'], json_data['type'], json_data['tag'])
+
+ def __parse_accelerometer(self, status, json_data):
+
+ return Accelerometer(json_data['ac'])
+
+ def __parse_gyroscope(self, status, json_data):
+
+ return Gyroscope(json_data['gy'])
+
+ def __parse_pupill_center(self, status, gaze_index, json_data):
+
+ return PupillCenter(status, gaze_index, json_data['pc'], json_data['eye'])
+
+ def __parse_pupill_diameter(self, status, gaze_index, json_data):
+
+ return PupillDiameter(status, gaze_index, json_data['pd'], json_data['eye'])
+
+ def __parse_gaze_direction(self, status, gaze_index, json_data):
+
+ return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye'])
+
+ def __parse_gaze_position(self, status, gaze_index, json_data):
+
+ return GazePosition(status, gaze_index, json_data['l'], json_data['gp'])
+
+ def __parse_gaze_position_3d(self, status, gaze_index, json_data):
+
+ return GazePosition3D(status, gaze_index, json_data['gp3'])
+
+ def __parse_marker_position(self, status, json_data):
+
+ return MarkerPosition(json_data['marker3d'], json_data['marker2d'])
+
+class LiveStream(ArFeatures.ArContext):
+
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+
+ # Init parent classes
+ super().__init__()
+
+ # Init private attributes
+ self.__address = None
+ self.__udpport = 49152
+
+ self.__project_name = None
+ self.__project_id = None
+
+ self.__participant_name = None
+ self.__participant_id = None
+
+ self.__configuration = {}
+
+ self.__video_borders_size = 0
+
+ self.__parser = TobiiJsonDataParser()
+
+ @property
+ def address(self) -> str:
+ """Network address where to find the device."""
+ return self.__address
+
+ @address.setter
+ def address(self, address:str):
+
+ self.__address = address
+
+ # Remove part after % on under Windows
+ if "%" in self.__address:
+
+ if sys.platform == "win32":
+
+ self.__address = self.__address.split("%")[0]
+
+ # Define base url
+ if ':' in self.__address:
+
+ self.__base_url = f'http://[{self.__address}]'
+
+ else:
+
+ self.__base_url = 'http://' + self.__address
+
+ @property
+ def configuration(self)-> dict:
+ """Patch system configuration dictionary."""
+ return self.__configuration
+
+ @configuration.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def configuration(self, configuration: dict):
+
+ self.__configuration = configuration
+
+ @property
+ def project(self) -> str:
+ """Project name."""
+ return self.__project_name
+
+ @project.setter
+ def project(self, project:str):
+
+ self.__project_name = project
+
+ def __bind_project(self):
+ """Bind to a project or create one if it doesn't exist."""
+
+ if self.__project_name is None:
+
+ raise Exception(f'Project binding fails: setup project before.')
+
+ self.__project_id = None
+
+ # Check if project exist
+ projects = self.__get_request('/api/projects')
+
+ for project in projects:
+
+ try:
+
+ if project['pr_info']['Name'] == self.__project_name:
+
+ self.__project_id = project['pr_id']
+
+ logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id)
+
+ except:
+
+ pass
+
+ # The project doesn't exist, create one
+ if self.__project_id is None:
+
+ logging.debug('> %s project doesn\'t exist', self.__project_name)
+
+ data = {
+ 'pr_info' : {
+ 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)),
+ 'Name': self.__project_name
+ },
+ 'pr_created': self.__get_current_datetime()
+ }
+
+ json_data = self.__post_request('/api/projects', data)
+
+ self.__project_id = json_data['pr_id']
+
+ logging.debug('> new %s project created: %s', self.__project_name, self.__project_id)
+
+ @property
+ def participant(self)-> str:
+ """Participant name"""
+ return self.__participant_name
+
+ @participant.setter
+ def participant(self, participant:str):
+
+ self.__participant_name = participant
+
+ def __bind_participant(self):
+ """Bind to a participant or create one if it doesn't exist.
+
+ !!! warning
+ Bind to a project before.
+ """
+
+ if self.__participant_name is None:
+
+ raise Exception(f'Participant binding fails: setup participant before.')
+
+ if self.__project_id is None :
+
+ raise Exception(f'Participant binding fails: bind to a project before')
+
+ self.__participant_id = None
+
+ # Check if participant exist
+ participants = self.__get_request('/api/participants')
+
+ for participant in participants:
+
+ try:
+
+ if participant['pa_info']['Name'] == self.__participant_name:
+
+ self.__participant_id = participant['pa_id']
+
+ logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id)
+
+ except:
+
+ pass
+
+ # The participant doesn't exist, create one
+ if self.__participant_id is None:
+
+ logging.debug('> %s participant doesn\'t exist', self.__participant_name)
+
+ data = {
+ 'pa_project': self.__project_id,
+ 'pa_info': {
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)),
+ 'Name': self.__participant_name,
+ 'Notes': '' # TODO: set participant notes
+ },
+ 'pa_created': self.__get_current_datetime()
+ }
+
+ json_data = self.__post_request('/api/participants', data)
+
+ self.__participant_id = json_data['pa_id']
+
+ logging.debug('> new %s participant created: %s', self.__participant_name, self.__participant_id)
+
+ def __enter__(self):
+
+ logging.info('Tobii Pro Glasses 2 connexion starts...')
+ logging.debug('TobiiProGlasses2.Provider.__enter__')
+
+ # Update current configuration with configuration patch
+ logging.debug('> updating configuration')
+
+ configuration = self.__get_request('/api/system/conf')
+
+ if self.__configuration:
+
+ #configuration.update(self.__configuration)
+ configuration = self.__post_request('/api/system/conf', self.__configuration)
+
+ # Log current configuration
+ logging.info('Tobii Pro Glasses 2 configuration:')
+
+ for key, value in configuration.items():
+
+ logging.info('%s: %s', key, str(value))
+
+ # Store video stream dimension
+ self.__video_width = configuration['sys_sc_width']
+ self.__video_height = configuration['sys_sc_height']
+
+ # Bind to project if required
+ if self.__project_name is not None:
+
+ logging.debug('> binding project %s', self.__project_name)
+
+ self.__bind_project()
+
+ logging.info('Tobii Pro Glasses 2 project id: %s', self.__project_id)
+
+ # Bind to participant if required
+ if self.__participant_name is not None:
+
+ logging.debug('> binding participant %s', self.__participant_name)
+
+ self.__bind_participant()
+
+ logging.info('Tobii Pro Glasses 2 participant id: %s', self.__participant_id)
+
+ # Create stop event
+ self.__stop_event = threading.Event()
+
+ # Open data stream
+ self.__data_socket = self.__make_socket()
+ self.__data_thread = threading.Thread(target = self.__stream_data)
+ self.__data_thread.daemon = True
+
+ logging.debug('> starting data thread...')
+ self.__data_thread.start()
+
+ # Open video stream
+ self.__video_socket = self.__make_socket()
+ self.__video_thread = threading.Thread(target = self.__stream_video)
+ self.__video_thread.daemon = True
+
+ logging.debug('> starting video thread...')
+ self.__video_thread.start()
+
+ # Keep connection alive
+ self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
+ self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
+ self.__keep_alive_thread.daemon = True
+
+ logging.debug('> starting keep alive thread...')
+ self.__keep_alive_thread.start()
+
+ return self
+
+ def __exit__(self, exception_type, exception_value, exception_traceback):
+
+ logging.debug('TobiiProGlasses2.Provider.__exit__')
+
+ # Close data stream
+ self.__stop_event.set()
+
+ # Stop keeping connection alive
+ threading.Thread.join(self.__keep_alive_thread)
+ self.__keep_alive_thread = None
+
+ # Stop data streaming
+ threading.Thread.join(self.__data_thread)
+ self.__data_thread = None
+
+ def __make_socket(self):
+ """Create a socket to enable network communication."""
+
+ iptype = socket.AF_INET
+
+ if ':' in self.__address:
+
+ iptype = socket.AF_INET6
+
+ res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)
+ family, socktype, proto, canonname, sockaddr = res[0]
+ new_socket = socket.socket(family, socktype, proto)
+
+ new_socket.settimeout(5.0)
+
+ try:
+
+ if iptype == socket.AF_INET6:
+
+ new_socket.setsockopt(socket.SOL_SOCKET, 25, 1)
+
+ except socket.error as e:
+
+ if e.errno == 1:
+
+ logging.error('Binding to a network interface is permitted only for root users.')
+
+ return new_socket
+
+ def __stream_data(self):
+ """Stream data from dedicated socket."""
+
+ logging.debug('TobiiProGlasses2.Provider.__stream_data')
+
+ while not self.__stop_event.is_set():
+
+ try:
+
+ data, _ = self.__data_socket.recvfrom(1024)
+
+ except TimeoutError:
+
+ logging.error('> timeout occurred while receiving data')
+ continue
+
+ if data is not None:
+
+ # Parse json into timestamped data object
+ data_ts, data_object, data_object_type = self.__parser.parse(data)
+
+ # Edit millisecond timestamp
+ timestamp = int(data_ts * 1e-3)
+
+ match data_object_type:
+
+ case 'GazePosition':
+
+ # When gaze position is valid
+ if data_object.validity == 0:
+
+ # Process timestamped gaze position
+ self._process_gaze_position(
+ timestamp = timestamp,
+ x = int(data_object.value[0] * self.__video_width),
+ y = int(data_object.value[1] * self.__video_height) )
+
+ else:
+
+ # Process empty gaze position
+ self._process_gaze_position(timestamp = timestamp)
+
+ def __stream_video(self):
+ """Stream video from dedicated socket."""
+
+ logging.debug('TobiiProGlasses2.Provider.__stream_video')
+
+ container = av.open(f'rtsp://{self.__address}:8554/live/scene', options={'rtsp_transport': 'tcp'})
+ self.__stream = container.streams.video[0]
+ self.__buffer = collections.OrderedDict()
+
+ for image in container.decode(self.__stream):
+
+ logging.debug('> new image decoded')
+
+ # Quit if the video acquisition thread have been stopped
+ if self.__stop_event.is_set():
+
+ logging.debug('> stop event is set')
+ break
+
+ if image is not None:
+
+ if image.time is not None:
+
+ timestamp = int(image.time * 1e6)
+ image = image.to_ndarray(format='bgr24')
+
+ logging.debug('> image timestamp: %f', timestamp)
+
+ # Process camera image
+ self._process_camera_image(
+ timestamp = timestamp,
+ image = image)
+
+ def __keep_alive(self):
+ """Maintain network connection."""
+
+ logging.debug('TobiiProGlasses2.Provider.__keep_alive')
+
+ while not self.__stop_event.is_set():
+
+ self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
+ self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
+
+ time.sleep(1)
+
+ def __get_request(self, api_action) -> str:
+ """Send a GET request and get data back."""
+
+ url = self.__base_url + api_action
+
+ logging.debug('TobiiProGlasses2.Provider.__get_request %s', url)
+
+ res = urlopen(url).read()
+
+ try:
+
+ data = json.loads(res.decode('utf-8'))
+
+ except json.JSONDecodeError:
+
+ data = None
+
+ logging.debug('TobiiProGlasses2.Provider.__get_request received: %s', data)
+
+ return data
+
+ def __post_request(self, api_action, data = None, wait_for_response = True) -> str:
+ """Send a POST request and get result back."""
+
+ logging.debug('TobiiProGlasses2.Provider.__post_request %s', api_action)
+
+ url = self.__base_url + api_action
+ req = Request(url)
+ req.add_header('Content-Type', 'application/json')
+ data = json.dumps(data)
+
+ if wait_for_response is False:
+
+ threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
+
+ return None
+
+ response = urlopen(req, data.encode('utf-8'))
+ res = response.read()
+
+ try:
+
+ res = json.loads(res.decode('utf-8'))
+
+ except:
+
+ pass
+
+ return res
+
+ def __wait_for_status(self, api_action, key, values, timeout = None) -> any:
+ """Wait until a status matches given values."""
+
+ url = self.__base_url + api_action
+ running = True
+
+ while running:
+
+ req = Request(url)
+ req.add_header('Content-Type', 'application/json')
+
+ try:
+
+ response = urlopen(req, None, timeout = timeout)
+
+ except URLError as e:
+
+ logging.error(e.reason)
+ return -1
+
+ data = response.read()
+ json_data = json.loads(data.decode('utf-8'))
+
+ if json_data[key] in values:
+ running = False
+
+ time.sleep(1)
+
+ return json_data[key]
+
+ def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT):
+
+ return datetime.datetime.now().replace(microsecond=0).strftime(timeformat)
+
+ # CALIBRATION
+
+ def calibration_start(self, project_name, participant_name):
+ """Start calibration process for project and participant."""
+
+ project_id = self.__get_project_id(project_name)
+ participant_id = self.get_participant_id(participant_name)
+
+ # Init calibration id
+ self.__calibration_id = None
+
+ # Calibration have to be done for a project and a participant
+ if project_id is None or participant_id is None:
+
+ raise Exception(f'Setup project and participant before')
+
+ data = {
+ 'ca_project': project_id,
+ 'ca_type': 'default',
+ 'ca_participant': participant_id,
+ 'ca_created': self.__get_current_datetime()
+ }
+
+ # Request calibration
+ json_data = self.__post_request('/api/calibrations', data)
+ self.__calibration_id = json_data['ca_id']
+
+ # Start calibration
+ self.__post_request('/api/calibrations/' + self.__calibration_id + '/start')
+
+ def calibration_status(self) -> str:
+ """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed."""
+
+ if self.__calibration_id is not None:
+
+ status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
+
+ # Forget calibration id
+ if status != 'calibrating':
+
+ self.__calibration_id = None
+
+ return status
+
+ else:
+
+ raise Exception(f'Start calibration before')
+
+ def calibrate(self, project_name, participant_name):
+ """Handle whole Tobii glasses calibration process."""
+
+ # Start calibration
+ self.calibration_start(project_name, participant_name)
+
+ # While calibrating...
+ status = self.calibration_status()
+
+ while status == 'calibrating':
+
+ time.sleep(1)
+ status = self.calibration_status()
+
+ if status == 'uncalibrated' or status == 'stale' or status == 'failed':
+
+ raise Exception(f'Calibration {status}')
+
+ # CALIBRATION
+
+ def calibration_start(self, project_name, participant_name):
+ """Start calibration process for project and participant."""
+
+ project_id = self.__get_project_id(project_name)
+ participant_id = self.get_participant_id(participant_name)
+
+ # Init calibration id
+ self.__calibration_id = None
+
+ # Calibration have to be done for a project and a participant
+ if project_id is None or participant_id is None:
+
+ raise Exception(f'Setup project and participant before')
+
+ data = {
+ 'ca_project': project_id,
+ 'ca_type': 'default',
+ 'ca_participant': participant_id,
+ 'ca_created': self.__get_current_datetime()
+ }
+
+ # Request calibration
+ json_data = self.__post_request('/api/calibrations', data)
+ self.__calibration_id = json_data['ca_id']
+
+ # Start calibration
+ self.__post_request('/api/calibrations/' + self.__calibration_id + '/start')
+
+ def calibration_status(self) -> str:
+ """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed."""
+
+ if self.__calibration_id is not None:
+
+ status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
+
+ # Forget calibration id
+ if status != 'calibrating':
+
+ self.__calibration_id = None
+
+ return status
+
+ else:
+
+ raise Exception(f'Start calibration before')
+
+ def calibrate(self, project_name, participant_name):
+ """Handle whole Tobii glasses calibration process."""
+
+ # Start calibration
+ self.calibration_start(project_name, participant_name)
+
+ # While calibrating...
+ status = self.calibration_status()
+
+ while status == 'calibrating':
+
+ time.sleep(1)
+ status = self.calibration_status()
+
+ if status == 'uncalibrated' or status == 'stale' or status == 'failed':
+
+ raise Exception(f'Calibration {status}')
+
+ # RECORDING FEATURES
+
+ def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
+ return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
+
+ def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str:
+ """Create a new recording.
+
+ Returns:
+ recording id
+ """
+
+ participant_id = self.get_participant_id(participant_name)
+
+ if participant_id is None:
+ raise NameError(f'{participant_name} participant doesn\'t exist')
+
+ data = {
+ 'rec_participant': participant_id,
+ 'rec_info': {
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)),
+ 'Name': recording_name,
+ 'Notes': recording_notes
+ },
+ 'rec_created': self.__get_current_datetime()
+ }
+
+ json_data = self.__post_request('/api/recordings', data)
+
+ return json_data['rec_id']
+
+ def start_recording(self, recording_id) -> bool:
+ """Start recording on the Tobii interface's SD Card."""
+
+ self.__post_request('/api/recordings/' + recording_id + '/start')
+ return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
+
+ def stop_recording(self, recording_id) -> bool:
+ """Stop recording on the Tobii interface's SD Card."""
+
+ self.__post_request('/api/recordings/' + recording_id + '/stop')
+ return self.__wait_for_recording_status(recording_id, ['done']) == "done"
+
+ def pause_recording(self, recording_id) -> bool:
+ """Pause recording on the Tobii interface's SD Card."""
+
+ self.__post_request('/api/recordings/' + recording_id + '/pause')
+ return self.__wait_for_recording_status(recording_id, ['paused']) == "paused"
+
+ def __get_recording_status(self):
+ return self.get_status()['sys_recording']
+
+ def get_current_recording_id(self) -> str:
+ """Get current recording id."""
+
+ return self.__get_recording_status()['rec_id']
+
+ @property
+ def recording(self) -> bool:
+ """Is it recording?"""
+
+ rec_status = self.__get_recording_status()
+
+ if rec_status != {}:
+ if rec_status['rec_state'] == "recording":
+ return True
+
+ return False
+
+ def get_recordings(self) -> str:
+ """Get all recordings id."""
+
+ return self.__get_request('/api/recordings')
+
+ # EVENTS AND EXPERIMENTAL VARIABLES
+
+ def __post_recording_data(self, event_type: str, event_tag = ''):
+ data = {'type': event_type, 'tag': event_tag}
+ self.__post_request('/api/events', data, wait_for_response=False)
+
+ def send_event(self, event_type: str, event_value = None):
+ self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
+
+ def send_variable(self, variable_name: str, variable_value = None):
+ self.__post_recording_data(str(variable_name), str(variable_value))
+
+ # MISC
+
+ def eject_sd(self):
+ self.__get_request('/api/eject')
+
+ def get_battery_info(self):
+ return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
+
+ def get_battery_level(self):
+ return self.get_battery_status()['level']
+
+ def get_battery_remaining_time(self):
+ return self.get_battery_status()['remaining_time']
+
+ def get_battery_status(self):
+ return self.get_status()['sys_battery']
+
+ def get_et_freq(self):
+ return self.get_configuration()['sys_et_freq']
+
+ def get_et_frequencies(self):
+ return self.get_status()['sys_et']['frequencies']
+
+ def identify(self):
+ self.__get_request('/api/identify')
+
+ def get_configuration(self):
+ return self.__get_request('/api/system/conf')
+
+ def get_status(self):
+ return self.__get_request('/api/system/status')
+
+ def get_storage_info(self):
+ return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) )
+
+ def get_storage_remaining_time(self):
+ return self.get_storage_status()['remaining_time']
+
+ def get_storage_status(self):
+ return self.get_status()['sys_storage']
+
+ def get_scene_camera_freq(self):
+ return self.get_configuration()['sys_sc_fps']
+
+ def set_et_freq_50(self):
+ data = {'sys_et_freq': 50}
+ json_data = self.__post_request('/api/system/conf', data)
+
+ def set_et_freq_100(self):
+ # May not be available. Check get_et_frequencies() first.
+ data = {'sys_et_freq': 100}
+ json_data = self.__post_request('/api/system/conf', data)
+
+ def set_eye_camera_indoor_preset(self) -> str:
+ data = {'sys_ec_preset': 'Indoor'}
+ return self.__post_request('/api/system/conf', data)
+
+ def set_eye_camera_outdoor_preset(self) -> str:
+ data = {'sys_ec_preset': 'ClearWeather'}
+ return self.__post_request('/api/system/conf', data)
+
+ def set_scene_camera_auto_preset(self):
+ data = {'sys_sc_preset': 'Auto'}
+ json_data = self.__post_request('/api/system/conf', data)
+
+ def set_scene_camera_gaze_preset(self):
+ data = {'sys_sc_preset': 'GazeBasedExposure'}
+ json_data = self.__post_request('/api/system/conf', data)
+
+ def set_scene_camera_freq_25(self):
+ data = {'sys_sc_fps': 25}
+ json_data = self.__post_request('/api/system/conf/', data)
+
+ def set_scene_camera_freq_50(self):
+ data = {'sys_sc_fps': 50}
+ json_data = self.__post_request('/api/system/conf/', data)
diff --git a/src/argaze/utils/eyetrackers/__init__.py b/src/argaze/utils/eyetrackers/__init__.py
new file mode 100644
index 0000000..b76cd8b
--- /dev/null
+++ b/src/argaze/utils/eyetrackers/__init__.py
@@ -0,0 +1,4 @@
+"""
+Collection of device interfaces.
+"""
+__all__ = ['TobiiProGlasses2']
\ No newline at end of file
diff --git a/src/argaze/utils/pipeline_run.py b/src/argaze/utils/pipeline_run.py
new file mode 100644
index 0000000..dc9ef53
--- /dev/null
+++ b/src/argaze/utils/pipeline_run.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+
+"""Load and execute eyetracker pipeline.
+
+This program is free software: you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation, either version 3 of the License, or (at your option) any later
+version.
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+You should have received a copy of the GNU General Public License along with
+this program. If not, see .
+"""
+
+__author__ = "Théo de la Hogue"
+__credits__ = []
+__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
+__license__ = "GPLv3"
+
+import argparse
+import logging
+import contextlib
+
+from argaze import DataFeatures, ArFeatures
+
+import cv2
+
+# Manage arguments
+parser = argparse.ArgumentParser(description=__doc__.split('-')[0])
+parser.add_argument('configuration', metavar='CONFIGURATION', type=str, help='JSON configuration filepath')
+parser.add_argument('-p', '--patch', metavar='PATCH', type=str, help='JSON configuration patch filepath')
+parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console')
+args = parser.parse_args()
+
+# Manage logging
+logging.basicConfig(format = '%(levelname)s: %(message)s', level = logging.DEBUG if args.verbose else logging.INFO)
+
+def main():
+
+ # Load ArGaze context
+ with DataFeatures.from_json(args.configuration, args.patch) as context:
+
+ if args.verbose:
+
+ print(context)
+
+ # Create a window to display context
+ cv2.namedWindow(context.name, cv2.WINDOW_AUTOSIZE)
+
+ # Waiting for 'ctrl+C' interruption
+ with contextlib.suppress(KeyboardInterrupt):
+
+ # Visualisation loop
+ while True:
+
+ # DEBUG
+ print("DISPLAY", context.name)
+
+ # Display context
+ cv2.imshow(context.name, context.image())
+
+ # Head-monted eye tracker case: display environment frames image
+ if issubclass(type(context.pipeline), ArFeatures.ArCamera):
+
+ for scene_frame in context.pipeline.scene_frames():
+
+ cv2.imshow(scene_frame.name, scene_frame.image())
+
+ # Key interaction
+ key_pressed = cv2.waitKey(10)
+
+ # Esc: close window
+ if key_pressed == 27:
+
+ raise KeyboardInterrupt()
+
+ # Stop frame display
+ cv2.destroyAllWindows()
+
+if __name__ == '__main__':
+
+ main()
\ No newline at end of file
diff --git a/src/argaze/utils/worn_device_stream.py b/src/argaze/utils/worn_device_stream.py
deleted file mode 100644
index 3925bbe..0000000
--- a/src/argaze/utils/worn_device_stream.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#!/usr/bin/env python
-
-"""Load ArUcoCamera from a configuration file then, stream and process gaze positions and image from any worn eye-tracker device.
-
-This program is free software: you can redistribute it and/or modify it under
-the terms of the GNU General Public License as published by the Free Software
-Foundation, either version 3 of the License, or (at your option) any later
-version.
-This program is distributed in the hope that it will be useful, but WITHOUT
-ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-You should have received a copy of the GNU General Public License along with
-this program. If not, see .
-"""
-
-__author__ = "Théo de la Hogue"
-__credits__ = []
-__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
-__license__ = "GPLv3"
-
-import argparse
-import contextlib
-
-from argaze import GazeFeatures, DataFeatures
-from argaze.ArUcoMarkers import ArUcoCamera
-
-import cv2
-
-# Manage arguments
-parser = argparse.ArgumentParser(description=__doc__.split('-')[0])
-parser.add_argument('configuration', metavar='CONFIGURATION', type=str, help='configuration filepath')
-parser.add_argument('-p', '--patch', metavar='PATCH', type=str, help='configuration patch filepath')
-parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console')
-args = parser.parse_args()
-
-def main():
-
- # Load ArUcoCamera configuration
- with ArUcoCamera.ArUcoCamera.from_json(args.configuration, args.patch) as aruco_camera:
-
- if args.verbose:
-
- print(aruco_camera)
-
- # Gaze position processing
- def gaze_position_callback(timestamped_gaze_position: GazeFeatures.GazePosition):
-
- # Project gaze position into environment
- try:
-
- aruco_camera.look(timestamped_gaze_position)
-
- # Handle exceptions
- except Exception as e:
-
- print(e)
-
- # Attach gaze position callback to provider
- aruco_camera.provider.attach(gaze_position_callback)
-
- # Image processing
- def image_callback(timestamp: int|float, image):
-
- # Detect ArUco code and project ArScenes
- try:
-
- # Watch ArUco markers into image and estimate camera pose
- aruco_camera.watch(image, timestamp=timestamp)
-
- # Handle exceptions
- except Exception as e:
-
- print(e)
-
- # Attach image callback to provider
- aruco_camera.provider.attach(image_callback)
-
- # Waiting for 'ctrl+C' interruption
- with contextlib.suppress(KeyboardInterrupt):
-
- # Visualisation loop
- while True:
-
- # Display camera frame image
- image = aruco_camera.image()
-
- cv2.imshow(aruco_camera.name, image)
-
- # Display each scene frames image
- for scene_frame in aruco_camera.scene_frames():
-
- cv2.imshow(scene_frame.name, scene_frame.image())
-
- # Key interaction
- key_pressed = cv2.waitKey(10)
-
- # Esc: close window
- if key_pressed == 27:
-
- raise KeyboardInterrupt()
-
- # Stop frame display
- cv2.destroyAllWindows()
-
-if __name__ == '__main__':
-
- main()
\ No newline at end of file
--
cgit v1.1