aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/ArFeatures.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r--src/argaze/ArFeatures.py265
1 files changed, 218 insertions, 47 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 5e219ff..6b4b182 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -28,7 +28,6 @@ import time
from argaze import DataFeatures, GazeFeatures
from argaze.AreaOfInterest import *
from argaze.GazeAnalysis import *
-from argaze.utils import Providers
import numpy
import cv2
@@ -53,15 +52,6 @@ class SceneProjectionFailed(Exception):
super().__init__(message)
-class LoadingFailed(Exception):
- """
- Exception raised when attributes loading fails.
- """
-
- def __init__(self, message):
-
- super().__init__(message)
-
class DrawingFailed(Exception):
"""
Exception raised when drawing fails.
@@ -263,7 +253,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if not found:
- raise LoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
+ raise DataFeatures.PipelineStepLoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None:
@@ -473,7 +463,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Init private attributes
self.__size = (1, 1)
- self.__provider = None
self.__gaze_position_calibrator = None
self.__gaze_movement_identifier = None
self.__filter_in_progress_identification = True
@@ -497,24 +486,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@size.setter
def size(self, size: tuple[int]):
self.__size = size
-
- @property
- def provider(self) -> DataFeatures.PipelineInputProvider:
- """Provider object related to this frame."""
- return self.__provider
-
- @provider.setter
- @DataFeatures.PipelineStepAttributeSetter
- def provider(self, provider: DataFeatures.PipelineInputProvider):
-
- assert(issubclass(type(provider), DataFeatures.PipelineInputProvider))
-
- self.__provider = provider
-
- # Edit parent
- if self.__provider is not None:
-
- self.__provider.parent = self
@property
def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator:
@@ -625,7 +596,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if not found:
- raise LoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
+ raise DataFeatures.PipelineStepLoadingFaile(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
if len(self.__scan_path_analyzers) > 0 and self.scan_path == None:
@@ -734,7 +705,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
d = {
**DataFeatures.PipelineStepObject.as_dict(self),
"size": self.__size,
- "provider": self.__provider,
"gaze_position_calibrator": self.__gaze_position_calibrator,
"gaze_movement_identifier": self.__gaze_movement_identifier,
"filter_in_progress_identification": self.__filter_in_progress_identification,
@@ -963,7 +933,17 @@ class ArScene(DataFeatures.PipelineStepObject):
for layer_name, layer_data in layers.items():
- self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+ if type(layer_data) == dict:
+
+ self._layers[layer_name] = ArLayer(name = layer_name, **layer_data)
+
+ # str: relative path to JSON file
+ elif type(layer_data) == str:
+
+ self._layers[layer_name] = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), layer_data))
+
+ # Loaded layer name have to be equals to dictionary key
+ assert(self._layers[layer_name].name == frame_name)
# Edit parent
for name, layer in self._layers.items():
@@ -984,7 +964,17 @@ class ArScene(DataFeatures.PipelineStepObject):
for frame_name, frame_data in frames.items():
- new_frame = ArFrame(name = frame_name, **frame_data)
+ if type(frame_data) == dict:
+
+ new_frame = ArFrame(name = frame_name, **frame_data)
+
+ # str: relative path to JSON file
+ elif type(frame_data) == str:
+
+ new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data))
+
+ # Loaded frame name have to be equals to dictionary key
+ assert(new_frame.name == frame_name)
# Look for a scene layer with an AOI named like the frame
for scene_layer_name, scene_layer in self.layers.items():
@@ -1295,27 +1285,29 @@ class ArCamera(ArFrame):
# Is there an AOI inside camera frame layers projection which its name equals to a scene frame name?
for camera_layer_name, camera_layer in self.layers.items():
- try:
+ if camera_layer.aoi_scene:
- aoi_2d = camera_layer.aoi_scene[scene_frame.name]
+ try:
- if timestamped_gaze_position:
+ aoi_2d = camera_layer.aoi_scene[scene_frame.name]
- # TODO?: Should we prefer to use camera frame AOIMatcher object?
- if aoi_2d.contains_point(timestamped_gaze_position):
+ if timestamped_gaze_position:
- inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)
+ # TODO?: Should we prefer to use camera frame AOIMatcher object?
+ if aoi_2d.contains_point(timestamped_gaze_position):
- # QUESTION: How to project gaze precision?
- inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)
+ inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)
- # Project inner gaze position into scene frame
- scene_frame.look(inner_gaze_position * scene_frame.size)
+ # QUESTION: How to project gaze precision?
+ inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)
- # Ignore missing aoi in camera frame layer projection
- except KeyError as e:
+ # Project inner gaze position into scene frame
+ scene_frame.look(inner_gaze_position * scene_frame.size)
- pass
+ # Ignore missing aoi in camera frame layer projection
+ except KeyError as e:
+
+ pass
@DataFeatures.PipelineStepMethod
def map(self):
@@ -1348,3 +1340,182 @@ class ArCamera(ArFrame):
except KeyError:
pass
+
+
+# Define default ArContext image parameters
+DEFAULT_ARCONTEXT_IMAGE_PARAMETERS = {
+ "draw_exceptions": True
+}
+
+class ArContext(DataFeatures.PipelineStepObject):
+ """
+ Define class to ...
+ """
+
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+
+ logging.debug('ArContext.__init__')
+
+ DataFeatures.PipelineStepObject.__init__(self)
+
+ # Init private attributes
+ self.__pipeline = None
+ self.__exceptions = DataFeatures.TimeStampedExceptions()
+
+ # Init protected attributes
+ self._image_parameters = DEFAULT_ARCONTEXT_IMAGE_PARAMETERS
+
+ @property
+ def pipeline(self) -> DataFeatures.PipelineStepObject:
+ """ArFrame used to process gaze data or ArCamera used to process gaze data and video of environment."""
+ return self.__pipeline
+
+ @pipeline.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def pipeline(self, pipeline: DataFeatures.PipelineStepObject):
+
+ assert(issubclass(type(pipeline), DataFeatures.PipelineStepObject))
+
+ self.__pipeline = pipeline
+
+ @property
+ def image_parameters(self) -> dict:
+ """Default image method parameters dictionary."""
+ return self._image_parameters
+
+ @image_parameters.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def image_parameters(self, image_parameters: dict):
+
+ self._image_parameters = image_parameters
+
+ def exceptions(self) -> DataFeatures.TimestampedException:
+ """Get exceptions list"""
+ return self.__exceptions
+
+ def as_dict(self) -> dict:
+ """Export Arcontext properties as dictionary."""
+
+ return {
+ **DataFeatures.PipelineStepObject.as_dict(self),
+ "pipeline": self.__pipeline,
+ "image_parameters": self._image_parameters
+ }
+
+ def __enter__(self):
+ """
+ Define abstract __enter__ method to use device as a context.
+
+ !!! warning
+ This method is called provided that the PipelineInput is created as a context using a with statement.
+ """
+ return self
+
+ def __exit__(self, type, value, traceback):
+ """
+ Define abstract __exit__ method to use device as a context.
+
+ !!! warning
+ This method is called provided that the PipelineInput is created as a context using a with statement.
+ """
+ pass
+
+ def _process_gaze_position(self, timestamp: int|float, x: int|float = None, y: int|float = None, precision: int|float = None):
+ """Request pipeline to process new gaze position at a timestamp."""
+
+ logging.debug('%s._process_gaze_position timestamp: %f', type(self).__name__, timestamp)
+
+ if issubclass(type(self.__pipeline), ArFrame):
+
+ try:
+
+ if x is None and y is None:
+
+ # Edit empty gaze position
+ self.__pipeline.look( GazeFeatures.GazePosition( timestamp = timestamp) )
+
+ else:
+
+ # Edit gaze position
+ self.__pipeline.look( GazeFeatures.GazePosition( (x, y), precision = precision, timestamp = timestamp) )
+
+ except DataFeatures.TimestampedException as e:
+
+ self.__exceptions.append(e)
+
+ else:
+
+ raise(TypeError('Pipeline is not ArFrame instance.'))
+
+ def _process_camera_image(self, timestamp: int|float, image: numpy.ndarray):
+ """Request pipeline to process new camera image at a timestamp."""
+
+ logging.debug('%s._process_camera_image timestamp: %f', type(self).__name__, timestamp)
+
+ if issubclass(type(self.__pipeline), ArCamera):
+
+ height, width, _ = image.shape
+
+ # Compare image size with ArCamera frame size
+ if width != self.__pipeline.size[0] or height != self.__pipeline.size[1]:
+
+ logging.warning('image size (%i x %i) is different of ArCamera frame size (%i x %i)', width, height, self.__pipeline.size[0], self.__pipeline.size[1])
+ return
+
+ try:
+
+ logging.debug('\t> watch image (%i x %i)', width, height)
+
+ self.__pipeline.watch( image, timestamp = timestamp)
+
+ except DataFeatures.TimestampedException as e:
+
+ logging.warning('%s', e)
+
+ self.__exceptions.append(e)
+
+ else:
+
+ raise(TypeError('Pipeline is not ArCamera instance.'))
+
+ def __image(self, draw_exceptions: bool):
+ """
+ Get pipeline image with execution informations.
+
+ Parameters:
+ draw_exceptions: ...
+ """
+ logging.debug('%s.__image', type(self).__name__)
+
+ image = self.__pipeline.image()
+ height, width, _ = image.shape
+
+ logging.debug('\t> get image (%i x %i)', width, height)
+
+ if draw_exceptions:
+
+ # Write exceptions
+ while self.__exceptions:
+
+ e = self.__exceptions.pop()
+ i = len(self.__exceptions)
+
+ cv2.rectangle(image, (0, height-(i+1)*50), (width, height-(i)*50), (0, 0, 127), -1)
+ cv2.putText(image, f'error: {e}', (20, height-(i+1)*50+25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+
+ return image
+
+ def image(self, **kwargs: dict) -> numpy.array:
+ """
+ Get pipeline image.
+
+ Parameters:
+ kwargs: PipelineInput.__image parameters
+ """
+ # Use image_parameters attribute if no kwargs
+ if kwargs:
+
+ return self.__image(**kwargs)
+
+ return self.__image(**self._image_parameters)