From 5ab2a85ab3eb5d5f425fb34de4cd02d12793d17e Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 5 Jul 2023 16:44:11 +0200 Subject: Moving camera frame at environment level. --- src/argaze/ArFeatures.py | 974 ++++++++++++++++++++++++++--------------------- 1 file changed, 533 insertions(+), 441 deletions(-) (limited to 'src') diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index a4601bb..94ae4af 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -41,463 +41,237 @@ class EnvironmentJSONLoadingFailed(Exception): super().__init__(message) -@dataclass -class ArEnvironment(): +class PoseEstimationFailed(Exception): """ - Define Augmented Reality environment based on ArUco marker detection. - - Parameters: - name: Environment name - aruco_detector: ArUco detector - scenes: All environment scenes + Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies. """ - name: str - aruco_detector: ArUcoDetector.ArUcoDetector = field(default_factory=ArUcoDetector.ArUcoDetector) - scenes: dict = field(default_factory=dict) - - def __post_init__(self): - - # Setup scenes environment after environment creation - for name, scene in self.scenes.items(): - scene._environment = self - - # Init AOI scene projections - self.__camera_frames = {} - - # Init a lock to share AOI scene projections between multiple threads - self.__camera_frames_lock = threading.Lock() - - @classmethod - def from_json(self, json_filepath: str) -> ArSceneType: - """ - Load ArEnvironment from .json file. - - Parameters: - json_filepath: path to json file - """ - - with open(json_filepath) as configuration_file: - - data = json.load(configuration_file) - working_directory = os.path.dirname(json_filepath) - - new_name = data.pop('name') - - try: - new_detector_data = data.pop('aruco_detector') - - new_aruco_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(**new_detector_data.pop('dictionary')) - new_marker_size = new_detector_data.pop('marker_size') - - # Check optic_parameters value type - optic_parameters_value = new_detector_data.pop('optic_parameters') - - # str: relative path to .json file - if type(optic_parameters_value) == str: - - optic_parameters_value = os.path.join(working_directory, optic_parameters_value) - new_optic_parameters = ArUcoOpticCalibrator.OpticParameters.from_json(optic_parameters_value) - - # dict: - else: - - new_optic_parameters = ArUcoOpticCalibrator.OpticParameters(**optic_parameters_value) - - # Check detector parameters value type - detector_parameters_value = new_detector_data.pop('parameters') - - # str: relative path to .json file - if type(detector_parameters_value) == str: - - detector_parameters_value = os.path.join(working_directory, detector_parameters_value) - new_aruco_detector_parameters = ArUcoDetector.DetectorParameters.from_json(detector_parameters_value) - - # dict: - else: - - new_aruco_detector_parameters = ArUcoDetector.DetectorParameters(**detector_parameters_value) - - new_aruco_detector = ArUcoDetector.ArUcoDetector(new_aruco_dictionary, new_marker_size, new_optic_parameters, new_aruco_detector_parameters) - - except KeyError: - - new_aruco_detector = None - - # Build scenes - new_scenes = {} - for new_scene_name, scene_data in data.pop('scenes').items(): - - new_aruco_scene = None - new_aoi_scene = None - - try: - - # Check aruco_scene value type - aruco_scene_value = scene_data.pop('aruco_scene') - - # str: relative path to .obj file - if type(aruco_scene_value) == str: - - aruco_scene_value = os.path.join(working_directory, aruco_scene_value) - new_aruco_scene = ArUcoScene.ArUcoScene.from_obj(aruco_scene_value) - - # dict: - else: - - new_aruco_scene = ArUcoScene.ArUcoScene(**aruco_scene_value) - - except KeyError: - - new_aruco_scene = None + def __init__(self, message, unconsistencies=None): - # Check aoi_3d_scene value type - aoi_3d_scene_value = scene_data.pop('aoi_3d_scene') + super().__init__(message) - # str: relative path to .obj file - if type(aoi_3d_scene_value) == str: + self.unconsistencies = unconsistencies - obj_filepath = os.path.join(working_directory, aoi_3d_scene_value) - new_aoi_3d_scene = AOI3DScene.AOI3DScene.from_obj(obj_filepath) +class SceneProjectionFailed(Exception): + """ + Exception raised by ArEnvironment detect_and_project method when the scene can't be projected. + """ - # dict: - else: + def __init__(self, message): - new_aoi_3d_scene = AOI3DScene.AOI3DScene(aoi_3d_scene_value) + super().__init__(message) - # Define frame data processor - def frame_data_processor(frame_data, force_frame_size: list = []) -> ArFrame: +@dataclass +class ArFrame(): + """ + Define Augmented Reality frame as an AOI2DScene made from a projected then reframed parent AOI3DScene. - if len(force_frame_size) == 2: + Parameters: + name: name of the frame + size: frame dimension in pixel. + background: image to draw behind + aoi_2d_scene: AOI 2D scene description ... : see [orthogonal_projection][argaze.ArFeatures.ArScene.orthogonal_projection] and [reframe][argaze.AreaOfInterest.AOI2DScene.reframe] functions. + ... + """ - new_frame_size = force_frame_size + name: str + size: tuple[int] = field(default=(1, 1)) + background: numpy.array = field(default_factory=numpy.array) + aoi_2d_scene: AOI2DScene.AOI2DScene = field(init=False, default_factory=AOI2DScene.AOI2DScene) + gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier) + scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath) + scan_path_analyzers: dict = field(default_factory=dict) + aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) + aoi_scan_path_analyzers: dict = field(default_factory=dict) + heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap) - else: + def __post_init__(self): - new_frame_size = frame_data.pop('size') + # Define parent attribute: it will be setup by parent later + self.__parent = None - # Load background image - try: + # Init current gaze position + self.__gaze_position = GazeFeatures.UnvalidGazePosition() - new_frame_background_value = frame_data.pop('background') - new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) - new_frame_background = cv2.resize(new_frame_background, dsize=(new_frame_size[0], new_frame_size[1]), interpolation=cv2.INTER_CUBIC) + # Init current gaze movement + self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() - except KeyError: + # Init current look at aoi + self.__look_at = None - new_frame_background = numpy.zeros((new_frame_size[1], new_frame_size[0], 3)).astype(numpy.uint8) + # Init heatmap if required + if self.heatmap: - # Load gaze movement identifier - try: + self.heatmap.init() - gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier') + # Init lock to share looked data wit hmultiples threads + self.__look_lock = threading.Lock() - gaze_movement_identifier_type, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem() + @classmethod + def from_dict(self, frame_data, working_directory: str = None) -> ArFrameType: - gaze_movement_identifier_module = importlib.import_module(f'argaze.GazeAnalysis.{gaze_movement_identifier_type}') - new_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters) + # Load name + try: - except KeyError: + new_frame_name = frame_data.pop('name') - new_gaze_movement_identifier = None + except KeyError: - # Load scan path analyzers - new_scan_path_analyzers = {} + new_frame_name = None - try: + # Load size + try: - new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers') + new_frame_size = frame_data.pop('size') - for scan_path_analyzer_type, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items(): + except KeyError: - scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{scan_path_analyzer_type}') + new_frame_size = (0, 0) - # Check scan path analyzer parameters type - members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer) + # Load background image + try: - for member in members: + new_frame_background_value = frame_data.pop('background') + new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value)) + new_frame_background = cv2.resize(new_frame_background, dsize=(new_frame_size[0], new_frame_size[1]), interpolation=cv2.INTER_CUBIC) - if '__annotations__' in member: + except KeyError: - for parameter, parameter_type in member[1].items(): + new_frame_background = numpy.zeros((new_frame_size[1], new_frame_size[0], 3)).astype(numpy.uint8) - # Check if parameter is part of argaze.GazeAnalysis module - parameter_module_path = parameter_type.__module__.split('.') + # Load gaze movement identifier + try: - if len(parameter_module_path) == 3: + gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier') - if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis': + gaze_movement_identifier_type, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem() - # Try get existing analyzer instance to append as parameter - try: + gaze_movement_identifier_module = importlib.import_module(f'argaze.GazeAnalysis.{gaze_movement_identifier_type}') + new_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters) - scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_module_path[2]] + except KeyError: - except KeyError: + new_gaze_movement_identifier = None - raise EnvironmentJSONLoadingFailed(f'{scan_path_analyzer_type} scan path analyzer loading fails because {parameter_module_path[2]} scan path analyzer is missing.') + # Load scan path analyzers + new_scan_path_analyzers = {} - scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters) + try: - new_scan_path_analyzers[scan_path_analyzer_type] = scan_path_analyzer + new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers') - except KeyError: + for scan_path_analyzer_type, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items(): - pass - - # Load AOI scan path analyzers - new_aoi_scan_path_analyzers = {} + scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{scan_path_analyzer_type}') - try: + # Check scan path analyzer parameters type + members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer) - new_aoi_scan_path_analyzers_value = frame_data.pop('aoi_scan_path_analyzers') + for member in members: - for aoi_scan_path_analyzer_type, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items(): + if '__annotations__' in member: - aoi_scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_type}') + for parameter, parameter_type in member[1].items(): - # Check aoi scan path analyzer parameters type - members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer) + # Check if parameter is part of argaze.GazeAnalysis module + parameter_module_path = parameter_type.__module__.split('.') - for member in members: + if len(parameter_module_path) == 3: - if '__annotations__' in member: + if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis': - for parameter, parameter_type in member[1].items(): + # Try get existing analyzer instance to append as parameter + try: - # Check if parameter is part of argaze.GazeAnalysis module - parameter_module_path = parameter_type.__module__.split('.') + scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_module_path[2]] - if len(parameter_module_path) == 3: + except KeyError: - if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis': + raise EnvironmentJSONLoadingFailed(f'{scan_path_analyzer_type} scan path analyzer loading fails because {parameter_module_path[2]} scan path analyzer is missing.') - # Try get existing analyzer instance to append as parameter - try: + scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters) - aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_module_path[2]] + new_scan_path_analyzers[scan_path_analyzer_type] = scan_path_analyzer - except KeyError: + except KeyError: - raise EnvironmentJSONLoadingFailed(f'{aoi_scan_path_analyzer_type} aoi scan path analyzer loading fails because {parameter_module_path[2]} aoi scan path analyzer is missing.') + pass + + # Load AOI scan path analyzers + new_aoi_scan_path_analyzers = {} - aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters) + try: - new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_type] = aoi_scan_path_analyzer + new_aoi_scan_path_analyzers_value = frame_data.pop('aoi_scan_path_analyzers') - except KeyError: + for aoi_scan_path_analyzer_type, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items(): - pass + aoi_scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_type}') - return new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers + # Check aoi scan path analyzer parameters type + members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer) - # Load camera frame as large as aruco dectector optic parameters - try: + for member in members: - camera_frame_data = scene_data.pop('camera_frame') - new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers = frame_data_processor(camera_frame_data, force_frame_size=new_optic_parameters.dimensions) - new_camera_frame = ArFrame.from_scene(new_aoi_3d_scene, new_scene_name, new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers, **camera_frame_data) + if '__annotations__' in member: - except KeyError: + for parameter, parameter_type in member[1].items(): - new_camera_frame = None + # Check if parameter is part of argaze.GazeAnalysis module + parameter_module_path = parameter_type.__module__.split('.') - # Load AOI frames - new_aoi_frames = {} + if len(parameter_module_path) == 3: - try: + if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis': - for aoi_name, aoi_frame_data in scene_data.pop('aoi_frames').items(): + # Try get existing analyzer instance to append as parameter + try: - new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers = frame_data_processor(aoi_frame_data) + aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_module_path[2]] - # Append new AOI frame - new_aoi_frames[aoi_name] = ArFrame.from_scene(new_aoi_3d_scene, aoi_name, new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers, **aoi_frame_data) + except KeyError: - except KeyError: + raise EnvironmentJSONLoadingFailed(f'{aoi_scan_path_analyzer_type} aoi scan path analyzer loading fails because {parameter_module_path[2]} aoi scan path analyzer is missing.') - pass + aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters) - # Append new scene - new_scenes[new_scene_name] = ArScene(new_scene_name, new_aruco_scene, new_aoi_3d_scene, new_camera_frame, new_aoi_frames, **scene_data) + new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_type] = aoi_scan_path_analyzer - return ArEnvironment(new_name, new_aruco_detector, new_scenes) + except KeyError: - def __str__(self) -> str: - """ - Returns: - String representation - """ + pass - output = f'Name:\n{self.name}\n' - output += f'ArUcoDetector:\n{self.aruco_detector}\n' + # Load heatmap + try: - for name, scene in self.scenes.items(): - output += f'\"{name}\" ArScene:\n{scene}\n' + new_heatmap_value = frame_data.pop('heatmap') - return output + except KeyError: + new_heatmap_value = False + + # Create frame + return ArFrame(new_frame_name, \ + new_frame_size, \ + new_frame_background, \ + new_gaze_movement_identifier, \ + GazeFeatures.ScanPath() if len(new_scan_path_analyzers) > 0 else None, \ + new_scan_path_analyzers, \ + GazeFeatures.AOIScanPath() if len(new_aoi_scan_path_analyzers) > 0 else None, \ + new_aoi_scan_path_analyzers, \ + AOIFeatures.Heatmap(new_frame_size) if new_heatmap_value else None \ + ) @property - def frames(self): - """Iterate over all environment frames""" + def parent(self): + """Get parent instance""" - # For each scene - for scene_name, scene in self.scenes.items(): + return self.__parent - # For each aoi frame - for frame_name, frame in scene.aoi_frames.items(): + @parent.setter + def parent(self, parent): + """Get parent instance""" - yield scene_name, frame_name, frame - - def detect_and_project(self, image: numpy.array) -> int: - """Detect environment aruco markers from image and project scenes into camera frame. - - Returns: - - time: marker detection time in ms - """ - - # Detect aruco markers - detection_time = self.aruco_detector.detect_markers(image) - - # Project each aoi 3d scene into camera frame - for scene_name, scene in self.scenes.items(): - - if scene.aruco_aoi: - - # Build AOI scene directly from detected ArUco marker corners - scene.build_aruco_aoi_scene(self.aruco_detector.detected_markers) - - else: - - # Estimate scene markers poses - self.aruco_detector.estimate_markers_pose(scene.aruco_scene.identifiers) - - # Clear scene projection - scene.clear() - - # Estimate scene pose from detected scene markers - tvec, rmat, _, _ = scene.estimate_pose(self.aruco_detector.detected_markers) - - # Project scene into camera frame according estimated pose - scene.project(tvec, rmat) - - return detection_time - - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition): - """Project timestamped gaze position into each scene.""" - - # For each aoi scene projection - for scene_name, scene in self.scenes.items(): - - yield scene_name, scene.look(timestamp, gaze_position) - - def to_json(self, json_filepath): - """Save environment to .json file.""" - - with open(json_filepath, 'w', encoding='utf-8') as file: - - json.dump(self, file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder) - - def draw(self, image: numpy.array): - """Draw ArUco detection visualisation and scenes projections.""" - - # Draw detected markers - self.aruco_detector.draw_detected_markers(image) - - # Draw each scene - for scene_name, scene in self.scenes.items(): - - scene.draw(image) - -class PoseEstimationFailed(Exception): - """ - Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies. - """ - - def __init__(self, message, unconsistencies=None): - - super().__init__(message) - - self.unconsistencies = unconsistencies - -class SceneProjectionFailed(Exception): - """ - Exception raised by ArScene project method when the scene can't be projected. - """ - - def __init__(self, message): - - super().__init__(message) - -@dataclass -class ArFrame(): - """ - Define Augmented Reality frame as an AOI2DScene made from a projected then reframed parent AOI3DScene. - - Parameters: - name: name of the frame - size: frame dimension in pixel. - background: image to draw behind - aoi_2d_scene: AOI 2D scene description ... : see [orthogonal_projection][argaze.ArFeatures.ArScene.orthogonal_projection] and [reframe][argaze.AreaOfInterest.AOI2DScene.reframe] functions. - ... - """ - - name: str - size: tuple[int] = field(default=(1, 1)) - background: numpy.array = field(default_factory=numpy.array) - aoi_2d_scene: AOI2DScene.AOI2DScene = field(default_factory=AOI2DScene.AOI2DScene) - gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier) - scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath) - scan_path_analyzers: dict = field(default_factory=dict) - aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) - aoi_scan_path_analyzers: dict = field(default_factory=dict) - heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap) - - def __post_init__(self): - - # Define scene attribute: it will be setup by parent scene later - self._scene = None - - # Init current gaze position - self.__gaze_position = GazeFeatures.UnvalidGazePosition() - - # Init current gaze movement - self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() - - # Init current look at aoi - self.__look_at = None - - # Init heatmap if required - if self.heatmap: - - self.heatmap.init() - - # Init lock to share looked data wit hmultiples threads - self.__looking_lock = threading.Lock() - - @classmethod - def from_scene(self, aoi_3d_scene, aoi_name, size, background: numpy.array = numpy.empty((0, 0)), gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, scan_path_analyzers: list = [], aoi_scan_path_analyzers: list = [], heatmap: bool = False) -> ArFrameType: - - # If aoi_name is part of the scene - try: - - aoi_2d_scene = aoi_3d_scene.orthogonal_projection.reframe(aoi_name, size) - - except KeyError: - - aoi_2d_scene = aoi_3d_scene.orthogonal_projection - - return ArFrame(aoi_name, \ - size, \ - background, \ - aoi_2d_scene, \ - gaze_movement_identifier, \ - GazeFeatures.ScanPath() if len(scan_path_analyzers) > 0 else None, \ - scan_path_analyzers, \ - GazeFeatures.AOIScanPath(aoi_2d_scene.keys()) if len(aoi_scan_path_analyzers) > 0 else None, \ - aoi_scan_path_analyzers, \ - AOIFeatures.Heatmap(size) if heatmap else None \ - ) + self.__parent = parent @property def image(self): @@ -506,7 +280,7 @@ class ArFrame(): """ # Lock frame exploitation - self.__looking_lock.acquire() + self.__look_lock.acquire() image = self.background.copy() @@ -516,7 +290,7 @@ class ArFrame(): image = cv2.addWeighted(self.heatmap.image, 0.5, image, 1., 0) # Unlock frame exploitation - self.__looking_lock.release() + self.__look_lock.release() return image @@ -532,7 +306,7 @@ class ArFrame(): """ # Lock frame exploitation - self.__looking_lock.acquire() + self.__look_lock.acquire() # Update current gaze position self.__gaze_position = inner_gaze_position @@ -619,9 +393,9 @@ class ArFrame(): self.heatmap.update(self.__gaze_position.value, sigma=0.05) # Unlock frame exploitation - self.__looking_lock.release() + self.__look_lock.release() - # Return looking data + # Return look data return new_gaze_movement, self.__look_at, scan_step_analysis, aoi_scan_step_analysis def draw(self, image:numpy.array): @@ -633,7 +407,7 @@ class ArFrame(): """ # Lock frame exploitation - self.__looking_lock.acquire() + self.__look_lock.acquire() # Draw aoi self.aoi_2d_scene.draw(image, color=(0, 0, 0)) @@ -654,7 +428,7 @@ class ArFrame(): self.aoi_2d_scene.draw_circlecast(image, self.__gaze_movement.focus, self.__gaze_movement.deviation_max, base_color=(0, 0, 0), matching_color=(255, 255, 255)) # Unlock frame exploitation - self.__looking_lock.release() + self.__look_lock.release() @dataclass class ArScene(): @@ -669,8 +443,6 @@ class ArScene(): aoi_3d_scene: AOI 3D scene description that will be projected onto estimated scene once its pose will be estimated : see [project][argaze.ArFeatures.ArScene.project] function below. - camera_frame: Where AOI 3D scene will be projected - aoi_frames: Optional dictionary to define AOI as ArFrame. aruco_axis: Optional dictionary to define orthogonal axis where each axis is defined by list of 3 markers identifier (first is origin). \ @@ -685,7 +457,6 @@ class ArScene(): name: str aruco_scene: ArUcoScene.ArUcoScene = field(default_factory=ArUcoScene.ArUcoScene) aoi_3d_scene: AOI3DScene.AOI3DScene = field(default_factory=AOI3DScene.AOI3DScene) - camera_frame: ArFrame = field(default_factory=ArFrame) aoi_frames: dict = field(default_factory=dict) aruco_axis: dict = field(default_factory=dict) aruco_aoi: dict = field(default_factory=dict) @@ -695,17 +466,15 @@ class ArScene(): def __post_init__(self): # Define environment attribute: it will be setup by parent environment later - self._environment = None + self.__environment = None # Preprocess orthogonal projection to speed up further aruco aoi processings self.__orthogonal_projection_cache = self.aoi_3d_scene.orthogonal_projection - # Setup ArFrame scene attribute after ArFrame creation + # Setup aoi frame parent attribute for aoi_name, frame in self.aoi_frames.items(): - frame._scene = self - # Init lock to share camera frame with multiples threads - self.__camera_frame_lock = threading.Lock() + frame.parent = self def __str__(self) -> str: """ @@ -713,24 +482,102 @@ class ArScene(): String representation """ - output = f'ArEnvironment:\n{self._environment.name}\n' + output = f'ArEnvironment:\n{self.environment.name}\n' output += f'ArUcoScene:\n{self.aruco_scene}\n' output += f'AOI3DScene:\n{self.aoi_3d_scene}\n' return output - def clear(self): - """Clear scene projection.""" + @classmethod + def from_dict(self, scene_data, working_directory: str = None) -> ArSceneType: - # Lock camera frame exploitation - self.__camera_frame_lock.acquire() + # Load name + try: - # Update camera frame - self.camera_frame.aoi_2d_scene = AOI2DScene.AOI2DScene() + new_scene_name = scene_data.pop('name') - # Unlock camera frame exploitation - self.__camera_frame_lock.release() + except KeyError: + + new_scene_name = None + + # Load aruco scene + try: + + # Check aruco_scene value type + aruco_scene_value = scene_data.pop('aruco_scene') + + # str: relative path to .obj file + if type(aruco_scene_value) == str: + + aruco_scene_value = os.path.join(working_directory, aruco_scene_value) + new_aruco_scene = ArUcoScene.ArUcoScene.from_obj(aruco_scene_value) + + # dict: + else: + + new_aruco_scene = ArUcoScene.ArUcoScene(**aruco_scene_value) + + except KeyError: + + new_aruco_scene = None + + # Load aoi 3d scene + try: + + # Check aoi_3d_scene value type + aoi_3d_scene_value = scene_data.pop('aoi_3d_scene') + + # str: relative path to .obj file + if type(aoi_3d_scene_value) == str: + + obj_filepath = os.path.join(working_directory, aoi_3d_scene_value) + new_aoi_3d_scene = AOI3DScene.AOI3DScene.from_obj(obj_filepath) + + # dict: + else: + + new_aoi_3d_scene = AOI3DScene.AOI3DScene(aoi_3d_scene_value) + + except KeyError: + + new_aoi_3d_scene = None + + # Load aoi frames + new_aoi_frames = {} + + try: + + for aoi_name, aoi_frame_data in scene_data.pop('aoi_frames').items(): + + # Create aoi frame + new_aoi_frame = ArFrame.from_dict(aoi_frame_data, working_directory) + + # Setup aoi frame + new_aoi_frame.name = aoi_name + new_aoi_frame.aoi_2d_scene = new_aoi_3d_scene.orthogonal_projection.reframe(aoi_name, new_aoi_frame.size) + new_aoi_frame.aoi_scan_path.expected_aois = list(new_aoi_3d_scene.keys()) + + # Append new aoi frame + new_aoi_frames[aoi_name] = new_aoi_frame + + except KeyError: + + pass + + return ArScene(new_scene_name, new_aruco_scene, new_aoi_3d_scene, new_aoi_frames, **scene_data) + + @property + def environment(self): + """Get parent environment instance""" + + return self.__environment + @environment.setter + def environment(self, environment): + """Set parent environment instance""" + + self.__environment = environment + def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, str, dict]: """Estimate scene pose from detected ArUco markers. @@ -794,13 +641,16 @@ class ArScene(): return tvec, rmat, 'estimate_pose_from_markers', consistent_markers - def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> ArFrame: + def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> AOI2DScene.AOI2DScene: """Project AOI scene according estimated pose and optional horizontal field of view clipping angle. Parameters: tvec: translation vector rvec: rotation vector visual_hfov: horizontal field of view clipping angle + + Returns: + aoi_2d_scene: scene projection """ # Clip AOI out of the visual horizontal field of view (optional) @@ -822,26 +672,14 @@ class ArScene(): aoi_3d_scene_copy = self.aoi_3d_scene.copy() - # Lock camera frame exploitation - self.__camera_frame_lock.acquire() - - # Update camera frame - self.camera_frame.aoi_2d_scene = aoi_3d_scene_copy.project(tvec, rvec, self._environment.aruco_detector.optic_parameters.K) - - # Unlock camera frame exploitation - self.__camera_frame_lock.release() - - # Warn user when the projected scene is empty - if len(self.camera_frame.aoi_2d_scene) == 0: - - raise SceneProjectionFailed('AOI projection is empty') + return aoi_3d_scene_copy.project(tvec, rvec, self.environment.aruco_detector.optic_parameters.K) def build_aruco_aoi_scene(self, detected_markers) -> AOI2DScene.AOI2DScene: """ Build AOI scene from detected ArUco markers as defined in aruco_aoi dictionary. Returns: - built AOI 2D scene + aoi_2d_scene: built AOI 2D scene """ # ArUco aoi must be defined @@ -880,17 +718,275 @@ class ArScene(): aoi_corners = [numpy.array(aruco_aoi_scene[aruco_aoi_name].outter_axis(inner)) for inner in self.__orthogonal_projection_cache[inner_aoi_name]] aruco_aoi_scene[inner_aoi_name] = AOIFeatures.AreaOfInterest(aoi_corners) + return AOI2DScene.AOI2DScene(aruco_aoi_scene) + + def draw_axis(self, image: numpy.array): + """ + Draw scene axis into image. + + Parameters: + image: where to draw + """ + + self.aruco_scene.draw_axis(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D) + + def draw_places(self, image: numpy.array): + """ + Draw scene places into image. + + Parameters: + image: where to draw + """ + + self.aruco_scene.draw_places(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D) + +@dataclass +class ArEnvironment(): + """ + Define Augmented Reality environment based on ArUco marker detection. + + Parameters: + name: environment name + aruco_detector: ArUco marker detector + camera_frame: where to project scenes + scenes: all environment scenes + """ + + name: str + aruco_detector: ArUcoDetector.ArUcoDetector = field(default_factory=ArUcoDetector.ArUcoDetector) + camera_frame: ArFrame = field(default_factory=ArFrame) + scenes: dict = field(default_factory=dict) + + def __post_init__(self): + + # Setup camera frame parent attribute + if self.camera_frame != None: + + self.camera_frame.parent = self + + # Setup scenes environment attribute + for name, scene in self.scenes.items(): + + scene.environment = self + + # Init a lock to share AOI scene projections into camera frame between multiple threads + self.__camera_frame_lock = threading.Lock() + + @classmethod + def from_dict(self, environment_data, working_directory: str = None) -> ArEnvironmentType: + + new_environment_name = environment_data.pop('name') + + try: + new_detector_data = environment_data.pop('aruco_detector') + + new_aruco_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(**new_detector_data.pop('dictionary')) + new_marker_size = new_detector_data.pop('marker_size') + + # Check optic_parameters value type + optic_parameters_value = new_detector_data.pop('optic_parameters') + + # str: relative path to .json file + if type(optic_parameters_value) == str: + + optic_parameters_value = os.path.join(working_directory, optic_parameters_value) + new_optic_parameters = ArUcoOpticCalibrator.OpticParameters.from_json(optic_parameters_value) + + # dict: + else: + + new_optic_parameters = ArUcoOpticCalibrator.OpticParameters(**optic_parameters_value) + + # Check detector parameters value type + detector_parameters_value = new_detector_data.pop('parameters') + + # str: relative path to .json file + if type(detector_parameters_value) == str: + + detector_parameters_value = os.path.join(working_directory, detector_parameters_value) + new_aruco_detector_parameters = ArUcoDetector.DetectorParameters.from_json(detector_parameters_value) + + # dict: + else: + + new_aruco_detector_parameters = ArUcoDetector.DetectorParameters(**detector_parameters_value) + + new_aruco_detector = ArUcoDetector.ArUcoDetector(new_aruco_dictionary, new_marker_size, new_optic_parameters, new_aruco_detector_parameters) + + except KeyError: + + new_aruco_detector = None + + # Load camera frame as large as aruco dectector optic parameters + try: + + camera_frame_data = environment_data.pop('camera_frame') + + # Create camera frame + new_camera_frame = ArFrame.from_dict(camera_frame_data, working_directory) + + # Setup camera frame + new_camera_frame.name = new_environment_name + new_camera_frame.size = new_optic_parameters.dimensions + new_camera_frame.background = numpy.zeros((new_optic_parameters.dimensions[1], new_optic_parameters.dimensions[0], 3)).astype(numpy.uint8) + + except KeyError: + + new_camera_frame = None + + # Build scenes + new_scenes = {} + for new_scene_name, scene_data in environment_data.pop('scenes').items(): + + # Create new scene + new_scene = ArScene.from_dict(scene_data, working_directory) + + # Setup new scene + new_scene.name = new_scene_name + + # Append new scene + new_scenes[new_scene_name] = new_scene + + # Setup expected aoi for camera frame aoi scan path + if new_camera_frame != None: + + if new_camera_frame.aoi_scan_path != None: + + # List all environment aoi + all_aoi_list = [] + for scene_name, scene in new_scenes.items(): + + all_aoi_list.extend(list(scene.aoi_3d_scene.keys())) + + new_camera_frame.aoi_scan_path.expected_aois = all_aoi_list + + # Create new environment + return ArEnvironment(new_environment_name, new_aruco_detector, new_camera_frame, new_scenes) + + @classmethod + def from_json(self, json_filepath: str) -> ArEnvironmentType: + """ + Load ArEnvironment from .json file. + + Parameters: + json_filepath: path to json file + """ + + with open(json_filepath) as configuration_file: + + environment_data = json.load(configuration_file) + working_directory = os.path.dirname(json_filepath) + + return ArEnvironment.from_dict(environment_data, working_directory) + + def __str__(self) -> str: + """ + Returns: + String representation + """ + + output = f'Name:\n{self.name}\n' + output += f'ArUcoDetector:\n{self.aruco_detector}\n' + + for name, scene in self.scenes.items(): + output += f'\"{name}\" ArScene:\n{scene}\n' + + return output + + @property + def image(self): + """Get camera frame image""" + + # Can't use camera frame when it is locked + if self.__camera_frame_lock.locked(): + return + # Lock camera frame exploitation self.__camera_frame_lock.acquire() - # Update camera frame - self.camera_frame.aoi_2d_scene = AOI2DScene.AOI2DScene(aruco_aoi_scene) + # Get camera frame image + image = self.camera_frame.image # Unlock camera frame exploitation self.__camera_frame_lock.release() + return image + + @property + def aoi_frames(self): + """Iterate over all environment scenes aoi frames""" + + # For each scene + for scene_name, scene in self.scenes.items(): + + # For each aoi frame + for frame_name, aoi_frame in scene.aoi_frames.items(): + + yield aoi_frame + + def detect_and_project(self, image: numpy.array) -> int: + """Detect environment aruco markers from image and project scenes into camera frame. + + Returns: + - detection_time: aruco marker detection time in ms + - exceptions: dictionary with exception raised per scene + """ + + # Detect aruco markers + detection_time = self.aruco_detector.detect_markers(image) + + # Lock camera frame exploitation + self.__camera_frame_lock.acquire() + + # Fill camera frame background with image + self.camera_frame.background = image + + # Clear former scenes projection into camera frame + self.camera_frame.aoi_2d_scene = AOI2DScene.AOI2DScene() + + # Store exceptions for each scene + exceptions = {} + + # Project each aoi 3d scene into camera frame + for scene_name, scene in self.scenes.items(): + + ''' TODO: Enable aruco_aoi processing + if scene.aruco_aoi: + + try: + + # Build AOI scene directly from detected ArUco marker corners + self.camera_frame.aoi_2d_scene |= scene.build_aruco_aoi_scene(self.aruco_detector.detected_markers) + + except SceneProjectionFailed: + + pass + ''' + + try: + + # Estimate scene markers poses + self.aruco_detector.estimate_markers_pose(scene.aruco_scene.identifiers) + + # Estimate scene pose from detected scene markers + tvec, rmat, _, _ = scene.estimate_pose(self.aruco_detector.detected_markers) + + # Project scene into camera frame according estimated pose + self.camera_frame.aoi_2d_scene |= scene.project(tvec, rmat) + + # Store exceptions and continue + except Exception as e: + + exceptions[scene_name] = e + + # Unlock camera frame exploitation + self.__camera_frame_lock.release() + + # Return dection time and exceptions + return detection_time, exceptions + def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition): - """Project timestamped gaze position into camera frame.""" + """Project timestamped gaze position into each scene.""" # Can't use camera frame when it is locked if self.__camera_frame_lock.locked(): @@ -902,16 +998,16 @@ class ArScene(): # Lock camera frame exploitation self.__camera_frame_lock.acquire() - # Project gaze position in camera frame - yield self.name, self.camera_frame.look(timestamp, gaze_position) + # Project gaze position into camera frame + yield self.camera_frame, self.camera_frame.look(timestamp, gaze_position) # Project gaze position into each aoi frames if possible - for aoi_name, frame in self.aoi_frames.items(): + for aoi_frame in self.aoi_frames: # Is aoi frame projected into camera frame ? try: - aoi_2d = self.camera_frame.aoi_2d_scene[frame.name] + aoi_2d = self.camera_frame.aoi_2d_scene[aoi_frame.name] # TODO: Add option to use gaze precision circle if aoi_2d.contains_point(gaze_position.value): @@ -921,7 +1017,7 @@ class ArScene(): # QUESTION: How to project gaze precision? inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) - yield aoi_name, frame.look(timestamp, inner_gaze_position * frame.size) + yield aoi_frame, aoi_frame.look(timestamp, inner_gaze_position * aoi_frame.size) # Ignore missing aoi frame projection except KeyError: @@ -931,32 +1027,28 @@ class ArScene(): # Unlock camera frame exploitation self.__camera_frame_lock.release() - def draw(self, image: numpy.array): - """ - Draw camera frame into image. + def to_json(self, json_filepath): + """Save environment to .json file.""" - Parameters: - image: where to draw - """ + with open(json_filepath, 'w', encoding='utf-8') as file: - self.camera_frame.draw(image) + json.dump(self, file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder) - def draw_axis(self, image: numpy.array): - """ - Draw scene axis into image. - - Parameters: - image: where to draw - """ + def draw(self, image: numpy.array): + """Draw ArUco detection visualisation and camera frame projections.""" - self.aruco_scene.draw_axis(image, self._environment.aruco_detector.optic_parameters.K, self._environment.aruco_detector.optic_parameters.D) + # Draw detected markers + self.aruco_detector.draw_detected_markers(image) - def draw_places(self, image: numpy.array): - """ - Draw scene places into image. + # Can't use camera frame when it is locked + if self.__camera_frame_lock.locked(): + return - Parameters: - image: where to draw - """ + # Lock camera frame exploitation + self.__camera_frame_lock.acquire() - self.aruco_scene.draw_places(image, self._environment.aruco_detector.optic_parameters.K, self._environment.aruco_detector.optic_parameters.D) + # Draw camera frame + self.camera_frame.draw(image) + + # Unlock camera frame exploitation + self.__camera_frame_lock.release() -- cgit v1.1