aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/ArFeatures.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r--src/argaze/ArFeatures.py299
1 files changed, 138 insertions, 161 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 3b05482..7cc1b9d 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -34,35 +34,36 @@ from argaze.utils import UtilsFeatures
import numpy
import cv2
+
class PoseEstimationFailed(Exception):
"""
- Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies.
+ Exception raised by ArScene estimate_pose method when the pose can't be estimated due to inconsistencies.
"""
- def __init__(self, message, unconsistencies=None):
-
+ def __init__(self, message, inconsistencies=None):
super().__init__(message)
- self.unconsistencies = unconsistencies
+ self.inconsistencies = inconsistencies
+
class SceneProjectionFailed(Exception):
"""
Exception raised by ArCamera watch method when the scene can't be projected.
"""
- def __init__(self, message):
-
+ def __init__(self, message):
super().__init__(message)
+
class DrawingFailed(Exception):
"""
Exception raised when drawing fails.
"""
- def __init__(self, message):
-
+ def __init__(self, message):
super().__init__(message)
+
# Define default ArLayer draw parameters
DEFAULT_ARLAYER_DRAW_PARAMETERS = {
"draw_aoi_scene": {
@@ -92,9 +93,10 @@ DEFAULT_ARLAYER_DRAW_PARAMETERS = {
}
}
+
class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
- Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed.
+ Defines a space where to make matching of gaze movements and AOI and inside which those matching need to be analyzed.
!!! note
Inherits from DataFeatures.SharedObject class to be shared by multiple threads.
@@ -118,14 +120,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Init pipeline step object attributes
self.draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS
-
+
@property
def aoi_scene(self) -> AOIFeatures.AOIScene:
"""AOI scene description."""
return self.__aoi_scene
@aoi_scene.setter
- def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene|str|dict):
+ def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene | str | dict):
if issubclass(type(aoi_scene_value), AOIFeatures.AOIScene):
@@ -139,7 +141,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# JSON file format for 2D or 3D dimension
if file_format == 'json':
-
new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath)
# SVG file format for 2D dimension only
@@ -168,7 +169,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Edit parent
if self.__aoi_scene is not None:
-
self.__aoi_scene.parent = self
@property
@@ -180,15 +180,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@DataFeatures.PipelineStepAttributeSetter
def aoi_matcher(self, aoi_matcher: GazeFeatures.AOIMatcher):
- assert(issubclass(type(aoi_matcher), GazeFeatures.AOIMatcher))
+ assert (issubclass(type(aoi_matcher), GazeFeatures.AOIMatcher))
self.__aoi_matcher = aoi_matcher
# Edit parent
if self.__aoi_matcher is not None:
-
self.__aoi_matcher.parent = self
-
+
@property
def aoi_scan_path(self) -> GazeFeatures.AOIScanPath:
"""AOI scan path object."""
@@ -198,7 +197,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@DataFeatures.PipelineStepAttributeSetter
def aoi_scan_path(self, aoi_scan_path: GazeFeatures.AOIScanPath):
- assert(isinstance(aoi_scan_path, GazeFeatures.AOIScanPath))
+ assert (isinstance(aoi_scan_path, GazeFeatures.AOIScanPath))
self.__aoi_scan_path = aoi_scan_path
@@ -207,9 +206,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Edit parent
if self.__aoi_scan_path is not None:
-
self.__aoi_scan_path.parent = self
-
+
@property
def aoi_scan_path_analyzers(self) -> list:
"""AOI scan path analyzers list."""
@@ -224,7 +222,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Connect analyzers if required
for analyzer in self.__aoi_scan_path_analyzers:
- assert(issubclass(type(analyzer), GazeFeatures.AOIScanPathAnalyzer))
+ assert (issubclass(type(analyzer), GazeFeatures.AOIScanPathAnalyzer))
# Check scan path analyzer properties type
for name, item in type(analyzer).__dict__.items():
@@ -238,7 +236,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
except KeyError:
- raise(ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}'))
+ raise (ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}'))
if issubclass(property_type, GazeFeatures.AOIScanPathAnalyzer):
@@ -248,28 +246,25 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
for a in self.__aoi_scan_path_analyzers:
if type(a) == property_type:
-
setattr(analyzer, name, a)
found = True
if not found:
-
- raise DataFeatures.PipelineStepLoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
+ raise DataFeatures.PipelineStepLoadingFailed(
+ f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None:
-
- self.scan_path = GazeFeatures.ScanPath()
+ self.__aoi_scan_path = GazeFeatures.ScanPath()
# Edit parent
for analyzer in self.__aoi_scan_path_analyzers:
-
analyzer.parent = self
def last_looked_aoi_name(self) -> str:
"""Get last looked aoi name."""
return self.__looked_aoi_name
-
+
def is_analysis_available(self) -> bool:
"""Are aoi scan path analysis ready?"""
return self.__aoi_scan_path_analyzed
@@ -279,7 +274,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
analysis = {}
for analyzer in self.__aoi_scan_path_analyzers:
-
analysis[DataFeatures.get_class_path(analyzer)] = analyzer.analysis()
return analysis
@@ -300,19 +294,17 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""Update expected AOI of AOI scan path considering AOI scene and layer name."""
if self.__aoi_scene is None:
-
logging.debug('ArLayer._update_expected_aoi %s (parent: %s): missing aoi scene', self.name, self.parent)
return
logging.debug('ArLayer._update_expected_aoi %s (parent: %s)', self.name, self.parent)
- # Get aoi names from aoi scene
+ # Get aoi names from aoi scene
expected_aoi = list(self.__aoi_scene.keys())
# Remove layer name from expected aoi
if self.name in expected_aoi:
-
expected_aoi.remove(self.name)
# Update expected aoi: this will clear the scan path
@@ -345,9 +337,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__aoi_scan_path_analyzed = False
if self.__aoi_matcher is not None and self.__aoi_scene is not None:
-
# Update looked aoi thanks to aoi matcher
- # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally
+ # Note: don't filter valid/invalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally
self.__looked_aoi_name, _ = self.__aoi_matcher.match(gaze_movement, self.__aoi_scene)
logging.debug('\t> looked aoi name: %s', self.__looked_aoi_name)
@@ -372,7 +363,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Analyze aoi scan path
for aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers:
-
aoi_scan_path_analyzer.analyze(self.__aoi_scan_path, timestamp=gaze_movement.timestamp)
# Update aoi scan path analyzed state
@@ -382,7 +372,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Append saccade to aoi scan path
if self.__aoi_scan_path is not None:
-
logging.debug('\t> append saccade')
self.__aoi_scan_path.append_saccade(gaze_movement)
@@ -393,8 +382,10 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
Draw into image.
Parameters:
+ image: image where to draw.
draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn)
- draw_aoi_matching: AOIMatcher.draw parameters (which depends of the loaded aoi matcher module, if None, no aoi matching is drawn)
+ draw_aoi_matching: AOIMatcher.draw parameters (which depends on the loaded aoi matcher module,
+ if None, no aoi matching is drawn)
"""
# Use layer lock feature
@@ -402,14 +393,13 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Draw aoi if required
if draw_aoi_scene is not None and self.__aoi_scene is not None:
-
self.__aoi_scene.draw(image, **draw_aoi_scene)
# Draw aoi matching if required
if draw_aoi_matching is not None and self.__aoi_matcher is not None:
-
self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching)
+
# Define default ArFrame image parameters
DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
"background_weight": 1.,
@@ -431,6 +421,7 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
}
}
+
class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed.
@@ -453,7 +444,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__filter_in_progress_identification = True
self.__scan_path = None
self.__scan_path_analyzers = []
- self.__background = DataFeatures.TimestampedImage( numpy.full((1, 1, 3), 127).astype(numpy.uint8) )
+ self.__background = DataFeatures.TimestampedImage(numpy.full((1, 1, 3), 127).astype(numpy.uint8))
self.__heatmap = None
self.__calibrated_gaze_position = GazeFeatures.GazePosition()
self.__identified_gaze_movement = GazeFeatures.GazeMovement()
@@ -464,32 +455,31 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self._image_parameters = DEFAULT_ARFRAME_IMAGE_PARAMETERS
@property
- def size(self) -> tuple[int]:
+ def size(self) -> tuple[int, int]:
"""Defines the dimension of the rectangular area where gaze positions are projected."""
return self.__size
@size.setter
- def size(self, size: tuple[int]):
+ def size(self, size: tuple[int, int]):
self.__size = size
-
+
@property
def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator:
- """Select gaze position calibration algoritm."""
+ """Select gaze position calibration algorithm."""
return self.__gaze_position_calibrator
@gaze_position_calibrator.setter
@DataFeatures.PipelineStepAttributeSetter
def gaze_position_calibrator(self, gaze_position_calibrator: GazeFeatures.GazePositionCalibrator):
- assert(issubclass(type(gaze_position_calibrator), GazeFeatures.GazePositionCalibrator))
+ assert (issubclass(type(gaze_position_calibrator), GazeFeatures.GazePositionCalibrator))
self.__gaze_position_calibrator = gaze_position_calibrator
# Edit parent
if self.__gaze_position_calibrator is not None:
-
self.__gaze_position_calibrator.parent = self
-
+
@property
def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier:
"""Select gaze movement identification algorithm."""
@@ -499,15 +489,14 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@DataFeatures.PipelineStepAttributeSetter
def gaze_movement_identifier(self, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier):
- assert(issubclass(type(gaze_movement_identifier), GazeFeatures.GazeMovementIdentifier))
+ assert (issubclass(type(gaze_movement_identifier), GazeFeatures.GazeMovementIdentifier))
self.__gaze_movement_identifier = gaze_movement_identifier
# Edit parent
if self.__gaze_movement_identifier is not None:
-
self.__gaze_movement_identifier.parent = self
-
+
@property
def filter_in_progress_identification(self) -> bool:
"""Is frame ignores in progress gaze movement identification?"""
@@ -526,15 +515,14 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@scan_path.setter
@DataFeatures.PipelineStepAttributeSetter
- def scan_path(self, scan_path: GazeFeatures.ScanPath) -> GazeFeatures.ScanPath:
+ def scan_path(self, scan_path: GazeFeatures.ScanPath):
- assert(isinstance(scan_path, GazeFeatures.ScanPath))
+ assert (isinstance(scan_path, GazeFeatures.ScanPath))
self.__scan_path = scan_path
# Edit parent
if self.__scan_path is not None:
-
self.__scan_path.parent = self
@property
@@ -551,7 +539,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Connect analyzers if required
for analyzer in self.__scan_path_analyzers:
- assert(issubclass(type(analyzer), GazeFeatures.ScanPathAnalyzer))
+ assert (issubclass(type(analyzer), GazeFeatures.ScanPathAnalyzer))
# Check scan path analyzer properties type
for name, item in type(analyzer).__dict__.items():
@@ -565,7 +553,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
except KeyError:
- raise(ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}'))
+ raise (ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}'))
if issubclass(property_type, GazeFeatures.AOIScanPathAnalyzer):
@@ -575,22 +563,19 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
for a in self.__scan_path_analyzers:
if type(a) == property_type:
-
setattr(analyzer, name, a)
found = True
if not found:
-
- raise DataFeatures.PipelineStepLoadingFaile(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
+ raise DataFeatures.PipelineStepLoadingFaile(
+ f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
- if len(self.__scan_path_analyzers) > 0 and self.scan_path == None:
-
- self.scan_path = GazeFeatures.ScanPath()
+ if len(self.__scan_path_analyzers) > 0 and self.__scan_path == None:
+ self.__scan_path = GazeFeatures.ScanPath()
# Edit parent
for analyzer in self.__scan_path_analyzers:
-
analyzer.parent = self
@property
@@ -602,12 +587,13 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@DataFeatures.PipelineStepAttributeSetter
def background(self, background: DataFeatures.TimestampedImage):
- assert(isinstance(background, DataFeatures.TimestampedImage))
+ assert (isinstance(background, DataFeatures.TimestampedImage))
if background.size != self.size:
# Resize image to frame size
- self.__background = DataFeatures.TimestampedImage( cv2.resize(background, dsize = self.size, interpolation = cv2.INTER_CUBIC), background.timestamp)
+ self.__background = DataFeatures.TimestampedImage(
+ cv2.resize(background, dsize=self.size, interpolation=cv2.INTER_CUBIC), background.timestamp)
else:
@@ -622,20 +608,18 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@DataFeatures.PipelineStepAttributeSetter
def heatmap(self, heatmap: AOIFeatures.Heatmap):
- assert(isinstance(heatmap, AOIFeatures.Heatmap))
+ assert (isinstance(heatmap, AOIFeatures.Heatmap))
self.__heatmap = heatmap
# Default heatmap size equals frame size
if self.__heatmap.size == (1, 1):
-
self.__heatmap.size = self.size
# Edit parent
if self.__heatmap is not None:
-
self.__heatmap.parent = self
-
+
@property
def layers(self) -> dict:
"""Layers dictionary."""
@@ -648,12 +632,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self._layers = {}
for layer_name, layer_data in layers.items():
-
- self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+ self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)
# Edit parent
for name, layer in self._layers.items():
-
layer.parent = self
def last_gaze_position(self) -> object:
@@ -673,7 +655,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
analysis = {}
for analyzer in self.__scan_path_analyzers:
-
analysis[DataFeatures.get_class_path(analyzer)] = analyzer.analysis()
return analysis
@@ -701,7 +682,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return d
@DataFeatures.PipelineStepMethod
- def look(self, timestamped_gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]:
+ def look(self, timestamped_gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()):
"""
Project timestamped gaze position into frame.
@@ -733,60 +714,60 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Identify gaze movement
if self.__gaze_movement_identifier is not None:
-
# Identify finished gaze movement
- self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(self.__calibrated_gaze_position)
+ self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(
+ self.__calibrated_gaze_position)
# Valid and finished gaze movement has been identified
if self.__identified_gaze_movement and self.__identified_gaze_movement.is_finished():
-
+
if GazeFeatures.is_fixation(self.__identified_gaze_movement):
-
+
# Append fixation to scan path
if self.__scan_path is not None:
-
self.__scan_path.append_fixation(self.__identified_gaze_movement)
elif GazeFeatures.is_saccade(self.__identified_gaze_movement):
-
+
# Append saccade to scan path
if self.__scan_path is not None:
-
+
scan_step = self.__scan_path.append_saccade(self.__identified_gaze_movement)
# Is there a new step?
if scan_step and len(self.__scan_path) > 1:
-
+
# Analyze aoi scan path
for scan_path_analyzer in self.__scan_path_analyzers:
-
- scan_path_analyzer.analyze(self.__scan_path, timestamp=self.__identified_gaze_movement.timestamp)
+ scan_path_analyzer.analyze(self.__scan_path,
+ timestamp=self.__identified_gaze_movement.timestamp)
# Update scan path analyzed state
self.__scan_path_analyzed = True
- # No valid finished gaze movement: optionnaly stop in progress identification filtering
+ # No valid finished gaze movement: optionally stop in progress identification filtering
elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification:
self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement()
# Update heatmap
if self.__heatmap is not None:
-
# Scale gaze position value
scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]])
# Update heatmap image
- self.__heatmap.update(self.__calibrated_gaze_position * scale, timestamp=self.__calibrated_gaze_position.timestamp)
+ self.__heatmap.update(self.__calibrated_gaze_position * scale,
+ timestamp=self.__calibrated_gaze_position.timestamp)
# Look layers with valid identified gaze movement
- # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally
+ # Note: don't filter valid/invalid finished/unfinished gaze movement to allow layers to reset internally
for layer_name, layer in self._layers.items():
-
layer.look(self.__identified_gaze_movement)
@DataFeatures.PipelineStepImage
- def image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array:
+ def image(self, background_weight: float = None, heatmap_weight: float = None,
+ draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None,
+ draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array:
"""
Get background image with overlaid visualisations.
@@ -838,14 +819,12 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Draw gaze position calibrator
if draw_gaze_position_calibrator is not None:
-
logging.debug('\t> drawing gaze position calibrator')
self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator)
# Draw scan path if required
if draw_scan_path is not None and self.__scan_path is not None:
-
logging.debug('\t> drawing scan path')
self.__scan_path.draw(image, **draw_scan_path)
@@ -854,7 +833,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if draw_fixations is not None and self.__gaze_movement_identifier is not None:
if self.__gaze_movement_identifier.current_fixation():
-
logging.debug('\t> drawing current fixation')
self.__gaze_movement_identifier.current_fixation().draw(image, **draw_fixations)
@@ -863,7 +841,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if draw_saccades is not None and self.__gaze_movement_identifier is not None:
if self.__gaze_movement_identifier.current_saccade():
-
logging.debug('\t> drawing current saccade')
self.__gaze_movement_identifier.current_saccade().draw(image, **draw_saccades)
@@ -881,24 +858,24 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
except KeyError:
- raise(DrawingFailed(f'\'{layer_name}\' layer doesn\'t exist.'))
+ raise (DrawingFailed(f'\'{layer_name}\' layer doesn\'t exist.'))
# Draw current gaze position if required
if draw_gaze_positions is not None:
-
logging.debug('\t> drawing current gaze position')
self.__calibrated_gaze_position.draw(image, **draw_gaze_positions)
logging.debug('\t> returning image (%i x %i)', image.shape[1], image.shape[0])
- return DataFeatures.TimestampedImage(image, timestamp = self.__background.timestamp)
+ return DataFeatures.TimestampedImage(image, timestamp=self.__background.timestamp)
+
class ArScene(DataFeatures.PipelineStepObject):
"""
Define abstract Augmented Reality scene with ArLayers and ArFrames inside.
"""
-
+
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
"""Initialize ArScene"""
@@ -906,8 +883,8 @@ class ArScene(DataFeatures.PipelineStepObject):
# Init private attributes
self._layers = {}
self.__frames = {}
- self.__angle_tolerance = 0,
- self.__distance_tolerance = 0,
+ self.__angle_tolerance = 0.
+ self.__distance_tolerance = 0.
@property
def layers(self) -> dict:
@@ -917,7 +894,7 @@ class ArScene(DataFeatures.PipelineStepObject):
@layers.setter
@DataFeatures.PipelineStepAttributeSetter
- def layers(self, layers:dict):
+ def layers(self, layers: dict):
self._layers = {}
@@ -925,21 +902,18 @@ class ArScene(DataFeatures.PipelineStepObject):
if type(layer_data) == dict:
- self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+ self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)
# str: relative path to JSON file
elif type(layer_data) == str:
- self._layers[layer_name] = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), layer_data))
-
- # Loaded layer name have to be equals to dictionary key
- assert(self._layers[layer_name].name == frame_name)
+ self._layers[layer_name] = DataFeatures.from_json(
+ os.path.join(DataFeatures.get_working_directory(), layer_data))
# Edit parent
for name, layer in self._layers.items():
-
layer.parent = self
-
+
@property
def frames(self) -> dict:
"""Dictionary of ArFrames to project once the pose is estimated.
@@ -956,7 +930,7 @@ class ArScene(DataFeatures.PipelineStepObject):
if type(frame_data) == dict:
- new_frame = ArFrame(name = frame_name, **frame_data)
+ new_frame = ArFrame(name=frame_name, **frame_data)
# str: relative path to JSON file
elif type(frame_data) == str:
@@ -964,7 +938,7 @@ class ArScene(DataFeatures.PipelineStepObject):
new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data))
# Loaded frame name have to be equals to dictionary key
- assert(new_frame.name == frame_name)
+ assert (new_frame.name == frame_name)
# Look for a scene layer with an AOI named like the frame
for scene_layer_name, scene_layer in self.layers.items():
@@ -976,7 +950,7 @@ class ArScene(DataFeatures.PipelineStepObject):
# Check that the frame have a layer named like this scene layer
aoi_2d_scene = new_frame.layers[scene_layer_name].aoi_scene
- # Transform 2D frame layer AOI into 3D scene layer AOI
+ # Transform 2D frame layer AOI into 3D scene layer AOI
# Then, add them to scene layer
scene_layer.aoi_scene |= aoi_2d_scene.dimensionalize(frame_3d, new_frame.size)
@@ -989,9 +963,8 @@ class ArScene(DataFeatures.PipelineStepObject):
# Edit parent
for name, frame in self.__frames.items():
-
frame.parent = self
-
+
@property
def angle_tolerance(self) -> float:
"""Angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function."""
@@ -1001,7 +974,7 @@ class ArScene(DataFeatures.PipelineStepObject):
def angle_tolerance(self, value: float):
self.__angle_tolerance = value
-
+
@property
def distance_tolerance(self) -> float:
"""Distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function."""
@@ -1039,8 +1012,9 @@ class ArScene(DataFeatures.PipelineStepObject):
raise NotImplementedError('estimate_pose() method not implemented')
@DataFeatures.PipelineStepMethod
- def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]:
- """Project layers according estimated pose and optional field of view clipping angles.
+ def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> \
+ Iterator[Union[str, AOI2DScene.AOI2DScene]]:
+ """Project layers according estimated pose and optional field of view clipping angles.
Parameters:
tvec: translation vector
@@ -1061,8 +1035,8 @@ class ArScene(DataFeatures.PipelineStepObject):
# Transform layer aoi scene into camera referential
aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec)
- # Get aoi inside vision cone field
- cone_vision_height_cm = 200 # cm
+ # Get aoi inside vision cone field
+ cone_vision_height_cm = 200 # cm
cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm
_, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm)
@@ -1077,6 +1051,7 @@ class ArScene(DataFeatures.PipelineStepObject):
# Project layer aoi scene
yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)
+
class ArCamera(ArFrame):
"""
Define abstract Augmented Reality camera as ArFrame with ArScenes inside.
@@ -1085,7 +1060,7 @@ class ArCamera(ArFrame):
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
"""Initialize ArCamera."""
-
+
# Init ArFrame class
super().__init__()
@@ -1103,12 +1078,10 @@ class ArCamera(ArFrame):
self._layers = {}
for layer_name, layer_data in layers.items():
-
- self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+ self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)
# Edit parent
for name, layer in self._layers.items():
-
layer.parent = self
# Update expected and excluded aoi
@@ -1126,12 +1099,10 @@ class ArCamera(ArFrame):
self._scenes = {}
for scene_name, scene_data in scenes.items():
-
- self._scenes[scene_name] = ArScene(name = scene_name, **scene_data)
+ self._scenes[scene_name] = ArScene(name=scene_name, **scene_data)
# Edit parent
for name, scene in self._scenes.items():
-
scene.parent = self
# Update expected and excluded aoi
@@ -1146,7 +1117,7 @@ class ArCamera(ArFrame):
def visual_hfov(self, value: float):
"""Set camera's visual horizontal field of view."""
self.__visual_hfov = value
-
+
@property
def visual_vfov(self) -> float:
"""Angle in degree to clip scenes projection according visual vertical field of view (VFOV)."""
@@ -1156,7 +1127,7 @@ class ArCamera(ArFrame):
def visual_vfov(self, value: float):
"""Set camera's visual vertical field of view."""
self.__visual_vfov = value
-
+
def scene_frames(self) -> Iterator[ArFrame]:
"""Iterate over all scenes frames"""
@@ -1165,7 +1136,6 @@ class ArCamera(ArFrame):
# For each scene frame
for name, scene_frame in scene.frames.items():
-
yield scene_frame
def as_dict(self) -> dict:
@@ -1184,7 +1154,6 @@ class ArCamera(ArFrame):
"""
if not self._layers or not self._scenes:
-
logging.debug('ArCamera._update_expected_and_excluded_aoi %s: missing layers or scenes', self.name)
return
@@ -1214,7 +1183,7 @@ class ArCamera(ArFrame):
for frame_name, frame in scene.frames.items():
try:
-
+
expected_aoi_list.remove(frame_name)
excluded_aoi_list.append(frame_name)
@@ -1223,11 +1192,9 @@ class ArCamera(ArFrame):
continue
if layer.aoi_scan_path is not None:
-
layer.aoi_scan_path.expected_aoi = expected_aoi_list
if layer.aoi_matcher is not None:
-
layer.aoi_matcher.exclude = excluded_aoi_list
@DataFeatures.PipelineStepMethod
@@ -1275,11 +1242,11 @@ class ArCamera(ArFrame):
# TODO?: Should we prefer to use camera frame AOIMatcher object?
if aoi_2d.contains_point(timestamped_gaze_position):
-
inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)
# QUESTION: How to project gaze precision?
- inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)
+ inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y),
+ timestamp=timestamped_gaze_position.timestamp)
# Project inner gaze position into scene frame
scene_frame.look(inner_gaze_position * scene_frame.size)
@@ -1314,19 +1281,23 @@ class ArCamera(ArFrame):
width, height = frame.size
destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]])
mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination)
- frame.background = DataFeatures.TimestampedImage( cv2.warpPerspective(self.background, mapping, (width, height)), timestamp = self.background.timestamp)
+ frame.background = DataFeatures.TimestampedImage(
+ cv2.warpPerspective(self.background, mapping, (width, height)),
+ timestamp=self.background.timestamp)
# Ignore missing frame projection
except KeyError:
pass
+
# Define default ArContext image parameters
DEFAULT_ARCONTEXT_IMAGE_PARAMETERS = {
"draw_times": True,
"draw_exceptions": True
}
+
class ArContext(DataFeatures.PipelineStepObject):
"""
Define class to ...
@@ -1340,14 +1311,14 @@ class ArContext(DataFeatures.PipelineStepObject):
self.__catch_exceptions = True
self.__exceptions = DataFeatures.TimestampedExceptions()
- # Init gaze position processing assement
+ # Init gaze position processing assessment
self.__process_gaze_position_chrono = UtilsFeatures.TimeProbe()
self.__process_gaze_position_frequency = 0
- # Init camera image processing assement
+ # Init camera image processing assessment
self.__process_camera_image_chrono = UtilsFeatures.TimeProbe()
self.__process_camera_image_frequency = 0
-
+
# Init protected attributes
self._image_parameters = DEFAULT_ARCONTEXT_IMAGE_PARAMETERS
@@ -1360,7 +1331,7 @@ class ArContext(DataFeatures.PipelineStepObject):
@DataFeatures.PipelineStepAttributeSetter
def pipeline(self, pipeline: DataFeatures.PipelineStepObject):
- assert(issubclass(type(pipeline), DataFeatures.PipelineStepObject))
+ assert (issubclass(type(pipeline), DataFeatures.PipelineStepObject))
self.__pipeline = pipeline
@@ -1374,12 +1345,12 @@ class ArContext(DataFeatures.PipelineStepObject):
self.__catch_exceptions = catch_exceptions
- def exceptions(self) -> DataFeatures.TimestampedException:
+ def exceptions(self) -> DataFeatures.TimestampedExceptions:
"""Get exceptions list"""
return self.__exceptions
def as_dict(self) -> dict:
- """Export Arcontext properties as dictionary."""
+ """Export ArContext properties as dictionary."""
return {
**DataFeatures.PipelineStepObject.as_dict(self),
@@ -1402,7 +1373,8 @@ class ArContext(DataFeatures.PipelineStepObject):
"""Exit from ArContext."""
pass
- def _process_gaze_position(self, timestamp: int|float, x: int|float = None, y: int|float = None, precision: int|float = None):
+ def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None,
+ precision: int | float = None):
"""Request pipeline to process new gaze position at a timestamp."""
logging.debug('ArContext._process_gaze_position %s', self.name)
@@ -1411,7 +1383,6 @@ class ArContext(DataFeatures.PipelineStepObject):
lap_time, nb_laps, elapsed_time = self.__process_gaze_position_chrono.lap()
if elapsed_time > 1e3:
-
self.__process_gaze_position_frequency = nb_laps
self.__process_gaze_position_chrono.restart()
@@ -1422,12 +1393,14 @@ class ArContext(DataFeatures.PipelineStepObject):
if x is None and y is None:
# Edit empty gaze position
- self.__pipeline.look( GazeFeatures.GazePosition( timestamp = timestamp), catch_exceptions = self.__catch_exceptions )
+ self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp),
+ catch_exceptions=self.__catch_exceptions)
else:
# Edit gaze position
- self.__pipeline.look( GazeFeatures.GazePosition( (x, y), precision = precision, timestamp = timestamp), catch_exceptions = self.__catch_exceptions)
+ self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp),
+ catch_exceptions=self.__catch_exceptions)
except DataFeatures.TimestampedException as e:
@@ -1435,9 +1408,9 @@ class ArContext(DataFeatures.PipelineStepObject):
else:
- raise(TypeError('Pipeline is not ArFrame instance.'))
+ raise (TypeError('Pipeline is not ArFrame instance.'))
- def _process_camera_image(self, timestamp: int|float, image: numpy.array):
+ def _process_camera_image(self, timestamp: int | float, image: numpy.array):
"""Request pipeline to process new camera image at a timestamp."""
logging.debug('ArContext._process_camera_image %s', self.name)
@@ -1446,7 +1419,6 @@ class ArContext(DataFeatures.PipelineStepObject):
lap_time, nb_laps, elapsed_time = self.__process_camera_image_chrono.lap()
if elapsed_time > 1e3:
-
self.__process_camera_image_frequency = nb_laps
self.__process_camera_image_chrono.restart()
@@ -1456,18 +1428,20 @@ class ArContext(DataFeatures.PipelineStepObject):
# Compare image size with ArCamera frame size
if list(image.shape[0:2][::-1]) != self.__pipeline.size:
-
- logging.warning('%s._process_camera_image: image size (%i x %i) is different of ArCamera frame size (%i x %i)', DataFeatures.get_class_path(self) , width, height, self.__pipeline.size[0], self.__pipeline.size[1])
+ logging.warning(
+ '%s._process_camera_image: image size (%i x %i) is different of ArCamera frame size (%i x %i)',
+ DataFeatures.get_class_path(self), width, height, self.__pipeline.size[0], self.__pipeline.size[1])
return
try:
logging.debug('\t> watch image (%i x %i)', width, height)
- self.__pipeline.watch( DataFeatures.TimestampedImage(image, timestamp = timestamp), catch_exceptions = self.__catch_exceptions )
+ self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp),
+ catch_exceptions=self.__catch_exceptions)
# TODO: make this step optional
- self.__pipeline.map(timestamp = timestamp, catch_exceptions = self.__catch_exceptions)
+ self.__pipeline.map(timestamp=timestamp, catch_exceptions=self.__catch_exceptions)
except DataFeatures.TimestampedException as e:
@@ -1477,15 +1451,16 @@ class ArContext(DataFeatures.PipelineStepObject):
else:
- raise(TypeError('Pipeline is not ArCamera instance.'))
+ raise (TypeError('Pipeline is not ArCamera instance.'))
@DataFeatures.PipelineStepImage
def image(self, draw_times: bool = None, draw_exceptions: bool = None):
"""
- Get pipeline image with execution informations.
+ Get pipeline image with execution information.
Parameters:
- draw_exceptions: ...
+ draw_times: draw pipeline execution times
+ draw_exceptions: draw pipeline exception messages
"""
logging.debug('ArContext.image %s', self.name)
@@ -1499,9 +1474,9 @@ class ArContext(DataFeatures.PipelineStepObject):
if draw_times:
if image.is_timestamped():
-
info_stack += 1
- cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1,
+ (255, 255, 255), 1, cv2.LINE_AA)
if issubclass(type(self.__pipeline), ArCamera):
@@ -1514,7 +1489,8 @@ class ArContext(DataFeatures.PipelineStepObject):
watch_time = math.nan
info_stack += 1
- cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz',
+ (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
if issubclass(type(self.__pipeline), ArFrame):
@@ -1527,17 +1503,18 @@ class ArContext(DataFeatures.PipelineStepObject):
look_time = math.nan
info_stack += 1
- cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz',
+ (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
if draw_exceptions:
# Write exceptions
while self.__exceptions:
-
e = self.__exceptions.pop()
i = len(self.__exceptions)
- cv2.rectangle(image, (0, height-(i+1)*50), (width, height-(i)*50), (0, 0, 127), -1)
- cv2.putText(image, f'error: {e}', (20, height-(i+1)*50+25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - (i) * 50), (0, 0, 127), -1)
+ cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1,
+ (255, 255, 255), 1, cv2.LINE_AA)
return image