diff options
Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r-- | src/argaze/ArFeatures.py | 265 |
1 files changed, 218 insertions, 47 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 5e219ff..6b4b182 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -28,7 +28,6 @@ import time from argaze import DataFeatures, GazeFeatures from argaze.AreaOfInterest import * from argaze.GazeAnalysis import * -from argaze.utils import Providers import numpy import cv2 @@ -53,15 +52,6 @@ class SceneProjectionFailed(Exception): super().__init__(message) -class LoadingFailed(Exception): - """ - Exception raised when attributes loading fails. - """ - - def __init__(self, message): - - super().__init__(message) - class DrawingFailed(Exception): """ Exception raised when drawing fails. @@ -263,7 +253,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): if not found: - raise LoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') + raise DataFeatures.PipelineStepLoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None: @@ -473,7 +463,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Init private attributes self.__size = (1, 1) - self.__provider = None self.__gaze_position_calibrator = None self.__gaze_movement_identifier = None self.__filter_in_progress_identification = True @@ -497,24 +486,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @size.setter def size(self, size: tuple[int]): self.__size = size - - @property - def provider(self) -> DataFeatures.PipelineInputProvider: - """Provider object related to this frame.""" - return self.__provider - - @provider.setter - @DataFeatures.PipelineStepAttributeSetter - def provider(self, provider: DataFeatures.PipelineInputProvider): - - assert(issubclass(type(provider), DataFeatures.PipelineInputProvider)) - - self.__provider = provider - - # Edit parent - if self.__provider is not None: - - self.__provider.parent = self @property def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator: @@ -625,7 +596,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): if not found: - raise LoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') + raise DataFeatures.PipelineStepLoadingFaile(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation if len(self.__scan_path_analyzers) > 0 and self.scan_path == None: @@ -734,7 +705,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): d = { **DataFeatures.PipelineStepObject.as_dict(self), "size": self.__size, - "provider": self.__provider, "gaze_position_calibrator": self.__gaze_position_calibrator, "gaze_movement_identifier": self.__gaze_movement_identifier, "filter_in_progress_identification": self.__filter_in_progress_identification, @@ -963,7 +933,17 @@ class ArScene(DataFeatures.PipelineStepObject): for layer_name, layer_data in layers.items(): - self._layers[layer_name] = ArLayer(name = layer_name, **layer_data) + if type(layer_data) == dict: + + self._layers[layer_name] = ArLayer(name = layer_name, **layer_data) + + # str: relative path to JSON file + elif type(layer_data) == str: + + self._layers[layer_name] = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), layer_data)) + + # Loaded layer name have to be equals to dictionary key + assert(self._layers[layer_name].name == frame_name) # Edit parent for name, layer in self._layers.items(): @@ -984,7 +964,17 @@ class ArScene(DataFeatures.PipelineStepObject): for frame_name, frame_data in frames.items(): - new_frame = ArFrame(name = frame_name, **frame_data) + if type(frame_data) == dict: + + new_frame = ArFrame(name = frame_name, **frame_data) + + # str: relative path to JSON file + elif type(frame_data) == str: + + new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data)) + + # Loaded frame name have to be equals to dictionary key + assert(new_frame.name == frame_name) # Look for a scene layer with an AOI named like the frame for scene_layer_name, scene_layer in self.layers.items(): @@ -1295,27 +1285,29 @@ class ArCamera(ArFrame): # Is there an AOI inside camera frame layers projection which its name equals to a scene frame name? for camera_layer_name, camera_layer in self.layers.items(): - try: + if camera_layer.aoi_scene: - aoi_2d = camera_layer.aoi_scene[scene_frame.name] + try: - if timestamped_gaze_position: + aoi_2d = camera_layer.aoi_scene[scene_frame.name] - # TODO?: Should we prefer to use camera frame AOIMatcher object? - if aoi_2d.contains_point(timestamped_gaze_position): + if timestamped_gaze_position: - inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position) + # TODO?: Should we prefer to use camera frame AOIMatcher object? + if aoi_2d.contains_point(timestamped_gaze_position): - # QUESTION: How to project gaze precision? - inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp) + inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position) - # Project inner gaze position into scene frame - scene_frame.look(inner_gaze_position * scene_frame.size) + # QUESTION: How to project gaze precision? + inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp) - # Ignore missing aoi in camera frame layer projection - except KeyError as e: + # Project inner gaze position into scene frame + scene_frame.look(inner_gaze_position * scene_frame.size) - pass + # Ignore missing aoi in camera frame layer projection + except KeyError as e: + + pass @DataFeatures.PipelineStepMethod def map(self): @@ -1348,3 +1340,182 @@ class ArCamera(ArFrame): except KeyError: pass + + +# Define default ArContext image parameters +DEFAULT_ARCONTEXT_IMAGE_PARAMETERS = { + "draw_exceptions": True +} + +class ArContext(DataFeatures.PipelineStepObject): + """ + Define class to ... + """ + + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + logging.debug('ArContext.__init__') + + DataFeatures.PipelineStepObject.__init__(self) + + # Init private attributes + self.__pipeline = None + self.__exceptions = DataFeatures.TimeStampedExceptions() + + # Init protected attributes + self._image_parameters = DEFAULT_ARCONTEXT_IMAGE_PARAMETERS + + @property + def pipeline(self) -> DataFeatures.PipelineStepObject: + """ArFrame used to process gaze data or ArCamera used to process gaze data and video of environment.""" + return self.__pipeline + + @pipeline.setter + @DataFeatures.PipelineStepAttributeSetter + def pipeline(self, pipeline: DataFeatures.PipelineStepObject): + + assert(issubclass(type(pipeline), DataFeatures.PipelineStepObject)) + + self.__pipeline = pipeline + + @property + def image_parameters(self) -> dict: + """Default image method parameters dictionary.""" + return self._image_parameters + + @image_parameters.setter + @DataFeatures.PipelineStepAttributeSetter + def image_parameters(self, image_parameters: dict): + + self._image_parameters = image_parameters + + def exceptions(self) -> DataFeatures.TimestampedException: + """Get exceptions list""" + return self.__exceptions + + def as_dict(self) -> dict: + """Export Arcontext properties as dictionary.""" + + return { + **DataFeatures.PipelineStepObject.as_dict(self), + "pipeline": self.__pipeline, + "image_parameters": self._image_parameters + } + + def __enter__(self): + """ + Define abstract __enter__ method to use device as a context. + + !!! warning + This method is called provided that the PipelineInput is created as a context using a with statement. + """ + return self + + def __exit__(self, type, value, traceback): + """ + Define abstract __exit__ method to use device as a context. + + !!! warning + This method is called provided that the PipelineInput is created as a context using a with statement. + """ + pass + + def _process_gaze_position(self, timestamp: int|float, x: int|float = None, y: int|float = None, precision: int|float = None): + """Request pipeline to process new gaze position at a timestamp.""" + + logging.debug('%s._process_gaze_position timestamp: %f', type(self).__name__, timestamp) + + if issubclass(type(self.__pipeline), ArFrame): + + try: + + if x is None and y is None: + + # Edit empty gaze position + self.__pipeline.look( GazeFeatures.GazePosition( timestamp = timestamp) ) + + else: + + # Edit gaze position + self.__pipeline.look( GazeFeatures.GazePosition( (x, y), precision = precision, timestamp = timestamp) ) + + except DataFeatures.TimestampedException as e: + + self.__exceptions.append(e) + + else: + + raise(TypeError('Pipeline is not ArFrame instance.')) + + def _process_camera_image(self, timestamp: int|float, image: numpy.ndarray): + """Request pipeline to process new camera image at a timestamp.""" + + logging.debug('%s._process_camera_image timestamp: %f', type(self).__name__, timestamp) + + if issubclass(type(self.__pipeline), ArCamera): + + height, width, _ = image.shape + + # Compare image size with ArCamera frame size + if width != self.__pipeline.size[0] or height != self.__pipeline.size[1]: + + logging.warning('image size (%i x %i) is different of ArCamera frame size (%i x %i)', width, height, self.__pipeline.size[0], self.__pipeline.size[1]) + return + + try: + + logging.debug('\t> watch image (%i x %i)', width, height) + + self.__pipeline.watch( image, timestamp = timestamp) + + except DataFeatures.TimestampedException as e: + + logging.warning('%s', e) + + self.__exceptions.append(e) + + else: + + raise(TypeError('Pipeline is not ArCamera instance.')) + + def __image(self, draw_exceptions: bool): + """ + Get pipeline image with execution informations. + + Parameters: + draw_exceptions: ... + """ + logging.debug('%s.__image', type(self).__name__) + + image = self.__pipeline.image() + height, width, _ = image.shape + + logging.debug('\t> get image (%i x %i)', width, height) + + if draw_exceptions: + + # Write exceptions + while self.__exceptions: + + e = self.__exceptions.pop() + i = len(self.__exceptions) + + cv2.rectangle(image, (0, height-(i+1)*50), (width, height-(i)*50), (0, 0, 127), -1) + cv2.putText(image, f'error: {e}', (20, height-(i+1)*50+25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + return image + + def image(self, **kwargs: dict) -> numpy.array: + """ + Get pipeline image. + + Parameters: + kwargs: PipelineInput.__image parameters + """ + # Use image_parameters attribute if no kwargs + if kwargs: + + return self.__image(**kwargs) + + return self.__image(**self._image_parameters) |