diff options
Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r-- | src/argaze/ArFeatures.py | 874 |
1 files changed, 516 insertions, 358 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 8c9b3c8..93a21ed 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -8,8 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple, Any, Iterator, Union -from types import ModuleType -from dataclasses import dataclass, field import json import os import sys @@ -95,61 +93,144 @@ DEFAULT_ARLAYER_DRAW_PARAMETERS = { } } -@dataclass class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed. !!! note - Inherits from DataFeatures.SharedObject class to be shared by multiple threads - - Parameters: - name: name of the layer - aoi_scene: AOI scene description - aoi_matcher: AOI matcher object - aoi_scan_path: AOI scan path object - aoi_scan_path_analyzers: dictionary of AOI scan path analyzers - draw_parameters: default parameters passed to draw method - logging_module: path to logging module file in working directory + Inherits from DataFeatures.SharedObject class to be shared by multiple threads. """ - name: str - aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene) - aoi_matcher: GazeFeatures.AOIMatcher = field(default_factory=GazeFeatures.AOIMatcher) - aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) - aoi_scan_path_analyzers: dict = field(default_factory=dict) - draw_parameters: dict = field(default_factory=DEFAULT_ARLAYER_DRAW_PARAMETERS) - logging_module: ModuleType = field(default=None) + def __init__(self, name: str = None, aoi_scene: AOIFeatures.AOIScene = None, aoi_matcher: GazeFeatures.AOIMatcher = None, aoi_scan_path: GazeFeatures.AOIScanPath = None, aoi_scan_path_analyzers: dict = None, draw_parameters: dict = None): + """ Initialize ArLayer - def __post_init__(self): + Parameters: + name: name of the layer + aoi_scene: AOI scene description + aoi_matcher: AOI matcher object + aoi_scan_path: AOI scan path object + aoi_scan_path_analyzers: dictionary of AOI scan path analyzers + draw_parameters: default parameters passed to draw method + """ - # Init sharedObject + # Init parent classes super().__init__() - # Define parent attribute: it will be setup by parent later - self.__parent = None - - # Init current gaze movement + # Init private attributes + self.__name = name + self.__aoi_scene = aoi_scene + self.__aoi_matcher = aoi_matcher + self.__aoi_scan_path = aoi_scan_path + self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers + self.__draw_parameters = draw_parameters + self.__parent = None # it will be setup by parent later self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() - - # Init current looked aoi name self.__looked_aoi_name = None - - # Init aoi scan path analyzed state self.__aoi_scan_path_analyzed = False + + ''' + # Register loggers from logging module as pipeline step observers + if self.logging_module is not None: + + self.__observers = importlib.import_module(self.logging_module).__loggers__ + + # DEBUG + print(f'Observers registered for {self.__name} layer:', self.__observers) + ''' # Cast aoi scene to its effective dimension - if self.aoi_scene.dimension == 2: + if self.__aoi_scene.dimension == 2: + + self.__aoi_scene = AOI2DScene.AOI2DScene(self.__aoi_scene) + + elif self.__aoi_scene.dimension == 3: + + self.__aoi_scene = AOI3DScene.AOI3DScene(self.__aoi_scene) + + @property + def name(self) -> str: + """Get layer's name.""" + return self.__name + + @property + def aoi_scene(self) -> AOIFeatures.AOIScene: + """Get layer's aoi scene object.""" + return self.__aoi_scene + + @aoi_scene.setter + def aoi_scene(self, aoi_scene: AOIFeatures.AOIScene): + """Set layer's aoi scene object.""" + self.__aoi_scene = aoi_scene + + @property + def aoi_matcher(self) -> GazeFeatures.AOIMatcher: + """Get layer's aoi matcher object.""" + return self.__aoi_matcher + + @property + def aoi_scan_path(self) -> GazeFeatures.AOIScanPath: + """Get layer's aoi scan path object.""" + return self.__aoi_scan_path + + @property + def aoi_scan_path_analyzers(self) -> dict: + """Get layer's aoi scan analyzers dictionary.""" + return self.__aoi_scan_path_analyzers + + @property + def draw_parameters(self): + """Get layer's draw parameters dictionary.""" + return self.__draw_parameters - self.aoi_scene = AOI2DScene.AOI2DScene(self.aoi_scene) + @property + def parent(self) -> object: + """Get layer's parent object.""" + return self.__parent - elif self.aoi_scene.dimension == 3: + @parent.setter + def parent(self, parent: object): + """Set layer's parent object.""" + self.__parent = parent - self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene) + @property + def looked_aoi_name(self) -> str: + """Get aoi matcher looked aoi name.""" + return self.__looked_aoi_name + + @property + def aoi_scan_path_analyzed(self) -> bool: + """Are aoi scan path analysis ready?""" + + return self.__aoi_scan_path_analyzed + + @property + def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]: + """Get aoi scan path analysis. + + Returns + iterator: analyzer module path, analysis dictionary + """ + assert(self.__aoi_scan_path_analyzed) + + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): + + yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis + + def as_dict(self) -> dict: + """Export ArLayer attributes as dictionary.""" + + return { + "name": self.__name, + "aoi_scene": self.__aoi_scene, + "aoi_matcher": self.__aoi_matcher, + "aoi_scan_path": self.__aoi_scan_path, + "aoi_scan_path_analyzers": self.__aoi_scan_path_analyzers, + "draw_parameters": self.__draw_parameters + } @classmethod def from_dict(self, layer_data: dict, working_directory: str = None) -> ArLayerType: - """Load attributes from dictionary. + """Load ArLayer attributes from dictionary. Parameters: layer_data: dictionary with attributes to load @@ -305,7 +386,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): except KeyError: new_layer_draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS - + ''' # Load logging module try: @@ -314,26 +395,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # str: relative path to file if type(new_logging_module_value) == str: - logging_module_name = new_logging_module_value.split('.')[0] + new_logging_module = new_logging_module_value.split('.')[0] - # Import logging module - self.logging_module = importlib.import_module(logging_module_name) - - # Register loggers as pipeline step observers - self.observers = self.logging_module.__loggers__ - except KeyError: - pass - + new_logging_module = None + ''' # Create layer - return ArLayer(new_layer_name, \ - new_aoi_scene, \ - new_aoi_matcher, \ - new_aoi_scan_path, \ - new_aoi_scan_path_analyzers, \ - new_layer_draw_parameters \ - ) + return ArLayer(new_layer_name, new_aoi_scene, new_aoi_matcher, new_aoi_scan_path, new_aoi_scan_path_analyzers, new_layer_draw_parameters) @classmethod def from_json(self, json_filepath: str) -> ArLayerType: @@ -351,43 +420,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return ArLayer.from_dict(layer_data, working_directory) - @property - def parent(self): - """Get parent instance""" - - return self.__parent - - @parent.setter - def parent(self, parent): - """Get parent instance""" - - self.__parent = parent - - @property - def looked_aoi_name(self) -> str: - """The name of looked aoi.""" - - return self.__looked_aoi_name - - @property - def aoi_scan_path_analyzed(self) -> bool: - """Are aoi scan path analysis ready?""" - - return self.__aoi_scan_path_analyzed - - def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]: - """Get aoi scan path analysis. - - Returns - iterator: analyzer module path, analysis dictionary - """ - - assert(self.__aoi_scan_path_analyzed) - - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - - yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis - @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): """ @@ -402,7 +434,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ # Use layer locker feature - with self.locker: + with self._lock: # Update current gaze movement self.__gaze_movement = gaze_movement @@ -413,11 +445,11 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Reset aoi scan path analyzed state self.__aoi_scan_path_analyzed = False - if self.aoi_matcher is not None: + if self.__aoi_matcher is not None: # Update looked aoi thanks to aoi matcher # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally - self.__looked_aoi_name, _ = self.aoi_matcher.match(timestamp, self.aoi_scene, gaze_movement) + self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement) # Valid and finished gaze movement has been identified if gaze_movement.valid and gaze_movement.finished: @@ -425,17 +457,17 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): if GazeFeatures.is_fixation(gaze_movement): # Append fixation to aoi scan path - if self.aoi_scan_path is not None and self.__looked_aoi_name is not None: + if self.__aoi_scan_path is not None and self.__looked_aoi_name is not None: - aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name) + aoi_scan_step = self.__aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name) # Is there a new step? - if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: + if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1: # Analyze aoi scan path - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): - aoi_scan_path_analyzer.analyze(timestamp, self.aoi_scan_path) + aoi_scan_path_analyzer.analyze(timestamp, self.__aoi_scan_path) # Update aoi scan path analyzed state self.__aoi_scan_path_analyzed = True @@ -443,9 +475,9 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): elif GazeFeatures.is_saccade(gaze_movement): # Append saccade to aoi scan path - if self.aoi_scan_path is not None: + if self.__aoi_scan_path is not None: - self.aoi_scan_path.append_saccade(timestamp, gaze_movement) + self.__aoi_scan_path.append_saccade(timestamp, gaze_movement) def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ @@ -459,20 +491,20 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Use draw_parameters attribute if no parameters if draw_aoi_scene is None and draw_aoi_matching is None: - return self.draw(image, **self.draw_parameters) + return self.draw(image, **self.__draw_parameters) # Use layer locker feature - with self.locker: + with self._lock: # Draw aoi if required if draw_aoi_scene is not None: - self.aoi_scene.draw(image, **draw_aoi_scene) + self.__aoi_scene.draw(image, **draw_aoi_scene) # Draw aoi matching if required - if draw_aoi_matching is not None and self.aoi_matcher is not None: + if draw_aoi_matching is not None and self.__aoi_matcher is not None: - self.aoi_matcher.draw(image, self.aoi_scene, **draw_aoi_matching) + self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching) # Define default ArFrame image parameters DEFAULT_ARFRAME_IMAGE_PARAMETERS = { @@ -495,67 +527,186 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = { } } -@dataclass class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. !!! note Inherits from DataFeatures.SharedObject class to be shared by multiple threads - - Parameters: - name: name of the frame - size: defines the dimension of the rectangular area where gaze positions are projected - gaze_position_calibrator: gaze position calibration algoritm - gaze_movement_identifier: gaze movement identification algorithm - filter_in_progress_identification: ignore in progress gaze movement identification - scan_path: scan path object - scan_path_analyzers: dictionary of scan path analyzers - heatmap: heatmap object - background: picture to draw behind - layers: dictionary of AOI layers - image_parameters: default parameters passed to image method - logging_module: path to logging module file in working directory """ - name: str - size: tuple[int] = field(default=(1, 1)) - gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = field(default_factory=GazeFeatures.GazePositionCalibrator) - gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier) - filter_in_progress_identification: bool = field(default=True) - scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath) - scan_path_analyzers: dict = field(default_factory=dict) - heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap) - background: numpy.array = field(default_factory=lambda : numpy.array([])) - layers: dict = field(default_factory=dict) - image_parameters: dict = field(default_factory=DEFAULT_ARFRAME_IMAGE_PARAMETERS) - logging_module: ModuleType = field(default=None) - - def __post_init__(self): - - # Init sharedObject + def __init__(self, name: str = None, size: tuple[int] = (1, 1), gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = None, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, filter_in_progress_identification: bool = True, scan_path: GazeFeatures.ScanPath = None, scan_path_analyzers: dict = None, background: numpy.array = numpy.array([]), heatmap: AOIFeatures.Heatmap = None, layers: dict = None, image_parameters: dict = DEFAULT_ARFRAME_IMAGE_PARAMETERS): + """ Initialize ArFrame + + Parameters: + name: name of the frame + size: defines the dimension of the rectangular area where gaze positions are projected + gaze_position_calibrator: gaze position calibration algoritm + gaze_movement_identifier: gaze movement identification algorithm + filter_in_progress_identification: ignore in progress gaze movement identification + scan_path: scan path object + scan_path_analyzers: dictionary of scan path analyzers + background: picture to draw behind + heatmap: heatmap object + layers: dictionary of AOI layers + image_parameters: default parameters passed to image method + """ + + # DEBUG + print(f'ArFrame.__init__ {name} {layers}') + + # Init parent classes super().__init__() - # Define parent attribute: it will be setup by parent later - self.__parent = None + # Init private attributes + self.__name = name + self.__size = size + self.__gaze_position_calibrator = gaze_position_calibrator + self.__gaze_movement_identifier = gaze_movement_identifier + self.__filter_in_progress_identification = filter_in_progress_identification + self.__scan_path = scan_path + self.__scan_path_analyzers = scan_path_analyzers + self.__background = background + self.__heatmap = heatmap + self.__layers = layers + self.__image_parameters = image_parameters + self.__parent = None # it will be setup by parent later + self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() + self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + self.__scan_path_analyzed = False # Setup layers parent attribute - for name, layer in self.layers.items(): + for name, layer in self.__layers.items(): layer.parent = self + ''' + # Import logging module __loggers__ variable as pipeline step observers + if self.logging_module is not None: - # Init current gaze position - self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() + self.__observers = importlib.import_module(self.logging_module).__loggers__ - # Init current gaze movement - self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + # DEBUG + print(f'Observers registered for {self.__name} frame:', self.__observers) + ''' - # Init scan path analyzed state - self.__scan_path_analyzed = False + @property + def name(self) -> str: + """Get frame's name.""" + return self.__name + + @property + def size(self) -> tuple[int]: + """Get frame's size.""" + return self.__size + + @property + def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator: + """Get frame's gaze position calibrator object.""" + return self.__gaze_position_calibrator + + @property + def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier: + """Get frame's gaze movement identifier object.""" + return self.__gaze_movement_identifier + + @property + def filter_in_progress_indentification(self) -> bool: + """Is frame filtering in progress identification?""" + return self.__filter_in_progress_indentification + + @property + def scan_path(self) -> GazeFeatures.ScanPath: + """Get frame's scan path object.""" + return self.__scan_path + + @property + def scan_path_analyzers(self) -> dict: + """Get frame's scan path analyzers dictionary.""" + return self.__scan_path_analyzers + + @property + def background(self) -> numpy.array: + """Get frame's background matrix.""" + return self.__background + + @background.setter + def background(self, image: numpy.array): + """Set frame's background matrix.""" + self.__background = image + + @property + def heatmap(self) -> AOIFeatures.Heatmap: + """Get frame's heatmap object.""" + return self.__heatmap + + @property + def layers(self) -> dict: + """Get frame's layers dictionary.""" + return self.__layers + + @property + def image_parameters(self) -> dict: + """Get frame's image parameters dictionary.""" + return self.__image_parameters + + @property + def parent(self) -> object: + """Get frame's parent object.""" + return self.__parent + + @parent.setter + def parent(self, parent: object): + """Set frame's parent object.""" + self.__parent = parent + + @property + def gaze_position(self) -> object: + """Get current calibrated gaze position""" + return self.__calibrated_gaze_position + + @property + def gaze_movement(self) -> object: + """Get current identified gaze movement""" + return self.__identified_gaze_movement + + @property + def scan_path_analyzed(self) -> bool: + """Are scan path analysis ready?""" + return self.__scan_path_analyzed + + @property + def scan_path_analysis(self) -> Iterator[Union[str, dict]]: + """Get scan path analysis. + + Returns + iterator: analyzer module path, analysis dictionary + """ + assert(self.__scan_path_analyzed) + + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): + + yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis + + def as_dict(self) -> dict: + """Export ArFrame attributes as dictionary.""" + + return { + "name": self.__name, + "size": self.__size, + "gaze_position_calibrator": self.__gaze_position_calibrator, + "gaze_movement_identifier": self.__gaze_movement_identifier, + "filter_in_progress_identification": self.__filter_in_progress_identification, + "scan_path": self.__scan_path, + "scan_path_analyzers": self.__scan_path_analyzers, + "background": self.__background, + "heatmap": self.__heatmap, + "layers": self.__layers, + "image_parameters": self.__image_parameters + } @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: - """Load attributes from dictionary. + """Load ArFrame attributes from dictionary. Parameters: frame_data: dictionary with attributes to load @@ -697,6 +848,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): pass + # Load background image + try: + + new_frame_background_value = frame_data.pop('background') + new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) + new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC) + + except KeyError: + + new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8) + # Load heatmap try: @@ -714,17 +876,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): new_heatmap_data = {} new_heatmap = None - # Load background image - try: - - new_frame_background_value = frame_data.pop('background') - new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) - new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC) - - except KeyError: - - new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8) - # Load layers new_layers = {} @@ -762,31 +913,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # str: relative path to file if type(new_logging_module_value) == str: - logging_module_name = new_logging_module_value.split('.')[0] + new_logging_module = new_logging_module_value.split('.')[0] - # Import logging module - self.logging_module = importlib.import_module(logging_module_name) - - # Register loggers as pipeline step observers - self.observers = self.logging_module.__loggers__ - except KeyError: - pass + new_logging_module = None + + # DEBUG + print('Create frame', new_frame_name) # Create frame - return ArFrame(new_frame_name, \ - new_frame_size, \ - new_gaze_position_calibrator, \ - new_gaze_movement_identifier, \ - filter_in_progress_identification, \ - new_scan_path, \ - new_scan_path_analyzers, \ - new_heatmap, \ - new_frame_background, \ - new_layers, \ - new_frame_image_parameters \ - ) + return ArFrame(new_frame_name, new_frame_size, new_gaze_position_calibrator, new_gaze_movement_identifier, filter_in_progress_identification, new_scan_path, new_scan_path_analyzers, new_frame_background, new_heatmap, new_layers, new_frame_image_parameters) @classmethod def from_json(self, json_filepath: str) -> ArFrameType: @@ -804,49 +941,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return ArFrame.from_dict(frame_data, working_directory) - @property - def parent(self) -> object: - """Get parent instance""" - - return self.__parent - - @parent.setter - def parent(self, parent: object): - """Set parent instance""" - - self.__parent = parent - - @property - def gaze_position(self) -> object: - """Get current calibrated gaze position""" - - return self.__calibrated_gaze_position - - @property - def gaze_movement(self) -> object: - """Get current identified gaze movement""" - - return self.__identified_gaze_movement - - @property - def scan_path_analyzed(self) -> bool: - """Are scan path analysis ready?""" - - return self.__scan_path_analyzed - - def scan_path_analysis(self) -> Iterator[Union[str, dict]]: - """Get scan path analysis. - - Returns - iterator: analyzer module path, analysis dictionary - """ - - assert(self.__scan_path_analyzed) - - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - - yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis - @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]: """ @@ -861,7 +955,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ # Use frame locker feature - with self.locker: + with self._lock: # No gaze movement identified by default self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() @@ -870,9 +964,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__scan_path_analyzed = False # Apply gaze position calibration - if self.gaze_position_calibrator is not None: + if self.__gaze_position_calibrator is not None: - self.__calibrated_gaze_position = self.gaze_position_calibrator.apply(gaze_position) + self.__calibrated_gaze_position = self.__gaze_position_calibrator.apply(gaze_position) # Or update gaze position at least else: @@ -880,10 +974,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__calibrated_gaze_position = gaze_position # Identify gaze movement - if self.gaze_movement_identifier is not None: + if self.__gaze_movement_identifier is not None: # Identify finished gaze movement - self.__identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) + self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: @@ -891,45 +985,45 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): if GazeFeatures.is_fixation(self.__identified_gaze_movement): # Append fixation to scan path - if self.scan_path is not None: + if self.__scan_path is not None: - self.scan_path.append_fixation(timestamp, self.__identified_gaze_movement) + self.__scan_path.append_fixation(timestamp, self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): # Append saccade to scan path - if self.scan_path is not None: + if self.__scan_path is not None: - scan_step = self.scan_path.append_saccade(timestamp, self.__identified_gaze_movement) + scan_step = self.__scan_path.append_saccade(timestamp, self.__identified_gaze_movement) # Is there a new step? - if scan_step and len(self.scan_path) > 1: + if scan_step and len(self.__scan_path) > 1: # Analyze aoi scan path - for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): + for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items(): - scan_path_analyzer.analyze(timestamp, self.scan_path) + scan_path_analyzer.analyze(timestamp, self.__scan_path) # Update scan path analyzed state self.__scan_path_analyzed = True # No valid finished gaze movement: optionnaly stop in progress identification filtering - elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: + elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification: - self.__identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement + self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement # Update heatmap - if self.heatmap is not None: + if self.__heatmap is not None: # Scale gaze position value - scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) + scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]]) # Update heatmap image - self.heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale) + self.__heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale) # Look layers with valid identified gaze movement # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally - for layer_name, layer in self.layers.items(): + for layer_name, layer in self.__layers.items(): layer.look(timestamp, self.__identified_gaze_movement) @@ -949,56 +1043,56 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ # Use frame locker feature - with self.locker: + with self._lock: # Draw background only - if background_weight is not None and (heatmap_weight is None or self.heatmap is None): + if background_weight is not None and (heatmap_weight is None or self.__heatmap is None): - image = self.background.copy() + image = self.__background.copy() # Draw mix background and heatmap if required - elif background_weight is not None and heatmap_weight is not None and self.heatmap: + elif background_weight is not None and heatmap_weight is not None and self.__heatmap: - background_image = self.background.copy() - heatmap_image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) + background_image = self.__background.copy() + heatmap_image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR) image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0) # Draw heatmap only - elif background_weight is None and heatmap_weight is not None and self.heatmap: + elif background_weight is None and heatmap_weight is not None and self.__heatmap: - image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) + image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR) # Draw black image else: - image = numpy.full((self.size[1], self.size[0], 3), 0).astype(numpy.uint8) + image = numpy.full((self.__size[1], self.__size[0], 3), 0).astype(numpy.uint8) # Draw gaze position calibrator if draw_gaze_position_calibrator is not None: - self.gaze_position_calibrator.draw(image, size=self.size, **draw_gaze_position_calibrator) + self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator) # Draw scan path if required - if draw_scan_path is not None and self.scan_path is not None: + if draw_scan_path is not None and self.__scan_path is not None: - self.scan_path.draw(image, **draw_scan_path) + self.__scan_path.draw(image, **draw_scan_path) # Draw current fixation if required - if draw_fixations is not None and self.gaze_movement_identifier is not None: + if draw_fixations is not None and self.__gaze_movement_identifier is not None: - self.gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) + self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) # Draw current saccade if required - if draw_saccades is not None and self.gaze_movement_identifier is not None: + if draw_saccades is not None and self.__gaze_movement_identifier is not None: - self.gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) + self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) # Draw layers if required if draw_layers is not None: for layer_name, draw_layer in draw_layers.items(): - self.layers[layer_name].draw(image, **draw_layer) + self.__layers[layer_name].draw(image, **draw_layer) # Draw current gaze position if required if draw_gaze_positions is not None: @@ -1019,77 +1113,105 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return self.__image(**kwargs) - return self.__image(**self.image_parameters) + return self.__image(**self.__image_parameters) -@dataclass -class ArScene(): +class ArScene(DataFeatures.PipelineStepObject): """ Define abstract Augmented Reality scene with ArLayers and ArFrames inside. - - Parameters: - name: name of the scene - layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. - frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. - angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. - distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. """ - name: str - layers: dict = field(default_factory=dict) - frames: dict = field(default_factory=dict) - angle_tolerance: float = field(default=0.) - distance_tolerance: float = field(default=0.) + + def __init__(self, name: str = None, layers: dict = None, frames: dict = None, angle_tolerance: float = 0., distance_tolerance: float = 0.): + """ Initialize ArScene - def __post_init__(self): + Parameters: + name: name of the scene + layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. + frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. + angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. + distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. + """ - # Define parent attribute: it will be setup by parent object later - self.__parent = None + # Init parent classes + super().__init__() + + # Init private attributes + self.__name = name + self.__layers = layers + self.__frames = frames + self.__angle_tolerance = angle_tolerance + self.__distance_tolerance = distance_tolerance + self.__parent = None # it will be setup by parent later # Setup layer parent attribute - for name, layer in self.layers.items(): + for name, layer in self.__layers.items(): layer.parent = self # Setup frame parent attribute - for name, frame in self.frames.items(): + for name, frame in self.__frames.items(): frame.parent = self - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = f'parent:\n{self.parent.name}\n' - - if len(self.layers): - output += f'ArLayers:\n' - for name, layer in self.layers.items(): - output += f'{name}:\n{layer}\n' - - if len(self.frames): - output += f'ArFrames:\n' - for name, frame in self.frames.items(): - output += f'{name}:\n{frame}\n' - - return output - @property - def parent(self): - """Get parent instance""" - + def name(self) -> str: + """Get scene's name.""" + return self.__name + + @property + def layers(self) -> dict: + """Get scene's layers dictionary.""" + return self.__layers + + @property + def frames(self) -> dict: + """Get scene's frames dictionary.""" + return self.__frames + + @property + def angle_tolerance(self) -> float: + """Get scene's angle tolerance.""" + return self.__angle_tolerance + + @angle_tolerance.setter + def angle_tolerance(self, value: float): + """Set scene's angle tolerance.""" + self.__angle_tolerance = value + + @property + def distance_tolerance(self) -> float: + """Get scene's distance tolerance.""" + return self.__distance_tolerance + + @distance_tolerance.setter + def distance_tolerance(self, value: float): + """Set scene's distance tolerance.""" + self.__distance_tolerance = value + + @property + def parent(self) -> object: + """Get frame's parent object.""" return self.__parent @parent.setter - def parent(self, parent): - """Get parent instance""" - + def parent(self, parent: object): + """Set frame's parent object.""" self.__parent = parent + def as_dict(self) -> dict: + """Export ArScene attributes as dictionary.""" + + return { + "name": self.__name, + "layers": self.__layers, + "frames": self.__frames, + "angle_tolerance": self.__angle_tolerance, + "distance_tolerance": self.__distance_tolerance + } + @classmethod def from_dict(self, scene_data: dict, working_directory: str = None) -> ArSceneType: """ - Load ArScene from dictionary. + Load ArScene attributes from dictionary. Parameters: scene_data: dictionary @@ -1205,7 +1327,27 @@ class ArScene(): pass return ArScene(new_scene_name, new_layers, new_frames, **scene_data) - + + def __str__(self) -> str: + """ + Returns: + String representation + """ + + output = f'parent:\n{self.__parent.name}\n' + + if len(self.__layers): + output += f'ArLayers:\n' + for name, layer in self.__layers.items(): + output += f'{name}:\n{layer}\n' + + if len(self.__frames): + output += f'ArFrames:\n' + for name, frame in self.__frames.items(): + output += f'{name}:\n{frame}\n' + + return output + def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array]: """Define abstract estimate scene pose method. @@ -1232,7 +1374,7 @@ class ArScene(): iterator: name of projected layer and AOI2DScene projection """ - for name, layer in self.layers.items(): + for name, layer in self.__layers.items(): # Clip AOI out of the visual horizontal field of view (optional) # TODO: use HFOV and VFOV and don't use vision_cone method @@ -1255,7 +1397,7 @@ class ArScene(): aoi_scene_copy = layer.aoi_scene.copy() # Project layer aoi scene - yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) + yield name, aoi_scene_copy.project(tvec, rvec, self.__parent.aruco_detector.optic_parameters.K) def draw(self, image: numpy.array, **kwargs: dict): """ @@ -1267,28 +1409,33 @@ class ArScene(): raise NotImplementedError('draw() method not implemented') -@dataclass class ArCamera(ArFrame): """ Define abstract Augmented Reality camera as ArFrame with ArScenes inside. - - Parameters: - scenes: all scenes to project into camera frame - visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV). - visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV). """ - scenes: dict = field(default_factory=dict) - visual_hfov: float = field(default=0.) - visual_vfov: float = field(default=0.) + def __init__(self, scenes: dict = None, visual_hfov: float = 0., visual_vfov: float = 0., **kwargs): + """ Initialize ArCamera + + Parameters: + scenes: all scenes to project into camera frame + visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV). + visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV). + """ + + # DEBUG + print('ArCamera.__init__ kwargs', kwargs) - def __post_init__(self): + # Init parent class + super().__init__(**kwargs) - # Init ArFrame - super().__post_init__() + # Init private attributes + self.__scenes = scenes + self.__visual_hfov = visual_hfov + self.__visual_vfov = visual_vfov # Setup scenes parent attribute - for name, scene in self.scenes.items(): + for name, scene in self.__scenes.items(): scene.parent = self @@ -1301,7 +1448,7 @@ class ArCamera(ArFrame): expected_aoi_list = [] exclude_aoi_list = [] - for scene_name, scene in self.scenes.items(): + for scene_name, scene in self.__scenes.items(): # Append scene layer aoi to corresponding expected camera layer aoi try: @@ -1329,55 +1476,66 @@ class ArCamera(ArFrame): layer.aoi_scan_path.expected_aoi = expected_aoi_list layer.aoi_matcher.exclude = exclude_aoi_list - - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = f'Name:\n{self.name}\n' - for name, scene in self.scenes.items(): - output += f'\"{name}\" {type(scene)}:\n{scene}\n' - - return output - - @classmethod - def from_dict(self, camera_data: dict, working_directory: str = None) -> ArCameraType: - """ - Load ArCamera from dictionary. - - Parameters: - camera_data: dictionary - working_directory: folder path where to load files when a dictionary value is a relative filepath. - """ - - raise NotImplementedError('from_dict() method not implemented') - - @classmethod - def from_json(self, json_filepath: str) -> ArCameraType: - """ - Load ArCamera from .json file. - - Parameters: - json_filepath: path to json file - """ - - raise NotImplementedError('from_json() method not implemented') + @property + def scenes(self) -> dict: + """Get camera's scenes dictionary.""" + return self.__scenes @property + def visual_hfov(self) -> float: + """Get camera's visual horizontal field of view.""" + return self.__visual_hfov + + @visual_hfov.setter + def visual_hfov(self, value: float): + """Set camera's visual horizontal field of view.""" + self.__visual_hfov = value + + @property + def visual_vfov(self) -> float: + """Get camera's visual vertical field of view.""" + return self.__visual_vfov + + @visual_vfov.setter + def visual_vfov(self, value: float): + """Set camera's visual vertical field of view.""" + self.__visual_vfov = value + + @property def scene_frames(self) -> Iterator[ArFrame]: """Iterate over all scenes frames""" # For each scene - for scene_name, scene in self.scenes.items(): + for scene_name, scene in self.__scenes.items(): # For each scene frame for name, scene_frame in scene.frames.items(): yield scene_frame + def as_dict(self) -> dict: + """Export ArCamera attributes as dictionary.""" + + return { + "scenes": self.__scenes, + "visual_hfov": self.__visual_hfov, + "visual_vfov": self.__visual_vfov + } + + def __str__(self) -> str: + """ + Returns: + String representation + """ + + output = f'Name:\n{self.__name}\n' + + for name, scene in self.__scenes.items(): + output += f'\"{name}\" {type(scene)}:\n{scene}\n' + + return output + @DataFeatures.PipelineStepMethod def watch(self, timestamp: int|float, image: numpy.array): """Detect AR features from image and project scenes into camera frame. @@ -1405,7 +1563,7 @@ class ArCamera(ArFrame): super().look(timestamp, gaze_position) # Use camera frame locker feature - with self.locker: + with self._lock: # Project gaze position into each scene frames if possible for scene_frame in self.scene_frames: @@ -1441,13 +1599,13 @@ class ArCamera(ArFrame): """ # Use camera frame locker feature - with self.locker: + with self._lock: # Project camera frame background into each scene frame if possible for frame in self.scene_frames: # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? - for camera_layer_name, camera_layer in self.layers.items(): + for camera_layer_name, camera_layer in self.__layers.items(): try: @@ -1457,7 +1615,7 @@ class ArCamera(ArFrame): width, height = frame.size destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]]) mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination) - frame.background = cv2.warpPerspective(self.background, mapping, (width, height)) + frame.background = cv2.warpPerspective(self.__background, mapping, (width, height)) # Ignore missing frame projection except KeyError: |