"""ArGaze pipeline assets.""" """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import logging import threading import math import os import ast from typing import Iterator, Union import cv2 import numpy from argaze import DataFeatures, GazeFeatures from argaze.AreaOfInterest import * from argaze.utils import UtilsFeatures class PoseEstimationFailed(Exception): """ Exception raised by ArScene estimate_pose method when the pose can't be estimated due to inconsistencies. """ def __init__(self, message, inconsistencies=None): super().__init__(message) self.inconsistencies = inconsistencies class SceneProjectionFailed(Exception): """ Exception raised by ArCamera watch method when the scene can't be projected. """ def __init__(self, message): super().__init__(message) class DrawingFailed(Exception): """ Exception raised when drawing fails. """ def __init__(self, message): super().__init__(message) # Define default ArLayer draw parameters DEFAULT_ARLAYER_DRAW_PARAMETERS = { "draw_aoi_scene": { "draw_aoi": { "color": (255, 255, 255), "border_size": 1 } }, "draw_aoi_matching": { "draw_matched_fixation": { "deviation_circle_color": (255, 255, 255) }, "draw_matched_fixation_positions": { "position_color": (0, 255, 255), "line_color": (0, 0, 0) }, "draw_matched_region": { "color": (0, 255, 0), "border_size": 4 }, "draw_looked_aoi": { "color": (0, 255, 0), "border_size": 2 }, "looked_aoi_name_color": (255, 255, 255), "looked_aoi_name_offset": (0, -10) } } class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ Defines a space where to make matching of gaze movements and AOI and inside which those matching need to be analyzed. !!! note Inherits from DataFeatures.SharedObject class to be shared by multiple threads. 
""" @DataFeatures.PipelineStepInit def __init__(self, **kwargs): """Initialize ArLayer.""" # Init parent classes DataFeatures.SharedObject.__init__(self) # Init private attributes self.__aoi_scene = None self.__aoi_matcher = None self.__aoi_scan_path = None self.__aoi_scan_path_analyzers = [] self.__gaze_movement = GazeFeatures.GazeMovement() self.__looked_aoi_name = None self.__aoi_scan_path_analyzed = False # Init pipeline step object attributes self.draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS @property def aoi_scene(self) -> AOIFeatures.AOIScene: """AOI scene description.""" return self.__aoi_scene @aoi_scene.setter def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene | str | dict): new_aoi_scene = None if issubclass(type(aoi_scene_value), AOIFeatures.AOIScene): new_aoi_scene = aoi_scene_value # str: relative path to file elif type(aoi_scene_value) is str: filepath = os.path.join(DataFeatures.get_working_directory(), aoi_scene_value) file_format = filepath.split('.')[-1] # JSON file format for 2D or 3D dimension if file_format == 'json': new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath) # SVG file format for 2D dimension only if file_format == 'svg': new_aoi_scene = AOI2DScene.AOI2DScene.from_svg(filepath) # OBJ file format for 3D dimension only elif file_format == 'obj': new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath) # dict: elif type(aoi_scene_value) is dict: new_aoi_scene = AOIFeatures.AOIScene.from_dict(aoi_scene_value) else: raise ValueError("Bad aoi scene value") # Cast aoi scene to its effective dimension if new_aoi_scene.dimension == 2: self.__aoi_scene = AOI2DScene.AOI2DScene(new_aoi_scene) elif new_aoi_scene.dimension == 3: self.__aoi_scene = AOI3DScene.AOI3DScene(new_aoi_scene) # Edit parent if self.__aoi_scene is not None: self.__aoi_scene.parent = self @property def aoi_matcher(self) -> GazeFeatures.AOIMatcher: """Select AOI matcher object.""" return self.__aoi_matcher @aoi_matcher.setter @DataFeatures.PipelineStepAttributeSetter def aoi_matcher(self, aoi_matcher: GazeFeatures.AOIMatcher): assert (issubclass(type(aoi_matcher), GazeFeatures.AOIMatcher)) self.__aoi_matcher = aoi_matcher # Edit parent if self.__aoi_matcher is not None: self.__aoi_matcher.parent = self @property def aoi_scan_path(self) -> GazeFeatures.AOIScanPath: """AOI scan path object.""" return self.__aoi_scan_path @aoi_scan_path.setter @DataFeatures.PipelineStepAttributeSetter def aoi_scan_path(self, aoi_scan_path: GazeFeatures.AOIScanPath): assert (isinstance(aoi_scan_path, GazeFeatures.AOIScanPath)) self.__aoi_scan_path = aoi_scan_path # Update expected AOI self._update_expected_aoi() # Edit parent if self.__aoi_scan_path is not None: self.__aoi_scan_path.parent = self @property def aoi_scan_path_analyzers(self) -> list: """AOI scan path analyzers list.""" return self.__aoi_scan_path_analyzers # noinspection PyUnresolvedReferences @aoi_scan_path_analyzers.setter @DataFeatures.PipelineStepAttributeSetter def aoi_scan_path_analyzers(self, aoi_scan_path_analyzers: list): self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers # Connect analyzers if required for analyzer in self.__aoi_scan_path_analyzers: assert (issubclass(type(analyzer), GazeFeatures.AOIScanPathAnalyzer)) # Check scan path analyzer properties type for name, item in type(analyzer).__dict__.items(): if isinstance(item, property) and item.fset is not None: # Check setter annotations to get expected value type try: property_type = list(item.fset.__annotations__.values())[0] except KeyError: raise 
(ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}')) if issubclass(property_type, GazeFeatures.AOIScanPathAnalyzer): # Search for analyzer instance to set property found = False for a in self.__aoi_scan_path_analyzers: if type(a) is property_type: setattr(analyzer, name, a) found = True if not found: raise DataFeatures.PipelineStepLoadingFailed( f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path is None: self.__aoi_scan_path = GazeFeatures.ScanPath() # Edit parent for analyzer in self.__aoi_scan_path_analyzers: analyzer.parent = self def last_looked_aoi_name(self) -> str: """Get last looked aoi name.""" return self.__looked_aoi_name def is_analysis_available(self) -> bool: """Are aoi scan path analysis ready?""" return self.__aoi_scan_path_analyzed def analysis(self) -> dict: """Get all aoi scan path analysis into dictionary.""" analysis = {} for analyzer in self.__aoi_scan_path_analyzers: analysis[DataFeatures.get_class_path(analyzer)] = analyzer.analysis() return analysis def as_dict(self) -> dict: """Export ArLayer properties as dictionary.""" return { **DataFeatures.PipelineStepObject.as_dict(self), "aoi_scene": self.__aoi_scene, "aoi_matcher": self.__aoi_matcher, "aoi_scan_path": self.__aoi_scan_path, "aoi_scan_path_analyzers": self.__aoi_scan_path_analyzers, "draw_parameters": self._draw_parameters } def _update_expected_aoi(self): """Update expected AOI of AOI scan path considering AOI scene and layer name.""" if self.__aoi_scene is None: logging.debug('ArLayer._update_expected_aoi %s (parent: %s): missing aoi scene', self.name, self.parent) return logging.debug('ArLayer._update_expected_aoi %s (parent: %s)', self.name, self.parent) # Get aoi names from aoi scene expected_aoi = list(self.__aoi_scene.keys()) # Remove layer name from expected aoi if self.name in expected_aoi: expected_aoi.remove(self.name) # Update expected aoi: this will clear the scan path self.__aoi_scan_path.expected_aoi = expected_aoi @DataFeatures.PipelineStepMethod def look(self, gaze_movement: GazeFeatures.GazeMovement = None): """ Project timestamped gaze movement into layer. !!! warning Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute. 
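
    # Illustration of the annotation-driven wiring performed by the aoi_scan_path_analyzers
    # setter above (a sketch with hypothetical analyzer classes, not part of ArGaze): an
    # analyzer exposing a property setter annotated with another analyzer type gets that
    # dependency injected automatically, provided an instance of it is present in the list.
    #
    #   class TransitionMatrix(GazeFeatures.AOIScanPathAnalyzer):
    #       ...
    #
    #   class LempelZiv(GazeFeatures.AOIScanPathAnalyzer):
    #
    #       @property
    #       def transition_matrix(self) -> TransitionMatrix:
    #           return self.__transition_matrix
    #
    #       @transition_matrix.setter
    #       def transition_matrix(self, transition_matrix: TransitionMatrix):
    #           self.__transition_matrix = transition_matrix
    #
    # With [TransitionMatrix(), LempelZiv()] as the analyzers list, the setter reads the
    # TransitionMatrix annotation on LempelZiv.transition_matrix and assigns the existing
    # TransitionMatrix instance to it; if none is found, PipelineStepLoadingFailed is raised.
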
    @DataFeatures.PipelineStepMethod
    def look(self, gaze_movement: GazeFeatures.GazeMovement = None):
        """
        Project timestamped gaze movement into layer.

        !!! warning
            Be aware that gaze movement positions are in the same range of value as the aoi_scene size attribute.

        Parameters:
            gaze_movement: gaze movement to project
        """

        if gaze_movement is None:
            gaze_movement = GazeFeatures.GazeMovement()

        # Use layer lock feature
        with self._lock:

            logging.debug('ArLayer.look %s (parent: %s)', self.name, self.parent.name)

            # Update current gaze movement
            self.__gaze_movement = gaze_movement

            # No looked aoi by default
            self.__looked_aoi_name = None

            # Reset aoi scan path analyzed state
            self.__aoi_scan_path_analyzed = False

            if self.__aoi_matcher is not None and self.__aoi_scene is not None:

                # Update looked aoi thanks to aoi matcher
                # Note: don't filter valid/invalid and finished/unfinished fixation/saccade
                # as we don't know how the aoi matcher works internally
                self.__looked_aoi_name, _ = self.__aoi_matcher.match(gaze_movement, self.__aoi_scene)

                logging.debug('\t> looked aoi name: %s', self.__looked_aoi_name)

            # Valid and finished gaze movement has been identified
            if gaze_movement and gaze_movement.is_finished():

                if GazeFeatures.is_fixation(gaze_movement):

                    # Append fixation to aoi scan path
                    # TODO: add an option to filter None looked_aoi_name or not
                    if self.__aoi_scan_path is not None:

                        logging.debug('\t> append fixation')

                        aoi_scan_step = self.__aoi_scan_path.append_fixation(gaze_movement, self.__looked_aoi_name)

                        # Is there a new step?
                        if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1:

                            logging.debug('\t> analyze aoi scan path')

                            # Analyze aoi scan path
                            for aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers:
                                aoi_scan_path_analyzer.analyze(self.__aoi_scan_path, timestamp=gaze_movement.timestamp)

                            # Update aoi scan path analyzed state
                            self.__aoi_scan_path_analyzed = True

                elif GazeFeatures.is_saccade(gaze_movement):

                    # Append saccade to aoi scan path
                    if self.__aoi_scan_path is not None:

                        logging.debug('\t> append saccade')

                        self.__aoi_scan_path.append_saccade(gaze_movement)

    @DataFeatures.PipelineStepDraw
    def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None):
        """
        Draw into image.

        Parameters:
            image: image where to draw.
            draw_aoi_scene: [AOI2DScene.draw][argaze.AreaOfInterest.AOI2DScene.AOI2DScene.draw] parameters (if None, no aoi scene is drawn)
            draw_aoi_matching: [AOIMatcher.draw][argaze.GazeFeatures.AOIMatcher.draw] parameters (which depend on the loaded aoi matcher module; if None, no aoi matching is drawn)
        """

        # Use layer lock feature
        with self._lock:

            # Draw aoi scene if required
            if draw_aoi_scene is not None and self.__aoi_scene is not None:
                self.__aoi_scene.draw(image, **draw_aoi_scene)

            # Draw aoi matching if required
            if draw_aoi_matching is not None and self.__aoi_matcher is not None:
                self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching)
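
# Minimal ArLayer usage sketch (illustrative values: the AOI scene is given as a dict of
# named 2D contours and finished_fixation stands for a gaze movement produced upstream by
# a GazeFeatures.GazeMovementIdentifier):
#
#   layer = ArLayer(name="Main", aoi_scene={"Area_A": [[0, 0], [100, 0], [100, 50], [0, 50]]})
#
#   # Matching and analysis happen inside look()
#   layer.look(finished_fixation)
#
#   if layer.is_analysis_available():
#       print(layer.analysis())
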
# Define default ArFrame image parameters
DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
    "background_weight": 1.,
    "heatmap_weight": 0.5,
    "draw_scan_path": {
        "draw_fixations": {
            "deviation_circle_color": (255, 255, 255),
            "duration_border_color": (127, 127, 127),
            "duration_factor": 1e-2
        },
        "draw_saccades": {
            "line_color": (255, 255, 255)
        },
        "deepness": 0
    },
    "draw_gaze_positions": {
        "color": (0, 255, 255),
        "size": 2
    }
}


class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
    """
    Defines a rectangular area where timestamped gaze positions are projected and inside which they need to be analyzed.

    !!! note
        Inherits from DataFeatures.SharedObject class to be shared by multiple threads.
    """

    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):
        """Initialize ArFrame."""

        # Init parent classes
        DataFeatures.SharedObject.__init__(self)

        # Init private attributes
        self.__size = (1, 1)
        self.__gaze_position_calibrator = None
        self.__gaze_movement_identifier = None
        self.__filter_in_progress_identification = True
        self.__scan_path = None
        self.__scan_path_analyzers = []
        self.__background = DataFeatures.TimestampedImage(numpy.full((1, 1, 3), 127).astype(numpy.uint8))
        self.__heatmap = None
        self.__calibrated_gaze_position = GazeFeatures.GazePosition()
        self.__identified_gaze_movement = GazeFeatures.GazeMovement()
        self.__scan_path_analyzed = False

        # Init protected attributes
        self._layers = {}
        self._image_parameters = DEFAULT_ARFRAME_IMAGE_PARAMETERS

    @property
    def size(self) -> tuple[int, int]:
        """Defines the dimension of the rectangular area where gaze positions are projected."""
        return self.__size

    @size.setter
    def size(self, size: tuple[int, int]):

        self.__size = size

        if self.background.size != self.__size:

            # Resize background to current size
            self.background = self.background

    @property
    def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator:
        """Select gaze position calibration algorithm."""
        return self.__gaze_position_calibrator

    @gaze_position_calibrator.setter
    @DataFeatures.PipelineStepAttributeSetter
    def gaze_position_calibrator(self, gaze_position_calibrator: GazeFeatures.GazePositionCalibrator):

        assert (issubclass(type(gaze_position_calibrator), GazeFeatures.GazePositionCalibrator))

        self.__gaze_position_calibrator = gaze_position_calibrator

        # Edit parent
        if self.__gaze_position_calibrator is not None:
            self.__gaze_position_calibrator.parent = self

    @property
    def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier:
        """Select gaze movement identification algorithm."""
        return self.__gaze_movement_identifier

    @gaze_movement_identifier.setter
    @DataFeatures.PipelineStepAttributeSetter
    def gaze_movement_identifier(self, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier):

        assert (issubclass(type(gaze_movement_identifier), GazeFeatures.GazeMovementIdentifier))

        self.__gaze_movement_identifier = gaze_movement_identifier

        # Edit parent
        if self.__gaze_movement_identifier is not None:
            self.__gaze_movement_identifier.parent = self

    @property
    def filter_in_progress_identification(self) -> bool:
        """Does the frame ignore in progress gaze movement identification?"""
        return self.__filter_in_progress_identification

    @filter_in_progress_identification.setter
    @DataFeatures.PipelineStepAttributeSetter
    def filter_in_progress_identification(self, filter_in_progress_identification: bool = True):

        self.__filter_in_progress_identification = filter_in_progress_identification

    @property
    def scan_path(self) -> GazeFeatures.ScanPath:
        """Scan path object."""
        return self.__scan_path

    @scan_path.setter
    @DataFeatures.PipelineStepAttributeSetter
    def scan_path(self, scan_path: GazeFeatures.ScanPath):

        assert (isinstance(scan_path, GazeFeatures.ScanPath))

        self.__scan_path = scan_path

        # Edit parent
        if self.__scan_path is not None:
            self.__scan_path.parent = self

    @property
    def scan_path_analyzers(self) -> list:
        """Scan path analyzers list."""
        return self.__scan_path_analyzers

    # noinspection PyUnresolvedReferences
    @scan_path_analyzers.setter
    @DataFeatures.PipelineStepAttributeSetter
    def scan_path_analyzers(self, scan_path_analyzers: list):

        self.__scan_path_analyzers = scan_path_analyzers

        # Connect analyzers if required
        for analyzer in self.__scan_path_analyzers:

            assert (issubclass(type(analyzer), GazeFeatures.ScanPathAnalyzer))

            # Check scan path analyzer properties type
            for name, item in type(analyzer).__dict__.items():

                if isinstance(item, property) and item.fset is not None:

                    # Check setter annotations to get expected value type
                    try:

                        property_type = list(item.fset.__annotations__.values())[0]

                    except IndexError:

                        raise ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}')

                    if issubclass(property_type, GazeFeatures.ScanPathAnalyzer):

                        # Search for analyzer instance to set property
                        found = False

                        for a in self.__scan_path_analyzers:

                            if type(a) is property_type:

                                setattr(analyzer, name, a)
                                found = True

                        if not found:

                            raise DataFeatures.PipelineStepLoadingFailed(
                                f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')

        # Force scan path creation
        if len(self.__scan_path_analyzers) > 0 and self.__scan_path is None:

            self.__scan_path = GazeFeatures.ScanPath()

        # Edit parent
        for analyzer in self.__scan_path_analyzers:
            analyzer.parent = self

    @property
    def background(self) -> numpy.array:
        """Picture to draw behind."""
        return self.__background

    @background.setter
    @DataFeatures.PipelineStepAttributeSetter
    def background(self, background: DataFeatures.TimestampedImage):

        assert (isinstance(background, DataFeatures.TimestampedImage))

        if background.size != self.size:

            # Resize image to frame size
            self.__background = DataFeatures.TimestampedImage(
                cv2.resize(background, dsize=self.size, interpolation=cv2.INTER_CUBIC), background.timestamp)

        else:

            self.__background = background

    @property
    def heatmap(self) -> AOIFeatures.Heatmap:
        """Heatmap object."""
        return self.__heatmap

    @heatmap.setter
    @DataFeatures.PipelineStepAttributeSetter
    def heatmap(self, heatmap: AOIFeatures.Heatmap):

        assert (isinstance(heatmap, AOIFeatures.Heatmap))

        self.__heatmap = heatmap

        # Default heatmap size equals frame size
        if self.__heatmap.size == (1, 1):

            self.__heatmap.size = self.size

        # Edit parent
        if self.__heatmap is not None:
            self.__heatmap.parent = self

    @property
    def layers(self) -> dict:
        """Layers dictionary."""
        return self._layers

    @layers.setter
    @DataFeatures.PipelineStepAttributeSetter
    def layers(self, layers: dict):

        self._layers = {}

        for layer_name, layer_data in layers.items():
            self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)

        # Edit parent
        for name, layer in self._layers.items():
            layer.parent = self

    def last_gaze_position(self) -> object:
        """Get last calibrated gaze position."""
        return self.__calibrated_gaze_position

    def last_gaze_movement(self) -> object:
        """Get last identified gaze movement."""
        return self.__identified_gaze_movement

    def is_analysis_available(self) -> bool:
        """Is a scan path analysis available?"""
        return self.__scan_path_analyzed

    def analysis(self) -> dict:
        """Get all scan path analysis into dictionary."""

        analysis = {}

        for analyzer in self.__scan_path_analyzers:
            analysis[DataFeatures.get_class_path(analyzer)] = analyzer.analysis()

        return analysis

    def as_dict(self) -> dict:
        """Export ArFrame attributes as dictionary.

        Returns:
            frame_data: dictionary with frame attributes values.
        """

        d = {
            **DataFeatures.PipelineStepObject.as_dict(self),
            "size": self.__size,
            "gaze_position_calibrator": self.__gaze_position_calibrator,
            "gaze_movement_identifier": self.__gaze_movement_identifier,
            "filter_in_progress_identification": self.__filter_in_progress_identification,
            "scan_path": self.__scan_path,
            "scan_path_analyzers": self.__scan_path_analyzers,
            "background": self.__background,
            "heatmap": self.__heatmap,
            "layers": self._layers,
            "image_parameters": self._image_parameters
        }

        return d

    @DataFeatures.PipelineStepMethod
    def look(self, timestamped_gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()):
        """
        Project timestamped gaze position into frame.

        !!! warning
            Be aware that gaze positions are in the same range of value as the size attribute.

        Parameters:
            timestamped_gaze_position: gaze position to project
        """

        # Use frame lock feature
        with self._lock:

            # No gaze movement identified by default
            self.__identified_gaze_movement = GazeFeatures.GazeMovement()

            # Reset scan path analyzed state
            self.__scan_path_analyzed = False

            # Apply gaze position calibration
            if self.__gaze_position_calibrator is not None:

                self.__calibrated_gaze_position = self.__gaze_position_calibrator.apply(timestamped_gaze_position)

            # Or update gaze position at least
            else:

                self.__calibrated_gaze_position = timestamped_gaze_position

            # Identify gaze movement
            if self.__gaze_movement_identifier is not None:

                # Identify finished gaze movement
                self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(
                    self.__calibrated_gaze_position)

                # Valid and finished gaze movement has been identified
                if self.__identified_gaze_movement and self.__identified_gaze_movement.is_finished():

                    if GazeFeatures.is_fixation(self.__identified_gaze_movement):

                        # Append fixation to scan path
                        if self.__scan_path is not None:
                            self.__scan_path.append_fixation(self.__identified_gaze_movement)

                    elif GazeFeatures.is_saccade(self.__identified_gaze_movement):

                        # Append saccade to scan path
                        if self.__scan_path is not None:

                            scan_step = self.__scan_path.append_saccade(self.__identified_gaze_movement)

                            # Is there a new step?
                            if scan_step and len(self.__scan_path) > 1:

                                # Analyze scan path
                                for scan_path_analyzer in self.__scan_path_analyzers:
                                    scan_path_analyzer.analyze(self.__scan_path,
                                                               timestamp=self.__identified_gaze_movement.timestamp)

                                # Update scan path analyzed state
                                self.__scan_path_analyzed = True

                # No valid finished gaze movement: optionally stop in progress identification filtering
                elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification:

                    self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement()

            # Update heatmap
            if self.__heatmap is not None:

                # Scale gaze position value
                scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]])

                # Update heatmap image
                self.__heatmap.update(self.__calibrated_gaze_position * scale,
                                      timestamp=self.__calibrated_gaze_position.timestamp)

            # Look layers with valid identified gaze movement
            # Note: don't filter valid/invalid finished/unfinished gaze movement to allow layers to reset internally
            for layer_name, layer in self._layers.items():
                layer.look(self.__identified_gaze_movement)
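
    # Note on the heatmap scaling performed in look() above: gaze positions live in frame
    # space, so they are mapped into heatmap space with a per-axis ratio. For example, with
    # a frame size of (1920, 1080) and a heatmap size of (320, 180), the scale is
    # (320 / 1920, 180 / 1080) = (1/6, 1/6) and a gaze position at (960, 540) updates the
    # heatmap at (160, 90).
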
    @DataFeatures.PipelineStepImage
    def image(self,
              background_weight: float = None,
              heatmap_weight: float = None,
              draw_gaze_position_calibrator: dict = None,
              draw_scan_path: dict = None,
              draw_layers: dict = None,
              draw_gaze_positions: dict = None,
              draw_fixations: dict = None,
              draw_saccades: dict = None) -> numpy.array:
        """
        Get background image with overlaid visualizations.

        Parameters:
            background_weight: weight of background overlay
            heatmap_weight: weight of heatmap overlay
            draw_gaze_position_calibrator: [GazeFeatures.GazePositionCalibrator.draw](argaze.md/#argaze.GazeFeatures.GazePositionCalibrator.draw) parameters (if None, nothing is drawn)
            draw_scan_path: [GazeFeatures.ScanPath.draw](argaze.md/#argaze.GazeFeatures.ScanPath.draw) parameters (if None, no scan path is drawn)
            draw_layers: dictionary of [ArLayer.draw](argaze.md/#argaze.ArFeatures.ArLayer.draw) parameters per layer (if None, no layer is drawn)
            draw_gaze_positions: [GazeFeatures.GazePosition.draw](argaze.md/#argaze.GazeFeatures.GazePosition.draw) parameters (if None, no gaze position is drawn)
            draw_fixations: [GazeFeatures.Fixation.draw](argaze.md/#argaze.GazeFeatures.Fixation.draw) parameters (if None, no fixation is drawn)
            draw_saccades: [GazeFeatures.Saccade.draw](argaze.md/#argaze.GazeFeatures.Saccade.draw) parameters (if None, no saccade is drawn)
        """

        logging.debug('ArFrame.image %s', self.name)

        # Use frame lock feature
        with self._lock:

            # Draw background only
            if background_weight is not None and (heatmap_weight is None or self.__heatmap is None):

                logging.debug('\t> drawing background only')

                image = self.__background.copy()

            # Draw mix background and heatmap if required
            elif background_weight is not None and heatmap_weight is not None and self.__heatmap:

                logging.debug('\t> drawing background and heatmap')

                background_image = self.__background.copy()
                heatmap_image = cv2.resize(self.__heatmap.image(), dsize=self.__size, interpolation=cv2.INTER_LINEAR)
                image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0)

            # Draw heatmap only
            elif background_weight is None and heatmap_weight is not None and self.__heatmap:

                logging.debug('\t> drawing heatmap only')

                image = cv2.resize(self.__heatmap.image(), dsize=self.__size, interpolation=cv2.INTER_LINEAR)

            # Draw black image
            else:

                logging.debug('\t> drawing black image')

                image = numpy.full((self.__size[1], self.__size[0], 3), 0).astype(numpy.uint8)

            # Draw gaze position calibrator if required
            if draw_gaze_position_calibrator is not None and self.__gaze_position_calibrator is not None:

                logging.debug('\t> drawing gaze position calibrator')

                self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator)

            # Draw scan path if required
            if draw_scan_path is not None and self.__scan_path is not None:

                logging.debug('\t> drawing scan path')

                self.__scan_path.draw(image, **draw_scan_path)

            # Draw current fixation if required
            if draw_fixations is not None and self.__gaze_movement_identifier is not None:

                if self.__gaze_movement_identifier.current_fixation():

                    logging.debug('\t> drawing current fixation')

                    self.__gaze_movement_identifier.current_fixation().draw(image, **draw_fixations)

            # Draw current saccade if required
            if draw_saccades is not None and self.__gaze_movement_identifier is not None:

                if self.__gaze_movement_identifier.current_saccade():

                    logging.debug('\t> drawing current saccade')

                    self.__gaze_movement_identifier.current_saccade().draw(image, **draw_saccades)

            # Draw layers if required
            if draw_layers is not None:

                for layer_name, draw_layer in draw_layers.items():

                    try:

                        logging.debug('\t> drawing %s layer', layer_name)

                        self._layers[layer_name].draw(image, **draw_layer)

                    except KeyError:

                        raise DrawingFailed(f'\'{layer_name}\' layer doesn\'t exist.')

            # Draw current gaze position if required
            if draw_gaze_positions is not None:

                logging.debug('\t> drawing current gaze position')

                self.__calibrated_gaze_position.draw(image, **draw_gaze_positions)

            logging.debug('\t> returning image (%i x %i)', image.shape[1], image.shape[0])

            return DataFeatures.TimestampedImage(image, timestamp=self.__background.timestamp)
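
# Minimal ArFrame usage sketch (illustrative size, position and timestamp):
#
#   frame = ArFrame(name="Screen", size=(1920, 1080))
#
#   # Project a timestamped gaze position, then get the resulting visualization
#   frame.look(GazeFeatures.GazePosition((960, 540), timestamp=123))
#   image = frame.image(background_weight=1.)
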

class ArScene(DataFeatures.PipelineStepObject):
    """
    Define abstract Augmented Reality scene with ArLayers and ArFrames inside.
    """

    # noinspection PyMissingConstructor
    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):
        """Initialize ArScene."""

        # Init private attributes
        self._layers = {}
        self.__frames = {}
        self.__angle_tolerance = 0.
        self.__distance_tolerance = 0.

    @property
    def layers(self) -> dict:
        """Dictionary of ArLayers to project once the pose is estimated.
        See [project][argaze.ArFeatures.ArScene.project] function below."""
        return self._layers

    @layers.setter
    @DataFeatures.PipelineStepAttributeSetter
    def layers(self, layers: dict):

        self._layers = {}

        for layer_name, layer_data in layers.items():

            if type(layer_data) is dict:

                self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)

            # str: relative path to JSON file
            elif type(layer_data) is str:

                self._layers[layer_name] = DataFeatures.from_json(
                    os.path.join(DataFeatures.get_working_directory(), layer_data))

        # Edit parent
        for name, layer in self._layers.items():
            layer.parent = self

    @property
    def frames(self) -> dict:
        """Dictionary of ArFrames to project once the pose is estimated.
        See [project][argaze.ArFeatures.ArScene.project] function below."""
        return self.__frames

    @frames.setter
    @DataFeatures.PipelineStepAttributeSetter
    def frames(self, frames: dict):

        self.__frames = {}

        for frame_name, frame_data in frames.items():

            if type(frame_data) is dict:

                new_frame = ArFrame(name=frame_name, **frame_data)

            # str: relative path to JSON file
            elif type(frame_data) is str:

                new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data))

                # Loaded frame name has to equal the dictionary key
                assert (new_frame.name == frame_name)

            else:

                raise ValueError("Bad frame data.")

            # Look for a scene layer with an AOI named like the frame
            for scene_layer_name, scene_layer in self.layers.items():

                try:

                    frame_3d = scene_layer.aoi_scene[frame_name]

                    try:

                        # Check that the frame has a layer named like this scene layer
                        aoi_2d_scene = new_frame.layers[scene_layer_name].aoi_scene

                        # Transform 2D frame layer AOI into 3D scene layer AOI
                        # then add them to the scene layer
                        scene_layer.aoi_scene |= aoi_2d_scene.dimensionalize(frame_3d, new_frame.size)

                    except KeyError as e:

                        # Warn user about missing layer even if it is possible
                        logging.warning('ArScene.frames: %s layer doesn\'t exist in %s frame', e, new_frame.name)

                except KeyError as e:

                    # Warn user about missing AOI even if it is possible
                    logging.warning('ArScene.frames: %s AOI doesn\'t exist in %s layer of %s scene', e,
                                    scene_layer_name, self.name)

            # Append new frame
            self.__frames[frame_name] = new_frame

        # Edit parent
        for name, frame in self.__frames.items():
            frame.parent = self

    @property
    def angle_tolerance(self) -> float:
        """Angle error tolerance in degrees allowed to validate marker pose, used by the
        [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function."""
        return self.__angle_tolerance

    @angle_tolerance.setter
    def angle_tolerance(self, value: float):

        self.__angle_tolerance = value

    @property
    def distance_tolerance(self) -> float:
        """Distance error tolerance in centimeters allowed to validate marker pose, used by the
        [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function."""
        return self.__distance_tolerance

    @distance_tolerance.setter
    def distance_tolerance(self, value: float):

        self.__distance_tolerance = value

    def as_dict(self) -> dict:
        """Export ArScene properties as dictionary."""

        return {
            **DataFeatures.PipelineStepObject.as_dict(self),
            "layers": self._layers,
            "frames": self.__frames,
            "angle_tolerance": self.__angle_tolerance,
            "distance_tolerance": self.__distance_tolerance
        }

    @DataFeatures.PipelineStepMethod
    def estimate_pose(self, detected_features: any) -> tuple[numpy.array, numpy.array, any]:
        """Define abstract estimate scene pose method.

        Parameters:
            detected_features: any features detected by parent ArCamera that will help in scene pose estimation.

        Returns:
            tvec: scene translation vector
            rvec: scene rotation matrix
            extra: any data about pose estimation
        """

        raise NotImplementedError('estimate_pose() method not implemented')

    @DataFeatures.PipelineStepMethod
    def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.,
                visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]:
        """Project layers according to estimated pose and optional field of view clipping angles.

        Parameters:
            tvec: translation vector
            rvec: rotation vector
            visual_hfov: horizontal field of view clipping angle
            visual_vfov: vertical field of view clipping angle

        Returns:
            iterator: name of projected layer and AOI2DScene projection
        """

        for name, layer in self._layers.items():

            # TODO: if greater than 0., use HFOV and VFOV
            # to clip AOI out of the visual horizontal field of view

            # Copy aoi scene before projection
            aoi_scene_copy = layer.aoi_scene.copy()

            # Project layer aoi scene
            # noinspection PyUnresolvedReferences
            yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)
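
# ArScene is abstract: a concrete subclass implements estimate_pose() from the features
# detected by the parent ArCamera. A projection sketch (scene and detected_features are
# assumed to exist; note that project() relies on the parent's aruco_detector optic
# parameters):
#
#   tvec, rvec, _ = scene.estimate_pose(detected_features)
#
#   for layer_name, aoi_2d_scene in scene.project(tvec, rvec):
#       print(layer_name, aoi_2d_scene)
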

class ArCamera(ArFrame):
    """
    Define abstract Augmented Reality camera as ArFrame with ArScenes inside.
    """

    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):
        """Initialize ArCamera."""

        # Init ArFrame class
        super().__init__()

        # Init private attributes
        self.__visual_hfov = 0.
        self.__visual_vfov = 0.
        self.__projection_cache = None
        self.__projection_cache_writer = None
        self.__projection_cache_reader = None
        self.__projection_cache_data = None

        # Init protected attributes
        self._scenes = {}

    @ArFrame.layers.setter
    @DataFeatures.PipelineStepAttributeSetter
    def layers(self, layers: dict):

        self._layers = {}

        for layer_name, layer_data in layers.items():
            self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)

        # Edit parent
        for name, layer in self._layers.items():
            layer.parent = self

        # Update expected and excluded aoi
        self._update_expected_and_excluded_aoi()

    @property
    def scenes(self) -> dict:
        """All scenes to project into camera frame."""
        return self._scenes

    @scenes.setter
    @DataFeatures.PipelineStepAttributeSetter
    def scenes(self, scenes: dict):

        self._scenes = {}

        for scene_name, scene_data in scenes.items():
            self._scenes[scene_name] = ArScene(name=scene_name, **scene_data)

        # Edit parent
        for name, scene in self._scenes.items():
            scene.parent = self

        # Update expected and excluded aoi
        self._update_expected_and_excluded_aoi()

    @property
    def visual_hfov(self) -> float:
        """Angle in degrees to clip scenes projection according to visual horizontal field of view (HFOV)."""
        return self.__visual_hfov

    @visual_hfov.setter
    def visual_hfov(self, value: float):
        """Set camera's visual horizontal field of view."""
        self.__visual_hfov = value

    @property
    def visual_vfov(self) -> float:
        """Angle in degrees to clip scenes projection according to visual vertical field of view (VFOV)."""
        return self.__visual_vfov

    @visual_vfov.setter
    def visual_vfov(self, value: float):
        """Set camera's visual vertical field of view."""
        self.__visual_vfov = value

    @property
    def projection_cache(self) -> str:
        """File path where layers projections are stored into/read from a cache."""
        return self.__projection_cache

    @projection_cache.setter
    def projection_cache(self, projection_cache: str):

        self.__projection_cache = projection_cache

        # The file doesn't exist yet: store projections into the cache
        if not os.path.exists(os.path.join(DataFeatures.get_working_directory(), self.__projection_cache)):

            self.__projection_cache_writer = UtilsFeatures.FileWriter(path=self.__projection_cache)
            self.__projection_cache_reader = None

            logging.info('ArCamera %s writes projection into %s', self.name, self.__projection_cache)

        # The file exists: read projections from the cache
        else:

            self.__projection_cache_writer = None
            self.__projection_cache_reader = UtilsFeatures.FileReader(path=self.__projection_cache)

            logging.info('ArCamera %s reads projection from %s', self.name, self.__projection_cache)

    def _clear_projection(self):
        """Clear layers projection."""

        logging.debug('ArCamera._clear_projection %s', self.name)

        for layer_name, layer in self.layers.items():

            # Initialize layer if needed
            if layer.aoi_scene is None:

                layer.aoi_scene = AOI2DScene.AOI2DScene()

            else:

                layer.aoi_scene.clear()

    def _write_projection_cache(self, timestamp: int | float, exception=None):
        """Write layers aoi scene into the projection cache.

        Parameters:
            timestamp: cache time
            exception: optional exception to store instead of the projection
        """

        if self.__projection_cache_writer is not None:

            logging.debug('ArCamera._write_projection_cache %s %f', self.name, timestamp)

            if exception is None:

                projection = {}

                for layer_name, layer in self.layers.items():
                    projection[layer_name] = layer.aoi_scene

                self.__projection_cache_writer.write((timestamp, projection))

            else:

                self.__projection_cache_writer.write((timestamp, exception))

    def _read_projection_cache(self, timestamp: int | float):
        """Read layers aoi scene from the projection cache.

        Parameters:
            timestamp: cache time.

        Returns:
            success: False if there is no projection cache, True otherwise.
        """

        if self.__projection_cache_reader is None:

            return False

        logging.debug('ArCamera._read_projection_cache %s %f', self.name, timestamp)

        # Clear former projection
        self._clear_projection()

        try:

            # Read first data if not done yet
            if self.__projection_cache_data is None:

                self.__projection_cache_data = self.__projection_cache_reader.read()

            # Continue reading cache until correct timestamped projection
            while float(self.__projection_cache_data[0]) < timestamp:

                self.__projection_cache_data = self.__projection_cache_reader.read()

        # No more projection in the cache
        except EOFError:

            raise DataFeatures.TimestampedException("Projection cache is empty", timestamp=timestamp)

        # Correct timestamped projection is found
        if float(self.__projection_cache_data[0]) == timestamp:

            projection = {}

            try:

                projection = ast.literal_eval(self.__projection_cache_data[1])

                for layer_name, aoi_scene in projection.items():

                    self._layers[layer_name].aoi_scene = AOI2DScene.AOI2DScene(aoi_scene)
                    self._layers[layer_name].timestamp = timestamp

                    logging.debug('> reading %s projection from cache', layer_name)

            except SyntaxError:

                raise DataFeatures.TimestampedException(self.__projection_cache_data[1], timestamp=timestamp)

        return True

    def scene_frames(self) -> Iterator[ArFrame]:
        """Iterate over all scenes frames."""

        # For each scene
        for scene_name, scene in self._scenes.items():

            # For each scene frame
            for name, scene_frame in scene.frames.items():

                yield scene_frame

    def as_dict(self) -> dict:
        """Export ArCamera properties as dictionary."""

        return {
            **ArFrame.as_dict(self),
            "scenes": self._scenes,
            "visual_hfov": self.__visual_hfov,
            "visual_vfov": self.__visual_vfov
        }

    @DataFeatures.PipelineStepEnter
    def __enter__(self):

        if self.__projection_cache_writer is not None:

            self.__projection_cache_writer.__enter__()

        if self.__projection_cache_reader is not None:

            self.__projection_cache_reader.__enter__()

    @DataFeatures.PipelineStepExit
    def __exit__(self, exception_type, exception_value, exception_traceback):

        if self.__projection_cache_writer is not None:

            self.__projection_cache_writer.__exit__(exception_type, exception_value, exception_traceback)

        if self.__projection_cache_reader is not None:

            self.__projection_cache_reader.__exit__(exception_type, exception_value, exception_traceback)

    def _update_expected_and_excluded_aoi(self):
        """Edit expected aoi of each layer aoi scan path with the aoi of corresponding scene layer.
        Edit excluded aoi to ignore frame aoi from aoi matching.
        """

        if not self._layers or not self._scenes:

            logging.debug('ArCamera._update_expected_and_excluded_aoi %s: missing layers or scenes', self.name)
            return

        logging.debug('ArCamera._update_expected_and_excluded_aoi %s', self.name)

        for layer_name, layer in self._layers.items():

            expected_aoi_list = []
            excluded_aoi_list = []

            for scene_name, scene in self._scenes.items():

                # Append scene layer aoi to corresponding expected camera layer aoi
                try:

                    scene_layer = scene.layers[layer_name]

                    expected_aoi_list.extend(list(scene_layer.aoi_scene.keys()))

                except KeyError:

                    continue

                # Remove scene frame from expected camera layer aoi
                # Exclude scene frame from camera layer aoi matching
                for frame_name, frame in scene.frames.items():

                    try:

                        expected_aoi_list.remove(frame_name)
                        excluded_aoi_list.append(frame_name)

                    except ValueError:

                        continue

            if layer.aoi_scan_path is not None:

                layer.aoi_scan_path.expected_aoi = expected_aoi_list

            if layer.aoi_matcher is not None:

                layer.aoi_matcher.exclude = excluded_aoi_list

    @DataFeatures.PipelineStepMethod
    def watch(self, image: numpy.array):
        """Detect AR features from image and project scenes into camera frame.

        Parameters:
            image: image from which to extract AR features
        """

        raise NotImplementedError('watch() method not implemented')

    @DataFeatures.PipelineStepMethod
    def look(self, timestamped_gaze_position: GazeFeatures.GazePosition):
        """Project timestamped gaze position into each scene frame.

        !!! warning
            watch method needs to be called first.

        Parameters:
            timestamped_gaze_position: gaze position to project
        """

        # Project timestamped gaze position into camera frame
        # NOTE: the call to super().look method uses unwrap option to disable observers notification
        # as they are already notified that this look method is called. Cf DataFeatures.PipelineStepMethod.wrapper.
        super().look(timestamped_gaze_position, unwrap=True)

        # Use camera frame lock feature
        with self._lock:

            # Project gaze position into each scene frame if possible
            for scene_frame in self.scene_frames():

                # Is there an AOI inside camera frame layers projection whose name equals a scene frame name?
                for camera_layer_name, camera_layer in self.layers.items():

                    if camera_layer.aoi_scene:

                        try:

                            aoi_2d = camera_layer.aoi_scene[scene_frame.name]

                            if timestamped_gaze_position:

                                # TODO?: Should we prefer to use camera frame AOIMatcher object?
                                if aoi_2d.contains_point(timestamped_gaze_position):

                                    inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)

                                    # QUESTION: How to project gaze precision?
                                    inner_gaze_position = GazeFeatures.GazePosition(
                                        (inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)

                                    # Project inner gaze position into scene frame
                                    scene_frame.look(inner_gaze_position * scene_frame.size)

                        # Ignore missing aoi in camera frame layer projection
                        except KeyError:

                            pass

    @DataFeatures.PipelineStepMethod
    def map(self):
        """Project camera frame background into scene frames background.

        !!! warning
            watch method needs to be called first.
        """

        # Use camera frame lock feature
        with self._lock:

            # Project camera frame background into each scene frame if possible
            for frame in self.scene_frames():

                # Is there an AOI inside camera frame layers projection whose name equals a scene frame name?
                for camera_layer_name, camera_layer in self.layers.items():

                    try:

                        aoi_2d = camera_layer.aoi_scene[frame.name]

                        # Apply perspective transform algorithm to fill aoi frame background
                        width, height = frame.size
                        destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]])
                        mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination)
                        frame.background = DataFeatures.TimestampedImage(
                            cv2.warpPerspective(self.background, mapping, (width, height)),
                            timestamp=self.background.timestamp)

                    # Ignore missing frame projection
                    except KeyError:

                        pass
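
# Typical ArCamera cycle sketch (a concrete subclass implements watch(); camera_image, gaze
# coordinates and timestamp t are illustrative): each camera image is watched first, then
# gaze positions are dispatched into scene frames, and map() optionally fills scene frame
# backgrounds.
#
#   camera.watch(DataFeatures.TimestampedImage(camera_image, timestamp=t))
#   camera.look(GazeFeatures.GazePosition((x, y), timestamp=t))
#   camera.map()
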

# Define default ArContext image parameters
DEFAULT_ARCONTEXT_IMAGE_PARAMETERS = {
    "draw_times": True,
    "draw_exceptions": True
}


class ArContext(DataFeatures.PipelineStepObject):
    """
    Defines abstract Python context manager to handle pipeline inputs.
    """

    # noinspection PyMissingConstructor
    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):

        # Init private attributes
        self.__pipeline = None
        self.__catch_exceptions = True
        self.__exceptions = DataFeatures.TimestampedExceptions()

        # Init gaze position processing assessment
        self.__process_gaze_position_chrono = UtilsFeatures.TimeProbe()
        self.__process_gaze_position_frequency = 0

        # Init camera image processing assessment
        self.__process_camera_image_chrono = UtilsFeatures.TimeProbe()
        self.__process_camera_image_frequency = 0

        # Init protected attributes
        self._stop_event = threading.Event()
        self._pause_event = threading.Event()
        self._image_parameters = DEFAULT_ARCONTEXT_IMAGE_PARAMETERS

    @property
    def pipeline(self) -> DataFeatures.PipelineStepObject:
        """ArFrame used to process gaze data or ArCamera used to process gaze data and video of environment."""
        return self.__pipeline

    @pipeline.setter
    @DataFeatures.PipelineStepAttributeSetter
    def pipeline(self, pipeline: DataFeatures.PipelineStepObject):

        assert (issubclass(type(pipeline), DataFeatures.PipelineStepObject))

        self.__pipeline = pipeline

    @property
    def catch_exceptions(self) -> bool:
        """Catch pipeline exceptions to display them instead of crashing execution."""
        return self.__catch_exceptions

    @catch_exceptions.setter
    def catch_exceptions(self, catch_exceptions: bool):

        self.__catch_exceptions = catch_exceptions

    def exceptions(self) -> DataFeatures.TimestampedExceptions:
        """Get exceptions list."""
        return self.__exceptions

    def as_dict(self) -> dict:
        """Export ArContext properties as dictionary."""

        return {
            **DataFeatures.PipelineStepObject.as_dict(self),
            "pipeline": self.__pipeline,
            "catch_exceptions": self.__catch_exceptions,
            "image_parameters": self._image_parameters
        }

    @DataFeatures.PipelineStepEnter
    def __enter__(self):
        """Enter into ArContext."""

        self.__process_gaze_position_chrono.start()
        self.__process_camera_image_chrono.start()

        return self

    @DataFeatures.PipelineStepExit
    def __exit__(self, exception_type, exception_value, exception_traceback):
        """Exit from ArContext."""
        pass

    def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None,
                               precision: int | float = None):
        """Request pipeline to process new gaze position at a timestamp."""

        logging.debug('ArContext._process_gaze_position %s', self.name)

        # Assess gaze position processing frequency
        lap_time, nb_laps, elapsed_time = self.__process_gaze_position_chrono.lap()

        if elapsed_time > 1e3:

            self.__process_gaze_position_frequency = nb_laps
            self.__process_gaze_position_chrono.restart()

        if issubclass(type(self.__pipeline), ArFrame):

            try:

                if x is None and y is None:

                    # Edit empty gaze position
                    self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp),
                                         catch_exceptions=self.__catch_exceptions)

                else:

                    # Edit gaze position
                    self.__pipeline.look(
                        GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp),
                        catch_exceptions=self.__catch_exceptions)

            except DataFeatures.TimestampedException as e:

                self.__exceptions.append(e)

        else:

            raise TypeError('Pipeline is not ArFrame instance.')

    def _process_camera_image(self, timestamp: int | float, image: numpy.array):
        """Request pipeline to process new camera image at a timestamp."""

        logging.debug('ArContext._process_camera_image %s', self.name)

        # Assess camera image processing frequency
        lap_time, nb_laps, elapsed_time = self.__process_camera_image_chrono.lap()

        if elapsed_time > 1e3:

            self.__process_camera_image_frequency = nb_laps
            self.__process_camera_image_chrono.restart()

        if issubclass(type(self.__pipeline), ArCamera):

            height, width, _ = image.shape

            # Compare image size with ArCamera frame size
            if list(image.shape[0:2][::-1]) != self.__pipeline.size:

                logging.warning(
                    '%s._process_camera_image: image size (%i x %i) is different from ArCamera frame size (%i x %i)',
                    DataFeatures.get_class_path(self), width, height, self.__pipeline.size[0], self.__pipeline.size[1])

                return

            try:

                logging.debug('\t> watch image (%i x %i)', width, height)

                self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp),
                                      catch_exceptions=self.__catch_exceptions)

                # TODO: make this step optional
                self.__pipeline.map(timestamp=timestamp, catch_exceptions=self.__catch_exceptions)

            except DataFeatures.TimestampedException as e:

                logging.warning('%s._process_camera_image: %s', DataFeatures.get_class_path(self), e)

                self.__exceptions.append(e)

        else:

            raise TypeError('Pipeline is not ArCamera instance.')

    @DataFeatures.PipelineStepImage
    def image(self, draw_times: bool = None, draw_exceptions: bool = None):
        """
        Get pipeline image with execution information.

        Parameters:
            draw_times: draw pipeline execution times
            draw_exceptions: draw pipeline exception messages
        """

        logging.debug('ArContext.image %s', self.name)

        image = self.__pipeline.image()
        height, width, _ = image.shape

        logging.debug('\t> get image (%i x %i)', width, height)

        last_position = self.__pipeline.last_gaze_position()

        info_stack = 0

        if draw_times:

            if image.is_timestamped():

                info_stack += 1
                cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1,
                            (255, 255, 255), 1, cv2.LINE_AA)

            if issubclass(type(self.__pipeline), ArCamera):

                try:

                    watch_time = int(self.__pipeline.execution_times['watch'])

                except KeyError:

                    watch_time = math.nan

                info_stack += 1
                cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz',
                            (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)

            if last_position is not None:

                info_stack += 1
                cv2.putText(image, f'Position at {last_position.timestamp}ms', (20, info_stack * 40),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)

            if issubclass(type(self.__pipeline), ArFrame):

                try:

                    look_time = self.__pipeline.execution_times['look']

                except KeyError:

                    look_time = math.nan

                info_stack += 1
                cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz',
                            (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)

        if draw_exceptions:

            # Write exceptions
            while self.__exceptions:

                e = self.__exceptions.pop()
                i = len(self.__exceptions)

                cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - i * 50), (0, 0, 127), -1)
                cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1,
                            (255, 255, 255), 1, cv2.LINE_AA)

        return image

    def is_running(self) -> bool:
        """Is context running?"""
        return not self._stop_event.is_set()

    @DataFeatures.PipelineStepMethod
    def stop(self):
        """Stop context."""
        self._stop_event.set()

    @DataFeatures.PipelineStepMethod
    def pause(self):
        """Pause pipeline processing."""
        self._pause_event.set()

    def is_paused(self) -> bool:
        """Is pipeline processing paused?"""
        return self._pause_event.is_set()

    @DataFeatures.PipelineStepMethod
    def resume(self):
        """Resume pipeline processing."""
        self._pause_event.clear()
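
# ArContext subclasses are used as Python context managers; a processing-loop sketch
# (load_context() stands for whatever creates the concrete context instance, and the
# display code is illustrative):
#
#   with load_context() as context:
#
#       while context.is_running():
#
#           cv2.imshow(context.name, context.image())
#
#           if cv2.waitKey(10) == 27:  # Esc key
#               context.stop()
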

class LiveProcessingContext(ArContext):
    """
    Defines abstract live data processing context.
    """

    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):

        super().__init__()

    def calibrate(self):
        """Launch device calibration process."""

        raise NotImplementedError


# Define default PostProcessingContext image parameters
DEFAULT_POST_PROCESSING_CONTEXT_IMAGE_PARAMETERS = {
    "draw_progression": True
}


class PostProcessingContext(ArContext):
    """
    Defines abstract post data processing context.
    """

    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):

        super().__init__()

        self._image_parameters = {**DEFAULT_ARCONTEXT_IMAGE_PARAMETERS,
                                  **DEFAULT_POST_PROCESSING_CONTEXT_IMAGE_PARAMETERS}

    @property
    def duration(self) -> int | float:
        """Get data duration."""

        raise NotImplementedError

    @property
    def progression(self) -> float:
        """Get data processing progression between 0 and 1."""

        raise NotImplementedError

    @DataFeatures.PipelineStepImage
    def image(self, draw_progression: bool = None, **kwargs):
        """
        Get pipeline image with post processing information.

        Parameters:
            draw_progression: draw progress bar
        """

        logging.debug('PostProcessingContext.image %s', self.name)

        image = super().image(**kwargs)
        height, width, _ = image.shape

        if draw_progression:

            p = int(self.progression * width)

            cv2.rectangle(image, (0, 0), (p, 2), (255, 255, 255), -1)

        return image
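
# Sketch of a concrete PostProcessingContext subclass (hypothetical recording reader, only
# to show which members a subclass provides): duration and progression drive the progress
# bar drawn by image() above.
#
#   class RecordingReplay(PostProcessingContext):
#
#       @property
#       def duration(self) -> int | float:
#           return self.__last_timestamp - self.__first_timestamp
#
#       @property
#       def progression(self) -> float:
#           return self.__current_timestamp / self.__last_timestamp if self.__last_timestamp else 0.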