#!/usr/bin/env python

"""Manage AR environment assets."""

__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"

from typing import TypeVar, Tuple
from dataclasses import dataclass, field
import json
import os
import importlib
from inspect import getmembers
import threading

from argaze import DataStructures, GazeFeatures
from argaze.ArUcoMarkers import *
from argaze.AreaOfInterest import *
from argaze.GazeAnalysis import *

import numpy
import cv2

# Type definitions for type annotation convenience
ArEnvironmentType = TypeVar('ArEnvironment', bound="ArEnvironment")
ArSceneType = TypeVar('ArScene', bound="ArScene")
ArFrameType = TypeVar('ArFrame', bound="ArFrame")


class EnvironmentJSONLoadingFailed(Exception):
    """Exception raised by ArEnvironment when JSON loading fails."""

    def __init__(self, message):

        super().__init__(message)


class PoseEstimationFailed(Exception):
    """Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies."""

    def __init__(self, message, unconsistencies=None):

        super().__init__(message)

        # Keep details about what made the pose estimation inconsistent
        self.unconsistencies = unconsistencies


class SceneProjectionFailed(Exception):
    """Exception raised by ArEnvironment detect_and_project method when the scene can't be projected."""

    def __init__(self, message):

        super().__init__(message)


@dataclass
class ArFrame():
    """
    Define Augmented Reality frame as an AOI2DScene made from a projected then reframed parent AOI3DScene.

    Parameters:
        name: name of the frame
        size: frame dimension in pixel.
        background: image to draw behind
        aoi_2d_scene: AOI 2D scene description ... : see [orthogonal_projection][argaze.ArFeatures.ArScene.orthogonal_projection] and [reframe][argaze.AreaOfInterest.AOI2DScene.reframe] functions.
        ...
    """

    name: str
    size: tuple[int] = field(default=(1, 1))
    aoi_2d_scene: AOI2DScene.AOI2DScene = field(default_factory=AOI2DScene.AOI2DScene)
    background: numpy.array = field(default_factory=numpy.array)
    gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier)
    scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath)
    scan_path_analyzers: dict = field(default_factory=dict)
    aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath)
    aoi_scan_path_analyzers: dict = field(default_factory=dict)
    heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap)

    def __post_init__(self):

        # Define parent attribute: it will be setup by parent later
        self.__parent = None

        # Init current gaze position
        self.__gaze_position = GazeFeatures.UnvalidGazePosition()

        # Init heatmap if required
        if self.heatmap:
            self.heatmap.init()

        # Init lock to share looked data with multiple threads
        self.__look_lock = threading.Lock()

    @classmethod
    def from_dict(cls, frame_data, working_directory: str = None) -> ArFrameType:
        """Load ArFrame from a dictionary of attribute values.

        Parameters:
            frame_data: dictionary describing the frame (consumed keys are popped)
            working_directory: folder from where to resolve relative file paths
        """

        # Load name
        try:
            new_frame_name = frame_data.pop('name')
        except KeyError:
            new_frame_name = None

        # Load size
        try:
            new_frame_size = frame_data.pop('size')
        except KeyError:
            new_frame_size = (0, 0)

        # Load aoi 2D scene
        try:
            new_aoi_2d_scene_value = frame_data.pop('aoi_2d_scene')

            # str: relative path to .json file
            if type(new_aoi_2d_scene_value) == str:
                json_filepath = os.path.join(working_directory, new_aoi_2d_scene_value)
                # BUGFIX: original passed undefined name 'obj_filepath' here (NameError)
                new_aoi_2d_scene = AOI2DScene.AOI2DScene.from_json(json_filepath)

            # dict:
            else:
                new_aoi_2d_scene = AOI2DScene.AOI2DScene(new_aoi_2d_scene_value)

        except KeyError:
            new_aoi_2d_scene = AOI2DScene.AOI2DScene()

        # Load background image, resized to the frame dimensions
        try:
            new_frame_background_value = frame_data.pop('background')
            new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value))
            new_frame_background = cv2.resize(new_frame_background, dsize=(new_frame_size[0], new_frame_size[1]), interpolation=cv2.INTER_CUBIC)

        except KeyError:
            new_frame_background = numpy.zeros((new_frame_size[1], new_frame_size[0], 3)).astype(numpy.uint8)

        # Load gaze movement identifier: single {type: parameters} entry naming an argaze.GazeAnalysis module
        try:
            gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier')

            gaze_movement_identifier_type, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem()

            gaze_movement_identifier_module = importlib.import_module(f'argaze.GazeAnalysis.{gaze_movement_identifier_type}')
            finished_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters)

        except KeyError:
            finished_gaze_movement_identifier = None

        # Load scan path analyzers
        new_scan_path_analyzers = {}

        try:
            new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers')

            for scan_path_analyzer_type, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items():

                scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{scan_path_analyzer_type}')

                # Check scan path analyzer parameters type
                members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer)

                for member in members:

                    if '__annotations__' in member:

                        for parameter, parameter_type in member[1].items():

                            # Check if parameter is part of argaze.GazeAnalysis module
                            parameter_module_path = parameter_type.__module__.split('.')

                            if len(parameter_module_path) == 3:

                                if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis':

                                    # Try get existing analyzer instance to append as parameter
                                    try:
                                        scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_module_path[2]]

                                    except KeyError:
                                        raise EnvironmentJSONLoadingFailed(f'{scan_path_analyzer_type} scan path analyzer loading fails because {parameter_module_path[2]} scan path analyzer is missing.')

                scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters)

                new_scan_path_analyzers[scan_path_analyzer_type] = scan_path_analyzer

        except KeyError:
            pass

        # Load AOI scan path analyzers
        new_aoi_scan_path_analyzers = {}

        try:
            new_aoi_scan_path_analyzers_value = frame_data.pop('aoi_scan_path_analyzers')

            for aoi_scan_path_analyzer_type, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items():

                aoi_scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_type}')

                # Check aoi scan path analyzer parameters type
                members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer)

                for member in members:

                    if '__annotations__' in member:

                        for parameter, parameter_type in member[1].items():

                            # Check if parameter is part of argaze.GazeAnalysis module
                            parameter_module_path = parameter_type.__module__.split('.')

                            if len(parameter_module_path) == 3:

                                if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis':

                                    # Try get existing analyzer instance to append as parameter
                                    try:
                                        aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_module_path[2]]

                                    except KeyError:
                                        raise EnvironmentJSONLoadingFailed(f'{aoi_scan_path_analyzer_type} aoi scan path analyzer loading fails because {parameter_module_path[2]} aoi scan path analyzer is missing.')

                aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters)

                new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_type] = aoi_scan_path_analyzer

        except KeyError:
            pass

        # Load heatmap enabling flag
        try:
            new_heatmap_value = frame_data.pop('heatmap')
        except KeyError:
            new_heatmap_value = False

        # Create frame: (aoi) scan paths are only instantiated when analyzers are configured
        return ArFrame(new_frame_name,
                       new_frame_size,
                       new_aoi_2d_scene,
                       new_frame_background,
                       finished_gaze_movement_identifier,
                       GazeFeatures.ScanPath() if len(new_scan_path_analyzers) > 0 else None,
                       new_scan_path_analyzers,
                       GazeFeatures.AOIScanPath(list(new_aoi_2d_scene.keys())) if len(new_aoi_scan_path_analyzers) > 0 else None,
                       new_aoi_scan_path_analyzers,
                       AOIFeatures.Heatmap(new_frame_size) if new_heatmap_value else None)

    @classmethod
    def from_json(cls, json_filepath: str) -> ArFrameType:
        """
        Load ArFrame from .json file.

        Parameters:
            json_filepath: path to json file
        """

        with open(json_filepath) as configuration_file:

            frame_data = json.load(configuration_file)
            working_directory = os.path.dirname(json_filepath)

            return ArFrame.from_dict(frame_data, working_directory)

    @property
    def parent(self):
        """Get parent instance"""
        return self.__parent

    @parent.setter
    def parent(self, parent):
        """Set parent instance"""
        self.__parent = parent

    @property
    def image(self):
        """Get background image + heatmap image"""

        # Lock frame exploitation
        self.__look_lock.acquire()

        image = self.background.copy()

        # Draw heatmap
        if self.heatmap:
            image = cv2.addWeighted(self.heatmap.image, 0.5, image, 1., 0)

        # Unlock frame exploitation
        self.__look_lock.release()

        return image

    def look(self, timestamp: int|float, inner_gaze_position: GazeFeatures.GazePosition) -> Tuple[GazeFeatures.GazeMovement, str, dict, dict, dict]:
        """Process a timestamped gaze position inside this frame.

        Returns:
            fixation: identified fixation (if gaze_movement_identifier is instantiated)
            look_at: which aoi the identified fixation looks at
            scan_step_analysis: new scan step analysis (if scan_path is instantiated)
            aoi_scan_step_analysis: new aoi scan step analysis (if aoi_scan_path is instantiated)
            exception: any error caught while processing (None on success)
        """

        # Lock frame exploitation
        self.__look_lock.acquire()

        # Update current gaze position
        self.__gaze_position = inner_gaze_position

        # No fixation is identified by default
        fixation = GazeFeatures.UnvalidGazeMovement()

        # No aoi is looked by default
        look_at = None

        # Init scan path analysis report
        scan_step_analysis = {}
        aoi_scan_step_analysis = {}

        # Catch any error
        exception = None

        try:

            # Identify gaze movement
            if self.gaze_movement_identifier:

                # Identify finished gaze movement
                finished_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position)

                # Valid and finished gaze movement has been identified
                if finished_gaze_movement.valid:

                    if GazeFeatures.is_fixation(finished_gaze_movement):

                        # Update current fixation
                        fixation = finished_gaze_movement

                        # Does the fixation match an aoi?
                        for name, aoi in self.aoi_2d_scene.items():

                            _, _, circle_ratio = aoi.circle_intersection(finished_gaze_movement.focus, finished_gaze_movement.deviation_max)

                            if circle_ratio > 0.25:

                                if name != self.name:

                                    # Update current look at
                                    look_at = name
                                    break

                        # Append fixation to scan path
                        if self.scan_path != None:

                            self.scan_path.append_fixation(timestamp, finished_gaze_movement)

                        # Append fixation to aoi scan path
                        if self.aoi_scan_path != None and look_at != None:

                            aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, finished_gaze_movement, look_at)

                            # Analyze aoi scan path
                            if aoi_scan_step and len(self.aoi_scan_path) > 1:

                                for aoi_scan_path_analyzer_type, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():

                                    aoi_scan_path_analyzer.analyze(self.aoi_scan_path)
                                    aoi_scan_step_analysis[aoi_scan_path_analyzer_type] = aoi_scan_path_analyzer.analysis

                    elif GazeFeatures.is_saccade(finished_gaze_movement):

                        # Update current look at
                        look_at = None

                        # Append saccade to scan path
                        if self.scan_path != None:

                            scan_step = self.scan_path.append_saccade(timestamp, finished_gaze_movement)

                            # Analyze scan path
                            if scan_step and len(self.scan_path) > 1:

                                for scan_path_analyzer_type, scan_path_analyzer in self.scan_path_analyzers.items():

                                    scan_path_analyzer.analyze(self.scan_path)
                                    scan_step_analysis[scan_path_analyzer_type] = scan_path_analyzer.analysis

                        # Append saccade to aoi scan path
                        if self.aoi_scan_path != None:

                            self.aoi_scan_path.append_saccade(timestamp, finished_gaze_movement)

                # No valid finished gaze movement: check current fixation
                else:

                    current_fixation = self.gaze_movement_identifier.current_fixation

                    if current_fixation.valid:

                        # Update current fixation
                        fixation = current_fixation

                        # Does the fixation match an aoi?
                        for name, aoi in self.aoi_2d_scene.items():

                            _, _, circle_ratio = aoi.circle_intersection(current_fixation.focus, current_fixation.deviation_max)

                            if circle_ratio > 0.25:

                                if name != self.name:

                                    # Update current look at
                                    look_at = name
                                    break

            # Update heatmap
            if self.heatmap:

                self.heatmap.update(self.__gaze_position.value, sigma=0.05)

        except Exception as e:

            # Reset look data on failure and report the exception to the caller
            fixation = GazeFeatures.UnvalidGazeMovement()
            look_at = None
            scan_step_analysis = {}
            aoi_scan_step_analysis = {}
            exception = e

        # Unlock frame exploitation
        self.__look_lock.release()

        # Return look data
        return fixation, look_at, scan_step_analysis, aoi_scan_step_analysis, exception

    def draw(self, image: numpy.array, aoi_color=(0, 0, 0)):
        """
        Draw frame into image.

        Parameters:
            image: where to draw
        """

        # Lock frame exploitation
        self.__look_lock.acquire()

        # Draw aoi
        self.aoi_2d_scene.draw(image, color=aoi_color)

        # Draw current gaze position
        self.__gaze_position.draw(image, color=(255, 255, 255))

        # Draw current gaze movements
        if self.gaze_movement_identifier:

            current_fixation = self.gaze_movement_identifier.current_fixation

            if current_fixation.valid:

                current_fixation.draw(image, color=(0, 255, 255))
                current_fixation.draw_positions(image)

                # Draw looked AOI
                self.aoi_2d_scene.draw_circlecast(image, current_fixation.focus, current_fixation.deviation_max, base_color=(0, 0, 0), matching_color=(255, 255, 255))

            current_saccade = self.gaze_movement_identifier.current_saccade

            if current_saccade.valid:

                current_saccade.draw(image, color=(0, 255, 255))
                current_saccade.draw_positions(image)

        # Unlock frame exploitation
        self.__look_lock.release()


@dataclass
class ArScene():
    """
    Define an Augmented Reality scene with ArUco markers and AOI scenes.

    Parameters:
        name: name of the scene

        aruco_scene: ArUco markers 3D scene description used to estimate scene pose from detected markers: see [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function below.

        aoi_3d_scene: AOI 3D scene description that will be projected onto estimated scene once its pose will be estimated : see [project][argaze.ArFeatures.ArScene.project] function below.

        aoi_frames: Optional dictionary to define AOI as ArFrame.

        aruco_axis: Optional dictionary to define orthogonal axis where each axis is defined by list of 3 markers identifier (first is origin). \
        This pose estimation strategy is used by [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function when at least 3 markers are detected.

        aruco_aoi: Optional dictionary of AOI defined by list of markers identifier and markers corners index tuples: see [build_aruco_aoi_scene][argaze.ArFeatures.ArScene.build_aruco_aoi_scene] function below.

        angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.

        distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
    """

    name: str
    aruco_scene: ArUcoScene.ArUcoScene = field(default_factory=ArUcoScene.ArUcoScene)
    aoi_3d_scene: AOI3DScene.AOI3DScene = field(default_factory=AOI3DScene.AOI3DScene)
    aoi_frames: dict = field(default_factory=dict)
    aruco_axis: dict = field(default_factory=dict)
    aruco_aoi: dict = field(default_factory=dict)
    angle_tolerance: float = field(default=0.)
    distance_tolerance: float = field(default=0.)

    def __post_init__(self):

        # Define environment attribute: it will be setup by parent environment later
        self.__environment = None

        # Preprocess orthogonal projection to speed up further aruco aoi processings
        self.__orthogonal_projection_cache = self.aoi_3d_scene.orthogonal_projection

        # Setup aoi frame parent attribute
        for aoi_name, frame in self.aoi_frames.items():
            frame.parent = self

    def __str__(self) -> str:
        """
        Returns:
            String representation
        """

        output = f'ArEnvironment:\n{self.environment.name}\n'
        output += f'ArUcoScene:\n{self.aruco_scene}\n'
        output += f'AOI3DScene:\n{self.aoi_3d_scene}\n'

        return output

    @classmethod
    def from_dict(cls, scene_data, working_directory: str = None) -> ArSceneType:
        """Load ArScene from a dictionary of attribute values.

        Parameters:
            scene_data: dictionary describing the scene (consumed keys are popped)
            working_directory: folder from where to resolve relative file paths
        """

        # Load name
        try:
            new_scene_name = scene_data.pop('name')
        except KeyError:
            new_scene_name = None

        # Load aruco scene
        try:

            # Check aruco_scene value type
            aruco_scene_value = scene_data.pop('aruco_scene')

            # str: relative path to .obj file
            if type(aruco_scene_value) == str:

                aruco_scene_value = os.path.join(working_directory, aruco_scene_value)
                new_aruco_scene = ArUcoScene.ArUcoScene.from_obj(aruco_scene_value)

            # dict:
            else:

                new_aruco_scene = ArUcoScene.ArUcoScene(**aruco_scene_value)

        except KeyError:
            new_aruco_scene = None

        # Load optional aoi filter
        try:
            aoi_exclude_list = scene_data.pop('aoi_exclude')
        except KeyError:
            aoi_exclude_list = []

        # Load aoi 3d scene
        try:

            # Check aoi_3d_scene value type
            aoi_3d_scene_value = scene_data.pop('aoi_3d_scene')

            # str: relative path to .obj file
            if type(aoi_3d_scene_value) == str:

                obj_filepath = os.path.join(working_directory, aoi_3d_scene_value)
                new_aoi_3d_scene = AOI3DScene.AOI3DScene.from_obj(obj_filepath).copy(exclude=aoi_exclude_list)

            # dict:
            else:

                new_aoi_3d_scene = AOI3DScene.AOI3DScene(aoi_3d_scene_value).copy(exclude=aoi_exclude_list)

        except KeyError:
            new_aoi_3d_scene = None

        # Load aoi frames
        new_aoi_frames = {}

        try:

            for aoi_name, aoi_frame_data in scene_data.pop('aoi_frames').items():

                # Create aoi frame
                new_aoi_frame = ArFrame.from_dict(aoi_frame_data, working_directory)

                # Setup aoi frame
                new_aoi_frame.name = aoi_name
                new_aoi_frame.aoi_2d_scene = new_aoi_3d_scene.orthogonal_projection.reframe(aoi_name, new_aoi_frame.size)

                if new_aoi_frame.aoi_scan_path != None:
                    new_aoi_frame.aoi_scan_path.expected_aois = list(new_aoi_3d_scene.keys())

                # Append new aoi frame
                new_aoi_frames[aoi_name] = new_aoi_frame

        except KeyError:
            pass

        # Remaining scene_data keys are passed through as scene attributes (aruco_axis, aruco_aoi, tolerances...)
        return ArScene(new_scene_name, new_aruco_scene, new_aoi_3d_scene, new_aoi_frames, **scene_data)

    @property
    def environment(self):
        """Get parent environment instance"""
        return self.__environment

    @environment.setter
    def environment(self, environment):
        """Set parent environment instance"""
        self.__environment = environment

    def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, str, dict]:
        """Estimate scene pose from detected ArUco markers.

        Returns:
            scene translation vector
            scene rotation matrix
            pose estimation strategy
            dict of markers used to estimate the pose

        Raises:
            PoseEstimationFailed: when no usable marker combination is found
        """

        # Pose estimation fails when no marker is detected
        if len(detected_markers) == 0:

            raise PoseEstimationFailed('No marker detected')

        scene_markers, _ = self.aruco_scene.filter_markers(detected_markers)

        # Pose estimation fails when no marker belongs to the scene
        if len(scene_markers) == 0:

            raise PoseEstimationFailed('No marker belongs to the scene')

        # Estimate scene pose from unique marker transformations
        elif len(scene_markers) == 1:

            marker_id, marker = scene_markers.popitem()
            tvec, rmat = self.aruco_scene.estimate_pose_from_single_marker(marker)

            return tvec, rmat, 'estimate_pose_from_single_marker', {marker_id: marker}

        # Try to estimate scene pose from 3 markers defining an orthogonal axis
        elif len(scene_markers) >= 3 and len(self.aruco_axis) > 0:

            for axis_name, axis_markers in self.aruco_axis.items():

                try:

                    origin_marker = scene_markers[axis_markers['origin_marker']]
                    horizontal_axis_marker = scene_markers[axis_markers['horizontal_axis_marker']]
                    vertical_axis_marker = scene_markers[axis_markers['vertical_axis_marker']]

                    tvec, rmat = self.aruco_scene.estimate_pose_from_axis_markers(origin_marker, horizontal_axis_marker, vertical_axis_marker)

                    return tvec, rmat, 'estimate_pose_from_axis_markers', {origin_marker.identifier: origin_marker, horizontal_axis_marker.identifier: horizontal_axis_marker, vertical_axis_marker.identifier: vertical_axis_marker}

                # A marker of this axis is missing: try the next axis
                except:
                    pass

            raise PoseEstimationFailed('No marker axis')

        # Otherwise, check markers consistency
        consistent_markers, unconsistent_markers, unconsistencies = self.aruco_scene.check_markers_consistency(scene_markers, self.angle_tolerance, self.distance_tolerance)

        # Pose estimation fails when no marker passes consistency checking
        if len(consistent_markers) == 0:

            raise PoseEstimationFailed('Unconsistent marker poses', unconsistencies)

        # Otherwise, estimate scene pose from all consistent markers pose
        tvec, rmat = self.aruco_scene.estimate_pose_from_markers(consistent_markers)

        return tvec, rmat, 'estimate_pose_from_markers', consistent_markers

    def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> AOI2DScene.AOI2DScene:
        """Project AOI scene according estimated pose and optional horizontal field of view clipping angle.

        Parameters:
            tvec: translation vector
            rvec: rotation vector
            visual_hfov: horizontal field of view clipping angle

        Returns:
            aoi_2d_scene: scene projection
        """

        # Clip AOI out of the visual horizontal field of view (optional)
        if visual_hfov > 0:

            # Transform scene into camera referential
            aoi_3d_scene_camera_ref = self.aoi_3d_scene.transform(tvec, rvec)

            # Get aoi inside vision cone field
            cone_vision_height_cm = 200  # cm
            cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm

            _, aoi_outside = aoi_3d_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm)

            # Keep only aoi inside vision cone field
            aoi_3d_scene_copy = self.aoi_3d_scene.copy(exclude=aoi_outside.keys())

        else:

            aoi_3d_scene_copy = self.aoi_3d_scene.copy()

        return aoi_3d_scene_copy.project(tvec, rvec, self.environment.aruco_detector.optic_parameters.K)

    def build_aruco_aoi_scene(self, detected_markers) -> AOI2DScene.AOI2DScene:
        """
        Build AOI scene from detected ArUco markers as defined in aruco_aoi dictionary.

        Returns:
            aoi_2d_scene: built AOI 2D scene

        Raises:
            SceneProjectionFailed: when no marker is detected or a required marker is missing
        """

        # ArUco aoi must be defined
        assert(self.aruco_aoi)

        # AOI projection fails when no marker is detected
        if len(detected_markers) == 0:

            raise SceneProjectionFailed('No marker detected')

        aruco_aoi_scene = {}

        for aruco_aoi_name, aoi in self.aruco_aoi.items():

            # Each aoi's corner is defined by a marker's corner
            aoi_corners = []

            for corner in ["upper_left_corner", "upper_right_corner", "lower_right_corner", "lower_left_corner"]:

                marker_identifier = aoi[corner]["marker_identifier"]

                try:

                    aoi_corners.append(detected_markers[marker_identifier].corners[0][aoi[corner]["marker_corner_index"]])

                except Exception as e:

                    raise SceneProjectionFailed(f'Missing marker #{e} to build ArUco AOI scene')

            aruco_aoi_scene[aruco_aoi_name] = AOIFeatures.AreaOfInterest(aoi_corners)

            # Then each inner aoi is projected from the current aruco aoi
            for inner_aoi_name, inner_aoi in self.aoi_3d_scene.items():

                if aruco_aoi_name != inner_aoi_name:

                    aoi_corners = [numpy.array(aruco_aoi_scene[aruco_aoi_name].outter_axis(inner)) for inner in self.__orthogonal_projection_cache[inner_aoi_name]]
                    aruco_aoi_scene[inner_aoi_name] = AOIFeatures.AreaOfInterest(aoi_corners)

        return AOI2DScene.AOI2DScene(aruco_aoi_scene)

    def draw_axis(self, image: numpy.array):
        """
        Draw scene axis into image.

        Parameters:
            image: where to draw
        """

        self.aruco_scene.draw_axis(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D)

    def draw_places(self, image: numpy.array):
        """
        Draw scene places into image.

        Parameters:
            image: where to draw
        """

        self.aruco_scene.draw_places(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D)


@dataclass
class ArEnvironment():
    """
    Define Augmented Reality environment based on ArUco marker detection.

    Parameters:
        name: environment name
        aruco_detector: ArUco marker detector
        camera_frame: where to project scenes
        scenes: all environment scenes
    """

    name: str
    aruco_detector: ArUcoDetector.ArUcoDetector = field(default_factory=ArUcoDetector.ArUcoDetector)
    camera_frame: ArFrame = field(default_factory=ArFrame)
    scenes: dict = field(default_factory=dict)

    def __post_init__(self):

        # Setup camera frame parent attribute
        if self.camera_frame != None:
            self.camera_frame.parent = self

        # Setup scenes environment attribute
        for name, scene in self.scenes.items():
            scene.environment = self

        # Init a lock to share AOI scene projections into camera frame between multiple threads
        self.__camera_frame_lock = threading.Lock()

        # Define public timestamp buffer to store ignored gaze positions
        self.ignored_gaze_positions = GazeFeatures.TimeStampedGazePositions()

    @classmethod
    def from_dict(cls, environment_data, working_directory: str = None) -> ArEnvironmentType:
        """Load ArEnvironment from a dictionary of attribute values.

        Parameters:
            environment_data: dictionary describing the environment (consumed keys are popped)
            working_directory: folder from where to resolve relative file paths
        """

        new_environment_name = environment_data.pop('name')

        # Load aruco detector
        try:

            new_detector_data = environment_data.pop('aruco_detector')

            new_aruco_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(**new_detector_data.pop('dictionary'))
            new_marker_size = new_detector_data.pop('marker_size')

            # Check optic_parameters value type
            optic_parameters_value = new_detector_data.pop('optic_parameters')

            # str: relative path to .json file
            if type(optic_parameters_value) == str:

                optic_parameters_value = os.path.join(working_directory, optic_parameters_value)
                new_optic_parameters = ArUcoOpticCalibrator.OpticParameters.from_json(optic_parameters_value)

            # dict:
            else:

                new_optic_parameters = ArUcoOpticCalibrator.OpticParameters(**optic_parameters_value)

            # Check detector parameters value type
            detector_parameters_value = new_detector_data.pop('parameters')

            # str: relative path to .json file
            if type(detector_parameters_value) == str:

                detector_parameters_value = os.path.join(working_directory, detector_parameters_value)
                new_aruco_detector_parameters = ArUcoDetector.DetectorParameters.from_json(detector_parameters_value)

            # dict:
            else:

                new_aruco_detector_parameters = ArUcoDetector.DetectorParameters(**detector_parameters_value)

            new_aruco_detector = ArUcoDetector.ArUcoDetector(new_aruco_dictionary, new_marker_size, new_optic_parameters, new_aruco_detector_parameters)

        except KeyError:
            new_aruco_detector = None

        # Load camera frame as large as aruco detector optic parameters
        # NOTE(review): this reads new_optic_parameters, which is only bound when the
        # aruco_detector section loaded successfully — a configuration with a camera_frame
        # but no aruco_detector would raise NameError here; confirm intended precondition.
        try:

            camera_frame_data = environment_data.pop('camera_frame')

            # Create camera frame
            new_camera_frame = ArFrame.from_dict(camera_frame_data, working_directory)

            # Setup camera frame
            new_camera_frame.name = new_environment_name
            new_camera_frame.size = new_optic_parameters.dimensions
            new_camera_frame.background = numpy.zeros((new_optic_parameters.dimensions[1], new_optic_parameters.dimensions[0], 3)).astype(numpy.uint8)

        except KeyError:
            new_camera_frame = None

        # Build scenes
        new_scenes = {}

        for new_scene_name, scene_data in environment_data.pop('scenes').items():

            # Create new scene
            new_scene = ArScene.from_dict(scene_data, working_directory)

            # Setup new scene
            new_scene.name = new_scene_name

            # Append new scene
            new_scenes[new_scene_name] = new_scene

        # Setup expected aoi for camera frame aoi scan path
        if new_camera_frame != None:

            if new_camera_frame.aoi_scan_path != None:

                # List all environment aoi
                all_aoi_list = []

                for scene_name, scene in new_scenes.items():

                    all_aoi_list.extend(list(scene.aoi_3d_scene.keys()))

                new_camera_frame.aoi_scan_path.expected_aois = all_aoi_list

        # Create new environment
        return ArEnvironment(new_environment_name, new_aruco_detector, new_camera_frame, new_scenes)

    @classmethod
    def from_json(cls, json_filepath: str) -> ArEnvironmentType:
        """
        Load ArEnvironment from .json file.

        Parameters:
            json_filepath: path to json file
        """

        with open(json_filepath) as configuration_file:

            environment_data = json.load(configuration_file)
            working_directory = os.path.dirname(json_filepath)

            return ArEnvironment.from_dict(environment_data, working_directory)

    def __str__(self) -> str:
        """
        Returns:
            String representation
        """

        output = f'Name:\n{self.name}\n'
        output += f'ArUcoDetector:\n{self.aruco_detector}\n'

        for name, scene in self.scenes.items():
            output += f'\"{name}\" ArScene:\n{scene}\n'

        return output

    @property
    def image(self):
        """Get camera frame image"""

        # Can't use camera frame when it is locked
        if self.__camera_frame_lock.locked():
            return

        # Lock camera frame exploitation
        self.__camera_frame_lock.acquire()

        # Get camera frame image
        image = self.camera_frame.image

        # Unlock camera frame exploitation
        self.__camera_frame_lock.release()

        return image

    @property
    def aoi_frames(self):
        """Iterate over all environment scenes aoi frames"""

        # For each scene
        for scene_name, scene in self.scenes.items():

            # For each aoi frame
            for frame_name, aoi_frame in scene.aoi_frames.items():

                yield aoi_frame

    def detect_and_project(self, image: numpy.array) -> Tuple[float, dict]:
        """Detect environment aruco markers from image and project scenes into camera frame.

        Returns:
            - detection_time: aruco marker detection time in ms
            - exceptions: dictionary with exception raised per scene
        """

        # Detect aruco markers
        detection_time = self.aruco_detector.detect_markers(image)

        # Lock camera frame exploitation
        self.__camera_frame_lock.acquire()

        # Fill camera frame background with image
        self.camera_frame.background = image

        # Clear former scenes projection into camera frame
        self.camera_frame.aoi_2d_scene = AOI2DScene.AOI2DScene()

        # Store exceptions for each scene
        exceptions = {}

        # Project each aoi 3d scene into camera frame
        for scene_name, scene in self.scenes.items():

            ''' TODO: Enable aruco_aoi processing
            if scene.aruco_aoi:

                try:

                    # Build AOI scene directly from detected ArUco marker corners
                    self.camera_frame.aoi_2d_scene |= scene.build_aruco_aoi_scene(self.aruco_detector.detected_markers)

                except SceneProjectionFailed:

                    pass
            '''

            try:

                # Estimate scene markers poses
                self.aruco_detector.estimate_markers_pose(scene.aruco_scene.identifiers)

                # Estimate scene pose from detected scene markers
                tvec, rmat, _, _ = scene.estimate_pose(self.aruco_detector.detected_markers)

                # Project scene into camera frame according estimated pose
                self.camera_frame.aoi_2d_scene |= scene.project(tvec, rmat)

            # Store exceptions and continue
            except Exception as e:

                exceptions[scene_name] = e

        # Unlock camera frame exploitation
        self.__camera_frame_lock.release()

        # Return detection time and exceptions
        return detection_time, exceptions

    def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition):
        """Project timestamped gaze position into each frame.

        Yields (frame, frame.look(...)) pairs for the camera frame then for each
        aoi frame whose projection contains the gaze position.
        """

        # Can't use camera frame when it is locked
        if self.__camera_frame_lock.locked():

            # TODO: Store ignored timestamped gaze positions for further projections
            # PB: This would imply to also store frame projections !!!
            self.ignored_gaze_positions[timestamp] = gaze_position
            return

        # Lock camera frame exploitation
        self.__camera_frame_lock.acquire()

        # Project gaze position into camera frame
        yield self.camera_frame, self.camera_frame.look(timestamp, gaze_position)

        # Project gaze position into each aoi frames if possible
        for aoi_frame in self.aoi_frames:

            # Is aoi frame projected into camera frame ?
            try:

                aoi_2d = self.camera_frame.aoi_2d_scene[aoi_frame.name]

                # TODO: Add option to use gaze precision circle
                if aoi_2d.contains_point(gaze_position.value):

                    inner_x, inner_y = aoi_2d.clockwise().inner_axis(gaze_position.value)

                    # QUESTION: How to project gaze precision?
                    inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y))

                    yield aoi_frame, aoi_frame.look(timestamp, inner_gaze_position * aoi_frame.size)

            # Ignore missing aoi frame projection
            except KeyError:

                pass

        # Unlock camera frame exploitation
        self.__camera_frame_lock.release()

    def to_json(self, json_filepath):
        """Save environment to .json file."""

        with open(json_filepath, 'w', encoding='utf-8') as file:

            json.dump(self, file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder)

    def draw(self, image: numpy.array):
        """Draw ArUco detection visualisation and camera frame projections."""

        # Draw detected markers
        self.aruco_detector.draw_detected_markers(image)

        # Can't use camera frame when it is locked
        if self.__camera_frame_lock.locked():
            return

        # Lock camera frame exploitation
        self.__camera_frame_lock.acquire()

        # Draw camera frame
        self.camera_frame.draw(image)

        # Unlock camera frame exploitation
        self.__camera_frame_lock.release()