diff options
42 files changed, 1376 insertions, 1427 deletions
@@ -2,6 +2,7 @@ __pycache__ _export _projects +_venv _workaround dist site diff --git a/docs/css/extra.css b/docs/css/extra.css index b0c7aef..a6b57a7 100644 --- a/docs/css/extra.css +++ b/docs/css/extra.css @@ -12,8 +12,8 @@ a {color: #0299D2;} -.doc > code {color: #0299D2; background: none; border: 0px; font-size: 100%; padding: 0;} -.doc-contents {margin: 0px 0px 0px 1%;} +.doc > code {color: #0299D2; background: none; border: 0; font-size: 100%; padding: 0;} +.doc-contents {margin: 0 0 0 1%;} .doc-module > code {color: #404040;} .doc-class > code {color: #0299D2;} @@ -24,7 +24,7 @@ a {color: #0299D2;} .docutils th, p {color: dimgray;} .docutils p {color: dimgray;} -.doc-label > code {border: 0px; border-radius: 15px; padding: 2px 8px; font-weight: bold; color: white;} +.doc-label > code {border: 0; border-radius: 15px; padding: 2px 8px; font-weight: bold; color: white;} .doc-label-dataclass > code {background: none; color: dimgray;} .doc-label-classmethod > code {background: none; color: dimgray;} diff --git a/pyproject.toml b/pyproject.toml index b0f0765..6690224 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,50 @@ [build-system] -requires = ["setuptools>=42"] +requires = [ + "setuptools", +] build-backend = "setuptools.build_meta" + +[project] +name = "argaze" +authors = [ + {name = 'ACHIL laboratory at Ecole Nationale de l\'Aviation Civile (ENAC)', email = 'achil@recherche.enac.fr'} +] +urls=[ + 'https://git.recherche.enac.fr/projects/argaze/' +] +description = "A modular real-time and post-processing gaze analysis software library." +readme= "README.md" +requires-python = ">=3.11" +keywords=[ + "eye tracking", + "gaze analysis", + "real-time", + "modular" +] +license = {text = "GPLv3"} +classifiers=[ + 'Development Status :: 3 - Alpha', + 'Intended Audience :: Developers', + 'Intended Audience :: Science/Research', + 'Topic :: Scientific/Engineering :: Human Machine Interfaces', + 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', + 'Programming Language :: Python :: 3', + 'Operating System :: OS Independent' +] +dependencies = [ + 'opencv-python>=4.7.0', + 'opencv-contrib-python>=4.7.0', + 'numpy', + 'pandas', + 'av', + 'colorama', + 'matplotlib', + 'shapely', + 'lempel_ziv_complexity', + 'scipy', + 'scikit-learn' +] +dynamic = ["version"] + +[project.scripts] +demo="argaze ./src/argaze/utils/demo/ opencv_window_context_setup.json"
\ No newline at end of file diff --git a/setup.py b/setup.py deleted file mode 100644 index f0e4bd4..0000000 --- a/setup.py +++ /dev/null @@ -1,44 +0,0 @@ -from setuptools import setup, find_packages -import pathlib - -here = pathlib.Path(__file__).parent.resolve() - -# get the long description from the README file -long_description = (here / 'README.md').read_text(encoding='utf-8') - -setup( - name='argaze', - version='0.0.4', - - description='A Python toolkit for gaze processing in AR environnement', - long_description=long_description, - long_description_content_type='text/markdown', - - url='https://git.recherche.enac.fr/projects/argaze/', - author='ACHIL laboratory at Ecole Nationale de l\'Aviation Civile (ENAC)', - author_email='achil@recherche.enac.fr', - - # see https://pypi.org/classifiers/ - classifiers=[ - 'Development Status :: 2 - Pre-Alpha', - 'Intended Audience :: Developers', - 'Intended Audience :: Science/Research', - 'Topic :: Scientific/Engineering :: Human Machine Interfaces', - 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', - 'Programming Language :: Python :: 3', - 'Operating System :: OS Independent' - ], - - keywords='eye tracking, gaze, aruco, augmented reality', - - package_dir={'':'src'}, - packages=find_packages(where='src'), - - python_requires='>=3.11', - install_requires=['opencv-python>=4.7.0', 'opencv-contrib-python>=4.7.0', 'numpy', 'pandas', 'colorama', 'matplotlib', 'shapely', 'lempel_ziv_complexity', 'scipy', 'scikit-learn'], - - project_urls={ - 'Bug Reports': 'https://git.recherche.enac.fr/projects/argaze/issues', - 'Source': 'https://git.recherche.enac.fr/projects/argaze/repository', - }, -)
\ No newline at end of file diff --git a/src/argaze.test/ArUcoMarkers/ArUcoScene.py b/src/argaze.test/ArUcoMarkers/ArUcoScene.py index 7072486..68c8d4d 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoScene.py @@ -48,7 +48,7 @@ class TestArUcoMarkersGroupClass(unittest.TestCase): def setup_markers(self): # Prepare detected markers - self.detected_markers() = { + self.detected_markers = { 0: ArUcoMarker.ArUcoMarker('DICT_ARUCO_ORIGINAL', 0, 1.), 1: ArUcoMarker.ArUcoMarker('DICT_ARUCO_ORIGINAL', 1, 1.), 2: ArUcoMarker.ArUcoMarker('DICT_ARUCO_ORIGINAL', 2, 1.), diff --git a/src/argaze.test/PupillAnalysis/WorkloadIndex.py b/src/argaze.test/PupillAnalysis/WorkloadIndex.py index 92857fa..da9b72d 100644 --- a/src/argaze.test/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze.test/PupillAnalysis/WorkloadIndex.py @@ -19,8 +19,8 @@ __license__ = "GPLv3" import unittest import math -from argaze import PupillFeatures -from argaze.PupillAnalysis import WorkloadIndex +from argaze import PupilFeatures +from argaze.PupilAnalysis import WorkloadIndex class TestWorkloadIndexClass(unittest.TestCase): """Test WorkloadIndex class.""" diff --git a/src/argaze.test/PupillFeatures.py b/src/argaze.test/PupillFeatures.py index f1abb66..b0cf65d 100644 --- a/src/argaze.test/PupillFeatures.py +++ b/src/argaze.test/PupillFeatures.py @@ -19,7 +19,7 @@ __license__ = "GPLv3" import unittest import math -from argaze import PupillFeatures +from argaze import PupilFeatures import numpy diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 3b05482..7cc1b9d 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -34,35 +34,36 @@ from argaze.utils import UtilsFeatures import numpy import cv2 + class PoseEstimationFailed(Exception): """ - Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies. + Exception raised by ArScene estimate_pose method when the pose can't be estimated due to inconsistencies. """ - def __init__(self, message, unconsistencies=None): - + def __init__(self, message, inconsistencies=None): super().__init__(message) - self.unconsistencies = unconsistencies + self.inconsistencies = inconsistencies + class SceneProjectionFailed(Exception): """ Exception raised by ArCamera watch method when the scene can't be projected. """ - def __init__(self, message): - + def __init__(self, message): super().__init__(message) + class DrawingFailed(Exception): """ Exception raised when drawing fails. """ - def __init__(self, message): - + def __init__(self, message): super().__init__(message) + # Define default ArLayer draw parameters DEFAULT_ARLAYER_DRAW_PARAMETERS = { "draw_aoi_scene": { @@ -92,9 +93,10 @@ DEFAULT_ARLAYER_DRAW_PARAMETERS = { } } + class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ - Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed. + Defines a space where to make matching of gaze movements and AOI and inside which those matching need to be analyzed. !!! note Inherits from DataFeatures.SharedObject class to be shared by multiple threads. @@ -118,14 +120,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Init pipeline step object attributes self.draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS - + @property def aoi_scene(self) -> AOIFeatures.AOIScene: """AOI scene description.""" return self.__aoi_scene @aoi_scene.setter - def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene|str|dict): + def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene | str | dict): if issubclass(type(aoi_scene_value), AOIFeatures.AOIScene): @@ -139,7 +141,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # JSON file format for 2D or 3D dimension if file_format == 'json': - new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath) # SVG file format for 2D dimension only @@ -168,7 +169,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Edit parent if self.__aoi_scene is not None: - self.__aoi_scene.parent = self @property @@ -180,15 +180,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @DataFeatures.PipelineStepAttributeSetter def aoi_matcher(self, aoi_matcher: GazeFeatures.AOIMatcher): - assert(issubclass(type(aoi_matcher), GazeFeatures.AOIMatcher)) + assert (issubclass(type(aoi_matcher), GazeFeatures.AOIMatcher)) self.__aoi_matcher = aoi_matcher # Edit parent if self.__aoi_matcher is not None: - self.__aoi_matcher.parent = self - + @property def aoi_scan_path(self) -> GazeFeatures.AOIScanPath: """AOI scan path object.""" @@ -198,7 +197,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @DataFeatures.PipelineStepAttributeSetter def aoi_scan_path(self, aoi_scan_path: GazeFeatures.AOIScanPath): - assert(isinstance(aoi_scan_path, GazeFeatures.AOIScanPath)) + assert (isinstance(aoi_scan_path, GazeFeatures.AOIScanPath)) self.__aoi_scan_path = aoi_scan_path @@ -207,9 +206,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Edit parent if self.__aoi_scan_path is not None: - self.__aoi_scan_path.parent = self - + @property def aoi_scan_path_analyzers(self) -> list: """AOI scan path analyzers list.""" @@ -224,7 +222,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Connect analyzers if required for analyzer in self.__aoi_scan_path_analyzers: - assert(issubclass(type(analyzer), GazeFeatures.AOIScanPathAnalyzer)) + assert (issubclass(type(analyzer), GazeFeatures.AOIScanPathAnalyzer)) # Check scan path analyzer properties type for name, item in type(analyzer).__dict__.items(): @@ -238,7 +236,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): except KeyError: - raise(ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}')) + raise (ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}')) if issubclass(property_type, GazeFeatures.AOIScanPathAnalyzer): @@ -248,28 +246,25 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): for a in self.__aoi_scan_path_analyzers: if type(a) == property_type: - setattr(analyzer, name, a) found = True if not found: - - raise DataFeatures.PipelineStepLoadingFailed(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') + raise DataFeatures.PipelineStepLoadingFailed( + f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None: - - self.scan_path = GazeFeatures.ScanPath() + self.__aoi_scan_path = GazeFeatures.ScanPath() # Edit parent for analyzer in self.__aoi_scan_path_analyzers: - analyzer.parent = self def last_looked_aoi_name(self) -> str: """Get last looked aoi name.""" return self.__looked_aoi_name - + def is_analysis_available(self) -> bool: """Are aoi scan path analysis ready?""" return self.__aoi_scan_path_analyzed @@ -279,7 +274,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): analysis = {} for analyzer in self.__aoi_scan_path_analyzers: - analysis[DataFeatures.get_class_path(analyzer)] = analyzer.analysis() return analysis @@ -300,19 +294,17 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """Update expected AOI of AOI scan path considering AOI scene and layer name.""" if self.__aoi_scene is None: - logging.debug('ArLayer._update_expected_aoi %s (parent: %s): missing aoi scene', self.name, self.parent) return logging.debug('ArLayer._update_expected_aoi %s (parent: %s)', self.name, self.parent) - # Get aoi names from aoi scene + # Get aoi names from aoi scene expected_aoi = list(self.__aoi_scene.keys()) # Remove layer name from expected aoi if self.name in expected_aoi: - expected_aoi.remove(self.name) # Update expected aoi: this will clear the scan path @@ -345,9 +337,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__aoi_scan_path_analyzed = False if self.__aoi_matcher is not None and self.__aoi_scene is not None: - # Update looked aoi thanks to aoi matcher - # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally + # Note: don't filter valid/invalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally self.__looked_aoi_name, _ = self.__aoi_matcher.match(gaze_movement, self.__aoi_scene) logging.debug('\t> looked aoi name: %s', self.__looked_aoi_name) @@ -372,7 +363,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Analyze aoi scan path for aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers: - aoi_scan_path_analyzer.analyze(self.__aoi_scan_path, timestamp=gaze_movement.timestamp) # Update aoi scan path analyzed state @@ -382,7 +372,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Append saccade to aoi scan path if self.__aoi_scan_path is not None: - logging.debug('\t> append saccade') self.__aoi_scan_path.append_saccade(gaze_movement) @@ -393,8 +382,10 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Draw into image. Parameters: + image: image where to draw. draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn) - draw_aoi_matching: AOIMatcher.draw parameters (which depends of the loaded aoi matcher module, if None, no aoi matching is drawn) + draw_aoi_matching: AOIMatcher.draw parameters (which depends on the loaded aoi matcher module, + if None, no aoi matching is drawn) """ # Use layer lock feature @@ -402,14 +393,13 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Draw aoi if required if draw_aoi_scene is not None and self.__aoi_scene is not None: - self.__aoi_scene.draw(image, **draw_aoi_scene) # Draw aoi matching if required if draw_aoi_matching is not None and self.__aoi_matcher is not None: - self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching) + # Define default ArFrame image parameters DEFAULT_ARFRAME_IMAGE_PARAMETERS = { "background_weight": 1., @@ -431,6 +421,7 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = { } } + class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. @@ -453,7 +444,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__filter_in_progress_identification = True self.__scan_path = None self.__scan_path_analyzers = [] - self.__background = DataFeatures.TimestampedImage( numpy.full((1, 1, 3), 127).astype(numpy.uint8) ) + self.__background = DataFeatures.TimestampedImage(numpy.full((1, 1, 3), 127).astype(numpy.uint8)) self.__heatmap = None self.__calibrated_gaze_position = GazeFeatures.GazePosition() self.__identified_gaze_movement = GazeFeatures.GazeMovement() @@ -464,32 +455,31 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self._image_parameters = DEFAULT_ARFRAME_IMAGE_PARAMETERS @property - def size(self) -> tuple[int]: + def size(self) -> tuple[int, int]: """Defines the dimension of the rectangular area where gaze positions are projected.""" return self.__size @size.setter - def size(self, size: tuple[int]): + def size(self, size: tuple[int, int]): self.__size = size - + @property def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator: - """Select gaze position calibration algoritm.""" + """Select gaze position calibration algorithm.""" return self.__gaze_position_calibrator @gaze_position_calibrator.setter @DataFeatures.PipelineStepAttributeSetter def gaze_position_calibrator(self, gaze_position_calibrator: GazeFeatures.GazePositionCalibrator): - assert(issubclass(type(gaze_position_calibrator), GazeFeatures.GazePositionCalibrator)) + assert (issubclass(type(gaze_position_calibrator), GazeFeatures.GazePositionCalibrator)) self.__gaze_position_calibrator = gaze_position_calibrator # Edit parent if self.__gaze_position_calibrator is not None: - self.__gaze_position_calibrator.parent = self - + @property def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier: """Select gaze movement identification algorithm.""" @@ -499,15 +489,14 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @DataFeatures.PipelineStepAttributeSetter def gaze_movement_identifier(self, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier): - assert(issubclass(type(gaze_movement_identifier), GazeFeatures.GazeMovementIdentifier)) + assert (issubclass(type(gaze_movement_identifier), GazeFeatures.GazeMovementIdentifier)) self.__gaze_movement_identifier = gaze_movement_identifier # Edit parent if self.__gaze_movement_identifier is not None: - self.__gaze_movement_identifier.parent = self - + @property def filter_in_progress_identification(self) -> bool: """Is frame ignores in progress gaze movement identification?""" @@ -526,15 +515,14 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @scan_path.setter @DataFeatures.PipelineStepAttributeSetter - def scan_path(self, scan_path: GazeFeatures.ScanPath) -> GazeFeatures.ScanPath: + def scan_path(self, scan_path: GazeFeatures.ScanPath): - assert(isinstance(scan_path, GazeFeatures.ScanPath)) + assert (isinstance(scan_path, GazeFeatures.ScanPath)) self.__scan_path = scan_path # Edit parent if self.__scan_path is not None: - self.__scan_path.parent = self @property @@ -551,7 +539,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Connect analyzers if required for analyzer in self.__scan_path_analyzers: - assert(issubclass(type(analyzer), GazeFeatures.ScanPathAnalyzer)) + assert (issubclass(type(analyzer), GazeFeatures.ScanPathAnalyzer)) # Check scan path analyzer properties type for name, item in type(analyzer).__dict__.items(): @@ -565,7 +553,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): except KeyError: - raise(ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}')) + raise (ValueError(f'Missing annotations in {item.fset.__name__}: {item.fset.__annotations__}')) if issubclass(property_type, GazeFeatures.AOIScanPathAnalyzer): @@ -575,22 +563,19 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): for a in self.__scan_path_analyzers: if type(a) == property_type: - setattr(analyzer, name, a) found = True if not found: - - raise DataFeatures.PipelineStepLoadingFaile(f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') + raise DataFeatures.PipelineStepLoadingFaile( + f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation - if len(self.__scan_path_analyzers) > 0 and self.scan_path == None: - - self.scan_path = GazeFeatures.ScanPath() + if len(self.__scan_path_analyzers) > 0 and self.__scan_path == None: + self.__scan_path = GazeFeatures.ScanPath() # Edit parent for analyzer in self.__scan_path_analyzers: - analyzer.parent = self @property @@ -602,12 +587,13 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @DataFeatures.PipelineStepAttributeSetter def background(self, background: DataFeatures.TimestampedImage): - assert(isinstance(background, DataFeatures.TimestampedImage)) + assert (isinstance(background, DataFeatures.TimestampedImage)) if background.size != self.size: # Resize image to frame size - self.__background = DataFeatures.TimestampedImage( cv2.resize(background, dsize = self.size, interpolation = cv2.INTER_CUBIC), background.timestamp) + self.__background = DataFeatures.TimestampedImage( + cv2.resize(background, dsize=self.size, interpolation=cv2.INTER_CUBIC), background.timestamp) else: @@ -622,20 +608,18 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @DataFeatures.PipelineStepAttributeSetter def heatmap(self, heatmap: AOIFeatures.Heatmap): - assert(isinstance(heatmap, AOIFeatures.Heatmap)) + assert (isinstance(heatmap, AOIFeatures.Heatmap)) self.__heatmap = heatmap # Default heatmap size equals frame size if self.__heatmap.size == (1, 1): - self.__heatmap.size = self.size # Edit parent if self.__heatmap is not None: - self.__heatmap.parent = self - + @property def layers(self) -> dict: """Layers dictionary.""" @@ -648,12 +632,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self._layers = {} for layer_name, layer_data in layers.items(): - - self._layers[layer_name] = ArLayer(name = layer_name, **layer_data) + self._layers[layer_name] = ArLayer(name=layer_name, **layer_data) # Edit parent for name, layer in self._layers.items(): - layer.parent = self def last_gaze_position(self) -> object: @@ -673,7 +655,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): analysis = {} for analyzer in self.__scan_path_analyzers: - analysis[DataFeatures.get_class_path(analyzer)] = analyzer.analysis() return analysis @@ -701,7 +682,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return d @DataFeatures.PipelineStepMethod - def look(self, timestamped_gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]: + def look(self, timestamped_gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()): """ Project timestamped gaze position into frame. @@ -733,60 +714,60 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Identify gaze movement if self.__gaze_movement_identifier is not None: - # Identify finished gaze movement - self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(self.__calibrated_gaze_position) + self.__identified_gaze_movement = self.__gaze_movement_identifier.identify( + self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified if self.__identified_gaze_movement and self.__identified_gaze_movement.is_finished(): - + if GazeFeatures.is_fixation(self.__identified_gaze_movement): - + # Append fixation to scan path if self.__scan_path is not None: - self.__scan_path.append_fixation(self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): - + # Append saccade to scan path if self.__scan_path is not None: - + scan_step = self.__scan_path.append_saccade(self.__identified_gaze_movement) # Is there a new step? if scan_step and len(self.__scan_path) > 1: - + # Analyze aoi scan path for scan_path_analyzer in self.__scan_path_analyzers: - - scan_path_analyzer.analyze(self.__scan_path, timestamp=self.__identified_gaze_movement.timestamp) + scan_path_analyzer.analyze(self.__scan_path, + timestamp=self.__identified_gaze_movement.timestamp) # Update scan path analyzed state self.__scan_path_analyzed = True - # No valid finished gaze movement: optionnaly stop in progress identification filtering + # No valid finished gaze movement: optionally stop in progress identification filtering elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification: self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement() # Update heatmap if self.__heatmap is not None: - # Scale gaze position value scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]]) # Update heatmap image - self.__heatmap.update(self.__calibrated_gaze_position * scale, timestamp=self.__calibrated_gaze_position.timestamp) + self.__heatmap.update(self.__calibrated_gaze_position * scale, + timestamp=self.__calibrated_gaze_position.timestamp) # Look layers with valid identified gaze movement - # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally + # Note: don't filter valid/invalid finished/unfinished gaze movement to allow layers to reset internally for layer_name, layer in self._layers.items(): - layer.look(self.__identified_gaze_movement) @DataFeatures.PipelineStepImage - def image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: + def image(self, background_weight: float = None, heatmap_weight: float = None, + draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, + draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. @@ -838,14 +819,12 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Draw gaze position calibrator if draw_gaze_position_calibrator is not None: - logging.debug('\t> drawing gaze position calibrator') self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator) # Draw scan path if required if draw_scan_path is not None and self.__scan_path is not None: - logging.debug('\t> drawing scan path') self.__scan_path.draw(image, **draw_scan_path) @@ -854,7 +833,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): if draw_fixations is not None and self.__gaze_movement_identifier is not None: if self.__gaze_movement_identifier.current_fixation(): - logging.debug('\t> drawing current fixation') self.__gaze_movement_identifier.current_fixation().draw(image, **draw_fixations) @@ -863,7 +841,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): if draw_saccades is not None and self.__gaze_movement_identifier is not None: if self.__gaze_movement_identifier.current_saccade(): - logging.debug('\t> drawing current saccade') self.__gaze_movement_identifier.current_saccade().draw(image, **draw_saccades) @@ -881,24 +858,24 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): except KeyError: - raise(DrawingFailed(f'\'{layer_name}\' layer doesn\'t exist.')) + raise (DrawingFailed(f'\'{layer_name}\' layer doesn\'t exist.')) # Draw current gaze position if required if draw_gaze_positions is not None: - logging.debug('\t> drawing current gaze position') self.__calibrated_gaze_position.draw(image, **draw_gaze_positions) logging.debug('\t> returning image (%i x %i)', image.shape[1], image.shape[0]) - return DataFeatures.TimestampedImage(image, timestamp = self.__background.timestamp) + return DataFeatures.TimestampedImage(image, timestamp=self.__background.timestamp) + class ArScene(DataFeatures.PipelineStepObject): """ Define abstract Augmented Reality scene with ArLayers and ArFrames inside. """ - + @DataFeatures.PipelineStepInit def __init__(self, **kwargs): """Initialize ArScene""" @@ -906,8 +883,8 @@ class ArScene(DataFeatures.PipelineStepObject): # Init private attributes self._layers = {} self.__frames = {} - self.__angle_tolerance = 0, - self.__distance_tolerance = 0, + self.__angle_tolerance = 0. + self.__distance_tolerance = 0. @property def layers(self) -> dict: @@ -917,7 +894,7 @@ class ArScene(DataFeatures.PipelineStepObject): @layers.setter @DataFeatures.PipelineStepAttributeSetter - def layers(self, layers:dict): + def layers(self, layers: dict): self._layers = {} @@ -925,21 +902,18 @@ class ArScene(DataFeatures.PipelineStepObject): if type(layer_data) == dict: - self._layers[layer_name] = ArLayer(name = layer_name, **layer_data) + self._layers[layer_name] = ArLayer(name=layer_name, **layer_data) # str: relative path to JSON file elif type(layer_data) == str: - self._layers[layer_name] = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), layer_data)) - - # Loaded layer name have to be equals to dictionary key - assert(self._layers[layer_name].name == frame_name) + self._layers[layer_name] = DataFeatures.from_json( + os.path.join(DataFeatures.get_working_directory(), layer_data)) # Edit parent for name, layer in self._layers.items(): - layer.parent = self - + @property def frames(self) -> dict: """Dictionary of ArFrames to project once the pose is estimated. @@ -956,7 +930,7 @@ class ArScene(DataFeatures.PipelineStepObject): if type(frame_data) == dict: - new_frame = ArFrame(name = frame_name, **frame_data) + new_frame = ArFrame(name=frame_name, **frame_data) # str: relative path to JSON file elif type(frame_data) == str: @@ -964,7 +938,7 @@ class ArScene(DataFeatures.PipelineStepObject): new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data)) # Loaded frame name have to be equals to dictionary key - assert(new_frame.name == frame_name) + assert (new_frame.name == frame_name) # Look for a scene layer with an AOI named like the frame for scene_layer_name, scene_layer in self.layers.items(): @@ -976,7 +950,7 @@ class ArScene(DataFeatures.PipelineStepObject): # Check that the frame have a layer named like this scene layer aoi_2d_scene = new_frame.layers[scene_layer_name].aoi_scene - # Transform 2D frame layer AOI into 3D scene layer AOI + # Transform 2D frame layer AOI into 3D scene layer AOI # Then, add them to scene layer scene_layer.aoi_scene |= aoi_2d_scene.dimensionalize(frame_3d, new_frame.size) @@ -989,9 +963,8 @@ class ArScene(DataFeatures.PipelineStepObject): # Edit parent for name, frame in self.__frames.items(): - frame.parent = self - + @property def angle_tolerance(self) -> float: """Angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.""" @@ -1001,7 +974,7 @@ class ArScene(DataFeatures.PipelineStepObject): def angle_tolerance(self, value: float): self.__angle_tolerance = value - + @property def distance_tolerance(self) -> float: """Distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.""" @@ -1039,8 +1012,9 @@ class ArScene(DataFeatures.PipelineStepObject): raise NotImplementedError('estimate_pose() method not implemented') @DataFeatures.PipelineStepMethod - def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]: - """Project layers according estimated pose and optional field of view clipping angles. + def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> \ + Iterator[Union[str, AOI2DScene.AOI2DScene]]: + """Project layers according estimated pose and optional field of view clipping angles. Parameters: tvec: translation vector @@ -1061,8 +1035,8 @@ class ArScene(DataFeatures.PipelineStepObject): # Transform layer aoi scene into camera referential aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec) - # Get aoi inside vision cone field - cone_vision_height_cm = 200 # cm + # Get aoi inside vision cone field + cone_vision_height_cm = 200 # cm cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) @@ -1077,6 +1051,7 @@ class ArScene(DataFeatures.PipelineStepObject): # Project layer aoi scene yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) + class ArCamera(ArFrame): """ Define abstract Augmented Reality camera as ArFrame with ArScenes inside. @@ -1085,7 +1060,7 @@ class ArCamera(ArFrame): @DataFeatures.PipelineStepInit def __init__(self, **kwargs): """Initialize ArCamera.""" - + # Init ArFrame class super().__init__() @@ -1103,12 +1078,10 @@ class ArCamera(ArFrame): self._layers = {} for layer_name, layer_data in layers.items(): - - self._layers[layer_name] = ArLayer(name = layer_name, **layer_data) + self._layers[layer_name] = ArLayer(name=layer_name, **layer_data) # Edit parent for name, layer in self._layers.items(): - layer.parent = self # Update expected and excluded aoi @@ -1126,12 +1099,10 @@ class ArCamera(ArFrame): self._scenes = {} for scene_name, scene_data in scenes.items(): - - self._scenes[scene_name] = ArScene(name = scene_name, **scene_data) + self._scenes[scene_name] = ArScene(name=scene_name, **scene_data) # Edit parent for name, scene in self._scenes.items(): - scene.parent = self # Update expected and excluded aoi @@ -1146,7 +1117,7 @@ class ArCamera(ArFrame): def visual_hfov(self, value: float): """Set camera's visual horizontal field of view.""" self.__visual_hfov = value - + @property def visual_vfov(self) -> float: """Angle in degree to clip scenes projection according visual vertical field of view (VFOV).""" @@ -1156,7 +1127,7 @@ class ArCamera(ArFrame): def visual_vfov(self, value: float): """Set camera's visual vertical field of view.""" self.__visual_vfov = value - + def scene_frames(self) -> Iterator[ArFrame]: """Iterate over all scenes frames""" @@ -1165,7 +1136,6 @@ class ArCamera(ArFrame): # For each scene frame for name, scene_frame in scene.frames.items(): - yield scene_frame def as_dict(self) -> dict: @@ -1184,7 +1154,6 @@ class ArCamera(ArFrame): """ if not self._layers or not self._scenes: - logging.debug('ArCamera._update_expected_and_excluded_aoi %s: missing layers or scenes', self.name) return @@ -1214,7 +1183,7 @@ class ArCamera(ArFrame): for frame_name, frame in scene.frames.items(): try: - + expected_aoi_list.remove(frame_name) excluded_aoi_list.append(frame_name) @@ -1223,11 +1192,9 @@ class ArCamera(ArFrame): continue if layer.aoi_scan_path is not None: - layer.aoi_scan_path.expected_aoi = expected_aoi_list if layer.aoi_matcher is not None: - layer.aoi_matcher.exclude = excluded_aoi_list @DataFeatures.PipelineStepMethod @@ -1275,11 +1242,11 @@ class ArCamera(ArFrame): # TODO?: Should we prefer to use camera frame AOIMatcher object? if aoi_2d.contains_point(timestamped_gaze_position): - inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position) # QUESTION: How to project gaze precision? - inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp) + inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), + timestamp=timestamped_gaze_position.timestamp) # Project inner gaze position into scene frame scene_frame.look(inner_gaze_position * scene_frame.size) @@ -1314,19 +1281,23 @@ class ArCamera(ArFrame): width, height = frame.size destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]]) mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination) - frame.background = DataFeatures.TimestampedImage( cv2.warpPerspective(self.background, mapping, (width, height)), timestamp = self.background.timestamp) + frame.background = DataFeatures.TimestampedImage( + cv2.warpPerspective(self.background, mapping, (width, height)), + timestamp=self.background.timestamp) # Ignore missing frame projection except KeyError: pass + # Define default ArContext image parameters DEFAULT_ARCONTEXT_IMAGE_PARAMETERS = { "draw_times": True, "draw_exceptions": True } + class ArContext(DataFeatures.PipelineStepObject): """ Define class to ... @@ -1340,14 +1311,14 @@ class ArContext(DataFeatures.PipelineStepObject): self.__catch_exceptions = True self.__exceptions = DataFeatures.TimestampedExceptions() - # Init gaze position processing assement + # Init gaze position processing assessment self.__process_gaze_position_chrono = UtilsFeatures.TimeProbe() self.__process_gaze_position_frequency = 0 - # Init camera image processing assement + # Init camera image processing assessment self.__process_camera_image_chrono = UtilsFeatures.TimeProbe() self.__process_camera_image_frequency = 0 - + # Init protected attributes self._image_parameters = DEFAULT_ARCONTEXT_IMAGE_PARAMETERS @@ -1360,7 +1331,7 @@ class ArContext(DataFeatures.PipelineStepObject): @DataFeatures.PipelineStepAttributeSetter def pipeline(self, pipeline: DataFeatures.PipelineStepObject): - assert(issubclass(type(pipeline), DataFeatures.PipelineStepObject)) + assert (issubclass(type(pipeline), DataFeatures.PipelineStepObject)) self.__pipeline = pipeline @@ -1374,12 +1345,12 @@ class ArContext(DataFeatures.PipelineStepObject): self.__catch_exceptions = catch_exceptions - def exceptions(self) -> DataFeatures.TimestampedException: + def exceptions(self) -> DataFeatures.TimestampedExceptions: """Get exceptions list""" return self.__exceptions def as_dict(self) -> dict: - """Export Arcontext properties as dictionary.""" + """Export ArContext properties as dictionary.""" return { **DataFeatures.PipelineStepObject.as_dict(self), @@ -1402,7 +1373,8 @@ class ArContext(DataFeatures.PipelineStepObject): """Exit from ArContext.""" pass - def _process_gaze_position(self, timestamp: int|float, x: int|float = None, y: int|float = None, precision: int|float = None): + def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None, + precision: int | float = None): """Request pipeline to process new gaze position at a timestamp.""" logging.debug('ArContext._process_gaze_position %s', self.name) @@ -1411,7 +1383,6 @@ class ArContext(DataFeatures.PipelineStepObject): lap_time, nb_laps, elapsed_time = self.__process_gaze_position_chrono.lap() if elapsed_time > 1e3: - self.__process_gaze_position_frequency = nb_laps self.__process_gaze_position_chrono.restart() @@ -1422,12 +1393,14 @@ class ArContext(DataFeatures.PipelineStepObject): if x is None and y is None: # Edit empty gaze position - self.__pipeline.look( GazeFeatures.GazePosition( timestamp = timestamp), catch_exceptions = self.__catch_exceptions ) + self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp), + catch_exceptions=self.__catch_exceptions) else: # Edit gaze position - self.__pipeline.look( GazeFeatures.GazePosition( (x, y), precision = precision, timestamp = timestamp), catch_exceptions = self.__catch_exceptions) + self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp), + catch_exceptions=self.__catch_exceptions) except DataFeatures.TimestampedException as e: @@ -1435,9 +1408,9 @@ class ArContext(DataFeatures.PipelineStepObject): else: - raise(TypeError('Pipeline is not ArFrame instance.')) + raise (TypeError('Pipeline is not ArFrame instance.')) - def _process_camera_image(self, timestamp: int|float, image: numpy.array): + def _process_camera_image(self, timestamp: int | float, image: numpy.array): """Request pipeline to process new camera image at a timestamp.""" logging.debug('ArContext._process_camera_image %s', self.name) @@ -1446,7 +1419,6 @@ class ArContext(DataFeatures.PipelineStepObject): lap_time, nb_laps, elapsed_time = self.__process_camera_image_chrono.lap() if elapsed_time > 1e3: - self.__process_camera_image_frequency = nb_laps self.__process_camera_image_chrono.restart() @@ -1456,18 +1428,20 @@ class ArContext(DataFeatures.PipelineStepObject): # Compare image size with ArCamera frame size if list(image.shape[0:2][::-1]) != self.__pipeline.size: - - logging.warning('%s._process_camera_image: image size (%i x %i) is different of ArCamera frame size (%i x %i)', DataFeatures.get_class_path(self) , width, height, self.__pipeline.size[0], self.__pipeline.size[1]) + logging.warning( + '%s._process_camera_image: image size (%i x %i) is different of ArCamera frame size (%i x %i)', + DataFeatures.get_class_path(self), width, height, self.__pipeline.size[0], self.__pipeline.size[1]) return try: logging.debug('\t> watch image (%i x %i)', width, height) - self.__pipeline.watch( DataFeatures.TimestampedImage(image, timestamp = timestamp), catch_exceptions = self.__catch_exceptions ) + self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp), + catch_exceptions=self.__catch_exceptions) # TODO: make this step optional - self.__pipeline.map(timestamp = timestamp, catch_exceptions = self.__catch_exceptions) + self.__pipeline.map(timestamp=timestamp, catch_exceptions=self.__catch_exceptions) except DataFeatures.TimestampedException as e: @@ -1477,15 +1451,16 @@ class ArContext(DataFeatures.PipelineStepObject): else: - raise(TypeError('Pipeline is not ArCamera instance.')) + raise (TypeError('Pipeline is not ArCamera instance.')) @DataFeatures.PipelineStepImage def image(self, draw_times: bool = None, draw_exceptions: bool = None): """ - Get pipeline image with execution informations. + Get pipeline image with execution information. Parameters: - draw_exceptions: ... + draw_times: draw pipeline execution times + draw_exceptions: draw pipeline exception messages """ logging.debug('ArContext.image %s', self.name) @@ -1499,9 +1474,9 @@ class ArContext(DataFeatures.PipelineStepObject): if draw_times: if image.is_timestamped(): - info_stack += 1 - cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, + (255, 255, 255), 1, cv2.LINE_AA) if issubclass(type(self.__pipeline), ArCamera): @@ -1514,7 +1489,8 @@ class ArContext(DataFeatures.PipelineStepObject): watch_time = math.nan info_stack += 1 - cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', + (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) if issubclass(type(self.__pipeline), ArFrame): @@ -1527,17 +1503,18 @@ class ArContext(DataFeatures.PipelineStepObject): look_time = math.nan info_stack += 1 - cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', + (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) if draw_exceptions: # Write exceptions while self.__exceptions: - e = self.__exceptions.pop() i = len(self.__exceptions) - cv2.rectangle(image, (0, height-(i+1)*50), (width, height-(i)*50), (0, 0, 127), -1) - cv2.putText(image, f'error: {e}', (20, height-(i+1)*50+25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - (i) * 50), (0, 0, 127), -1) + cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, + (255, 255, 255), 1, cv2.LINE_AA) return image diff --git a/src/argaze/ArUcoMarkers/ArUcoBoard.py b/src/argaze/ArUcoMarkers/ArUcoBoard.py index 0f5b9fb..74dad94 100644 --- a/src/argaze/ArUcoMarkers/ArUcoBoard.py +++ b/src/argaze/ArUcoMarkers/ArUcoBoard.py @@ -18,12 +18,12 @@ __license__ = "GPLv3" from dataclasses import dataclass, field -from argaze.ArUcoMarkers import ArUcoMarkersDictionary - -import numpy import cv2 as cv import cv2.aruco as aruco +from argaze.ArUcoMarkers import ArUcoMarkersDictionary + + @dataclass class ArUcoBoard(): """ """ diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 93a0b8f..bf4e5d3 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -16,22 +16,18 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self import logging -import json -import os -import time - -from argaze import ArFeatures, DataFeatures -from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoDetector, ArUcoOpticCalibrator, ArUcoScene -from argaze.AreaOfInterest import AOI2DScene import cv2 import numpy -# Define default ArUcoCamera image_paremeters values +from argaze import ArFeatures, DataFeatures +from argaze.ArUcoMarkers import ArUcoDetector, ArUcoOpticCalibrator, ArUcoScene +from argaze.AreaOfInterest import AOI2DScene + +# Define default ArUcoCamera image_parameters values DEFAULT_ARUCOCAMERA_IMAGE_PARAMETERS = { - "draw_detected_markers": { + "draw_detected_markers": { "color": (0, 255, 0), "draw_axes": { "thickness": 3 @@ -39,6 +35,7 @@ DEFAULT_ARUCOCAMERA_IMAGE_PARAMETERS = { } } + class ArUcoCamera(ArFeatures.ArCamera): """ Define an ArCamera based on ArUco marker detection. @@ -57,7 +54,7 @@ class ArUcoCamera(ArFeatures.ArCamera): # Init protected attributes self._image_parameters = {**ArFeatures.DEFAULT_ARFRAME_IMAGE_PARAMETERS, **DEFAULT_ARUCOCAMERA_IMAGE_PARAMETERS} - + @property def aruco_detector(self) -> ArUcoDetector.ArUcoDetector: """ArUco marker detector.""" @@ -74,26 +71,30 @@ class ArUcoCamera(ArFeatures.ArCamera): # Optic parameters dimensions should be equal to camera frame size if self.__aruco_detector.optic_parameters.dimensions != self.size: - - raise DataFeatures.PipelineStepLoadingFaile('ArUcoCamera: aruco_detector.optic_parameters.dimensions have to be equal to size.') + raise DataFeatures.PipelineStepLoadingFaile( + 'ArUcoCamera: aruco_detector.optic_parameters.dimensions have to be equal to size.') # No optic parameters loaded else: # Create default optic parameters adapted to frame size # Note: The choice of 1000 for default focal length should be discussed... - self.__aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=self.size, K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=self.size[0], height=self.size[1])) + self.__aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=self.size, + K=ArUcoOpticCalibrator.K0( + focal_length=( + 1000., 1000.), + width=self.size[0], + height=self.size[1])) # Edit parent if self.__aruco_detector is not None: - self.__aruco_detector.parent = self @property def sides_mask(self) -> int: """Size of mask (pixel) to hide video left and right sides.""" return self.__sides_mask - + @sides_mask.setter def sides_mask(self, size: int): @@ -106,12 +107,10 @@ class ArUcoCamera(ArFeatures.ArCamera): self._scenes = {} for scene_name, scene_data in scenes.items(): - - self._scenes[scene_name] = ArUcoScene.ArUcoScene(name = scene_name, **scene_data) + self._scenes[scene_name] = ArUcoScene.ArUcoScene(name=scene_name, **scene_data) # Edit parent for name, scene in self._scenes.items(): - scene.parent = self # Update expected and excluded aoi @@ -128,7 +127,6 @@ class ArUcoCamera(ArFeatures.ArCamera): # Draw black rectangles to mask sides if self.__sides_mask > 0: - logging.debug('\t> drawing sides mask (%i px)', self.__sides_mask) height, width, _ = image.shape @@ -178,7 +176,8 @@ class ArUcoCamera(ArFeatures.ArCamera): tvec, rmat, _ = scene.estimate_pose(self.__aruco_detector.detected_markers(), timestamp=self.timestamp) # Project scene into camera frame according estimated pose - for layer_name, layer_projection in scene.project(tvec, rmat, self.visual_hfov, self.visual_vfov, timestamp=self.timestamp): + for layer_name, layer_projection in scene.project(tvec, rmat, self.visual_hfov, self.visual_vfov, + timestamp=self.timestamp): logging.debug('\t> project %s scene %s layer', scene_name, layer_name) @@ -195,7 +194,8 @@ class ArUcoCamera(ArFeatures.ArCamera): pass @DataFeatures.PipelineStepImage - def image(self, draw_detected_markers: dict = None, draw_scenes: dict = None, draw_optic_parameters_grid: dict = None, **kwargs: dict) -> numpy.array: + def image(self, draw_detected_markers: dict = None, draw_scenes: dict = None, + draw_optic_parameters_grid: dict = None, **kwargs: dict) -> numpy.array: """Get frame image with ArUco detection visualisation. Parameters: @@ -216,7 +216,6 @@ class ArUcoCamera(ArFeatures.ArCamera): # Draw optic parameters grid if required if draw_optic_parameters_grid is not None: - logging.debug('\t> drawing optic parameters') self.__aruco_detector.optic_parameters.draw(image, **draw_optic_parameters_grid) @@ -225,14 +224,12 @@ class ArUcoCamera(ArFeatures.ArCamera): if draw_scenes is not None: for scene_name, draw_scenes_parameters in draw_scenes.items(): - logging.debug('\t> drawing %s scene', scene_name) self.scenes[scene_name].draw(image, **draw_scenes_parameters) # Draw detected markers if required if draw_detected_markers is not None: - logging.debug('\t> drawing detected markers') self.__aruco_detector.draw_detected_markers(image, draw_detected_markers) diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py index f135c1d..ce7e38c 100644 --- a/src/argaze/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py @@ -16,18 +16,17 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self import json -import os from collections import Counter -import time +from typing import Self + +import cv2 as cv +import numpy +from cv2 import aruco from argaze import DataFeatures from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoOpticCalibrator -import numpy -import cv2 as cv -from cv2 import aruco class DetectorParameters(): """Wrapper class around ArUco marker detector parameters. @@ -87,7 +86,7 @@ class DetectorParameters(): return getattr(self.__parameters, parameter) @classmethod - def from_json(self, json_filepath) -> Self: + def from_json(cls, json_filepath) -> Self: """Load detector parameters from .json file.""" with open(json_filepath) as configuration_file: @@ -185,8 +184,8 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): !!! danger "DON'T MIRROR IMAGE" It makes the markers detection to fail. - !!! danger "DON'T UNDISTORED IMAGE" - Camera intrisic parameters and distorsion coefficients are used later during pose estimation. + !!! danger "DON'T UNDISTORTED IMAGE" + Camera intrinsic parameters and distortion coefficients are used later during pose estimation. """ # Reset detected markers data @@ -254,8 +253,8 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): marker.size = size marker.points = markers_points.reshape(4, 3).dot(marker.rotation) - marker.translation - def detected_markers(self) -> dict[ArUcoMarker.ArUcoMarker]: - """Access to detected markers dictionary.""" + def detected_markers(self) -> dict[int, ArUcoMarker.ArUcoMarker]: + """Access to detected markers' dictionary.""" return self.__detected_markers diff --git a/src/argaze/ArUcoMarkers/ArUcoMarker.py b/src/argaze/ArUcoMarkers/ArUcoMarker.py index 3eb0b82..42cb174 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarker.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarker.py @@ -72,11 +72,13 @@ class ArUcoMarker(): return numpy.repeat(matrix, 3).reshape(dimension, dimension, 3) - def draw(self, image: numpy.array, K, D, color: tuple = None, draw_axes: dict = None): + def draw(self, image: numpy.array, K: numpy.array, D: numpy.array, color: tuple = None, draw_axes: dict = None): """Draw marker in image. Parameters: - image: image where to draw + image: image where to + K: + D: color: marker color (if None, no marker drawn) draw_axes: enable marker axes drawing diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py index f02b179..72fc688 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py @@ -16,11 +16,7 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self - -import cv2 as cv import cv2.aruco as aruco -import numpy all_aruco_markers_dictionaries = { 'DICT_4X4_50': aruco.DICT_4X4_50, @@ -45,7 +41,7 @@ all_aruco_markers_dictionaries = { 'DICT_APRILTAG_36h10': aruco.DICT_APRILTAG_36h10, 'DICT_APRILTAG_36h11': aruco.DICT_APRILTAG_36h11 } -"""Dictionnary to list all built-in ArUco markers dictionaries from OpenCV ArUco package.""" +"""Dictionary to list all built-in ArUco markers dictionaries from OpenCV ArUco package.""" class ArUcoMarkersDictionary(): """Handle an ArUco markers dictionary.""" @@ -144,7 +140,7 @@ class ArUcoMarkersDictionary(): return int(dict_name_split[2]) - def create_marker(self, i, size) -> Self: + def create_marker(self, i, size): """Create a marker.""" if i >= 0 and i < self.number: diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py index 642b1d0..568b251 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py @@ -16,18 +16,16 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self -from dataclasses import dataclass -import json import math -import itertools import re +from dataclasses import dataclass +from typing import Self -from argaze import DataFeatures -from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoOpticCalibrator - -import numpy import cv2 +import numpy + +from argaze import DataFeatures +from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker T0 = numpy.array([0., 0., 0.]) """Define no translation vector.""" @@ -37,12 +35,12 @@ R0 = numpy.array([[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]]) def make_rotation_matrix(x, y, z): - # Create rotation matrix around x axis + # Create rotation matrix around x-axis c = numpy.cos(numpy.deg2rad(x)) s = numpy.sin(numpy.deg2rad(x)) Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) - # Create rotation matrix around y axis + # Create rotation matrix around y-axis c = numpy.cos(numpy.deg2rad(y)) s = numpy.sin(numpy.deg2rad(y)) Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) @@ -74,7 +72,7 @@ class Place(): """ corners: numpy.array - marker: dict + marker: ArUcoMarker.ArUcoMarker class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): """ @@ -86,6 +84,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): """Initialize ArUcoMarkersGroup""" # Init private attributes + self.marker_size = None self.__dictionary = None self.__places = {} self.__translation = numpy.zeros(3) @@ -142,7 +141,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): assert(is_rotation_matrix(rmat)) # Get marker size - size = numpy.array(data.pop('size')).astype(numpy.float32) + size = float(numpy.array(data.pop('size')).astype(numpy.float32)) new_marker = ArUcoMarker.ArUcoMarker(self.__dictionary, identifier, size) @@ -199,7 +198,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): } @classmethod - def from_obj(self, obj_filepath: str) -> Self: + def from_obj(cls, obj_filepath: str) -> Self: """Load ArUco markers group from .obj file. !!! note @@ -216,7 +215,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): # Regex rules for .obj file parsing OBJ_RX_DICT = { 'object': re.compile(r'o (.*)#([0-9]+)_(.*)\n'), - 'vertice': re.compile(r'v ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+)\n'), + 'vertices': re.compile(r'v ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+)\n'), 'face': re.compile(r'f ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)\n'), 'comment': re.compile(r'#(.*)\n') # keep comment regex after object regex because the # is used in object string too } @@ -271,11 +270,11 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): raise NameError(f'Marker {identifier} dictionary is not {new_dictionary.name}') # Fill vertices array - elif key == 'vertice': + elif key == 'vertices': vertices.append(tuple([float(match.group(1)), float(match.group(2)), float(match.group(3))])) - # Extract vertice ids + # Extract vertices ids elif key == 'face': faces[identifier] = [int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4))] @@ -285,7 +284,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): file.close() - # Retreive marker vertices thanks to face vertice ids + # Retrieve marker vertices thanks to face vertices ids for identifier, face in faces.items(): # Gather place corners in clockwise order @@ -430,10 +429,13 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): except cv2.error: pass - def draw(self, image: numpy.array, K, D, draw_axes: dict = None, draw_places: dict = None): + def draw(self, image: numpy.array, K: numpy.array, D: numpy.array, draw_axes: dict = None, draw_places: dict = None): """Draw group axes and places. Parameters: + image: where to draw. + K: + D: draw_axes: draw_axes parameters (if None, no axes drawn) draw_places: draw_places parameters (if None, no places drawn) """ diff --git a/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py b/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py index 4fd1bd6..12cbc54 100644 --- a/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py +++ b/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py @@ -25,129 +25,136 @@ import numpy import cv2 import cv2.aruco as aruco + def K0(focal_length: tuple, width: int, height: int) -> numpy.array: - """Define default optic intrinsic parameters matrix. + """Define default optic intrinsic parameters' matrix. Parameters: + focal_length: width: in pixel. height: in pixel. """ - return numpy.array([[focal_length[0], 0., width/2], [0., focal_length[1], height/2], [0., 0., 1.]]) + return numpy.array([[focal_length[0], 0., width / 2], [0., focal_length[1], height / 2], [0., 0., 1.]]) + D0 = numpy.array([0.0, 0.0, 0.0, 0.0, 0.0]) -"""Define default optic distorsion coefficients vector.""" +"""Define default optic distortion coefficients vector.""" + @dataclass class OpticParameters(): - """Define optic parameters outputed by optic calibrator.""" + """Define optic parameters output by optic calibrator.""" - rms: float = field(default=0) - """Root Mean Square error of calibration.""" + rms: float = field(default=0) + """Root Mean Square error of calibration.""" - dimensions: numpy.array = field(default_factory=lambda : numpy.array([0, 0])) - """Image dimensions in pixels from which the calibration have been done.""" + dimensions: numpy.array = field(default_factory=lambda: numpy.array([0, 0])) + """Image dimensions in pixels from which the calibration have been done.""" - K: numpy.array = field(default_factory=lambda : K0((0, 0), 0, 0)) - """Intrinsic parameters matrix (focal lengths and principal point).""" + K: numpy.array = field(default_factory=lambda: K0((0, 0), 0, 0)) + """Intrinsic parameters matrix (focal lengths and principal point).""" - D: numpy.array = field(default_factory=lambda : D0) - """Distorsion coefficients vector.""" + D: numpy.array = field(default_factory=lambda: D0) + """Distortion coefficients vector.""" - @classmethod - def from_json(self, json_filepath): - """Load optical parameters from .json file.""" + @classmethod + def from_json(cls, json_filepath): + """Load optical parameters from .json file.""" - with open(json_filepath) as calibration_file: + with open(json_filepath) as calibration_file: + return OpticParameters(**json.load(calibration_file)) - return OpticParameters(**json.load(calibration_file)) + def to_json(self, json_filepath): + """Save optical parameters into .json file.""" - def to_json(self, json_filepath): - """Save optical parameters into .json file.""" + with open(json_filepath, 'w', encoding='utf-8') as calibration_file: + json.dump(self, calibration_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) - with open(json_filepath, 'w', encoding='utf-8') as calibration_file: + def __str__(self) -> str: + """String display""" - json.dump(self, calibration_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) + output = f'\trms: {self.rms}\n' + output += f'\tdimensions: {self.dimensions}\n' + output += f'\tK: {self.K}\n' + output += f'\tD: {self.D}\n' - def __str__(self) -> str: - """String display""" + return output - output = f'\trms: {self.rms}\n' - output += f'\tdimensions: {self.dimensions}\n' - output += f'\tK: {self.K}\n' - output += f'\tD: {self.D}\n' + def draw(self, image: numpy.array, width: float = 0., height: float = 0., z: float = 0., point_size: int = 1, + point_color: tuple = (0, 0, 0)): + """Draw grid to display K and D""" - return output + if width * height > 0.: - def draw(self, image: numpy.array, width: float = 0., height:float = 0., z: float = 0., point_size: int = 1, point_color: tuple = (0, 0, 0)): - """Draw grid to display K and D""" + # Edit 3D grid + grid_3D = [] + for x in range(-int(width / 2), int(width / 2)): + for y in range(-int(height / 2), int(height / 2)): + grid_3D.append([x, y, z]) - if width * height > 0.: + # Project 3d grid + grid_2D, _ = cv2.projectPoints(numpy.array(grid_3D).astype(float), numpy.array([0., 0., 0.]), + numpy.array([0., 0., 0.]), numpy.array(self.K), -numpy.array(self.D)) - # Edit 3D grid - grid_3D = [] - for x in range(-int(width/2), int(width/2)): - for y in range(-int(height/2), int(height/2)): - grid_3D.append([x, y, z]) + # Draw projection + for point in grid_2D: - # Project 3d grid - grid_2D, _ = cv2.projectPoints(numpy.array(grid_3D).astype(float), numpy.array([0., 0., 0.]), numpy.array([0., 0., 0.]), numpy.array(self.K), -numpy.array(self.D)) + # Ignore point out field + try: - # Draw projection - for point in grid_2D: + cv2.circle(image, point.astype(int)[0], point_size, point_color, -1) - # Ignore point out out field - try: + except: - cv2.circle(image, point.astype(int)[0], point_size, point_color, -1) + pass - except: - - pass class ArUcoOpticCalibrator(): - """Handle optic calibration process.""" - - def __init__(self,): - - # Calibration data - self.__corners_set_number = 0 - self.__corners_set = [] - self.__corners_set_ids = [] + """Handle optic calibration process.""" - def calibrate(self, board, dimensions:tuple = (0, 0)) -> OpticParameters: - """Retrieve K and D parameters from stored calibration data. + def __init__(self, ): + # Calibration data + self.__corners_set_number = 0 + self.__corners_set = [] + self.__corners_set_ids = [] - Parameters: - dimensions: camera image dimensions + def calibrate(self, board, dimensions=None) -> OpticParameters: + """Retrieve K and D parameters from stored calibration data. - Returns: - Optic parameters - """ + Parameters: + board: + dimensions: camera image dimensions - if self.__corners_set_number > 0: + Returns: + Optic parameters + """ - rms, K, D, r, t = aruco.calibrateCameraCharuco(self.__corners_set, self.__corners_set_ids, board.model, dimensions, None, None) + if dimensions is None: + dimensions = [0, 0] - return OpticParameters(rms, dimensions, K, D) + if self.__corners_set_number > 0: + rms, K, D, r, t = aruco.calibrateCameraCharuco(self.__corners_set, self.__corners_set_ids, board.model, + dimensions, None, None) - def reset_calibration_data(self): - """Clear all calibration data.""" + return OpticParameters(rms, dimensions, K, D) - self.__corners_set_number = 0 - self.__corners_set = [] - self.__corners_set_ids = [] + def reset_calibration_data(self): + """Clear all calibration data.""" - def store_calibration_data(self, corners, corners_identifiers): - """Store calibration data.""" + self.__corners_set_number = 0 + self.__corners_set = [] + self.__corners_set_ids = [] - self.__corners_set_number += 1 - self.__corners_set.append(corners) - self.__corners_set_ids.append(corners_identifiers) + def store_calibration_data(self, corners, corners_identifiers): + """Store calibration data.""" - @property - def calibration_data_count(self) -> int: - """Get how much calibration data are stored.""" + self.__corners_set_number += 1 + self.__corners_set.append(corners) + self.__corners_set_ids.append(corners_identifiers) - return self.__corners_set_number + @property + def calibration_data_count(self) -> int: + """Get how much calibration data are stored.""" + return self.__corners_set_number diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py index dbad14d..b818dff 100644 --- a/src/argaze/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -16,15 +16,11 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import json -import os +import numpy from argaze import ArFeatures, DataFeatures from argaze.ArUcoMarkers import ArUcoMarkersGroup -from argaze.AreaOfInterest import AOI2DScene -import cv2 -import numpy class ArUcoScene(ArFeatures.ArScene): """ diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py index 4a2a2be..8283e2e 100644 --- a/src/argaze/AreaOfInterest/AOI2DScene.py +++ b/src/argaze/AreaOfInterest/AOI2DScene.py @@ -18,9 +18,8 @@ __license__ = "GPLv3" from typing import Self -from argaze import DataFeatures +from argaze import DataFeatures, GazeFeatures from argaze.AreaOfInterest import AOIFeatures, AOI3DScene -from argaze import GazeFeatures import cv2 import numpy @@ -34,7 +33,7 @@ class AOI2DScene(AOIFeatures.AOIScene): super().__init__(2, aoi_2d) @classmethod - def from_svg(self, svg_filepath: str) -> Self: + def from_svg(cls, svg_filepath: str) -> Self: """ Load areas from .svg file. @@ -111,13 +110,18 @@ class AOI2DScene(AOIFeatures.AOIScene): return AOI2DScene(new_areas) - def draw(self, image: numpy.array, draw_aoi: dict = None, exclude=[]): + def draw(self, image: numpy.array, draw_aoi: dict = None, exclude=None): """Draw AOI polygons on image. Parameters: + image: where to draw. draw_aoi: AOIFeatures.AOI.draw parameters (if None, no aoi is drawn) + exclude: aoi to exclude """ + if exclude is None: + exclude = [] + for name, aoi in self.items(): if name in exclude: @@ -174,7 +178,7 @@ class AOI2DScene(AOIFeatures.AOIScene): yield name, aoi, matched_region, aoi_ratio, circle_ratio - '''DEPRECATED: but maybe still usefull? + '''DEPRECATED: but maybe still useful? def reframe(self, aoi: AOIFeatures.AreaOfInterest, size: tuple) -> AOI2DScene: """ Reframe whole scene to a scene bounded by a 4 vertices 2D AOI. @@ -198,7 +202,7 @@ class AOI2DScene(AOIFeatures.AOIScene): M = cv2.getAffineTransform(Src[:3], Dst[:3])[:, :2] - # Apply affine transformationto each AOI + # Apply affine transformation to each AOI aoi2D_scene = AOI2DScene() for name, aoi2D in self.items(): diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py index 83180a8..762dab0 100644 --- a/src/argaze/AreaOfInterest/AOI3DScene.py +++ b/src/argaze/AreaOfInterest/AOI3DScene.py @@ -16,15 +16,13 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self -import math import re +from typing import Self -from argaze import DataFeatures -from argaze.AreaOfInterest import AOIFeatures, AOI2DScene - +import cv2 import numpy -import cv2 as cv + +from argaze.AreaOfInterest import AOIFeatures, AOI2DScene T0 = numpy.array([0., 0., 0.]) """Define no translation vector.""" @@ -36,7 +34,8 @@ K0 = numpy.array([[1., 0., 0.], [0., 1., 0.], [0., 0., 0.]]) """Define default optic intrinsic parameters matrix.""" D0 = numpy.array([0.0, 0.0, 0.0, 0.0, 0.0]) -"""Define default optic distorsion coefficients vector.""" +"""Define default optic distortion coefficients vector.""" + class AOI3DScene(AOIFeatures.AOIScene): """Define AOI 3D scene.""" @@ -46,7 +45,7 @@ class AOI3DScene(AOIFeatures.AOIScene): super().__init__(3, aoi_3d) @classmethod - def from_obj(self, obj_filepath: str) -> Self: + def from_obj(cls, obj_filepath: str) -> Self: """Load AOI3D scene from .obj file.""" aoi_3d = {} @@ -55,11 +54,11 @@ class AOI3DScene(AOIFeatures.AOIScene): OBJ_RX_DICT = { 'comment': re.compile(r'#(.*)\n'), 'name': re.compile(r'o (\w+)(.*)\n'), - 'vertice': re.compile(r'v ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+)\n'), + 'vertices': re.compile(r'v ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+)\n'), 'face': re.compile(r'f (.*)\n') } - # regex .obj line parser + # regex .obj line parser def __parse_obj_line(line): for key, rx in OBJ_RX_DICT.items(): @@ -69,8 +68,8 @@ class AOI3DScene(AOIFeatures.AOIScene): # if there are no matches return None, None - - # start parsing + + # start parsing try: name = None @@ -97,11 +96,11 @@ class AOI3DScene(AOIFeatures.AOIScene): name = str(match.group(1)) # fill vertices array - elif key == 'vertice': + elif key == 'vertices': vertices.append(tuple([float(match.group(1)), float(match.group(2)), float(match.group(3))])) - # extract aoi3D vertice id + # extract aoi3D vertices id elif key == 'face': faces[name] = [int(i) for i in match.group(1).split()] @@ -111,9 +110,9 @@ class AOI3DScene(AOIFeatures.AOIScene): file.close() - # retreive all aoi3D vertices and sort them in clockwise order + # retrieve all aoi3D vertices and sort them in clockwise order for name, face in faces.items(): - aoi3D = AOIFeatures.AreaOfInterest([ vertices[i-1] for i in reversed(face) ]) + aoi3D = AOIFeatures.AreaOfInterest([vertices[i - 1] for i in reversed(face)]) aoi_3d[name] = aoi3D except IOError: @@ -132,7 +131,7 @@ class AOI3DScene(AOIFeatures.AOIScene): for name, aoi3D in self.items(): - file.write(f'o {name}\n') + file.write(f'o {name}\n') vertices_ids = 'f' @@ -141,7 +140,6 @@ class AOI3DScene(AOIFeatures.AOIScene): vertices_coords = 'v' for coord in vertices: - vertices_coords += f' {coord:.6f}' file.write(vertices_coords + '\n') @@ -152,7 +150,7 @@ class AOI3DScene(AOIFeatures.AOIScene): file.write('s off\n') file.write(vertices_ids + '\n') - '''DEPRECATED: but maybe still usefull? + '''DEPRECATED: but maybe still useful? @property def orthogonal_projection(self) -> AOI2DScene.AOI2DScene: """ @@ -174,7 +172,9 @@ class AOI3DScene(AOIFeatures.AOIScene): return self.project(tvec, rvec, K) ''' - def vision_cone(self, cone_radius, cone_height, cone_tip=[0., 0., 0.], cone_direction=[0., 0., 1.]) -> tuple[Self, Self]: + + def vision_cone(self, cone_radius, cone_height, cone_tip=[0., 0., 0.], cone_direction=[0., 0., 1.]) -> tuple[ + Self, Self]: """Get AOI which are inside and out a given cone field. !!! note @@ -182,8 +182,8 @@ class AOI3DScene(AOIFeatures.AOIScene): The cone have its tip at origin and its base oriented to positive Z axis. Returns: - scene inside of the cone - scene outside of the cone + scene inside the cone + scene outside the cone """ # define cone tip and direction as numpy array @@ -207,7 +207,7 @@ class AOI3DScene(AOIFeatures.AOIScene): one_vertice_out = True break - # if no vertice is outside the cone, aoi is inside + # if no vertices is outside the cone, aoi is inside if not one_vertice_out: aoi3D_scene_inside[name] = aoi3D else: @@ -222,29 +222,29 @@ class AOI3DScene(AOIFeatures.AOIScene): T: translation vector R: rotation vector K: camera intrinsic parameters matrix - D: camera distorsion coefficients vector + D: camera distortion coefficients vector !!! danger - Camera distorsion coefficients could projects points which are far from image frame into it. + Camera distortion coefficients could project points which are far from image frame into it. !!! note - As gaze is mainly focusing on frame center, where the distorsion is low, it could be acceptable to not use camera distorsion. + As gaze is mainly focusing on frame center, where the distortion is low, + it could be acceptable to not use camera distortion. """ aoi2D_scene = AOI2DScene.AOI2DScene() for name, aoi3D in self.items(): + vertices_2d, J = cv2.projectPoints(aoi3D.astype(numpy.float32), R, T, numpy.array(K), numpy.array(D)) - vertices_2D, J = cv.projectPoints(aoi3D.astype(numpy.float32), R, T, numpy.array(K), numpy.array(D)) - - aoi2D = vertices_2D.reshape((len(vertices_2D), 2)).view(AOIFeatures.AreaOfInterest) + aoi2D = vertices_2d.reshape((len(vertices_2d), 2)).view(AOIFeatures.AreaOfInterest) aoi2D_scene[name] = aoi2D return aoi2D_scene def transform(self, T: numpy.array = T0, R: numpy.array = R0) -> Self: - """Translate and/or rotate 3D scene. + """Translate and/or rotate 3D scene. Parameters: T: translation vector @@ -254,7 +254,6 @@ class AOI3DScene(AOIFeatures.AOIScene): aoi3D_scene = AOI3DScene() for name, aoi3D in self.items(): - aoi3D_scene[name] = aoi3D.dot(R.T) + T return aoi3D_scene diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index 130390c..88c6feb 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -16,19 +16,19 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self import json -import os import math - -from argaze import DataFeatures +from typing import Self import cv2 import matplotlib.path as mpath import numpy +from colorama import Style, Fore from shapely.geometry import Polygon from shapely.geometry.point import Point -from colorama import Style, Fore + +from argaze import DataFeatures + class AreaOfInterest(numpy.ndarray): """Define Area Of Interest as an array of points of any dimension.""" @@ -313,7 +313,7 @@ class AOIScene(): return AOIScene(dimension = dimension, areas = areas) @classmethod - def from_json(self, json_filepath: str) -> Self: + def from_json(cls, json_filepath: str) -> Self: """ Load attributes from .json file. @@ -508,9 +508,13 @@ class AOIScene(): return max_bounds - min_bounds - def copy(self, exclude=[]) -> Self: + def copy(self, exclude: list=None) -> Self: """Copy scene partly excluding AOI by name.""" + if exclude is None: + exclude = [] + + # noinspection PyArgumentList scene_copy = type(self)() for name, area in self.__areas.items(): @@ -542,6 +546,8 @@ class AOIScene(): return output + +# noinspection PyAttributeOutsideInit class Heatmap(DataFeatures.PipelineStepObject): """Define image to draw heatmap.""" @@ -558,14 +564,18 @@ class Heatmap(DataFeatures.PipelineStepObject): """Size of heatmap image in pixels.""" return self.__size + # noinspection PyAttributeOutsideInit @size.setter def size(self, size: tuple[int, int]): self.__size = size + # noinspection PyAttributeOutsideInit self.__rX, self.__rY = size # Init coordinates + # noinspection PyAttributeOutsideInit self.__Sx = numpy.linspace(0., self.__rX/self.__rY, self.__rX) + # noinspection PyAttributeOutsideInit self.__Sy = numpy.linspace(0., 1., self.__rY) # Init heatmap image @@ -606,13 +616,18 @@ class Heatmap(DataFeatures.PipelineStepObject): return numpy.exp((v_dX + v_dY) / div).reshape(self.__rY, self.__rX) + # noinspection PyAttributeOutsideInit def clear(self): """Clear heatmap image.""" + # noinspection PyAttributeOutsideInit self.__point_spread_sum = numpy.zeros((self.__rY, self.__rX)) + # noinspection PyAttributeOutsideInit self.__point_spread_buffer = [] + # noinspection PyAttributeOutsideInit self.__point_spread_buffer_size = self.__buffer + # noinspection PyAttributeOutsideInit @DataFeatures.PipelineStepMethod def update(self, point: tuple): """Update heatmap image.""" @@ -634,6 +649,7 @@ class Heatmap(DataFeatures.PipelineStepObject): # Edit heatmap gray = (255 * self.__point_spread_sum / numpy.max(self.__point_spread_sum)).astype(numpy.uint8) + # noinspection PyAttributeOutsideInit self.__image = cv2.applyColorMap(gray, cv2.COLORMAP_JET) @DataFeatures.PipelineStepImage diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 7189001..a7b0a48 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -16,48 +16,47 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self +import bisect +import json +import logging +import math import os import sys -import logging -import traceback -import importlib -import collections -import json -import bisect import threading -import math import time +from typing import Self -import pandas -import numpy import cv2 -import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches +import matplotlib.pyplot as mpyplot +import numpy +import pandas from colorama import Style, Fore # Define global working directory used to load file using relative path WORKING_DIRECTORY = [None] + def get_working_directory() -> str: - """Get global working directory.""" - return WORKING_DIRECTORY[0] + """Get global working directory.""" + return WORKING_DIRECTORY[0] + def set_working_directory(working_directory: str): - """Set global working directory.""" + """Set global working directory.""" - # Forget former global working directory - if WORKING_DIRECTORY[0] is not None: + # Forget former global working directory + if WORKING_DIRECTORY[0] is not None: + sys.path.remove(WORKING_DIRECTORY[0]) - sys.path.remove(WORKING_DIRECTORY[0]) + # Append new working directory to Python path + sys.path.append(working_directory) - # Append new working directory to Python path - sys.path.append(working_directory) + WORKING_DIRECTORY[0] = working_directory - WORKING_DIRECTORY[0] = working_directory def get_class(class_path: str) -> object: - """Get class object from 'path.to.class' string. + """Get class object from 'path.to.class' string. Parameters: class_path: a 'path.to.class' string. @@ -65,18 +64,19 @@ def get_class(class_path: str) -> object: Returns: class: a 'path.to.class' class. """ - parts = class_path.split('.') - module = ".".join(parts[:-1]) + parts = class_path.split('.') + module = ".".join(parts[:-1]) - m = __import__(module) + m = __import__(module) - for comp in parts[1:]: - m = getattr(m, comp) + for comp in parts[1:]: + m = getattr(m, comp) + + return m - return m def get_class_path(o: object) -> str: - """Get 'path.to.class' class path from object. + """Get 'path.to.class' class path from object. Parameters: o: any object instance. @@ -84,33 +84,33 @@ def get_class_path(o: object) -> str: Returns: class_path: object 'path.to.class' class. """ - c = o.__class__ - m = c.__module__ + c = o.__class__ + m = c.__module__ - # Avoid outputs like 'builtins.str' - if m == 'builtins': + # Avoid outputs like 'builtins.str' + if m == 'builtins': + return c.__qualname__ - return c.__qualname__ + return m + '.' + c.__qualname__ - return m + '.' + c.__qualname__ def properties(cls) -> list: - """get class properties name.""" + """get class properties name.""" - properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)] + properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)] - for base in cls.__bases__: + for base in cls.__bases__: - for name, item in base.__dict__.items(): + for name, item in base.__dict__.items(): - if isinstance(item, property): + if isinstance(item, property): + properties.append(name) - properties.append(name) + return properties - return properties -def from_json(configuration_filepath: str, patch_filepath: str = None) -> object: - """ +def from_json(configuration_filepath: str, patch_filepath: str = None) -> any: + """ Load object instance from .json file. !!! note @@ -121,293 +121,295 @@ def from_json(configuration_filepath: str, patch_filepath: str = None) -> object patch_filepath: path to json patch file to modify any configuration entries """ - logging.debug('DataFeatures.from_json') + logging.debug('DataFeatures.from_json') - # Edit working directory once - if get_working_directory() is None: + # Edit working directory once + if get_working_directory() is None: + set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath))) - set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath))) + logging.debug('\t> set global working directory as %s', get_working_directory()) - logging.debug('\t> set global working directory as %s', get_working_directory()) + # Load configuration from JSON file + with open(configuration_filepath) as configuration_file: - # Load configuration from JSON file - with open(configuration_filepath) as configuration_file: + object_data = json.load(configuration_file) - object_data = json.load(configuration_file) + # Apply patch to configuration if required + if patch_filepath is not None: - # Apply patch to configuration if required - if patch_filepath is not None: + with open(patch_filepath) as patch_file: - with open(patch_filepath) as patch_file: + patch_data = json.load(patch_file) - patch_data = json.load(patch_file) + import collections.abc - import collections.abc + def update(d, u): - def update(d, u): + for k, v in u.items(): - for k, v in u.items(): + if isinstance(v, collections.abc.Mapping): - if isinstance(v, collections.abc.Mapping): + d[k] = update(d.get(k, {}), v) - d[k] = update(d.get(k, {}), v) + elif v is None: - elif v is None: + del d[k] - del d[k] + else: - else: + d[k] = v - d[k] = v + return d - return d + objects_data = update(object_data, patch_data) - objects_data = update(object_data, patch_data) + # Load unique object + object_class, object_data = object_data.popitem() - # Load unique object - object_class, object_data = object_data.popitem() + # Instanciate class + logging.debug('\t+ create %s object', object_class) - # Instanciate class - logging.debug('\t+ create %s object', object_class) + # noinspection PyCallingNonCallable + return get_class(object_class)(**object_data) - return get_class(object_class)(**object_data) def from_dict(expected_value_type: type, data: dict) -> any: - """Load expected type instance(s) from dict values.""" + """Load expected type instance(s) from dict values.""" - logging.debug('\t> load %s from dict', expected_value_type.__name__) + logging.debug('\t> load %s from dict', expected_value_type.__name__) - # Check if json keys are PipelineStepObject class and store them in a list - new_objects_list = [] + # Check if json keys are PipelineStepObject class and store them in a list + new_objects_list = [] - for key, value in data.items(): + for key, value in data.items(): - try: + try: - new_class = get_class(key) + new_class = get_class(key) - except ValueError as e: + except ValueError as e: - # Keys are not class name - if str(e) == 'Empty module name': + # Keys are not class name + if str(e) == 'Empty module name': - break + break - else: + else: - raise(e) + raise (e) - logging.debug('\t+ create %s object from key using value as argument', key) + logging.debug('\t+ create %s object from key using value as argument', key) - new_objects_list.append( new_class(**value) ) + # noinspection PyCallingNonCallable + new_objects_list.append(new_class(**value)) - # Only one object have been loaded: pass the object if it is a subclass of expected type - if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): + # Only one object have been loaded: pass the object if it is a subclass of expected type + if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): - return new_objects_list[0] + return new_objects_list[0] - # Pass non empty objects list - elif len(new_objects_list) > 0: + # Pass non-empty objects list + elif len(new_objects_list) > 0: - return new_objects_list + return new_objects_list - # Otherwise, data are parameters of the expected class - logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__) + # Otherwise, data are parameters of the expected class + logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__) - return expected_value_type(**data) + return expected_value_type(**data) -def as_dict(obj, filter: bool=True) -> dict: - """Export object as dictionary. - Parameters: - filter: remove None attribute values. - """ - _dict = {} +def as_dict(obj, filter: bool = True) -> dict: + """Export object as dictionary. + + Parameters: + obj: + filter: remove None attribute values. + """ + _dict = {} - for p in properties(obj.__class__): + for p in properties(obj.__class__): - v = getattr(obj, p) + v = getattr(obj, p) - if not filter or v is not None: + if not filter or v is not None: + _dict[p] = v - _dict[p] = v + return _dict - return _dict class JsonEncoder(json.JSONEncoder): - """Specific ArGaze JSON Encoder.""" + """Specific ArGaze JSON Encoder.""" - def default(self, obj): - """default implementation to serialize object.""" + def default(self, obj): + """default implementation to serialize object.""" - # numpy cases - if isinstance(obj, numpy.integer): - return int(obj) + # numpy cases + if isinstance(obj, numpy.integer): + return int(obj) - elif isinstance(obj, numpy.floating): - return float(obj) + elif isinstance(obj, numpy.floating): + return float(obj) - elif isinstance(obj, numpy.ndarray): - return obj.tolist() + elif isinstance(obj, numpy.ndarray): + return obj.tolist() - # default case - try: + # default case + try: - return json.JSONEncoder.default(self, obj) + return json.JSONEncoder.default(self, obj) - # class case - except: + # class case + except: - # ignore attribute starting with _ - public_dict = {} + # ignore attribute starting with _ + public_dict = {} - for k, v in vars(obj).items(): - - if not k.startswith('_'): - - # numpy cases - if isinstance(v, numpy.integer): - v = int(v) + for k, v in vars(obj).items(): - elif isinstance(v, numpy.floating): - v = float(v) + if not k.startswith('_'): - elif isinstance(v, numpy.ndarray): - v = v.tolist() + # numpy cases + if isinstance(v, numpy.integer): + v = int(v) - public_dict[k] = v + elif isinstance(v, numpy.floating): + v = float(v) + + elif isinstance(v, numpy.ndarray): + v = v.tolist() + + public_dict[k] = v + + return public_dict - return public_dict class DataDictionary(dict): - """Enable dot.notation access to dictionary attributes""" + """Enable dot notation access to dictionary attributes""" + + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ - __getattr__ = dict.get - __setattr__ = dict.__setitem__ - __delattr__ = dict.__delitem__ class TimestampedObject(): - """Abstract class to enable timestamp management.""" + """Abstract class to enable timestamp management.""" - def __init__(self, timestamp: int|float = math.nan): - """Initialize TimestampedObject.""" - self._timestamp = timestamp + def __init__(self, timestamp: int | float = math.nan): + """Initialize TimestampedObject.""" + self._timestamp = timestamp - def __repr__(self): - """String representation.""" - return json.dumps(as_dict(self)) + def __repr__(self): + """String representation.""" + return json.dumps(as_dict(self)) - @property - def timestamp(self) -> int|float: - """Get object timestamp.""" - return self._timestamp + @property + def timestamp(self) -> int | float: + """Get object timestamp.""" + return self._timestamp - @timestamp.setter - def timestamp(self, timestamp: int|float): - """Set object timestamp.""" - self._timestamp = timestamp + @timestamp.setter + def timestamp(self, timestamp: int | float): + """Set object timestamp.""" + self._timestamp = timestamp - def untimestamp(self): - """Reset object timestamp.""" - self._timestamp = math.nan + def untimestamp(self): + """Reset object timestamp.""" + self._timestamp = math.nan + + def is_timestamped(self) -> bool: + """Is the object timestamped?""" + return not math.isnan(self._timestamp) - def is_timestamped(self) -> bool: - """Is the object timestamped?""" - return not math.isnan(self._timestamp) class TimestampedObjectsList(list): - """Handle timestamped object into a list. + """Handle timestamped object into a list. !!! warning "Timestamped objects are not sorted internally" - Timestamped objects are considered to be stored according at their coming time. + Timestamped objects are considered to be stored according to their coming time. """ - def __init__(self, ts_object_type: type, ts_objects: list = []): + def __init__(self, ts_object_type: type, ts_objects: list = []): - self.__object_type = ts_object_type - self.__object_properties = properties(self.__object_type) + self.__object_type = ts_object_type + self.__object_properties = properties(self.__object_type) - for ts_object in ts_objects: + for ts_object in ts_objects: + self.append(ts_object) - self.append(ts_object) + @property + def object_type(self): + """Get object type handled by the list.""" + return self.__object_type - @property - def object_type(self): - """Get object type handled by the list.""" - return self.__object_type + def append(self, ts_object: TimestampedObject | dict): + """Append timestamped object.""" - def append(self, ts_object: TimestampedObject|dict): - """Append timestamped object.""" - - # Convert dict into object - if type(ts_object) == dict: + # Convert dict into object + if type(ts_object) == dict: + ts_object = from_dict(self.__object_type, ts_object) - ts_object = self.__object_type.from_dict(ts_object) + # Check object type + if type(ts_object) != self.__object_type: - # Check object type - if type(ts_object) != self.__object_type: + if not issubclass(ts_object.__class__, self.__object_type): + raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') - if not issubclass(ts_object.__class__, self.__object_type): + if not ts_object.is_timestamped(): + raise ValueError(f'object is not timestamped') - raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') - - if not ts_object.is_timestamped(): - - raise ValueError(f'object is not timestamped') - - super().append(ts_object) + super().append(ts_object) - def look_for(self, timestamp: int|float) -> TimestampedObject: - """Look for object at given timestamp.""" - for ts_object in self: - - if ts_object.timestamp == timestamp: + def look_for(self, timestamp: int | float) -> TimestampedObject: + """Look for object at given timestamp.""" + for ts_object in self: - return ts_object + if ts_object.timestamp == timestamp: + return ts_object - def __add__(self, ts_objects: list = []) -> Self: - """Append timestamped objects list.""" + def __add__(self, ts_objects: list = []) -> Self: + """Append timestamped objects list.""" - for ts_object in ts_objects: + for ts_object in ts_objects: + self.append(ts_object) - self.append(ts_object) + return self - return self + @property + def duration(self): + """Get inferred duration from first and last timestamps.""" + if self: - @property - def duration(self): - """Get inferred duration from first and last timestamps.""" - if self: + return self[-1].timestamp - self[0].timestamp - return self[-1].timestamp - self[0].timestamp + else: - else: + return 0 - return 0 + def timestamps(self): + """Get all timestamps in list.""" + return [ts_object.timestamp for ts_object in self] - def timestamps(self): - """Get all timestamps in list.""" - return [ts_object.timestamp for ts_object in self] + def tuples(self) -> list: + """Get all timestamped objects as list of tuple.""" + return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] - def tuples(self) -> list: - """Get all timestamped objects as list of tuple.""" - return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] + @classmethod + def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self: + """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" - @classmethod - def from_dataframe(self, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self: - """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" + dataframe.drop(exclude, inplace=True, axis=True) - dataframe.drop(exclude, inplace=True, axis=True) + assert (dataframe.index.name == 'timestamp') - assert(dataframe.index.name == 'timestamp') + object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in + dataframe.to_dict('index').items()] - object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in dataframe.to_dict('index').items()] + return TimestampedObjectsList(ts_object_type, object_list) - return TimestampedObjectsList(ts_object_type, object_list) - - def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: - """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). + def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: + """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). The optional *split* argument allows tuple values to be stored in dedicated columns. For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} @@ -421,760 +423,750 @@ class TimestampedObjectsList(list): Timestamps are stored as index column called 'timestamp'. """ - df = pandas.DataFrame(self.tuples(), columns=self.__object_properties) + df = pandas.DataFrame(self.tuples(), columns=self.__object_properties) - # Exclude columns - df.drop(exclude, inplace=True, axis=True) + # Exclude columns + df.drop(exclude, inplace=True, axis=True) - # Split columns - if len(split) > 0: + # Split columns + if len(split) > 0: - splited_columns = [] - - for column in df.columns: + split_columns = [] - if column in split.keys(): + for column in df.columns: - df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) - df.drop(column, inplace=True, axis=True) + if column in split.keys(): - for new_column in split[column]: + df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) + df.drop(column, inplace=True, axis=True) - splited_columns.append(new_column) + for new_column in split[column]: + split_columns.append(new_column) - else: + else: - splited_columns.append(column) + split_columns.append(column) - # Reorder splited columns - df = df[splited_columns] + # Reorder split columns + df = df[split_columns] - # Append timestamps as index column - df['timestamp'] = self.timestamps() - df.set_index('timestamp', inplace=True) + # Append timestamps as index column + df['timestamp'] = self.timestamps() + df.set_index('timestamp', inplace=True) - return df + return df - @classmethod - def from_json(self, ts_object_type: type, json_filepath: str) -> Self: - """Create a TimestampedObjectsList from .json file.""" + @classmethod + def from_json(cls, ts_object_type: type, json_filepath: str) -> Self: + """Create a TimestampedObjectsList from .json file.""" - with open(json_filepath, encoding='utf-8') as ts_objects_file: + with open(json_filepath, encoding='utf-8') as ts_objects_file: + json_ts_objects = json.load(ts_objects_file) - json_ts_objects = json.load(ts_objects_file) + return TimestampedObjectsList(ts_object_type, + [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects]) - return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects]) + def to_json(self, json_filepath: str): + """Save a TimestampedObjectsList to .json file.""" - def to_json(self, json_filepath: str): - """Save a TimestampedObjectsList to .json file.""" + with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file: + json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ') - with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file: + def __repr__(self): + """String representation""" + return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, ) - json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ') + def __str__(self): + """String representation""" + return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, ) - def __repr__(self): - """String representation""" - return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) + def pop_last_until(self, timestamp: int | float) -> TimestampedObject: + """Pop all item until a given timestamped value and return the first after.""" - def __str__(self): - """String representation""" - return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) + # get last item before given timestamp + earliest_value = self.get_last_until(timestamp) - def pop_last_until(self, timestamp: int|float) -> TimestampedObject: - """Pop all item until a given timestamped value and return the first after.""" + while self[0].timestamp < earliest_value.timestamp: + self.pop(0) - # get last item before given timestamp - earliest_value = self.get_last_until(timestamp) + return self[0] - while self[0].timestamp < earliest_value.timestamp: + def pop_last_before(self, timestamp: int | float) -> TimestampedObject: + """Pop all item before a given timestamped value and return the last one.""" - self.pop(0) - - return self[0] + # get last item before given timestamp + earliest_value = self.get_last_before(timestamp) - def pop_last_before(self, timestamp: int|float) -> TimestampedObject: - """Pop all item before a given timestamped value and return the last one.""" + popped_value = self.pop(0) - # get last item before given timestamp - earliest_value = self.get_last_before(timestamp) + while popped_value.timestamp != earliest_value.timestamp: + popped_value = self.pop(0) - poped_value = self.pop(0) + return popped_value - while poped_value.timestamp != earliest_value.timestamp: + def get_first_from(self, timestamp: int | float) -> TimestampedObject: + """Retrieve first item timestamp from a given timestamp value.""" - poped_value = self.pop(0) + ts_list = self.timestamps() + first_from_index = bisect.bisect_left(ts_list, timestamp) - return poped_value + if first_from_index < len(self): - def get_first_from(self, timestamp: int|float) -> TimestampedObject: - """Retreive first item timestamp from a given timestamp value.""" + return self[ts_list[first_from_index]] - ts_list = self.timestamps() - first_from_index = bisect.bisect_left(ts_list, timestamp) + else: - if first_from_index < len(self): + raise KeyError(f'No data stored after {timestamp} timestamp.') - return self[ts_list[first_from_index]] - - else: - - raise KeyError(f'No data stored after {timestamp} timestamp.') + def get_last_before(self, timestamp: int | float) -> TimestampedObject: + """Retrieve last item timestamp before a given timestamp value.""" - def get_last_before(self, timestamp: int|float) -> TimestampedObject: - """Retreive last item timestamp before a given timestamp value.""" + ts_list = self.timestamps() + last_before_index = bisect.bisect_left(ts_list, timestamp) - 1 - ts_list = self.timestamps() - last_before_index = bisect.bisect_left(ts_list, timestamp) - 1 + if last_before_index >= 0: - if last_before_index >= 0: + return self[ts_list[last_before_index]] - return self[ts_list[last_before_index]] - - else: - - raise KeyError(f'No data stored before {timestamp} timestamp.') - - def get_last_until(self, timestamp: int|float) -> TimestampedObject: - """Retreive last item timestamp until a given timestamp value.""" + else: + + raise KeyError(f'No data stored before {timestamp} timestamp.') + + def get_last_until(self, timestamp: int | float) -> TimestampedObject: + """Retrieve last item timestamp until a given timestamp value.""" - ts_list = self.timestamps() - last_until_index = bisect.bisect_right(ts_list, timestamp) - 1 + ts_list = self.timestamps() + last_until_index = bisect.bisect_right(ts_list, timestamp) - 1 - if last_until_index >= 0: + if last_until_index >= 0: - return self[ts_list[last_until_index]] - - else: - - raise KeyError(f'No data stored until {timestamp} timestamp.') + return self[ts_list[last_until_index]] - def plot(self, names=[], colors=[], split={}, samples=None) -> list: - """Plot as [matplotlib](https://matplotlib.org/) time chart.""" + else: - df = self.as_dataframe(split=split) - legend_patches = [] + raise KeyError(f'No data stored until {timestamp} timestamp.') - # decimate data - if samples != None: + def plot(self, names=[], colors=[], split={}, samples=None) -> list: + """Plot as [matplotlib](https://matplotlib.org/) time chart.""" - if samples < len(df): + df = self.as_dataframe(split=split) + legend_patches = [] - step = int(len(df) / samples) + 1 - df = df.iloc[::step, :] + # decimate data + if samples != None: - for name, color in zip(names, colors): + if samples < len(df): + step = int(len(df) / samples) + 1 + df = df.iloc[::step, :] - markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) - mpyplot.setp(markerline, color=color, linewidth=1, markersize = 1) - mpyplot.setp(stemlines, color=color, linewidth=1) - mpyplot.setp(baseline, color=color, linewidth=1) + for name, color in zip(names, colors): + markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) + mpyplot.setp(markerline, color=color, linewidth=1, markersize=1) + mpyplot.setp(stemlines, color=color, linewidth=1) + mpyplot.setp(baseline, color=color, linewidth=1) - legend_patches.append(mpatches.Patch(color=color, label=name.upper())) + legend_patches.append(mpatches.Patch(color=color, label=name.upper())) + + return legend_patches - return legend_patches class SharedObject(TimestampedObject): - """Abstract class to enable multiple threads sharing for timestamped object.""" + """Abstract class to enable multiple threads sharing for timestamped object.""" - def __init__(self, timestamp: int|float = math.nan): + def __init__(self, timestamp: int | float = math.nan): + TimestampedObject.__init__(self, timestamp) + self._lock = threading.Lock() + self._execution_times = {} + self._exceptions = {} - TimestampedObject.__init__(self, timestamp) - self._lock = threading.Lock() - self._execution_times = {} - self._exceptions = {} class TimestampedException(Exception, TimestampedObject): - """Wrap exception to keep track of raising timestamp.""" + """Wrap exception to keep track of raising timestamp.""" - def __init__(self, exception = Exception, timestamp: int|float = math.nan): + def __init__(self, exception=Exception, timestamp: int | float = math.nan): + Exception.__init__(self, exception) + TimestampedObject.__init__(self, timestamp) - Exception.__init__(self, exception) - TimestampedObject.__init__(self, timestamp) class TimestampedExceptions(TimestampedObjectsList): - """Handle timestamped exceptions into a list.""" - - def __init__(self, exceptions: list = []): + """Handle timestamped exceptions into a list.""" + + def __init__(self, exceptions: list = []): + TimestampedObjectsList.__init__(self, TimestampedException, exceptions) - TimestampedObjectsList.__init__(self, TimestampedException, exceptions) + def values(self) -> list[str]: + """Get all timestamped exception values as list of messages.""" + return [ts_exception.message for ts_exception in self] - def values(self) -> list[str]: - """Get all timestamped exception values as list of messages.""" - return [ts_exception.message for ts_exception in self] class PipelineStepLoadingFailed(Exception): - """ + """ Exception raised when pipeline step object loading fails. """ - def __init__(self, message): - super().__init__(message) - -class TimestampedImage(numpy.ndarray, TimestampedObject): - """Wrap numpy.array to timestamp image.""" + def __init__(self, message): + super().__init__(message) - def __new__(cls, array: numpy.array, timestamp: int|float = math.nan): - return numpy.ndarray.__new__(cls, array.shape, dtype = array.dtype, buffer = array) +class TimestampedImage(numpy.ndarray, TimestampedObject): + """Wrap numpy.array to timestamp image.""" - def __init__(self, array: numpy.array, timestamp: int|float = math.nan): + def __new__(cls, array: numpy.array, timestamp: int | float = math.nan): + return numpy.ndarray.__new__(cls, array.shape, dtype=array.dtype, buffer=array) - TimestampedObject.__init__(self, timestamp) + def __init__(self, array: numpy.array, timestamp: int | float = math.nan): + TimestampedObject.__init__(self, timestamp) - def __array_finalize__(self, obj): + def __array_finalize__(self, obj): + pass - pass + @property + def size(self) -> list: + """Return list with width and height.""" + return list(self.shape[0:2][::-1]) - @property - def size(self) -> list: - """Return list with width and heigth.""" - return list(self.shape[0:2][::-1]) class TimestampedImages(TimestampedObjectsList): - """Handle timestamped images into a list.""" - - def __init__(self, images: list = []): + """Handle timestamped images into a list.""" + + def __init__(self, images: list = []): + TimestampedObjectsList.__init__(self, TimestampedImage, images) - TimestampedObjectsList.__init__(self, TimestampedImage, images) def PipelineStepInit(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step __init__ method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step __init__ method.""" - def wrapper(self, **kwargs): - """Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call. + def wrapper(self, **kwargs): + """Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call. Parameters: + self: kwargs: any arguments defined by PipelineStepMethodInit. """ - # Init pipeline step object attributes - PipelineStepObject.__init__(self) + # Init pipeline step object attributes + PipelineStepObject.__init__(self) - # Init class attributes - method(self, **kwargs) + # Init class attributes + method(self, **kwargs) - # Update all attributes - self.update_attributes(kwargs) + # Update all attributes + self.update_attributes(kwargs) + + return wrapper - return wrapper def PipelineStepEnter(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step __enter__ method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step __enter__ method.""" + + def wrapper(self): + """Wrap pipeline step __enter__ method to call super, observers and children __enter__ method.""" - def wrapper(self): - """Wrap pipeline step __enter__ method to call super, observers and children __enter__ method.""" + logging.debug('%s.__enter__', get_class_path(self)) - logging.debug('%s.__enter__', get_class_path(self)) + method(self) - method(self) + return PipelineStepObject.__enter__(self) - return PipelineStepObject.__enter__(self) + return wrapper - return wrapper def PipelineStepExit(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step __exit__ method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step __exit__ method.""" - def wrapper(self, *args): - """Wrap pipeline step __exit__ method to call super, observers and children __exit__ method.""" + def wrapper(self, *args): + """Wrap pipeline step __exit__ method to call super, observers and children __exit__ method.""" - logging.debug('%s.__exit__', get_class_path(self)) + logging.debug('%s.__exit__', get_class_path(self)) - PipelineStepObject.__exit__(self, *args) + PipelineStepObject.__exit__(self, *args) - method(self, *args) + method(self, *args) + + return wrapper - return wrapper def PipelineStepAttributeSetter(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step attribute setter.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step attribute setter.""" - def wrapper(self, new_value, unwrap: bool = False): - """Wrap pipeline step attribute setter to load attribute from file. + def wrapper(self, new_value, unwrap: bool = False): + """Wrap pipeline step attribute setter to load attribute from file. - Parameters: - new_value: value used to set attribute. - unwrap: call wrapped method directly. - """ - if unwrap: + Parameters: + self: + new_value: value used to set attribute. + unwrap: call wrapped method directly. + """ + if unwrap: + return method(self, new_value) - return method(self, new_value) + # Get new value type + new_value_type = type(new_value) - # Get new value type - new_value_type = type(new_value) + # Check setter annotations to get expected value type + try: - # Check setter annotations to get expected value type - try: + expected_value_type = list(method.__annotations__.values())[0] - expected_value_type = list(method.__annotations__.values())[0] + except KeyError: - except KeyError: + raise ( + PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}')) - raise(PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}')) + logging.debug('%s@%s.setter', get_class_path(self), method.__name__) + logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__) - logging.debug('%s@%s.setter', get_class_path(self), method.__name__) - logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__) + # String not expected: load value from file + if new_value_type == str and new_value_type != expected_value_type: - # String not expected: load value from file - if new_value_type == str and new_value_type != expected_value_type: + split_point = new_value.split('.') - split_point = new_value.split('.') + # String have a dot inside: file path with format + if len(split_point) > 1: - # String have a dot inside: file path with format - if len(split_point) > 1: + file_format = split_point[-1].upper() - file_format = split_point[-1].upper() + logging.debug('\t> %s is a path to a %s file', new_value, file_format) - logging.debug('\t> %s is a path to a %s file', new_value, file_format) - - filepath = os.path.join(get_working_directory(), new_value) + filepath = os.path.join(get_working_directory(), new_value) - # Load image from JPG and PNG formats - if file_format == 'JPG' or file_format == 'PNG': + # Load image from JPG and PNG formats + if file_format == 'JPG' or file_format == 'PNG': - return method(self, TimestampedImage(cv2.imread(filepath))) + return method(self, TimestampedImage(cv2.imread(filepath))) - # Load image from OBJ formats - elif file_format == 'OBJ': + # Load image from OBJ formats + elif file_format == 'OBJ': - return method(self, expected_value_type.from_obj(filepath)) + return method(self, expected_value_type.from_obj(filepath)) - # Load object from JSON file - elif file_format == 'JSON': + # Load object from JSON file + elif file_format == 'JSON': - with open(filepath) as file: + with open(filepath) as file: - return method(self, from_dict(expected_value_type, json.load(file))) + return method(self, from_dict(expected_value_type, json.load(file))) - # No point inside string: identifier name - else: + # No point inside string: identifier name + else: - logging.debug('\t> %s is an identifier', new_value) - logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__) - - return method(self, expected_value_type(new_value)) + logging.debug('\t> %s is an identifier', new_value) + logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__) - # Dict not expected: load value from dict - if new_value_type == dict and expected_value_type != dict: + return method(self, expected_value_type(new_value)) - return method(self, from_dict(expected_value_type, new_value)) + # Dict not expected: load value from dict + if new_value_type == dict and expected_value_type != dict: + return method(self, from_dict(expected_value_type, new_value)) - # Otherwise, pass new value to setter method - logging.debug('\t> use %s value as passed', new_value_type.__name__) + # Otherwise, pass new value to setter method + logging.debug('\t> use %s value as passed', new_value_type.__name__) - method(self, new_value) + method(self, new_value) + + return wrapper - return wrapper def PipelineStepImage(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step image method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step image method.""" + + def wrapper(self, **kwargs) -> numpy.array: + """Wrap pipeline step image method.""" - def wrapper(self, **kwargs) -> numpy.array: - """Wrap pipeline step image method.""" + if kwargs: - if kwargs: + logging.debug('\t> using kwargs') - logging.debug('\t> using kwargs') + return method(self, **kwargs) - return method(self, **kwargs) + else: - else: + logging.debug('\t> using image_parameters') - logging.debug('\t> using image_parameters') + return method(self, **self.image_parameters) - return method(self, **self.image_parameters) + return wrapper - return wrapper def PipelineStepDraw(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step draw method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step draw method.""" - def wrapper(self, image: numpy.array, **kwargs): - """Wrap pipeline step draw method.""" + def wrapper(self, image: numpy.array, **kwargs): + """Wrap pipeline step draw method.""" - if kwargs: + if kwargs: - logging.debug('\t> using kwargs') + logging.debug('\t> using kwargs') - method(self, image, **kwargs) + method(self, image, **kwargs) - else: + else: - logging.debug('\t> using draw_parameters') + logging.debug('\t> using draw_parameters') - method(self, image, **self.draw_parameters) + method(self, image, **self.draw_parameters) - return wrapper + return wrapper + +# noinspection PyAttributeOutsideInit class PipelineStepObject(): - """ + """ Define class to assess pipeline step methods execution time and observe them. """ - __initialized = False - - def __init__(self): - """Initialize PipelineStepObject.""" - - if not self.__initialized: + __initialized = False - logging.debug('%s.__init__', get_class_path(self)) + def __init__(self): + """Initialize PipelineStepObject.""" - # Init private attributes - self.__initialized = True - self.__name = None - self.__observers = [] - self.__execution_times = {} - self.__image_parameters = {} + if not self.__initialized: + logging.debug('%s.__init__', get_class_path(self)) - # Init protected attributes - self._image_parameters = {} - self._draw_parameters = {} - - # Parent attribute will be setup later by parent it self - self.__parent = None + # Init private attributes + self.__initialized = True + self.__name = None + self.__observers = [] + self.__execution_times = {} + self.__image_parameters = {} - def __enter__(self): - """Define default method to enter into pipeline step object context.""" + # Init protected attributes + self._image_parameters = {} + self._draw_parameters = {} - # Start children pipeline step objects - for child in self.children: + # Parent attribute will be setup later by parent itself + self.__parent = None - child.__enter__() + def __enter__(self): + """Define default method to enter into pipeline step object context.""" - # Start observers - for observer in self.observers: + # Start children pipeline step objects + for child in self.children: + child.__enter__() - observer.__enter__() + # Start observers + for observer in self.observers: + observer.__enter__() - return self + return self - def __exit__(self, exception_type, exception_value, exception_traceback): - """Define default method to exit from pipeline step object context.""" + def __exit__(self, exception_type, exception_value, exception_traceback): + """Define default method to exit from pipeline step object context.""" - # Stop observers - for observer in self.observers: + # Stop observers + for observer in self.observers: + observer.__exit__(exception_type, exception_value, exception_traceback) - observer.__exit__(exception_type, exception_value, exception_traceback) + # Stop children pipeline step objects + for child in self.children: + child.__exit__(exception_type, exception_value, exception_traceback) - # Stop children pipeline step objects - for child in self.children: + def update_attributes(self, object_data: dict): + """Update pipeline step object attributes with dictionary.""" - child.__exit__(exception_type, exception_value, exception_traceback) + for key, value in object_data.items(): - def update_attributes(self, object_data: dict): - """Update pipeline step object attributes with dictionary.""" + if hasattr(self, key): - for key, value in object_data.items(): + logging.debug('%s.update_attributes > update %s with %s value', get_class_path(self), key, + type(value).__name__) - if hasattr(self, key): + setattr(self, key, value) - logging.debug('%s.update_attributes > update %s with %s value', get_class_path(self), key, type(value).__name__) + else: - setattr(self, key, value) + raise (AttributeError(f'{get_class_path(self)} has not {key} attribute.')) - else: + @property + def name(self) -> str: + """Get pipeline step object's name.""" + return self.__name - raise(AttributeError(f'{get_class_path(self)} has not {key} attribute.')) + @name.setter + def name(self, name: str): + """Set pipeline step object's name.""" + self.__name = name - @property - def name(self) -> str: - """Get pipeline step object's name.""" - return self.__name + @property + def parent(self) -> object: + """Get pipeline step object's parent object.""" + return self.__parent - @name.setter - def name(self, name: str): - """Set pipeline step object's name.""" - self.__name = name + @parent.setter + def parent(self, parent: object): + """Set layer's parent object.""" + self.__parent = parent - @property - def parent(self) -> object: - """Get pipeline step object's parent object.""" - return self.__parent + @property + def observers(self) -> list: + """Pipeline step object observers list.""" + return self.__observers - @parent.setter - def parent(self, parent: object): - """Set layer's parent object.""" - self.__parent = parent + @observers.setter + @PipelineStepAttributeSetter + def observers(self, observers: list): - @property - def observers(self) -> list: - """Pipeline step object observers list.""" - return self.__observers + # Edit new observers dictionary + self.__observers = observers - @observers.setter - @PipelineStepAttributeSetter - def observers(self, observers: list): + @property + def execution_times(self): + """Get pipeline step object observers execution times dictionary.""" + return self.__execution_times - # Edit new observers dictionary - self.__observers = observers + @property + def image_parameters(self) -> dict: + """image method parameters dictionary.""" + return self._image_parameters - @property - def execution_times(self): - """Get pipeline step object observers execution times dictionary.""" - return self.__execution_times - - @property - def image_parameters(self) -> dict: - """image method parameters dictionary.""" - return self._image_parameters + @image_parameters.setter + @PipelineStepAttributeSetter + def image_parameters(self, image_parameters: dict): - @image_parameters.setter - @PipelineStepAttributeSetter - def image_parameters(self, image_parameters: dict): + self._image_parameters = image_parameters - self._image_parameters = image_parameters + @property + def draw_parameters(self) -> dict: + """draw method parameters dictionary.""" + return self._draw_parameters - @property - def draw_parameters(self) -> dict: - """draw method parameters dictionary.""" - return self._draw_parameters + @draw_parameters.setter + @PipelineStepAttributeSetter + def draw_parameters(self, draw_parameters: dict): - @draw_parameters.setter - @PipelineStepAttributeSetter - def draw_parameters(self, draw_parameters: dict): + self._draw_parameters = draw_parameters - self._draw_parameters = draw_parameters - - def as_dict(self) -> dict: - """Export PipelineStepObject attributes as dictionary. + def as_dict(self) -> dict: + """Export PipelineStepObject attributes as dictionary. Returns: object_data: dictionary with pipeline step object attributes values. """ - return { - "name": self.__name, - "observers": self.__observers - } - - def to_json(self, json_filepath: str = None): - """Save pipeline step object into .json file.""" - - # Remember file path to ease rewriting - if json_filepath is not None: + return { + "name": self.__name, + "observers": self.__observers + } - self.__json_filepath = json_filepath + # noinspection PyAttributeOutsideInit + def to_json(self, json_filepath: str = None): + """Save pipeline step object into .json file.""" - # Open file - with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: + # Remember file path to ease rewriting + if json_filepath is not None: + # noinspection PyAttributeOutsideInit + self.__json_filepath = json_filepath - json.dump({self.__class__.__module__:as_dict(self)}, object_file, ensure_ascii=False, indent=4) + # Open file + with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: + json.dump({self.__class__.__module__: as_dict(self)}, object_file, ensure_ascii=False, indent=4) - # QUESTION: maybe we need two saving mode? - #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder) + # QUESTION: maybe we need two saving mode? + #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder) - def __str__(self) -> str: - """ + def __str__(self) -> str: + """ String representation of pipeline step object. Returns: String representation """ - logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '') - - tabs = self.tabulation - output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n' - - if self.__name is not None: - output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n' - - if self.__parent is not None: - output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n' - - if len(self.__observers): - output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n' - for observer in self.__observers: - output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n' - - for name, value in self.properties: + logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '') - logging.debug('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) + tabs = self.tabulation + output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n' - output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: ' + if self.__name is not None: + output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n' - if type(value) == dict: + if self.__parent is not None: + output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n' - output += '\n' + if len(self.__observers): + output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n' + for observer in self.__observers: + output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n' - for k, v in value.items(): + for name, value in self.properties: - output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n' + logging.debug('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) - elif type(value) == list: + output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: ' - output += '\n' + if type(value) == dict: - for v in value: + output += '\n' - output += f'{tabs}\t - {v}\n' + for k, v in value.items(): + output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n' - elif type(value) == numpy.ndarray or type(value) == TimestampedImage: + elif type(value) == list: - output += f'numpy.array{value.shape}\n' + output += '\n' - elif type(value) == pandas.DataFrame: + for v in value: + output += f'{tabs}\t - {v}\n' - output += f'pandas.DataFrame{value.shape}\n' + elif type(value) == numpy.ndarray or type(value) == TimestampedImage: - else: + output += f'numpy.array{value.shape}\n' - try: + elif type(value) == pandas.DataFrame: - output += f'{value}' + output += f'pandas.DataFrame{value.shape}\n' - except TypeError as e: + else: - logging.error('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) + try: - output += f'{Fore.RED}{Style.BRIGHT}!!! {get_class_path(self)}.{name}: {e}{Style.RESET_ALL}\n\n' + output += f'{value}' - if output[-1] != '\n': + except TypeError as e: - output += '\n' + logging.error('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) - return output + output += f'{Fore.RED}{Style.BRIGHT}!!! {get_class_path(self)}.{name}: {e}{Style.RESET_ALL}\n\n' - @property - def tabulation(self) -> str: - """Edit tabulation string according parents number.""" + if output[-1] != '\n': + output += '\n' - tabs = '' - parent = self.__parent + return output - while (parent is not None): + @property + def tabulation(self) -> str: + """Edit tabulation string according parents number.""" - tabs += '\t' - parent = parent.parent + tabs = '' + parent = self.__parent - return tabs + while (parent is not None): + tabs += '\t' + parent = parent.parent - @property - def properties(self) -> tuple[name, any]: - """Iterate over pipeline step properties values.""" + return tabs - properties = [name for name, item in self.__class__.__dict__.items() if isinstance(item, property)] + @property + def properties(self) -> tuple[name, any]: + """Iterate over pipeline step properties values.""" - for base in self.__class__.__bases__: + properties = [name for name, item in self.__class__.__dict__.items() if isinstance(item, property)] - if base != PipelineStepObject and base != SharedObject: + for base in self.__class__.__bases__: - for name, item in base.__dict__.items(): + if base != PipelineStepObject and base != SharedObject: - if isinstance(item, property) and not name in properties: + for name, item in base.__dict__.items(): - properties.append(name) + if isinstance(item, property) and not name in properties: + properties.append(name) - for name in properties: + for name in properties: + yield name, getattr(self, name) - yield name, getattr(self, name) + @property + def children(self) -> object: + """Iterate over children pipeline step objects.""" - @property - def children(self) -> object: - """Iterate over children pipeline step objects.""" + for name, value in self.properties: - for name, value in self.properties: + # Pipeline step object attribute + if issubclass(type(value), PipelineStepObject) and value != self.parent: - # Pipeline step object attribute - if issubclass(type(value), PipelineStepObject) and value != self.parent: + yield value - yield value + # Pipeline step objects list attribute + elif type(value) == list: - # Pipeline step objects list attribute - elif type(value) == list: + for p in value: - for p in value: + if issubclass(type(p), PipelineStepObject): + yield p - if issubclass(type(p), PipelineStepObject): + # Pipeline step objects list attribute + elif type(value) == dict: - yield p + for p in value.values(): - # Pipeline step objects list attribute - elif type(value) == dict: + if issubclass(type(p), PipelineStepObject): + yield p - for p in value.values(): - - if issubclass(type(p), PipelineStepObject): - - yield p def PipelineStepMethod(method): - """Define a decorator use into PipelineStepObject class to declare pipeline method. - - !!! danger - PipelineStepMethod must have a timestamp as first argument. - """ + """Define a decorator use into PipelineStepObject class to declare pipeline method. - def wrapper(self, *args, timestamp: int|float = None, unwrap: bool = False, catch_exceptions: bool = True, **kwargs): - """Wrap pipeline step method to measure execution time. + !!! danger + PipelineStepMethod must have a timestamp as first argument. + """ - Parameters: - args: any arguments defined by PipelineStepMethod. - timestamp: optional method call timestamp (unit does'nt matter) if first args parameter is not a TimestampedObject instance. - unwrap: extra arguments used in wrapper function to call wrapped method directly. - catch_exceptions: extra arguments used in wrapper function to catch exception. - """ - if timestamp is None and len(args) > 0: + def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, catch_exceptions: bool = True, **kwargs): + """Wrap pipeline step method to measure execution time. - if issubclass(type(args[0]), TimestampedObject): + Parameters: + self: + args: any arguments defined by PipelineStepMethod. + timestamp: optional method call timestamp (unit doesn't matter) if first args parameter is not a + TimestampedObject instance. + unwrap: extra arguments used in wrapper function to call wrapped method directly. + catch_exceptions: extra arguments used in wrapper function to catch exception. + """ + if timestamp is None and len(args) > 0: - timestamp = args[0].timestamp + if issubclass(type(args[0]), TimestampedObject): - else: + timestamp = args[0].timestamp - logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', get_class_path(self), method.__name__, type(args[0]).__name__) + else: - if unwrap: + logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', get_class_path(self), method.__name__, type(args[0]).__name__) - return method(self, *args, **kwargs) + if unwrap: + return method(self, *args, **kwargs) - # Initialize execution time assessment - start = time.perf_counter() - exception = None - result = None + # Initialize execution time assessment + start = time.perf_counter() + exception = None + result = None - if not catch_exceptions: + if not catch_exceptions: - # Execute wrapped method without catching exceptions - result = method(self, *args, **kwargs) + # Execute wrapped method without catching exceptions + result = method(self, *args, **kwargs) - # Measure execution time - self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 - - else: - - try: + # Measure execution time + self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 - # Execute wrapped method - result = method(self, *args, **kwargs) + else: - except Exception as e: + try: - exception = e + # Execute wrapped method + result = method(self, *args, **kwargs) - finally: + except Exception as e: - # Measure execution time - self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 + exception = e - # Notify observers that method has been called - subscription_name = f'on_{method.__name__}' + finally: - for observer in self.observers: + # Measure execution time + self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 - # Does the observer cares about this method? - if subscription_name in dir(observer): + # Notify observers that method has been called + subscription_name = f'on_{method.__name__}' - subscription = getattr(observer, subscription_name) + for observer in self.observers: - # Call subscription - subscription(timestamp, self, exception) + # Does the observer cares about this method? + if subscription_name in dir(observer): + subscription = getattr(observer, subscription_name) - # Raise timestamped exception - if exception is not None: + # Call subscription + subscription(timestamp, self, exception) - raise TimestampedException(exception, timestamp) + # Raise timestamped exception + if exception is not None: + raise TimestampedException(exception, timestamp) - return result + return result - return wrapper + return wrapper diff --git a/src/argaze/GazeAnalysis/Basic.py b/src/argaze/GazeAnalysis/Basic.py index 063ee2b..ec98b30 100644 --- a/src/argaze/GazeAnalysis/Basic.py +++ b/src/argaze/GazeAnalysis/Basic.py @@ -19,7 +19,6 @@ __license__ = "GPLv3" from argaze import GazeFeatures, DataFeatures -import numpy class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): """Basic scan path analysis.""" @@ -66,7 +65,9 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): """Scan path step fixation durations average.""" return self.__step_fixation_durations_average - + + +# noinspection PyAttributeOutsideInit class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): """Basic AOI scan path analysis.""" @@ -89,6 +90,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__steps_number = len(aoi_scan_path) sum_fixation_durations = 0 + # noinspection PyAttributeOutsideInit self.__sum_aoi_fixation_durations = {} for aoi_scan_step in aoi_scan_path: diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py index 9e2aa77..6847f44 100644 --- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py +++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py @@ -17,13 +17,12 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import math +import cv2 +import numpy from argaze import GazeFeatures, DataFeatures from argaze.AreaOfInterest import AOIFeatures -import numpy -import cv2 class AOIMatcher(GazeFeatures.AOIMatcher): """Matching algorithm based on fixation's deviation circle coverage over AOI.""" @@ -58,7 +57,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__matched_region = None @DataFeatures.PipelineStepMethod - def match(self, gaze_movement, aoi_scene) -> tuple[str, AOIFeatures.AreaOfInterest]: + def match(self, gaze_movement: GazeFeatures.GazeMovement, aoi_scene) -> tuple[str, AOIFeatures.AreaOfInterest]: """Returns AOI with the maximal fixation's deviation circle coverage if above coverage threshold.""" if GazeFeatures.is_fixation(gaze_movement): @@ -96,9 +95,11 @@ class AOIMatcher(GazeFeatures.AOIMatcher): if max_coverage > 0: # Update looked aoi data + # noinspection PyAttributeOutsideInit self.__looked_aoi_data = most_likely_looked_aoi_data # Calculate circle ratio means as looked probabilities + # noinspection PyAttributeOutsideInit self.__looked_probabilities = {} for aoi_name, circle_ratio_sum in self.__circle_ratio_sum.items(): @@ -109,9 +110,11 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__looked_probabilities[aoi_name] = circle_ratio_mean if circle_ratio_mean < 1 else 1 # Update matched gaze movement + # noinspection PyAttributeOutsideInit self.__matched_gaze_movement = gaze_movement # Update matched region + # noinspection PyAttributeOutsideInit self.__matched_region = matched_region # Return @@ -133,11 +136,13 @@ class AOIMatcher(GazeFeatures.AOIMatcher): Parameters: image: where to draw aoi_scene: to refresh looked aoi if required - draw_matched_fixation: Fixation.draw parameters (which depends of the loaded gaze movement identifier module, if None, no fixation is drawn) + draw_matched_fixation: Fixation.draw parameters (which depends on the loaded + gaze movement identifier module, if None, no fixation is drawn) draw_matched_region: AOIFeatures.AOI.draw parameters (if None, no matched region is drawn) draw_looked_aoi: AOIFeatures.AOI.draw parameters (if None, no looked aoi is drawn) + update_looked_aoi: looked_aoi_name_color: color of text (if None, no looked aoi name is drawn) - looked_aoi_name_offset: ofset of text from the upper left aoi bounding box corner + looked_aoi_name_offset: offset of text from the upper left aoi bounding box corner """ if self.__matched_gaze_movement is not None: @@ -156,6 +161,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): try: + # noinspection PyAttributeOutsideInit self.__looked_aoi_data = (self.looked_aoi_name(), aoi_scene[self.looked_aoi_name()]) except KeyError: diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 0864b18..b8451f0 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -17,12 +17,11 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import math +import cv2 +import numpy from argaze import GazeFeatures, DataFeatures -import numpy -import cv2 class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" @@ -57,13 +56,16 @@ class Fixation(GazeFeatures.Fixation): return min(deviations_array) <= self.deviation_max - def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None): + def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, + duration_factor: float = 1., draw_positions: dict = None): """Draw fixation into image. Parameters: + image: where to draw deviation_circle_color: color of circle representing fixation's deviation duration_border_color: color of border representing fixation's duration duration_factor: how many pixels per duration unit + draw_positions: """ # Draw duration border if required @@ -92,6 +94,7 @@ class Saccade(GazeFeatures.Saccade): """Draw saccade into image. Parameters: + image: where to draw line_color: color of line from first position to last position """ diff --git a/src/argaze/GazeAnalysis/Entropy.py b/src/argaze/GazeAnalysis/Entropy.py index a73901e..5bac43e 100644 --- a/src/argaze/GazeAnalysis/Entropy.py +++ b/src/argaze/GazeAnalysis/Entropy.py @@ -17,11 +17,11 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" +import numpy + from argaze import GazeFeatures, DataFeatures from argaze.GazeAnalysis import TransitionMatrix -import pandas -import numpy class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): """Implementation of entropy algorithm as described in: @@ -66,12 +66,12 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): scan_fixations_count, aoi_fixations_count = aoi_scan_path.fixations_count() # Probability to have a fixation onto each aoi - stationary_probalities = {aoi: count/scan_fixations_count for aoi, count in aoi_fixations_count.items()} + stationary_probabilities = {aoi: count/scan_fixations_count for aoi, count in aoi_fixations_count.items()} # Stationary entropy self.__stationary_entropy = 0 - for aoi, p in stationary_probalities.items(): + for aoi, p in stationary_probabilities.items(): self.__stationary_entropy += p * numpy.log(p + 1e-9) @@ -84,7 +84,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): for aoi, s in destination_p_log_sum.items(): - self.__transition_entropy += s * stationary_probalities[aoi] + self.__transition_entropy += s * stationary_probabilities[aoi] self.__transition_entropy *= -1 diff --git a/src/argaze/GazeAnalysis/FocusPointInside.py b/src/argaze/GazeAnalysis/FocusPointInside.py index dbcb438..5d26650 100644 --- a/src/argaze/GazeAnalysis/FocusPointInside.py +++ b/src/argaze/GazeAnalysis/FocusPointInside.py @@ -17,13 +17,12 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import math +import cv2 +import numpy from argaze import GazeFeatures, DataFeatures from argaze.AreaOfInterest import AOIFeatures -import numpy -import cv2 class AOIMatcher(GazeFeatures.AOIMatcher): """Matching algorithm based on fixation's focus point.""" @@ -34,7 +33,8 @@ class AOIMatcher(GazeFeatures.AOIMatcher): # Init AOIMatcher class super().__init__() - self.__reset() + self.__looked_aoi_data = (None, None) + self.__matched_gaze_movement = None def __reset(self): @@ -42,7 +42,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__matched_gaze_movement = None @DataFeatures.PipelineStepMethod - def match(self, gaze_movement, aoi_scene) -> tuple[str, AOIFeatures.AreaOfInterest]: + def match(self, gaze_movement: GazeFeatures.GazeMovement, aoi_scene) -> tuple[str, AOIFeatures.AreaOfInterest]: """Returns AOI containing fixation focus point.""" if GazeFeatures.is_fixation(gaze_movement): @@ -65,16 +65,18 @@ class AOIMatcher(GazeFeatures.AOIMatcher): return (None, None) - def draw(self, image: numpy.array, aoi_scene: AOIFeatures.AOIScene, draw_matched_fixation: dict = None, draw_looked_aoi: dict = None, looked_aoi_name_color: tuple = None, looked_aoi_name_offset: tuple = (0, 0)): + def draw(self, image: numpy.array, aoi_scene: AOIFeatures.AOIScene, draw_matched_fixation: dict = None, + draw_looked_aoi: dict = None, looked_aoi_name_color: tuple = None, looked_aoi_name_offset: tuple = (0, 0)): """Draw matching into image. Parameters: image: where to draw aoi_scene: to refresh looked aoi if required - draw_matched_fixation: Fixation.draw parameters (which depends of the loaded gaze movement identifier module, if None, no fixation is drawn) + draw_matched_fixation: Fixation.draw parameters (which depends on the loaded gaze movement identifier + module, if None, no fixation is drawn) draw_looked_aoi: AOIFeatures.AOI.draw parameters (if None, no looked aoi is drawn) looked_aoi_name_color: color of text (if None, no looked aoi name is drawn) - looked_aoi_name_offset: ofset of text from the upper left aoi bounding box corner + looked_aoi_name_offset: offset of text from the upper left aoi bounding box corner """ if self.__matched_gaze_movement is not None: diff --git a/src/argaze/GazeAnalysis/LinearRegression.py b/src/argaze/GazeAnalysis/LinearRegression.py index 00fd649..5a823a1 100644 --- a/src/argaze/GazeAnalysis/LinearRegression.py +++ b/src/argaze/GazeAnalysis/LinearRegression.py @@ -72,7 +72,9 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator): def reset(self): """Reset observed and expected gaze positions.""" + # noinspection PyAttributeOutsideInit self.__observed_positions = [] + # noinspection PyAttributeOutsideInit self.__expected_positions = [] self.__linear_regression = None diff --git a/src/argaze/GazeAnalysis/NearestNeighborIndex.py b/src/argaze/GazeAnalysis/NearestNeighborIndex.py index a577eba..81bab22 100644 --- a/src/argaze/GazeAnalysis/NearestNeighborIndex.py +++ b/src/argaze/GazeAnalysis/NearestNeighborIndex.py @@ -26,7 +26,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): """Implementation of Nearest Neighbor Index algorithm as described in: **Di Nocera F., Terenzi M., Camilli M. (2006).** - *Another look at scanpath: distance to nearest neighbour as a measure of mental workload.* + *Another look at scan path: distance to nearest neighbour as a measure of mental workload.* Developments in Human Factors in Transportation, Design, and Evaluation. [https://www.researchgate.net](https://www.researchgate.net/publication/239470608_Another_look_at_scanpath_distance_to_nearest_neighbour_as_a_measure_of_mental_workload) """ @@ -63,10 +63,10 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): # Compute inter fixation distances distances = cdist(fixations_focus, fixations_focus) - # Find minimal distances between each fixations + # Find minimal distances between each fixation minimums = numpy.apply_along_axis(lambda row: numpy.min(row[numpy.nonzero(row)]), 1, distances) - # Average of minimun distances + # Average of minimum distances dNN = numpy.sum(minimums / len(fixations_focus)) # Mean random distance diff --git a/src/argaze/GazeAnalysis/TransitionMatrix.py b/src/argaze/GazeAnalysis/TransitionMatrix.py index dd5cf87..8012f5e 100644 --- a/src/argaze/GazeAnalysis/TransitionMatrix.py +++ b/src/argaze/GazeAnalysis/TransitionMatrix.py @@ -17,10 +17,11 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" +import numpy +import pandas + from argaze import GazeFeatures, DataFeatures -import pandas -import numpy class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): """Implementation of transition matrix probabilities and density algorithm as described in: @@ -49,7 +50,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): row_sum = aoi_scan_path.transition_matrix.apply(lambda row: row.sum(), axis=1) # Editing transition matrix probabilities - # Note: when no transiton starts from an aoi, destination probabilites is equal to 1/S where S is the number of aoi + # Note: when no transition starts from an aoi, destination probabilities is equal to 1/S where S is the number of aoi self.__transition_matrix_probabilities = aoi_scan_path.transition_matrix.apply(lambda row: row.apply(lambda p: p / row_sum[row.name] if row_sum[row.name] > 0 else 1 / row_sum.size), axis=1) # Calculate matrix density diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index a0aab68..d001688 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -16,12 +16,11 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import math +import cv2 +import numpy from argaze import GazeFeatures, DataFeatures -import numpy -import cv2 class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" @@ -60,9 +59,11 @@ class Fixation(GazeFeatures.Fixation): """Draw fixation into image. Parameters: + image: where to draw deviation_circle_color: color of circle representing fixation's deviation duration_border_color: color of border representing fixation's duration duration_factor: how many pixels per duration unit + draw_positions: """ # Draw duration border if required @@ -91,6 +92,7 @@ class Saccade(GazeFeatures.Saccade): """Draw saccade into image. Parameters: + image: where to draw line_color: color of line from first position to last position """ @@ -231,9 +233,6 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Output last saccade return last_saccade if not terminate else self.current_fixation().finish() - - # Always return empty gaze movement at least - return GazeFeatures.GazeMovement() def current_gaze_movement(self) -> GazeFeatures.GazeMovement: diff --git a/src/argaze/GazeAnalysis/__init__.py b/src/argaze/GazeAnalysis/__init__.py index c110eb1..f0ba9fd 100644 --- a/src/argaze/GazeAnalysis/__init__.py +++ b/src/argaze/GazeAnalysis/__init__.py @@ -1,4 +1,16 @@ """ Various gaze movement identification, AOI matching and scan path analysis algorithms. """ -__all__ = ['Basic', 'DispersionThresholdIdentification', 'VelocityThresholdIdentification', 'TransitionMatrix', 'KCoefficient', 'LempelZivComplexity', 'NGram', 'Entropy', 'NearestNeighborIndex', 'ExploreExploitRatio', 'LinearRegression']
\ No newline at end of file +__all__ = [ + 'Basic', + 'DispersionThresholdIdentification', + 'VelocityThresholdIdentification', + 'TransitionMatrix', + 'KCoefficient', + 'LempelZivComplexity', + 'NGram', + 'Entropy', + 'NearestNeighborIndex', + 'ExploreExploitRatio', + 'LinearRegression' +]
\ No newline at end of file diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index c0e5a36..dbeee61 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -16,24 +16,24 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self -import math import json -import importlib +import math +from typing import Self + +import cv2 +import numpy +import pandas from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures -import numpy -import pandas -import cv2 class GazePosition(tuple, DataFeatures.TimestampedObject): """Define gaze position as a tuple of coordinates with precision. Parameters: precision: the radius of a circle around value where other same gaze position measurements could be. - message: a string to describe why the the position is what it is. + message: a string to describe why the position is what it is. """ def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): @@ -62,7 +62,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): return self.__message @classmethod - def from_dict(self, position_data: dict) -> Self: + def from_dict(cls, position_data: dict) -> Self: if 'value' in position_data.keys(): @@ -102,7 +102,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): __radd__ = __add__ def __sub__(self, position: Self) -> Self: - """Substract position. + """Subtract position. !!! note The returned position precision is the maximal precision. @@ -119,7 +119,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): return GazePosition(numpy.array(self) - numpy.array(position), timestamp=self.timestamp) def __rsub__(self, position: Self) -> Self: - """Reversed substract position. + """Reversed subtract position. !!! note The returned position precision is the maximal precision. @@ -194,7 +194,10 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): """Handle timestamped gaze positions into a list.""" - def __init__(self, gaze_positions: list = []): + def __init__(self, gaze_positions=None): + + if gaze_positions is None: + gaze_positions = [] DataFeatures.TimestampedObjectsList.__init__(self, GazePosition, gaze_positions) @@ -215,10 +218,11 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): ''' @classmethod - def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> Self: + def from_dataframe(cls, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> Self: """Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). Parameters: + dataframe: timestamp: specific timestamp column label. x: specific x column label. y: specific y column label. @@ -354,11 +358,14 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): message: a string to describe why the movement is what it is. """ - def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan): + def __new__(cls, positions: TimeStampedGazePositions = None, finished: bool = False, + message: str = None, timestamp: int|float = math.nan): + # noinspection PyArgumentList return TimeStampedGazePositions.__new__(cls, positions) - def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan): + def __init__(self, positions: TimeStampedGazePositions = None, finished: bool = False, + message: str = None, timestamp: int|float = math.nan): """Initialize GazeMovement""" TimeStampedGazePositions.__init__(self, positions) @@ -428,6 +435,7 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): """Draw gaze movement positions with line between each position. Parameters: + image: where to draw position_color: color of position point line_color: color of line between each position """ @@ -610,7 +618,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): gaze_status.append(len(ts_fixations), type(gaze_movement)) - # Store gaze movment into the appropriate list + # Store gaze movement into the appropriate list if is_fixation(gaze_movement): ts_fixations.append(gaze_movement) @@ -782,7 +790,7 @@ class ScanPath(list): def append_fixation(self, fixation): """Append new fixation to scan path. !!! warning - Consecutives fixations are ignored keeping the last fixation""" + Consecutive fixations are ignored keeping the last fixation""" self.__last_fixation = fixation @@ -790,8 +798,11 @@ class ScanPath(list): """Draw scan path into image. Parameters: - draw_fixations: Fixation.draw parameters (which depends of the loaded gaze movement identifier module, if None, no fixation is drawn) - draw_saccades: Saccade.draw parameters (which depends of the loaded gaze movement identifier module, if None, no saccade is drawn) + image: where to draw + draw_fixations: Fixation.draw parameters (which depends on the loaded gaze movement identifier module, + if None, no fixation is drawn) + draw_saccades: Saccade.draw parameters (which depends on the loaded gaze movement identifier module, + if None, no saccade is drawn) deepness: number of steps back to draw """ @@ -869,7 +880,10 @@ class AOIMatcher(DataFeatures.PipelineStepObject): raise NotImplementedError('looked_aoi_name() method not implemented') class AOIScanStepError(Exception): - """Exception raised at AOIScanStep creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade.""" + """ + Exception raised at AOIScanStep creation if an aoi scan step doesn't start by a fixation or + doesn't end by a saccade. + """ def __init__(self, message, aoi=''): @@ -978,7 +992,7 @@ class AOIScanPath(list): This will clear the AOIScanPath """ - # Check expected aoi are not the same than previous ones + # Check expected aoi are not the same as previous ones if len(expected_aoi) == len(self.__expected_aoi[1:]): equal = [a == b for a, b in zip(expected_aoi, self.__expected_aoi[1:])] @@ -1031,13 +1045,19 @@ class AOIScanPath(list): super().clear() + # noinspection PyAttributeOutsideInit self.__movements = TimeStampedGazeMovements() + # noinspection PyAttributeOutsideInit self.__current_aoi = '' + # noinspection PyAttributeOutsideInit self.__index = ord('A') + # noinspection PyAttributeOutsideInit self.__aoi_letter = {} + # noinspection PyAttributeOutsideInit self.__letter_aoi = {} size = len(self.__expected_aoi) + # noinspection PyAttributeOutsideInit self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi, columns=self.__expected_aoi) def __get_aoi_letter(self, aoi): @@ -1054,7 +1074,7 @@ class AOIScanPath(list): return letter def get_letter_aoi(self, letter): - """Get which aoi is related to an unique letter.""" + """Get which aoi is related to a unique letter.""" return self.__letter_aoi[letter] @@ -1140,6 +1160,7 @@ class AOIScanPath(list): finally: # Clear movements + # noinspection PyAttributeOutsideInit self.__movements = TimeStampedGazeMovements() # Append new fixation @@ -1153,6 +1174,7 @@ class AOIScanPath(list): self.__movements.append(fixation) # Remember aoi + # noinspection PyAttributeOutsideInit self.__current_aoi = looked_aoi return None @@ -1173,7 +1195,7 @@ class AOIScanPath(list): return scan_fixations_count, aoi_fixations_count class AOIScanPathAnalyzer(DataFeatures.PipelineStepObject): - """Abstract class to define what should provide a aoi scan path analyzer.""" + """Abstract class to define what should provide an aoi scan path analyzer.""" @DataFeatures.PipelineStepInit def __init__(self, **kwargs): diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupilAnalysis/WorkloadIndex.py index 23c3bab..99427fe 100644 --- a/src/argaze/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze/PupilAnalysis/WorkloadIndex.py @@ -18,20 +18,19 @@ __license__ = "GPLv3" import math -from argaze import DataFeatures, PupillFeatures +from argaze import DataFeatures, PupilFeatures -import numpy -class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): - """Periodic average of pupill diameter variations to pupill diameter reference value. +class PupilDiameterAnalyzer(PupilFeatures.PupilDiameterAnalyzer): + """Periodic average of pupil diameter variations to pupil diameter reference value. Parameters: reference: base line value. period: identification period length. """ - def __init__(self, reference: PupillFeatures.PupillDiameter, period: int|float = 1): + def __init__(self, reference: PupilFeatures.PupilDiameter, period: int|float = 1): - assert(not math.isnan(self.__reference)) + assert(not math.isnan(reference)) self.__reference = reference self.__period = period @@ -41,7 +40,7 @@ class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): self.__last_ts = 0 @property - def reference(self) -> PupillFeatures.PupillDiameter: + def reference(self) -> PupilFeatures.PupilDiameter: """Get workload index reference.""" return self.__reference @@ -51,15 +50,15 @@ class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): return self.__period @DataFeatures.PipelineStepMethod - def analyze(self, pupill_diameter: PupillFeatures.PupillDiameter) -> float: - """Analyze workload index from successive timestamped pupill diameters.""" + def analyze(self, pupil_diameter: PupilFeatures.PupilDiameter) -> float: + """Analyze workload index from successive timestamped pupil diameters.""" - # Ignore non valid pupill diameter - if not math.isnan(pupill_diameter): + # Ignore non valid pupil diameter + if not math.isnan(pupil_diameter): return None - if pupill_diameter.timestamp - self.__last_ts >= self.__period: + if pupil_diameter.timestamp - self.__last_ts >= self.__period: if self.__variations_number > 0 and self.__reference.value > 0.: @@ -69,14 +68,14 @@ class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): workload_index = 0. - self.__variations_sum = pupill_diameter.value - self.__reference.value + self.__variations_sum = pupil_diameter.value - self.__reference.value self.__variations_number = 1 - self.__last_ts = pupill_diameter.timestamp + self.__last_ts = pupil_diameter.timestamp return workload_index else: - self.__variations_sum += pupill_diameter.value - self.__reference.value + self.__variations_sum += pupil_diameter.value - self.__reference.value self.__variations_number += 1
\ No newline at end of file diff --git a/src/argaze/PupilAnalysis/__init__.py b/src/argaze/PupilAnalysis/__init__.py new file mode 100644 index 0000000..c563968 --- /dev/null +++ b/src/argaze/PupilAnalysis/__init__.py @@ -0,0 +1,4 @@ +""" +Class interface to work with various pupil analysis algorithms. +""" +__all__ = ['WorkloadIndex']
\ No newline at end of file diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupilFeatures.py index 70ffb31..c38d10a 100644 --- a/src/argaze/PupillFeatures.py +++ b/src/argaze/PupilFeatures.py @@ -16,16 +16,16 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import json import math from argaze import DataFeatures -class PupillDiameter(float, DataFeatures.TimestampedObject): - """Define pupill diameter as a single float value. + +class PupilDiameter(float, DataFeatures.TimestampedObject): + """Define pupil diameter as a single float value. Parameters: - value: pupill diameter value. + value: pupil diameter value. """ def __new__(cls, value: float = math.nan, **kwargs): @@ -37,47 +37,47 @@ class PupillDiameter(float, DataFeatures.TimestampedObject): @property def value(self): - """Get pupill diameter value.""" + """Get pupil diameter value.""" return float(self) -class TimeStampedPupillDiameters(DataFeatures.TimestampedObjectsList): - """Handle timestamped pupill diamters into a list.""" +class TimeStampedPupilDiameters(DataFeatures.TimestampedObjectsList): + """Handle timestamped pupil diameters into a list.""" - def __init__(self, pupill_diameters: list = []): + def __init__(self, pupil_diameters: list = []): - DataFeatures.TimestampedObjectsList.__init__(self, PupillDiameter, pupill_diameters) + DataFeatures.TimestampedObjectsList.__init__(self, PupilDiameter, pupil_diameters) -class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): - """Abstract class to define what should provide a pupill diameter analyser.""" +class PupilDiameterAnalyzer(DataFeatures.PipelineStepObject): + """Abstract class to define what should provide a pupil diameter analyser.""" @DataFeatures.PipelineStepMethod - def analyze(self, pupill_diameter: PupillDiameter) -> any: - """Analyze pupill diameter from successive timestamped pupill diameters.""" + def analyze(self, pupil_diameter: PupilDiameter) -> any: + """Analyze pupil diameter from successive timestamped pupil diameters.""" raise NotImplementedError('analyze() method not implemented') - def browse(self, ts_pupill_diameters: TimeStampedPupillDiameters) -> list: - """Analyze by browsing timestamped pupill diameters. + def browse(self, ts_pupil_diameters: TimeStampedPupilDiameters) -> list: + """Analyze by browsing timestamped pupil diameters. Parameters: - ts_pupill_diameters: list of timestamped pupill diameters. + ts_pupil_diameters: list of timestamped pupil diameters. Returns: ts_analysis: list of (timestamp, analysis). """ - assert(type(ts_pupill_diameters) == TimeStampedPupillDiameters) + assert(type(ts_pupil_diameters) == TimeStampedPupilDiameters) # TODO: Have TimestampedDataDictionary and TimestampedDataDictionaryList classes? - ts_analyzis = [] + ts_analysis = [] - # Iterate on pupill diameters - for pupill_diameter in ts_pupill_diameters: + # Iterate on pupil diameters + for pupil_diameter in ts_pupil_diameters: - analysis = self.analyze(pupill_diameter.timestamp, pupill_diameter) + analysis = self.analyze(pupil_diameter.timestamp, pupil_diameter) if analysis is not None: - ts_analyzis.append((pupill_diameter.timestamp, analysis)) + ts_analysis.append((pupil_diameter.timestamp, analysis)) - return ts_analyzis + return ts_analysis diff --git a/src/argaze/PupillAnalysis/__init__.py b/src/argaze/PupillAnalysis/__init__.py deleted file mode 100644 index 18f0f15..0000000 --- a/src/argaze/PupillAnalysis/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -""" -Class interface to work with various pupill analysis algorithms. -""" -__all__ = ['WorkloadIndex']
\ No newline at end of file diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py index f29c5d3..bd01eda 100644 --- a/src/argaze/__init__.py +++ b/src/argaze/__init__.py @@ -1,4 +1,4 @@ """ ArGaze is divided in submodules dedicated to various specifics features. """ -__all__ = ['ArUcoMarkers','AreaOfInterest','ArFeatures','GazeFeatures','GazeAnalysis','PupillFeatures','PupillAnalysis','DataFeatures','utils']
\ No newline at end of file +__all__ = ['ArUcoMarkers','AreaOfInterest','ArFeatures','GazeFeatures','GazeAnalysis','PupilFeatures','PupilAnalysis','DataFeatures','utils']
\ No newline at end of file diff --git a/src/argaze/__main__.py b/src/argaze/__main__.py index 0184d69..9adda75 100644 --- a/src/argaze/__main__.py +++ b/src/argaze/__main__.py @@ -59,7 +59,7 @@ with from_json(args.context_file) as context: # Display context cv2.imshow(context.name, context.image()) - # Head-monted eye tracker case: display environment frames image + # Head-mounted eye tracker case: display environment frames image if issubclass(type(context.pipeline), ArCamera): for scene_frame in context.pipeline.scene_frames(): diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py index f38d041..3f2ceda 100644 --- a/src/argaze/utils/UtilsFeatures.py +++ b/src/argaze/utils/UtilsFeatures.py @@ -77,36 +77,6 @@ def import_from_test_package(module: str) -> types.ModuleType: return TestModule -class ExitSignalHandler(): - """ - Handle exit event - """ - - def __init__(self): - - import signal - import threading - - global __exit_event - global __on_exit_signal - - __exit_event = threading.Event() - - def __on_exit_signal(signo, _frame): - __exit_event.set() - - for sig in ('TERM', 'HUP', 'INT'): - signal.signal(getattr(signal, 'SIG'+sig), __on_exit_signal) - - def status(self) -> bool: - """ - Get exit status. - - Returns: - exit status - """ - return __exit_event.is_set() - class TimeProbe(): """ Assess temporal performance. @@ -121,8 +91,11 @@ class TimeProbe(): Start chronometer. """ + # noinspection PyAttributeOutsideInit self.__last_time = time.perf_counter() + # noinspection PyAttributeOutsideInit self.__lap_counter = 0 + # noinspection PyAttributeOutsideInit self.__elapsed_time = 0 def lap(self) -> tuple[float, int, float]: @@ -137,6 +110,7 @@ class TimeProbe(): lap_time = time.perf_counter() - self.__last_time + # noinspection PyAttributeOutsideInit self.__last_time = time.perf_counter() self.__lap_counter += 1 self.__elapsed_time += lap_time @@ -174,6 +148,7 @@ def PrintCallStack(method): """Wrap method to print call stack before its call. Parameters: + self: args: method arguments. kwargs: extra arguments. """ diff --git a/src/argaze/utils/__init__.py b/src/argaze/utils/__init__.py index a2322bb..2cee626 100644 --- a/src/argaze/utils/__init__.py +++ b/src/argaze/utils/__init__.py @@ -1,4 +1,4 @@ """ -Miscelleaneous utilities. +Miscellaneous utilities. """ -__all__ = ['UtilsFeatures', 'Providers']
\ No newline at end of file +__all__ = ['UtilsFeatures', 'contexts']
\ No newline at end of file diff --git a/src/argaze/utils/aruco_markers_group_export.py b/src/argaze/utils/aruco_markers_group_export.py index dc1f673..46507b8 100644 --- a/src/argaze/utils/aruco_markers_group_export.py +++ b/src/argaze/utils/aruco_markers_group_export.py @@ -19,15 +19,14 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import argparse -import time -import itertools +import contextlib +import cv2 + +from argaze import DataFeatures from argaze.ArUcoMarkers import ArUcoDetector, ArUcoOpticCalibrator, ArUcoMarkersGroup from argaze.utils import UtilsFeatures -import cv2 -import numpy - def main(): """ Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder. @@ -69,7 +68,7 @@ def main(): configuration["optic_parameters"] = args.optic_parameters # Load ArUco detector configuration - aruco_detector = ArUcoDetector.ArUcoDetector.from_dict(configuration, '.') + aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration) if args.verbose: @@ -89,131 +88,131 @@ def main(): # Create a window cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE) - # Enable exit signal handler - exit = UtilsFeatures.ExitSignalHandler() - # Init image selection current_image_index = -1 _, current_image = video_capture.read() next_image_index = int(args.start * video_fps) refresh = False - while not exit.status(): + # Waiting for 'ctrl+C' interruption + with contextlib.suppress(KeyboardInterrupt): - # Select a new image and detect markers once - if next_image_index != current_image_index or refresh: + while True: - video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index) + # Select a new image and detect markers once + if next_image_index != current_image_index or refresh: - success, video_image = video_capture.read() + video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index) - video_height, video_width, _ = video_image.shape + success, video_image = video_capture.read() - # Create default optic parameters adapted to frame size - if aruco_detector.optic_parameters is None: + video_height, video_width, _ = video_image.shape - # Note: The choice of 1000 for default focal length should be discussed... - aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height)) + # Create default optic parameters adapted to frame size + if aruco_detector.optic_parameters is None: - if success: + # Note: The choice of 1000 for default focal length should be discussed... + aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height)) - # Refresh once - refresh = False + if success: - current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1 - current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC) + # Refresh once + refresh = False - try: + current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1 + current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC) - # Detect and project AR features - aruco_detector.detect_markers(video_image) + try: - # Estimate all detected markers pose - aruco_detector.estimate_markers_pose(args.size) + # Detect and project AR features + aruco_detector.detect_markers(video_image) - # Build aruco scene from detected markers - aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers()) + # Estimate all detected markers pose + aruco_detector.estimate_markers_pose(args.size) - # Detection suceeded - exception = None + # Build aruco scene from detected markers + aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers()) - # Write errors - except Exception as e: + # Detection succeeded + exception = None - aruco_markers_group = None + # Write errors + except Exception as e: - exception = e - - # Draw detected markers - aruco_detector.draw_detected_markers(video_image, draw_parameters) + aruco_markers_group = None - # Write detected markers - cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - - # Write timing - cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - - # Write exception - if exception is not None: + exception = e - cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + # Draw detected markers + aruco_detector.draw_detected_markers(video_image, draw_parameters) - # Write documentation - cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - - # Copy image - current_image = video_image.copy() + # Write detected markers + cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - # Keep last image - else: + # Write timing + cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - video_image = current_image.copy() + # Write exception + if exception is not None: - key_pressed = cv2.waitKey(10) + cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - #if key_pressed != -1: - # print(key_pressed) + # Write documentation + cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - # Select previous image with left arrow - if key_pressed == 2: - next_image_index -= 1 + # Copy image + current_image = video_image.copy() - # Select next image with right arrow - if key_pressed == 3: - next_image_index += 1 + # Keep last image + else: - # Clip image index - if next_image_index < 0: - next_image_index = 0 + video_image = current_image.copy() - # r: reload configuration - if key_pressed == 114: - - aruco_detector = ArUcoDetector.ArUcoDetector.from_dict(configuration) - refresh = True - print('Configuration reloaded') + key_pressed = cv2.waitKey(10) - # Save selected marker edition using 'Ctrl + s' - if key_pressed == 19: + #if key_pressed != -1: + # print(key_pressed) - if aruco_markers_group: + # Select previous image with left arrow + if key_pressed == 2: + next_image_index -= 1 - aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj') - print(f'ArUco markers saved into {args.output}') + # Select next image with right arrow + if key_pressed == 3: + next_image_index += 1 - else: + # Clip image index + if next_image_index < 0: + next_image_index = 0 + + # r: reload configuration + if key_pressed == 114: + + aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration) + refresh = True + print('Configuration reloaded') + + # Save selected marker edition using 'Ctrl + s' + if key_pressed == 19: + + if aruco_markers_group: + + aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj') + print(f'ArUco markers saved into {args.output}') + + else: - print(f'No ArUco markers to export') + print(f'No ArUco markers to export') - # Close window using 'Esc' key - if key_pressed == 27: - break + # Close window using 'Esc' key + if key_pressed == 27: + break - # Display video - cv2.imshow(aruco_detector.name, video_image) + # Display video + cv2.imshow(aruco_detector.name, video_image) # Close movie capture video_capture.release() diff --git a/src/argaze/utils/contexts/OpenCV.py b/src/argaze/utils/contexts/OpenCV.py index 25b3dd7..f89189d 100644 --- a/src/argaze/utils/contexts/OpenCV.py +++ b/src/argaze/utils/contexts/OpenCV.py @@ -16,16 +16,14 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import sys import logging import time -from argaze import ArFeatures, DataFeatures, GazeFeatures -from argaze.utils import UtilsFeatures - -import numpy import cv2 +from argaze import ArFeatures, DataFeatures + + class Window(ArFeatures.ArContext): @DataFeatures.PipelineStepInit diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py index c051f7f..f83c1ac 100644 --- a/src/argaze/utils/contexts/TobiiProGlasses2.py +++ b/src/argaze/utils/contexts/TobiiProGlasses2.py @@ -17,17 +17,17 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import sys -import os +import collections +import datetime +import gzip +import json import logging +import math +import os import socket +import sys import threading -import collections -import json import time -import math -import gzip -import datetime import uuid from dataclasses import dataclass @@ -41,8 +41,7 @@ except ImportError: from urllib import urlencode from urllib2 import urlopen, Request, HTTPError, URLError -from argaze import ArFeatures, DataFeatures, GazeFeatures -from argaze.utils import UtilsFeatures +from argaze import ArFeatures, DataFeatures import numpy import cv2 @@ -131,8 +130,8 @@ class Gyroscope(): """Gyroscope value""" @dataclass -class PupillCenter(): - """Define pupill center data (gidx pc eye).""" +class PupilCenter(): + """Define pupil center data (gidx pc eye).""" validity: int index: int @@ -140,8 +139,8 @@ class PupillCenter(): eye: str # 'right' or 'left' @dataclass -class PupillDiameter(): - """Define pupill diameter data (gidx pd eye).""" +class PupilDiameter(): + """Define pupil diameter data (gidx pd eye).""" validity: int index: int @@ -193,13 +192,13 @@ class TobiiJsonDataParser(): 'ets': self.__parse_event, 'ac': self.__parse_accelerometer, 'gy': self.__parse_gyroscope, - 'gidx': self.__parse_pupill_or_gaze, + 'gidx': self.__parse_pupil_or_gaze, 'marker3d': self.__parse_marker_position } - self.__parse_pupill_or_gaze_map = { - 'pc': self.__parse_pupill_center, - 'pd': self.__parse_pupill_diameter, + self.__parse_pupil_or_gaze_map = { + 'pc': self.__parse_pupil_center, + 'pd': self.__parse_pupil_diameter, 'gd': self.__parse_gaze_direction, 'l': self.__parse_gaze_position, 'gp3': self.__parse_gaze_position_3d @@ -237,14 +236,14 @@ class TobiiJsonDataParser(): return data_object, data_object_type - def __parse_pupill_or_gaze(self, status, data): + def __parse_pupil_or_gaze(self, status, data): gaze_index = data.pop('gidx') - # parse pupill or gaze data depending second json key + # parse pupil or gaze data depending second json key second_key = next(iter(data)) - return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, data) + return self.__parse_pupil_or_gaze_map[second_key](status, gaze_index, data) def __parse_dir_sig(self, status, data): @@ -283,13 +282,13 @@ class TobiiJsonDataParser(): return Gyroscope(data['gy']) - def __parse_pupill_center(self, status, gaze_index, data): + def __parse_pupil_center(self, status, gaze_index, data): - return PupillCenter(status, gaze_index, data['pc'], data['eye']) + return PupilCenter(status, gaze_index, data['pc'], data['eye']) - def __parse_pupill_diameter(self, status, gaze_index, data): + def __parse_pupil_diameter(self, status, gaze_index, data): - return PupillDiameter(status, gaze_index, data['pd'], data['eye']) + return PupilDiameter(status, gaze_index, data['pd'], data['eye']) def __parse_gaze_direction(self, status, gaze_index, data): @@ -312,7 +311,7 @@ class LiveStream(ArFeatures.ArContext): @DataFeatures.PipelineStepInit def __init__(self, **kwargs): - # Init ArContext classe + # Init ArContext class super().__init__() # Init private attributes @@ -356,6 +355,7 @@ class LiveStream(ArFeatures.ArContext): else: + # noinspection PyAttributeOutsideInit self.__base_url = 'http://' + self.__address @property @@ -588,7 +588,7 @@ class LiveStream(ArFeatures.ArContext): @DataFeatures.PipelineStepImage def image(self, draw_something: bool = None, **kwargs: dict) -> numpy.array: - """Get Tobbi visualisation. + """Get Tobii visualisation. Parameters: draw_something: example @@ -886,52 +886,6 @@ class LiveStream(ArFeatures.ArContext): # CALIBRATION - def calibration_start(self, project_name, participant_name): - """Start calibration process for project and participant.""" - - project_id = self.__get_project_id(project_name) - participant_id = self.get_participant_id(participant_name) - - # Init calibration id - self.__calibration_id = None - - # Calibration have to be done for a project and a participant - if project_id is None or participant_id is None: - - raise Exception(f'Setup project and participant before') - - data = { - 'ca_project': project_id, - 'ca_type': 'default', - 'ca_participant': participant_id, - 'ca_created': self.__get_current_datetime() - } - - # Request calibration - json_data = self.__post_request('/api/calibrations', data) - self.__calibration_id = json_data['ca_id'] - - # Start calibration - self.__post_request('/api/calibrations/' + self.__calibration_id + '/start') - - def calibration_status(self) -> str: - """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed.""" - - if self.__calibration_id is not None: - - status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) - - # Forget calibration id - if status != 'calibrating': - - self.__calibration_id = None - - return status - - else: - - raise Exception(f'Start calibration before') - def calibrate(self, project_name, participant_name): """Handle whole Tobii glasses calibration process.""" @@ -975,6 +929,7 @@ class LiveStream(ArFeatures.ArContext): # Request calibration json_data = self.__post_request('/api/calibrations', data) + # noinspection PyAttributeOutsideInit self.__calibration_id = json_data['ca_id'] # Start calibration @@ -990,6 +945,7 @@ class LiveStream(ArFeatures.ArContext): # Forget calibration id if status != 'calibrating': + # noinspection PyAttributeOutsideInit self.__calibration_id = None return status @@ -998,24 +954,6 @@ class LiveStream(ArFeatures.ArContext): raise Exception(f'Start calibration before') - def calibrate(self, project_name, participant_name): - """Handle whole Tobii glasses calibration process.""" - - # Start calibration - self.calibration_start(project_name, participant_name) - - # While calibrating... - status = self.calibration_status() - - while status == 'calibrating': - - time.sleep(1) - status = self.calibration_status() - - if status == 'uncalibrated' or status == 'stale' or status == 'failed': - - raise Exception(f'Calibration {status}') - # RECORDING FEATURES def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']): @@ -1086,7 +1024,7 @@ class LiveStream(ArFeatures.ArContext): return False def get_recordings(self) -> str: - """Get all recordings id.""" + """Get all recordings' id.""" return self.__get_request('/api/recordings') @@ -1185,7 +1123,7 @@ class PostProcessing(ArFeatures.ArContext): @DataFeatures.PipelineStepInit def __init__(self, **kwargs): - # Init ArContext classe + # Init ArContext class super().__init__() # Init private attributes @@ -1202,8 +1140,8 @@ class PostProcessing(ArFeatures.ArContext): 'Event': 0, 'Accelerometer': 0, 'Gyroscope': 0, - 'PupillCenter': 0, - 'PupillDiameter': 0, + 'PupilCenter': 0, + 'PupilDiameter': 0, 'GazeDirection': 0, 'GazePosition': 0, 'GazePosition3D': 0, |