author     Théo de la Hogue  2023-07-05 16:44:11 +0200
committer  Théo de la Hogue  2023-07-05 16:44:11 +0200
commit     5ab2a85ab3eb5d5f425fb34de4cd02d12793d17e (patch)
tree       a8a84ea98b7dd1a2a524c9681b78538e7cf941a9 /src
parent     2d2dc282face6dfdcc12bbf7fb31e8511ed074f4 (diff)
Moving camera frame to environment level.
Diffstat (limited to 'src')
-rw-r--r--  src/argaze/ArFeatures.py  974
1 file changed, 533 insertions(+), 441 deletions(-)
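
As a reading aid, the reworked loaders below (ArEnvironment.from_dict, ArScene.from_dict and ArFrame.from_dict) consume plain dictionaries, with the camera frame now declared at the environment level instead of per scene. The following sketch shows the shape such a configuration could take; only the key names are taken from the loaders in this diff, and every value is a hypothetical placeholder.

# Hypothetical configuration sketch for the reworked loaders in this commit.
# Key names match those popped by ArEnvironment.from_dict, ArScene.from_dict
# and ArFrame.from_dict; all values below are illustrative placeholders.
environment_data = {
    'name': 'demo_environment',
    'aruco_detector': {
        'dictionary': {},                  # kwargs for ArUcoMarkersDictionary (not shown in this diff)
        'marker_size': 5.0,                # placeholder marker size
        'optic_parameters': 'optic.json',  # str: relative path to .json file
        'parameters': 'detector.json'      # str: relative path to .json file
    },
    'camera_frame': {
        # name, size and background are overwritten by from_dict using the
        # environment name and the detector optic parameters dimensions.
    },
    'scenes': {
        'demo_scene': {
            'aruco_scene': 'scene.obj',    # str: relative path to .obj file
            'aoi_3d_scene': 'aoi.obj',     # str: relative path to .obj file
            'aoi_frames': {
                'Screen': {'size': (1920, 1080), 'heatmap': True}
            }
        }
    }
}

# Relative paths are resolved against the .json file directory when loading:
# environment = ArEnvironment.from_json('environment_setup.json')
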
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index a4601bb..94ae4af 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -41,463 +41,237 @@ class EnvironmentJSONLoadingFailed(Exception):
super().__init__(message)
-@dataclass
-class ArEnvironment():
+class PoseEstimationFailed(Exception):
"""
- Define Augmented Reality environment based on ArUco marker detection.
-
- Parameters:
- name: Environment name
- aruco_detector: ArUco detector
- scenes: All environment scenes
+ Exception raised by ArScene estimate_pose method when the pose can't be estimated due to inconsistencies.
"""
- name: str
- aruco_detector: ArUcoDetector.ArUcoDetector = field(default_factory=ArUcoDetector.ArUcoDetector)
- scenes: dict = field(default_factory=dict)
-
- def __post_init__(self):
-
- # Setup scenes environment after environment creation
- for name, scene in self.scenes.items():
- scene._environment = self
-
- # Init AOI scene projections
- self.__camera_frames = {}
-
- # Init a lock to share AOI scene projections between multiple threads
- self.__camera_frames_lock = threading.Lock()
-
- @classmethod
- def from_json(self, json_filepath: str) -> ArSceneType:
- """
- Load ArEnvironment from .json file.
-
- Parameters:
- json_filepath: path to json file
- """
-
- with open(json_filepath) as configuration_file:
-
- data = json.load(configuration_file)
- working_directory = os.path.dirname(json_filepath)
-
- new_name = data.pop('name')
-
- try:
- new_detector_data = data.pop('aruco_detector')
-
- new_aruco_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(**new_detector_data.pop('dictionary'))
- new_marker_size = new_detector_data.pop('marker_size')
-
- # Check optic_parameters value type
- optic_parameters_value = new_detector_data.pop('optic_parameters')
-
- # str: relative path to .json file
- if type(optic_parameters_value) == str:
-
- optic_parameters_value = os.path.join(working_directory, optic_parameters_value)
- new_optic_parameters = ArUcoOpticCalibrator.OpticParameters.from_json(optic_parameters_value)
-
- # dict:
- else:
-
- new_optic_parameters = ArUcoOpticCalibrator.OpticParameters(**optic_parameters_value)
-
- # Check detector parameters value type
- detector_parameters_value = new_detector_data.pop('parameters')
-
- # str: relative path to .json file
- if type(detector_parameters_value) == str:
-
- detector_parameters_value = os.path.join(working_directory, detector_parameters_value)
- new_aruco_detector_parameters = ArUcoDetector.DetectorParameters.from_json(detector_parameters_value)
-
- # dict:
- else:
-
- new_aruco_detector_parameters = ArUcoDetector.DetectorParameters(**detector_parameters_value)
-
- new_aruco_detector = ArUcoDetector.ArUcoDetector(new_aruco_dictionary, new_marker_size, new_optic_parameters, new_aruco_detector_parameters)
-
- except KeyError:
-
- new_aruco_detector = None
-
- # Build scenes
- new_scenes = {}
- for new_scene_name, scene_data in data.pop('scenes').items():
-
- new_aruco_scene = None
- new_aoi_scene = None
-
- try:
-
- # Check aruco_scene value type
- aruco_scene_value = scene_data.pop('aruco_scene')
-
- # str: relative path to .obj file
- if type(aruco_scene_value) == str:
-
- aruco_scene_value = os.path.join(working_directory, aruco_scene_value)
- new_aruco_scene = ArUcoScene.ArUcoScene.from_obj(aruco_scene_value)
-
- # dict:
- else:
-
- new_aruco_scene = ArUcoScene.ArUcoScene(**aruco_scene_value)
-
- except KeyError:
-
- new_aruco_scene = None
+ def __init__(self, message, unconsistencies=None):
- # Check aoi_3d_scene value type
- aoi_3d_scene_value = scene_data.pop('aoi_3d_scene')
+ super().__init__(message)
- # str: relative path to .obj file
- if type(aoi_3d_scene_value) == str:
+ self.unconsistencies = unconsistencies
- obj_filepath = os.path.join(working_directory, aoi_3d_scene_value)
- new_aoi_3d_scene = AOI3DScene.AOI3DScene.from_obj(obj_filepath)
+class SceneProjectionFailed(Exception):
+ """
+ Exception raised by ArEnvironment detect_and_project method when the scene can't be projected.
+ """
- # dict:
- else:
+ def __init__(self, message):
- new_aoi_3d_scene = AOI3DScene.AOI3DScene(aoi_3d_scene_value)
+ super().__init__(message)
- # Define frame data processor
- def frame_data_processor(frame_data, force_frame_size: list = []) -> ArFrame:
+@dataclass
+class ArFrame():
+ """
+ Define Augmented Reality frame as an AOI2DScene made from a projected then reframed parent AOI3DScene.
- if len(force_frame_size) == 2:
+ Parameters:
+ name: name of the frame
+ size: frame dimension in pixel.
+ background: image to draw behind
+ aoi_2d_scene: AOI 2D scene description ... : see [orthogonal_projection][argaze.ArFeatures.ArScene.orthogonal_projection] and [reframe][argaze.AreaOfInterest.AOI2DScene.reframe] functions.
+ ...
+ """
- new_frame_size = force_frame_size
+ name: str
+ size: tuple[int] = field(default=(1, 1))
+ background: numpy.array = field(default_factory=numpy.array)
+ aoi_2d_scene: AOI2DScene.AOI2DScene = field(init=False, default_factory=AOI2DScene.AOI2DScene)
+ gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier)
+ scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath)
+ scan_path_analyzers: dict = field(default_factory=dict)
+ aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath)
+ aoi_scan_path_analyzers: dict = field(default_factory=dict)
+ heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap)
- else:
+ def __post_init__(self):
- new_frame_size = frame_data.pop('size')
+ # Define parent attribute: it will be set up by the parent later
+ self.__parent = None
- # Load background image
- try:
+ # Init current gaze position
+ self.__gaze_position = GazeFeatures.UnvalidGazePosition()
- new_frame_background_value = frame_data.pop('background')
- new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value))
- new_frame_background = cv2.resize(new_frame_background, dsize=(new_frame_size[0], new_frame_size[1]), interpolation=cv2.INTER_CUBIC)
+ # Init current gaze movement
+ self.__gaze_movement = GazeFeatures.UnvalidGazeMovement()
- except KeyError:
+ # Init current look at aoi
+ self.__look_at = None
- new_frame_background = numpy.zeros((new_frame_size[1], new_frame_size[0], 3)).astype(numpy.uint8)
+ # Init heatmap if required
+ if self.heatmap:
- # Load gaze movement identifier
- try:
+ self.heatmap.init()
- gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier')
+ # Init lock to share look data with multiple threads
+ self.__look_lock = threading.Lock()
- gaze_movement_identifier_type, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem()
+ @classmethod
+ def from_dict(self, frame_data, working_directory: str = None) -> ArFrameType:
- gaze_movement_identifier_module = importlib.import_module(f'argaze.GazeAnalysis.{gaze_movement_identifier_type}')
- new_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters)
+ # Load name
+ try:
- except KeyError:
+ new_frame_name = frame_data.pop('name')
- new_gaze_movement_identifier = None
+ except KeyError:
- # Load scan path analyzers
- new_scan_path_analyzers = {}
+ new_frame_name = None
- try:
+ # Load size
+ try:
- new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers')
+ new_frame_size = frame_data.pop('size')
- for scan_path_analyzer_type, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items():
+ except KeyError:
- scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{scan_path_analyzer_type}')
+ new_frame_size = (0, 0)
- # Check scan path analyzer parameters type
- members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer)
+ # Load background image
+ try:
- for member in members:
+ new_frame_background_value = frame_data.pop('background')
+ new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value))
+ new_frame_background = cv2.resize(new_frame_background, dsize=(new_frame_size[0], new_frame_size[1]), interpolation=cv2.INTER_CUBIC)
- if '__annotations__' in member:
+ except KeyError:
- for parameter, parameter_type in member[1].items():
+ new_frame_background = numpy.zeros((new_frame_size[1], new_frame_size[0], 3)).astype(numpy.uint8)
- # Check if parameter is part of argaze.GazeAnalysis module
- parameter_module_path = parameter_type.__module__.split('.')
+ # Load gaze movement identifier
+ try:
- if len(parameter_module_path) == 3:
+ gaze_movement_identifier_value = frame_data.pop('gaze_movement_identifier')
- if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis':
+ gaze_movement_identifier_type, gaze_movement_identifier_parameters = gaze_movement_identifier_value.popitem()
- # Try get existing analyzer instance to append as parameter
- try:
+ gaze_movement_identifier_module = importlib.import_module(f'argaze.GazeAnalysis.{gaze_movement_identifier_type}')
+ new_gaze_movement_identifier = gaze_movement_identifier_module.GazeMovementIdentifier(**gaze_movement_identifier_parameters)
- scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_module_path[2]]
+ except KeyError:
- except KeyError:
+ new_gaze_movement_identifier = None
- raise EnvironmentJSONLoadingFailed(f'{scan_path_analyzer_type} scan path analyzer loading fails because {parameter_module_path[2]} scan path analyzer is missing.')
+ # Load scan path analyzers
+ new_scan_path_analyzers = {}
- scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters)
+ try:
- new_scan_path_analyzers[scan_path_analyzer_type] = scan_path_analyzer
+ new_scan_path_analyzers_value = frame_data.pop('scan_path_analyzers')
- except KeyError:
+ for scan_path_analyzer_type, scan_path_analyzer_parameters in new_scan_path_analyzers_value.items():
- pass
-
- # Load AOI scan path analyzers
- new_aoi_scan_path_analyzers = {}
+ scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{scan_path_analyzer_type}')
- try:
+ # Check scan path analyzer parameters type
+ members = getmembers(scan_path_analyzer_module.ScanPathAnalyzer)
- new_aoi_scan_path_analyzers_value = frame_data.pop('aoi_scan_path_analyzers')
+ for member in members:
- for aoi_scan_path_analyzer_type, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items():
+ if '__annotations__' in member:
- aoi_scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_type}')
+ for parameter, parameter_type in member[1].items():
- # Check aoi scan path analyzer parameters type
- members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer)
+ # Check if parameter is part of argaze.GazeAnalysis module
+ parameter_module_path = parameter_type.__module__.split('.')
- for member in members:
+ if len(parameter_module_path) == 3:
- if '__annotations__' in member:
+ if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis':
- for parameter, parameter_type in member[1].items():
+ # Try to get existing analyzer instance to append as parameter
+ try:
- # Check if parameter is part of argaze.GazeAnalysis module
- parameter_module_path = parameter_type.__module__.split('.')
+ scan_path_analyzer_parameters[parameter] = new_scan_path_analyzers[parameter_module_path[2]]
- if len(parameter_module_path) == 3:
+ except KeyError:
- if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis':
+ raise EnvironmentJSONLoadingFailed(f'{scan_path_analyzer_type} scan path analyzer loading fails because {parameter_module_path[2]} scan path analyzer is missing.')
- # Try get existing analyzer instance to append as parameter
- try:
+ scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters)
- aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_module_path[2]]
+ new_scan_path_analyzers[scan_path_analyzer_type] = scan_path_analyzer
- except KeyError:
+ except KeyError:
- raise EnvironmentJSONLoadingFailed(f'{aoi_scan_path_analyzer_type} aoi scan path analyzer loading fails because {parameter_module_path[2]} aoi scan path analyzer is missing.')
+ pass
+
+ # Load AOI scan path analyzers
+ new_aoi_scan_path_analyzers = {}
- aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters)
+ try:
- new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_type] = aoi_scan_path_analyzer
+ new_aoi_scan_path_analyzers_value = frame_data.pop('aoi_scan_path_analyzers')
- except KeyError:
+ for aoi_scan_path_analyzer_type, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items():
- pass
+ aoi_scan_path_analyzer_module = importlib.import_module(f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_type}')
- return new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers
+ # Check aoi scan path analyzer parameters type
+ members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer)
- # Load camera frame as large as aruco dectector optic parameters
- try:
+ for member in members:
- camera_frame_data = scene_data.pop('camera_frame')
- new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers = frame_data_processor(camera_frame_data, force_frame_size=new_optic_parameters.dimensions)
- new_camera_frame = ArFrame.from_scene(new_aoi_3d_scene, new_scene_name, new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers, **camera_frame_data)
+ if '__annotations__' in member:
- except KeyError:
+ for parameter, parameter_type in member[1].items():
- new_camera_frame = None
+ # Check if parameter is part of argaze.GazeAnalysis module
+ parameter_module_path = parameter_type.__module__.split('.')
- # Load AOI frames
- new_aoi_frames = {}
+ if len(parameter_module_path) == 3:
- try:
+ if parameter_module_path[0] == 'argaze' and parameter_module_path[1] == 'GazeAnalysis':
- for aoi_name, aoi_frame_data in scene_data.pop('aoi_frames').items():
+ # Try to get existing analyzer instance to append as parameter
+ try:
- new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers = frame_data_processor(aoi_frame_data)
+ aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_module_path[2]]
- # Append new AOI frame
- new_aoi_frames[aoi_name] = ArFrame.from_scene(new_aoi_3d_scene, aoi_name, new_frame_size, new_frame_background, new_gaze_movement_identifier, new_scan_path_analyzers, new_aoi_scan_path_analyzers, **aoi_frame_data)
+ except KeyError:
- except KeyError:
+ raise EnvironmentJSONLoadingFailed(f'{aoi_scan_path_analyzer_type} aoi scan path analyzer loading fails because {parameter_module_path[2]} aoi scan path analyzer is missing.')
- pass
+ aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters)
- # Append new scene
- new_scenes[new_scene_name] = ArScene(new_scene_name, new_aruco_scene, new_aoi_3d_scene, new_camera_frame, new_aoi_frames, **scene_data)
+ new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_type] = aoi_scan_path_analyzer
- return ArEnvironment(new_name, new_aruco_detector, new_scenes)
+ except KeyError:
- def __str__(self) -> str:
- """
- Returns:
- String representation
- """
+ pass
- output = f'Name:\n{self.name}\n'
- output += f'ArUcoDetector:\n{self.aruco_detector}\n'
+ # Load heatmap
+ try:
- for name, scene in self.scenes.items():
- output += f'\"{name}\" ArScene:\n{scene}\n'
+ new_heatmap_value = frame_data.pop('heatmap')
- return output
+ except KeyError:
+ new_heatmap_value = False
+
+ # Create frame
+ return ArFrame(new_frame_name, \
+ new_frame_size, \
+ new_frame_background, \
+ new_gaze_movement_identifier, \
+ GazeFeatures.ScanPath() if len(new_scan_path_analyzers) > 0 else None, \
+ new_scan_path_analyzers, \
+ GazeFeatures.AOIScanPath() if len(new_aoi_scan_path_analyzers) > 0 else None, \
+ new_aoi_scan_path_analyzers, \
+ AOIFeatures.Heatmap(new_frame_size) if new_heatmap_value else None \
+ )
@property
- def frames(self):
- """Iterate over all environment frames"""
+ def parent(self):
+ """Get parent instance"""
- # For each scene
- for scene_name, scene in self.scenes.items():
+ return self.__parent
- # For each aoi frame
- for frame_name, frame in scene.aoi_frames.items():
+ @parent.setter
+ def parent(self, parent):
+ """Get parent instance"""
- yield scene_name, frame_name, frame
-
- def detect_and_project(self, image: numpy.array) -> int:
- """Detect environment aruco markers from image and project scenes into camera frame.
-
- Returns:
- - time: marker detection time in ms
- """
-
- # Detect aruco markers
- detection_time = self.aruco_detector.detect_markers(image)
-
- # Project each aoi 3d scene into camera frame
- for scene_name, scene in self.scenes.items():
-
- if scene.aruco_aoi:
-
- # Build AOI scene directly from detected ArUco marker corners
- scene.build_aruco_aoi_scene(self.aruco_detector.detected_markers)
-
- else:
-
- # Estimate scene markers poses
- self.aruco_detector.estimate_markers_pose(scene.aruco_scene.identifiers)
-
- # Clear scene projection
- scene.clear()
-
- # Estimate scene pose from detected scene markers
- tvec, rmat, _, _ = scene.estimate_pose(self.aruco_detector.detected_markers)
-
- # Project scene into camera frame according estimated pose
- scene.project(tvec, rmat)
-
- return detection_time
-
- def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition):
- """Project timestamped gaze position into each scene."""
-
- # For each aoi scene projection
- for scene_name, scene in self.scenes.items():
-
- yield scene_name, scene.look(timestamp, gaze_position)
-
- def to_json(self, json_filepath):
- """Save environment to .json file."""
-
- with open(json_filepath, 'w', encoding='utf-8') as file:
-
- json.dump(self, file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder)
-
- def draw(self, image: numpy.array):
- """Draw ArUco detection visualisation and scenes projections."""
-
- # Draw detected markers
- self.aruco_detector.draw_detected_markers(image)
-
- # Draw each scene
- for scene_name, scene in self.scenes.items():
-
- scene.draw(image)
-
-class PoseEstimationFailed(Exception):
- """
- Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies.
- """
-
- def __init__(self, message, unconsistencies=None):
-
- super().__init__(message)
-
- self.unconsistencies = unconsistencies
-
-class SceneProjectionFailed(Exception):
- """
- Exception raised by ArScene project method when the scene can't be projected.
- """
-
- def __init__(self, message):
-
- super().__init__(message)
-
-@dataclass
-class ArFrame():
- """
- Define Augmented Reality frame as an AOI2DScene made from a projected then reframed parent AOI3DScene.
-
- Parameters:
- name: name of the frame
- size: frame dimension in pixel.
- background: image to draw behind
- aoi_2d_scene: AOI 2D scene description ... : see [orthogonal_projection][argaze.ArFeatures.ArScene.orthogonal_projection] and [reframe][argaze.AreaOfInterest.AOI2DScene.reframe] functions.
- ...
- """
-
- name: str
- size: tuple[int] = field(default=(1, 1))
- background: numpy.array = field(default_factory=numpy.array)
- aoi_2d_scene: AOI2DScene.AOI2DScene = field(default_factory=AOI2DScene.AOI2DScene)
- gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier)
- scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath)
- scan_path_analyzers: dict = field(default_factory=dict)
- aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath)
- aoi_scan_path_analyzers: dict = field(default_factory=dict)
- heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap)
-
- def __post_init__(self):
-
- # Define scene attribute: it will be setup by parent scene later
- self._scene = None
-
- # Init current gaze position
- self.__gaze_position = GazeFeatures.UnvalidGazePosition()
-
- # Init current gaze movement
- self.__gaze_movement = GazeFeatures.UnvalidGazeMovement()
-
- # Init current look at aoi
- self.__look_at = None
-
- # Init heatmap if required
- if self.heatmap:
-
- self.heatmap.init()
-
- # Init lock to share looked data wit hmultiples threads
- self.__looking_lock = threading.Lock()
-
- @classmethod
- def from_scene(self, aoi_3d_scene, aoi_name, size, background: numpy.array = numpy.empty((0, 0)), gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, scan_path_analyzers: list = [], aoi_scan_path_analyzers: list = [], heatmap: bool = False) -> ArFrameType:
-
- # If aoi_name is part of the scene
- try:
-
- aoi_2d_scene = aoi_3d_scene.orthogonal_projection.reframe(aoi_name, size)
-
- except KeyError:
-
- aoi_2d_scene = aoi_3d_scene.orthogonal_projection
-
- return ArFrame(aoi_name, \
- size, \
- background, \
- aoi_2d_scene, \
- gaze_movement_identifier, \
- GazeFeatures.ScanPath() if len(scan_path_analyzers) > 0 else None, \
- scan_path_analyzers, \
- GazeFeatures.AOIScanPath(aoi_2d_scene.keys()) if len(aoi_scan_path_analyzers) > 0 else None, \
- aoi_scan_path_analyzers, \
- AOIFeatures.Heatmap(size) if heatmap else None \
- )
+ self.__parent = parent
@property
def image(self):
@@ -506,7 +280,7 @@ class ArFrame():
"""
# Lock frame exploitation
- self.__looking_lock.acquire()
+ self.__look_lock.acquire()
image = self.background.copy()
@@ -516,7 +290,7 @@ class ArFrame():
image = cv2.addWeighted(self.heatmap.image, 0.5, image, 1., 0)
# Unlock frame exploitation
- self.__looking_lock.release()
+ self.__look_lock.release()
return image
@@ -532,7 +306,7 @@ class ArFrame():
"""
# Lock frame exploitation
- self.__looking_lock.acquire()
+ self.__look_lock.acquire()
# Update current gaze position
self.__gaze_position = inner_gaze_position
@@ -619,9 +393,9 @@ class ArFrame():
self.heatmap.update(self.__gaze_position.value, sigma=0.05)
# Unlock frame exploitation
- self.__looking_lock.release()
+ self.__look_lock.release()
- # Return looking data
+ # Return look data
return new_gaze_movement, self.__look_at, scan_step_analysis, aoi_scan_step_analysis
def draw(self, image:numpy.array):
@@ -633,7 +407,7 @@ class ArFrame():
"""
# Lock frame exploitation
- self.__looking_lock.acquire()
+ self.__look_lock.acquire()
# Draw aoi
self.aoi_2d_scene.draw(image, color=(0, 0, 0))
@@ -654,7 +428,7 @@ class ArFrame():
self.aoi_2d_scene.draw_circlecast(image, self.__gaze_movement.focus, self.__gaze_movement.deviation_max, base_color=(0, 0, 0), matching_color=(255, 255, 255))
# Unlock frame exploitation
- self.__looking_lock.release()
+ self.__look_lock.release()
@dataclass
class ArScene():
@@ -669,8 +443,6 @@ class ArScene():
aoi_3d_scene: AOI 3D scene description that will be projected onto estimated scene once its pose will be estimated : see [project][argaze.ArFeatures.ArScene.project] function below.
- camera_frame: Where AOI 3D scene will be projected
-
aoi_frames: Optional dictionary to define AOI as ArFrame.
aruco_axis: Optional dictionary to define orthogonal axis where each axis is defined by list of 3 markers identifier (first is origin). \
@@ -685,7 +457,6 @@ class ArScene():
name: str
aruco_scene: ArUcoScene.ArUcoScene = field(default_factory=ArUcoScene.ArUcoScene)
aoi_3d_scene: AOI3DScene.AOI3DScene = field(default_factory=AOI3DScene.AOI3DScene)
- camera_frame: ArFrame = field(default_factory=ArFrame)
aoi_frames: dict = field(default_factory=dict)
aruco_axis: dict = field(default_factory=dict)
aruco_aoi: dict = field(default_factory=dict)
@@ -695,17 +466,15 @@ class ArScene():
def __post_init__(self):
# Define environment attribute: it will be setup by parent environment later
- self._environment = None
+ self.__environment = None
# Preprocess orthogonal projection to speed up further aruco aoi processings
self.__orthogonal_projection_cache = self.aoi_3d_scene.orthogonal_projection
- # Setup ArFrame scene attribute after ArFrame creation
+ # Setup aoi frame parent attribute
for aoi_name, frame in self.aoi_frames.items():
- frame._scene = self
- # Init lock to share camera frame with multiples threads
- self.__camera_frame_lock = threading.Lock()
+ frame.parent = self
def __str__(self) -> str:
"""
@@ -713,24 +482,102 @@ class ArScene():
String representation
"""
- output = f'ArEnvironment:\n{self._environment.name}\n'
+ output = f'ArEnvironment:\n{self.environment.name}\n'
output += f'ArUcoScene:\n{self.aruco_scene}\n'
output += f'AOI3DScene:\n{self.aoi_3d_scene}\n'
return output
- def clear(self):
- """Clear scene projection."""
+ @classmethod
+ def from_dict(self, scene_data, working_directory: str = None) -> ArSceneType:
- # Lock camera frame exploitation
- self.__camera_frame_lock.acquire()
+ # Load name
+ try:
- # Update camera frame
- self.camera_frame.aoi_2d_scene = AOI2DScene.AOI2DScene()
+ new_scene_name = scene_data.pop('name')
- # Unlock camera frame exploitation
- self.__camera_frame_lock.release()
+ except KeyError:
+
+ new_scene_name = None
+
+ # Load aruco scene
+ try:
+
+ # Check aruco_scene value type
+ aruco_scene_value = scene_data.pop('aruco_scene')
+
+ # str: relative path to .obj file
+ if type(aruco_scene_value) == str:
+
+ aruco_scene_value = os.path.join(working_directory, aruco_scene_value)
+ new_aruco_scene = ArUcoScene.ArUcoScene.from_obj(aruco_scene_value)
+
+ # dict:
+ else:
+
+ new_aruco_scene = ArUcoScene.ArUcoScene(**aruco_scene_value)
+
+ except KeyError:
+
+ new_aruco_scene = None
+
+ # Load aoi 3d scene
+ try:
+
+ # Check aoi_3d_scene value type
+ aoi_3d_scene_value = scene_data.pop('aoi_3d_scene')
+
+ # str: relative path to .obj file
+ if type(aoi_3d_scene_value) == str:
+
+ obj_filepath = os.path.join(working_directory, aoi_3d_scene_value)
+ new_aoi_3d_scene = AOI3DScene.AOI3DScene.from_obj(obj_filepath)
+
+ # dict:
+ else:
+
+ new_aoi_3d_scene = AOI3DScene.AOI3DScene(aoi_3d_scene_value)
+
+ except KeyError:
+
+ new_aoi_3d_scene = None
+
+ # Load aoi frames
+ new_aoi_frames = {}
+
+ try:
+
+ for aoi_name, aoi_frame_data in scene_data.pop('aoi_frames').items():
+
+ # Create aoi frame
+ new_aoi_frame = ArFrame.from_dict(aoi_frame_data, working_directory)
+
+ # Setup aoi frame
+ new_aoi_frame.name = aoi_name
+ new_aoi_frame.aoi_2d_scene = new_aoi_3d_scene.orthogonal_projection.reframe(aoi_name, new_aoi_frame.size)
+ new_aoi_frame.aoi_scan_path.expected_aois = list(new_aoi_3d_scene.keys())
+
+ # Append new aoi frame
+ new_aoi_frames[aoi_name] = new_aoi_frame
+
+ except KeyError:
+
+ pass
+
+ return ArScene(new_scene_name, new_aruco_scene, new_aoi_3d_scene, new_aoi_frames, **scene_data)
+
+ @property
+ def environment(self):
+ """Get parent environment instance"""
+
+ return self.__environment
+ @environment.setter
+ def environment(self, environment):
+ """Set parent environment instance"""
+
+ self.__environment = environment
+
def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, str, dict]:
"""Estimate scene pose from detected ArUco markers.
@@ -794,13 +641,16 @@ class ArScene():
return tvec, rmat, 'estimate_pose_from_markers', consistent_markers
- def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> ArFrame:
+ def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> AOI2DScene.AOI2DScene:
"""Project AOI scene according estimated pose and optional horizontal field of view clipping angle.
Parameters:
tvec: translation vector
rvec: rotation vector
visual_hfov: horizontal field of view clipping angle
+
+ Returns:
+ aoi_2d_scene: scene projection
"""
# Clip AOI out of the visual horizontal field of view (optional)
@@ -822,26 +672,14 @@ class ArScene():
aoi_3d_scene_copy = self.aoi_3d_scene.copy()
- # Lock camera frame exploitation
- self.__camera_frame_lock.acquire()
-
- # Update camera frame
- self.camera_frame.aoi_2d_scene = aoi_3d_scene_copy.project(tvec, rvec, self._environment.aruco_detector.optic_parameters.K)
-
- # Unlock camera frame exploitation
- self.__camera_frame_lock.release()
-
- # Warn user when the projected scene is empty
- if len(self.camera_frame.aoi_2d_scene) == 0:
-
- raise SceneProjectionFailed('AOI projection is empty')
+ return aoi_3d_scene_copy.project(tvec, rvec, self.environment.aruco_detector.optic_parameters.K)
def build_aruco_aoi_scene(self, detected_markers) -> AOI2DScene.AOI2DScene:
"""
Build AOI scene from detected ArUco markers as defined in aruco_aoi dictionary.
Returns:
- built AOI 2D scene
+ aoi_2d_scene: built AOI 2D scene
"""
# ArUco aoi must be defined
@@ -880,17 +718,275 @@ class ArScene():
aoi_corners = [numpy.array(aruco_aoi_scene[aruco_aoi_name].outter_axis(inner)) for inner in self.__orthogonal_projection_cache[inner_aoi_name]]
aruco_aoi_scene[inner_aoi_name] = AOIFeatures.AreaOfInterest(aoi_corners)
+ return AOI2DScene.AOI2DScene(aruco_aoi_scene)
+
+ def draw_axis(self, image: numpy.array):
+ """
+ Draw scene axis into image.
+
+ Parameters:
+ image: where to draw
+ """
+
+ self.aruco_scene.draw_axis(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D)
+
+ def draw_places(self, image: numpy.array):
+ """
+ Draw scene places into image.
+
+ Parameters:
+ image: where to draw
+ """
+
+ self.aruco_scene.draw_places(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D)
+
+@dataclass
+class ArEnvironment():
+ """
+ Define Augmented Reality environment based on ArUco marker detection.
+
+ Parameters:
+ name: environment name
+ aruco_detector: ArUco marker detector
+ camera_frame: where to project scenes
+ scenes: all environment scenes
+ """
+
+ name: str
+ aruco_detector: ArUcoDetector.ArUcoDetector = field(default_factory=ArUcoDetector.ArUcoDetector)
+ camera_frame: ArFrame = field(default_factory=ArFrame)
+ scenes: dict = field(default_factory=dict)
+
+ def __post_init__(self):
+
+ # Setup camera frame parent attribute
+ if self.camera_frame != None:
+
+ self.camera_frame.parent = self
+
+ # Setup scenes environment attribute
+ for name, scene in self.scenes.items():
+
+ scene.environment = self
+
+ # Init a lock to share AOI scene projections into camera frame between multiple threads
+ self.__camera_frame_lock = threading.Lock()
+
+ @classmethod
+ def from_dict(self, environment_data, working_directory: str = None) -> ArEnvironmentType:
+
+ new_environment_name = environment_data.pop('name')
+
+ try:
+ new_detector_data = environment_data.pop('aruco_detector')
+
+ new_aruco_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(**new_detector_data.pop('dictionary'))
+ new_marker_size = new_detector_data.pop('marker_size')
+
+ # Check optic_parameters value type
+ optic_parameters_value = new_detector_data.pop('optic_parameters')
+
+ # str: relative path to .json file
+ if type(optic_parameters_value) == str:
+
+ optic_parameters_value = os.path.join(working_directory, optic_parameters_value)
+ new_optic_parameters = ArUcoOpticCalibrator.OpticParameters.from_json(optic_parameters_value)
+
+ # dict:
+ else:
+
+ new_optic_parameters = ArUcoOpticCalibrator.OpticParameters(**optic_parameters_value)
+
+ # Check detector parameters value type
+ detector_parameters_value = new_detector_data.pop('parameters')
+
+ # str: relative path to .json file
+ if type(detector_parameters_value) == str:
+
+ detector_parameters_value = os.path.join(working_directory, detector_parameters_value)
+ new_aruco_detector_parameters = ArUcoDetector.DetectorParameters.from_json(detector_parameters_value)
+
+ # dict:
+ else:
+
+ new_aruco_detector_parameters = ArUcoDetector.DetectorParameters(**detector_parameters_value)
+
+ new_aruco_detector = ArUcoDetector.ArUcoDetector(new_aruco_dictionary, new_marker_size, new_optic_parameters, new_aruco_detector_parameters)
+
+ except KeyError:
+
+ new_aruco_detector = None
+
+ # Load camera frame as large as the aruco detector optic parameters
+ try:
+
+ camera_frame_data = environment_data.pop('camera_frame')
+
+ # Create camera frame
+ new_camera_frame = ArFrame.from_dict(camera_frame_data, working_directory)
+
+ # Setup camera frame
+ new_camera_frame.name = new_environment_name
+ new_camera_frame.size = new_optic_parameters.dimensions
+ new_camera_frame.background = numpy.zeros((new_optic_parameters.dimensions[1], new_optic_parameters.dimensions[0], 3)).astype(numpy.uint8)
+
+ except KeyError:
+
+ new_camera_frame = None
+
+ # Build scenes
+ new_scenes = {}
+ for new_scene_name, scene_data in environment_data.pop('scenes').items():
+
+ # Create new scene
+ new_scene = ArScene.from_dict(scene_data, working_directory)
+
+ # Setup new scene
+ new_scene.name = new_scene_name
+
+ # Append new scene
+ new_scenes[new_scene_name] = new_scene
+
+ # Setup expected aoi for camera frame aoi scan path
+ if new_camera_frame != None:
+
+ if new_camera_frame.aoi_scan_path != None:
+
+ # List all environment aoi
+ all_aoi_list = []
+ for scene_name, scene in new_scenes.items():
+
+ all_aoi_list.extend(list(scene.aoi_3d_scene.keys()))
+
+ new_camera_frame.aoi_scan_path.expected_aois = all_aoi_list
+
+ # Create new environment
+ return ArEnvironment(new_environment_name, new_aruco_detector, new_camera_frame, new_scenes)
+
+ @classmethod
+ def from_json(self, json_filepath: str) -> ArEnvironmentType:
+ """
+ Load ArEnvironment from .json file.
+
+ Parameters:
+ json_filepath: path to json file
+ """
+
+ with open(json_filepath) as configuration_file:
+
+ environment_data = json.load(configuration_file)
+ working_directory = os.path.dirname(json_filepath)
+
+ return ArEnvironment.from_dict(environment_data, working_directory)
+
+ def __str__(self) -> str:
+ """
+ Returns:
+ String representation
+ """
+
+ output = f'Name:\n{self.name}\n'
+ output += f'ArUcoDetector:\n{self.aruco_detector}\n'
+
+ for name, scene in self.scenes.items():
+ output += f'\"{name}\" ArScene:\n{scene}\n'
+
+ return output
+
+ @property
+ def image(self):
+ """Get camera frame image"""
+
+ # Can't use camera frame when it is locked
+ if self.__camera_frame_lock.locked():
+ return
+
# Lock camera frame exploitation
self.__camera_frame_lock.acquire()
- # Update camera frame
- self.camera_frame.aoi_2d_scene = AOI2DScene.AOI2DScene(aruco_aoi_scene)
+ # Get camera frame image
+ image = self.camera_frame.image
# Unlock camera frame exploitation
self.__camera_frame_lock.release()
+ return image
+
+ @property
+ def aoi_frames(self):
+ """Iterate over all environment scenes aoi frames"""
+
+ # For each scene
+ for scene_name, scene in self.scenes.items():
+
+ # For each aoi frame
+ for frame_name, aoi_frame in scene.aoi_frames.items():
+
+ yield aoi_frame
+
+ def detect_and_project(self, image: numpy.array) -> int:
+ """Detect environment aruco markers from image and project scenes into camera frame.
+
+ Returns:
+ - detection_time: aruco marker detection time in ms
+ - exceptions: dictionary with exception raised per scene
+ """
+
+ # Detect aruco markers
+ detection_time = self.aruco_detector.detect_markers(image)
+
+ # Lock camera frame exploitation
+ self.__camera_frame_lock.acquire()
+
+ # Fill camera frame background with image
+ self.camera_frame.background = image
+
+ # Clear former scenes projection into camera frame
+ self.camera_frame.aoi_2d_scene = AOI2DScene.AOI2DScene()
+
+ # Store exceptions for each scene
+ exceptions = {}
+
+ # Project each aoi 3d scene into camera frame
+ for scene_name, scene in self.scenes.items():
+
+ ''' TODO: Enable aruco_aoi processing
+ if scene.aruco_aoi:
+
+ try:
+
+ # Build AOI scene directly from detected ArUco marker corners
+ self.camera_frame.aoi_2d_scene |= scene.build_aruco_aoi_scene(self.aruco_detector.detected_markers)
+
+ except SceneProjectionFailed:
+
+ pass
+ '''
+
+ try:
+
+ # Estimate scene markers poses
+ self.aruco_detector.estimate_markers_pose(scene.aruco_scene.identifiers)
+
+ # Estimate scene pose from detected scene markers
+ tvec, rmat, _, _ = scene.estimate_pose(self.aruco_detector.detected_markers)
+
+ # Project scene into camera frame according to estimated pose
+ self.camera_frame.aoi_2d_scene |= scene.project(tvec, rmat)
+
+ # Store exceptions and continue
+ except Exception as e:
+
+ exceptions[scene_name] = e
+
+ # Unlock camera frame exploitation
+ self.__camera_frame_lock.release()
+
+ # Return detection time and exceptions
+ return detection_time, exceptions
+
def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition):
- """Project timestamped gaze position into camera frame."""
+ """Project timestamped gaze position into each scene."""
# Can't use camera frame when it is locked
if self.__camera_frame_lock.locked():
@@ -902,16 +998,16 @@ class ArScene():
# Lock camera frame exploitation
self.__camera_frame_lock.acquire()
- # Project gaze position in camera frame
- yield self.name, self.camera_frame.look(timestamp, gaze_position)
+ # Project gaze position into camera frame
+ yield self.camera_frame, self.camera_frame.look(timestamp, gaze_position)
# Project gaze position into each aoi frames if possible
- for aoi_name, frame in self.aoi_frames.items():
+ for aoi_frame in self.aoi_frames:
# Is aoi frame projected into camera frame ?
try:
- aoi_2d = self.camera_frame.aoi_2d_scene[frame.name]
+ aoi_2d = self.camera_frame.aoi_2d_scene[aoi_frame.name]
# TODO: Add option to use gaze precision circle
if aoi_2d.contains_point(gaze_position.value):
@@ -921,7 +1017,7 @@ class ArScene():
# QUESTION: How to project gaze precision?
inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y))
- yield aoi_name, frame.look(timestamp, inner_gaze_position * frame.size)
+ yield aoi_frame, aoi_frame.look(timestamp, inner_gaze_position * aoi_frame.size)
# Ignore missing aoi frame projection
except KeyError:
@@ -931,32 +1027,28 @@ class ArScene():
# Unlock camera frame exploitation
self.__camera_frame_lock.release()
- def draw(self, image: numpy.array):
- """
- Draw camera frame into image.
+ def to_json(self, json_filepath):
+ """Save environment to .json file."""
- Parameters:
- image: where to draw
- """
+ with open(json_filepath, 'w', encoding='utf-8') as file:
- self.camera_frame.draw(image)
+ json.dump(self, file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder)
- def draw_axis(self, image: numpy.array):
- """
- Draw scene axis into image.
-
- Parameters:
- image: where to draw
- """
+ def draw(self, image: numpy.array):
+ """Draw ArUco detection visualisation and camera frame projections."""
- self.aruco_scene.draw_axis(image, self._environment.aruco_detector.optic_parameters.K, self._environment.aruco_detector.optic_parameters.D)
+ # Draw detected markers
+ self.aruco_detector.draw_detected_markers(image)
- def draw_places(self, image: numpy.array):
- """
- Draw scene places into image.
+ # Can't use camera frame when it is locked
+ if self.__camera_frame_lock.locked():
+ return
- Parameters:
- image: where to draw
- """
+ # Lock camera frame exploitation
+ self.__camera_frame_lock.acquire()
- self.aruco_scene.draw_places(image, self._environment.aruco_detector.optic_parameters.K, self._environment.aruco_detector.optic_parameters.D)
+ # Draw camera frame
+ self.camera_frame.draw(image)
+
+ # Unlock camera frame exploitation
+ self.__camera_frame_lock.release()
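
With the camera frame moved to the environment level, a typical processing loop built on this API could look like the sketch below. Only from_json, detect_and_project, look, draw and the camera frame come from this diff; the video capture, gaze source and window handling are assumed placeholder glue.

# Hypothetical end-to-end usage of the reworked API; from_json, detect_and_project,
# look and draw come from this commit, the rest is placeholder glue.
import cv2
from argaze import ArFeatures, GazeFeatures

environment = ArFeatures.ArEnvironment.from_json('environment_setup.json')

capture = cv2.VideoCapture(0)  # placeholder camera source

while True:

    success, video_image = capture.read()
    if not success:
        break

    # Detect ArUco markers and project every scene into the camera frame;
    # per-scene failures are returned instead of raised.
    detection_time, exceptions = environment.detect_and_project(video_image)

    for scene_name, exception in exceptions.items():
        print(f'{scene_name}: {exception}')

    # Gaze data would normally come from an eye tracker stream (placeholder here).
    timestamp, gaze_position = 0, GazeFeatures.GazePosition((0, 0))

    # Project the gaze position into the camera frame and every visible aoi frame.
    for frame, look_data in environment.look(timestamp, gaze_position):
        gaze_movement, look_at, scan_step_analysis, aoi_scan_step_analysis = look_data

    # Draw detection results and camera frame projections.
    environment.draw(video_image)
    cv2.imshow('ArGaze', video_image)

    if cv2.waitKey(1) == 27:  # Esc to quit
        break

capture.release()
cv2.destroyAllWindows()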