#!/usr/bin/env python """ArGaze pipeline assets.""" __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple, Any from dataclasses import dataclass, field import json import os import importlib from inspect import getmembers import threading import time from argaze import DataFeatures, GazeFeatures from argaze.AreaOfInterest import * from argaze.GazeAnalysis import * import numpy import cv2 ArLayerType = TypeVar('ArLayer', bound="ArLayer") # Type definition for type annotation convenience ArFrameType = TypeVar('ArFrame', bound="ArFrame") # Type definition for type annotation convenience ArSceneType = TypeVar('ArScene', bound="ArScene") # Type definition for type annotation convenience ArCameraType = TypeVar('ArCamera', bound="ArCamera") # Type definition for type annotation convenience class PoseEstimationFailed(Exception): """ Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies. """ def __init__(self, message, unconsistencies=None): super().__init__(message) self.unconsistencies = unconsistencies class SceneProjectionFailed(Exception): """ Exception raised by ArCamera watch method when the scene can't be projected. """ def __init__(self, message): super().__init__(message) class LoadingFailed(Exception): """ Exception raised when attributes loading fails. """ def __init__(self, message): super().__init__(message) # Define default ArLayer draw parameters DEFAULT_ARLAYER_DRAW_PARAMETERS = { "draw_aoi_scene": { "draw_aoi": { "color": (255, 255, 255), "border_size": 1 } }, "draw_aoi_matching": { "draw_matched_fixation": { "deviation_circle_color": (255, 255, 255) }, "draw_matched_fixation_positions": { "position_color": (0, 255, 255), "line_color": (0, 0, 0) }, "draw_matched_region": { "color": (0, 255, 0), "border_size": 4 }, "draw_looked_aoi": { "color": (0, 255, 0), "border_size": 2 }, "looked_aoi_name_color": (255, 255, 255), "looked_aoi_name_offset": (0, -10) } } @dataclass class ArLayer(DataFeatures.SharedObject): """ Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed. !!! note Inherits from DataFeatures.SharedObject class to be shared by multiple threads Parameters: name: name of the layer aoi_scene: AOI scene description aoi_matcher: AOI matcher object aoi_scan_path: AOI scan path object aoi_scan_path_analyzers: dictionary of AOI scan path analyzers loggers: dictionary of timestamped data loggers draw_parameters: default parameters passed to draw method """ name: str aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene) aoi_matcher: GazeFeatures.AOIMatcher = field(default_factory=GazeFeatures.AOIMatcher) aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) aoi_scan_path_analyzers: dict = field(default_factory=dict) loggers: dict = field(default=dict) draw_parameters: dict = field(default_factory=DEFAULT_ARLAYER_DRAW_PARAMETERS) def __post_init__(self): # Init sharedObject super().__init__() # Define parent attribute: it will be setup by parent later self.__parent = None # Init current gaze movement self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() # Init new analysis available state self.__new_analysis_available = False # Cast aoi scene to its effective dimension if self.aoi_scene.dimension == 2: self.aoi_scene = AOI2DScene.AOI2DScene(self.aoi_scene) elif self.aoi_scene.dimension == 3: self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene) @classmethod def from_dict(self, layer_data: dict, working_directory: str = None) -> ArLayerType: """Load attributes from dictionary. Parameters: layer_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load name try: new_layer_name = layer_data.pop('name') except KeyError: new_layer_name = None # Load aoi scene try: new_aoi_scene_value = layer_data.pop('aoi_scene') # str: relative path to file if type(new_aoi_scene_value) == str: filepath = os.path.join(working_directory, new_aoi_scene_value) file_format = filepath.split('.')[-1] # JSON file format for 2D or 3D dimension if file_format == 'json': new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath) # SVG file format for 2D dimension only if file_format == 'svg': new_aoi_scene = AOI2DScene.AOI2DScene.from_svg(filepath) # OBJ file format for 3D dimension only elif file_format == 'obj': new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath) # dict: else: new_aoi_scene = AOIFeatures.AOIScene.from_dict(new_aoi_scene_value) except KeyError: pass # Add AOI 2D Scene by default new_aoi_scene = AOI2DScene.AOI2DScene() # Edit expected AOI list by removing AOI with name equals to layer name expected_aoi = list(new_aoi_scene.keys()) if new_layer_name in expected_aoi: expected_aoi.remove(new_layer_name) # Load aoi matcher try: aoi_matcher_value = layer_data.pop('aoi_matcher') aoi_matcher_module_path, aoi_matcher_parameters = aoi_matcher_value.popitem() # Prepend argaze.GazeAnalysis path when a single name is provided if len(aoi_matcher_module_path.split('.')) == 1: aoi_matcher_module_path = f'argaze.GazeAnalysis.{aoi_matcher_module_path}' aoi_matcher_module = importlib.import_module(aoi_matcher_module_path) new_aoi_matcher = aoi_matcher_module.AOIMatcher(**aoi_matcher_parameters) except KeyError: new_aoi_matcher = None # Load AOI scan path try: new_aoi_scan_path_data = layer_data.pop('aoi_scan_path') new_aoi_scan_path_data['expected_aoi'] = expected_aoi new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: new_aoi_scan_path_data = {} new_aoi_scan_path_data['expected_aoi'] = expected_aoi new_aoi_scan_path = None # Load AOI scan path analyzers new_aoi_scan_path_analyzers = {} try: new_aoi_scan_path_analyzers_value = layer_data.pop('aoi_scan_path_analyzers') for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items(): # Prepend argaze.GazeAnalysis path when a single name is provided if len(aoi_scan_path_analyzer_module_path.split('.')) == 1: aoi_scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_module_path}' aoi_scan_path_analyzer_module = importlib.import_module(aoi_scan_path_analyzer_module_path) # Check aoi scan path analyzer parameters type members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer) for member in members: if '__annotations__' in member: for parameter, parameter_type in member[1].items(): # Check if parameter is part of argaze.GazeAnalysis module parameter_module_path = parameter_type.__module__.split('.') # Check if parameter is part of a package if len(parameter_type.__module__.split('.')) > 1: # Try get existing analyzer instance to append as parameter try: aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_type.__module__] except KeyError: raise LoadingFailed(f'{aoi_scan_path_analyzer_module_path} aoi scan path analyzer loading fails because {parameter_type.__module__} aoi scan path analyzer is missing.') aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters) new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer # Force AOI scan path creation if len(new_aoi_scan_path_analyzers) > 0 and new_aoi_scan_path == None: new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: pass # Load loggers new_loggers = {} try: new_loggers_value = layer_data.pop('loggers') for logger_name, logger_data in new_loggers_value.items(): logger = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) logger.name = logger_name new_loggers[logger_name] = logger except KeyError: pass # Load image parameters try: new_layer_draw_parameters = layer_data.pop('draw_parameters') except KeyError: new_layer_draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS # Create layer return ArLayer(new_layer_name, \ new_aoi_scene, \ new_aoi_matcher, \ new_aoi_scan_path, \ new_aoi_scan_path_analyzers, \ new_loggers, \ new_layer_draw_parameters \ ) @classmethod def from_json(self, json_filepath: str) -> ArLayerType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: layer_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return ArLayer.from_dict(layer_data, working_directory) @property def parent(self): """Get parent instance""" return self.__parent @parent.setter def parent(self, parent): """Get parent instance""" self.__parent = parent @property def new_analysis_available(self) -> bool: """Is there new aoi scan path analysis to check?""" return self.__new_analysis_available @DataFeatures.PipelineStep def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> dict: """ Project timestamped gaze movement into layer. !!! warning Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute. Parameters: gaze_movement: gaze movement to project """ # Lock layer exploitation self.acquire() # Gather look data look_data = locals() # Update current gaze movement self.__gaze_movement = gaze_movement # No new analysis available by default self.__new_analysis_available = False # Init looked aoi name looked_aoi_name = None if self.aoi_matcher is not None: # Update looked aoi thanks to aoi matcher # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally looked_aoi_name, _ , match_time, match_exception = self.aoi_matcher.match(self.aoi_scene, gaze_movement) # Valid and finished gaze movement has been identified if gaze_movement.valid and gaze_movement.finished: if GazeFeatures.is_fixation(gaze_movement): # Append fixation to aoi scan path if self.aoi_scan_path is not None and looked_aoi_name is not None: aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, looked_aoi_name) # Is there a new step? if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): # Analyze aoi scan path analyze_time, analyze_exception = aoi_scan_path_analyzer.analyze(self.aoi_scan_path) # Update new analysis available state self.__new_analysis_available = True elif GazeFeatures.is_saccade(gaze_movement): # Append saccade to aoi scan path if self.aoi_scan_path is not None: self.aoi_scan_path.append_saccade(timestamp, gaze_movement) # Log look data for logger_name, logger in self.loggers.items(): logger.emit(look_data) # Unlock layer exploitation self.release() def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ Draw into image. Parameters: draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn) draw_aoi_matching: AOIMatcher.draw parameters (which depends of the loaded aoi matcher module, if None, no aoi matching is drawn) """ # Use draw_parameters attribute if no parameters if draw_aoi_scene is None and draw_aoi_matching is None: return self.draw(image, **self.draw_parameters) # Lock layer exploitation self.acquire() # Draw aoi if required if draw_aoi_scene is not None: self.aoi_scene.draw(image, **draw_aoi_scene) # Draw aoi matching if required if draw_aoi_matching is not None and self.aoi_matcher is not None: self.aoi_matcher.draw(image, self.aoi_scene, **draw_aoi_matching) # Unlock layer exploitation self.release() # Define default ArFrame image parameters DEFAULT_ARFRAME_IMAGE_PARAMETERS = { "background_weight": 1., "heatmap_weight": 0.5, "draw_scan_path": { "draw_fixations": { "deviation_circle_color": (255, 255, 255), "duration_border_color": (127, 127, 127), "duration_factor": 1e-2 }, "draw_saccades": { "line_color": (255, 255, 255) }, "deepness": 0 }, "draw_gaze_positions": { "color": (0, 255, 255), "size": 2 } } @dataclass class ArFrame(DataFeatures.SharedObject): """ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. !!! note Inherits from DataFeatures.SharedObject class to be shared by multiple threads Parameters: name: name of the frame size: defines the dimension of the rectangular area where gaze positions are projected gaze_position_calibrator: gaze position calibration algoritm gaze_movement_identifier: gaze movement identification algorithm filter_in_progress_identification: ignore in progress gaze movement identification scan_path: scan path object scan_path_analyzers: dictionary of scan path analyzers heatmap: heatmap object background: picture to draw behind layers: dictionary of AOI layers loggers: dictionary of timestamped data loggers image_parameters: default parameters passed to image method """ name: str size: tuple[int] = field(default=(1, 1)) gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = field(default_factory=GazeFeatures.GazePositionCalibrator) gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier) filter_in_progress_identification: bool = field(default=True) scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath) scan_path_analyzers: dict = field(default_factory=dict) heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap) background: numpy.array = field(default_factory=lambda : numpy.array([])) layers: dict = field(default_factory=dict) loggers: dict = field(default=dict) image_parameters: dict = field(default_factory=DEFAULT_ARFRAME_IMAGE_PARAMETERS) def __post_init__(self): # Init sharedObject super().__init__() # Define parent attribute: it will be setup by parent later self.__parent = None # Setup layers parent attribute for name, layer in self.layers.items(): layer.parent = self # Init current gaze position self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() # Init current gaze movement self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() # Init new analysis available state self.__new_analysis_available = False @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: """Load attributes from dictionary. Parameters: frame_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load name try: new_frame_name = frame_data.pop('name') except KeyError: new_frame_name = None # Load size try: new_frame_size = frame_data.pop('size') except KeyError: new_frame_size = (0, 0) # Load gaze position calibrator try: gaze_position_calibrator_value = frame_data.pop('gaze_position_calibrator') # str: relative path to file if type(gaze_position_calibrator_value) == str: filepath = os.path.join(working_directory, gaze_position_calibrator_value) file_format = filepath.split('.')[-1] # JSON file format if file_format == 'json': new_gaze_position_calibrator = GazeFeatures.GazePositionCalibrator.from_json(filepath) # dict: else: new_gaze_position_calibrator = GazeFeatures.GazePositionCalibrator.from_dict(gaze_position_calibrator_value) except KeyError: new_gaze_position_calibrator = None # Load gaze movement identifier try: gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier') gaze_movement_identifier_module_path, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem() # Prepend argaze.GazeAnalysis path when a single name is provided if len(gaze_movement_identifier_module_path.split('.')) == 1: gaze_movement_identifier_module_path = f'argaze.GazeAnalysis.{gaze_movement_identifier_module_path}' gaze_movement_identifier_module = importlib.import_module(gaze_movement_identifier_module_path) new_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters) except KeyError: new_gaze_movement_identifier = None # Current fixation matching try: filter_in_progress_identification = frame_data.pop('filter_in_progress_identification') except KeyError: filter_in_progress_identification = True # Load scan path try: new_scan_path_data = frame_data.pop('scan_path') new_scan_path = GazeFeatures.ScanPath(**new_scan_path_data) except KeyError: new_scan_path_data = {} new_scan_path = None # Load scan path analyzers new_scan_path_analyzers = {} try: new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers') for scan_path_analyzer_module_path, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items(): # Prepend argaze.GazeAnalysis path when a single name is provided if len(scan_path_analyzer_module_path.split('.')) == 1: scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{scan_path_analyzer_module_path}' scan_path_analyzer_module = importlib.import_module(scan_path_analyzer_module_path) # Check scan path analyzer parameters type members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer) for member in members: if '__annotations__' in member: for parameter, parameter_type in member[1].items(): # Check if parameter is part of a package if len(parameter_type.__module__.split('.')) > 1: # Try get existing analyzer instance to append as parameter try: scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_type.__module__] except KeyError: raise LoadingFailed(f'{scan_path_analyzer_module_path} scan path analyzer loading fails because {parameter_type.__module__} scan path analyzer is missing.') scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters) new_scan_path_analyzers[scan_path_analyzer_module_path] = scan_path_analyzer # Force scan path creation if len(new_scan_path_analyzers) > 0 and new_scan_path == None: new_scan_path = GazeFeatures.ScanPath(**new_scan_path_data) except KeyError: pass # Load heatmap try: new_heatmap_data = frame_data.pop('heatmap') # Default heatmap size equals frame size if 'size' not in new_heatmap_data.keys(): new_heatmap_data['size'] = new_frame_size new_heatmap = AOIFeatures.Heatmap(**new_heatmap_data) except KeyError: new_heatmap_data = {} new_heatmap = None # Load background image try: new_frame_background_value = frame_data.pop('background') new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC) except KeyError: new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8) # Load layers new_layers = {} try: for layer_name, layer_data in frame_data.pop('layers').items(): # Append name layer_data['name'] = layer_name # Create layer new_layer = ArLayer.from_dict(layer_data, working_directory) # Append new layer new_layers[layer_name] = new_layer except KeyError: pass # Load loggers new_loggers = {} try: new_loggers_value = frame_data.pop('loggers') for logger_name, logger_data in new_loggers_value.items(): logger = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) logger.name = logger_name new_loggers[logger_name] = logger except KeyError: pass # Load image parameters try: new_frame_image_parameters = frame_data.pop('image_parameters') except KeyError: new_frame_image_parameters = DEFAULT_ARFRAME_IMAGE_PARAMETERS # Create frame return ArFrame(new_frame_name, \ new_frame_size, \ new_gaze_position_calibrator, \ new_gaze_movement_identifier, \ filter_in_progress_identification, \ new_scan_path, \ new_scan_path_analyzers, \ new_heatmap, \ new_frame_background, \ new_layers, \ new_loggers, new_frame_image_parameters \ ) @classmethod def from_json(self, json_filepath: str) -> ArFrameType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: frame_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return ArFrame.from_dict(frame_data, working_directory) @property def parent(self) -> object: """Get parent instance""" return self.__parent @parent.setter def parent(self, parent: object): """Set parent instance""" self.__parent = parent @property def gaze_position(self) -> object: """Get current calibrated gaze position""" return self.__calibrated_gaze_position @property def gaze_movement(self) -> object: """Get current identified gaze movement""" return self.__identified_gaze_movement @property def new_analysis_available(self) -> bool: """Is there new scan path analysis to check?""" return self.__new_analysis_available @DataFeatures.PipelineStep def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): """ Project gaze position into frame. !!! warning Be aware that gaze positions are in the same range of value than size attribute. Parameters: timestamp: any number used to know when the given gaze position occurs gaze_position: gaze position to project """ # Lock frame exploitation self.acquire() # Store look arguments look_data = locals() # No new analysis by default self.__new_analysis_available = False # No gaze movement identified by default self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() # Apply gaze position calibration if self.gaze_position_calibrator is not None: self.__calibrated_gaze_position = self.gaze_position_calibrator.apply(gaze_position) # Or update gaze position at least else: self.__calibrated_gaze_position = gaze_position # Identify gaze movement if self.gaze_movement_identifier is not None: # Identify finished gaze movement self.__identified_gaze_movement, identify_time, identify_exception = self.gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: if GazeFeatures.is_fixation(self.__identified_gaze_movement): # Append fixation to scan path if self.scan_path is not None: self.scan_path.append_fixation(timestamp, self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): # Append saccade to scan path if self.scan_path is not None: scan_step = self.scan_path.append_saccade(timestamp, self.__identified_gaze_movement) # Is there a new step? if scan_step and len(self.scan_path) > 1: for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): # Analyze aoi scan path analyze_time, analyze_exception = scan_path_analyzer.analyze(self.scan_path) # Update new analysis available state self.__new_analysis_available = True # No valid finished gaze movement: optionnaly stop in progress identification filtering elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: self.__identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement # Update heatmap if self.heatmap is not None: # Scale gaze position value scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) # Update heatmap image update_time, update_exception = self.heatmap.update(self.__calibrated_gaze_position.value * scale) # Look layers with valid identified gaze movement # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally for layer_name, layer in self.layers.items(): look_time, look_exception = layer.look(timestamp, self.__identified_gaze_movement) # Log look data for logger_name, logger in self.loggers.items(): logger.emit(look_data) # Unlock frame exploitation self.release() def __image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. Parameters: background_weight: weight of background overlay heatmap_weight: weight of heatmap overlay draw_gaze_position_calibrator: [GazeFeatures.GazePositionCalibrator.draw](argaze.md/#argaze.GazeFeatures.GazePositionCalibrator.draw) parameters (if None, nothing is drawn) draw_scan_path: [GazeFeatures.ScanPath.draw](argaze.md/#argaze.GazeFeatures.ScanPath.draw) parameters (if None, no scan path is drawn) draw_layers: dictionary of [ArLayer.draw](argaze.md/#argaze.ArFeatures.ArLayer.draw) parameters per layer (if None, no layer is drawn) draw_gaze_positions: [GazeFeatures.GazePosition.draw](argaze.md/#argaze.GazeFeatures.GazePosition.draw) parameters (if None, no gaze position is drawn) draw_fixations: [GazeFeatures.Fixation.draw](argaze.md/#argaze.GazeFeatures.Fixation.draw) parameters (if None, no fixation is drawn) draw_saccades: [GazeFeatures.Saccade.draw](argaze.md/#argaze.GazeFeatures.Saccade.draw) parameters (if None, no saccade is drawn) """ # Lock frame exploitation self.acquire() # Draw background only if background_weight is not None and (heatmap_weight is None or self.heatmap is None): image = self.background.copy() # Draw mix background and heatmap if required elif background_weight is not None and heatmap_weight is not None and self.heatmap: background_image = self.background.copy() heatmap_image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0) # Draw heatmap only elif background_weight is None and heatmap_weight is not None and self.heatmap: image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) # Draw black image else: image = numpy.full((self.size[1], self.size[0], 3), 0).astype(numpy.uint8) # Draw gaze position calibrator if draw_gaze_position_calibrator is not None: self.gaze_position_calibrator.draw(image, size=self.size, **draw_gaze_position_calibrator) # Draw scan path if required if draw_scan_path is not None and self.scan_path is not None: self.scan_path.draw(image, **draw_scan_path) # Draw current fixation if required if draw_fixations is not None and self.gaze_movement_identifier is not None: self.gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) # Draw current saccade if required if draw_saccades is not None and self.gaze_movement_identifier is not None: self.gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) # Draw layers if required if draw_layers is not None: for layer_name, draw_layer in draw_layers.items(): self.layers[layer_name].draw(image, **draw_layer) # Draw current gaze position if required if draw_gaze_positions is not None: self.__calibrated_gaze_position.draw(image, **draw_gaze_positions) # Unlock frame exploitation self.release() return image def image(self, **kwargs: dict) -> numpy.array: """ Get frame image. Parameters: kwargs: ArFrame.__image parameters """ # Use image_parameters attribute if no kwargs if kwargs: return self.__image(**kwargs) return self.__image(**self.image_parameters) @dataclass class ArScene(): """ Define abstract Augmented Reality scene with ArLayers and ArFrames inside. Parameters: name: name of the scene layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. """ name: str layers: dict = field(default_factory=dict) frames: dict = field(default_factory=dict) angle_tolerance: float = field(default=0.) distance_tolerance: float = field(default=0.) def __post_init__(self): # Define parent attribute: it will be setup by parent object later self.__parent = None # Setup layer parent attribute for name, layer in self.layers.items(): layer.parent = self # Setup frame parent attribute for name, frame in self.frames.items(): frame.parent = self def __str__(self) -> str: """ Returns: String representation """ output = f'parent:\n{self.parent.name}\n' if len(self.layers): output += f'ArLayers:\n' for name, layer in self.layers.items(): output += f'{name}:\n{layer}\n' if len(self.frames): output += f'ArFrames:\n' for name, frame in self.frames.items(): output += f'{name}:\n{frame}\n' return output @property def parent(self): """Get parent instance""" return self.__parent @parent.setter def parent(self, parent): """Get parent instance""" self.__parent = parent @classmethod def from_dict(self, scene_data: dict, working_directory: str = None) -> ArSceneType: """ Load ArScene from dictionary. Parameters: scene_data: dictionary working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load name try: new_scene_name = scene_data.pop('name') except KeyError: new_scene_name = None # Load layers new_layers = {} try: for layer_name, layer_data in scene_data.pop('layers').items(): # Append name layer_data['name'] = layer_name # Create layer new_layer = ArLayer.from_dict(layer_data, working_directory) # Append new layer new_layers[layer_name] = new_layer except KeyError: pass # Load frames new_frames = {} try: for frame_name, frame_data in scene_data.pop('frames').items(): # str: relative path to file if type(frame_data) == str: filepath = os.path.join(working_directory, frame_data) file_format = filepath.split('.')[-1] # JSON file format for 2D or 3D dimension if file_format == 'json': new_frame = ArFrame.from_json(filepath) # dict: else: # Append name frame_data['name'] = frame_name new_frame = ArFrame.from_dict(frame_data, working_directory) # Look for a scene layer with an AOI named like the frame for scene_layer_name, scene_layer in new_layers.items(): try: frame_3d = scene_layer.aoi_scene[frame_name] # Check that the frame have a layer named like this scene layer aoi_2d_scene = new_frame.layers[scene_layer_name].aoi_scene # Transform 2D frame layer AOI into 3D scene layer AOI # Then, add them to scene layer scene_layer.aoi_scene |= aoi_2d_scene.dimensionalize(frame_3d, new_frame.size) '''DEPRECATED: but maybe still usefull? # Project and reframe each layers into corresponding frame layers for frame_layer_name, frame_layer in new_frame.layers.items(): try: layer = new_layers[frame_layer_name] layer_aoi_scene_projection = layer.aoi_scene.orthogonal_projection aoi_frame_projection = layer_aoi_scene_projection[frame_name] frame_layer.aoi_scene = layer_aoi_scene_projection.reframe(aoi_frame_projection, new_frame.size) if frame_layer.aoi_scan_path is not None: # Edit expected AOI list by removing AOI with name equals to frame layer name expected_aoi = list(layer.aoi_scene.keys()) if frame_layer_name in expected_aoi: expected_aoi.remove(frame_layer_name) frame_layer.aoi_scan_path.expected_aoi = expected_aoi except KeyError: continue ''' except KeyError as e: print(e) # Append new frame new_frames[frame_name] = new_frame except KeyError: pass return ArScene(new_scene_name, new_layers, new_frames, **scene_data) def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array]: """Define abstract estimate scene pose method. Parameters: detected_features: any features detected by parent ArCamera that will help in scene pose estimation. Returns: tvec: scene translation vector rvec: scene rotation matrix """ raise NotImplementedError('estimate_pose() method not implemented') def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Tuple[str, AOI2DScene.AOI2DScene]: """Project layers according estimated pose and optional field of view clipping angles. Parameters: tvec: translation vector rvec: rotation vector visual_hfov: horizontal field of view clipping angle visual_vfov: vertical field of view clipping angle Returns: layer_name: name of projected layer layer_projection: AOI2DScene projection """ for name, layer in self.layers.items(): # Clip AOI out of the visual horizontal field of view (optional) # TODO: use HFOV and VFOV and don't use vision_cone method if visual_hfov > 0: # Transform layer aoi scene into camera referential aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec) # Get aoi inside vision cone field cone_vision_height_cm = 200 # cm cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) # Keep only aoi inside vision cone field aoi_scene_copy = layer.aoi_scene.copy(exclude=aoi_outside.keys()) else: aoi_scene_copy = layer.aoi_scene.copy() # Project layer aoi scene yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) def draw(self, image: numpy.array, **kwargs: dict): """ Draw scene into image. Parameters: image: where to draw """ raise NotImplementedError('draw() method not implemented') @dataclass class ArCamera(ArFrame): """ Define abstract Augmented Reality camera as ArFrame with ArScenes inside. Parameters: scenes: all scenes to project into camera frame visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV). visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV). """ scenes: dict = field(default_factory=dict) visual_hfov: float = field(default=0.) visual_vfov: float = field(default=0.) def __post_init__(self): # Init ArFrame super().__post_init__() # Setup scenes parent attribute for name, scene in self.scenes.items(): scene.parent = self # Setup expected aoi of each layer aoi scan path with the aoi of corresponding scene layer # Edit aoi matcher exclude attribute to ignore frame aoi for layer_name, layer in self.layers.items(): if layer.aoi_scan_path is not None: expected_aoi_list = [] exclude_aoi_list = [] for scene_name, scene in self.scenes.items(): # Append scene layer aoi to corresponding expected camera layer aoi try: scene_layer = scene.layers[layer_name] expected_aoi_list.extend(list(scene_layer.aoi_scene.keys())) except KeyError: continue # Remove scene frame from expected camera layer aoi # Exclude scene frame from camera layer aoi matching for frame_name, frame in scene.frames.items(): try: expected_aoi_list.remove(frame_name) exclude_aoi_list.append(frame_name) except ValueError: continue layer.aoi_scan_path.expected_aoi = expected_aoi_list layer.aoi_matcher.exclude = exclude_aoi_list def __str__(self) -> str: """ Returns: String representation """ output = f'Name:\n{self.name}\n' for name, scene in self.scenes.items(): output += f'\"{name}\" {type(scene)}:\n{scene}\n' return output @classmethod def from_dict(self, camera_data: dict, working_directory: str = None) -> ArCameraType: """ Load ArCamera from dictionary. Parameters: camera_data: dictionary working_directory: folder path where to load files when a dictionary value is a relative filepath. """ raise NotImplementedError('from_dict() method not implemented') @classmethod def from_json(self, json_filepath: str) -> ArCameraType: """ Load ArCamera from .json file. Parameters: json_filepath: path to json file """ raise NotImplementedError('from_json() method not implemented') @property def scene_frames(self): """Iterate over all scenes frames""" # For each scene for scene_name, scene in self.scenes.items(): # For each scene frame for name, scene_frame in scene.frames.items(): yield scene_frame def watch(self, timestamp: int|float, image: numpy.array) -> Tuple[float, dict]: """Detect AR features from image and project scenes into camera frame. Parameters: timestamp: image time stamp (unit does'nt matter) image: image where to extract AR features Returns: detection time: AR features detection time in ms. exception: dictionary with exception raised per scene. """ raise NotImplementedError('watch() method not implemented') def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition): """Project timestamped gaze position into each scene frames. Parameters: timestamp: gaze position time stamp (unit does'nt matter) gaze_position: GazePosition object !!! warning watch method needs to be called first. """ # Project gaze position into camera frame yield self, super().look(timestamp, gaze_position) # Lock camera frame exploitation self.acquire() # Project gaze position into each scene frames if possible for scene_frame in self.scene_frames: # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? for camera_layer_name, camera_layer in self.layers.items(): try: aoi_2d = camera_layer.aoi_scene[scene_frame.name] # TODO?: Should we prefer to use camera frame AOIMatcher object? if aoi_2d.contains_point(gaze_position.value): inner_x, inner_y = aoi_2d.clockwise().inner_axis(*gaze_position.value) # QUESTION: How to project gaze precision? inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) yield scene_frame, scene_frame.look(timestamp, inner_gaze_position * scene_frame.size) # Ignore missing aoi in camera frame layer projection except KeyError: pass # Unlock camera frame exploitation self.release() def map(self): """Project camera frame background into scene frames background. !!! warning watch method needs to be called first. """ # Lock camera frame exploitation self.acquire() # Project camera frame background into each scene frame if possible for frame in self.scene_frames: # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? for camera_layer_name, camera_layer in self.layers.items(): try: aoi_2d = camera_layer.aoi_scene[frame.name] # Apply perspective transform algorithm to fill aoi frame background width, height = frame.size destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]]) mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination) frame.background = cv2.warpPerspective(self.background, mapping, (width, height)) # Ignore missing frame projection except KeyError: pass # Unlock camera frame exploitation self.release() def to_json(self, json_filepath): """Save camera to .json file.""" with open(json_filepath, 'w', encoding='utf-8') as file: json.dump(self, file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder)