#!/usr/bin/env python """Manage AR environement assets.""" __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple from dataclasses import dataclass, field import json import os import importlib from inspect import getmembers import threading import time from argaze import DataStructures, GazeFeatures from argaze.ArUcoMarkers import * from argaze.AreaOfInterest import * from argaze.GazeAnalysis import * import numpy import cv2 ArLayerType = TypeVar('ArLayer', bound="ArLayer") # Type definition for type annotation convenience ArFrameType = TypeVar('ArFrame', bound="ArFrame") # Type definition for type annotation convenience ArSceneType = TypeVar('ArScene', bound="ArScene") # Type definition for type annotation convenience ArEnvironmentType = TypeVar('ArEnvironment', bound="ArEnvironment") # Type definition for type annotation convenience class PoseEstimationFailed(Exception): """ Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies. """ def __init__(self, message, unconsistencies=None): super().__init__(message) self.unconsistencies = unconsistencies class SceneProjectionFailed(Exception): """ Exception raised by ArEnvironment detect_and_project method when the scene can't be projected. """ def __init__(self, message): super().__init__(message) class LoadingFailed(Exception): """ Exception raised when attributes loading fails. 
""" def __init__(self, message): super().__init__(message) # Define default ArLayer draw parameters DEFAULT_ARLAYER_DRAW_PARAMETERS = { "draw_aoi_scene": { "draw_aoi": { "color": (255, 255, 255), "border_size": 1 } }, "draw_aoi_matching": { "draw_matched_fixation": { "deviation_circle_color": (255, 255, 255) }, "draw_matched_fixation_positions": { "position_color": (0, 255, 255), "line_color": (0, 0, 0) }, "draw_matched_region": { "color": (0, 255, 0), "border_size": 4 }, "draw_looked_aoi": { "color": (0, 255, 0), "border_size": 2 }, "looked_aoi_name_color": (255, 255, 255), "looked_aoi_name_offset": (0, -10) } } @dataclass class ArLayer(): """ Defines a space where to make matching of gaze movements and AOIs and inside which those matchings need to be analyzed. Parameters: name: name of the layer aoi_color: color to used in draw method aoi_scene: AOI scene description aoi_matcher: AOI matcher object aoi_scan_path: AOI scan path object aoi_scan_path_analyzers: dictionary of AOI scan path analyzers log: enable aoi scan path analysis logging draw_parameters: default parameters passed to draw method """ name: str aoi_color: tuple = field(default=(0, 0, 0)) aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene) aoi_matcher: GazeFeatures.AOIMatcher = field(default_factory=GazeFeatures.AOIMatcher) aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) aoi_scan_path_analyzers: dict = field(default_factory=dict) log: bool = field(default=False) draw_parameters: dict = field(default_factory=DEFAULT_ARLAYER_DRAW_PARAMETERS) def __post_init__(self): # Define parent attribute: it will be setup by parent later self.__parent = None # Init current gaze movement self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() # Init lock to share looking data with multiples threads self.__look_lock = threading.Lock() # Cast aoi scene to its effective dimension if self.aoi_scene.dimension == 2: self.aoi_scene = 
AOI2DScene.AOI2DScene(self.aoi_scene) elif self.aoi_scene.dimension == 3: self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene) # Prepare logging if needed self.__ts_logs = {} if self.log: # Create timestamped buffers to log each aoi scan path analysis for aoi_scan_path_analyzer_module_path in self.aoi_scan_path_analyzers.keys(): self.__ts_logs[aoi_scan_path_analyzer_module_path] = DataStructures.TimeStampedBuffer() @classmethod def from_dict(self, layer_data: dict, working_directory: str = None) -> ArLayerType: """Load attributes from dictionary. Parameters: layer_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load name try: new_layer_name = layer_data.pop('name') except KeyError: new_layer_name = None # Load aoi color try: new_aoi_color = layer_data.pop('aoi_color') except KeyError: new_aoi_color = (0, 0, 0) # Load optional aoi filter try: aoi_exclude_list = layer_data.pop('aoi_exclude') except KeyError: aoi_exclude_list = [] # Load aoi scene try: new_aoi_scene_value = layer_data.pop('aoi_scene') # str: relative path to file if type(new_aoi_scene_value) == str: filepath = os.path.join(working_directory, new_aoi_scene_value) file_format = filepath.split('.')[-1] # JSON file format for 2D or 3D dimension if file_format == 'json': new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath).copy(exclude=aoi_exclude_list) # OBJ file format for 3D dimension only elif file_format == 'obj': new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath).copy(exclude=aoi_exclude_list) # dict: else: new_aoi_scene = AOIFeatures.AOIScene.from_dict(new_aoi_scene_value) except KeyError: # Add AOI 2D Scene by default new_aoi_scene = AOI2DScene.AOI2DScene() # Load aoi matcher try: aoi_matcher_value = layer_data.pop('aoi_matcher') aoi_matcher_module_path, aoi_matcher_parameters = aoi_matcher_value.popitem() # Prepend argaze.GazeAnalysis path when a single name is provided if 
len(aoi_matcher_module_path.split('.')) == 1: aoi_matcher_module_path = f'argaze.GazeAnalysis.{aoi_matcher_module_path}' aoi_matcher_module = importlib.import_module(aoi_matcher_module_path) new_aoi_matcher = aoi_matcher_module.AOIMatcher(**aoi_matcher_parameters) except KeyError: new_aoi_matcher = None # Edit expected AOI list by removing AOI with name equals to layer name expected_aois = list(new_aoi_scene.keys()) if new_layer_name in expected_aois: expected_aois.remove(new_layer_name) # Load AOI scan path try: new_aoi_scan_path_data = layer_data.pop('aoi_scan_path') new_aoi_scan_path_data['expected_aois'] = expected_aois new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: new_aoi_scan_path_data = {} new_aoi_scan_path_data['expected_aois'] = expected_aois new_aoi_scan_path = None # Load AOI scan path analyzers new_aoi_scan_path_analyzers = {} try: new_aoi_scan_path_analyzers_value = layer_data.pop('aoi_scan_path_analyzers') for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items(): # Prepend argaze.GazeAnalysis path when a single name is provided if len(aoi_scan_path_analyzer_module_path.split('.')) == 1: aoi_scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_module_path}' aoi_scan_path_analyzer_module = importlib.import_module(aoi_scan_path_analyzer_module_path) # Check aoi scan path analyzer parameters type members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer) for member in members: if '__annotations__' in member: for parameter, parameter_type in member[1].items(): # Check if parameter is part of argaze.GazeAnalysis module parameter_module_path = parameter_type.__module__.split('.') # Check if parameter is part of a package if len(parameter_type.__module__.split('.')) > 1: # Try get existing analyzer instance to append as parameter try: aoi_scan_path_analyzer_parameters[parameter] = 
new_aoi_scan_path_analyzers[parameter_type.__module__] except KeyError: raise LoadingFailed(f'{aoi_scan_path_analyzer_module_path} aoi scan path analyzer loading fails because {parameter_type.__module__} aoi scan path analyzer is missing.') aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters) new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer # Force AOI scan path creation if len(new_aoi_scan_path_analyzers) > 0 and new_aoi_scan_path == None: new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: pass # Load log status try: new_layer_log = layer_data.pop('log') except KeyError: new_layer_log = False # Load image parameters try: new_layer_draw_parameters = layer_data.pop('draw_parameters') except KeyError: new_layer_draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS # Create layer return ArLayer(new_layer_name, \ new_aoi_color, \ new_aoi_scene, \ new_aoi_matcher, \ new_aoi_scan_path, \ new_aoi_scan_path_analyzers, \ new_layer_log, \ new_layer_draw_parameters \ ) @classmethod def from_json(self, json_filepath: str) -> ArLayerType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: layer_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return ArLayer.from_dict(layer_data, working_directory) @property def parent(self): """Get parent instance""" return self.__parent @parent.setter def parent(self, parent): """Get parent instance""" self.__parent = parent @property def logs(self): """ Get stored logs """ return self.__ts_logs def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> dict: """ Project timestamped gaze movement into layer. !!! warning Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute. 
Parameters: gaze_movement: gaze movement to project Returns: looked_aoi: most likely looked aoi name aoi_scan_path_analysis: aoi scan path analysis at each new scan step if aoi_scan_path is instanciated exception: error catched during gaze movement processing """ # Lock layer exploitation self.__look_lock.acquire() # Update current gaze movement self.__gaze_movement = gaze_movement # Init looked aoi looked_aoi_name, looked_aoi = (None, None) # Init aoi scan path analysis report aoi_scan_path_analysis = {} # Assess pipeline execution times execution_times = { 'aoi_matcher': None, 'aoi_scan_step_analyzers': {} } # Catch any error exception = None try: # Check gaze movement validity if gaze_movement.valid: if self.aoi_matcher is not None: # Store aoi matching start date matching_start = time.perf_counter() # Update looked aoi thanks to aoi matcher # Note: don't filter finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally looked_aoi_name, looked_aoi = self.aoi_matcher.match(self.aoi_scene, gaze_movement, exclude=[self.name]) # Assess aoi matching time in ms execution_times['aoi_matcher'] = (time.perf_counter() - matching_start) * 1e3 # Finished gaze movement has been identified if gaze_movement.finished: if GazeFeatures.is_fixation(gaze_movement): # Append fixation to aoi scan path if self.aoi_scan_path is not None and looked_aoi_name is not None: aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, looked_aoi_name) # Is there a new step? 
if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): # Store aoi scan path analysis start date aoi_scan_path_analysis_start = time.perf_counter() # Analyze aoi scan path aoi_scan_path_analyzer.analyze(self.aoi_scan_path) # Assess aoi scan step analysis time in ms execution_times['aoi_scan_step_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_path_analysis_start) * 1e3 # Store analysis aoi_scan_path_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis # Log analysis if self.log: self.__ts_logs[aoi_scan_path_analyzer_module_path][timestamp] = aoi_scan_path_analyzer.analysis elif GazeFeatures.is_saccade(gaze_movement): # Append saccade to aoi scan path if self.aoi_scan_path is not None: self.aoi_scan_path.append_saccade(timestamp, gaze_movement) except Exception as e: print('Warning: the following error occurs in ArLayer.look method:', e) looked_aoi = None aoi_scan_path_analysis = {} exception = e # Unlock layer exploitation self.__look_lock.release() # Sum all execution times total_execution_time = 0 if execution_times['aoi_matcher']: total_execution_time += execution_times['aoi_matcher'] for _, aoi_scan_path_analysis_time in execution_times['aoi_scan_step_analyzers'].items(): total_execution_time += aoi_scan_path_analysis_time execution_times['total'] = total_execution_time # Return look data return looked_aoi, aoi_scan_path_analysis, execution_times, exception def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ Draw into image. 
Parameters: draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn) draw_aoi_matching: AOIMatcher.draw parameters (which depends of the loaded aoi matcher module, if None, no aoi matching is drawn) """ # Use draw_parameters attribute if no parameters if draw_aoi_scene is None and draw_aoi_matching is None: return self.draw(image, **self.draw_parameters) # Lock frame exploitation self.__look_lock.acquire() # Draw aoi if required if draw_aoi_scene is not None: self.aoi_scene.draw(image, **draw_aoi_scene) # Draw aoi matching if required if draw_aoi_matching is not None and self.aoi_matcher is not None: self.aoi_matcher.draw(image, **draw_aoi_matching) # Unlock frame exploitation self.__look_lock.release() # Define default ArFrame image parameters DEFAULT_ARFRAME_IMAGE_PARAMETERS = { "background_weight": 1., "heatmap_weight": 0.5, "draw_scan_path": { "draw_fixations": { "deviation_circle_color": (255, 255, 255), "duration_border_color": (127, 127, 127), "duration_factor": 1e-2 }, "draw_saccades": { "line_color": (255, 255, 255) }, "deepness": 0 }, "draw_gaze_position": { "color": (0, 255, 255), "size": 2 } } @dataclass class ArFrame(): """ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. Parameters: name: name of the frame size: defines the dimension of the rectangular area where gaze positions are projected. 
gaze_movement_identifier: gaze movement identification algorithm filter_in_progress_fixation: ignore in progress fixation scan_path: scan path object scan_path_analyzers: dictionary of scan path analyzers heatmap: heatmap object background: picture to draw behind layers: dictionary of AOI layers log: enable scan path analysis logging image_parameters: default parameters passed to image method """ name: str size: tuple[int] = field(default=(1, 1)) gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier) filter_in_progress_fixation: bool = field(default=True) scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath) scan_path_analyzers: dict = field(default_factory=dict) heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap) background: numpy.array = field(default_factory=numpy.array) layers: dict = field(default_factory=dict) log: bool = field(default=False) image_parameters: dict = field(default_factory=DEFAULT_ARFRAME_IMAGE_PARAMETERS) def __post_init__(self): # Define parent attribute: it will be setup by parent later self.__parent = None # Setup layers parent attribute for name, layer in self.layers.items(): layer.parent = self # Init current gaze position self.__gaze_position = GazeFeatures.UnvalidGazePosition() # Init lock to share looked data with multiples threads self.__look_lock = threading.Lock() # Prepare logging if needed self.__ts_logs = {} if self.log: # Create timestamped buffers to log each aoi scan path analysis for scan_path_analyzer_module_path in self.scan_path_analyzers.keys(): self.__ts_logs[scan_path_analyzer_module_path] = DataStructures.TimeStampedBuffer() @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: """Load attributes from dictionary. 
Parameters: frame_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load name try: new_frame_name = frame_data.pop('name') except KeyError: new_frame_name = None # Load size try: new_frame_size = frame_data.pop('size') except KeyError: new_frame_size = (0, 0) # Load gaze movement identifier try: gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier') gaze_movement_identifier_module_path, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem() # Prepend argaze.GazeAnalysis path when a single name is provided if len(gaze_movement_identifier_module_path.split('.')) == 1: gaze_movement_identifier_module_path = f'argaze.GazeAnalysis.{gaze_movement_identifier_module_path}' gaze_movement_identifier_module = importlib.import_module(gaze_movement_identifier_module_path) new_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters) except KeyError: new_gaze_movement_identifier = None # Current fixation matching try: filter_in_progress_fixation = frame_data.pop('filter_in_progress_fixation') except KeyError: filter_in_progress_fixation = True # Load scan path try: new_scan_path_data = frame_data.pop('scan_path') new_scan_path = GazeFeatures.ScanPath(**new_scan_path_data) except KeyError: new_scan_path_data = {} new_scan_path = None # Load scan path analyzers new_scan_path_analyzers = {} try: new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers') for scan_path_analyzer_module_path, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items(): # Prepend argaze.GazeAnalysis path when a single name is provided if len(scan_path_analyzer_module_path.split('.')) == 1: scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{scan_path_analyzer_module_path}' scan_path_analyzer_module = importlib.import_module(scan_path_analyzer_module_path) # Check scan path 
analyzer parameters type members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer) for member in members: if '__annotations__' in member: for parameter, parameter_type in member[1].items(): # Check if parameter is part of a package if len(parameter_type.__module__.split('.')) > 1: # Try get existing analyzer instance to append as parameter try: scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_type.__module__] except KeyError: raise LoadingFailed(f'{scan_path_analyzer_module_path} scan path analyzer loading fails because {parameter_type.__module__} scan path analyzer is missing.') scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters) new_scan_path_analyzers[scan_path_analyzer_module_path] = scan_path_analyzer # Force scan path creation if len(new_scan_path_analyzers) > 0 and new_scan_path == None: new_scan_path = GazeFeatures.ScanPath(**new_scan_path_data) except KeyError: pass # Load heatmap try: new_heatmap_data = frame_data.pop('heatmap') # Default heatmap size equals frame size if 'size' not in new_heatmap_data.keys(): new_heatmap_data['size'] = new_frame_size new_heatmap = AOIFeatures.Heatmap(**new_heatmap_data) except KeyError: new_heatmap_data = {} new_heatmap = None # Load background image try: new_frame_background_value = frame_data.pop('background') new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC) except KeyError: new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8) # Load layers new_layers = {} try: for layer_name, layer_data in frame_data.pop('layers').items(): # Append name layer_data['name'] = layer_name # Create layer new_layer = ArLayer.from_dict(layer_data, working_directory) # Project 3D aoi scene layer to get only 2D aoi scene if new_layer.aoi_scene.dimension == 3: 
new_layer.aoi_scene = new_layer.aoi_scene.orthogonal_projection * new_frame_size # Append new layer new_layers[layer_name] = new_layer except KeyError: pass # Load log status try: new_frame_log = frame_data.pop('log') except KeyError: new_frame_log = False # Load image parameters try: new_frame_image_parameters = frame_data.pop('image_parameters') except KeyError: new_frame_image_parameters = DEFAULT_ARFRAME_IMAGE_PARAMETERS # Create frame return ArFrame(new_frame_name, \ new_frame_size, \ new_gaze_movement_identifier, \ filter_in_progress_fixation, \ new_scan_path, \ new_scan_path_analyzers, \ new_heatmap, \ new_frame_background, \ new_layers, \ new_frame_log, new_frame_image_parameters \ ) @classmethod def from_json(self, json_filepath: str) -> ArFrameType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: frame_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return ArFrame.from_dict(frame_data, working_directory) @property def parent(self): """Get parent instance""" return self.__parent @parent.setter def parent(self, parent): """Get parent instance""" self.__parent = parent @property def logs(self): """ Get stored logs """ return self.__ts_logs def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Tuple[GazeFeatures.GazeMovement, dict, dict, dict]: """ Project gaze position into frame. !!! warning Be aware that gaze positions are in the same range of value than size attribute. Parameters: timestamp: gaze_position: gaze position to project Returns: identified_gaze_movement: identified gaze movement from incoming consecutive timestamped gaze positions if gaze_movement_identifier is instanciated. Current gaze movement if filter_in_progress_fixation is True. 
scan_path_analysis: scan path analysis at each new scan step if scan_path is instanciated exception: error catched during gaze position processing """ # Lock frame exploitation self.__look_lock.acquire() # Update current gaze position self.__gaze_position = gaze_position # No gaze movement identified by default identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() # Init scan path analysis report scan_step_analysis = {} # Init layer analysis report layer_analysis = {} # Assess pipeline execution times execution_times = { 'gaze_movement_identifier': None, 'scan_step_analyzers':{}, 'heatmap': None, 'layers': {} } # Catch any error exception = None try: # Identify gaze movement if self.gaze_movement_identifier is not None: # Store movement identification start date identification_start = time.perf_counter() # Identify finished gaze movement identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position) # Assess movement identification time in ms execution_times['gaze_movement_identifier'] = (time.perf_counter() - identification_start) * 1e3 # Valid and finished gaze movement has been identified if identified_gaze_movement.valid and identified_gaze_movement.finished: if GazeFeatures.is_fixation(identified_gaze_movement): # Append fixation to scan path if self.scan_path is not None: self.scan_path.append_fixation(timestamp, identified_gaze_movement) elif GazeFeatures.is_saccade(identified_gaze_movement): # Append saccade to scan path if self.scan_path is not None: scan_step = self.scan_path.append_saccade(timestamp, identified_gaze_movement) # Is there a new step? 
if scan_step and len(self.scan_path) > 1: for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): # Store scan step analysis start date scan_step_analysis_start = time.perf_counter() # Analyze aoi scan path scan_path_analyzer.analyze(self.scan_path) # Assess scan step analysis time in ms execution_times['scan_step_analyzers'][scan_path_analyzer_module_path] = (time.perf_counter() - scan_step_analysis_start) * 1e3 # Store analysis scan_step_analysis[scan_path_analyzer_module_path] = scan_path_analyzer.analysis # Log analysis if self.log: self.__ts_logs[scan_path_analyzer_module_path][timestamp] = scan_path_analyzer.analysis # No valid finished gaze movement: optionnaly stop in progress fixation filtering elif self.gaze_movement_identifier is not None and not self.filter_in_progress_fixation: current_fixation = self.gaze_movement_identifier.current_fixation if current_fixation.valid: identified_gaze_movement = current_fixation # Update heatmap if self.heatmap is not None: # Store heatmap start date heatmap_start = time.perf_counter() # Scale gaze position value scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) # Update heatmap image self.heatmap.update(self.__gaze_position.value * scale) # Assess heatmap time in ms execution_times['heatmap'] = (time.perf_counter() - heatmap_start) * 1e3 # Look layers for layer_name, layer in self.layers.items(): looked_aoi, aoi_scan_path_analysis, layer_execution_times, layer_exception = layer.look(timestamp, identified_gaze_movement) layer_analysis[layer_name] = aoi_scan_path_analysis execution_times['layers'][layer_name] = layer_execution_times if layer_exception: raise(layer_exception) except Exception as e: print('Warning: the following error occurs in ArFrame.look method:', e) identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() scan_step_analysis = {} layer_analysis = {} exception = e # Unlock frame exploitation self.__look_lock.release() 
# Sum all execution times total_execution_time = 0 if execution_times['gaze_movement_identifier']: total_execution_time += execution_times['gaze_movement_identifier'] for _, scan_step_analysis_time in execution_times['scan_step_analyzers'].items(): total_execution_time += scan_step_analysis_time if execution_times['heatmap']: total_execution_time += execution_times['heatmap'] for _, layer_execution_times in execution_times['layers'].items(): total_execution_time += layer_execution_times['total'] execution_times['total'] = total_execution_time # Return look data return identified_gaze_movement, scan_step_analysis, layer_analysis, execution_times, exception def image(self, background_weight: float = None, heatmap_weight: float = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_position: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. Parameters: background_weight: weight of background overlay heatmap_weight: weight of heatmap overlay draw_scan_path: [GazeFeatures.ScanPath.draw](argaze.md/#argaze.GazeFeatures.ScanPath.draw) parameters (if None, no scan path is drawn) draw_layers: dictionary of [ArLayer.draw](argaze.md/#argaze.ArFeatures.ArLayer.draw) parameters per layer (if None, no layer is drawn) draw_gaze_position: [GazeFeatures.GazePosition.draw](argaze.md/#argaze.GazeFeatures.GazePosition.draw) parameters (if None, no gaze position is drawn) """ # Use image_parameters attribute if no parameters if background_weight is None and heatmap_weight is None and draw_scan_path is None and draw_layers is None and draw_gaze_position is None: return self.image(**self.image_parameters) # Lock frame exploitation self.__look_lock.acquire() # Draw background only if background_weight is not None and heatmap_weight is None: image = self.background.copy() # Draw mix background and heatmap if required elif background_weight is not None and heatmap_weight is not None and self.heatmap: background_image = 
self.background.copy() heatmap_image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0) # Draw heatmap only elif background_weight is None and heatmap_weight is not None and self.heatmap: image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) # Draw black image else: image = numpy.full((self.size[1], self.size[0], 3), 0).astype(numpy.uint8) # Draw scan path if required if draw_scan_path is not None and self.scan_path is not None: self.scan_path.draw(image, **draw_scan_path) # Draw layers if required if draw_layers is not None: for layer_name, draw_layer in draw_layers.items(): self.layers[layer_name].draw(image, **draw_layer) # Draw current gaze position if required if draw_gaze_position is not None: self.__gaze_position.draw(image, **draw_gaze_position) # Unlock frame exploitation self.__look_lock.release() return image @dataclass class ArScene(): """ Define an Augmented Reality scene with ArUcoMarkers, ArLayers and ArFrames inside. Parameters: name: name of the scene aruco_scene: ArUco markers 3D scene description used to estimate scene pose from detected markers: see [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function below. layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. aruco_axis: Optional dictionary to define orthogonal axis where each axis is defined by list of 3 markers identifier (first is origin). \ This pose estimation strategy is used by [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function when at least 3 markers are detected. 
aruco_aoi: Optional dictionary of AOI defined by list of markers identifier and markers corners index tuples: see [build_aruco_aoi_scene][argaze.ArFeatures.ArScene.build_aruco_aoi_scene] function below. angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. """ name: str aruco_scene: ArUcoScene.ArUcoScene = field(default_factory=ArUcoScene.ArUcoScene) layers: dict = field(default_factory=dict) frames: dict = field(default_factory=dict) aruco_axis: dict = field(default_factory=dict) aruco_aoi: dict = field(default_factory=dict) angle_tolerance: float = field(default=0.) distance_tolerance: float = field(default=0.) def __post_init__(self): # Define parent attribute: it will be setup by parent object later self.__parent = None # Setup layer parent attribute for name, layer in self.layers.items(): layer.parent = self # Setup frame parent attribute for name, frame in self.frames.items(): frame.parent = self # Preprocess orthogonal projection to speed up further processings self.__orthogonal_projection_cache = {} for layer_name, layer in self.layers.items(): self.__orthogonal_projection_cache[layer_name] = layer.aoi_scene.orthogonal_projection def __str__(self) -> str: """ Returns: String representation """ output = f'parent:\n{self.parent.name}\n' output += f'ArUcoScene:\n{self.aruco_scene}\n' if len(self.layers): output += f'ArLayers:\n' for name, layer in self.layers.items(): output += f'{name}:\n{layer}\n' if len(self.frames): output += f'ArFrames:\n' for name, frame in self.frames.items(): output += f'{name}:\n{frame}\n' return output @property def parent(self): """Get parent instance""" return self.__parent @parent.setter def parent(self, parent): """Get parent instance""" self.__parent = parent 
@classmethod
def from_dict(cls, scene_data, working_directory: str = None) -> ArSceneType:
    """Load ArScene attributes from a dictionary.

    Parameters:
        scene_data: dictionary with optional 'name', 'aruco_scene', 'layers' and 'frames' entries.
            Those entries are popped from the dictionary and any remaining entry
            is forwarded as keyword argument to the ArScene constructor.
        working_directory: folder path joined to the 'aruco_scene' file path when it is given as a string.

    Returns:
        new ArScene instance
    """

    # Load name
    try:
        new_scene_name = scene_data.pop('name')

    except KeyError:
        new_scene_name = None

    # Load aruco scene
    try:

        # Check aruco_scene value type
        aruco_scene_value = scene_data.pop('aruco_scene')

        # str: relative path to .obj file
        if isinstance(aruco_scene_value, str):

            # An unset working directory means the path is used as it is
            aruco_scene_value = os.path.join(working_directory or '', aruco_scene_value)
            new_aruco_scene = ArUcoScene.ArUcoScene.from_obj(aruco_scene_value)

        # dict:
        else:
            new_aruco_scene = ArUcoScene.ArUcoScene(**aruco_scene_value)

    except KeyError:
        new_aruco_scene = None

    # Load layers
    new_layers = {}

    try:

        for layer_name, layer_data in scene_data.pop('layers').items():

            # Append name
            layer_data['name'] = layer_name

            # Create and append new layer
            new_layers[layer_name] = ArLayer.from_dict(layer_data, working_directory)

    except KeyError:
        pass

    # Load frames
    new_frames = {}

    try:

        for frame_name, frame_data in scene_data.pop('frames').items():

            # Append name
            frame_data['name'] = frame_name

            # Create frame
            new_frame = ArFrame.from_dict(frame_data, working_directory)

            # Look for an AOI with same frame name in any layer.
            # A layer without this AOI must not stop the search: keep looking into next layers.
            aoi_frame_found = False

            for layer in new_layers.values():

                try:
                    layer.aoi_scene[frame_name]

                except KeyError:
                    continue

                # AOI name should be unique: no need to look further
                aoi_frame_found = True
                break

            if aoi_frame_found:

                # Project and reframe each layers into corresponding frame layers
                for frame_layer_name, frame_layer in new_frame.layers.items():

                    try:

                        layer = new_layers[frame_layer_name]

                        layer_aoi_scene_projection = layer.aoi_scene.orthogonal_projection
                        aoi_frame_projection = layer_aoi_scene_projection[frame_name]

                        frame_layer.aoi_scene = layer_aoi_scene_projection.reframe(aoi_frame_projection, new_frame.size)

                        if frame_layer.aoi_scan_path is not None:

                            # Edit expected AOI list by removing AOI with name equals to frame layer name
                            expected_aois = list(layer.aoi_scene.keys())

                            if frame_layer_name in expected_aois:
                                expected_aois.remove(frame_layer_name)

                            frame_layer.aoi_scan_path.expected_aois = expected_aois

                    # Ignore frame layer without scene layer or scene layer without frame AOI
                    except KeyError:
                        continue

            # Append new frame
            new_frames[frame_name] = new_frame

    except KeyError:
        pass

    return ArScene(new_scene_name, new_aruco_scene, new_layers, new_frames, **scene_data)

def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, str, dict]:
    """Estimate scene pose from detected ArUco markers.

    Parameters:
        detected_markers: dictionary of detected markers indexed by identifier

    Returns:
        scene translation vector
        scene rotation matrix
        pose estimation strategy
        dict of markers used to estimate the pose

    Raises:
        PoseEstimationFailed: when no marker is detected, when no marker belongs to the scene,
            when no marker axis can be used or when no marker passes consistency checking.
    """

    # Pose estimation fails when no marker is detected
    if len(detected_markers) == 0:
        raise PoseEstimationFailed('No marker detected')

    scene_markers, _ = self.aruco_scene.filter_markers(detected_markers)

    # Pose estimation fails when no marker belongs to the scene
    if len(scene_markers) == 0:
        raise PoseEstimationFailed('No marker belongs to the scene')

    # Estimate scene pose from unique marker transformations
    elif len(scene_markers) == 1:

        marker_id, marker = scene_markers.popitem()
        tvec, rmat = self.aruco_scene.estimate_pose_from_single_marker(marker)

        return tvec, rmat, 'estimate_pose_from_single_marker', {marker_id: marker}

    # Try to estimate scene pose from 3 markers defining an orthogonal axis
    elif len(scene_markers) >= 3 and len(self.aruco_axis) > 0:

        for axis_name, axis_markers in self.aruco_axis.items():

            try:

                origin_marker = scene_markers[axis_markers['origin_marker']]
                horizontal_axis_marker = scene_markers[axis_markers['horizontal_axis_marker']]
                vertical_axis_marker = scene_markers[axis_markers['vertical_axis_marker']]

                tvec, rmat = self.aruco_scene.estimate_pose_from_axis_markers(origin_marker, horizontal_axis_marker, vertical_axis_marker)

                return tvec, rmat, 'estimate_pose_from_axis_markers', {origin_marker.identifier: origin_marker, horizontal_axis_marker.identifier: horizontal_axis_marker, vertical_axis_marker.identifier: vertical_axis_marker}

            # Best effort: try next axis on any failure, but let
            # KeyboardInterrupt and SystemExit go through.
            except Exception:
                pass

        raise PoseEstimationFailed('No marker axis')

    # Otherwise, check markers consistency
    consistent_markers, unconsistent_markers, unconsistencies = self.aruco_scene.check_markers_consistency(scene_markers, self.angle_tolerance, self.distance_tolerance)

    # Pose estimation fails when no marker passes consistency checking
    if len(consistent_markers) == 0:
        raise PoseEstimationFailed('Unconsistent marker poses', unconsistencies)

    # Otherwise, estimate scene pose from all consistent markers pose
    tvec, rmat = self.aruco_scene.estimate_pose_from_markers(consistent_markers)

    return tvec, rmat, 'estimate_pose_from_markers', consistent_markers

def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> Tuple[str, AOI2DScene.AOI2DScene]:
    """Project layers according estimated pose and optional horizontal field of view clipping angle.

    This method is a generator: one (layer_name, layer_projection) pair is yielded per layer.

    Parameters:
        tvec: translation vector
        rvec: rotation vector
        visual_hfov: horizontal field of view clipping angle (no clipping when it is not positive)

    Returns:
        layer_name: name of projected layer
        layer_projection: AOI2DScene projection
    """

    for name, layer in self.layers.items():

        # Clip AOI out of the visual horizontal field of view (optional)
        if visual_hfov > 0:

            # Transform layer aoi scene into camera referential
            aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec)

            # Get aoi inside vision cone field
            cone_vision_height_cm = 200 # cm
            cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm

            _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm)

            # Keep only aoi inside vision cone field
            aoi_scene_copy = layer.aoi_scene.copy(exclude=aoi_outside.keys())

        else:

            aoi_scene_copy = layer.aoi_scene.copy()

        # Project layer aoi scene
        yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)

def build_aruco_aoi_scene(self, detected_markers) -> AOI2DScene.AOI2DScene:
    """
    Build AOI scene from detected ArUco markers as defined in aruco_aoi dictionary.

    Parameters:
        detected_markers: dictionary of detected markers indexed by identifier

    Returns:
        aoi_2d_scene: built AOI 2D scene

    Raises:
        SceneProjectionFailed: when no marker is detected or when a marker
            required to build an AOI corner can't be used.
    """

    # ArUco aoi must be defined
    assert(self.aruco_aoi)

    # AOI projection fails when no marker is detected
    if len(detected_markers) == 0:
        raise SceneProjectionFailed('No marker detected')

    aruco_aoi_scene = {}

    for aruco_aoi_name, aoi in self.aruco_aoi.items():

        # Each aoi's corner is defined by a marker's corner
        aoi_corners = []
        for corner in ["upper_left_corner", "upper_right_corner", "lower_right_corner", "lower_left_corner"]:

            marker_identifier = aoi[corner]["marker_identifier"]

            try:

                aoi_corners.append(detected_markers[marker_identifier].corners[0][aoi[corner]["marker_corner_index"]])

            except Exception as e:

                raise SceneProjectionFailed(f'Missing marker #{e} to build ArUco AOI scene') from e

        aruco_aoi_scene[aruco_aoi_name] = AOIFeatures.AreaOfInterest(aoi_corners)

        # Then each inner aoi is projected from the current aruco aoi
        # NOTE(review): aoi_3d_scene is not one of the attributes set up by this class in the
        # visible code and the projection cache is filled per layer name - confirm before enabling.
        for inner_aoi_name, inner_aoi in self.aoi_3d_scene.items():

            if aruco_aoi_name != inner_aoi_name:

                aoi_corners = [numpy.array(aruco_aoi_scene[aruco_aoi_name].outter_axis(inner)) for inner in self.__orthogonal_projection_cache[inner_aoi_name]]
                aruco_aoi_scene[inner_aoi_name] = AOIFeatures.AreaOfInterest(aoi_corners)

    return AOI2DScene.AOI2DScene(aruco_aoi_scene)

def draw_axis(self, image: numpy.array):
    """
    Draw scene axis into image.

    Parameters:
        image: where to draw
    """

    optic_parameters = self.parent.aruco_detector.optic_parameters

    self.aruco_scene.draw_axis(image, optic_parameters.K, optic_parameters.D)

def draw_places(self, image: numpy.array):
    """
    Draw scene places into image.

    Parameters:
        image: where to draw
    """

    optic_parameters = self.parent.aruco_detector.optic_parameters

    self.aruco_scene.draw_places(image, optic_parameters.K, optic_parameters.D)

# Define default ArEnvironment image_parameters values
DEFAULT_ARENVIRONMENT_IMAGE_PARAMETERS = {
    "draw_detected_markers": {
        "color": (0, 255, 0),
        "draw_axes": {
            "thickness": 3
        }
    }
}

def _default_arenvironment_image_parameters() -> dict:
    """Return a fresh copy of the default ArEnvironment image parameters."""

    # Local import: the module import section is not edited here
    import copy

    # Deep copy so that no instance can edit the module level defaults
    return copy.deepcopy(DEFAULT_ARENVIRONMENT_IMAGE_PARAMETERS)

@dataclass
class ArEnvironment():
    """
    Define Augmented Reality environment based on ArUco marker detection.

    Parameters:
        name: environment name
        aruco_detector: ArUco marker detector
        camera_frame: where to project scenes
        scenes: all environment scenes
        image_parameters: default parameters passed to image method
    """

    name: str
    aruco_detector: ArUcoDetector.ArUcoDetector = field(default_factory=ArUcoDetector.ArUcoDetector)
    camera_frame: ArFrame = field(default_factory=ArFrame)
    scenes: dict = field(default_factory=dict)
    # default_factory has to be a callable: a dictionary can't be passed directly
    image_parameters: dict = field(default_factory=_default_arenvironment_image_parameters)

    def __post_init__(self):
        """Attach the environment to its camera frame and scenes and init sharing tools."""

        # Setup camera frame parent attribute
        if self.camera_frame is not None:
            self.camera_frame.parent = self

        # Setup scenes parent attribute
        for scene in self.scenes.values():
            scene.parent = self

        # Init a lock to share AOI scene projections into camera frame between multiple threads
        self.__camera_frame_lock = threading.Lock()

        # Define public timestamp buffer to store ignored gaze positions
        self.ignored_gaze_positions = GazeFeatures.TimeStampedGazePositions()

    @classmethod
    def from_dict(cls, environment_data, working_directory: str = None) -> ArEnvironmentType:
        """Load ArEnvironment attributes from a dictionary.

        Parameters:
            environment_data: dictionary with mandatory 'name' and 'scenes' entries and optional
                'aruco_detector', 'camera_frame' and 'image_parameters' entries.
                Those entries are popped from the dictionary.
            working_directory: folder path joined to file paths given as strings.

        Returns:
            new ArEnvironment instance
        """

        new_environment_name = environment_data.pop('name')

        # Optic parameters stay undefined if the detector description is missing or incomplete
        new_optic_parameters = None

        try:

            new_detector_data = environment_data.pop('aruco_detector')

            new_aruco_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(**new_detector_data.pop('dictionary'))
            new_marker_size = new_detector_data.pop('marker_size')

            # Check optic_parameters value type
            optic_parameters_value = new_detector_data.pop('optic_parameters')

            # str: relative path to .json file
            if isinstance(optic_parameters_value, str):

                # An unset working directory means the path is used as it is
                optic_parameters_value = os.path.join(working_directory or '', optic_parameters_value)
                new_optic_parameters = ArUcoOpticCalibrator.OpticParameters.from_json(optic_parameters_value)

            # dict:
            else:

                new_optic_parameters = ArUcoOpticCalibrator.OpticParameters(**optic_parameters_value)

            # Check detector parameters value type
            detector_parameters_value = new_detector_data.pop('parameters')

            # str: relative path to .json file
            if isinstance(detector_parameters_value, str):

                detector_parameters_value = os.path.join(working_directory or '', detector_parameters_value)
                new_aruco_detector_parameters = ArUcoDetector.DetectorParameters.from_json(detector_parameters_value)

            # dict:
            else:

                new_aruco_detector_parameters = ArUcoDetector.DetectorParameters(**detector_parameters_value)

            new_aruco_detector = ArUcoDetector.ArUcoDetector(new_aruco_dictionary, new_marker_size, new_optic_parameters, new_aruco_detector_parameters)

        except KeyError:

            new_aruco_detector = None

        # Load camera frame as large as aruco dectector optic parameters
        try:

            camera_frame_data = environment_data.pop('camera_frame')

            # Create camera frame
            new_camera_frame = ArFrame.from_dict(camera_frame_data, working_directory)

            # Setup camera frame
            new_camera_frame.name = new_environment_name

            # Size and background can only be setup from loaded optic parameters
            if new_optic_parameters is not None:

                frame_width, frame_height = new_optic_parameters.dimensions[0], new_optic_parameters.dimensions[1]

                new_camera_frame.size = new_optic_parameters.dimensions
                new_camera_frame.background = numpy.zeros((frame_height, frame_width, 3)).astype(numpy.uint8)

        except KeyError:

            new_camera_frame = None

        # Build scenes
        new_scenes = {}

        for scene_name, scene_data in environment_data.pop('scenes').items():

            # Append name
            scene_data['name'] = scene_name

            # Create and append new scene
            new_scenes[scene_name] = ArScene.from_dict(scene_data, working_directory)

        # Setup expected aoi of each camera frame layer aoi scan path with the aoi of corresponding scene layer
        if new_camera_frame is not None:

            for camera_frame_layer_name, camera_frame_layer in new_camera_frame.layers.items():

                if camera_frame_layer.aoi_scan_path is not None:

                    all_aoi_list = []

                    for scene in new_scenes.values():

                        # Ignore scene without layer named as the camera frame layer
                        try:
                            scene_layer = scene.layers[camera_frame_layer_name]

                        except KeyError:
                            continue

                        all_aoi_list.extend(list(scene_layer.aoi_scene.keys()))

                    camera_frame_layer.aoi_scan_path.expected_aois = all_aoi_list

        # Load environment image parameters
        try:
            new_environment_image_parameters = environment_data.pop('image_parameters')

        except KeyError:
            new_environment_image_parameters = _default_arenvironment_image_parameters()

        # Create new environment
        return ArEnvironment(
            new_environment_name,
            new_aruco_detector,
            new_camera_frame,
            new_scenes,
            new_environment_image_parameters
        )

    @classmethod
    def from_json(cls, json_filepath: str) -> ArEnvironmentType:
        """
        Load ArEnvironment from .json file.

        Parameters:
            json_filepath: path to json file

        Returns:
            new ArEnvironment instance
        """

        # Read with the same encoding than the one used by to_json
        with open(json_filepath, encoding='utf-8') as configuration_file:

            environment_data = json.load(configuration_file)

        # File paths inside the configuration are relative to its folder
        working_directory = os.path.dirname(json_filepath)

        return ArEnvironment.from_dict(environment_data, working_directory)

    def __str__(self) -> str:
        """
        Returns:
            String representation
        """

        output = f'Name:\n{self.name}\n'
        output += f'ArUcoDetector:\n{self.aruco_detector}\n'

        for name, scene in self.scenes.items():
            output += f'\"{name}\" ArScene:\n{scene}\n'

        return output

    @property
    def frames(self):
        """Iterate over all environment scenes frames"""

        # For each scene
        for scene in self.scenes.values():

            # For each frame
            for frame in scene.frames.values():

                yield frame

    def detect_and_project(self, image: numpy.array) -> Tuple[float, dict]:
        """Detect environment aruco markers from image and project scenes into camera frame.

        Parameters:
            image: image where to detect markers, also used as camera frame background

        Returns:
            - detection_time: aruco marker detection time in ms
            - exceptions: dictionary with exception raised per scene
        """

        # Detect aruco markers
        detection_time = self.aruco_detector.detect_markers(image)

        # Store exceptions for each scene
        exceptions = {}

        # Lock camera frame exploitation: the lock is released even if something fails
        with self.__camera_frame_lock:

            # Fill camera frame background with image
            self.camera_frame.background = image

            # Clear former layers projection into camera frame
            for camera_layer in self.camera_frame.layers.values():

                camera_layer.aoi_scene = AOI2DScene.AOI2DScene()

            # Project each aoi 3d scene into camera frame
            for scene_name, scene in self.scenes.items():

                # TODO: Enable aruco_aoi processing
                # if scene.aruco_aoi:
                #     try:
                #         # Build AOI scene directly from detected ArUco marker corners
                #         self.camera_frame.aoi_2d_scene |= scene.build_aruco_aoi_scene(self.aruco_detector.detected_markers)
                #     except SceneProjectionFailed:
                #         pass

                try:

                    # Estimate scene markers poses
                    self.aruco_detector.estimate_markers_pose(scene.aruco_scene.identifiers)

                    # Estimate scene pose from detected scene markers
                    tvec, rmat, _, _ = scene.estimate_pose(self.aruco_detector.detected_markers)

                    # Project scene into camera frame according estimated pose
                    for layer_name, layer_projection in scene.project(tvec, rmat):

                        # Ignore scene layer without camera frame layer counterpart
                        try:
                            self.camera_frame.layers[layer_name].aoi_scene |= layer_projection

                        except KeyError:
                            pass

                # Store exceptions and continue
                except Exception as e:

                    exceptions[scene_name] = e

        # Return dection time and exceptions
        return detection_time, exceptions

    def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition):
        """Project timestamped gaze position into each frame.

        This method is a generator: it yields the camera frame then each frame into which
        the gaze position is projected, each one along with the result of its look method.
        The camera frame lock is held until the generator is exhausted or closed.

        !!! warning
            detect_and_project method needs to be called first.

        Parameters:
            timestamp: gaze position timestamp
            gaze_position: gaze position into camera frame
        """

        # Can't use camera frame when it is locked.
        # Test and acquire in a single step so no other thread can take the lock in between.
        if not self.__camera_frame_lock.acquire(blocking=False):

            # TODO: Store ignored timestamped gaze positions for further projections
            # PB: This would imply to also store frame projections !!!
            self.ignored_gaze_positions[timestamp] = gaze_position

            return

        try:

            # Project gaze position into camera frame
            yield self.camera_frame, self.camera_frame.look(timestamp, gaze_position)

            # Project gaze position into each frame if possible
            for frame in self.frames:

                # Is there an AOI inside camera frame layers projection which its name equals to a frame name?
                for camera_layer in self.camera_frame.layers.values():

                    try:

                        aoi_2d = camera_layer.aoi_scene[frame.name]

                        # TODO?: Should we prefer to use camera frame AOIMatcher object?
                        if aoi_2d.contains_point(gaze_position.value):

                            inner_x, inner_y = aoi_2d.clockwise().inner_axis(gaze_position.value)

                            # QUESTION: How to project gaze precision?
                            inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y))

                            yield frame, frame.look(timestamp, inner_gaze_position * frame.size)

                    # Ignore missing aoi in camera frame layer projection
                    except KeyError:
                        pass

        finally:

            # Unlock camera frame exploitation, even if the generator fails or is left before its end
            self.__camera_frame_lock.release()

    def map(self):
        """Project camera frame background into frames background.

        Nothing is done when the camera frame is already locked.

        !!! warning
            detect_and_project method needs to be called first.
        """

        # Can't use camera frame when it is locked.
        # Test and acquire in a single step so no other thread can take the lock in between.
        if not self.__camera_frame_lock.acquire(blocking=False):
            return

        try:

            # Project image into each frame if possible
            for frame in self.frames:

                # Is there an AOI inside camera frame layers projection which its name equals to a frame name?
                for camera_layer in self.camera_frame.layers.values():

                    try:

                        aoi_2d = camera_layer.aoi_scene[frame.name]

                        # Apply perspective transform algorithm to fill aoi frame background
                        width, height = frame.size
                        destination = numpy.float32([[0, height], [width, height], [width, 0], [0, 0]])
                        mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination)
                        frame.background = cv2.warpPerspective(self.camera_frame.background, mapping, (width, height))

                    # Ignore missing frame projection
                    except KeyError:
                        pass

        finally:

            # Unlock camera frame exploitation
            self.__camera_frame_lock.release()

    def image(self, draw_detected_markers: dict = None):
        """Get camera frame projections with ArUco detection visualisation.

        Parameters:
            draw_detected_markers: ArucoMarker.draw parameters. If None, the 'draw_detected_markers'
                entry of image_parameters attribute is used and no marker is drawn if it is missing.

        Returns:
            camera frame image or None when the camera frame is already locked
        """

        # Use image_parameters attribute if no parameters.
        # Direct lookup instead of a recursive call which would never end without such entry.
        if draw_detected_markers is None:
            draw_detected_markers = self.image_parameters.get('draw_detected_markers')

        # Can't use camera frame when it is locked.
        # Test and acquire in a single step so no other thread can take the lock in between.
        if not self.__camera_frame_lock.acquire(blocking=False):
            return

        try:

            # Get camera frame image
            image = self.camera_frame.image()

            # Draw detected markers if required
            if draw_detected_markers is not None:
                self.aruco_detector.draw_detected_markers(image, draw_detected_markers)

        finally:

            # Unlock camera frame exploitation
            self.__camera_frame_lock.release()

        return image

    def to_json(self, json_filepath):
        """Save environment to .json file.

        Parameters:
            json_filepath: path to json file
        """

        with open(json_filepath, 'w', encoding='utf-8') as file:

            json.dump(self, file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder)