#!/usr/bin/env python """ArGaze pipeline assets.""" __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple, Any, Iterator, Union import json import os import sys import importlib from inspect import getmembers import threading import time from argaze import DataFeatures, GazeFeatures from argaze.AreaOfInterest import * from argaze.GazeAnalysis import * import numpy import cv2 ArLayerType = TypeVar('ArLayer', bound="ArLayer") # Type definition for type annotation convenience ArFrameType = TypeVar('ArFrame', bound="ArFrame") # Type definition for type annotation convenience ArSceneType = TypeVar('ArScene', bound="ArScene") # Type definition for type annotation convenience ArCameraType = TypeVar('ArCamera', bound="ArCamera") # Type definition for type annotation convenience class PoseEstimationFailed(Exception): """ Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies. """ def __init__(self, message, unconsistencies=None): super().__init__(message) self.unconsistencies = unconsistencies class SceneProjectionFailed(Exception): """ Exception raised by ArCamera watch method when the scene can't be projected. """ def __init__(self, message): super().__init__(message) class LoadingFailed(Exception): """ Exception raised when attributes loading fails. """ def __init__(self, message): super().__init__(message) # Define default ArLayer draw parameters DEFAULT_ARLAYER_DRAW_PARAMETERS = { "draw_aoi_scene": { "draw_aoi": { "color": (255, 255, 255), "border_size": 1 } }, "draw_aoi_matching": { "draw_matched_fixation": { "deviation_circle_color": (255, 255, 255) }, "draw_matched_fixation_positions": { "position_color": (0, 255, 255), "line_color": (0, 0, 0) }, "draw_matched_region": { "color": (0, 255, 0), "border_size": 4 }, "draw_looked_aoi": { "color": (0, 255, 0), "border_size": 2 }, "looked_aoi_name_color": (255, 255, 255), "looked_aoi_name_offset": (0, -10) } } class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed. !!! note Inherits from DataFeatures.SharedObject class to be shared by multiple threads. """ def __init__(self, aoi_scene: AOIFeatures.AOIScene = None, aoi_matcher: GazeFeatures.AOIMatcher = None, aoi_scan_path: GazeFeatures.AOIScanPath = None, aoi_scan_path_analyzers: dict = None, draw_parameters: dict = None, **kwargs): """ Initialize ArLayer Parameters: aoi_scene: AOI scene description aoi_matcher: AOI matcher object aoi_scan_path: AOI scan path object aoi_scan_path_analyzers: dictionary of AOI scan path analyzers draw_parameters: default parameters passed to draw method """ # Init parent classes DataFeatures.SharedObject.__init__(self) DataFeatures.PipelineStepObject.__init__(self, **kwargs) # Init private attributes self.__aoi_scene = aoi_scene self.__aoi_matcher = aoi_matcher self.__aoi_scan_path = aoi_scan_path self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers self.__draw_parameters = draw_parameters self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() self.__looked_aoi_name = None self.__aoi_scan_path_analyzed = False # Cast aoi scene to its effective dimension if self.__aoi_scene.dimension == 2: self.__aoi_scene = AOI2DScene.AOI2DScene(self.__aoi_scene) elif self.__aoi_scene.dimension == 3: self.__aoi_scene = AOI3DScene.AOI3DScene(self.__aoi_scene) # Edit aoi_scan_path's expected aoi list by removing aoi with name equals to layer name if self.__aoi_scan_path is not None: expected_aoi = list(self.__aoi_scene.keys()) if self.name in expected_aoi: expected_aoi.remove(self.name) self.__aoi_scan_path.expected_aoi = expected_aoi # Edit pipeline step objects parent if self.__aoi_scene is not None: self.__aoi_scene.parent = self if self.__aoi_matcher is not None: self.__aoi_matcher.parent = self if self.__aoi_scan_path is not None: self.__aoi_scan_path.parent = self for name, analyzer in self.__aoi_scan_path_analyzers.items(): analyzer.parent = self @property def aoi_scene(self) -> AOIFeatures.AOIScene: """Get layer's aoi scene object.""" return self.__aoi_scene @aoi_scene.setter def aoi_scene(self, aoi_scene: AOIFeatures.AOIScene): """Set layer's aoi scene object.""" self.__aoi_scene = aoi_scene @property def aoi_matcher(self) -> GazeFeatures.AOIMatcher: """Get layer's aoi matcher object.""" return self.__aoi_matcher @property def aoi_scan_path(self) -> GazeFeatures.AOIScanPath: """Get layer's aoi scan path object.""" return self.__aoi_scan_path @property def aoi_scan_path_analyzers(self) -> dict: """Get layer's aoi scan analyzers dictionary.""" return self.__aoi_scan_path_analyzers @property def draw_parameters(self) -> dict: """Get layer's draw parameters dictionary.""" return self.__draw_parameters @property def last_looked_aoi_name(self) -> bool: """Get last looked aoi name.""" return self.__looked_aoi_name @property def analysis_available(self) -> bool: """Are aoi scan path analysis ready?""" return self.__aoi_scan_path_analyzed def analysis(self) -> Iterator[Union[str, dict]]: """Iterate over aoi scan path analysis. Returns iterator: analyzer module path, analysis dictionary """ assert(self.__aoi_scan_path_analyzed) for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis def as_dict(self) -> dict: """Export ArLayer properties as dictionary.""" return { **DataFeatures.PipelineStepObject.as_dict(self), "aoi_scene": self.__aoi_scene, "aoi_matcher": self.__aoi_matcher, "aoi_scan_path": self.__aoi_scan_path, "aoi_scan_path_analyzers": self.__aoi_scan_path_analyzers, "draw_parameters": self.__draw_parameters } @classmethod def from_dict(cls, layer_data: dict, working_directory: str = None) -> ArLayerType: """Load ArLayer attributes from dictionary. Parameters: layer_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Append working directory to the Python path if working_directory is not None: sys.path.append(working_directory) # Load aoi scene try: new_aoi_scene_value = layer_data.pop('aoi_scene') # str: relative path to file if type(new_aoi_scene_value) == str: filepath = os.path.join(working_directory, new_aoi_scene_value) file_format = filepath.split('.')[-1] # JSON file format for 2D or 3D dimension if file_format == 'json': new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath) # SVG file format for 2D dimension only if file_format == 'svg': new_aoi_scene = AOI2DScene.AOI2DScene.from_svg(filepath) # OBJ file format for 3D dimension only elif file_format == 'obj': new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath) # dict: else: new_aoi_scene = AOIFeatures.AOIScene.from_dict(new_aoi_scene_value) except KeyError: pass # Add AOI 2D Scene by default new_aoi_scene = AOI2DScene.AOI2DScene() # Load aoi matcher try: aoi_matcher_value = layer_data.pop('aoi_matcher') aoi_matcher_module_path, aoi_matcher_parameters = aoi_matcher_value.popitem() # Prepend argaze.GazeAnalysis path when a single name is provided if len(aoi_matcher_module_path.split('.')) == 1: aoi_matcher_module_path = f'argaze.GazeAnalysis.{aoi_matcher_module_path}' aoi_matcher_module = importlib.import_module(aoi_matcher_module_path) new_aoi_matcher = aoi_matcher_module.AOIMatcher(**aoi_matcher_parameters) except KeyError: new_aoi_matcher = None # Load AOI scan path try: new_aoi_scan_path_data = layer_data.pop('aoi_scan_path') new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: new_aoi_scan_path_data = {} new_aoi_scan_path = None # Load AOI scan path analyzers new_aoi_scan_path_analyzers = {} try: new_aoi_scan_path_analyzers_value = layer_data.pop('aoi_scan_path_analyzers') for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items(): # Prepend argaze.GazeAnalysis path when a single name is provided if len(aoi_scan_path_analyzer_module_path.split('.')) == 1: aoi_scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_module_path}' aoi_scan_path_analyzer_module = importlib.import_module(aoi_scan_path_analyzer_module_path) # Check aoi scan path analyzer parameters type members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer) for member in members: if '__annotations__' in member: for parameter, parameter_type in member[1].items(): # Check if parameter is part of argaze.GazeAnalysis module parameter_module_path = parameter_type.__module__.split('.') # Check if parameter is part of a package if len(parameter_type.__module__.split('.')) > 1: # Try get existing analyzer instance to append as parameter try: aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_type.__module__] except KeyError: raise LoadingFailed(f'{aoi_scan_path_analyzer_module_path} aoi scan path analyzer loading fails because {parameter_type.__module__} aoi scan path analyzer is missing.') aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters) new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer # Force AOI scan path creation if len(new_aoi_scan_path_analyzers) > 0 and new_aoi_scan_path == None: new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: pass # Load image parameters try: new_layer_draw_parameters = layer_data.pop('draw_parameters') except KeyError: new_layer_draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS # Load temporary pipeline step object from layer_data then export it as dict temp_pipeline_step_object_data = DataFeatures.PipelineStepObject.from_dict(layer_data, working_directory).as_dict() # Create layer return ArLayer( \ new_aoi_scene, \ new_aoi_matcher, \ new_aoi_scan_path, \ new_aoi_scan_path_analyzers, \ new_layer_draw_parameters, \ **temp_pipeline_step_object_data \ ) @classmethod def from_json(self, json_filepath: str) -> ArLayerType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: layer_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return ArLayer.from_dict(layer_data, working_directory) @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): """ Project timestamped gaze movement into layer. !!! warning Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute. Parameters: timestamp: method call timestamp (unit does'nt matter) gaze_movement: gaze movement to project """ # Use layer lock feature with self._lock: # Update current gaze movement self.__gaze_movement = gaze_movement # No looked aoi by default self.__looked_aoi_name = None # Reset aoi scan path analyzed state self.__aoi_scan_path_analyzed = False if self.__aoi_matcher is not None: # Update looked aoi thanks to aoi matcher # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement) # Valid and finished gaze movement has been identified if gaze_movement.valid and gaze_movement.finished: if GazeFeatures.is_fixation(gaze_movement): # Append fixation to aoi scan path if self.__aoi_scan_path is not None and self.__looked_aoi_name is not None: aoi_scan_step = self.__aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name) # Is there a new step? if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1: # Analyze aoi scan path for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): aoi_scan_path_analyzer.analyze(timestamp, self.__aoi_scan_path) # Update aoi scan path analyzed state self.__aoi_scan_path_analyzed = True elif GazeFeatures.is_saccade(gaze_movement): # Append saccade to aoi scan path if self.__aoi_scan_path is not None: self.__aoi_scan_path.append_saccade(timestamp, gaze_movement) def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ Draw into image. Parameters: draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn) draw_aoi_matching: AOIMatcher.draw parameters (which depends of the loaded aoi matcher module, if None, no aoi matching is drawn) """ # Use draw_parameters attribute if no parameters if draw_aoi_scene is None and draw_aoi_matching is None: return self.draw(image, **self.__draw_parameters) # Use layer lock feature with self._lock: # Draw aoi if required if draw_aoi_scene is not None: self.__aoi_scene.draw(image, **draw_aoi_scene) # Draw aoi matching if required if draw_aoi_matching is not None and self.__aoi_matcher is not None: self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching) # Define default ArFrame image parameters DEFAULT_ARFRAME_IMAGE_PARAMETERS = { "background_weight": 1., "heatmap_weight": 0.5, "draw_scan_path": { "draw_fixations": { "deviation_circle_color": (255, 255, 255), "duration_border_color": (127, 127, 127), "duration_factor": 1e-2 }, "draw_saccades": { "line_color": (255, 255, 255) }, "deepness": 0 }, "draw_gaze_positions": { "color": (0, 255, 255), "size": 2 } } class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. !!! note Inherits from DataFeatures.SharedObject class to be shared by multiple threads """ def __init__(self, size: tuple[int] = (1, 1), gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = None, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, filter_in_progress_identification: bool = True, scan_path: GazeFeatures.ScanPath = None, scan_path_analyzers: dict = None, background: numpy.array = numpy.array([]), heatmap: AOIFeatures.Heatmap = None, layers: dict = None, image_parameters: dict = DEFAULT_ARFRAME_IMAGE_PARAMETERS, **kwargs): """ Initialize ArFrame Parameters: size: defines the dimension of the rectangular area where gaze positions are projected gaze_position_calibrator: gaze position calibration algoritm gaze_movement_identifier: gaze movement identification algorithm filter_in_progress_identification: ignore in progress gaze movement identification scan_path: scan path object scan_path_analyzers: dictionary of scan path analyzers background: picture to draw behind heatmap: heatmap object layers: dictionary of AOI layers image_parameters: default parameters passed to image method """ # Init parent classes DataFeatures.SharedObject.__init__(self) DataFeatures.PipelineStepObject.__init__(self, **kwargs) # Init private attributes self.__size = size self.__gaze_position_calibrator = gaze_position_calibrator self.__gaze_movement_identifier = gaze_movement_identifier self.__filter_in_progress_identification = filter_in_progress_identification self.__scan_path = scan_path self.__scan_path_analyzers = scan_path_analyzers self.__background = background self.__heatmap = heatmap self.__layers = layers self.__image_parameters = image_parameters self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() self.__scan_path_analyzed = False # Edit pipeline step objects parent if self.__gaze_position_calibrator is not None: self.__gaze_position_calibrator.parent = self if self.__gaze_movement_identifier is not None: self.__gaze_movement_identifier.parent = self if self.__scan_path is not None: self.__scan_path.parent = self for name, analyzer in self.__scan_path_analyzers.items(): analyzer.parent = self if self.__heatmap is not None: self.__heatmap.parent = self for name, layer in self.__layers.items(): layer.parent = self @property def size(self) -> tuple[int]: """Get frame's size.""" return self.__size @property def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator: """Get frame's gaze position calibrator object.""" return self.__gaze_position_calibrator @property def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier: """Get frame's gaze movement identifier object.""" return self.__gaze_movement_identifier @property def filter_in_progress_identification(self) -> bool: """Is frame filtering in progress identification?""" return self.__filter_in_progress_identification @property def scan_path(self) -> GazeFeatures.ScanPath: """Get frame's scan path object.""" return self.__scan_path @property def scan_path_analyzers(self) -> dict: """Get frame's scan path analyzers dictionary.""" return self.__scan_path_analyzers @property def background(self) -> numpy.array: """Get frame's background matrix.""" return self.__background @background.setter def background(self, image: numpy.array): """Set frame's background matrix.""" self.__background = image @property def heatmap(self) -> AOIFeatures.Heatmap: """Get frame's heatmap object.""" return self.__heatmap @property def layers(self) -> dict: """Get frame's layers dictionary.""" return self.__layers @property def image_parameters(self) -> dict: """Get frame's image parameters dictionary.""" return self.__image_parameters @property def last_gaze_position(self) -> object: """Get last calibrated gaze position""" return self.__calibrated_gaze_position @property def last_gaze_movement(self) -> object: """Get last identified gaze movement""" return self.__identified_gaze_movement @property def analysis_available(self) -> bool: """Are scan path analysis ready?""" return self.__scan_path_analyzed def analysis(self) -> Iterator[Union[str, dict]]: """Get scan path analysis. Returns iterator: analyzer module path, analysis dictionary """ assert(self.__scan_path_analyzed) for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis def as_dict(self) -> dict: """Export ArFrame attributes as dictionary. Returns: frame_data: dictionary with frame attributes values. """ d = { **DataFeatures.PipelineStepObject.as_dict(self), "size": self.__size, "gaze_position_calibrator": self.__gaze_position_calibrator, "gaze_movement_identifier": self.__gaze_movement_identifier, "filter_in_progress_identification": self.__filter_in_progress_identification, "scan_path": self.__scan_path, "scan_path_analyzers": self.__scan_path_analyzers, "background": self.__background, "heatmap": self.__heatmap, "layers": self.__layers, "image_parameters": self.__image_parameters } return d @classmethod def from_dict(cls, frame_data: dict, working_directory: str = None) -> ArFrameType: """Load ArFrame attributes from dictionary. Parameters: frame_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Append working directory to the Python path if working_directory is not None: sys.path.append(working_directory) # Load size try: new_frame_size = frame_data.pop('size') except KeyError: new_frame_size = (0, 0) # Load gaze position calibrator try: gaze_position_calibrator_value = frame_data.pop('gaze_position_calibrator') # str: relative path to file if type(gaze_position_calibrator_value) == str: filepath = os.path.join(working_directory, gaze_position_calibrator_value) file_format = filepath.split('.')[-1] # JSON file format if file_format == 'json': new_gaze_position_calibrator = GazeFeatures.GazePositionCalibrator.from_json(filepath) # dict: else: new_gaze_position_calibrator = GazeFeatures.GazePositionCalibrator.from_dict(gaze_position_calibrator_value) except KeyError: new_gaze_position_calibrator = None # Load gaze movement identifier try: gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier') gaze_movement_identifier_module_path, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem() # Prepend argaze.GazeAnalysis path when a single name is provided if len(gaze_movement_identifier_module_path.split('.')) == 1: gaze_movement_identifier_module_path = f'argaze.GazeAnalysis.{gaze_movement_identifier_module_path}' gaze_movement_identifier_module = importlib.import_module(gaze_movement_identifier_module_path) new_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters) except KeyError: new_gaze_movement_identifier = None # Current fixation matching try: filter_in_progress_identification = frame_data.pop('filter_in_progress_identification') except KeyError: filter_in_progress_identification = True # Load scan path try: new_scan_path_data = frame_data.pop('scan_path') new_scan_path = GazeFeatures.ScanPath(**new_scan_path_data) except KeyError: new_scan_path_data = {} new_scan_path = None # Load scan path analyzers new_scan_path_analyzers = {} try: new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers') for scan_path_analyzer_module_path, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items(): # Prepend argaze.GazeAnalysis path when a single name is provided if len(scan_path_analyzer_module_path.split('.')) == 1: scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{scan_path_analyzer_module_path}' scan_path_analyzer_module = importlib.import_module(scan_path_analyzer_module_path) # Check scan path analyzer parameters type members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer) for member in members: if '__annotations__' in member: for parameter, parameter_type in member[1].items(): # Check if parameter is part of a package if len(parameter_type.__module__.split('.')) > 1: # Try get existing analyzer instance to append as parameter try: scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_type.__module__] except KeyError: raise LoadingFailed(f'{scan_path_analyzer_module_path} scan path analyzer loading fails because {parameter_type.__module__} scan path analyzer is missing.') scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters) new_scan_path_analyzers[scan_path_analyzer_module_path] = scan_path_analyzer # Force scan path creation if len(new_scan_path_analyzers) > 0 and new_scan_path == None: new_scan_path = GazeFeatures.ScanPath(**new_scan_path_data) except KeyError: pass # Load background image try: new_frame_background_value = frame_data.pop('background') new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC) except KeyError: new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8) # Load heatmap try: new_heatmap_data = frame_data.pop('heatmap') # Default heatmap size equals frame size if 'size' not in new_heatmap_data.keys(): new_heatmap_data['size'] = new_frame_size new_heatmap = AOIFeatures.Heatmap(**new_heatmap_data) except KeyError: new_heatmap_data = {} new_heatmap = None # Load layers new_layers = {} try: for layer_name, layer_data in frame_data.pop('layers').items(): # Append name layer_data['name'] = layer_name # Create layer new_layer = ArLayer.from_dict(layer_data, working_directory) # Append new layer new_layers[layer_name] = new_layer except KeyError: pass # Load image parameters try: new_frame_image_parameters = frame_data.pop('image_parameters') except KeyError: new_frame_image_parameters = DEFAULT_ARFRAME_IMAGE_PARAMETERS # Load temporary pipeline step object from frame_data then export it as dict temp_pipeline_step_object_data = DataFeatures.PipelineStepObject.from_dict(frame_data, working_directory).as_dict() # Create frame return ArFrame( \ new_frame_size, \ new_gaze_position_calibrator, \ new_gaze_movement_identifier, \ filter_in_progress_identification, \ new_scan_path, \ new_scan_path_analyzers, \ new_frame_background, \ new_heatmap, \ new_layers, \ new_frame_image_parameters, \ **temp_pipeline_step_object_data \ ) @classmethod def from_json(self, json_filepath: str) -> ArFrameType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: frame_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return ArFrame.from_dict(frame_data, working_directory) @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]: """ Project timestamped gaze position into frame. !!! warning Be aware that gaze positions are in the same range of value than size attribute. Parameters: timestamp: method call timestamp (unit does'nt matter) gaze_position: gaze position to project """ # Use frame lock feature with self._lock: # No gaze movement identified by default self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() # Reset scan path analyzed state self.__scan_path_analyzed = False # Apply gaze position calibration if self.__gaze_position_calibrator is not None: self.__calibrated_gaze_position = self.__gaze_position_calibrator.apply(gaze_position) # Or update gaze position at least else: self.__calibrated_gaze_position = gaze_position # Identify gaze movement if self.__gaze_movement_identifier is not None: # Identify finished gaze movement self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: if GazeFeatures.is_fixation(self.__identified_gaze_movement): # Append fixation to scan path if self.__scan_path is not None: self.__scan_path.append_fixation(timestamp, self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): # Append saccade to scan path if self.__scan_path is not None: scan_step = self.__scan_path.append_saccade(timestamp, self.__identified_gaze_movement) # Is there a new step? if scan_step and len(self.__scan_path) > 1: # Analyze aoi scan path for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items(): scan_path_analyzer.analyze(timestamp, self.__scan_path) # Update scan path analyzed state self.__scan_path_analyzed = True # No valid finished gaze movement: optionnaly stop in progress identification filtering elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification: self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement # Update heatmap if self.__heatmap is not None: # Scale gaze position value scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]]) # Update heatmap image self.__heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale) # Look layers with valid identified gaze movement # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally for layer_name, layer in self.__layers.items(): layer.look(timestamp, self.__identified_gaze_movement) def __image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. Parameters: background_weight: weight of background overlay heatmap_weight: weight of heatmap overlay draw_gaze_position_calibrator: [GazeFeatures.GazePositionCalibrator.draw](argaze.md/#argaze.GazeFeatures.GazePositionCalibrator.draw) parameters (if None, nothing is drawn) draw_scan_path: [GazeFeatures.ScanPath.draw](argaze.md/#argaze.GazeFeatures.ScanPath.draw) parameters (if None, no scan path is drawn) draw_layers: dictionary of [ArLayer.draw](argaze.md/#argaze.ArFeatures.ArLayer.draw) parameters per layer (if None, no layer is drawn) draw_gaze_positions: [GazeFeatures.GazePosition.draw](argaze.md/#argaze.GazeFeatures.GazePosition.draw) parameters (if None, no gaze position is drawn) draw_fixations: [GazeFeatures.Fixation.draw](argaze.md/#argaze.GazeFeatures.Fixation.draw) parameters (if None, no fixation is drawn) draw_saccades: [GazeFeatures.Saccade.draw](argaze.md/#argaze.GazeFeatures.Saccade.draw) parameters (if None, no saccade is drawn) """ # Use frame lock feature with self._lock: # Draw background only if background_weight is not None and (heatmap_weight is None or self.__heatmap is None): image = self.__background.copy() # Draw mix background and heatmap if required elif background_weight is not None and heatmap_weight is not None and self.__heatmap: background_image = self.__background.copy() heatmap_image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR) image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0) # Draw heatmap only elif background_weight is None and heatmap_weight is not None and self.__heatmap: image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR) # Draw black image else: image = numpy.full((self.__size[1], self.__size[0], 3), 0).astype(numpy.uint8) # Draw gaze position calibrator if draw_gaze_position_calibrator is not None: self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator) # Draw scan path if required if draw_scan_path is not None and self.__scan_path is not None: self.__scan_path.draw(image, **draw_scan_path) # Draw current fixation if required if draw_fixations is not None and self.__gaze_movement_identifier is not None: self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) # Draw current saccade if required if draw_saccades is not None and self.__gaze_movement_identifier is not None: self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) # Draw layers if required if draw_layers is not None: for layer_name, draw_layer in draw_layers.items(): self.__layers[layer_name].draw(image, **draw_layer) # Draw current gaze position if required if draw_gaze_positions is not None: self.__calibrated_gaze_position.draw(image, **draw_gaze_positions) return image def image(self, **kwargs: dict) -> numpy.array: """ Get frame image. Parameters: kwargs: ArFrame.__image parameters """ # Use image_parameters attribute if no kwargs if kwargs: return self.__image(**kwargs) return self.__image(**self.__image_parameters) class ArScene(DataFeatures.PipelineStepObject): """ Define abstract Augmented Reality scene with ArLayers and ArFrames inside. """ def __init__(self, layers: dict = None, frames: dict = None, angle_tolerance: float = 0., distance_tolerance: float = 0., **kwargs): """ Initialize ArScene Parameters: layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. """ # Init parent classes super().__init__(**kwargs) # Init private attributes self.__layers = layers self.__frames = frames self.__angle_tolerance = angle_tolerance self.__distance_tolerance = distance_tolerance # Edit pipeline step objects parent for name, layer in self.__layers.items(): layer.parent = self for name, frame in self.__frames.items(): frame.parent = self @property def layers(self) -> dict: """Get scene's layers dictionary.""" return self.__layers @property def frames(self) -> dict: """Get scene's frames dictionary.""" return self.__frames @property def angle_tolerance(self) -> float: """Get scene's angle tolerance.""" return self.__angle_tolerance @angle_tolerance.setter def angle_tolerance(self, value: float): """Set scene's angle tolerance.""" self.__angle_tolerance = value @property def distance_tolerance(self) -> float: """Get scene's distance tolerance.""" return self.__distance_tolerance @distance_tolerance.setter def distance_tolerance(self, value: float): """Set scene's distance tolerance.""" self.__distance_tolerance = value def as_dict(self) -> dict: """Export ArScene properties as dictionary.""" return { **DataFeatures.PipelineStepObject.as_dict(self), "layers": self.__layers, "frames": self.__frames, "angle_tolerance": self.__angle_tolerance, "distance_tolerance": self.__distance_tolerance } @classmethod def from_dict(cls, scene_data: dict, working_directory: str = None) -> ArSceneType: """ Load ArScene attributes from dictionary. Parameters: scene_data: dictionary working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load layers new_layers = {} try: for layer_name, layer_data in scene_data.pop('layers').items(): # Append name layer_data['name'] = layer_name # Create layer new_layer = ArLayer.from_dict(layer_data, working_directory) # Append new layer new_layers[layer_name] = new_layer except KeyError: pass # Load frames new_frames = {} try: for frame_name, frame_data in scene_data.pop('frames').items(): # str: relative path to file if type(frame_data) == str: filepath = os.path.join(working_directory, frame_data) file_format = filepath.split('.')[-1] # JSON file format for 2D or 3D dimension if file_format == 'json': new_frame = ArFrame.from_json(filepath) # dict: else: # Append name frame_data['name'] = frame_name new_frame = ArFrame.from_dict(frame_data, working_directory) # Look for a scene layer with an AOI named like the frame for scene_layer_name, scene_layer in new_layers.items(): try: frame_3d = scene_layer.aoi_scene[frame_name] # Check that the frame have a layer named like this scene layer aoi_2d_scene = new_frame.layers[scene_layer_name].aoi_scene # Transform 2D frame layer AOI into 3D scene layer AOI # Then, add them to scene layer scene_layer.aoi_scene |= aoi_2d_scene.dimensionalize(frame_3d, new_frame.size) '''DEPRECATED: but maybe still usefull? # Project and reframe each layers into corresponding frame layers for frame_layer_name, frame_layer in new_frame.layers.items(): try: layer = new_layers[frame_layer_name] layer_aoi_scene_projection = layer.aoi_scene.orthogonal_projection aoi_frame_projection = layer_aoi_scene_projection[frame_name] frame_layer.aoi_scene = layer_aoi_scene_projection.reframe(aoi_frame_projection, new_frame.size) if frame_layer.aoi_scan_path is not None: # Edit expected AOI list by removing AOI with name equals to frame layer name expected_aoi = list(layer.aoi_scene.keys()) if frame_layer_name in expected_aoi: expected_aoi.remove(frame_layer_name) frame_layer.aoi_scan_path.expected_aoi = expected_aoi except KeyError: continue ''' except KeyError as e: print(e) # Append new frame new_frames[frame_name] = new_frame except KeyError: pass # Load temporary pipeline step object from scene_data then export it as dict temp_pipeline_step_object_data = DataFeatures.PipelineStepObject.from_dict(scene_data, working_directory).as_dict() # Create scene return ArScene( \ new_layers, \ new_frames, \ **scene_data, \ **temp_pipeline_step_object_data \ ) def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array]: """Define abstract estimate scene pose method. Parameters: detected_features: any features detected by parent ArCamera that will help in scene pose estimation. Returns: tvec: scene translation vector rvec: scene rotation matrix """ raise NotImplementedError('estimate_pose() method not implemented') def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]: """Project layers according estimated pose and optional field of view clipping angles. Parameters: tvec: translation vector rvec: rotation vector visual_hfov: horizontal field of view clipping angle visual_vfov: vertical field of view clipping angle Returns: iterator: name of projected layer and AOI2DScene projection """ for name, layer in self.__layers.items(): # Clip AOI out of the visual horizontal field of view (optional) # TODO: use HFOV and VFOV and don't use vision_cone method if visual_hfov > 0: # Transform layer aoi scene into camera referential aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec) # Get aoi inside vision cone field cone_vision_height_cm = 200 # cm cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) # Keep only aoi inside vision cone field aoi_scene_copy = layer.aoi_scene.copy(exclude=aoi_outside.keys()) else: aoi_scene_copy = layer.aoi_scene.copy() # Project layer aoi scene yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) def draw(self, image: numpy.array, **kwargs: dict): """ Draw scene into image. Parameters: image: where to draw """ raise NotImplementedError('draw() method not implemented') class ArCamera(ArFrame): """ Define abstract Augmented Reality camera as ArFrame with ArScenes inside. """ def __init__(self, scenes: dict = None, visual_hfov: float = 0., visual_vfov: float = 0., **kwargs): """ Initialize ArCamera Parameters: scenes: all scenes to project into camera frame visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV). visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV). """ # Init parent class super().__init__(**kwargs) # Init private attributes self.__scenes = scenes self.__visual_hfov = visual_hfov self.__visual_vfov = visual_vfov # Edit pipeline step objects parent for name, scene in self.__scenes.items(): scene.parent = self # Setup expected aoi of each layer aoi scan path with the aoi of corresponding scene layer # Edit aoi matcher exclude attribute to ignore frame aoi for layer_name, layer in self.layers.items(): if layer.aoi_scan_path is not None: expected_aoi_list = [] exclude_aoi_list = [] for scene_name, scene in self.__scenes.items(): # Append scene layer aoi to corresponding expected camera layer aoi try: scene_layer = scene.layers[layer_name] expected_aoi_list.extend(list(scene_layer.aoi_scene.keys())) except KeyError: continue # Remove scene frame from expected camera layer aoi # Exclude scene frame from camera layer aoi matching for frame_name, frame in scene.frames.items(): try: expected_aoi_list.remove(frame_name) exclude_aoi_list.append(frame_name) except ValueError: continue layer.aoi_scan_path.expected_aoi = expected_aoi_list layer.aoi_matcher.exclude = exclude_aoi_list @property def scenes(self) -> dict: """Get camera's scenes dictionary.""" return self.__scenes @property def visual_hfov(self) -> float: """Get camera's visual horizontal field of view.""" return self.__visual_hfov @visual_hfov.setter def visual_hfov(self, value: float): """Set camera's visual horizontal field of view.""" self.__visual_hfov = value @property def visual_vfov(self) -> float: """Get camera's visual vertical field of view.""" return self.__visual_vfov @visual_vfov.setter def visual_vfov(self, value: float): """Set camera's visual vertical field of view.""" self.__visual_vfov = value def scene_frames(self) -> Iterator[ArFrame]: """Iterate over all scenes frames""" # For each scene for scene_name, scene in self.__scenes.items(): # For each scene frame for name, scene_frame in scene.frames.items(): yield scene_frame def as_dict(self) -> dict: """Export ArCamera properties as dictionary.""" return { **ArFrame.as_dict(self), "scenes": self.__scenes, "visual_hfov": self.__visual_hfov, "visual_vfov": self.__visual_vfov } @DataFeatures.PipelineStepMethod def watch(self, timestamp: int|float, image: numpy.array): """Detect AR features from image and project scenes into camera frame. Parameters: timestamp: method call timestamp (unit does'nt matter) image: image where to extract AR features """ raise NotImplementedError('watch() method not implemented') @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition): """Project timestamped gaze position into each scene frames. !!! warning watch method needs to be called first. Parameters: timestamp: method call timestamp (unit does'nt matter) gaze_position: gaze position to project """ # Project gaze position into camera frame super().look(timestamp, gaze_position) # Use camera frame lock feature with self._lock: # Project gaze position into each scene frames if possible for scene_frame in self.scene_frames(): # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? for camera_layer_name, camera_layer in self.layers.items(): try: aoi_2d = camera_layer.aoi_scene[scene_frame.name] # TODO?: Should we prefer to use camera frame AOIMatcher object? if aoi_2d.contains_point(gaze_position.value): inner_x, inner_y = aoi_2d.clockwise().inner_axis(*gaze_position.value) # QUESTION: How to project gaze precision? inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) # Project inner gaze position into scene frame scene_frame.look(timestamp, inner_gaze_position * scene_frame.size) # Ignore missing aoi in camera frame layer projection except KeyError as e: pass @DataFeatures.PipelineStepMethod def map(self, timestamp: int|float): """Project camera frame background into scene frames background. !!! warning watch method needs to be called first. Parameters: timestamp: method call timestamp (unit does'nt matter) """ # Use camera frame lock feature with self._lock: # Project camera frame background into each scene frame if possible for frame in self.scene_frames(): # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? for camera_layer_name, camera_layer in self.__layers.items(): try: aoi_2d = camera_layer.aoi_scene[frame.name] # Apply perspective transform algorithm to fill aoi frame background width, height = frame.size destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]]) mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination) frame.background = cv2.warpPerspective(self.__background, mapping, (width, height)) # Ignore missing frame projection except KeyError: pass