From 5cb362ba53ec9543e9aab9b6de25d3fa27b175c1 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 25 Jan 2024 20:55:53 +0100 Subject: ArFeatures classes are not dataclass anymore. --- src/argaze/ArFeatures.py | 874 +++++++++++++++++------------ src/argaze/ArUcoMarkers/ArUcoCamera.py | 75 ++- src/argaze/ArUcoMarkers/ArUcoDetector.py | 64 ++- src/argaze/ArUcoMarkers/ArUcoScene.py | 62 +- src/argaze/DataFeatures.py | 103 ++-- src/argaze/utils/demo_aruco_markers_run.py | 8 +- 6 files changed, 716 insertions(+), 470 deletions(-) diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 8c9b3c8..93a21ed 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -8,8 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple, Any, Iterator, Union -from types import ModuleType -from dataclasses import dataclass, field import json import os import sys @@ -95,61 +93,144 @@ DEFAULT_ARLAYER_DRAW_PARAMETERS = { } } -@dataclass class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed. !!! note - Inherits from DataFeatures.SharedObject class to be shared by multiple threads - - Parameters: - name: name of the layer - aoi_scene: AOI scene description - aoi_matcher: AOI matcher object - aoi_scan_path: AOI scan path object - aoi_scan_path_analyzers: dictionary of AOI scan path analyzers - draw_parameters: default parameters passed to draw method - logging_module: path to logging module file in working directory + Inherits from DataFeatures.SharedObject class to be shared by multiple threads. """ - name: str - aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene) - aoi_matcher: GazeFeatures.AOIMatcher = field(default_factory=GazeFeatures.AOIMatcher) - aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) - aoi_scan_path_analyzers: dict = field(default_factory=dict) - draw_parameters: dict = field(default_factory=DEFAULT_ARLAYER_DRAW_PARAMETERS) - logging_module: ModuleType = field(default=None) + def __init__(self, name: str = None, aoi_scene: AOIFeatures.AOIScene = None, aoi_matcher: GazeFeatures.AOIMatcher = None, aoi_scan_path: GazeFeatures.AOIScanPath = None, aoi_scan_path_analyzers: dict = None, draw_parameters: dict = None): + """ Initialize ArLayer - def __post_init__(self): + Parameters: + name: name of the layer + aoi_scene: AOI scene description + aoi_matcher: AOI matcher object + aoi_scan_path: AOI scan path object + aoi_scan_path_analyzers: dictionary of AOI scan path analyzers + draw_parameters: default parameters passed to draw method + """ - # Init sharedObject + # Init parent classes super().__init__() - # Define parent attribute: it will be setup by parent later - self.__parent = None - - # Init current gaze movement + # Init private attributes + self.__name = name + self.__aoi_scene = aoi_scene + self.__aoi_matcher = aoi_matcher + self.__aoi_scan_path = aoi_scan_path + self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers + self.__draw_parameters = draw_parameters + self.__parent = None # it will be setup by parent later self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() - - # Init current looked aoi name self.__looked_aoi_name = None - - # Init aoi scan path analyzed state self.__aoi_scan_path_analyzed = False + + ''' + # Register loggers from logging module as pipeline step observers + if self.logging_module is not None: + + self.__observers = importlib.import_module(self.logging_module).__loggers__ + + # DEBUG + print(f'Observers registered for {self.__name} layer:', self.__observers) + ''' # Cast aoi scene to its effective dimension - if self.aoi_scene.dimension == 2: + if self.__aoi_scene.dimension == 2: + + self.__aoi_scene = AOI2DScene.AOI2DScene(self.__aoi_scene) + + elif self.__aoi_scene.dimension == 3: + + self.__aoi_scene = AOI3DScene.AOI3DScene(self.__aoi_scene) + + @property + def name(self) -> str: + """Get layer's name.""" + return self.__name + + @property + def aoi_scene(self) -> AOIFeatures.AOIScene: + """Get layer's aoi scene object.""" + return self.__aoi_scene + + @aoi_scene.setter + def aoi_scene(self, aoi_scene: AOIFeatures.AOIScene): + """Set layer's aoi scene object.""" + self.__aoi_scene = aoi_scene + + @property + def aoi_matcher(self) -> GazeFeatures.AOIMatcher: + """Get layer's aoi matcher object.""" + return self.__aoi_matcher + + @property + def aoi_scan_path(self) -> GazeFeatures.AOIScanPath: + """Get layer's aoi scan path object.""" + return self.__aoi_scan_path + + @property + def aoi_scan_path_analyzers(self) -> dict: + """Get layer's aoi scan analyzers dictionary.""" + return self.__aoi_scan_path_analyzers + + @property + def draw_parameters(self): + """Get layer's draw parameters dictionary.""" + return self.__draw_parameters - self.aoi_scene = AOI2DScene.AOI2DScene(self.aoi_scene) + @property + def parent(self) -> object: + """Get layer's parent object.""" + return self.__parent - elif self.aoi_scene.dimension == 3: + @parent.setter + def parent(self, parent: object): + """Set layer's parent object.""" + self.__parent = parent - self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene) + @property + def looked_aoi_name(self) -> str: + """Get aoi matcher looked aoi name.""" + return self.__looked_aoi_name + + @property + def aoi_scan_path_analyzed(self) -> bool: + """Are aoi scan path analysis ready?""" + + return self.__aoi_scan_path_analyzed + + @property + def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]: + """Get aoi scan path analysis. + + Returns + iterator: analyzer module path, analysis dictionary + """ + assert(self.__aoi_scan_path_analyzed) + + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): + + yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis + + def as_dict(self) -> dict: + """Export ArLayer attributes as dictionary.""" + + return { + "name": self.__name, + "aoi_scene": self.__aoi_scene, + "aoi_matcher": self.__aoi_matcher, + "aoi_scan_path": self.__aoi_scan_path, + "aoi_scan_path_analyzers": self.__aoi_scan_path_analyzers, + "draw_parameters": self.__draw_parameters + } @classmethod def from_dict(self, layer_data: dict, working_directory: str = None) -> ArLayerType: - """Load attributes from dictionary. + """Load ArLayer attributes from dictionary. Parameters: layer_data: dictionary with attributes to load @@ -305,7 +386,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): except KeyError: new_layer_draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS - + ''' # Load logging module try: @@ -314,26 +395,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # str: relative path to file if type(new_logging_module_value) == str: - logging_module_name = new_logging_module_value.split('.')[0] + new_logging_module = new_logging_module_value.split('.')[0] - # Import logging module - self.logging_module = importlib.import_module(logging_module_name) - - # Register loggers as pipeline step observers - self.observers = self.logging_module.__loggers__ - except KeyError: - pass - + new_logging_module = None + ''' # Create layer - return ArLayer(new_layer_name, \ - new_aoi_scene, \ - new_aoi_matcher, \ - new_aoi_scan_path, \ - new_aoi_scan_path_analyzers, \ - new_layer_draw_parameters \ - ) + return ArLayer(new_layer_name, new_aoi_scene, new_aoi_matcher, new_aoi_scan_path, new_aoi_scan_path_analyzers, new_layer_draw_parameters) @classmethod def from_json(self, json_filepath: str) -> ArLayerType: @@ -351,43 +420,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return ArLayer.from_dict(layer_data, working_directory) - @property - def parent(self): - """Get parent instance""" - - return self.__parent - - @parent.setter - def parent(self, parent): - """Get parent instance""" - - self.__parent = parent - - @property - def looked_aoi_name(self) -> str: - """The name of looked aoi.""" - - return self.__looked_aoi_name - - @property - def aoi_scan_path_analyzed(self) -> bool: - """Are aoi scan path analysis ready?""" - - return self.__aoi_scan_path_analyzed - - def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]: - """Get aoi scan path analysis. - - Returns - iterator: analyzer module path, analysis dictionary - """ - - assert(self.__aoi_scan_path_analyzed) - - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - - yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis - @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): """ @@ -402,7 +434,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ # Use layer locker feature - with self.locker: + with self._lock: # Update current gaze movement self.__gaze_movement = gaze_movement @@ -413,11 +445,11 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Reset aoi scan path analyzed state self.__aoi_scan_path_analyzed = False - if self.aoi_matcher is not None: + if self.__aoi_matcher is not None: # Update looked aoi thanks to aoi matcher # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally - self.__looked_aoi_name, _ = self.aoi_matcher.match(timestamp, self.aoi_scene, gaze_movement) + self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement) # Valid and finished gaze movement has been identified if gaze_movement.valid and gaze_movement.finished: @@ -425,17 +457,17 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): if GazeFeatures.is_fixation(gaze_movement): # Append fixation to aoi scan path - if self.aoi_scan_path is not None and self.__looked_aoi_name is not None: + if self.__aoi_scan_path is not None and self.__looked_aoi_name is not None: - aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name) + aoi_scan_step = self.__aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name) # Is there a new step? - if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: + if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1: # Analyze aoi scan path - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): - aoi_scan_path_analyzer.analyze(timestamp, self.aoi_scan_path) + aoi_scan_path_analyzer.analyze(timestamp, self.__aoi_scan_path) # Update aoi scan path analyzed state self.__aoi_scan_path_analyzed = True @@ -443,9 +475,9 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): elif GazeFeatures.is_saccade(gaze_movement): # Append saccade to aoi scan path - if self.aoi_scan_path is not None: + if self.__aoi_scan_path is not None: - self.aoi_scan_path.append_saccade(timestamp, gaze_movement) + self.__aoi_scan_path.append_saccade(timestamp, gaze_movement) def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ @@ -459,20 +491,20 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Use draw_parameters attribute if no parameters if draw_aoi_scene is None and draw_aoi_matching is None: - return self.draw(image, **self.draw_parameters) + return self.draw(image, **self.__draw_parameters) # Use layer locker feature - with self.locker: + with self._lock: # Draw aoi if required if draw_aoi_scene is not None: - self.aoi_scene.draw(image, **draw_aoi_scene) + self.__aoi_scene.draw(image, **draw_aoi_scene) # Draw aoi matching if required - if draw_aoi_matching is not None and self.aoi_matcher is not None: + if draw_aoi_matching is not None and self.__aoi_matcher is not None: - self.aoi_matcher.draw(image, self.aoi_scene, **draw_aoi_matching) + self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching) # Define default ArFrame image parameters DEFAULT_ARFRAME_IMAGE_PARAMETERS = { @@ -495,67 +527,186 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = { } } -@dataclass class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. !!! note Inherits from DataFeatures.SharedObject class to be shared by multiple threads - - Parameters: - name: name of the frame - size: defines the dimension of the rectangular area where gaze positions are projected - gaze_position_calibrator: gaze position calibration algoritm - gaze_movement_identifier: gaze movement identification algorithm - filter_in_progress_identification: ignore in progress gaze movement identification - scan_path: scan path object - scan_path_analyzers: dictionary of scan path analyzers - heatmap: heatmap object - background: picture to draw behind - layers: dictionary of AOI layers - image_parameters: default parameters passed to image method - logging_module: path to logging module file in working directory """ - name: str - size: tuple[int] = field(default=(1, 1)) - gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = field(default_factory=GazeFeatures.GazePositionCalibrator) - gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier) - filter_in_progress_identification: bool = field(default=True) - scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath) - scan_path_analyzers: dict = field(default_factory=dict) - heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap) - background: numpy.array = field(default_factory=lambda : numpy.array([])) - layers: dict = field(default_factory=dict) - image_parameters: dict = field(default_factory=DEFAULT_ARFRAME_IMAGE_PARAMETERS) - logging_module: ModuleType = field(default=None) - - def __post_init__(self): - - # Init sharedObject + def __init__(self, name: str = None, size: tuple[int] = (1, 1), gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = None, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, filter_in_progress_identification: bool = True, scan_path: GazeFeatures.ScanPath = None, scan_path_analyzers: dict = None, background: numpy.array = numpy.array([]), heatmap: AOIFeatures.Heatmap = None, layers: dict = None, image_parameters: dict = DEFAULT_ARFRAME_IMAGE_PARAMETERS): + """ Initialize ArFrame + + Parameters: + name: name of the frame + size: defines the dimension of the rectangular area where gaze positions are projected + gaze_position_calibrator: gaze position calibration algoritm + gaze_movement_identifier: gaze movement identification algorithm + filter_in_progress_identification: ignore in progress gaze movement identification + scan_path: scan path object + scan_path_analyzers: dictionary of scan path analyzers + background: picture to draw behind + heatmap: heatmap object + layers: dictionary of AOI layers + image_parameters: default parameters passed to image method + """ + + # DEBUG + print(f'ArFrame.__init__ {name} {layers}') + + # Init parent classes super().__init__() - # Define parent attribute: it will be setup by parent later - self.__parent = None + # Init private attributes + self.__name = name + self.__size = size + self.__gaze_position_calibrator = gaze_position_calibrator + self.__gaze_movement_identifier = gaze_movement_identifier + self.__filter_in_progress_identification = filter_in_progress_identification + self.__scan_path = scan_path + self.__scan_path_analyzers = scan_path_analyzers + self.__background = background + self.__heatmap = heatmap + self.__layers = layers + self.__image_parameters = image_parameters + self.__parent = None # it will be setup by parent later + self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() + self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + self.__scan_path_analyzed = False # Setup layers parent attribute - for name, layer in self.layers.items(): + for name, layer in self.__layers.items(): layer.parent = self + ''' + # Import logging module __loggers__ variable as pipeline step observers + if self.logging_module is not None: - # Init current gaze position - self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() + self.__observers = importlib.import_module(self.logging_module).__loggers__ - # Init current gaze movement - self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + # DEBUG + print(f'Observers registered for {self.__name} frame:', self.__observers) + ''' - # Init scan path analyzed state - self.__scan_path_analyzed = False + @property + def name(self) -> str: + """Get frame's name.""" + return self.__name + + @property + def size(self) -> tuple[int]: + """Get frame's size.""" + return self.__size + + @property + def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator: + """Get frame's gaze position calibrator object.""" + return self.__gaze_position_calibrator + + @property + def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier: + """Get frame's gaze movement identifier object.""" + return self.__gaze_movement_identifier + + @property + def filter_in_progress_indentification(self) -> bool: + """Is frame filtering in progress identification?""" + return self.__filter_in_progress_indentification + + @property + def scan_path(self) -> GazeFeatures.ScanPath: + """Get frame's scan path object.""" + return self.__scan_path + + @property + def scan_path_analyzers(self) -> dict: + """Get frame's scan path analyzers dictionary.""" + return self.__scan_path_analyzers + + @property + def background(self) -> numpy.array: + """Get frame's background matrix.""" + return self.__background + + @background.setter + def background(self, image: numpy.array): + """Set frame's background matrix.""" + self.__background = image + + @property + def heatmap(self) -> AOIFeatures.Heatmap: + """Get frame's heatmap object.""" + return self.__heatmap + + @property + def layers(self) -> dict: + """Get frame's layers dictionary.""" + return self.__layers + + @property + def image_parameters(self) -> dict: + """Get frame's image parameters dictionary.""" + return self.__image_parameters + + @property + def parent(self) -> object: + """Get frame's parent object.""" + return self.__parent + + @parent.setter + def parent(self, parent: object): + """Set frame's parent object.""" + self.__parent = parent + + @property + def gaze_position(self) -> object: + """Get current calibrated gaze position""" + return self.__calibrated_gaze_position + + @property + def gaze_movement(self) -> object: + """Get current identified gaze movement""" + return self.__identified_gaze_movement + + @property + def scan_path_analyzed(self) -> bool: + """Are scan path analysis ready?""" + return self.__scan_path_analyzed + + @property + def scan_path_analysis(self) -> Iterator[Union[str, dict]]: + """Get scan path analysis. + + Returns + iterator: analyzer module path, analysis dictionary + """ + assert(self.__scan_path_analyzed) + + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): + + yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis + + def as_dict(self) -> dict: + """Export ArFrame attributes as dictionary.""" + + return { + "name": self.__name, + "size": self.__size, + "gaze_position_calibrator": self.__gaze_position_calibrator, + "gaze_movement_identifier": self.__gaze_movement_identifier, + "filter_in_progress_identification": self.__filter_in_progress_identification, + "scan_path": self.__scan_path, + "scan_path_analyzers": self.__scan_path_analyzers, + "background": self.__background, + "heatmap": self.__heatmap, + "layers": self.__layers, + "image_parameters": self.__image_parameters + } @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: - """Load attributes from dictionary. + """Load ArFrame attributes from dictionary. Parameters: frame_data: dictionary with attributes to load @@ -697,6 +848,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): pass + # Load background image + try: + + new_frame_background_value = frame_data.pop('background') + new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) + new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC) + + except KeyError: + + new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8) + # Load heatmap try: @@ -714,17 +876,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): new_heatmap_data = {} new_heatmap = None - # Load background image - try: - - new_frame_background_value = frame_data.pop('background') - new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) - new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC) - - except KeyError: - - new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8) - # Load layers new_layers = {} @@ -762,31 +913,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # str: relative path to file if type(new_logging_module_value) == str: - logging_module_name = new_logging_module_value.split('.')[0] + new_logging_module = new_logging_module_value.split('.')[0] - # Import logging module - self.logging_module = importlib.import_module(logging_module_name) - - # Register loggers as pipeline step observers - self.observers = self.logging_module.__loggers__ - except KeyError: - pass + new_logging_module = None + + # DEBUG + print('Create frame', new_frame_name) # Create frame - return ArFrame(new_frame_name, \ - new_frame_size, \ - new_gaze_position_calibrator, \ - new_gaze_movement_identifier, \ - filter_in_progress_identification, \ - new_scan_path, \ - new_scan_path_analyzers, \ - new_heatmap, \ - new_frame_background, \ - new_layers, \ - new_frame_image_parameters \ - ) + return ArFrame(new_frame_name, new_frame_size, new_gaze_position_calibrator, new_gaze_movement_identifier, filter_in_progress_identification, new_scan_path, new_scan_path_analyzers, new_frame_background, new_heatmap, new_layers, new_frame_image_parameters) @classmethod def from_json(self, json_filepath: str) -> ArFrameType: @@ -804,49 +941,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return ArFrame.from_dict(frame_data, working_directory) - @property - def parent(self) -> object: - """Get parent instance""" - - return self.__parent - - @parent.setter - def parent(self, parent: object): - """Set parent instance""" - - self.__parent = parent - - @property - def gaze_position(self) -> object: - """Get current calibrated gaze position""" - - return self.__calibrated_gaze_position - - @property - def gaze_movement(self) -> object: - """Get current identified gaze movement""" - - return self.__identified_gaze_movement - - @property - def scan_path_analyzed(self) -> bool: - """Are scan path analysis ready?""" - - return self.__scan_path_analyzed - - def scan_path_analysis(self) -> Iterator[Union[str, dict]]: - """Get scan path analysis. - - Returns - iterator: analyzer module path, analysis dictionary - """ - - assert(self.__scan_path_analyzed) - - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - - yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis - @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]: """ @@ -861,7 +955,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ # Use frame locker feature - with self.locker: + with self._lock: # No gaze movement identified by default self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() @@ -870,9 +964,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__scan_path_analyzed = False # Apply gaze position calibration - if self.gaze_position_calibrator is not None: + if self.__gaze_position_calibrator is not None: - self.__calibrated_gaze_position = self.gaze_position_calibrator.apply(gaze_position) + self.__calibrated_gaze_position = self.__gaze_position_calibrator.apply(gaze_position) # Or update gaze position at least else: @@ -880,10 +974,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__calibrated_gaze_position = gaze_position # Identify gaze movement - if self.gaze_movement_identifier is not None: + if self.__gaze_movement_identifier is not None: # Identify finished gaze movement - self.__identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) + self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: @@ -891,45 +985,45 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): if GazeFeatures.is_fixation(self.__identified_gaze_movement): # Append fixation to scan path - if self.scan_path is not None: + if self.__scan_path is not None: - self.scan_path.append_fixation(timestamp, self.__identified_gaze_movement) + self.__scan_path.append_fixation(timestamp, self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): # Append saccade to scan path - if self.scan_path is not None: + if self.__scan_path is not None: - scan_step = self.scan_path.append_saccade(timestamp, self.__identified_gaze_movement) + scan_step = self.__scan_path.append_saccade(timestamp, self.__identified_gaze_movement) # Is there a new step? - if scan_step and len(self.scan_path) > 1: + if scan_step and len(self.__scan_path) > 1: # Analyze aoi scan path - for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): + for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items(): - scan_path_analyzer.analyze(timestamp, self.scan_path) + scan_path_analyzer.analyze(timestamp, self.__scan_path) # Update scan path analyzed state self.__scan_path_analyzed = True # No valid finished gaze movement: optionnaly stop in progress identification filtering - elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: + elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification: - self.__identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement + self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement # Update heatmap - if self.heatmap is not None: + if self.__heatmap is not None: # Scale gaze position value - scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) + scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]]) # Update heatmap image - self.heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale) + self.__heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale) # Look layers with valid identified gaze movement # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally - for layer_name, layer in self.layers.items(): + for layer_name, layer in self.__layers.items(): layer.look(timestamp, self.__identified_gaze_movement) @@ -949,56 +1043,56 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ # Use frame locker feature - with self.locker: + with self._lock: # Draw background only - if background_weight is not None and (heatmap_weight is None or self.heatmap is None): + if background_weight is not None and (heatmap_weight is None or self.__heatmap is None): - image = self.background.copy() + image = self.__background.copy() # Draw mix background and heatmap if required - elif background_weight is not None and heatmap_weight is not None and self.heatmap: + elif background_weight is not None and heatmap_weight is not None and self.__heatmap: - background_image = self.background.copy() - heatmap_image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) + background_image = self.__background.copy() + heatmap_image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR) image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0) # Draw heatmap only - elif background_weight is None and heatmap_weight is not None and self.heatmap: + elif background_weight is None and heatmap_weight is not None and self.__heatmap: - image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) + image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR) # Draw black image else: - image = numpy.full((self.size[1], self.size[0], 3), 0).astype(numpy.uint8) + image = numpy.full((self.__size[1], self.__size[0], 3), 0).astype(numpy.uint8) # Draw gaze position calibrator if draw_gaze_position_calibrator is not None: - self.gaze_position_calibrator.draw(image, size=self.size, **draw_gaze_position_calibrator) + self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator) # Draw scan path if required - if draw_scan_path is not None and self.scan_path is not None: + if draw_scan_path is not None and self.__scan_path is not None: - self.scan_path.draw(image, **draw_scan_path) + self.__scan_path.draw(image, **draw_scan_path) # Draw current fixation if required - if draw_fixations is not None and self.gaze_movement_identifier is not None: + if draw_fixations is not None and self.__gaze_movement_identifier is not None: - self.gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) + self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) # Draw current saccade if required - if draw_saccades is not None and self.gaze_movement_identifier is not None: + if draw_saccades is not None and self.__gaze_movement_identifier is not None: - self.gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) + self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) # Draw layers if required if draw_layers is not None: for layer_name, draw_layer in draw_layers.items(): - self.layers[layer_name].draw(image, **draw_layer) + self.__layers[layer_name].draw(image, **draw_layer) # Draw current gaze position if required if draw_gaze_positions is not None: @@ -1019,77 +1113,105 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return self.__image(**kwargs) - return self.__image(**self.image_parameters) + return self.__image(**self.__image_parameters) -@dataclass -class ArScene(): +class ArScene(DataFeatures.PipelineStepObject): """ Define abstract Augmented Reality scene with ArLayers and ArFrames inside. - - Parameters: - name: name of the scene - layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. - frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. - angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. - distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. """ - name: str - layers: dict = field(default_factory=dict) - frames: dict = field(default_factory=dict) - angle_tolerance: float = field(default=0.) - distance_tolerance: float = field(default=0.) + + def __init__(self, name: str = None, layers: dict = None, frames: dict = None, angle_tolerance: float = 0., distance_tolerance: float = 0.): + """ Initialize ArScene - def __post_init__(self): + Parameters: + name: name of the scene + layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. + frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. + angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. + distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. + """ - # Define parent attribute: it will be setup by parent object later - self.__parent = None + # Init parent classes + super().__init__() + + # Init private attributes + self.__name = name + self.__layers = layers + self.__frames = frames + self.__angle_tolerance = angle_tolerance + self.__distance_tolerance = distance_tolerance + self.__parent = None # it will be setup by parent later # Setup layer parent attribute - for name, layer in self.layers.items(): + for name, layer in self.__layers.items(): layer.parent = self # Setup frame parent attribute - for name, frame in self.frames.items(): + for name, frame in self.__frames.items(): frame.parent = self - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = f'parent:\n{self.parent.name}\n' - - if len(self.layers): - output += f'ArLayers:\n' - for name, layer in self.layers.items(): - output += f'{name}:\n{layer}\n' - - if len(self.frames): - output += f'ArFrames:\n' - for name, frame in self.frames.items(): - output += f'{name}:\n{frame}\n' - - return output - @property - def parent(self): - """Get parent instance""" - + def name(self) -> str: + """Get scene's name.""" + return self.__name + + @property + def layers(self) -> dict: + """Get scene's layers dictionary.""" + return self.__layers + + @property + def frames(self) -> dict: + """Get scene's frames dictionary.""" + return self.__frames + + @property + def angle_tolerance(self) -> float: + """Get scene's angle tolerance.""" + return self.__angle_tolerance + + @angle_tolerance.setter + def angle_tolerance(self, value: float): + """Set scene's angle tolerance.""" + self.__angle_tolerance = value + + @property + def distance_tolerance(self) -> float: + """Get scene's distance tolerance.""" + return self.__distance_tolerance + + @distance_tolerance.setter + def distance_tolerance(self, value: float): + """Set scene's distance tolerance.""" + self.__distance_tolerance = value + + @property + def parent(self) -> object: + """Get frame's parent object.""" return self.__parent @parent.setter - def parent(self, parent): - """Get parent instance""" - + def parent(self, parent: object): + """Set frame's parent object.""" self.__parent = parent + def as_dict(self) -> dict: + """Export ArScene attributes as dictionary.""" + + return { + "name": self.__name, + "layers": self.__layers, + "frames": self.__frames, + "angle_tolerance": self.__angle_tolerance, + "distance_tolerance": self.__distance_tolerance + } + @classmethod def from_dict(self, scene_data: dict, working_directory: str = None) -> ArSceneType: """ - Load ArScene from dictionary. + Load ArScene attributes from dictionary. Parameters: scene_data: dictionary @@ -1205,7 +1327,27 @@ class ArScene(): pass return ArScene(new_scene_name, new_layers, new_frames, **scene_data) - + + def __str__(self) -> str: + """ + Returns: + String representation + """ + + output = f'parent:\n{self.__parent.name}\n' + + if len(self.__layers): + output += f'ArLayers:\n' + for name, layer in self.__layers.items(): + output += f'{name}:\n{layer}\n' + + if len(self.__frames): + output += f'ArFrames:\n' + for name, frame in self.__frames.items(): + output += f'{name}:\n{frame}\n' + + return output + def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array]: """Define abstract estimate scene pose method. @@ -1232,7 +1374,7 @@ class ArScene(): iterator: name of projected layer and AOI2DScene projection """ - for name, layer in self.layers.items(): + for name, layer in self.__layers.items(): # Clip AOI out of the visual horizontal field of view (optional) # TODO: use HFOV and VFOV and don't use vision_cone method @@ -1255,7 +1397,7 @@ class ArScene(): aoi_scene_copy = layer.aoi_scene.copy() # Project layer aoi scene - yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) + yield name, aoi_scene_copy.project(tvec, rvec, self.__parent.aruco_detector.optic_parameters.K) def draw(self, image: numpy.array, **kwargs: dict): """ @@ -1267,28 +1409,33 @@ class ArScene(): raise NotImplementedError('draw() method not implemented') -@dataclass class ArCamera(ArFrame): """ Define abstract Augmented Reality camera as ArFrame with ArScenes inside. - - Parameters: - scenes: all scenes to project into camera frame - visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV). - visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV). """ - scenes: dict = field(default_factory=dict) - visual_hfov: float = field(default=0.) - visual_vfov: float = field(default=0.) + def __init__(self, scenes: dict = None, visual_hfov: float = 0., visual_vfov: float = 0., **kwargs): + """ Initialize ArCamera + + Parameters: + scenes: all scenes to project into camera frame + visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV). + visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV). + """ + + # DEBUG + print('ArCamera.__init__ kwargs', kwargs) - def __post_init__(self): + # Init parent class + super().__init__(**kwargs) - # Init ArFrame - super().__post_init__() + # Init private attributes + self.__scenes = scenes + self.__visual_hfov = visual_hfov + self.__visual_vfov = visual_vfov # Setup scenes parent attribute - for name, scene in self.scenes.items(): + for name, scene in self.__scenes.items(): scene.parent = self @@ -1301,7 +1448,7 @@ class ArCamera(ArFrame): expected_aoi_list = [] exclude_aoi_list = [] - for scene_name, scene in self.scenes.items(): + for scene_name, scene in self.__scenes.items(): # Append scene layer aoi to corresponding expected camera layer aoi try: @@ -1329,55 +1476,66 @@ class ArCamera(ArFrame): layer.aoi_scan_path.expected_aoi = expected_aoi_list layer.aoi_matcher.exclude = exclude_aoi_list - - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = f'Name:\n{self.name}\n' - for name, scene in self.scenes.items(): - output += f'\"{name}\" {type(scene)}:\n{scene}\n' - - return output - - @classmethod - def from_dict(self, camera_data: dict, working_directory: str = None) -> ArCameraType: - """ - Load ArCamera from dictionary. - - Parameters: - camera_data: dictionary - working_directory: folder path where to load files when a dictionary value is a relative filepath. - """ - - raise NotImplementedError('from_dict() method not implemented') - - @classmethod - def from_json(self, json_filepath: str) -> ArCameraType: - """ - Load ArCamera from .json file. - - Parameters: - json_filepath: path to json file - """ - - raise NotImplementedError('from_json() method not implemented') + @property + def scenes(self) -> dict: + """Get camera's scenes dictionary.""" + return self.__scenes @property + def visual_hfov(self) -> float: + """Get camera's visual horizontal field of view.""" + return self.__visual_hfov + + @visual_hfov.setter + def visual_hfov(self, value: float): + """Set camera's visual horizontal field of view.""" + self.__visual_hfov = value + + @property + def visual_vfov(self) -> float: + """Get camera's visual vertical field of view.""" + return self.__visual_vfov + + @visual_vfov.setter + def visual_vfov(self, value: float): + """Set camera's visual vertical field of view.""" + self.__visual_vfov = value + + @property def scene_frames(self) -> Iterator[ArFrame]: """Iterate over all scenes frames""" # For each scene - for scene_name, scene in self.scenes.items(): + for scene_name, scene in self.__scenes.items(): # For each scene frame for name, scene_frame in scene.frames.items(): yield scene_frame + def as_dict(self) -> dict: + """Export ArCamera attributes as dictionary.""" + + return { + "scenes": self.__scenes, + "visual_hfov": self.__visual_hfov, + "visual_vfov": self.__visual_vfov + } + + def __str__(self) -> str: + """ + Returns: + String representation + """ + + output = f'Name:\n{self.__name}\n' + + for name, scene in self.__scenes.items(): + output += f'\"{name}\" {type(scene)}:\n{scene}\n' + + return output + @DataFeatures.PipelineStepMethod def watch(self, timestamp: int|float, image: numpy.array): """Detect AR features from image and project scenes into camera frame. @@ -1405,7 +1563,7 @@ class ArCamera(ArFrame): super().look(timestamp, gaze_position) # Use camera frame locker feature - with self.locker: + with self._lock: # Project gaze position into each scene frames if possible for scene_frame in self.scene_frames: @@ -1441,13 +1599,13 @@ class ArCamera(ArFrame): """ # Use camera frame locker feature - with self.locker: + with self._lock: # Project camera frame background into each scene frame if possible for frame in self.scene_frames: # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? - for camera_layer_name, camera_layer in self.layers.items(): + for camera_layer_name, camera_layer in self.__layers.items(): try: @@ -1457,7 +1615,7 @@ class ArCamera(ArFrame): width, height = frame.size destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]]) mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination) - frame.background = cv2.warpPerspective(self.background, mapping, (width, height)) + frame.background = cv2.warpPerspective(self.__background, mapping, (width, height)) # Ignore missing frame projection except KeyError: diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 775ab40..ca58c20 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -8,7 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple -from dataclasses import dataclass, field import json import os import time @@ -33,26 +32,32 @@ DEFAULT_ARUCOCAMERA_IMAGE_PARAMETERS = { } } -@dataclass class ArUcoCamera(ArFeatures.ArCamera): """ Define an ArCamera based on ArUco marker detection. - - Parameters: - aruco_detector: ArUco marker detector """ - aruco_detector: ArUcoDetector.ArUcoDetector = field(default_factory=ArUcoDetector.ArUcoDetector) + def __init__(self, aruco_detector: ArUcoDetector.ArUcoDetector, **kwargs): + """ Initialize ArUcoCamera + + Parameters: + aruco_detector: ArUco marker detector + """ - def __post_init__(self): + # DEBUG + print('ArUcoCamera.__init__ kwargs', kwargs) - super().__post_init__() + # Init parent class + super().__init__(**kwargs) + + # Init private attribute + self.__aruco_detector = aruco_detector # Check optic parameters - if self.aruco_detector.optic_parameters is not None: + if self.__aruco_detector.optic_parameters is not None: # Optic parameters dimensions should be equal to camera frame size - if self.aruco_detector.optic_parameters.dimensions != self.size: + if self.__aruco_detector.optic_parameters.dimensions != self.size: raise ArFeatures.LoadingFailed('ArUcoCamera: aruco_detector.optic_parameters.dimensions have to be equal to size.') @@ -61,18 +66,12 @@ class ArUcoCamera(ArFeatures.ArCamera): # Create default optic parameters adapted to frame size # Note: The choice of 1000 for default focal length should be discussed... - self.aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=self.size, K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=self.size[0], height=self.size[1])) + self.__aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=self.size, K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=self.size[0], height=self.size[1])) - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = super().__str__() - output += f'ArUcoDetector:\n{self.aruco_detector}\n' - - return output + @property + def aruco_detector(self) -> ArUcoDetector.ArUcoDetector: + """Get ArUco detector object.""" + return self.__aruco_detector @classmethod def from_dict(self, aruco_camera_data: dict, working_directory: str = None) -> ArUcoCameraType: @@ -120,11 +119,14 @@ class ArUcoCamera(ArFeatures.ArCamera): for layer_name, layer_data in aruco_camera_data['layers'].items(): aruco_camera_data['image_parameters']['draw_layers'][layer_name] = ArFeatures.DEFAULT_ARLAYER_DRAW_PARAMETERS - # Get values of temporary ar frame created from aruco_camera_data - temp_ar_frame_values = DataFeatures.as_dict(ArFeatures.ArFrame.from_dict(aruco_camera_data, working_directory)) + # Load temporary camera from aruco_camera_data then export it as dict + temp_camera_data = ArFeatures.ArCamera.from_dict(aruco_camera_data, working_directory).as_dict() + + # DEBUG + print('ArUcoCamera.from_dict: temp_camera_data=', temp_camera_data) # Create new aruco camera using temporary ar frame values - return ArUcoCamera(aruco_detector=new_aruco_detector, scenes=new_scenes, **temp_ar_frame_values) + return ArUcoCamera(aruco_detector = new_aruco_detector, scenes = new_scenes, **temp_camera_data) @classmethod def from_json(self, json_filepath: str) -> ArUcoCameraType: @@ -142,6 +144,17 @@ class ArUcoCamera(ArFeatures.ArCamera): return ArUcoCamera.from_dict(aruco_camera_data, working_directory) + def __str__(self) -> str: + """ + Returns: + String representation + """ + + output = super().__str__() + output += f'ArUcoDetector:\n{self.__aruco_detector}\n' + + return output + @DataFeatures.PipelineStepMethod def watch(self, timestamp: int|float, image: numpy.array): """Detect environment aruco markers from image and project scenes into camera frame. @@ -151,10 +164,10 @@ class ArUcoCamera(ArFeatures.ArCamera): """ # Use camera frame locker feature - with self.locker: + with self._lock: # Detect aruco markers - self.aruco_detector.detect_markers(timestamp, image) + self.__aruco_detector.detect_markers(timestamp, image) # Fill camera frame background with image self.background = image @@ -173,7 +186,7 @@ class ArUcoCamera(ArFeatures.ArCamera): try: # Build AOI scene directly from detected ArUco marker corners - self.layers[??].aoi_2d_scene |= scene.build_aruco_aoi_scene(self.aruco_detector.detected_markers) + self.layers[??].aoi_2d_scene |= scene.build_aruco_aoi_scene(self.__aruco_detector.detected_markers) except ArFeatures.PoseEstimationFailed: @@ -181,7 +194,7 @@ class ArUcoCamera(ArFeatures.ArCamera): ''' # Estimate scene pose from detected scene markers - tvec, rmat, _ = scene.estimate_pose(self.aruco_detector.detected_markers) + tvec, rmat, _ = scene.estimate_pose(self.__aruco_detector.detected_markers) # Project scene into camera frame according estimated pose for layer_name, layer_projection in scene.project(tvec, rmat, self.visual_hfov, self.visual_vfov): @@ -216,12 +229,12 @@ class ArUcoCamera(ArFeatures.ArCamera): image = super().image(**kwargs) # Use frame locker feature - with self.locker: + with self._lock: # Draw optic parameters grid if required if draw_optic_parameters_grid is not None: - self.aruco_detector.optic_parameters.draw(image, **draw_optic_parameters_grid) + self.__aruco_detector.optic_parameters.draw(image, **draw_optic_parameters_grid) # Draw scenes if required if draw_scenes is not None: @@ -233,7 +246,7 @@ class ArUcoCamera(ArFeatures.ArCamera): # Draw detected markers if required if draw_detected_markers is not None: - self.aruco_detector.draw_detected_markers(image, draw_detected_markers) + self.__aruco_detector.draw_detected_markers(image, draw_detected_markers) return image diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py index c562467..63f4851 100644 --- a/src/argaze/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py @@ -8,7 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple -from dataclasses import dataclass, field import json import os from collections import Counter @@ -131,7 +130,6 @@ class DetectorParameters(): def internal(self): return self.__parameters -@dataclass class ArUcoDetector(DataFeatures.PipelineStepObject): """ArUco markers detector. @@ -142,12 +140,24 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): parameters: ArUco detector parameters. """ - dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary = field(default_factory=ArUcoMarkersDictionary.ArUcoMarkersDictionary) - marker_size: float = field(default=0.) - optic_parameters: ArUcoOpticCalibrator.OpticParameters = field(default_factory=ArUcoOpticCalibrator.OpticParameters) - parameters: DetectorParameters = field(default_factory=DetectorParameters) + def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary = None, marker_size: float = 0., optic_parameters: ArUcoOpticCalibrator.OpticParameters = None, parameters: DetectorParameters = None): + """ Initialize ArUcoDetector. - def __post_init__(self): + Parameters: + dictionary: ArUco markers dictionary to detect. + marker_size: Size of ArUco markers to detect in centimeter. + optic_parameters: Optic parameters to use for ArUco detection into image. + parameters: ArUco detector parameters. + """ + + # Init parent class + super().__init__() + + # Init private attributes + self.__dictionary = dictionary + self.__marker_size = marker_size + self.__optic_parameters = optic_parameters + self.__parameters = parameters # Init detected markers data self.__detected_markers = {} @@ -162,9 +172,29 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): self.__detection_count = 0 self.__detected_ids = [] + @property + def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary: + """Get aruco detector's dictionary object.""" + return self.__dictionary + + @property + def marker_size(self) -> float: + """Get aruco detector's marker size.""" + return self.__marker_size + + @property + def optic_parameters(self) -> ArUcoOpticCalibrator.OpticParameters: + """Get aruco detector's opetic parameters object.""" + return self.__optic_parameters + + @property + def parameters(self) -> DetectorParameters: + """Get aruco detector's parameters object.""" + return self.__parameters + @classmethod def from_dict(self, aruco_detector_data: dict, working_directory: str = None) -> ArUcoDetectorType: - """Load attributes from dictionary. + """Load ArUcoDetector attributes from dictionary. Parameters: aruco_detector_data: dictionary with attributes to load @@ -249,10 +279,10 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): def __str__(self) -> str: """String display""" - output = f'\n\tDictionary: {self.dictionary}\n' - output += f'\tMarker size: {self.marker_size} cm\n\n' - output += f'\tOptic parameters:\n{self.optic_parameters}\n' - output += f'\tDetection Parameters:\n{self.parameters}' + output = f'\n\tDictionary: {self.__dictionary}\n' + output += f'\tMarker size: {self.__marker_size} cm\n\n' + output += f'\tOptic parameters:\n{self.__optic_parameters}\n' + output += f'\tDetection Parameters:\n{self.__parameters}' return output @@ -277,7 +307,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): detection_start = time.perf_counter() # Detect markers into gray picture - detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY), self.dictionary.markers, parameters = self.parameters.internal) + detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY), self.__dictionary.markers, parameters = self.__parameters.internal) # Assess marker detection time in ms detection_time = (time.perf_counter() - detection_start) * 1e3 @@ -293,7 +323,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): for i, marker_id in enumerate(detected_markers_ids): - marker = ArUcoMarker.ArUcoMarker(self.dictionary, marker_id, self.marker_size) + marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id, self.__marker_size) marker.corners = detected_markers_corners[i][0] # No pose estimation: call estimate_markers_pose to get one @@ -333,7 +363,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): # Estimate pose of selected markers if len(selected_markers_corners) > 0: - markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners, self.marker_size, numpy.array(self.optic_parameters.K), numpy.array(self.optic_parameters.D)) + markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners, self.__marker_size, numpy.array(self.__optic_parameters.K), numpy.array(self.__optic_parameters.D)) for i, marker_id in enumerate(selected_markers_ids): @@ -368,7 +398,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): for marker_id, marker in self.__detected_markers.items(): - marker.draw(image, self.optic_parameters.K, self.optic_parameters.D, **draw_marker) + marker.draw(image, self.__optic_parameters.K, self.__optic_parameters.D, **draw_marker) def detect_board(self, image: numpy.array, board, expected_markers_number): """Detect ArUco markers board in image setting up the number of detected markers needed to agree detection. @@ -379,7 +409,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): # detect markers from gray picture gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY) - detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.dictionary.markers, parameters = self.parameters.internal) + detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, parameters = self.__parameters.internal) # if all board markers are detected if len(detected_markers_corners) == expected_markers_number: diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py index b60b59d..34c3157 100644 --- a/src/argaze/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -8,7 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple -from dataclasses import dataclass, field import json import os @@ -22,32 +21,31 @@ import numpy ArUcoSceneType = TypeVar('ArUcoScene', bound="ArUcoScene") # Type definition for type annotation convenience -@dataclass class ArUcoScene(ArFeatures.ArScene): """ Define an ArScene based on an ArUcoMarkersGroup description. - - Parameters: - - aruco_markers_group: ArUco markers 3D scene description used to estimate scene pose from detected markers: see [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function below. - """ - aruco_markers_group: ArUcoMarkersGroup.ArUcoMarkersGroup = field(default_factory=ArUcoMarkersGroup.ArUcoMarkersGroup) + + def __init__(self, aruco_markers_group: ArUcoMarkersGroup.ArUcoMarkersGroup, **kwargs): + """ Initialize ArUcoScene - def __post_init__(self): + Parameters: + aruco_markers_group: ArUco markers 3D scene description used to estimate scene pose from detected markers: see [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function below. + """ - super().__post_init__() + # DEBUG + print(f'ArUcoScene.__init__', kwargs) - def __str__(self) -> str: - """ - Returns: - String representation - """ + # Init parent classes + super().__init__(**kwargs) - output = output = super().__str__() - output += f'ArUcoMarkersGroup:\n{self.aruco_markers_group}\n' + # Init private attribute + self.__aruco_markers_group = aruco_markers_group - return output + @property + def aruco_markers_group(self) -> ArUcoMarkersGroup.ArUcoMarkersGroup: + """Get ArUco scene markers group object.""" + return self.__aruco_markers_group @classmethod def from_dict(self, aruco_scene_data: dict, working_directory: str = None) -> ArUcoSceneType: @@ -90,12 +88,26 @@ class ArUcoScene(ArFeatures.ArScene): new_aruco_markers_group = None - # Get values of temporary ar scene created from aruco_scene_data - temp_ar_scene_values = DataFeatures.as_dict(ArFeatures.ArScene.from_dict(aruco_scene_data, working_directory)) + # Load temporary scene from aruco_scene_data then export it as dict + temp_scene_data = ArFeatures.ArScene.from_dict(aruco_scene_data, working_directory).as_dict() + + # DEBUG + print('ArUcoScene.from_dict: temp_scene_data=', temp_scene_data) # Create new aruco scene using temporary ar scene values - return ArUcoScene(aruco_markers_group=new_aruco_markers_group, **temp_ar_scene_values) - + return ArUcoScene(aruco_markers_group=new_aruco_markers_group, **temp_scene_data) + + def __str__(self) -> str: + """ + Returns: + String representation + """ + + output = output = super().__str__() + output += f'ArUcoMarkersGroup:\n{self.__aruco_markers_group}\n' + + return output + def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, dict]: """Estimate scene pose from detected ArUco markers. @@ -110,7 +122,7 @@ class ArUcoScene(ArFeatures.ArScene): raise ArFeatures.PoseEstimationFailed('No marker detected') - scene_markers, _ = self.aruco_markers_group.filter_markers(detected_markers) + scene_markers, _ = self.__aruco_markers_group.filter_markers(detected_markers) # Pose estimation fails when no marker belongs to the scene if len(scene_markers) == 0: @@ -123,7 +135,7 @@ class ArUcoScene(ArFeatures.ArScene): raise ArFeatures.PoseEstimationFailed('Only one marker belongs to the scene') # Estimate pose from a markers corners - success, tvec, rmat = self.aruco_markers_group.estimate_pose_from_markers_corners(scene_markers, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D) + success, tvec, rmat = self.__aruco_markers_group.estimate_pose_from_markers_corners(scene_markers, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D) if not success: @@ -143,4 +155,4 @@ class ArUcoScene(ArFeatures.ArScene): # Draw group if required if draw_aruco_markers_group is not None: - self.aruco_markers_group.draw(image, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D, **draw_aruco_markers_group) + self.__aruco_markers_group.draw(image, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D, **draw_aruco_markers_group) diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 793f498..9b673cc 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -8,7 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple, Any -from dataclasses import dataclass, field import importlib from inspect import getmembers, getmodule import collections @@ -33,23 +32,6 @@ DataType = TypeVar('Data') TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") # Type definition for type annotation convenience -def as_dict(dataclass_object) -> dict: - """ - Get dataclass object fields's values as a dictionary. - - Returns: - values: dictionary of dataclass fields's values - """ - - # Get data class fields names - fields_names = [] - for member_name, member_value in getmembers(dataclass_object): - if member_name == '__dataclass_fields__': - fields_names = member_value.keys() - - # Copy fields values - return {name: vars(dataclass_object)[name] for name in fields_names} - def module_path(obj) -> str: """ Get object module path. @@ -369,23 +351,13 @@ class SharedObject(): self._exceptions = {} @property - def locker(self) -> threading.Lock: + def lock(self) -> threading.Lock: + """Get shared object lock object.""" return self._lock - def acquire(self): - self._lock.acquire() - - def release(self): - self._lock.release() - - @property - def locked(self) -> bool: - return self._lock.locked() - @property def timestamp(self) -> int|float: - """Get timestamp""" - + """Get shared object timestamp.""" self._lock.acquire() timestamp = self._timestamp self._lock.release() @@ -394,15 +366,13 @@ class SharedObject(): @timestamp.setter def timestamp(self, timestamp: int|float): - """Set timestamp""" - + """Set shared object timestamp.""" self._lock.acquire() self._timestamp = timestamp self._lock.release() def untimestamp(self): - """Reset timestamp""" - + """Reset shared object timestamp.""" self._lock.acquire() self._timestamp = math.nan self._lock.release() @@ -410,7 +380,6 @@ class SharedObject(): @property def timestamped(self) -> bool: """Is the object timestamped?""" - self._lock.acquire() timestamped = not math.isnan(self._timestamp) self._lock.release() @@ -422,10 +391,65 @@ class PipelineStepObject(): Parameters: execution_times: dictionary with each PipelineStepMethod execution time in ms. + observers: dictionary ... """ execution_times: dict = {} - observers: dict = {} + __observers: dict = {} + + @property + def observers(self) -> dict: + + return self.__observers + + def as_dict(self) -> dict: + """ + Define abstract method to export PipelineStepObject attributes as dictionary. + + Returns: + object_data: dictionary of PipelineStepObject. + """ + raise NotImplementedError('serialize() method not implemented') + + @classmethod + def from_dict(self, object_data: dict, working_directory: str = None) -> object: + """ + Define abstract method to import PipelineStepObject attributes from dictionary. + + Returns: + object_data: dictionary of PipelineStepObject + working_directory: folder path where to load files when a dictionary value is a relative filepath. + """ + raise NotImplementedError('serialize() method not implemented') + + @classmethod + def from_json(self, json_filepath: str) -> object: + """ + Define abstract method to load PipelineStepObject from .json file. + + Parameters: + json_filepath: path to json file + """ + raise NotImplementedError('from_json() method not implemented') + + def __str__(self) -> str: + """ + Define abstract method to have a string representation of PipelineStepObject. + + Returns: + String representation + """ + raise NotImplementedError('__str__() method not implemented') + +def PipelineStepAttribute(method): + + # Mark method as + method._tags = tags + + return method + +# DEBUG +from argaze import ArFeatures def PipelineStepMethod(method): """Define a decorator use into PipelineStepObject class to declare pipeline method. @@ -437,6 +461,11 @@ def PipelineStepMethod(method): def wrapper(self, timestamp, *args, **kw): """Wrap pipeline step method to measure execution time.""" + # DEBUG + if type(self) == ArFeatures.ArFrame: + + print(timestamp, self.name, method.__name__, len(self.observers)) + # Initialize execution time assessment start = time.perf_counter() diff --git a/src/argaze/utils/demo_aruco_markers_run.py b/src/argaze/utils/demo_aruco_markers_run.py index c8b140c..a5b02f0 100644 --- a/src/argaze/utils/demo_aruco_markers_run.py +++ b/src/argaze/utils/demo_aruco_markers_run.py @@ -71,6 +71,7 @@ def main(): except Exception as e: + print(e) gaze_analysis_time = 0 # Attach mouse callback to window @@ -117,6 +118,7 @@ def main(): # Detect and project AR features aruco_camera.watch(capture_time, video_image) + # Detection suceeded exception = None # Write errors @@ -140,8 +142,10 @@ def main(): cv2.putText(aruco_camera_image, f'{gaze_positions_frequency} gaze positions/s | Gaze analysis {gaze_analysis_time:.2f}ms', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) # Handle exceptions - cv2.rectangle(aruco_camera_image, (0, 100), (aruco_camera.size[0], 80), (127, 127, 127), -1) - cv2.putText(aruco_camera_image, f'error: {exception}', (20, 140), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + if exception is not None: + + cv2.rectangle(aruco_camera_image, (0, 100), (aruco_camera.size[0], 80), (127, 127, 127), -1) + cv2.putText(aruco_camera_image, f'error: {exception}', (20, 140), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) # Write hint cv2.putText(aruco_camera_image, 'Mouve mouse pointer over gray rectangle area', (20, aruco_camera.size[1]-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) -- cgit v1.1