#!/usr/bin/env python """ArGaze pipeline assets.""" __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple, Any from dataclasses import dataclass, field import json import os import importlib from inspect import getmembers import threading import time from argaze import DataStructures, GazeFeatures from argaze.AreaOfInterest import * from argaze.GazeAnalysis import * import numpy import cv2 ArLayerType = TypeVar('ArLayer', bound="ArLayer") # Type definition for type annotation convenience ArFrameType = TypeVar('ArFrame', bound="ArFrame") # Type definition for type annotation convenience ArSceneType = TypeVar('ArScene', bound="ArScene") # Type definition for type annotation convenience ArCameraType = TypeVar('ArCamera', bound="ArCamera") # Type definition for type annotation convenience class PoseEstimationFailed(Exception): """ Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies. """ def __init__(self, message, unconsistencies=None): super().__init__(message) self.unconsistencies = unconsistencies class SceneProjectionFailed(Exception): """ Exception raised by ArCamera watch method when the scene can't be projected. """ def __init__(self, message): super().__init__(message) class LoadingFailed(Exception): """ Exception raised when attributes loading fails. """ def __init__(self, message): super().__init__(message) # Define default ArLayer draw parameters DEFAULT_ARLAYER_DRAW_PARAMETERS = { "draw_aoi_scene": { "draw_aoi": { "color": (255, 255, 255), "border_size": 1 } }, "draw_aoi_matching": { "draw_matched_fixation": { "deviation_circle_color": (255, 255, 255) }, "draw_matched_fixation_positions": { "position_color": (0, 255, 255), "line_color": (0, 0, 0) }, "draw_matched_region": { "color": (0, 255, 0), "border_size": 4 }, "draw_looked_aoi": { "color": (0, 255, 0), "border_size": 2 }, "looked_aoi_name_color": (255, 255, 255), "looked_aoi_name_offset": (0, -10) } } @dataclass class ArLayer(): """ Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed. Parameters: name: name of the layer aoi_scene: AOI scene description aoi_matcher: AOI matcher object aoi_scan_path: AOI scan path object aoi_scan_path_analyzers: dictionary of AOI scan path analyzers log: enable aoi scan path analysis logging draw_parameters: default parameters passed to draw method """ name: str aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene) aoi_matcher: GazeFeatures.AOIMatcher = field(default_factory=GazeFeatures.AOIMatcher) aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) aoi_scan_path_analyzers: dict = field(default_factory=dict) log: bool = field(default=False) draw_parameters: dict = field(default_factory=DEFAULT_ARLAYER_DRAW_PARAMETERS) def __post_init__(self): # Define parent attribute: it will be setup by parent later self.__parent = None # Init current gaze movement self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() # Init lock to share looking data with multiples threads self.__look_lock = threading.Lock() # Cast aoi scene to its effective dimension if self.aoi_scene.dimension == 2: self.aoi_scene = AOI2DScene.AOI2DScene(self.aoi_scene) elif self.aoi_scene.dimension == 3: self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene) # Prepare logging if needed self.__ts_logs = {} if self.log: # Create timestamped buffers to log each aoi scan path analysis for aoi_scan_path_analyzer_module_path in self.aoi_scan_path_analyzers.keys(): self.__ts_logs[aoi_scan_path_analyzer_module_path] = DataStructures.TimeStampedBuffer() @classmethod def from_dict(self, layer_data: dict, working_directory: str = None) -> ArLayerType: """Load attributes from dictionary. Parameters: layer_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load name try: new_layer_name = layer_data.pop('name') except KeyError: new_layer_name = None # Load aoi scene try: new_aoi_scene_value = layer_data.pop('aoi_scene') # str: relative path to file if type(new_aoi_scene_value) == str: filepath = os.path.join(working_directory, new_aoi_scene_value) file_format = filepath.split('.')[-1] # JSON file format for 2D or 3D dimension if file_format == 'json': new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath) # SVG file format for 2D dimension only if file_format == 'svg': new_aoi_scene = AOIFeatures.AOI2DScene.from_svg(filepath) # OBJ file format for 3D dimension only elif file_format == 'obj': new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath) # dict: else: new_aoi_scene = AOIFeatures.AOIScene.from_dict(new_aoi_scene_value) except KeyError: pass # Add AOI 2D Scene by default new_aoi_scene = AOI2DScene.AOI2DScene() # Edit expected AOI list by removing AOI with name equals to layer name expected_aoi = list(new_aoi_scene.keys()) if new_layer_name in expected_aoi: expected_aoi.remove(new_layer_name) # Load aoi matcher try: aoi_matcher_value = layer_data.pop('aoi_matcher') aoi_matcher_module_path, aoi_matcher_parameters = aoi_matcher_value.popitem() # Prepend argaze.GazeAnalysis path when a single name is provided if len(aoi_matcher_module_path.split('.')) == 1: aoi_matcher_module_path = f'argaze.GazeAnalysis.{aoi_matcher_module_path}' aoi_matcher_module = importlib.import_module(aoi_matcher_module_path) new_aoi_matcher = aoi_matcher_module.AOIMatcher(**aoi_matcher_parameters) except KeyError: new_aoi_matcher = None # Load AOI scan path try: new_aoi_scan_path_data = layer_data.pop('aoi_scan_path') new_aoi_scan_path_data['expected_aoi'] = expected_aoi new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: new_aoi_scan_path_data = {} new_aoi_scan_path_data['expected_aoi'] = expected_aoi new_aoi_scan_path = None # Load AOI scan path analyzers new_aoi_scan_path_analyzers = {} try: new_aoi_scan_path_analyzers_value = layer_data.pop('aoi_scan_path_analyzers') for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items(): # Prepend argaze.GazeAnalysis path when a single name is provided if len(aoi_scan_path_analyzer_module_path.split('.')) == 1: aoi_scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_module_path}' aoi_scan_path_analyzer_module = importlib.import_module(aoi_scan_path_analyzer_module_path) # Check aoi scan path analyzer parameters type members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer) for member in members: if '__annotations__' in member: for parameter, parameter_type in member[1].items(): # Check if parameter is part of argaze.GazeAnalysis module parameter_module_path = parameter_type.__module__.split('.') # Check if parameter is part of a package if len(parameter_type.__module__.split('.')) > 1: # Try get existing analyzer instance to append as parameter try: aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_type.__module__] except KeyError: raise LoadingFailed(f'{aoi_scan_path_analyzer_module_path} aoi scan path analyzer loading fails because {parameter_type.__module__} aoi scan path analyzer is missing.') aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters) new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer # Force AOI scan path creation if len(new_aoi_scan_path_analyzers) > 0 and new_aoi_scan_path == None: new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: pass # Load log status try: new_layer_log = layer_data.pop('log') except KeyError: new_layer_log = False # Load image parameters try: new_layer_draw_parameters = layer_data.pop('draw_parameters') except KeyError: new_layer_draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS # Create layer return ArLayer(new_layer_name, \ new_aoi_scene, \ new_aoi_matcher, \ new_aoi_scan_path, \ new_aoi_scan_path_analyzers, \ new_layer_log, \ new_layer_draw_parameters \ ) @classmethod def from_json(self, json_filepath: str) -> ArLayerType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: layer_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return ArLayer.from_dict(layer_data, working_directory) @property def parent(self): """Get parent instance""" return self.__parent @parent.setter def parent(self, parent): """Get parent instance""" self.__parent = parent @property def logs(self): """ Get stored logs """ return self.__ts_logs def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> dict: """ Project timestamped gaze movement into layer. !!! warning Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute. Parameters: gaze_movement: gaze movement to project Returns: looked_aoi: most likely looked aoi name aoi_scan_path_analysis: aoi scan path analysis at each new scan step if aoi_scan_path is instanciated exception: error catched during gaze movement processing """ # Lock layer exploitation self.__look_lock.acquire() # Update current gaze movement self.__gaze_movement = gaze_movement # Init looked aoi looked_aoi_name, looked_aoi = (None, None) # Init aoi scan path analysis report aoi_scan_path_analysis = {} # Assess pipeline execution times execution_times = { 'aoi_matcher': None, 'aoi_scan_step_analyzers': {} } # Catch any error exception = None try: if self.aoi_matcher is not None: # Store aoi matching start date matching_start = time.perf_counter() # Update looked aoi thanks to aoi matcher # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally looked_aoi_name, looked_aoi = self.aoi_matcher.match(self.aoi_scene, gaze_movement) # Assess aoi matching time in ms execution_times['aoi_matcher'] = (time.perf_counter() - matching_start) * 1e3 # Valid and finished gaze movement has been identified if gaze_movement.valid and gaze_movement.finished: if GazeFeatures.is_fixation(gaze_movement): # Append fixation to aoi scan path if self.aoi_scan_path is not None and looked_aoi_name is not None: aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, looked_aoi_name) # Is there a new step? if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): # Store aoi scan path analysis start date aoi_scan_path_analysis_start = time.perf_counter() # Analyze aoi scan path aoi_scan_path_analyzer.analyze(self.aoi_scan_path) # Assess aoi scan step analysis time in ms execution_times['aoi_scan_step_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_path_analysis_start) * 1e3 # Store analysis aoi_scan_path_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis # Log analysis if self.log: self.__ts_logs[aoi_scan_path_analyzer_module_path][timestamp] = aoi_scan_path_analyzer.analysis elif GazeFeatures.is_saccade(gaze_movement): # Append saccade to aoi scan path if self.aoi_scan_path is not None: self.aoi_scan_path.append_saccade(timestamp, gaze_movement) except Exception as e: print('Warning: the following error occurs in ArLayer.look method:', e) looked_aoi = None aoi_scan_path_analysis = {} exception = e # Unlock layer exploitation self.__look_lock.release() # Sum all execution times total_execution_time = 0 if execution_times['aoi_matcher']: total_execution_time += execution_times['aoi_matcher'] for _, aoi_scan_path_analysis_time in execution_times['aoi_scan_step_analyzers'].items(): total_execution_time += aoi_scan_path_analysis_time execution_times['total'] = total_execution_time # Return look data return looked_aoi, aoi_scan_path_analysis, execution_times, exception def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ Draw into image. Parameters: draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn) draw_aoi_matching: AOIMatcher.draw parameters (which depends of the loaded aoi matcher module, if None, no aoi matching is drawn) """ # Use draw_parameters attribute if no parameters if draw_aoi_scene is None and draw_aoi_matching is None: return self.draw(image, **self.draw_parameters) # Lock frame exploitation self.__look_lock.acquire() # Draw aoi if required if draw_aoi_scene is not None: self.aoi_scene.draw(image, **draw_aoi_scene) # Draw aoi matching if required if draw_aoi_matching is not None and self.aoi_matcher is not None: self.aoi_matcher.draw(image, self.aoi_scene, **draw_aoi_matching) # Unlock frame exploitation self.__look_lock.release() # Define default ArFrame image parameters DEFAULT_ARFRAME_IMAGE_PARAMETERS = { "background_weight": 1., "heatmap_weight": 0.5, "draw_scan_path": { "draw_fixations": { "deviation_circle_color": (255, 255, 255), "duration_border_color": (127, 127, 127), "duration_factor": 1e-2 }, "draw_saccades": { "line_color": (255, 255, 255) }, "deepness": 0 }, "draw_gaze_positions": { "color": (0, 255, 255), "size": 2 } } @dataclass class ArFrame(): """ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. Parameters: name: name of the frame size: defines the dimension of the rectangular area where gaze positions are projected. gaze_movement_identifier: gaze movement identification algorithm filter_in_progress_identification: ignore in progress gaze movement identification scan_path: scan path object scan_path_analyzers: dictionary of scan path analyzers heatmap: heatmap object background: picture to draw behind layers: dictionary of AOI layers log: enable scan path analysis logging image_parameters: default parameters passed to image method """ name: str size: tuple[int] = field(default=(1, 1)) gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier) filter_in_progress_identification: bool = field(default=True) scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath) scan_path_analyzers: dict = field(default_factory=dict) heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap) background: numpy.array = field(default_factory=lambda : numpy.array([])) layers: dict = field(default_factory=dict) log: bool = field(default=False) image_parameters: dict = field(default_factory=DEFAULT_ARFRAME_IMAGE_PARAMETERS) def __post_init__(self): # Define parent attribute: it will be setup by parent later self.__parent = None # Setup layers parent attribute for name, layer in self.layers.items(): layer.parent = self # Init current gaze position self.__gaze_position = GazeFeatures.UnvalidGazePosition() # Init lock to share looked data with multiples threads self.__look_lock = threading.Lock() # Prepare logging if needed self.__ts_logs = {} if self.log: # Create timestamped buffers to log each aoi scan path analysis for scan_path_analyzer_module_path in self.scan_path_analyzers.keys(): self.__ts_logs[scan_path_analyzer_module_path] = DataStructures.TimeStampedBuffer() @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: """Load attributes from dictionary. Parameters: frame_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load name try: new_frame_name = frame_data.pop('name') except KeyError: new_frame_name = None # Load size try: new_frame_size = frame_data.pop('size') except KeyError: new_frame_size = (0, 0) # Load gaze movement identifier try: gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier') gaze_movement_identifier_module_path, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem() # Prepend argaze.GazeAnalysis path when a single name is provided if len(gaze_movement_identifier_module_path.split('.')) == 1: gaze_movement_identifier_module_path = f'argaze.GazeAnalysis.{gaze_movement_identifier_module_path}' gaze_movement_identifier_module = importlib.import_module(gaze_movement_identifier_module_path) new_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters) except KeyError: new_gaze_movement_identifier = None # Current fixation matching try: filter_in_progress_identification = frame_data.pop('filter_in_progress_identification') except KeyError: filter_in_progress_identification = True # Load scan path try: new_scan_path_data = frame_data.pop('scan_path') new_scan_path = GazeFeatures.ScanPath(**new_scan_path_data) except KeyError: new_scan_path_data = {} new_scan_path = None # Load scan path analyzers new_scan_path_analyzers = {} try: new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers') for scan_path_analyzer_module_path, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items(): # Prepend argaze.GazeAnalysis path when a single name is provided if len(scan_path_analyzer_module_path.split('.')) == 1: scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{scan_path_analyzer_module_path}' scan_path_analyzer_module = importlib.import_module(scan_path_analyzer_module_path) # Check scan path analyzer parameters type members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer) for member in members: if '__annotations__' in member: for parameter, parameter_type in member[1].items(): # Check if parameter is part of a package if len(parameter_type.__module__.split('.')) > 1: # Try get existing analyzer instance to append as parameter try: scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_type.__module__] except KeyError: raise LoadingFailed(f'{scan_path_analyzer_module_path} scan path analyzer loading fails because {parameter_type.__module__} scan path analyzer is missing.') scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters) new_scan_path_analyzers[scan_path_analyzer_module_path] = scan_path_analyzer # Force scan path creation if len(new_scan_path_analyzers) > 0 and new_scan_path == None: new_scan_path = GazeFeatures.ScanPath(**new_scan_path_data) except KeyError: pass # Load heatmap try: new_heatmap_data = frame_data.pop('heatmap') # Default heatmap size equals frame size if 'size' not in new_heatmap_data.keys(): new_heatmap_data['size'] = new_frame_size new_heatmap = AOIFeatures.Heatmap(**new_heatmap_data) except KeyError: new_heatmap_data = {} new_heatmap = None # Load background image try: new_frame_background_value = frame_data.pop('background') new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC) except KeyError: new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8) # Load layers new_layers = {} try: for layer_name, layer_data in frame_data.pop('layers').items(): # Append name layer_data['name'] = layer_name # Create layer new_layer = ArLayer.from_dict(layer_data, working_directory) # Append new layer new_layers[layer_name] = new_layer except KeyError: pass # Load log status try: new_frame_log = frame_data.pop('log') except KeyError: new_frame_log = False # Load image parameters try: new_frame_image_parameters = frame_data.pop('image_parameters') except KeyError: new_frame_image_parameters = DEFAULT_ARFRAME_IMAGE_PARAMETERS # Create frame return ArFrame(new_frame_name, \ new_frame_size, \ new_gaze_movement_identifier, \ filter_in_progress_identification, \ new_scan_path, \ new_scan_path_analyzers, \ new_heatmap, \ new_frame_background, \ new_layers, \ new_frame_log, new_frame_image_parameters \ ) @classmethod def from_json(self, json_filepath: str) -> ArFrameType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: frame_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return ArFrame.from_dict(frame_data, working_directory) @property def parent(self): """Get parent instance""" return self.__parent @parent.setter def parent(self, parent): """Get parent instance""" self.__parent = parent @property def logs(self): """ Get stored logs """ return self.__ts_logs def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Tuple[GazeFeatures.GazeMovement, dict, dict, dict, Exception]: """ Project gaze position into frame. !!! warning Be aware that gaze positions are in the same range of value than size attribute. Parameters: timestamp: any number used to know when the given gaze position occurs gaze_position: gaze position to project Returns: identified_gaze_movement: identified gaze movement from incoming consecutive timestamped gaze positions if gaze_movement_identifier is instanciated. Current gaze movement if filter_in_progress_identification is False. scan_path_analysis: scan path analysis at each new scan step if scan_path is instanciated. layers_analysis: aoi scan path analysis at each new aoi scan step for each instanciated layers aoi scan path. execution_times: all pipeline steps execution times. exception: error catched during gaze position processing. """ # Lock frame exploitation self.__look_lock.acquire() # Update current gaze position self.__gaze_position = gaze_position # No gaze movement identified by default identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() # Init scan path analysis report scan_step_analysis = {} # Init layer analysis report layer_analysis = {} # Assess pipeline execution times execution_times = { 'gaze_movement_identifier': None, 'scan_step_analyzers':{}, 'heatmap': None, 'layers': {} } # Catch any error exception = None try: # Identify gaze movement if self.gaze_movement_identifier is not None: # Store movement identification start date identification_start = time.perf_counter() # Identify finished gaze movement identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position) # Assess movement identification time in ms execution_times['gaze_movement_identifier'] = (time.perf_counter() - identification_start) * 1e3 # Valid and finished gaze movement has been identified if identified_gaze_movement.valid and identified_gaze_movement.finished: if GazeFeatures.is_fixation(identified_gaze_movement): # Append fixation to scan path if self.scan_path is not None: self.scan_path.append_fixation(timestamp, identified_gaze_movement) elif GazeFeatures.is_saccade(identified_gaze_movement): # Append saccade to scan path if self.scan_path is not None: scan_step = self.scan_path.append_saccade(timestamp, identified_gaze_movement) # Is there a new step? if scan_step and len(self.scan_path) > 1: for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): # Store scan step analysis start date scan_step_analysis_start = time.perf_counter() # Analyze aoi scan path scan_path_analyzer.analyze(self.scan_path) # Assess scan step analysis time in ms execution_times['scan_step_analyzers'][scan_path_analyzer_module_path] = (time.perf_counter() - scan_step_analysis_start) * 1e3 # Store analysis scan_step_analysis[scan_path_analyzer_module_path] = scan_path_analyzer.analysis # Log analysis if self.log: self.__ts_logs[scan_path_analyzer_module_path][timestamp] = scan_path_analyzer.analysis # No valid finished gaze movement: optionnaly stop in progress identification filtering elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement # Update heatmap if self.heatmap is not None: # Store heatmap start date heatmap_start = time.perf_counter() # Scale gaze position value scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) # Update heatmap image self.heatmap.update(self.__gaze_position.value * scale) # Assess heatmap time in ms execution_times['heatmap'] = (time.perf_counter() - heatmap_start) * 1e3 # Look layers with valid identified gaze movement # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally for layer_name, layer in self.layers.items(): looked_aoi, aoi_scan_path_analysis, layer_execution_times, layer_exception = layer.look(timestamp, identified_gaze_movement) layer_analysis[layer_name] = aoi_scan_path_analysis execution_times['layers'][layer_name] = layer_execution_times if layer_exception: raise(layer_exception) except Exception as e: print('Warning: the following error occurs in ArFrame.look method:', e) identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() scan_step_analysis = {} layer_analysis = {} exception = e # Unlock frame exploitation self.__look_lock.release() # Sum all execution times total_execution_time = 0 if execution_times['gaze_movement_identifier']: total_execution_time += execution_times['gaze_movement_identifier'] for _, scan_step_analysis_time in execution_times['scan_step_analyzers'].items(): total_execution_time += scan_step_analysis_time if execution_times['heatmap']: total_execution_time += execution_times['heatmap'] for _, layer_execution_times in execution_times['layers'].items(): total_execution_time += layer_execution_times['total'] execution_times['total'] = total_execution_time # Return look data return identified_gaze_movement, scan_step_analysis, layer_analysis, execution_times, exception def __image(self, background_weight: float = None, heatmap_weight: float = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. Parameters: background_weight: weight of background overlay heatmap_weight: weight of heatmap overlay draw_scan_path: [GazeFeatures.ScanPath.draw](argaze.md/#argaze.GazeFeatures.ScanPath.draw) parameters (if None, no scan path is drawn) draw_layers: dictionary of [ArLayer.draw](argaze.md/#argaze.ArFeatures.ArLayer.draw) parameters per layer (if None, no layer is drawn) draw_gaze_positions: [GazeFeatures.GazePosition.draw](argaze.md/#argaze.GazeFeatures.GazePosition.draw) parameters (if None, no gaze position is drawn) draw_fixations: [GazeFeatures.Fixation.draw](argaze.md/#argaze.GazeFeatures.Fixation.draw) parameters (if None, no fixation is drawn) draw_saccades: [GazeFeatures.Saccade.draw](argaze.md/#argaze.GazeFeatures.Saccade.draw) parameters (if None, no saccade is drawn) """ # Lock frame exploitation self.__look_lock.acquire() # Draw background only if background_weight is not None and (heatmap_weight is None or self.heatmap is None): image = self.background.copy() # Draw mix background and heatmap if required elif background_weight is not None and heatmap_weight is not None and self.heatmap: background_image = self.background.copy() heatmap_image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0) # Draw heatmap only elif background_weight is None and heatmap_weight is not None and self.heatmap: image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR) # Draw black image else: image = numpy.full((self.size[1], self.size[0], 3), 0).astype(numpy.uint8) # Draw scan path if required if draw_scan_path is not None and self.scan_path is not None: self.scan_path.draw(image, **draw_scan_path) # Draw current fixation if required if draw_fixations is not None and self.gaze_movement_identifier is not None: self.gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) # Draw current saccade if required if draw_saccades is not None and self.gaze_movement_identifier is not None: self.gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) # Draw layers if required if draw_layers is not None: for layer_name, draw_layer in draw_layers.items(): self.layers[layer_name].draw(image, **draw_layer) # Draw current gaze position if required if draw_gaze_positions is not None: self.__gaze_position.draw(image, **draw_gaze_positions) # Unlock frame exploitation self.__look_lock.release() return image def image(self, **kwargs: dict) -> numpy.array: """ Get frame image. Parameters: kwargs: ArFrame.__image parameters """ # Use image_parameters attribute if no kwargs if kwargs: return self.__image(**kwargs) return self.__image(**self.image_parameters) @dataclass class ArScene(): """ Define abstract Augmented Reality scene with ArLayers and ArFrames inside. Parameters: name: name of the scene layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. """ name: str layers: dict = field(default_factory=dict) frames: dict = field(default_factory=dict) angle_tolerance: float = field(default=0.) distance_tolerance: float = field(default=0.) def __post_init__(self): # Define parent attribute: it will be setup by parent object later self.__parent = None # Setup layer parent attribute for name, layer in self.layers.items(): layer.parent = self # Setup frame parent attribute for name, frame in self.frames.items(): frame.parent = self def __str__(self) -> str: """ Returns: String representation """ output = f'parent:\n{self.parent.name}\n' if len(self.layers): output += f'ArLayers:\n' for name, layer in self.layers.items(): output += f'{name}:\n{layer}\n' if len(self.frames): output += f'ArFrames:\n' for name, frame in self.frames.items(): output += f'{name}:\n{frame}\n' return output @property def parent(self): """Get parent instance""" return self.__parent @parent.setter def parent(self, parent): """Get parent instance""" self.__parent = parent @classmethod def from_dict(self, scene_data: dict, working_directory: str = None) -> ArSceneType: """ Load ArScene from dictionary. Parameters: scene_data: dictionary working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load name try: new_scene_name = scene_data.pop('name') except KeyError: new_scene_name = None # Load layers new_layers = {} try: for layer_name, layer_data in scene_data.pop('layers').items(): # Append name layer_data['name'] = layer_name # Create layer new_layer = ArLayer.from_dict(layer_data, working_directory) # Append new layer new_layers[layer_name] = new_layer except KeyError: pass # Load frames new_frames = {} try: for frame_name, frame_data in scene_data.pop('frames').items(): # str: relative path to file if type(frame_data) == str: filepath = os.path.join(working_directory, frame_data) file_format = filepath.split('.')[-1] # JSON file format for 2D or 3D dimension if file_format == 'json': new_frame = ArFrame.from_json(filepath) # dict: else: # Append name frame_data['name'] = frame_name new_frame = ArFrame.from_dict(frame_data, working_directory) # Look for a scene layer with an AOI named like the frame for scene_layer_name, scene_layer in new_layers.items(): try: frame_3d = scene_layer.aoi_scene[frame_name] # Check that the frame have a layer named like this scene layer aoi_2d_scene = new_frame.layers[scene_layer_name].aoi_scene # Transform 2D frame layer AOI into 3D scene layer AOI # Then, add them to scene layer scene_layer.aoi_scene |= aoi_2d_scene.dimensionalize(frame_3d, new_frame.size) '''DEPRECATED: but maybe still usefull? # Project and reframe each layers into corresponding frame layers for frame_layer_name, frame_layer in new_frame.layers.items(): try: layer = new_layers[frame_layer_name] layer_aoi_scene_projection = layer.aoi_scene.orthogonal_projection aoi_frame_projection = layer_aoi_scene_projection[frame_name] frame_layer.aoi_scene = layer_aoi_scene_projection.reframe(aoi_frame_projection, new_frame.size) if frame_layer.aoi_scan_path is not None: # Edit expected AOI list by removing AOI with name equals to frame layer name expected_aoi = list(layer.aoi_scene.keys()) if frame_layer_name in expected_aoi: expected_aoi.remove(frame_layer_name) frame_layer.aoi_scan_path.expected_aoi = expected_aoi except KeyError: continue ''' except KeyError as e: print(e) # Append new frame new_frames[frame_name] = new_frame except KeyError: pass return ArScene(new_scene_name, new_layers, new_frames, **scene_data) def estimate_pose(self, detected_features: Any) -> Tuple[numpy.array, numpy.array]: """Define abstract estimate scene pose method. Parameters: detected_features: any features detected by parent ArCamera that will help in scene pose estimation. Returns: tvec: scene translation vector rvec: scene rotation matrix """ raise NotImplementedError('estimate_pose() method not implemented') def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> Tuple[str, AOI2DScene.AOI2DScene]: """Project layers according estimated pose and optional horizontal field of view clipping angle. Parameters: tvec: translation vector rvec: rotation vector visual_hfov: horizontal field of view clipping angle Returns: layer_name: name of projected layer layer_projection: AOI2DScene projection """ for name, layer in self.layers.items(): # Clip AOI out of the visual horizontal field of view (optional) if visual_hfov > 0: # Transform layer aoi scene into camera referential aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec) # Get aoi inside vision cone field cone_vision_height_cm = 200 # cm cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) # Keep only aoi inside vision cone field aoi_scene_copy = layer.aoi_scene.copy(exclude=aoi_outside.keys()) else: aoi_scene_copy = layer.aoi_scene.copy() # Project layer aoi scene yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) def draw(self, image: numpy.array, **kwargs: dict): """ Draw scene into image. Parameters: image: where to draw """ raise NotImplementedError('draw() method not implemented') @dataclass class ArCamera(ArFrame): """ Define abstract Augmented Reality camera as ArFrame with ArScenes inside. Parameters: scenes: all scenes to project into camera frame """ scenes: dict = field(default_factory=dict) def __post_init__(self): # Init ArFrame super().__post_init__() # Setup scenes parent attribute for name, scene in self.scenes.items(): scene.parent = self # Setup expected aoi of each layer aoi scan path with the aoi of corresponding scene layer for layer_name, layer in self.layers.items(): if layer.aoi_scan_path is not None: all_aoi_list = [] for scene_name, scene in self.scenes.items(): try: scene_layer = scene.layers[layer_name] all_aoi_list.extend(list(scene_layer.aoi_scene.keys())) except KeyError: continue layer.aoi_scan_path.expected_aoi = all_aoi_list # Init a lock to share scene projections into camera frame between multiple threads self._frame_lock = threading.Lock() # Define public timestamp buffer to store ignored gaze positions self.ignored_gaze_positions = GazeFeatures.TimeStampedGazePositions() def __str__(self) -> str: """ Returns: String representation """ output = f'Name:\n{self.name}\n' for name, scene in self.scenes.items(): output += f'\"{name}\" {type(scene)}:\n{scene}\n' return output @classmethod def from_dict(self, camera_data: dict, working_directory: str = None) -> ArCameraType: """ Load ArCamera from dictionary. Parameters: camera_data: dictionary working_directory: folder path where to load files when a dictionary value is a relative filepath. """ raise NotImplementedError('from_dict() method not implemented') @classmethod def from_json(self, json_filepath: str) -> ArCameraType: """ Load ArCamera from .json file. Parameters: json_filepath: path to json file """ raise NotImplementedError('from_json() method not implemented') @property def scene_frames(self): """Iterate over all scenes frames""" # For each scene for scene_name, scene in self.scenes.items(): # For each scene frame for name, scene_frame in scene.frames.items(): yield scene_frame def watch(self, image: numpy.array) -> Tuple[float, dict]: """Detect AR features from image and project scenes into camera frame. Returns: - detection_time: AR features detection time in ms - exceptions: dictionary with exception raised per scene """ raise NotImplementedError('watch() method not implemented') def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition): """Project timestamped gaze position into each scene frames. !!! warning watch method needs to be called first. """ # Can't use camera frame when it is locked if self._frame_lock.locked(): # TODO: Store ignored timestamped gaze positions for further projections # PB: This would imply to also store frame projections !!! self.ignored_gaze_positions[timestamp] = gaze_position return None, None # Lock camera frame exploitation self._frame_lock.acquire() # Project gaze position into camera frame yield self, super().look(timestamp, gaze_position) # Project gaze position into each scene frames if possible for scene_frame in self.scene_frames: # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? for camera_layer_name, camera_layer in self.layers.items(): try: aoi_2d = camera_layer.aoi_scene[scene_frame.name] # TODO?: Should we prefer to use camera frame AOIMatcher object? if aoi_2d.contains_point(gaze_position.value): inner_x, inner_y = aoi_2d.clockwise().inner_axis(*gaze_position.value) # QUESTION: How to project gaze precision? inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) yield scene_frame, scene_frame.look(timestamp, inner_gaze_position * scene_frame.size) # Ignore missing aoi in camera frame layer projection except KeyError: pass # Unlock camera frame exploitation self._frame_lock.release() def map(self): """Project camera frame background into scene frames background. .. warning:: watch method needs to be called first. """ # Can't use camera frame when it is locked if self._frame_lock.locked(): return # Lock camera frame exploitation self._frame_lock.acquire() # Project camera frame background into each scene frame if possible for frame in self.scene_frames: # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? for camera_layer_name, camera_layer in self.layers.items(): try: aoi_2d = camera_layer.aoi_scene[frame.name] # Apply perspective transform algorithm to fill aoi frame background width, height = frame.size destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]]) mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination) frame.background = cv2.warpPerspective(self.background, mapping, (width, height)) # Ignore missing frame projection except KeyError: pass # Unlock camera frame exploitation self._frame_lock.release() def image(self, **kwargs: dict) -> numpy.array: """ Get frame image. Parameters: kwargs: ArFrame.image parameters """ return super().image(**kwargs) def to_json(self, json_filepath): """Save camera to .json file.""" with open(json_filepath, 'w', encoding='utf-8') as file: json.dump(self, file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder)