Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r-- | src/argaze/ArFeatures.py | 299
1 file changed, 138 insertions, 161 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 3b05482..7cc1b9d 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -34,35 +34,36 @@ from argaze.utils import UtilsFeatures
 import numpy
 import cv2
 
+
 class PoseEstimationFailed(Exception):
     """
-    Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies.
+    Exception raised by ArScene estimate_pose method when the pose can't be estimated due to inconsistencies.
     """
 
-    def __init__(self, message, unconsistencies=None):
-
+    def __init__(self, message, inconsistencies=None):
         super().__init__(message)
 
-        self.unconsistencies = unconsistencies
+        self.inconsistencies = inconsistencies
+
 
 class SceneProjectionFailed(Exception):
     """
     Exception raised by ArCamera watch method when the scene can't be projected.
     """
 
-    def __init__(self, message):
-
+    def __init__(self, message):
         super().__init__(message)
 
+
 class DrawingFailed(Exception):
     """
     Exception raised when drawing fails.
     """
 
-    def __init__(self, message):
-
+    def __init__(self, message):
         super().__init__(message)
 
+
 # Define default ArLayer draw parameters
 DEFAULT_ARLAYER_DRAW_PARAMETERS = {
     "draw_aoi_scene": {
@@ -92,9 +93,10 @@ DEFAULT_ARLAYER_DRAW_PARAMETERS = {
     }
 }
 
+
 class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
     """
-    Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed.
+    Defines a space where to make matching of gaze movements and AOI and inside which those matching need to be analyzed.
 
     !!! note
         Inherits from DataFeatures.SharedObject class to be shared by multiple threads.
@@ -118,14 +120,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         # Init pipeline step object attributes
         self.draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS
-
+
     @property
     def aoi_scene(self) -> AOIFeatures.AOIScene:
         """AOI scene description."""
         return self.__aoi_scene
 
     @aoi_scene.setter
-    def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene|str|dict):
+    def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene | str | dict):
 
         if issubclass(type(aoi_scene_value), AOIFeatures.AOIScene):
 
@@ -139,7 +141,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
             # JSON file format for 2D or 3D dimension
             if file_format == 'json':
-
                 new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath)
 
             # SVG file format for 2D dimension only
@@ -168,7 +169,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
         # Edit parent
         if self.__aoi_scene is not None:
-
             self.__aoi_scene.parent = self
 
     @property
@@ -180,15 +180,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
     @DataFeatures.PipelineStepAttributeSetter
     def aoi_matcher(self, aoi_matcher: GazeFeatures.AOIMatcher):
 
-        assert(issubclass(type(aoi_matcher), GazeFeatures.AOIMatcher))
+        assert (issubclass(type(aoi_matcher), GazeFeatures.AOIMatcher))
 
         self.__aoi_matcher = aoi_matcher
 
         # Edit parent
         if self.__aoi_matcher is not None:
-
             self.__aoi_matcher.parent = self
-
+
     @property
     def aoi_scan_path(self) -> GazeFeatures.AOIScanPath:
         """AOI scan path object."""
@@ -198,7 +197,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
     @DataFeatures.PipelineStepAttributeSetter
     def aoi_scan_path(self, aoi_scan_path: GazeFeatures.AOIScanPath):
 
-        assert(isinstance(aoi_scan_path, GazeFeatures.AOIScanPath))
+        assert (isinstance(aoi_scan_path, GazeFeatures.AOIScanPath))
 
         self.__aoi_scan_path = aoi_scan_path
@@ -207,9 +206,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
         # Edit parent
         if self.__aoi_scan_path is not None:
-
             self.__aoi_scan_path.parent = self
-
+
     @property
     def aoi_scan_path_analyzers(self) -> list:
         """AOI scan path analyzers list."""
@@ -224,7 +222,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         # Connect analyzers if required
         for analyzer in self.__aoi_scan_path_analyzers:
 
-            assert(issubclass(type(analyzer), GazeFeatures.AOIScanPathAnalyzer))
+            assert (issubclass(type(analyzer), GazeFeatures.AOIScanPathAnalyzer))
 
             # Check scan path analyzer properties type
             for name, item in type(analyzer).__dict__.items():
@@ -238,7 +236,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
                     except KeyError:
 
-                        raise(ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}'))
+                        raise (ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}'))
 
                     if issubclass(property_type, GazeFeatures.AOIScanPathAnalyzer):
 
@@ -248,28 +246,25 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
                         for a in self.__aoi_scan_path_analyzers:
 
                             if type(a) == property_type:
-
                                 setattr(analyzer, name, a)
                                 found = True
 
                         if not found:
-
-                            raise DataFeatures.PipelineStepLoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
+                            raise DataFeatures.PipelineStepLoadingFailed(
+                                f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
 
         # Force scan path creation
         if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None:
-
-            self.scan_path = GazeFeatures.ScanPath()
+            self.__aoi_scan_path = GazeFeatures.ScanPath()
 
         # Edit parent
         for analyzer in self.__aoi_scan_path_analyzers:
-
             analyzer.parent = self
 
     def last_looked_aoi_name(self) -> str:
         """Get last looked aoi name."""
         return self.__looked_aoi_name
-
+
     def is_analysis_available(self) -> bool:
         """Are aoi scan path analysis ready?"""
         return self.__aoi_scan_path_analyzed
@@ -279,7 +274,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         analysis = {}
 
         for analyzer in self.__aoi_scan_path_analyzers:
-
             analysis[DataFeatures.get_class_path(analyzer)] = analyzer.analysis()
 
         return analysis
@@ -300,19 +294,17 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         """Update expected AOI of AOI scan path considering AOI scene and layer name."""
 
         if self.__aoi_scene is None:
-
             logging.debug('ArLayer._update_expected_aoi %s (parent: %s): missing aoi scene', self.name, self.parent)
             return
 
         logging.debug('ArLayer._update_expected_aoi %s (parent: %s)', self.name, self.parent)
 
-        # Get aoi names from aoi scene
+        # Get aoi names from aoi scene
         expected_aoi = list(self.__aoi_scene.keys())
 
         # Remove layer name from expected aoi
         if self.name in expected_aoi:
-
             expected_aoi.remove(self.name)
 
         # Update expected aoi: this will clear the scan path
@@ -345,9 +337,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
         self.__aoi_scan_path_analyzed = False
 
         if self.__aoi_matcher is not None and self.__aoi_scene is not None:
-
             # Update looked aoi thanks to aoi matcher
-            # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally
+            # Note: don't filter valid/invalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally
 
             self.__looked_aoi_name, _ = self.__aoi_matcher.match(gaze_movement, self.__aoi_scene)
 
             logging.debug('\t> looked aoi name: %s', self.__looked_aoi_name)
 
@@ -372,7 +363,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
                     # Analyze aoi scan path
                     for aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers:
-
                         aoi_scan_path_analyzer.analyze(self.__aoi_scan_path, timestamp=gaze_movement.timestamp)
 
                     # Update aoi scan path analyzed state
@@ -382,7 +372,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
                 # Append saccade to aoi scan path
                 if self.__aoi_scan_path is not None:
-
                     logging.debug('\t> append saccade')
 
                     self.__aoi_scan_path.append_saccade(gaze_movement)
@@ -393,8 +382,10 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         Draw into image.
 
         Parameters:
+            image: image where to draw.
             draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn)
-            draw_aoi_matching: AOIMatcher.draw parameters (which depends of the loaded aoi matcher module, if None, no aoi matching is drawn)
+            draw_aoi_matching: AOIMatcher.draw parameters (which depends on the loaded aoi matcher module,
+                if None, no aoi matching is drawn)
         """
 
         # Use layer lock feature
@@ -402,14 +393,13 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
             # Draw aoi if required
             if draw_aoi_scene is not None and self.__aoi_scene is not None:
-
                 self.__aoi_scene.draw(image, **draw_aoi_scene)
 
             # Draw aoi matching if required
             if draw_aoi_matching is not None and self.__aoi_matcher is not None:
-
                 self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching)
 
+
 # Define default ArFrame image parameters
 DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
     "background_weight": 1.,
@@ -431,6 +421,7 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
     }
 }
 
+
 class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
     """
     Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed.
@@ -453,7 +444,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         self.__filter_in_progress_identification = True
         self.__scan_path = None
         self.__scan_path_analyzers = []
-        self.__background = DataFeatures.TimestampedImage( numpy.full((1, 1, 3), 127).astype(numpy.uint8) )
+        self.__background = DataFeatures.TimestampedImage(numpy.full((1, 1, 3), 127).astype(numpy.uint8))
         self.__heatmap = None
         self.__calibrated_gaze_position = GazeFeatures.GazePosition()
         self.__identified_gaze_movement = GazeFeatures.GazeMovement()
@@ -464,32 +455,31 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         self._image_parameters = DEFAULT_ARFRAME_IMAGE_PARAMETERS
 
     @property
-    def size(self) -> tuple[int]:
+    def size(self) -> tuple[int, int]:
         """Defines the dimension of the rectangular area where gaze positions are projected."""
         return self.__size
 
     @size.setter
-    def size(self, size: tuple[int]):
+    def size(self, size: tuple[int, int]):
 
         self.__size = size
-
+
     @property
     def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator:
-        """Select gaze position calibration algoritm."""
+        """Select gaze position calibration algorithm."""
        return self.__gaze_position_calibrator
 
     @gaze_position_calibrator.setter
     @DataFeatures.PipelineStepAttributeSetter
     def gaze_position_calibrator(self, gaze_position_calibrator: GazeFeatures.GazePositionCalibrator):
 
-        assert(issubclass(type(gaze_position_calibrator), GazeFeatures.GazePositionCalibrator))
+        assert (issubclass(type(gaze_position_calibrator), GazeFeatures.GazePositionCalibrator))
 
         self.__gaze_position_calibrator = gaze_position_calibrator
 
         # Edit parent
         if self.__gaze_position_calibrator is not None:
-
             self.__gaze_position_calibrator.parent = self
-
+
     @property
     def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier:
         """Select gaze movement identification algorithm."""
@@ -499,15 +489,14 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
     @DataFeatures.PipelineStepAttributeSetter
     def gaze_movement_identifier(self, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier):
 
-        assert(issubclass(type(gaze_movement_identifier), GazeFeatures.GazeMovementIdentifier))
+        assert (issubclass(type(gaze_movement_identifier), GazeFeatures.GazeMovementIdentifier))
 
         self.__gaze_movement_identifier = gaze_movement_identifier
 
         # Edit parent
         if self.__gaze_movement_identifier is not None:
-
             self.__gaze_movement_identifier.parent = self
-
+
     @property
     def filter_in_progress_identification(self) -> bool:
         """Is frame ignores in progress gaze movement identification?"""
@@ -526,15 +515,14 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
     @scan_path.setter
     @DataFeatures.PipelineStepAttributeSetter
-    def scan_path(self, scan_path: GazeFeatures.ScanPath) -> GazeFeatures.ScanPath:
+    def scan_path(self, scan_path: GazeFeatures.ScanPath):
 
-        assert(isinstance(scan_path, GazeFeatures.ScanPath))
+        assert (isinstance(scan_path, GazeFeatures.ScanPath))
 
         self.__scan_path = scan_path
 
         # Edit parent
         if self.__scan_path is not None:
-
             self.__scan_path.parent = self
 
     @property
@@ -551,7 +539,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         # Connect analyzers if required
         for analyzer in self.__scan_path_analyzers:
 
-            assert(issubclass(type(analyzer), GazeFeatures.ScanPathAnalyzer))
+            assert (issubclass(type(analyzer), GazeFeatures.ScanPathAnalyzer))
 
             # Check scan path analyzer properties type
             for name, item in type(analyzer).__dict__.items():
@@ -565,7 +553,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
                     except KeyError:
 
-                        raise(ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}'))
+                        raise (ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}'))
 
                     if issubclass(property_type, GazeFeatures.AOIScanPathAnalyzer):
 
@@ -575,22 +563,19 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
                         for a in self.__scan_path_analyzers:
 
                             if type(a) == property_type:
-
                                 setattr(analyzer, name, a)
                                 found = True
 
                         if not found:
-
-                            raise DataFeatures.PipelineStepLoadingFaile(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
+                            raise DataFeatures.PipelineStepLoadingFaile(
+                                f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
 
         # Force scan path creation
-        if len(self.__scan_path_analyzers) > 0 and self.scan_path == None:
-
-            self.scan_path = GazeFeatures.ScanPath()
+        if len(self.__scan_path_analyzers) > 0 and self.__scan_path == None:
+            self.__scan_path = GazeFeatures.ScanPath()
 
         # Edit parent
         for analyzer in self.__scan_path_analyzers:
-
             analyzer.parent = self
 
     @property
@@ -602,12 +587,13 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
     @DataFeatures.PipelineStepAttributeSetter
     def background(self, background: DataFeatures.TimestampedImage):
 
-        assert(isinstance(background, DataFeatures.TimestampedImage))
+        assert (isinstance(background, DataFeatures.TimestampedImage))
 
         if background.size != self.size:
 
             # Resize image to frame size
-            self.__background = DataFeatures.TimestampedImage( cv2.resize(background, dsize = self.size, interpolation = cv2.INTER_CUBIC), background.timestamp)
+            self.__background = DataFeatures.TimestampedImage(
+                cv2.resize(background, dsize=self.size, interpolation=cv2.INTER_CUBIC), background.timestamp)
 
         else:
 
@@ -622,20 +608,18 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
     @DataFeatures.PipelineStepAttributeSetter
     def heatmap(self, heatmap: AOIFeatures.Heatmap):
 
-        assert(isinstance(heatmap, AOIFeatures.Heatmap))
+        assert (isinstance(heatmap, AOIFeatures.Heatmap))
 
         self.__heatmap = heatmap
 
         # Default heatmap size equals frame size
         if self.__heatmap.size == (1, 1):
-
             self.__heatmap.size = self.size
 
         # Edit parent
         if self.__heatmap is not None:
-
             self.__heatmap.parent = self
-
+
     @property
     def layers(self) -> dict:
         """Layers dictionary."""
@@ -648,12 +632,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         self._layers = {}
 
         for layer_name, layer_data in layers.items():
-
-            self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+            self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)
 
         # Edit parent
         for name, layer in self._layers.items():
-
             layer.parent = self
 
     def last_gaze_position(self) -> object:
@@ -673,7 +655,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         analysis = {}
 
         for analyzer in self.__scan_path_analyzers:
-
             analysis[DataFeatures.get_class_path(analyzer)] = analyzer.analysis()
 
         return analysis
@@ -701,7 +682,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
         return d
 
     @DataFeatures.PipelineStepMethod
-    def look(self, timestamped_gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]:
+    def look(self, timestamped_gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()):
         """
         Project timestamped gaze position into frame.
@@ -733,60 +714,60 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
             # Identify gaze movement
             if self.__gaze_movement_identifier is not None:
-
                 # Identify finished gaze movement
-                self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(self.__calibrated_gaze_position)
+                self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(
+                    self.__calibrated_gaze_position)
 
                 # Valid and finished gaze movement has been identified
                 if self.__identified_gaze_movement and self.__identified_gaze_movement.is_finished():
-
+
                     if GazeFeatures.is_fixation(self.__identified_gaze_movement):
-
+
                         # Append fixation to scan path
                         if self.__scan_path is not None:
-
                             self.__scan_path.append_fixation(self.__identified_gaze_movement)
 
                     elif GazeFeatures.is_saccade(self.__identified_gaze_movement):
-
+
                         # Append saccade to scan path
                         if self.__scan_path is not None:
-
+
                             scan_step = self.__scan_path.append_saccade(self.__identified_gaze_movement)
 
                             # Is there a new step?
                             if scan_step and len(self.__scan_path) > 1:
-
+
                                 # Analyze aoi scan path
                                 for scan_path_analyzer in self.__scan_path_analyzers:
-
-                                    scan_path_analyzer.analyze(self.__scan_path, timestamp=self.__identified_gaze_movement.timestamp)
+                                    scan_path_analyzer.analyze(self.__scan_path,
+                                                               timestamp=self.__identified_gaze_movement.timestamp)
 
                                 # Update scan path analyzed state
                                 self.__scan_path_analyzed = True
 
-            # No valid finished gaze movement: optionnaly stop in progress identification filtering
+            # No valid finished gaze movement: optionally stop in progress identification filtering
             elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification:
 
                 self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement()
 
             # Update heatmap
             if self.__heatmap is not None:
-
                 # Scale gaze position value
                 scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]])
 
                 # Update heatmap image
-                self.__heatmap.update(self.__calibrated_gaze_position * scale, timestamp=self.__calibrated_gaze_position.timestamp)
+                self.__heatmap.update(self.__calibrated_gaze_position * scale,
+                                      timestamp=self.__calibrated_gaze_position.timestamp)
 
             # Look layers with valid identified gaze movement
-            # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally
+            # Note: don't filter valid/invalid finished/unfinished gaze movement to allow layers to reset internally
             for layer_name, layer in self._layers.items():
-
                 layer.look(self.__identified_gaze_movement)
 
     @DataFeatures.PipelineStepImage
-    def image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array:
+    def image(self, background_weight: float = None, heatmap_weight: float = None,
+              draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None,
+              draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array:
         """
         Get background image with overlaid visualisations.
@@ -838,14 +819,12 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
             # Draw gaze position calibrator
             if draw_gaze_position_calibrator is not None:
-
                 logging.debug('\t> drawing gaze position calibrator')
 
                 self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator)
 
             # Draw scan path if required
             if draw_scan_path is not None and self.__scan_path is not None:
-
                 logging.debug('\t> drawing scan path')
 
                 self.__scan_path.draw(image, **draw_scan_path)
@@ -854,7 +833,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
             if draw_fixations is not None and self.__gaze_movement_identifier is not None:
 
                 if self.__gaze_movement_identifier.current_fixation():
-
                     logging.debug('\t> drawing current fixation')
 
                     self.__gaze_movement_identifier.current_fixation().draw(image, **draw_fixations)
@@ -863,7 +841,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
             if draw_saccades is not None and self.__gaze_movement_identifier is not None:
 
                 if self.__gaze_movement_identifier.current_saccade():
-
                     logging.debug('\t> drawing current saccade')
 
                     self.__gaze_movement_identifier.current_saccade().draw(image, **draw_saccades)
@@ -881,24 +858,24 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
 
                 except KeyError:
 
-                    raise(DrawingFailed(f'\'{layer_name}\' layer doesn\'t exist.'))
+                    raise (DrawingFailed(f'\'{layer_name}\' layer doesn\'t exist.'))
 
             # Draw current gaze position if required
             if draw_gaze_positions is not None:
-
                 logging.debug('\t> drawing current gaze position')
 
                 self.__calibrated_gaze_position.draw(image, **draw_gaze_positions)
 
         logging.debug('\t> returning image (%i x %i)', image.shape[1], image.shape[0])
 
-        return DataFeatures.TimestampedImage(image, timestamp = self.__background.timestamp)
+        return DataFeatures.TimestampedImage(image, timestamp=self.__background.timestamp)
+
 
 class ArScene(DataFeatures.PipelineStepObject):
     """
     Define abstract Augmented Reality scene with ArLayers and ArFrames inside.
     """
-
+
     @DataFeatures.PipelineStepInit
     def __init__(self, **kwargs):
         """Initialize ArScene"""
@@ -906,8 +883,8 @@ class ArScene(DataFeatures.PipelineStepObject):
         # Init private attributes
         self._layers = {}
         self.__frames = {}
-        self.__angle_tolerance = 0,
-        self.__distance_tolerance = 0,
+        self.__angle_tolerance = 0.
+        self.__distance_tolerance = 0.
 
     @property
     def layers(self) -> dict:
@@ -917,7 +894,7 @@ class ArScene(DataFeatures.PipelineStepObject):
 
     @layers.setter
     @DataFeatures.PipelineStepAttributeSetter
-    def layers(self, layers:dict):
+    def layers(self, layers: dict):
 
         self._layers = {}
 
@@ -925,21 +902,18 @@ class ArScene(DataFeatures.PipelineStepObject):
 
             if type(layer_data) == dict:
 
-                self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+                self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)
 
             # str: relative path to JSON file
             elif type(layer_data) == str:
 
-                self._layers[layer_name] = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), layer_data))
-
-                # Loaded layer name have to be equals to dictionary key
-                assert(self._layers[layer_name].name == frame_name)
+                self._layers[layer_name] = DataFeatures.from_json(
+                    os.path.join(DataFeatures.get_working_directory(), layer_data))
 
         # Edit parent
         for name, layer in self._layers.items():
-
             layer.parent = self
-
+
     @property
     def frames(self) -> dict:
         """Dictionary of ArFrames to project once the pose is estimated.
@@ -956,7 +930,7 @@ class ArScene(DataFeatures.PipelineStepObject):
 
             if type(frame_data) == dict:
 
-                new_frame = ArFrame(name = frame_name, **frame_data)
+                new_frame = ArFrame(name=frame_name, **frame_data)
 
             # str: relative path to JSON file
             elif type(frame_data) == str:
 
@@ -964,7 +938,7 @@ class ArScene(DataFeatures.PipelineStepObject):
                 new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data))
 
                 # Loaded frame name have to be equals to dictionary key
-                assert(new_frame.name == frame_name)
+                assert (new_frame.name == frame_name)
 
             # Look for a scene layer with an AOI named like the frame
             for scene_layer_name, scene_layer in self.layers.items():
 
@@ -976,7 +950,7 @@ class ArScene(DataFeatures.PipelineStepObject):
                     # Check that the frame have a layer named like this scene layer
                     aoi_2d_scene = new_frame.layers[scene_layer_name].aoi_scene
 
-                    # Transform 2D frame layer AOI into 3D scene layer AOI
+                    # Transform 2D frame layer AOI into 3D scene layer AOI
                     # Then, add them to scene layer
                     scene_layer.aoi_scene |= aoi_2d_scene.dimensionalize(frame_3d, new_frame.size)
 
@@ -989,9 +963,8 @@ class ArScene(DataFeatures.PipelineStepObject):
 
         # Edit parent
         for name, frame in self.__frames.items():
-
             frame.parent = self
-
+
     @property
     def angle_tolerance(self) -> float:
         """Angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function."""
@@ -1001,7 +974,7 @@ class ArScene(DataFeatures.PipelineStepObject):
     def angle_tolerance(self, value: float):
 
         self.__angle_tolerance = value
-
+
     @property
     def distance_tolerance(self) -> float:
         """Distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function."""
@@ -1039,8 +1012,9 @@ class ArScene(DataFeatures.PipelineStepObject):
         raise NotImplementedError('estimate_pose() method not implemented')
 
     @DataFeatures.PipelineStepMethod
-    def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]:
-        """Project layers according estimated pose and optional field of view clipping angles.
+    def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> \
+            Iterator[Union[str, AOI2DScene.AOI2DScene]]:
+        """Project layers according estimated pose and optional field of view clipping angles.
 
         Parameters:
             tvec: translation vector
@@ -1061,8 +1035,8 @@ class ArScene(DataFeatures.PipelineStepObject):
             # Transform layer aoi scene into camera referential
             aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec)
 
-            # Get aoi inside vision cone field
-            cone_vision_height_cm = 200 # cm
+            # Get aoi inside vision cone field
+            cone_vision_height_cm = 200  # cm
             cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm
 
             _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm)
@@ -1077,6 +1051,7 @@ class ArScene(DataFeatures.PipelineStepObject):
             # Project layer aoi scene
             yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)
 
+
 class ArCamera(ArFrame):
     """
     Define abstract Augmented Reality camera as ArFrame with ArScenes inside.
@@ -1085,7 +1060,7 @@ class ArCamera(ArFrame):
     @DataFeatures.PipelineStepInit
     def __init__(self, **kwargs):
         """Initialize ArCamera."""
-
+
         # Init ArFrame class
         super().__init__()
 
@@ -1103,12 +1078,10 @@ class ArCamera(ArFrame):
         self._layers = {}
 
         for layer_name, layer_data in layers.items():
-
-            self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+            self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)
 
         # Edit parent
         for name, layer in self._layers.items():
-
             layer.parent = self
 
         # Update expected and excluded aoi
@@ -1126,12 +1099,10 @@ class ArCamera(ArFrame):
         self._scenes = {}
 
         for scene_name, scene_data in scenes.items():
-
-            self._scenes[scene_name] = ArScene(name = scene_name, **scene_data)
+            self._scenes[scene_name] = ArScene(name=scene_name, **scene_data)
 
         # Edit parent
         for name, scene in self._scenes.items():
-
             scene.parent = self
 
         # Update expected and excluded aoi
@@ -1146,7 +1117,7 @@ class ArCamera(ArFrame):
     def visual_hfov(self, value: float):
         """Set camera's visual horizontal field of view."""
         self.__visual_hfov = value
-
+
     @property
     def visual_vfov(self) -> float:
         """Angle in degree to clip scenes projection according visual vertical field of view (VFOV)."""
@@ -1156,7 +1127,7 @@ class ArCamera(ArFrame):
     def visual_vfov(self, value: float):
         """Set camera's visual vertical field of view."""
         self.__visual_vfov = value
-
+
     def scene_frames(self) -> Iterator[ArFrame]:
         """Iterate over all scenes frames"""
 
@@ -1165,7 +1136,6 @@ class ArCamera(ArFrame):
 
             # For each scene frame
             for name, scene_frame in scene.frames.items():
-
                 yield scene_frame
 
     def as_dict(self) -> dict:
@@ -1184,7 +1154,6 @@ class ArCamera(ArFrame):
         """
 
         if not self._layers or not self._scenes:
-
             logging.debug('ArCamera._update_expected_and_excluded_aoi %s: missing layers or scenes', self.name)
             return
 
@@ -1214,7 +1183,7 @@ class ArCamera(ArFrame):
                 for frame_name, frame in scene.frames.items():
 
                     try:
-
+
                         expected_aoi_list.remove(frame_name)
                         excluded_aoi_list.append(frame_name)
 
@@ -1223,11 +1192,9 @@ class ArCamera(ArFrame):
                         continue
 
             if layer.aoi_scan_path is not None:
-
                 layer.aoi_scan_path.expected_aoi = expected_aoi_list
 
             if layer.aoi_matcher is not None:
-
                 layer.aoi_matcher.exclude = excluded_aoi_list
 
     @DataFeatures.PipelineStepMethod
@@ -1275,11 +1242,11 @@ class ArCamera(ArFrame):
                     # TODO?: Should we prefer to use camera frame AOIMatcher object?
                     if aoi_2d.contains_point(timestamped_gaze_position):
-
                         inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)
 
                         # QUESTION: How to project gaze precision?
-                        inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)
+                        inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y),
+                                                                        timestamp=timestamped_gaze_position.timestamp)
 
                         # Project inner gaze position into scene frame
                         scene_frame.look(inner_gaze_position * scene_frame.size)
 
@@ -1314,19 +1281,23 @@ class ArCamera(ArFrame):
                     width, height = frame.size
                     destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]])
                     mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination)
-                    frame.background = DataFeatures.TimestampedImage( cv2.warpPerspective(self.background, mapping, (width, height)), timestamp = self.background.timestamp)
+                    frame.background = DataFeatures.TimestampedImage(
+                        cv2.warpPerspective(self.background, mapping, (width, height)),
+                        timestamp=self.background.timestamp)
 
                 # Ignore missing frame projection
                 except KeyError:
 
                     pass
 
+
 # Define default ArContext image parameters
 DEFAULT_ARCONTEXT_IMAGE_PARAMETERS = {
     "draw_times": True,
     "draw_exceptions": True
 }
 
+
 class ArContext(DataFeatures.PipelineStepObject):
     """
     Define class to ...
@@ -1340,14 +1311,14 @@ class ArContext(DataFeatures.PipelineStepObject):
         self.__catch_exceptions = True
         self.__exceptions = DataFeatures.TimestampedExceptions()
 
-        # Init gaze position processing assement
+        # Init gaze position processing assessment
         self.__process_gaze_position_chrono = UtilsFeatures.TimeProbe()
         self.__process_gaze_position_frequency = 0
 
-        # Init camera image processing assement
+        # Init camera image processing assessment
         self.__process_camera_image_chrono = UtilsFeatures.TimeProbe()
         self.__process_camera_image_frequency = 0
-
+
         # Init protected attributes
         self._image_parameters = DEFAULT_ARCONTEXT_IMAGE_PARAMETERS
 
@@ -1360,7 +1331,7 @@ class ArContext(DataFeatures.PipelineStepObject):
     @DataFeatures.PipelineStepAttributeSetter
     def pipeline(self, pipeline: DataFeatures.PipelineStepObject):
 
-        assert(issubclass(type(pipeline), DataFeatures.PipelineStepObject))
+        assert (issubclass(type(pipeline), DataFeatures.PipelineStepObject))
 
         self.__pipeline = pipeline
 
@@ -1374,12 +1345,12 @@ class ArContext(DataFeatures.PipelineStepObject):
 
         self.__catch_exceptions = catch_exceptions
 
-    def exceptions(self) -> DataFeatures.TimestampedException:
+    def exceptions(self) -> DataFeatures.TimestampedExceptions:
         """Get exceptions list"""
         return self.__exceptions
 
     def as_dict(self) -> dict:
-        """Export Arcontext properties as dictionary."""
+        """Export ArContext properties as dictionary."""
 
         return {
             **DataFeatures.PipelineStepObject.as_dict(self),
@@ -1402,7 +1373,8 @@ class ArContext(DataFeatures.PipelineStepObject):
         """Exit from ArContext."""
         pass
 
-    def _process_gaze_position(self, timestamp: int|float, x: int|float = None, y: int|float = None, precision: int|float = None):
+    def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None,
+                               precision: int | float = None):
         """Request pipeline to process new gaze position at a timestamp."""
 
         logging.debug('ArContext._process_gaze_position %s', self.name)
 
@@ -1411,7 +1383,6 @@ class ArContext(DataFeatures.PipelineStepObject):
         lap_time, nb_laps, elapsed_time = self.__process_gaze_position_chrono.lap()
 
         if elapsed_time > 1e3:
-
             self.__process_gaze_position_frequency = nb_laps
             self.__process_gaze_position_chrono.restart()
 
@@ -1422,12 +1393,14 @@ class ArContext(DataFeatures.PipelineStepObject):
                 if x is None and y is None:
 
                     # Edit empty gaze position
-                    self.__pipeline.look( GazeFeatures.GazePosition( timestamp = timestamp), catch_exceptions = self.__catch_exceptions )
+                    self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp),
+                                         catch_exceptions=self.__catch_exceptions)
 
                 else:
 
                     # Edit gaze position
-                    self.__pipeline.look( GazeFeatures.GazePosition( (x, y), precision = precision, timestamp = timestamp), catch_exceptions = self.__catch_exceptions)
+                    self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp),
+                                         catch_exceptions=self.__catch_exceptions)
 
             except DataFeatures.TimestampedException as e:
 
@@ -1435,9 +1408,9 @@ class ArContext(DataFeatures.PipelineStepObject):
 
         else:
 
-            raise(TypeError('Pipeline is not ArFrame instance.'))
+            raise (TypeError('Pipeline is not ArFrame instance.'))
 
-    def _process_camera_image(self, timestamp: int|float, image: numpy.array):
+    def _process_camera_image(self, timestamp: int | float, image: numpy.array):
         """Request pipeline to process new camera image at a timestamp."""
 
         logging.debug('ArContext._process_camera_image %s', self.name)
 
@@ -1446,7 +1419,6 @@ class ArContext(DataFeatures.PipelineStepObject):
         lap_time, nb_laps, elapsed_time = self.__process_camera_image_chrono.lap()
 
         if elapsed_time > 1e3:
-
            self.__process_camera_image_frequency = nb_laps
            self.__process_camera_image_chrono.restart()
 
@@ -1456,18 +1428,20 @@ class ArContext(DataFeatures.PipelineStepObject):
 
             # Compare image size with ArCamera frame size
             if list(image.shape[0:2][::-1]) != self.__pipeline.size:
-
-                logging.warning('%s._process_camera_image: image size (%i x %i) is different of ArCamera frame size (%i x %i)', DataFeatures.get_class_path(self) , width, height, self.__pipeline.size[0], self.__pipeline.size[1])
+                logging.warning(
+                    '%s._process_camera_image: image size (%i x %i) is different of ArCamera frame size (%i x %i)',
+                    DataFeatures.get_class_path(self), width, height, self.__pipeline.size[0], self.__pipeline.size[1])
 
                 return
 
             try:
 
                 logging.debug('\t> watch image (%i x %i)', width, height)
 
-                self.__pipeline.watch( DataFeatures.TimestampedImage(image, timestamp = timestamp), catch_exceptions = self.__catch_exceptions )
+                self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp),
+                                      catch_exceptions=self.__catch_exceptions)
 
                 # TODO: make this step optional
-                self.__pipeline.map(timestamp = timestamp, catch_exceptions = self.__catch_exceptions)
+                self.__pipeline.map(timestamp=timestamp, catch_exceptions=self.__catch_exceptions)
 
             except DataFeatures.TimestampedException as e:
 
@@ -1477,15 +1451,16 @@ class ArContext(DataFeatures.PipelineStepObject):
 
         else:
 
-            raise(TypeError('Pipeline is not ArCamera instance.'))
+            raise (TypeError('Pipeline is not ArCamera instance.'))
 
     @DataFeatures.PipelineStepImage
     def image(self, draw_times: bool = None, draw_exceptions: bool = None):
         """
-        Get pipeline image with execution informations.
+        Get pipeline image with execution information.
 
         Parameters:
-            draw_exceptions: ...
+            draw_times: draw pipeline execution times
+            draw_exceptions: draw pipeline exception messages
         """
 
         logging.debug('ArContext.image %s', self.name)
 
@@ -1499,9 +1474,9 @@ class ArContext(DataFeatures.PipelineStepObject):
         if draw_times:
 
             if image.is_timestamped():
-
                 info_stack += 1
-                cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+                cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1,
+                            (255, 255, 255), 1, cv2.LINE_AA)
 
             if issubclass(type(self.__pipeline), ArCamera):
 
@@ -1514,7 +1489,8 @@ class ArContext(DataFeatures.PipelineStepObject):
                     watch_time = math.nan
 
                 info_stack += 1
-                cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+                cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz',
+                            (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
 
             if issubclass(type(self.__pipeline), ArFrame):
 
@@ -1527,17 +1503,18 @@ class ArContext(DataFeatures.PipelineStepObject):
                     look_time = math.nan
 
                 info_stack += 1
-                cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+                cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz',
+                            (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
 
         if draw_exceptions:
 
             # Write exceptions
             while self.__exceptions:
-
                 e = self.__exceptions.pop()
                 i = len(self.__exceptions)
 
-                cv2.rectangle(image, (0, height-(i+1)*50), (width, height-(i)*50), (0, 0, 127), -1)
-                cv2.putText(image, f'error: {e}', (20, height-(i+1)*50+25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+                cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - (i) * 50), (0, 0, 127), -1)
+                cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1,
+                            (255, 255, 255), 1, cv2.LINE_AA)
 
         return image
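
Note on the exception rename: besides the formatting cleanup, this commit renames the PoseEstimationFailed attribute from 'unconsistencies' to 'inconsistencies', so any calling code that inspects the exception must follow. Below is a minimal sketch of how the renamed attribute surfaces to a caller; the scene object, its estimate_pose() inputs, and the assumed (tvec, rvec) return are hypothetical, since ArScene.estimate_pose() is abstract in this file and its signature depends on the concrete subclass:

    from argaze import ArFeatures

    def project_scene_safely(scene, detected_markers):
        # Hypothetical helper: 'scene' is assumed to be a concrete ArScene subclass
        # and 'detected_markers' whatever its estimate_pose() implementation expects.
        try:
            # Assumed to return a translation and rotation vector usable by project()
            tvec, rvec = scene.estimate_pose(detected_markers)

            # ArScene.project() yields (layer name, projected AOI 2D scene) pairs
            return dict(scene.project(tvec, rvec, visual_hfov=160.))

        except ArFeatures.PoseEstimationFailed as e:

            # Renamed attribute after this commit: 'inconsistencies' (was 'unconsistencies')
            print(f'pose estimation failed: {e} ({e.inconsistencies})')
            return {}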