From 6a47f96e33bcbe8ee3caf58767ae1863422676fd Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 10 Aug 2023 14:54:04 +0200 Subject: Working on a new architecture based on new ArLayer class. --- src/argaze/ArFeatures.py | 1086 +++++++++++++------- src/argaze/AreaOfInterest/AOI2DScene.py | 12 +- src/argaze/AreaOfInterest/AOIFeatures.py | 36 + src/argaze/utils/demo_ar_features_run.py | 12 +- .../demo_environment/demo_ar_features_setup.json | 8 +- .../demo_environment/demo_gaze_features_setup.json | 86 +- src/argaze/utils/demo_gaze_features_run.py | 68 +- 7 files changed, 825 insertions(+), 483 deletions(-) diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 0b53034..0022c80 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -24,13 +24,16 @@ from argaze.GazeAnalysis import * import numpy import cv2 -ArEnvironmentType = TypeVar('ArEnvironment', bound="ArEnvironment") +ArLayerType = TypeVar('ArLayer', bound="ArLayer") +# Type definition for type annotation convenience + +ArFrameType = TypeVar('ArFrame', bound="ArFrame") # Type definition for type annotation convenience ArSceneType = TypeVar('ArScene', bound="ArScene") # Type definition for type annotation convenience -ArFrameType = TypeVar('ArFrame', bound="ArFrame") +ArEnvironmentType = TypeVar('ArEnvironment', bound="ArEnvironment") # Type definition for type annotation convenience class PoseEstimationFailed(Exception): @@ -53,9 +56,9 @@ class SceneProjectionFailed(Exception): super().__init__(message) -class JSONLoadingFailed(Exception): +class LoadingFailed(Exception): """ - Exception raised when JSON loading fails. + Exception raised when attributes loading fails. """ def __init__(self, message): @@ -63,50 +66,497 @@ class JSONLoadingFailed(Exception): super().__init__(message) @dataclass +class ArLayer(): + """ + Defines a space where to make matching of gaze movements and AOIs and inside which those matchings need to be analyzed. + + Parameters: + name: name of the layer + aoi_scene: AOI scene description + looked_aoi_covering_threshold: + aoi_scan_path: AOI scan path object + aoi_scan_path_analyzers: dictionary of AOI scan path analyzers + """ + + name: str + aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene) + looked_aoi_covering_threshold: int = field(default=0) + aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) + aoi_scan_path_analyzers: dict = field(default_factory=dict) + + def __post_init__(self): + + # Define parent attribute: it will be setup by parent later + self.__parent = None + + # Init current gaze movement + self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() + + # Init looked aoi data + self.__init_looked_aoi_data() + + # Init lock to share looking data with multiples threads + self.__look_lock = threading.Lock() + + # Cast aoi scene to its effective dimension + if self.aoi_scene.dimension == 2: + + self.aoi_scene = AOI2DScene.AOI2DScene(self.aoi_scene) + + elif self.aoi_scene.dimension == 3: + + self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene) + + @classmethod + def from_dict(self, layer_data, working_directory: str = None) -> ArLayerType: + """Load attributes from dictionary. + + Parameters: + layer_data: dictionary with attributes to load + working_directory: folder path where to load files when a dictionary value is a relative filepath. + """ + + # Load name + try: + + new_layer_name = layer_data.pop('name') + + except KeyError: + + new_layer_name = None + + # Load optional aoi filter + try: + + aoi_exclude_list = layer_data.pop('aoi_exclude') + + except KeyError: + + aoi_exclude_list = [] + + # Load aoi scene + try: + + new_aoi_scene_value = layer_data.pop('aoi_scene') + + # str: relative path to file + if type(new_aoi_scene_value) == str: + + filepath = os.path.join(working_directory, new_aoi_scene_value) + file_format = filepath.split('.')[-1] + + # JSON file format for 2D or 3D dimension + if file_format == 'json': + + new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath).copy(exclude=aoi_exclude_list) + + # OBJ file format for 3D dimension only + elif file_format == 'obj': + + new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath).copy(exclude=aoi_exclude_list) + + # dict: + else: + + new_aoi_scene = AOIFeatures.AOIScene.from_dict(new_aoi_scene_value) + + except KeyError: + + new_aoi_scene = AOIFeatures.AOIScene() + + # Looked aoi validity threshold + try: + + looked_aoi_covering_threshold = layer_data.pop('looked_aoi_covering_threshold') + + except KeyError: + + looked_aoi_covering_threshold = 0 + + # Edit expected AOI list by removing AOI with name equals to layer name + expected_aois = list(new_aoi_scene.keys()) + expected_aois.remove(new_layer_name) + + # Load AOI scan path + try: + + new_aoi_scan_path_data = layer_data.pop('aoi_scan_path') + new_aoi_scan_path_data['expected_aois'] = expected_aois + new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) + + except KeyError: + + new_aoi_scan_path_data = {} + new_aoi_scan_path_data['expected_aois'] = expected_aois + new_aoi_scan_path = None + + # Load AOI scan path analyzers + new_aoi_scan_path_analyzers = {} + + try: + + new_aoi_scan_path_analyzers_value = layer_data.pop('aoi_scan_path_analyzers') + + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items(): + + # Prepend argaze.GazeAnalysis path when a single name is provided + if len(aoi_scan_path_analyzer_module_path.split('.')) == 1: + aoi_scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_module_path}' + + aoi_scan_path_analyzer_module = importlib.import_module(aoi_scan_path_analyzer_module_path) + + # Check aoi scan path analyzer parameters type + members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer) + + for member in members: + + if '__annotations__' in member: + + for parameter, parameter_type in member[1].items(): + + # Check if parameter is part of argaze.GazeAnalysis module + parameter_module_path = parameter_type.__module__.split('.') + + # Check if parameter is part of a package + if len(parameter_type.__module__.split('.')) > 1: + + # Try get existing analyzer instance to append as parameter + try: + + aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_type.__module__] + + except KeyError: + + raise LoadingFailed(f'{aoi_scan_path_analyzer_module_path} aoi scan path analyzer loading fails because {parameter_type.__module__} aoi scan path analyzer is missing.') + + aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters) + + new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer + + # Force AOI scan path creation + if len(new_aoi_scan_path_analyzers) > 0 and new_aoi_scan_path == None: + + new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) + + except KeyError: + + pass + + # Create layer + return ArLayer(new_layer_name, \ + new_aoi_scene, \ + looked_aoi_covering_threshold, \ + new_aoi_scan_path, \ + new_aoi_scan_path_analyzers \ + ) + + @classmethod + def from_json(self, json_filepath: str) -> ArLayerType: + """ + Load attributes from .json file. + + Parameters: + json_filepath: path to json file + """ + + with open(json_filepath) as configuration_file: + + layer_data = json.load(configuration_file) + working_directory = os.path.dirname(json_filepath) + + return ArLayer.from_dict(layer_data, working_directory) + + @property + def parent(self): + """Get parent instance""" + + return self.__parent + + @parent.setter + def parent(self, parent): + """Get parent instance""" + + self.__parent = parent + + @property + def looked_aoi(self) -> str: + """Get most likely looked aoi name for current fixation (e.g. the aoi with the highest covering mean value)""" + + return self.__looked_aoi + + @property + def looked_aoi_covering_mean(self) -> float: + """Get looked aoi covering mean for current fixation. + It represents the ratio of fixation deviation circle surface that used to cover the looked aoi.""" + + return self.__looked_aoi_covering_mean + + @property + def looked_aoi_covering(self) -> dict: + """Get all looked aois covering for current fixation.""" + + return self.__looked_aoi_covering + + def __init_looked_aoi_data(self): + """Init looked aoi data.""" + + self.__look_count = 0 + self.__looked_aoi = None + self.__looked_aoi_covering_mean = 0 + self.__looked_aoi_covering = {} + + def __update_looked_aoi_data(self, fixation) -> str: + """Update looked aoi data.""" + + self.__look_count += 1 + + max_covering = 0. + most_likely_looked_aoi = None + + for name, aoi in self.aoi_scene.items(): + + _, _, circle_ratio = aoi.circle_intersection(fixation.focus, fixation.deviation_max) + + if name != self.name and circle_ratio > 0: + + # Sum circle ratio to update aoi covering + try: + + self.__looked_aoi_covering[name] += circle_ratio + + except KeyError: + + self.__looked_aoi_covering[name] = circle_ratio + + # Update most likely looked aoi + if self.__looked_aoi_covering[name] > max_covering: + + most_likely_looked_aoi = name + max_covering = self.__looked_aoi_covering[name] + + # Update looked aoi + self.__looked_aoi = most_likely_looked_aoi + + # Update looked aoi covering mean + self.__looked_aoi_covering_mean = int(100 * max_covering / self.__look_count) / 100 + + return self.__looked_aoi + + def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> dict: + """ + Project timestamped gaze movement into layer. + + !!! warning + Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute. + + Parameters: + gaze_movement: gaze movement to project + + Returns: + looked_aoi: most likely looked aoi name + aoi_scan_path_analysis: aoi scan path analysis at each new scan step if aoi_scan_path is instanciated + exception: error catched during gaze movement processing + """ + + # Lock layer exploitation + self.__look_lock.acquire() + + # Update current gaze movement + self.__gaze_movement = gaze_movement + + # Init looked aoi + looked_aoi = None + + # Init aoi scan path analysis report + aoi_scan_path_analysis = {} + + # Assess pipeline execution times + execution_times = { + 'aoi_fixation_matcher': None, + 'aoi_scan_step_analyzers': {} + } + + # Catch any error + exception = None + + try: + + # Valid and finished gaze movement has been identified + if gaze_movement.valid and gaze_movement.finished: + + if GazeFeatures.is_fixation(gaze_movement): + + # Store aoi matching start date + matching_start = time.perf_counter() + + # Does the finished fixation match an aoi? + looked_aoi = self.__update_looked_aoi_data(gaze_movement) + + # Assess aoi matching time in ms + execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3 + + # Append fixation to aoi scan path + if self.aoi_scan_path != None and self.looked_aoi != None and self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold: + + aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, self.looked_aoi) + + # Is there a new step? + if aoi_scan_step and len(self.aoi_scan_path) > 1: + + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): + + # Store aoi scan path analysis start date + aoi_scan_path_analysis_start = time.perf_counter() + + # Analyze aoi scan path + aoi_scan_path_analyzer.analyze(self.aoi_scan_path) + + # Assess aoi scan step analysis time in ms + execution_times['aoi_scan_step_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_path_analysis_start) * 1e3 + + # Store analysis + aoi_scan_path_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis + + elif GazeFeatures.is_saccade(gaze_movement): + + # Reset looked aoi + self.__init_looked_aoi_data() + + # Append saccade to aoi scan path + if self.aoi_scan_path != None: + + self.aoi_scan_path.append_saccade(timestamp, gaze_movement) + + # Valid in progress fixation + elif gaze_movement.valid and not gaze_movement.finished: + + if GazeFeatures.is_fixation(gaze_movement): + + # Store aoi matching start date + matching_start = time.perf_counter() + + # Does the finished fixation match an aoi? + looked_aoi = self.__update_looked_aoi_data(gaze_movement) + + # Assess aoi matching time in ms + execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3 + + except Exception as e: + + print('Warning: the following error occurs in ArLayer.look method:', e) + + looked_aoi = None + aoi_scan_path_analysis = {} + exception = e + + # Unlock layer exploitation + self.__look_lock.release() + + # Sum all execution times + total_execution_time = 0 + + if execution_times['aoi_fixation_matcher']: + + total_execution_time += execution_times['aoi_fixation_matcher'] + + for _, aoi_scan_path_analysis_time in execution_times['aoi_scan_step_analyzers'].items(): + + total_execution_time += aoi_scan_path_analysis_time + + execution_times['total'] = total_execution_time + + # Return look data + return looked_aoi, aoi_scan_path_analysis, execution_times, exception + + def draw(self, image:numpy.array, aoi_color=(0, 0, 0)) -> Exception: + """ + Draw layer into image. + + Parameters: + image: where to draw + """ + + # Lock frame exploitation + self.__look_lock.acquire() + + # Catch any drawing error + exception = None + + try: + + # Draw aoi + self.aoi_scene.draw(image, color=aoi_color) + + # Draw current gaze movement + if self.__gaze_movement.valid: + + if GazeFeatures.is_fixation(self.__gaze_movement): + + self.__gaze_movement.draw(image, color=(0, 255, 255)) + self.__gaze_movement.draw_positions(image) + + # Draw looked aoi + if self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold: + + self.aoi_scene.draw_circlecast(image, self.__gaze_movement.focus, self.__gaze_movement.deviation_max, matching_aoi = [self.__looked_aoi], base_color=(0, 0, 0), matching_color=(255, 255, 255)) + + elif GazeFeatures.is_saccade(self.__gaze_movement): + + self.__gaze_movement.draw(image, color=(0, 255, 255)) + self.__gaze_movement.draw_positions(image) + + except Exception as e: + + # Store error to return it + exception = e + + # Unlock frame exploitation + self.__look_lock.release() + + # Return drawing error + return exception + +@dataclass class ArFrame(): """ - Defines rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. + Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. Parameters: name: name of the frame size: defines the dimension of the rectangular area where gaze positions are projected. - aoi_2d_scene: AOI 2D scene description background: image to draw behind gaze_movement_identifier: gaze movement identification algorithm - current_fixation_matching: enable AOI fixation matching even for in progress fixation - looked_aoi_covering_threshold: + filter_in_progress_fixation: ignore in progress fixation scan_path: scan path object - scan_path_analyzers: dictionary of scan path analysis to apply on scan path - aoi_scan_path: AOI scan path object - aoi_scan_path_analyzers: dictionary of scan path analysis to apply on AOI scan path + scan_path_analyzers: dictionary of scan path analyzers heatmap: heatmap object + aoi_layers: dictionary of AOI layers """ name: str size: tuple[int] = field(default=(1, 1)) - aoi_2d_scene: AOI2DScene.AOI2DScene = field(default_factory=AOI2DScene.AOI2DScene) background: numpy.array = field(default_factory=numpy.array) gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier) - current_fixation_matching: bool = field(default=False) - looked_aoi_covering_threshold: int = field(default=0) + filter_in_progress_fixation: bool = field(default=True) scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath) scan_path_analyzers: dict = field(default_factory=dict) - aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) - aoi_scan_path_analyzers: dict = field(default_factory=dict) heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap) + layers: dict = field(default_factory=dict) def __post_init__(self): # Define parent attribute: it will be setup by parent later self.__parent = None + # Setup layers parent attribute + for name, layer in self.layers.items(): + + layer.parent = self + # Init current gaze position self.__gaze_position = GazeFeatures.UnvalidGazePosition() - # Init looked aoi data - self.__init_looked_aoi_data() - - # Init lock to share looked data wit hmultiples threads + # Init lock to share looked data with multiples threads self.__look_lock = threading.Lock() @classmethod @@ -136,26 +586,6 @@ class ArFrame(): new_frame_size = (0, 0) - # Load aoi 2D scene - try: - - new_aoi_2d_scene_value = frame_data.pop('aoi_2d_scene') - - # str: relative path to .json file - if type(new_aoi_2d_scene_value) == str: - - json_filepath = os.path.join(working_directory, new_aoi_2d_scene_value) - new_aoi_2d_scene = AOI2DScene.AOI2DScene.from_json(obj_filepath) - - # dict: - else: - - new_aoi_2d_scene = AOI2DScene.AOI2DScene(new_aoi_2d_scene_value) - - except KeyError: - - new_aoi_2d_scene = AOI2DScene.AOI2DScene() - # Load background image try: @@ -185,23 +615,14 @@ class ArFrame(): new_gaze_movement_identifier = None - # Current fixation matching - try: - - current_fixation_matching = frame_data.pop('current_fixation_matching') - - except KeyError: - - current_fixation_matching = False - - # Looked aoi validity threshold + # Current fixation matching try: - looked_aoi_covering_threshold = frame_data.pop('looked_aoi_covering_threshold') + filter_in_progress_fixation = frame_data.pop('filter_in_progress_fixation') except KeyError: - looked_aoi_covering_threshold = 0 + filter_in_progress_fixation = False # Load scan path try: @@ -248,7 +669,7 @@ class ArFrame(): except KeyError: - raise JSONLoadingFailed(f'{scan_path_analyzer_module_path} scan path analyzer loading fails because {parameter_type.__module__} scan path analyzer is missing.') + raise LoadingFailed(f'{scan_path_analyzer_module_path} scan path analyzer loading fails because {parameter_type.__module__} scan path analyzer is missing.') scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters) @@ -263,105 +684,62 @@ class ArFrame(): pass - # Load AOI scan path - try: - - new_aoi_scan_path_data = frame_data.pop('aoi_scan_path') - new_aoi_scan_path_data['expected_aois'] = list(new_aoi_2d_scene.keys()) - new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) - - except KeyError: - - new_aoi_scan_path_data = {} - new_aoi_scan_path_data['expected_aois'] = list(new_aoi_2d_scene.keys()) - new_aoi_scan_path = None - - # Load AOI scan path analyzers - new_aoi_scan_path_analyzers = {} - + # Load heatmap try: - new_aoi_scan_path_analyzers_value = frame_data.pop('aoi_scan_path_analyzers') - - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items(): - - # Prepend argaze.GazeAnalysis path when a single name is provided - if len(aoi_scan_path_analyzer_module_path.split('.')) == 1: - aoi_scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_module_path}' - - aoi_scan_path_analyzer_module = importlib.import_module(aoi_scan_path_analyzer_module_path) - - # Check aoi scan path analyzer parameters type - members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer) + new_heatmap_data = frame_data.pop('heatmap') - for member in members: + # Default heatmap size equals frame size + if 'size' not in new_heatmap_data.keys(): - if '__annotations__' in member: + new_heatmap_data['size'] = new_frame_size - for parameter, parameter_type in member[1].items(): + new_heatmap = AOIFeatures.Heatmap(**new_heatmap_data) - # Check if parameter is part of argaze.GazeAnalysis module - parameter_module_path = parameter_type.__module__.split('.') + except KeyError: - # Check if parameter is part of a package - if len(parameter_type.__module__.split('.')) > 1: + new_heatmap_data = {} + new_heatmap = None - # Try get existing analyzer instance to append as parameter - try: + # Load layers + new_layers = {} - aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_type.__module__] + try: - except KeyError: + for layer_name, layer_data in frame_data.pop('layers').items(): - raise JSONLoadingFailed(f'{aoi_scan_path_analyzer_module_path} aoi scan path analyzer loading fails because {parameter_type.__module__} aoi scan path analyzer is missing.') + # Append name + layer_data['name'] = layer_name - aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters) + # Create layer + new_layer = ArLayer.from_dict(layer_data, working_directory) - new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer + # Setup layer + if new_layer.aoi_scene.dimension == 3: - # Force AOI scan path creation - if len(new_aoi_scan_path_analyzers) > 0 and new_aoi_scan_path == None: + new_layer.aoi_scene = new_layer.aoi_scene.orthogonal_projection * new_frame_size - new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) + # Append new layer + new_layers[layer_name] = new_layer except KeyError: pass - # Load heatmap - try: - - new_heatmap_data = frame_data.pop('heatmap') - - # Default heatmap size equals frame size - if 'size' not in new_heatmap_data.keys(): - - new_heatmap_data['size'] = new_frame_size - - new_heatmap = AOIFeatures.Heatmap(**new_heatmap_data) - - except KeyError: - - new_heatmap_data = {} - new_heatmap = None - # Create frame return ArFrame(new_frame_name, \ new_frame_size, \ - new_aoi_2d_scene, \ new_frame_background, \ new_gaze_movement_identifier, \ - current_fixation_matching, \ - looked_aoi_covering_threshold, \ + filter_in_progress_fixation, \ new_scan_path, \ new_scan_path_analyzers, \ - new_aoi_scan_path, \ - new_aoi_scan_path_analyzers, \ - new_heatmap \ + new_heatmap, \ + new_layers \ ) @classmethod - def from_json(self, json_filepath: str) -> ArEnvironmentType: + def from_json(self, json_filepath: str) -> ArFrameType: """ Load attributes from .json file. @@ -410,69 +788,7 @@ class ArFrame(): return image - @property - def looked_aoi(self) -> str: - """Get most likely looked aoi name for current fixation (e.g. the aoi with the highest covering mean value)""" - - return self.__looked_aoi - - @property - def looked_aoi_covering_mean(self) -> float: - """Get looked aoi covering mean for current fixation. - It represents the ratio of fixation deviation circle surface that used to cover the looked aoi.""" - - return self.__looked_aoi_covering_mean - - @property - def looked_aoi_covering(self) -> dict: - """Get all looked aois covering for current fixation.""" - - return self.__looked_aoi_covering - - def __init_looked_aoi_data(self): - """Init looked aoi data.""" - - self.__look_count = 0 - self.__looked_aoi = None - self.__looked_aoi_covering_mean = 0 - self.__looked_aoi_covering = {} - - def __update_looked_aoi_data(self, fixation): - """Update looked aoi data.""" - - self.__look_count += 1 - - max_covering = 0. - most_likely_looked_aoi = None - - for name, aoi in self.aoi_2d_scene.items(): - - _, _, circle_ratio = aoi.circle_intersection(fixation.focus, fixation.deviation_max) - - if name != self.name and circle_ratio > 0: - - # Sum circle ratio to update aoi covering - try: - - self.__looked_aoi_covering[name] += circle_ratio - - except KeyError: - - self.__looked_aoi_covering[name] = circle_ratio - - # Update most likely looked aoi - if self.__looked_aoi_covering[name] > max_covering: - - most_likely_looked_aoi = name - max_covering = self.__looked_aoi_covering[name] - - # Update looked aoi - self.__looked_aoi = most_likely_looked_aoi - - # Update looked aoi covering mean - self.__looked_aoi_covering_mean = int(100 * max_covering / self.__look_count) / 100 - - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition(), identified_gaze_movement: GazeFeatures.GazeMovement = GazeFeatures.UnvalidGazeMovement()) -> Tuple[GazeFeatures.GazeMovement, dict, dict, dict]: + def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Tuple[GazeFeatures.GazeMovement, dict, dict, dict]: """ Project gaze position into frame. @@ -482,12 +798,11 @@ class ArFrame(): Parameters: timestamp: gaze_position: gaze position to project - identified_gaze_movement: pass identified gaze movement instead of timestamped gaze position to avoid double identification process. Returns: - identified_gaze_movement: identified gaze movement from incoming consecutive timestamped gaze positions if gaze_movement_identifier is instanciated. Current gaze movement if current_fixation_matching is True. + identified_gaze_movement: identified gaze movement from incoming consecutive timestamped gaze positions if gaze_movement_identifier is instanciated. Current gaze movement if filter_in_progress_fixation is True. scan_path_analysis: scan path analysis at each new scan step if scan_path is instanciated - aoi_scan_path_analysis: new scan step at each new aoi scan step if aoi_scan_path is instanciated + exception: error catched during gaze position processing """ @@ -498,19 +813,20 @@ class ArFrame(): self.__gaze_position = gaze_position # No gaze movement identified by default - temp_gaze_movement = GazeFeatures.UnvalidGazeMovement() + identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() # Init scan path analysis report scan_step_analysis = {} - aoi_scan_step_analysis = {} + + # Init layer analysis report + layer_analysis = {} # Assess pipeline execution times execution_times = { 'gaze_movement_identifier': None, - 'aoi_fixation_matcher': None, 'scan_step_analyzers':{}, - 'aoi_scan_step_analyzers': {}, - 'heatmap': None + 'heatmap': None, + 'layers': {} } # Catch any error @@ -525,66 +841,27 @@ class ArFrame(): identification_start = time.perf_counter() # Identify finished gaze movement - temp_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position) + identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position) # Assess movement identification time in ms execution_times['gaze_movement_identifier'] = (time.perf_counter() - identification_start) * 1e3 - # Use given identified gaze movement - else: - - temp_gaze_movement = identified_gaze_movement - # Valid and finished gaze movement has been identified - if temp_gaze_movement.valid and temp_gaze_movement.finished: - - if GazeFeatures.is_fixation(temp_gaze_movement): - - # Store aoi matching start date - matching_start = time.perf_counter() - - # Does the finished fixation match an aoi? - self.__update_looked_aoi_data(temp_gaze_movement) + if identified_gaze_movement.valid and identified_gaze_movement.finished: - # Assess aoi matching time in ms - execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3 + if GazeFeatures.is_fixation(identified_gaze_movement): # Append fixation to scan path if self.scan_path != None: - self.scan_path.append_fixation(timestamp, temp_gaze_movement) - - # Append fixation to aoi scan path - if self.aoi_scan_path != None and self.looked_aoi != None and self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold: - - aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, temp_gaze_movement, self.looked_aoi) - - # Is there a new step? - if aoi_scan_step and len(self.aoi_scan_path) > 1: - - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - - # Store aoi scan step analysis start date - aoi_scan_step_analysis_start = time.perf_counter() - - # Analyze aoi scan path - aoi_scan_path_analyzer.analyze(self.aoi_scan_path) - - # Assess aoi scan step analysis time in ms - execution_times['aoi_scan_step_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_step_analysis_start) * 1e3 - - # Store analysis - aoi_scan_step_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis - - elif GazeFeatures.is_saccade(temp_gaze_movement): + self.scan_path.append_fixation(timestamp, identified_gaze_movement) - # Reset looked aoi - self.__init_looked_aoi_data() + elif GazeFeatures.is_saccade(identified_gaze_movement): # Append saccade to scan path if self.scan_path != None: - scan_step = self.scan_path.append_saccade(timestamp, temp_gaze_movement) + scan_step = self.scan_path.append_saccade(timestamp, identified_gaze_movement) # Is there a new step? if scan_step and len(self.scan_path) > 1: @@ -603,28 +880,14 @@ class ArFrame(): # Store analysis scan_step_analysis[scan_path_analyzer_module_path] = scan_path_analyzer.analysis - # Append saccade to aoi scan path - if self.aoi_scan_path != None: - - self.aoi_scan_path.append_saccade(timestamp, temp_gaze_movement) - - # No valid finished gaze movement: optionnaly check current fixation matching - elif self.gaze_movement_identifier and self.current_fixation_matching: + # No valid finished gaze movement: optionnaly stop in progress fixation filtering + elif self.gaze_movement_identifier and not self.filter_in_progress_fixation: current_fixation = self.gaze_movement_identifier.current_fixation if current_fixation.valid: - temp_gaze_movement = current_fixation - - # Store aoi matching start date - matching_start = time.perf_counter() - - # Does the current fixation match an aoi? - self.__update_looked_aoi_data(current_fixation) - - # Assess aoi matching time in ms - execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3 + identified_gaze_movement = current_fixation # Update heatmap if self.heatmap: @@ -640,14 +903,27 @@ class ArFrame(): # Assess heatmap time in ms execution_times['heatmap'] = (time.perf_counter() - heatmap_start) * 1e3 - + + # Look layers + for layer_name, layer in self.layers.items(): + + looked_aoi, aoi_scan_path_analysis, layer_execution_times, layer_exception = layer.look(timestamp, identified_gaze_movement) + + layer_analysis[layer_name] = aoi_scan_path_analysis + + execution_times['layers'][layer_name] = layer_execution_times + + if layer_exception: + + raise(layer_exception) + except Exception as e: print('Warning: the following error occurs in ArFrame.look method:', e) - returned_fixation = GazeFeatures.UnvalidGazeMovement() + identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() scan_step_analysis = {} - aoi_scan_step_analysis = {} + layer_analysis = {} exception = e # Unlock frame exploitation @@ -660,26 +936,22 @@ class ArFrame(): total_execution_time += execution_times['gaze_movement_identifier'] - if execution_times['aoi_fixation_matcher']: - - total_execution_time += execution_times['aoi_fixation_matcher'] - for _, scan_step_analysis_time in execution_times['scan_step_analyzers'].items(): total_execution_time += scan_step_analysis_time - for _, aoi_scan_step_analysis_time in execution_times['aoi_scan_step_analyzers'].items(): - - total_execution_time += aoi_scan_step_analysis_time - if execution_times['heatmap']: total_execution_time += execution_times['heatmap'] + for _, layer_execution_times in execution_times['layers'].items(): + + total_execution_time += layer_execution_times['total'] + execution_times['total'] = total_execution_time # Return look data - return temp_gaze_movement, scan_step_analysis, aoi_scan_step_analysis, execution_times, exception + return identified_gaze_movement, scan_step_analysis, layer_analysis, execution_times, exception def draw(self, image:numpy.array, aoi_color=(0, 0, 0)) -> Exception: """ @@ -697,32 +969,14 @@ class ArFrame(): try: - # Draw aoi - self.aoi_2d_scene.draw(image, color=aoi_color) + # Draw layers + for layer_name, layer in self.layers.items(): + + exception = layer.draw(image, aoi_color) # Draw current gaze position self.__gaze_position.draw(image, color=(255, 255, 255)) - # Draw current gaze movement - current_gaze_movement = self.gaze_movement_identifier.current_gaze_movement - - if current_gaze_movement.valid: - - if GazeFeatures.is_fixation(current_gaze_movement): - - current_gaze_movement.draw(image, color=(0, 255, 255)) - current_gaze_movement.draw_positions(image) - - # Draw looked aoi - if self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold: - - self.aoi_2d_scene.draw_circlecast(image, current_gaze_movement.focus, current_gaze_movement.deviation_max, matching_aoi = [self.__looked_aoi], base_color=(0, 0, 0), matching_color=(255, 255, 255)) - - elif GazeFeatures.is_saccade(current_gaze_movement): - - current_gaze_movement.draw(image, color=(0, 255, 255)) - current_gaze_movement.draw_positions(image) - except Exception as e: # Store error to return it @@ -737,7 +991,7 @@ class ArFrame(): @dataclass class ArScene(): """ - Define an Augmented Reality scene with ArUco markers and AOI scenes. + Define an Augmented Reality scene with ArUcoMarkers, ArLayers and ArFrames inside. Parameters: @@ -745,9 +999,9 @@ class ArScene(): aruco_scene: ArUco markers 3D scene description used to estimate scene pose from detected markers: see [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function below. - aoi_3d_scene: AOI 3D scene description that will be projected onto estimated scene once its pose will be estimated : see [project][argaze.ArFeatures.ArScene.project] function below. + layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. - aoi_frames: Optional dictionary to define AOI as ArFrame. + frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. aruco_axis: Optional dictionary to define orthogonal axis where each axis is defined by list of 3 markers identifier (first is origin). \ This pose estimation strategy is used by [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function when at least 3 markers are detected. @@ -760,8 +1014,8 @@ class ArScene(): """ name: str aruco_scene: ArUcoScene.ArUcoScene = field(default_factory=ArUcoScene.ArUcoScene) - aoi_3d_scene: AOI3DScene.AOI3DScene = field(default_factory=AOI3DScene.AOI3DScene) - aoi_frames: dict = field(default_factory=dict) + layers: dict = field(default_factory=dict) + frames: dict = field(default_factory=dict) aruco_axis: dict = field(default_factory=dict) aruco_aoi: dict = field(default_factory=dict) angle_tolerance: float = field(default=0.) @@ -769,29 +1023,59 @@ class ArScene(): def __post_init__(self): - # Define environment attribute: it will be setup by parent environment later - self.__environment = None + # Define parent attribute: it will be setup by parent object later + self.__parent = None + + # Setup layer parent attribute + for name, layer in self.layers.items(): - # Preprocess orthogonal projection to speed up further aruco aoi processings - self.__orthogonal_projection_cache = self.aoi_3d_scene.orthogonal_projection + layer.parent = self - # Setup aoi frame parent attribute - for aoi_name, frame in self.aoi_frames.items(): + # Setup frame parent attribute + for name, frame in self.frames.items(): frame.parent = self + # Preprocess orthogonal projection to speed up further processings + self.__orthogonal_projection_cache = {} + + for name, layer in self.layers.items(): + + self.__orthogonal_projection_cache[name] = self.aoi_scene.orthogonal_projection + def __str__(self) -> str: """ Returns: String representation """ - output = f'ArEnvironment:\n{self.environment.name}\n' + output = f'parent:\n{self.parent.name}\n' output += f'ArUcoScene:\n{self.aruco_scene}\n' - output += f'AOI3DScene:\n{self.aoi_3d_scene}\n' + + if len(self.layers): + output += f'ArLayers:\n' + for name, layer in self.layers.items(): + output += f'{name}:\n{layer}\n' + + if len(self.frames): + output += f'ArFrames:\n' + for name, frame in self.frames.items(): + output += f'{name}:\n{frame}\n' return output + @property + def parent(self): + """Get parent instance""" + + return self.__parent + + @parent.setter + def parent(self, parent): + """Get parent instance""" + + self.__parent = parent + @classmethod def from_dict(self, scene_data, working_directory: str = None) -> ArSceneType: @@ -825,74 +1109,83 @@ class ArScene(): new_aruco_scene = None - # Load optional aoi filter + # Load layers + new_layers = {} + try: - aoi_exclude_list = scene_data.pop('aoi_exclude') + for layer_name, layer_data in scene_data.pop('layers').items(): + + # Append name + layer_data['name'] = layer_name + + # Create layer + new_layer = ArLayer.from_dict(layer_data, working_directory) + + # Append new layer + new_layers[layer_name] = new_layer except KeyError: - aoi_exclude_list = [] + pass - # Load aoi 3d scene - try: + # Load frames + new_frames = {} - # Check aoi_3d_scene value type - aoi_3d_scene_value = scene_data.pop('aoi_3d_scene') + try: - # str: relative path to .obj file - if type(aoi_3d_scene_value) == str: + for frame_name, frame_data in scene_data.pop('frames').items(): - obj_filepath = os.path.join(working_directory, aoi_3d_scene_value) - new_aoi_3d_scene = AOI3DScene.AOI3DScene.from_obj(obj_filepath).copy(exclude=aoi_exclude_list) + # Append name + frame_data['name'] = frame_name - # dict: - else: + # Create frame + new_frame = ArFrame.from_dict(frame_data, working_directory) - new_aoi_3d_scene = AOI3DScene.AOI3DScene(aoi_3d_scene_value).copy(exclude=aoi_exclude_list) + # Look for AOI with same frame name + aoi_frame = None + for layer_name, layer in new_layers.items(): - except KeyError: + try: - new_aoi_3d_scene = None + aoi_frame = layer.aoi_scene[frame_name] - # Load aoi frames - new_aoi_frames = {} + except KeyError: - try: + # AOI name should be unique + break - for aoi_name, aoi_frame_data in scene_data.pop('aoi_frames').items(): + if aoi_frame: - # Create aoi frame - new_aoi_frame = ArFrame.from_dict(aoi_frame_data, working_directory) + # Project and reframe each layers into corresponding frame layers + for frame_layer_name, frame_layer in new_frame.layers.items(): - # Setup aoi frame - new_aoi_frame.name = aoi_name - new_aoi_frame.aoi_2d_scene = new_aoi_3d_scene.orthogonal_projection.reframe(aoi_name, new_aoi_frame.size) + try: - if new_aoi_frame.aoi_scan_path != None: + layer = new_layers[frame_layer_name] + + frame_layer.aoi_scene = layer.aoi_scene.orthogonal_projection.reframe(aoi_frame, new_frame.size) - new_aoi_frame.aoi_scan_path.expected_aois = list(new_aoi_3d_scene.keys()) + if frame_layer.aoi_scan_path != None: - # Append new aoi frame - new_aoi_frames[aoi_name] = new_aoi_frame + # Edit expected AOI list by removing AOI with name equals to frame layer name + expected_aois = list(layer.aoi_scene.keys()) + expected_aois.remove(frame_layer_name) - except KeyError: + frame_layer.aoi_scan_path.expected_aois = expected_aois - pass + except KeyError: - return ArScene(new_scene_name, new_aruco_scene, new_aoi_3d_scene, new_aoi_frames, **scene_data) + continue - @property - def environment(self): - """Get parent environment instance""" + # Append new frame + new_frames[name] = new_frame - return self.__environment + except KeyError: - @environment.setter - def environment(self, environment): - """Set parent environment instance""" + pass - self.__environment = environment + return ArScene(new_scene_name, new_aruco_scene, new_layers, new_frames, **scene_data) def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, str, dict]: """Estimate scene pose from detected ArUco markers. @@ -957,8 +1250,8 @@ class ArScene(): return tvec, rmat, 'estimate_pose_from_markers', consistent_markers - def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> AOI2DScene.AOI2DScene: - """Project AOI scene according estimated pose and optional horizontal field of view clipping angle. + def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> dict: + """Project layers according estimated pose and optional horizontal field of view clipping angle. Parameters: tvec: translation vector @@ -966,29 +1259,36 @@ class ArScene(): visual_hfov: horizontal field of view clipping angle Returns: - aoi_2d_scene: scene projection + layer_projections: dictionary of AOI2DScene projection """ - # Clip AOI out of the visual horizontal field of view (optional) - if visual_hfov > 0: + layer_projections = {} - # Transform scene into camera referential - aoi_3d_scene_camera_ref = self.aoi_3d_scene.transform(tvec, rvec) + for name, layer in self.layers.items(): - # Get aoi inside vision cone field - cone_vision_height_cm = 200 # cm - cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm + # Clip AOI out of the visual horizontal field of view (optional) + if visual_hfov > 0: - _, aoi_outside = aoi_3d_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) + # Transform layer aoi scene into camera referential + aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec) - # Keep only aoi inside vision cone field - aoi_3d_scene_copy = self.aoi_3d_scene.copy(exclude=aoi_outside.keys()) + # Get aoi inside vision cone field + cone_vision_height_cm = 200 # cm + cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm + + _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) + + # Keep only aoi inside vision cone field + aoi_scene_copy = layer.aoi_scene.copy(exclude=aoi_outside.keys()) + + else: - else: + aoi_scene_copy = layer.aoi_scene.copy() - aoi_3d_scene_copy = self.aoi_3d_scene.copy() + # Project layer aoi scene + layer_projections[name] = aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) - return aoi_3d_scene_copy.project(tvec, rvec, self.environment.aruco_detector.optic_parameters.K) + return projected_layers def build_aruco_aoi_scene(self, detected_markers) -> AOI2DScene.AOI2DScene: """ @@ -1044,7 +1344,7 @@ class ArScene(): image: where to draw """ - self.aruco_scene.draw_axis(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D) + self.aruco_scene.draw_axis(image, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D) def draw_places(self, image: numpy.array): """ @@ -1054,7 +1354,7 @@ class ArScene(): image: where to draw """ - self.aruco_scene.draw_places(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D) + self.aruco_scene.draw_places(image, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D) @dataclass class ArEnvironment(): @@ -1080,10 +1380,10 @@ class ArEnvironment(): self.camera_frame.parent = self - # Setup scenes environment attribute + # Setup scenes parent attribute for name, scene in self.scenes.items(): - scene.environment = self + scene.parent = self # Init a lock to share AOI scene projections into camera frame between multiple threads self.__camera_frame_lock = threading.Lock() @@ -1155,29 +1455,37 @@ class ArEnvironment(): # Build scenes new_scenes = {} - for new_scene_name, scene_data in environment_data.pop('scenes').items(): + for scene_name, scene_data in environment_data.pop('scenes').items(): + + # Append name + scene_data['name'] = scene_name # Create new scene new_scene = ArScene.from_dict(scene_data, working_directory) - # Setup new scene - new_scene.name = new_scene_name - # Append new scene - new_scenes[new_scene_name] = new_scene + new_scenes[scene_name] = new_scene - # Setup expected aoi for camera frame aoi scan path + # Setup expected aoi of each camera frame layer aoi scan path with the aoi of corresponding scene layer if new_camera_frame != None: - if new_camera_frame.aoi_scan_path != None: + for camera_frame_layer_name, camera_frame_layer in new_camera_frame.layers.items(): - # List all environment aoi all_aoi_list = [] + for scene_name, scene in new_scenes.items(): - all_aoi_list.extend(list(scene.aoi_3d_scene.keys())) + try: + + scene_layer = scene.layers[camera_frame_layer_name] - new_camera_frame.aoi_scan_path.expected_aois = all_aoi_list + all_aoi_list.extend(list(scene_layer.aoi_scene.keys())) + + except KeyError: + + continue + + camera_frame_layer.aoi_scan_path.expected_aois = all_aoi_list # Create new environment return ArEnvironment(new_environment_name, new_aruco_detector, new_camera_frame, new_scenes) @@ -1232,16 +1540,16 @@ class ArEnvironment(): return image @property - def aoi_frames(self): - """Iterate over all environment scenes aoi frames""" + def frames(self): + """Iterate over all environment scenes frames""" # For each scene for scene_name, scene in self.scenes.items(): - # For each aoi frame - for frame_name, aoi_frame in scene.aoi_frames.items(): + # For each frame + for name, frame in scene.frames.items(): - yield aoi_frame + yield frame def detect_and_project(self, image: numpy.array) -> Tuple[float, dict]: """Detect environment aruco markers from image and project scenes into camera frame. @@ -1325,13 +1633,13 @@ class ArEnvironment(): # Project gaze position into camera frame yield self.camera_frame, self.camera_frame.look(timestamp, gaze_position) - # Project gaze position into each aoi frames if possible - for aoi_frame in self.aoi_frames: + # Project gaze position into each frames if possible + for frame in self.frames: - # Is aoi frame projected into camera frame ? + # Is there an AOI with the same frame name projected into camera frame ? try: - aoi_2d = self.camera_frame.aoi_2d_scene[aoi_frame.name] + aoi_2d = self.camera_frame.aoi_2d_scene[frame.name] # TODO: Add option to use gaze precision circle if aoi_2d.contains_point(gaze_position.value): @@ -1341,7 +1649,7 @@ class ArEnvironment(): # QUESTION: How to project gaze precision? inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) - yield aoi_frame, aoi_frame.look(timestamp, inner_gaze_position * aoi_frame.size) + yield frame, frame.look(timestamp, inner_gaze_position * frame.size) # Ignore missing aoi frame projection except KeyError: @@ -1352,7 +1660,7 @@ class ArEnvironment(): self.__camera_frame_lock.release() def map(self): - """Project camera frame background into aoi frames background. + """Project camera frame background into frames background. .. warning:: detect_and_project method needs to be called first. """ @@ -1365,20 +1673,20 @@ class ArEnvironment(): self.__camera_frame_lock.acquire() # Project image if possible - for aoi_frame in self.aoi_frames: + for frame in self.frames: - # Is aoi frame projected into camera frame ? + # Is there an AOI with the same frame name projected into camera frame ? try: - aoi_2d = self.camera_frame.aoi_2d_scene[aoi_frame.name] + aoi_2d = self.camera_frame.aoi_2d_scene[frame.name] # Apply perspective transform algorithm to fill aoi frame background - width, height = aoi_frame.size + width, height = frame.size destination = numpy.float32([[0, height],[width, height],[width, 0],[0, 0]]) mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination) - aoi_frame.background = cv2.warpPerspective(self.camera_frame.background, mapping, (width, height)) + frame.background = cv2.warpPerspective(self.camera_frame.background, mapping, (width, height)) - # Ignore missing aoi frame projection + # Ignore missing frame projection except KeyError: pass diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py index 694e304..b2dba39 100644 --- a/src/argaze/AreaOfInterest/AOI2DScene.py +++ b/src/argaze/AreaOfInterest/AOI2DScene.py @@ -112,21 +112,23 @@ class AOI2DScene(AOIFeatures.AOIScene): # Draw form aoi.draw(image, color) - def reframe(self, aoi_name: str, size: tuple) -> AOI2DSceneType: + def reframe(self, aoi: AOIFeatures.AreaOfInterest, size: tuple) -> AOI2DSceneType: """ - Reframe whole scene to a scene bounded by an AOI. + Reframe whole scene to a scene bounded by a 4 vertices 2D AOI. Parameters: - aoi: name of AOI used to reframe scene + aoi: 4 vertices 2D AOI used to reframe scene + size: size of reframed scene Returns: reframed AOI 2D scene """ - assert(self[aoi_name].points_number == 4) + assert(aoi.dimension == 2) + assert(aoi.points_number == 4) # Edit affine transformation (M) allowing to transform source axis (Src) into destination axis (Dst) - Src = self[aoi_name].clockwise().astype(numpy.float32) + Src = aoi.clockwise().astype(numpy.float32) Src_origin = Src[0] Src = Src - Src_origin Dst = numpy.float32([[0, 0], [size[0], 0], [size[0], size[1]], [0, size[1]]]) diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index 8c684c0..5a9d0a9 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -241,6 +241,42 @@ class AOIScene(): for name, area in areas.items(): self[name] = AreaOfInterest(area) + @classmethod + def from_dict(self, aoi_scene_data, working_directory: str = None) -> AOISceneType: + """Load attributes from dictionary. + + Parameters: + aoi_scene_data: dictionary with attributes to load + working_directory: folder path where to load files when a dictionary value is a relative filepath. + """ + + # Load areas + areas = {} + + for name, area in aoi_scene_data.items(): + areas[name] = AreaOfInterest(area) + + # Guess dimension from first area dimension + dimension = areas.values()[0].dimension + + return AOIScene(dimension = dimension, areas = areas) + + @classmethod + def from_json(self, json_filepath: str) -> AOISceneType: + """ + Load attributes from .json file. + + Parameters: + json_filepath: path to json file + """ + + with open(json_filepath) as configuration_file: + + aoi_scene_data = json.load(configuration_file) + working_directory = os.path.dirname(json_filepath) + + return AOIScene.from_dict(aoi_scene_data, working_directory) + def __getitem__(self, name) -> AreaOfInterest: """Get an AOI from the scene.""" diff --git a/src/argaze/utils/demo_ar_features_run.py b/src/argaze/utils/demo_ar_features_run.py index bd48d0b..0df81c5 100644 --- a/src/argaze/utils/demo_ar_features_run.py +++ b/src/argaze/utils/demo_ar_features_run.py @@ -49,7 +49,7 @@ def main(): for frame, look_data in ar_environment.look(timestamp, GazeFeatures.GazePosition((x, y))): # Unpack look data - movement, scan_step_analysis, aoi_scan_step_analysis, times, exception = look_data + movement, scan_step_analysis, layer_analysis, execution_times, exception = look_data # Do something with look data # ... @@ -94,17 +94,17 @@ def main(): # Display environment cv2.imshow(ar_environment.name, environment_image) - # Draw and display each aoi frames - for aoi_frame in ar_environment.aoi_frames: + # Draw and display each frames + for frame in ar_environment.frames: # Create frame image - aoi_frame_image = aoi_frame.image + frame_image = frame.image # Draw frame info - aoi_frame.draw(aoi_frame_image) + frame.draw(frame_image) # Display frame - cv2.imshow(f'{aoi_frame.parent.name}:{aoi_frame.name}', aoi_frame_image) + cv2.imshow(f'{frame.parent.name}:{frame.name}', frame_image) # Stop by pressing 'Esc' key if cv2.waitKey(10) == 27: diff --git a/src/argaze/utils/demo_environment/demo_ar_features_setup.json b/src/argaze/utils/demo_environment/demo_ar_features_setup.json index 3e030f8..b943a83 100644 --- a/src/argaze/utils/demo_environment/demo_ar_features_setup.json +++ b/src/argaze/utils/demo_environment/demo_ar_features_setup.json @@ -23,8 +23,12 @@ "scenes": { "AR Scene Demo" : { "aruco_scene": "aruco_scene.obj", - "aoi_3d_scene": "aoi_3d_scene.obj", - "aoi_frames": { + "layers": { + "MainLayer" : { + "aoi_scene": "aoi_3d_scene.obj" + } + }, + "frames": { "GrayRectangle": { "size": [640, 480], "background": "frame_background.jpg", diff --git a/src/argaze/utils/demo_environment/demo_gaze_features_setup.json b/src/argaze/utils/demo_environment/demo_gaze_features_setup.json index 49bf257..f9947d6 100644 --- a/src/argaze/utils/demo_environment/demo_gaze_features_setup.json +++ b/src/argaze/utils/demo_environment/demo_gaze_features_setup.json @@ -1,49 +1,45 @@ { - "name": "AR Environment Demo", - "scenes": { - "AR Scene Demo" : { - "aoi_3d_scene": "aoi_3d_scene.obj", - "aoi_frames": { - "GrayRectangle": { - "size": [1920, 1149], - "background": "frame_background.jpg", - "gaze_movement_identifier": { - "DispersionThresholdIdentification": { - "deviation_max_threshold": 50, - "duration_min_threshold": 200 - } - }, - "scan_path": { - "duration_max": 10000 - }, - "scan_path_analyzers": { - "Basic": {}, - "KCoefficient": {}, - "NearestNeighborIndex": { - "size": [1920, 1149] - }, - "ExploitExploreRatio": { - "short_fixation_duration_threshold": 0 - } - }, - "aoi_scan_path": { - "duration_max": 10000 - }, - "aoi_scan_path_analyzers": { - "Basic": {}, - "TransitionMatrix": {}, - "KCoefficient": {}, - "LempelZivComplexity": {}, - "NGram": { - "n_min": 3, - "n_max": 3 - }, - "Entropy":{} - }, - "heatmap": { - "size": [320, 240] - } - } + "name": "ArFrame Demo", + "size": [1920, 1149], + "background": "frame_background.jpg", + "gaze_movement_identifier": { + "DispersionThresholdIdentification": { + "deviation_max_threshold": 50, + "duration_min_threshold": 200 + } + }, + "scan_path": { + "duration_max": 10000 + }, + "scan_path_analyzers": { + "Basic": {}, + "KCoefficient": {}, + "NearestNeighborIndex": { + "size": [1920, 1149] + }, + "ExploitExploreRatio": { + "short_fixation_duration_threshold": 0 + } + }, + "heatmap": { + "size": [320, 240] + }, + "layers": { + "GrayRectangle": { + "aoi_scene": "aoi_3d_scene.obj", + "aoi_scan_path": { + "duration_max": 10000 + }, + "aoi_scan_path_analyzers": { + "Basic": {}, + "TransitionMatrix": {}, + "KCoefficient": {}, + "LempelZivComplexity": {}, + "NGram": { + "n_min": 3, + "n_max": 3 + }, + "Entropy":{} } } } diff --git a/src/argaze/utils/demo_gaze_features_run.py b/src/argaze/utils/demo_gaze_features_run.py index 15fc4f4..915ae86 100644 --- a/src/argaze/utils/demo_gaze_features_run.py +++ b/src/argaze/utils/demo_gaze_features_run.py @@ -21,23 +21,20 @@ import pandas def main(): """ - Load AR environment from .json file to project AOI scene on screen and use mouse pointer to simulate gaze positions. + Load ArFrame from .json file and use mouse pointer to simulate gaze positions. """ current_directory = os.path.dirname(os.path.abspath(__file__)) # Manage arguments parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('environment', metavar='ENVIRONMENT', type=str, help='ar environment filepath') + parser.add_argument('frame', metavar='FRAME', type=str, help='ar frame filepath') args = parser.parse_args() - # Load AR environment - ar_environment = ArFeatures.ArEnvironment.from_json(args.environment) + # Load ArFrame + ar_frame = ArFeatures.ArFrame.from_json(args.frame) - # Select AR frame - ar_frame = ar_environment.scenes["AR Scene Demo"].aoi_frames["GrayRectangle"] - - # Create a window to display AR environment + # Create a window to display ArEnvironment cv2.namedWindow(ar_frame.name, cv2.WINDOW_AUTOSIZE) # Heatmap buffer display option @@ -53,7 +50,7 @@ def main(): timestamp = int((time.time() - start_time) * 1e3) # Project gaze position into frame - movement, scan_step_analysis, aoi_scan_step_analysis, times, exception = ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y))) + movement, scan_step_analysis, layer_analysis, execution_times, exception = ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y))) # Do something with look data # ... @@ -80,45 +77,45 @@ def main(): # Write last 5 steps of aoi scan path path = '' - for step in ar_frame.aoi_scan_path[-5:]: + for step in ar_frame.layers["GrayRectangle"].aoi_scan_path[-5:]: path += f'> {step.aoi} ' - path += f'> {ar_frame.aoi_scan_path.current_aoi}' + path += f'> {ar_frame.layers["GrayRectangle"].aoi_scan_path.current_aoi}' cv2.putText(frame_image, path, (20, ar_frame.size[1]-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) # Display Transition matrix analysis if loaded - try: + #try: - transition_matrix_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.TransitionMatrix"] + transition_matrix_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.TransitionMatrix"] - cv2.putText(frame_image, f'Transition matrix density: {transition_matrix_analyzer.transition_matrix_density:.2f}', (20, ar_frame.size[1]-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - - # Iterate over indexes (departures) - for from_aoi, row in transition_matrix_analyzer.transition_matrix_probabilities.iterrows(): + cv2.putText(frame_image, f'Transition matrix density: {transition_matrix_analyzer.transition_matrix_density:.2f}', (20, ar_frame.size[1]-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + + # Iterate over indexes (departures) + for from_aoi, row in transition_matrix_analyzer.transition_matrix_probabilities.iterrows(): - # Iterate over columns (destinations) - for to_aoi, probability in row.items(): + # Iterate over columns (destinations) + for to_aoi, probability in row.items(): - if from_aoi != to_aoi and probability > 0.0: + if from_aoi != to_aoi and probability > 0.0: - from_center = ar_frame.aoi_2d_scene[from_aoi].center.astype(int) - to_center = ar_frame.aoi_2d_scene[to_aoi].center.astype(int) - start_line = (0.5 * from_center + 0.5 * to_center).astype(int) + from_center = ar_frame.layers['GrayRectangle'].aoi_scene[from_aoi].center.astype(int) + to_center = ar_frame.layers['GrayRectangle'].aoi_scene[to_aoi].center.astype(int) + start_line = (0.5 * from_center + 0.5 * to_center).astype(int) - color = [int(probability*200) + 55, int(probability*200) + 55, int(probability*200) + 55] + color = [int(probability*200) + 55, int(probability*200) + 55, int(probability*200) + 55] - cv2.line(frame_image, start_line, to_center, color, int(probability*10) + 2) - cv2.line(frame_image, from_center, to_center, [55, 55, 55], 2) - - except KeyError: - pass + cv2.line(frame_image, start_line, to_center, color, int(probability*10) + 2) + cv2.line(frame_image, from_center, to_center, [55, 55, 55], 2) + + #except KeyError: + # pass # Display aoi scan path basic metrics analysis if loaded try: - basic_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.Basic"] + basic_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.Basic"] # Write basic analysis cv2.putText(frame_image, f'Step number: {basic_analyzer.steps_number}', (20, ar_frame.size[1]-440), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) @@ -147,7 +144,7 @@ def main(): # Display aoi scan path K-modified coefficient analysis if loaded try: - aoi_kc_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.KCoefficient"] + aoi_kc_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.KCoefficient"] # Write aoi Kc analysis if aoi_kc_analyzer.K < 0.: @@ -164,7 +161,7 @@ def main(): # Display Lempel-Ziv complexity analysis if loaded try: - lzc_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.LempelZivComplexity"] + lzc_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.LempelZivComplexity"] cv2.putText(frame_image, f'Lempel-Ziv complexity: {lzc_analyzer.lempel_ziv_complexity}', (20, ar_frame.size[1]-200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) @@ -174,7 +171,7 @@ def main(): # Display N-Gram analysis if loaded try: - ngram_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.NGram"] + ngram_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.NGram"] # Display only 3-gram analysis start = ar_frame.size[1] - ((len(ngram_analyzer.ngrams_count[3]) + 1) * 40) @@ -194,7 +191,7 @@ def main(): # Display Entropy analysis if loaded try: - entropy_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.Entropy"] + entropy_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.Entropy"] cv2.putText(frame_image, f'Stationary entropy: {entropy_analyzer.stationary_entropy:.3f},', (20, ar_frame.size[1]-280), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) cv2.putText(frame_image, f'Transition entropy: {entropy_analyzer.transition_entropy:.3f},', (20, ar_frame.size[1]-240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) @@ -234,8 +231,7 @@ def main(): # Reload environment with 'h' key if key_pressed == 114: - ar_environment = ArFeatures.ArEnvironment.from_json(args.environment) - ar_frame = ar_environment.scenes["AR Scene Demo"].aoi_frames["GrayRectangle"] + ar_frame = ArFeatures.ArFrame.from_json(args.frame) # Enable heatmap buffer with 'b' key if key_pressed == 98: -- cgit v1.1