diff options
61 files changed, 1728 insertions, 1734 deletions
@@ -645,7 +645,7 @@ the "copyright" line and a pointer to where the full notice is found. GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. + along with this program. If not, see <https://www.gnu.org/licenses/>. Also add information on how to contact you by electronic and paper mail. @@ -664,7 +664,7 @@ might be different; for a GUI interface, you would use an "about box". You should also get your employer (if you work as a programmer) or school, if any, to sign a "copyright disclaimer" for the program, if necessary. For more information on this, and how to apply and follow the GNU GPL, see -<http://www.gnu.org/licenses/>. +<https://www.gnu.org/licenses/>. The GNU General Public License does not permit incorporating your program into proprietary programs. If your program is a subroutine library, you diff --git a/src/argaze.test/ArUcoMarkers/ArUcoBoard.py b/src/argaze.test/ArUcoMarkers/ArUcoBoard.py index f3ce194..0bfa568 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoBoard.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoBoard.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py index 5864465..091383e 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/ArUcoMarkers/ArUcoDetector.py b/src/argaze.test/ArUcoMarkers/ArUcoDetector.py index 7babb94..62e8a09 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoDetector.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/ArUcoMarkers/ArUcoMarker.py b/src/argaze.test/ArUcoMarkers/ArUcoMarker.py index bb83a51..de88623 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoMarker.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoMarker.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py index b8156ba..7a5e9e8 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py b/src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py index b51647e..79d2ead 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/ArUcoMarkers/ArUcoScene.py b/src/argaze.test/ArUcoMarkers/ArUcoScene.py index 68c8d4d..f29b1d3 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoScene.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/AreaOfInterest/AOI2DScene.py b/src/argaze.test/AreaOfInterest/AOI2DScene.py index daa6431..ae9e6b2 100644 --- a/src/argaze.test/AreaOfInterest/AOI2DScene.py +++ b/src/argaze.test/AreaOfInterest/AOI2DScene.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/AreaOfInterest/AOI3DScene.py b/src/argaze.test/AreaOfInterest/AOI3DScene.py index f16b1f5..591b5df 100644 --- a/src/argaze.test/AreaOfInterest/AOI3DScene.py +++ b/src/argaze.test/AreaOfInterest/AOI3DScene.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/AreaOfInterest/AOIFeatures.py b/src/argaze.test/AreaOfInterest/AOIFeatures.py index 9b93a9f..25fdca9 100644 --- a/src/argaze.test/AreaOfInterest/AOIFeatures.py +++ b/src/argaze.test/AreaOfInterest/AOIFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/DataFeatures.py b/src/argaze.test/DataFeatures.py index 04163be..a0031bc 100644 --- a/src/argaze.test/DataFeatures.py +++ b/src/argaze.test/DataFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index 87f439f..9326060 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/Entropy.py b/src/argaze.test/GazeAnalysis/Entropy.py index 2b09b27..0bfd7b8 100644 --- a/src/argaze.test/GazeAnalysis/Entropy.py +++ b/src/argaze.test/GazeAnalysis/Entropy.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/ExploreExploitRatio.py b/src/argaze.test/GazeAnalysis/ExploreExploitRatio.py index ceef343..0c2eb96 100644 --- a/src/argaze.test/GazeAnalysis/ExploreExploitRatio.py +++ b/src/argaze.test/GazeAnalysis/ExploreExploitRatio.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/KCoefficient.py b/src/argaze.test/GazeAnalysis/KCoefficient.py index d6af6b6..3c79cc3 100644 --- a/src/argaze.test/GazeAnalysis/KCoefficient.py +++ b/src/argaze.test/GazeAnalysis/KCoefficient.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/LempelZivComplexity.py b/src/argaze.test/GazeAnalysis/LempelZivComplexity.py index 13e44c0..91d7df1 100644 --- a/src/argaze.test/GazeAnalysis/LempelZivComplexity.py +++ b/src/argaze.test/GazeAnalysis/LempelZivComplexity.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/NGram.py b/src/argaze.test/GazeAnalysis/NGram.py index 54986fb..69c06cd 100644 --- a/src/argaze.test/GazeAnalysis/NGram.py +++ b/src/argaze.test/GazeAnalysis/NGram.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/NearestNeighborIndex.py b/src/argaze.test/GazeAnalysis/NearestNeighborIndex.py index 080a28c..3325a2a 100644 --- a/src/argaze.test/GazeAnalysis/NearestNeighborIndex.py +++ b/src/argaze.test/GazeAnalysis/NearestNeighborIndex.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/TransitionMatrix.py b/src/argaze.test/GazeAnalysis/TransitionMatrix.py index c1d3ee2..5aeef94 100644 --- a/src/argaze.test/GazeAnalysis/TransitionMatrix.py +++ b/src/argaze.test/GazeAnalysis/TransitionMatrix.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py index 55a1932..39a8834 100644 --- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index 35574bd..74cbc12 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/OpenCVCuda.py b/src/argaze.test/OpenCVCuda.py index 2689db8..e552436 100644 --- a/src/argaze.test/OpenCVCuda.py +++ b/src/argaze.test/OpenCVCuda.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/PupillAnalysis/WorkloadIndex.py b/src/argaze.test/PupillAnalysis/WorkloadIndex.py index da9b72d..ebb043a 100644 --- a/src/argaze.test/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze.test/PupillAnalysis/WorkloadIndex.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze.test/PupillFeatures.py b/src/argaze.test/PupillFeatures.py index b0cf65d..085973b 100644 --- a/src/argaze.test/PupillFeatures.py +++ b/src/argaze.test/PupillFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 56a941d..32d1542 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" @@ -16,24 +16,18 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Iterator, Union import logging -import json -import os -import sys -import importlib -import threading -import time import math +import os +from typing import Iterator, Union + +import cv2 +import numpy from argaze import DataFeatures, GazeFeatures from argaze.AreaOfInterest import * -from argaze.GazeAnalysis import * from argaze.utils import UtilsFeatures -import numpy -import cv2 - class PoseEstimationFailed(Exception): """ @@ -129,12 +123,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @aoi_scene.setter def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene | str | dict): + new_aoi_scene = None + if issubclass(type(aoi_scene_value), AOIFeatures.AOIScene): new_aoi_scene = aoi_scene_value # str: relative path to file - elif type(aoi_scene_value) == str: + elif type(aoi_scene_value) is str: filepath = os.path.join(DataFeatures.get_working_directory(), aoi_scene_value) file_format = filepath.split('.')[-1] @@ -154,10 +150,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath) # dict: - elif type(aoi_scene_value) == dict: + elif type(aoi_scene_value) is dict: new_aoi_scene = AOIFeatures.AOIScene.from_dict(aoi_scene_value) + else: + + raise ValueError("Bad aoi scene value") + # Cast aoi scene to its effective dimension if new_aoi_scene.dimension == 2: @@ -213,6 +213,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """AOI scan path analyzers list.""" return self.__aoi_scan_path_analyzers + # noinspection PyUnresolvedReferences @aoi_scan_path_analyzers.setter @DataFeatures.PipelineStepAttributeSetter def aoi_scan_path_analyzers(self, aoi_scan_path_analyzers: list): @@ -245,7 +246,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): for a in self.__aoi_scan_path_analyzers: - if type(a) == property_type: + if type(a) is property_type: setattr(analyzer, name, a) found = True @@ -254,7 +255,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation - if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None: + if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path is None: self.__aoi_scan_path = GazeFeatures.ScanPath() # Edit parent @@ -311,7 +312,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__aoi_scan_path.expected_aoi = expected_aoi @DataFeatures.PipelineStepMethod - def look(self, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()): + def look(self, gaze_movement: GazeFeatures.GazeMovement = None): """ Project timestamped gaze movement into layer. @@ -321,6 +322,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Parameters: gaze_movement: gaze movement to project """ + if gaze_movement is None: + gaze_movement = GazeFeatures.GazeMovement() # Use layer lock feature with self._lock: @@ -383,8 +386,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Parameters: image: image where to draw. - draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn) - draw_aoi_matching: AOIMatcher.draw parameters (which depends on the loaded aoi matcher module, + draw_aoi_scene: [AOI2DScene.draw][argaze.AreaOfInterest.AOI2DScene.draw] parameters (if None, no aoi scene is drawn) + draw_aoi_matching: [AOIMatcher.draw][argaze.GazeFeatures.AOIMatcher.draw] parameters (which depends on the loaded aoi matcher module, if None, no aoi matching is drawn) """ @@ -530,6 +533,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """Scan path analyzers list.""" return self.__scan_path_analyzers + # noinspection PyUnresolvedReferences @scan_path_analyzers.setter @DataFeatures.PipelineStepAttributeSetter def scan_path_analyzers(self, scan_path_analyzers: list): @@ -562,7 +566,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): for a in self.__scan_path_analyzers: - if type(a) == property_type: + if type(a) is property_type: setattr(analyzer, name, a) found = True @@ -571,7 +575,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation - if len(self.__scan_path_analyzers) > 0 and self.__scan_path == None: + if len(self.__scan_path_analyzers) > 0 and self.__scan_path is None: self.__scan_path = GazeFeatures.ScanPath() # Edit parent @@ -739,8 +743,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Analyze aoi scan path for scan_path_analyzer in self.__scan_path_analyzers: - scan_path_analyzer.analyze(self.__scan_path, - timestamp=self.__identified_gaze_movement.timestamp) + scan_path_analyzer.analyze(self.__scan_path, timestamp=self.__identified_gaze_movement.timestamp) # Update scan path analyzed state self.__scan_path_analyzed = True @@ -756,8 +759,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]]) # Update heatmap image - self.__heatmap.update(self.__calibrated_gaze_position * scale, - timestamp=self.__calibrated_gaze_position.timestamp) + self.__heatmap.update(self.__calibrated_gaze_position * scale, timestamp=self.__calibrated_gaze_position.timestamp) # Look layers with valid identified gaze movement # Note: don't filter valid/invalid finished/unfinished gaze movement to allow layers to reset internally @@ -765,9 +767,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): layer.look(self.__identified_gaze_movement) @DataFeatures.PipelineStepImage - def image(self, background_weight: float = None, heatmap_weight: float = None, - draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, - draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: + def image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. @@ -901,12 +901,12 @@ class ArScene(DataFeatures.PipelineStepObject): for layer_name, layer_data in layers.items(): - if type(layer_data) == dict: + if type(layer_data) is dict: self._layers[layer_name] = ArLayer(name=layer_name, **layer_data) # str: relative path to JSON file - elif type(layer_data) == str: + elif type(layer_data) is str: self._layers[layer_name] = DataFeatures.from_json( os.path.join(DataFeatures.get_working_directory(), layer_data)) @@ -929,18 +929,22 @@ class ArScene(DataFeatures.PipelineStepObject): for frame_name, frame_data in frames.items(): - if type(frame_data) == dict: + if type(frame_data) is dict: new_frame = ArFrame(name=frame_name, **frame_data) # str: relative path to JSON file - elif type(frame_data) == str: + elif type(frame_data) is str: new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data)) # Loaded frame name have to be equals to dictionary key assert (new_frame.name == frame_name) + else: + + raise ValueError("Bad frame data.") + # Look for a scene layer with an AOI named like the frame for scene_layer_name, scene_layer in self.layers.items(): @@ -1013,8 +1017,7 @@ class ArScene(DataFeatures.PipelineStepObject): raise NotImplementedError('estimate_pose() method not implemented') @DataFeatures.PipelineStepMethod - def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> \ - Iterator[Union[str, AOI2DScene.AOI2DScene]]: + def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]: """Project layers according estimated pose and optional field of view clipping angles. Parameters: @@ -1050,6 +1053,7 @@ class ArScene(DataFeatures.PipelineStepObject): aoi_scene_copy = layer.aoi_scene.copy() # Project layer aoi scene + # noinspection PyUnresolvedReferences yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) @@ -1246,15 +1250,13 @@ class ArCamera(ArFrame): inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position) # QUESTION: How to project gaze precision? - inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), - timestamp=timestamped_gaze_position.timestamp) + inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp) # Project inner gaze position into scene frame scene_frame.look(inner_gaze_position * scene_frame.size) # Ignore missing aoi in camera frame layer projection - except KeyError as e: - + except KeyError: pass @DataFeatures.PipelineStepMethod @@ -1375,8 +1377,7 @@ class ArContext(DataFeatures.PipelineStepObject): """Exit from ArContext.""" pass - def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None, - precision: int | float = None): + def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None, precision: int | float = None): """Request pipeline to process new gaze position at a timestamp.""" logging.debug('ArContext._process_gaze_position %s', self.name) @@ -1395,14 +1396,12 @@ class ArContext(DataFeatures.PipelineStepObject): if x is None and y is None: # Edit empty gaze position - self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp), - catch_exceptions=self.__catch_exceptions) + self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp), catch_exceptions=self.__catch_exceptions) else: # Edit gaze position - self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp), - catch_exceptions=self.__catch_exceptions) + self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp), catch_exceptions=self.__catch_exceptions) except DataFeatures.TimestampedException as e: @@ -1439,8 +1438,7 @@ class ArContext(DataFeatures.PipelineStepObject): logging.debug('\t> watch image (%i x %i)', width, height) - self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp), - catch_exceptions=self.__catch_exceptions) + self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp), catch_exceptions=self.__catch_exceptions) # TODO: make this step optional self.__pipeline.map(timestamp=timestamp, catch_exceptions=self.__catch_exceptions) @@ -1477,8 +1475,7 @@ class ArContext(DataFeatures.PipelineStepObject): if image.is_timestamped(): info_stack += 1 - cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, - (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) if issubclass(type(self.__pipeline), ArCamera): @@ -1491,8 +1488,7 @@ class ArContext(DataFeatures.PipelineStepObject): watch_time = math.nan info_stack += 1 - cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', - (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) if issubclass(type(self.__pipeline), ArFrame): @@ -1505,8 +1501,7 @@ class ArContext(DataFeatures.PipelineStepObject): look_time = math.nan info_stack += 1 - cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', - (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) if draw_exceptions: @@ -1515,8 +1510,7 @@ class ArContext(DataFeatures.PipelineStepObject): e = self.__exceptions.pop() i = len(self.__exceptions) - cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - (i) * 50), (0, 0, 127), -1) - cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, - (255, 255, 255), 1, cv2.LINE_AA) + cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - i * 50), (0, 0, 127), -1) + cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) return image diff --git a/src/argaze/ArUcoMarkers/ArUcoBoard.py b/src/argaze/ArUcoMarkers/ArUcoBoard.py index a6d8b02..be475d5 100644 --- a/src/argaze/ArUcoMarkers/ArUcoBoard.py +++ b/src/argaze/ArUcoMarkers/ArUcoBoard.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index bf4e5d3..e61bddc 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py index cd8ff20..f675c8f 100644 --- a/src/argaze/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" @@ -29,336 +29,336 @@ from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoOpticC class DetectorParameters(): - """Wrapper class around ArUco marker detector parameters. + """Wrapper class around ArUco marker detector parameters. - !!! note - More details on [opencv page](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html) - """ + !!! note + More details on [opencv page](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html) + """ - __parameters = aruco.DetectorParameters() - __parameters_names = [ - 'adaptiveThreshConstant', - 'adaptiveThreshWinSizeMax', - 'adaptiveThreshWinSizeMin', - 'adaptiveThreshWinSizeStep', - 'aprilTagCriticalRad', - 'aprilTagDeglitch', - 'aprilTagMaxLineFitMse', - 'aprilTagMaxNmaxima', - 'aprilTagMinClusterPixels', - 'aprilTagMinWhiteBlackDiff', - 'aprilTagQuadDecimate', - 'aprilTagQuadSigma', - 'cornerRefinementMaxIterations', - 'cornerRefinementMethod', - 'cornerRefinementMinAccuracy', - 'cornerRefinementWinSize', - 'markerBorderBits', - 'minMarkerPerimeterRate', - 'maxMarkerPerimeterRate', - 'minMarkerDistanceRate', - 'detectInvertedMarker', - 'errorCorrectionRate', - 'maxErroneousBitsInBorderRate', - 'minCornerDistanceRate', - 'minDistanceToBorder', - 'minOtsuStdDev', - 'perspectiveRemoveIgnoredMarginPerCell', - 'perspectiveRemovePixelPerCell', - 'polygonalApproxAccuracyRate', - 'useAruco3Detection' - ] + __parameters = aruco.DetectorParameters() + __parameters_names = [ + 'adaptiveThreshConstant', + 'adaptiveThreshWinSizeMax', + 'adaptiveThreshWinSizeMin', + 'adaptiveThreshWinSizeStep', + 'aprilTagCriticalRad', + 'aprilTagDeglitch', + 'aprilTagMaxLineFitMse', + 'aprilTagMaxNmaxima', + 'aprilTagMinClusterPixels', + 'aprilTagMinWhiteBlackDiff', + 'aprilTagQuadDecimate', + 'aprilTagQuadSigma', + 'cornerRefinementMaxIterations', + 'cornerRefinementMethod', + 'cornerRefinementMinAccuracy', + 'cornerRefinementWinSize', + 'markerBorderBits', + 'minMarkerPerimeterRate', + 'maxMarkerPerimeterRate', + 'minMarkerDistanceRate', + 'detectInvertedMarker', + 'errorCorrectionRate', + 'maxErroneousBitsInBorderRate', + 'minCornerDistanceRate', + 'minDistanceToBorder', + 'minOtsuStdDev', + 'perspectiveRemoveIgnoredMarginPerCell', + 'perspectiveRemovePixelPerCell', + 'polygonalApproxAccuracyRate', + 'useAruco3Detection' + ] - def __init__(self, **kwargs): + def __init__(self, **kwargs): - for parameter, value in kwargs.items(): + for parameter, value in kwargs.items(): + setattr(self.__parameters, parameter, value) - setattr(self.__parameters, parameter, value) + self.__dict__.update(kwargs) - self.__dict__.update(kwargs) + def __setattr__(self, parameter, value): - def __setattr__(self, parameter, value): + setattr(self.__parameters, parameter, value) - setattr(self.__parameters, parameter, value) + def __getattr__(self, parameter): - def __getattr__(self, parameter): + return getattr(self.__parameters, parameter) - return getattr(self.__parameters, parameter) + @classmethod + def from_json(cls, json_filepath) -> Self: + """Load detector parameters from .json file.""" - @classmethod - def from_json(cls, json_filepath) -> Self: - """Load detector parameters from .json file.""" + with open(json_filepath) as configuration_file: + return DetectorParameters(**json.load(configuration_file)) - with open(json_filepath) as configuration_file: + def __str__(self) -> str: + """Detector parameters string representation.""" - return DetectorParameters(**json.load(configuration_file)) + return f'{self}' - def __str__(self) -> str: - """Detector parameters string representation.""" + def __format__(self, spec: str) -> str: + """Formated detector parameters string representation. - return f'{self}' + Parameters: + spec: 'modified' to get only modified parameters. + """ - def __format__(self, spec: str) -> str: - """Formated detector parameters string representation. + output = '' - Parameters: - spec: 'modified' to get only modified parameters. - """ + for parameter in self.__parameters_names: - output = '' + if parameter in self.__dict__.keys(): - for parameter in self.__parameters_names: + output += f'\t*{parameter}: {getattr(self.__parameters, parameter)}\n' - if parameter in self.__dict__.keys(): + elif spec == "": - output += f'\t*{parameter}: {getattr(self.__parameters, parameter)}\n' + output += f'\t{parameter}: {getattr(self.__parameters, parameter)}\n' - elif spec == "": + return output - output += f'\t{parameter}: {getattr(self.__parameters, parameter)}\n' + @property + def internal(self): + return self.__parameters - return output - - @property - def internal(self): - return self.__parameters class ArUcoDetector(DataFeatures.PipelineStepObject): - """OpenCV ArUco library wrapper.""" + """OpenCV ArUco library wrapper.""" - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): - """Initialize ArUcoDetector.""" + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + """Initialize ArUcoDetector.""" - # Init private attributes - self.__dictionary = None - self.__optic_parameters = None - self.__parameters = None + # Init private attributes + self.__dictionary = None + self.__optic_parameters = None + self.__parameters = None - # Init detected markers data - self.__detected_markers = {} + # Init detected markers data + self.__detected_markers = {} - # Init detected board data - self.__board = None - self.__board_corners_number = 0 - self.__board_corners = [] - self.__board_corners_ids = [] + # Init detected board data + self.__board = None + self.__board_corners_number = 0 + self.__board_corners = [] + self.__board_corners_ids = [] - @property - def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary: - """ArUco markers dictionary to detect.""" - return self.__dictionary + @property + def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary: + """ArUco markers dictionary to detect.""" + return self.__dictionary - @dictionary.setter - @DataFeatures.PipelineStepAttributeSetter - def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary): + @dictionary.setter + @DataFeatures.PipelineStepAttributeSetter + def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary): - self.__dictionary = dictionary - - @property - def optic_parameters(self) -> ArUcoOpticCalibrator.OpticParameters: - """Optic parameters to use for ArUco detection into image.""" - return self.__optic_parameters + self.__dictionary = dictionary - @optic_parameters.setter - @DataFeatures.PipelineStepAttributeSetter - def optic_parameters(self, optic_parameters: ArUcoOpticCalibrator.OpticParameters): + @property + def optic_parameters(self) -> ArUcoOpticCalibrator.OpticParameters: + """Optic parameters to use for ArUco detection into image.""" + return self.__optic_parameters - self.__optic_parameters = optic_parameters - - @property - def parameters(self) -> DetectorParameters: - """ArUco detector parameters.""" - return self.__parameters + @optic_parameters.setter + @DataFeatures.PipelineStepAttributeSetter + def optic_parameters(self, optic_parameters: ArUcoOpticCalibrator.OpticParameters): - @parameters.setter - @DataFeatures.PipelineStepAttributeSetter - def parameters(self, parameters: DetectorParameters): + self.__optic_parameters = optic_parameters - self.__parameters = parameters + @property + def parameters(self) -> DetectorParameters: + """ArUco detector parameters.""" + return self.__parameters - @DataFeatures.PipelineStepMethod - def detect_markers(self, image: numpy.array): - """Detect all ArUco markers into an image. + @parameters.setter + @DataFeatures.PipelineStepAttributeSetter + def parameters(self, parameters: DetectorParameters): - !!! danger "DON'T MIRROR IMAGE" - It makes the markers detection to fail. + self.__parameters = parameters - !!! danger "DON'T UNDISTORTED IMAGE" - Camera intrinsic parameters and distortion coefficients are used later during pose estimation. - """ + @DataFeatures.PipelineStepMethod + def detect_markers(self, image: numpy.array): + """Detect all ArUco markers into an image. - # Reset detected markers data - self.__detected_markers, detected_markers_corners, detected_markers_ids = {}, [], [] + !!! danger "DON'T MIRROR IMAGE" + It makes the markers detection to fail. - # Detect markers into gray picture - detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY), self.__dictionary.markers, parameters = self.__parameters.internal) - - # Is there detected markers ? - if len(detected_markers_corners) > 0: + !!! danger "DON'T UNDISTORTED IMAGE" + Camera intrinsic parameters and distortion coefficients are used later during pose estimation. + """ - # Transform markers ids array into list - detected_markers_ids = detected_markers_ids.T[0] + # Reset detected markers data + self.__detected_markers, detected_markers_corners, detected_markers_ids = {}, [], [] - for i, marker_id in enumerate(detected_markers_ids): + # Detect markers into gray picture + detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY), + self.__dictionary.markers, + parameters=self.__parameters.internal) - marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id) - marker.corners = detected_markers_corners[i][0] + # Is there detected markers ? + if len(detected_markers_corners) > 0: - # No pose estimation: call estimate_markers_pose to get one - marker.translation = numpy.empty([0]) - marker.rotation = numpy.empty([0]) - marker.points = numpy.empty([0]) + # Transform markers ids array into list + detected_markers_ids = detected_markers_ids.T[0] - self.__detected_markers[marker_id] = marker + for i, marker_id in enumerate(detected_markers_ids): + marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id) + marker.corners = detected_markers_corners[i][0] - def estimate_markers_pose(self, size: float, ids: list = []): - """Estimate pose detected markers pose considering a marker size. + # No pose estimation: call estimate_markers_pose to get one + marker.translation = numpy.empty([0]) + marker.rotation = numpy.empty([0]) + marker.points = numpy.empty([0]) - Parameters: - size: size of markers in centimeters. - ids: markers id list to select detected markers. - """ + self.__detected_markers[marker_id] = marker - # Is there detected markers ? - if len(self.__detected_markers) > 0: + def estimate_markers_pose(self, size: float, ids: list = []): + """Estimate pose detected markers pose considering a marker size. - # Select all markers by default - if len(ids) == 0: + Parameters: + size: size of markers in centimeters. + ids: markers id list to select detected markers. + """ - ids = self.__detected_markers.keys() + # Is there detected markers ? + if len(self.__detected_markers) > 0: - # Prepare data for aruco.estimatePoseSingleMarkers function - selected_markers_corners = tuple() - selected_markers_ids = [] + # Select all markers by default + if len(ids) == 0: + ids = self.__detected_markers.keys() - for marker_id, marker in self.__detected_markers.items(): + # Prepare data for aruco.estimatePoseSingleMarkers function + selected_markers_corners = tuple() + selected_markers_ids = [] - if marker_id in ids: + for marker_id, marker in self.__detected_markers.items(): - selected_markers_corners += (marker.corners,) - selected_markers_ids.append(marker_id) + if marker_id in ids: + selected_markers_corners += (marker.corners,) + selected_markers_ids.append(marker_id) - # Estimate pose of selected markers - if len(selected_markers_corners) > 0: + # Estimate pose of selected markers + if len(selected_markers_corners) > 0: - markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners, size, numpy.array(self.__optic_parameters.K), numpy.array(self.__optic_parameters.D)) + markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners, + size, numpy.array( + self.__optic_parameters.K), numpy.array(self.__optic_parameters.D)) - for i, marker_id in enumerate(selected_markers_ids): + for i, marker_id in enumerate(selected_markers_ids): + marker = self.__detected_markers[marker_id] - marker = self.__detected_markers[marker_id] + marker.translation = markers_tvecs[i][0] + marker.rotation, _ = cv.Rodrigues(markers_rvecs[i][0]) + marker.size = size + marker.points = markers_points.reshape(4, 3).dot(marker.rotation) - marker.translation - marker.translation = markers_tvecs[i][0] - marker.rotation, _ = cv.Rodrigues(markers_rvecs[i][0]) - marker.size = size - marker.points = markers_points.reshape(4, 3).dot(marker.rotation) - marker.translation + def detected_markers(self) -> dict[int, ArUcoMarker.ArUcoMarker]: + """Access to detected markers' dictionary.""" - def detected_markers(self) -> dict[int, ArUcoMarker.ArUcoMarker]: - """Access to detected markers' dictionary.""" + return self.__detected_markers - return self.__detected_markers + def detected_markers_number(self) -> int: + """Return detected markers number.""" - def detected_markers_number(self) -> int: - """Return detected markers number.""" + return len(list(self.__detected_markers.keys())) - return len(list(self.__detected_markers.keys())) + def draw_detected_markers(self, image: numpy.array, draw_marker: dict = None): + """Draw detected markers. - def draw_detected_markers(self, image: numpy.array, draw_marker: dict = None): - """Draw detected markers. + Parameters: + image: image where to draw + draw_marker: ArucoMarker.draw parameters (if None, no marker drawn) + """ - Parameters: - image: image where to draw - draw_marker: ArucoMarker.draw parameters (if None, no marker drawn) - """ + if draw_marker is not None: - if draw_marker is not None: + for marker_id, marker in self.__detected_markers.items(): + marker.draw(image, self.__optic_parameters.K, self.__optic_parameters.D, **draw_marker) - for marker_id, marker in self.__detected_markers.items(): + def detect_board(self, image: numpy.array, board, expected_markers_number): + """Detect ArUco markers board in image setting up the number of detected markers needed to agree detection. - marker.draw(image, self.__optic_parameters.K, self.__optic_parameters.D, **draw_marker) - - def detect_board(self, image: numpy.array, board, expected_markers_number): - """Detect ArUco markers board in image setting up the number of detected markers needed to agree detection. + !!! danger "DON'T MIRROR IMAGE" + It makes the markers detection to fail. + """ - !!! danger "DON'T MIRROR IMAGE" - It makes the markers detection to fail. - """ - - # detect markers from gray picture - gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY) - detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, parameters = self.__parameters.internal) + # detect markers from gray picture + gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY) + detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, + parameters=self.__parameters.internal) - # if all board markers are detected - if len(detected_markers_corners) == expected_markers_number: + # if all board markers are detected + if len(detected_markers_corners) == expected_markers_number: - self.__board = board - self.__board_corners_number, self.__board_corners, self.__board_corners_ids = aruco.interpolateCornersCharuco(detected_markers_corners, detected_markers_ids, gray, self.__board.model) + self.__board = board + self.__board_corners_number, self.__board_corners, self.__board_corners_ids = aruco.interpolateCornersCharuco( + detected_markers_corners, detected_markers_ids, gray, self.__board.model) - else: + else: - self.__board = None - self.__board_corners_number = 0 - self.__board_corners = [] - self.__board_corners_ids = [] + self.__board = None + self.__board_corners_number = 0 + self.__board_corners = [] + self.__board_corners_ids = [] - def draw_board(self, image: numpy.array): - """Draw detected board corners in image.""" + def draw_board(self, image: numpy.array): + """Draw detected board corners in image.""" - if self.__board != None: + if self.__board is not None: + cv.drawChessboardCorners(image, ((self.__board.size[0] - 1), (self.__board.size[1] - 1)), + self.__board_corners, True) - cv.drawChessboardCorners(image, ((self.__board.size[0] - 1 ), (self.__board.size[1] - 1)), self.__board_corners, True) + def board_corners_number(self) -> int: + """Get detected board corners number.""" - def board_corners_number(self) -> int: - """Get detected board corners number.""" + return self.__board_corners_number - return self.__board_corners_number + def board_corners_identifier(self) -> list[int]: + """Get detected board corners identifier.""" - def board_corners_identifier(self) -> list[int]: - """Get detected board corners identifier.""" + return self.__board_corners_ids - return self.__board_corners_ids + def board_corners(self) -> list: + """Get detected board corners.""" - def board_corners(self) -> list: - """Get detected board corners.""" + return self.__board_corners - return self.__board_corners class Observer(): - """Define ArUcoDetector observer to count how many times detection succeeded and how many times markers are detected.""" - - def __init__(self): - """Initialize marker detection metrics.""" + """Define ArUcoDetector observer to count how many times detection succeeded and how many times markers are detected.""" - self.__try_count = 0 - self.__success_count = 0 - self.__detected_ids = [] + def __init__(self): + """Initialize marker detection metrics.""" - @property - def metrics(self) -> tuple[int, int, dict]: - """Get marker detection metrics. + self.__try_count = 0 + self.__success_count = 0 + self.__detected_ids = [] - Returns: - number of detect function call - dict with number of detection for each marker identifier - """ + @property + def metrics(self) -> tuple[int, int, dict]: + """Get marker detection metrics. - return self.__try_count, self.__success_count, Counter(self.__detected_ids) + Returns: + number of detect function call + dict with number of detection for each marker identifier + """ - def reset(self): - """Reset marker detection metrics.""" + return self.__try_count, self.__success_count, Counter(self.__detected_ids) - self.__try_count = 0 - self.__success_count = 0 - self.__detected_ids = [] + def reset(self): + """Reset marker detection metrics.""" - def on_detect_markers(self, timestamp, aruco_detector, exception): - """Update ArUco markers detection metrics.""" + self.__try_count = 0 + self.__success_count = 0 + self.__detected_ids = [] - self.__try_count += 1 - detected_markers_list = list(aruco_detector.detected_markers().keys()) + def on_detect_markers(self, timestamp, aruco_detector, exception): + """Update ArUco markers detection metrics.""" - if len(detected_markers_list): + self.__try_count += 1 + detected_markers_list = list(aruco_detector.detected_markers().keys()) - self.__success_count += 1 - self.__detected_ids.extend(detected_markers_list) + if len(detected_markers_list): + self.__success_count += 1 + self.__detected_ids.extend(detected_markers_list) diff --git a/src/argaze/ArUcoMarkers/ArUcoMarker.py b/src/argaze/ArUcoMarkers/ArUcoMarker.py index 42cb174..cf573dc 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarker.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarker.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py index 72fc688..613a3c5 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py index a6f7b43..fd33664 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" @@ -33,447 +33,444 @@ T0 = numpy.array([0., 0., 0.]) R0 = numpy.array([[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]]) """Define no rotation matrix.""" + def make_rotation_matrix(x, y, z): + # Create rotation matrix around x-axis + c = numpy.cos(numpy.deg2rad(x)) + s = numpy.sin(numpy.deg2rad(x)) + rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) - # Create rotation matrix around x-axis - c = numpy.cos(numpy.deg2rad(x)) - s = numpy.sin(numpy.deg2rad(x)) - Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) + # Create rotation matrix around y-axis + c = numpy.cos(numpy.deg2rad(y)) + s = numpy.sin(numpy.deg2rad(y)) + ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) - # Create rotation matrix around y-axis - c = numpy.cos(numpy.deg2rad(y)) - s = numpy.sin(numpy.deg2rad(y)) - Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) + # Create rotation matrix around z axis + c = numpy.cos(numpy.deg2rad(z)) + s = numpy.sin(numpy.deg2rad(z)) + rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) - # Create rotation matrix around z axis - c = numpy.cos(numpy.deg2rad(z)) - s = numpy.sin(numpy.deg2rad(z)) - Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) + # Return intrinsic rotation matrix + return rx.dot(ry.dot(rz)) - # Return intrinsic rotation matrix - return Rx.dot(Ry.dot(Rz)) -def is_rotation_matrix(R): +def is_rotation_matrix(mat): + rt = numpy.transpose(mat) + should_be_identity = numpy.dot(rt, mat) + i = numpy.identity(3, dtype=mat.dtype) + n = numpy.linalg.norm(i - should_be_identity) - Rt = numpy.transpose(R) - shouldBeIdentity = numpy.dot(Rt, R) - I = numpy.identity(3, dtype = R.dtype) - n = numpy.linalg.norm(I - shouldBeIdentity) + return n < 1e-3 - return n < 1e-3 @dataclass(frozen=True) -class Place(): - """Define a place as list of corners position and a marker. +class Place: + """Define a place as list of corners position and a marker. + + Parameters: + corners: 3D corners position in group referential. + marker: ArUco marker linked to the place. + """ - Parameters: - corners: 3D corners position in group referential. - marker: ArUco marker linked to the place. - """ + corners: numpy.array + marker: ArUcoMarker.ArUcoMarker - corners: numpy.array - marker: ArUcoMarker.ArUcoMarker class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): - """ - Handle group of ArUco markers as one unique spatial entity and estimate its pose. - """ + """ + Handle group of ArUco markers as one unique spatial entity and estimate its pose. + """ + + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + """Initialize ArUcoMarkersGroup""" + + # Init private attributes + self.marker_size = None + self.__dictionary = None + self.__places = {} + self.__translation = numpy.zeros(3) + self.__rotation = numpy.zeros(3) - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): - """Initialize ArUcoMarkersGroup""" + @property + def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary: + """Expected dictionary of all markers in the group.""" + return self.__dictionary - # Init private attributes - self.marker_size = None - self.__dictionary = None - self.__places = {} - self.__translation = numpy.zeros(3) - self.__rotation = numpy.zeros(3) + @dictionary.setter + def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary): - @property - def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary: - """Expected dictionary of all markers in the group.""" - return self.__dictionary + self.__dictionary = dictionary - @dictionary.setter - def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary): + @property + def places(self) -> dict: + """Expected markers place.""" + return self.__places - self.__dictionary = dictionary - - @property - def places(self) -> dict: - """Expected markers place.""" - return self.__places + @places.setter + def places(self, places: dict): - @places.setter - def places(self, places: dict): + # Normalize places data + new_places = {} - # Normalize places data - new_places = {} + for identifier, data in places.items(): - for identifier, data in places.items(): + # Convert string identifier to int value + if type(identifier) is str: - # Convert string identifier to int value - if type(identifier) == str: + identifier = int(identifier) - identifier = int(identifier) + # Get translation vector + tvec = numpy.array(data.pop('translation')).astype(numpy.float32) - # Get translation vector - tvec = numpy.array(data.pop('translation')).astype(numpy.float32) + # Check rotation value shape + rvalue = numpy.array(data.pop('rotation')).astype(numpy.float32) - # Check rotation value shape - rvalue = numpy.array(data.pop('rotation')).astype(numpy.float32) + # Rotation matrix + if rvalue.shape == (3, 3): - # Rotation matrix - if rvalue.shape == (3, 3): + rmat = rvalue - rmat = rvalue + # Rotation vector (expected in degree) + elif rvalue.shape == (3,): - # Rotation vector (expected in degree) - elif rvalue.shape == (3,): + rmat = make_rotation_matrix(rvalue[0], rvalue[1], rvalue[2]).astype(numpy.float32) - rmat = make_rotation_matrix(rvalue[0], rvalue[1], rvalue[2]).astype(numpy.float32) + else: - else: + raise ValueError(f'Bad rotation value: {rvalue}') - raise ValueError(f'Bad rotation value: {rvalue}') + assert (is_rotation_matrix(rmat)) - assert(is_rotation_matrix(rmat)) + # Get marker size + size = float(numpy.array(data.pop('size')).astype(numpy.float32)) - # Get marker size - size = float(numpy.array(data.pop('size')).astype(numpy.float32)) + new_marker = ArUcoMarker.ArUcoMarker(self.__dictionary, identifier, size) - new_marker = ArUcoMarker.ArUcoMarker(self.__dictionary, identifier, size) + # Build marker corners thanks to translation vector and rotation matrix + place_corners = numpy.array([[-size / 2, size / 2, 0], [size / 2, size / 2, 0], [size / 2, -size / 2, 0], [-size / 2, -size / 2, 0]]) + place_corners = place_corners.dot(rmat) + tvec - # Build marker corners thanks to translation vector and rotation matrix - place_corners = numpy.array([[-size/2, size/2, 0], [size/2, size/2, 0], [size/2, -size/2, 0], [-size/2, -size/2, 0]]) - place_corners = place_corners.dot(rmat) + tvec + new_places[identifier] = Place(place_corners, new_marker) - new_places[identifier] = Place(place_corners, new_marker) + # else places are configured using detected markers estimated points + elif isinstance(data, ArUcoMarker.ArUcoMarker): - # else places are configured using detected markers estimated points - elif isinstance(data, ArUcoMarker.ArUcoMarker): + new_places[identifier] = Place(data.points, data) - new_places[identifier] = Place(data.points, data) + # else places are already at expected format + elif (type(identifier) is int) and isinstance(data, Place): - # else places are already at expected format - elif (type(identifier) == int) and isinstance(data, Place): + new_places[identifier] = data - new_places[identifier] = data + self.__places = new_places - self.__places = new_places + @property + def identifiers(self) -> list: + """List place marker identifiers belonging to the group.""" + return list(self.__places.keys()) - @property - def identifiers(self) -> list: - """List place marker identifiers belonging to the group.""" - return list(self.__places.keys()) + @property + def translation(self) -> numpy.array: + """Get ArUco marker group translation vector.""" + return self.__translation - @property - def translation(self) -> numpy.array: - """Get ArUco marker group translation vector.""" - return self.__translation + @translation.setter + def translation(self, tvec): + """Set ArUco marker group translation vector.""" + self.__translation = tvec - @translation.setter - def translation(self, tvec): - """Set ArUco marker group translation vector.""" - self.__translation = tvec + @property + def rotation(self) -> numpy.array: + """Get ArUco marker group rotation matrix.""" + return self.__translation - @property - def rotation(self) -> numpy.array: - """Get ArUco marker group rotation matrix.""" - return self.__translation + @rotation.setter + def rotation(self, rmat): + """Set ArUco marker group rotation matrix.""" + self.__rotation = rmat - @rotation.setter - def rotation(self, rmat): - """Set ArUco marker group rotation matrix.""" - self.__rotation = rmat + def as_dict(self) -> dict: + """Export ArUco marker group properties as dictionary.""" - def as_dict(self) -> dict: - """Export ArUco marker group properties as dictionary.""" + return { + **DataFeatures.PipelineStepObject.as_dict(self), + "dictionary": self.__dictionary, + "places": self.__places + } - return { - **DataFeatures.PipelineStepObject.as_dict(self), - "dictionary": self.__dictionary, - "places": self.__places - } - - @classmethod - def from_obj(cls, obj_filepath: str) -> Self: - """Load ArUco markers group from .obj file. + @classmethod + def from_obj(cls, obj_filepath: str) -> Self: + """Load ArUco markers group from .obj file. - !!! note - Expected object (o) name format: <DICTIONARY>#<IDENTIFIER>_Marker + !!! note + Expected object (o) name format: <DICTIONARY>#<IDENTIFIER>_Marker - !!! note - All markers have to belong to the same dictionary. + !!! note + All markers have to belong to the same dictionary. - """ + """ - new_dictionary = None - new_places = {} - - # Regex rules for .obj file parsing - OBJ_RX_DICT = { - 'object': re.compile(r'o (.*)#([0-9]+)_(.*)\n'), - 'vertices': re.compile(r'v ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+)\n'), - 'face': re.compile(r'f ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)\n'), - 'comment': re.compile(r'#(.*)\n') # keep comment regex after object regex because the # is used in object string too - } + new_dictionary = None + new_places = {} - # Regex .obj line parser - def __parse_obj_line(line): + # Regex rules for .obj file parsing + obj_rx_dict = { + 'object': re.compile(r'o (.*)#([0-9]+)_(.*)\n'), + 'vertices': re.compile(r'v ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+)\n'), + 'face': re.compile(r'f ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)\n'), + 'comment': re.compile(r'#(.*)\n') + # keep comment regex after object regex because the # is used in object string too + } - for key, rx in OBJ_RX_DICT.items(): - match = rx.search(line) - if match: - return key, match + # Regex .obj line parser + def __parse_obj_line(ln): - # If there are no matches - return None, None - - # Start parsing - try: + for k, rx in obj_rx_dict.items(): + m = rx.search(ln) + if m: + return k, m - identifier = None - vertices = [] - faces = {} + # If there are no matches + return None, None - # Open the file and read through it line by line - with open(obj_filepath, 'r') as file: + # Start parsing + try: - line = file.readline() + identifier = None + vertices = [] + faces = {} - while line: + # Open the file and read through it line by line + with open(obj_filepath, 'r') as file: - # At each line check for a match with a regex - key, match = __parse_obj_line(line) + line = file.readline() - # Extract comment - if key == 'comment': - pass + while line: - # Extract marker dictionary and identifier - elif key == 'object': + # At each line check for a match with a regex + key, match = __parse_obj_line(line) - dictionary = str(match.group(1)) - identifier = int(match.group(2)) - last = str(match.group(3)) + # Extract comment + if key == 'comment': + pass - # Init new group dictionary with first dictionary name - if new_dictionary == None: + # Extract marker dictionary and identifier + elif key == 'object': - new_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(dictionary) + dictionary = str(match.group(1)) + identifier = int(match.group(2)) - # Check all others marker dictionary are equal to new group dictionary - elif dictionary != new_dictionary.name: + # Init new group dictionary with first dictionary name + if new_dictionary is None: - raise NameError(f'Marker {identifier} dictionary is not {new_dictionary.name}') + new_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(dictionary) - # Fill vertices array - elif key == 'vertices': + # Check all others marker dictionary are equal to new group dictionary + elif dictionary != new_dictionary.name: - vertices.append(tuple([float(match.group(1)), float(match.group(2)), float(match.group(3))])) + raise NameError(f'Marker {identifier} dictionary is not {new_dictionary.name}') - # Extract vertices ids - elif key == 'face': + # Fill vertices array + elif key == 'vertices': - faces[identifier] = [int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4))] + vertices.append(tuple([float(match.group(1)), float(match.group(2)), float(match.group(3))])) - # Go to next line - line = file.readline() + # Extract vertices ids + elif key == 'face': - file.close() + faces[identifier] = [int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4))] - # Retrieve marker vertices thanks to face vertices ids - for identifier, face in faces.items(): + # Go to next line + line = file.readline() - # Gather place corners in clockwise order - cw_corners = numpy.array([ vertices[i-1] for i in reversed(face) ]) + file.close() - # Edit place axis from corners positions - place_x_axis = cw_corners[2] - cw_corners[3] - place_x_axis_norm = numpy.linalg.norm(place_x_axis) - - place_y_axis = cw_corners[0] - cw_corners[3] - place_y_axis_norm = numpy.linalg.norm(place_y_axis) + # Retrieve marker vertices thanks to face vertices ids + for identifier, face in faces.items(): - # Check axis size: they should be almost equal - if math.isclose(place_x_axis_norm, place_y_axis_norm, rel_tol=1e-3): + # Gather place corners in clockwise order + cw_corners = numpy.array([vertices[i - 1] for i in reversed(face)]) - new_marker_size = place_x_axis_norm + # Edit place axis from corners positions + place_x_axis = cw_corners[2] - cw_corners[3] + place_x_axis_norm = numpy.linalg.norm(place_x_axis) - else: + place_y_axis = cw_corners[0] - cw_corners[3] + place_y_axis_norm = numpy.linalg.norm(place_y_axis) - raise ValueError(f'{new_dictionary}#{identifier}_Marker is not a square.') + # Check axis size: they should be almost equal + if math.isclose(place_x_axis_norm, place_y_axis_norm, rel_tol=1e-3): - # Create a new place related to a new marker - new_marker = ArUcoMarker.ArUcoMarker(new_dictionary, identifier, new_marker_size) - new_places[identifier] = Place(cw_corners, new_marker) + new_marker_size = place_x_axis_norm - except IOError: - raise IOError(f'File not found: {obj_filepath}') + else: - # Instantiate ArUco markers group - data = { - 'dictionary': new_dictionary, - 'places': new_places - } + raise ValueError(f'{new_dictionary}#{identifier}_Marker is not a square.') - return ArUcoMarkersGroup(**data) + # Create a new place related to a new marker + new_marker = ArUcoMarker.ArUcoMarker(new_dictionary, identifier, new_marker_size) + new_places[identifier] = Place(cw_corners, new_marker) - def filter_markers(self, detected_markers: dict) -> tuple[dict, dict]: - """Sort markers belonging to the group from given detected markers dict (cf ArUcoDetector.detect_markers()). + except IOError: + raise IOError(f'File not found: {obj_filepath}') - Returns: - dict of markers belonging to this group - dict of remaining markers not belonging to this group - """ + # Instantiate ArUco markers group + data = { + 'dictionary': new_dictionary, + 'places': new_places + } - group_markers = {} - remaining_markers = {} + return ArUcoMarkersGroup(**data) - for (marker_id, marker) in detected_markers.items(): + def filter_markers(self, detected_markers: dict) -> tuple[dict, dict]: + """Sort markers belonging to the group from given detected markers dict (cf ArUcoDetector.detect_markers()). - if marker_id in self.__places.keys(): + Returns: + dict of markers belonging to this group + dict of remaining markers not belonging to this group + """ - group_markers[marker_id] = marker + group_markers = {} + remaining_markers = {} - else: - - remaining_markers[marker_id] = marker + for (marker_id, marker) in detected_markers.items(): - return group_markers, remaining_markers + if marker_id in self.__places.keys(): - def estimate_pose_from_markers_corners(self, markers: dict, K: numpy.array, D: numpy.array) -> tuple[bool, numpy.array, numpy.array]: - """Estimate pose from markers corners and places corners. + group_markers[marker_id] = marker - Parameters: - markers: detected markers to use for pose estimation. - K: intrinsic camera parameters - D: camera distorsion matrix + else: - Returns: - success: True if the pose estimation succeeded - tvec: scene translation vector - rvec: scene rotation vector - """ + remaining_markers[marker_id] = marker - markers_corners_2d = [] - places_corners_3d = [] + return group_markers, remaining_markers - for identifier, marker in markers.items(): + def estimate_pose_from_markers_corners(self, markers: dict, k: numpy.array, d: numpy.array) -> tuple[ + bool, numpy.array, numpy.array]: + """Estimate pose from markers corners and places corners. - try: + Parameters: + markers: detected markers to use for pose estimation. + k: intrinsic camera parameters + d: camera distortion matrix - place = self.__places[identifier] + Returns: + success: True if the pose estimation succeeded + tvec: scene translation vector + rvec: scene rotation vector + """ - for marker_corner in marker.corners: - markers_corners_2d.append(list(marker_corner)) + markers_corners_2d = [] + places_corners_3d = [] - for place_corner in place.corners: - places_corners_3d.append(list(place_corner)) + for identifier, marker in markers.items(): - except KeyError: + try: - raise ValueError(f'Marker {marker.identifier} doesn\'t belong to the group.') + place = self.__places[identifier] - # SolvPnP using cv2.SOLVEPNP_SQPNP flag - # TODO: it works also with cv2.SOLVEPNP_EPNP flag so we need to test which is the faster. - # About SolvPnP flags: https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html - success, rvec, tvec = cv2.solvePnP(numpy.array(places_corners_3d), numpy.array(markers_corners_2d), numpy.array(K), numpy.array(D), flags=cv2.SOLVEPNP_SQPNP) + for marker_corner in marker.corners: + markers_corners_2d.append(list(marker_corner)) - # Refine pose estimation using Gauss-Newton optimisation - if success : + for place_corner in place.corners: + places_corners_3d.append(list(place_corner)) - rvec, tvec = cv2.solvePnPRefineVVS(numpy.array(places_corners_3d), numpy.array(markers_corners_2d), numpy.array(K), numpy.array(D), rvec, tvec) + except KeyError: - self.__translation = tvec.T - self.__rotation = rvec.T + raise ValueError(f'Marker {marker.identifier} doesn\'t belong to the group.') - return success, self.__translation, self.__rotation + # SolvPnP using cv2.SOLVEPNP_SQPNP flag + # TODO: it works also with cv2.SOLVEPNP_EPNP flag so we need to test which is the faster. + # About SolvPnP flags: https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html + success, rvec, tvec = cv2.solvePnP(numpy.array(places_corners_3d), numpy.array(markers_corners_2d), numpy.array(k), numpy.array(d), flags=cv2.SOLVEPNP_SQPNP) - def draw_axes(self, image: numpy.array, K, D, thickness: int = 0, length: float = 0): - """Draw group axes.""" + # Refine pose estimation using Gauss-Newton optimisation + if success: + rvec, tvec = cv2.solvePnPRefineVVS(numpy.array(places_corners_3d), numpy.array(markers_corners_2d), numpy.array(k), numpy.array(d), rvec, tvec) - try: - axisPoints = numpy.float32([[length, 0, 0], [0, length, 0], [0, 0, length], [0, 0, 0]]).reshape(-1, 3) - axisPoints, _ = cv2.projectPoints(axisPoints, self.__rotation, self.__translation, numpy.array(K), numpy.array(D)) - axisPoints = axisPoints.astype(int) + self.__translation = tvec.T + self.__rotation = rvec.T - cv2.line(image, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (0, 0, 255), thickness) # X (red) - cv2.line(image, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (0, 255, 0), thickness) # Y (green) - cv2.line(image, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (255, 0, 0), thickness) # Z (blue) + return success, self.__translation, self.__rotation - # Ignore errors due to out of field axis: their coordinate are larger than int32 limitations. - except cv2.error: - pass + def draw_axes(self, image: numpy.array, k: numpy.array, d: numpy.array, thickness: int = 0, length: float = 0): + """Draw group axes.""" - def draw_places(self, image: numpy.array, K, D, color: tuple = None, border_size: int = 0): - """Draw group places.""" + try: + axis_points = numpy.float32([[length, 0, 0], [0, length, 0], [0, 0, length], [0, 0, 0]]).reshape(-1, 3) + axis_points, _ = cv2.projectPoints(axis_points, self.__rotation, self.__translation, numpy.array(k), numpy.array(d)) + axis_points = axis_points.astype(int) - l = self.marker_size / 2 + cv2.line(image, tuple(axis_points[3].ravel()), tuple(axis_points[0].ravel()), (0, 0, 255), thickness) # X (red) + cv2.line(image, tuple(axis_points[3].ravel()), tuple(axis_points[1].ravel()), (0, 255, 0), thickness) # Y (green) + cv2.line(image, tuple(axis_points[3].ravel()), tuple(axis_points[2].ravel()), (255, 0, 0), thickness) # Z (blue) - for identifier, place in self.__places.items(): + # Ignore errors due to out of field axis: their coordinate are larger than int32 limitations. + except cv2.error: + pass - try: + def draw_places(self, image: numpy.array, k: numpy.array, d: numpy.array, color: tuple = None, border_size: int = 0): + """Draw group places.""" - placePoints, _ = cv2.projectPoints(place.corners, self.__rotation, self.__translation, numpy.array(K), numpy.array(D)) - placePoints = placePoints.astype(int) - - cv2.line(image, tuple(placePoints[0].ravel()), tuple(placePoints[1].ravel()), color, border_size) - cv2.line(image, tuple(placePoints[1].ravel()), tuple(placePoints[2].ravel()), color, border_size) - cv2.line(image, tuple(placePoints[2].ravel()), tuple(placePoints[3].ravel()), color, border_size) - cv2.line(image, tuple(placePoints[3].ravel()), tuple(placePoints[0].ravel()), color, border_size) + for identifier, place in self.__places.items(): - # Ignore errors due to out of field places: their coordinate are larger than int32 limitations. - except cv2.error: - pass + try: - def draw(self, image: numpy.array, K: numpy.array, D: numpy.array, draw_axes: dict = None, draw_places: dict = None): - """Draw group axes and places. - - Parameters: - image: where to draw. - K: - D: - draw_axes: draw_axes parameters (if None, no axes drawn) - draw_places: draw_places parameters (if None, no places drawn) - """ + place_points, _ = cv2.projectPoints(place.corners, self.__rotation, self.__translation, numpy.array(k), numpy.array(d)) + place_points = place_points.astype(int) - # Draw axes if required - if draw_axes is not None: + cv2.line(image, tuple(place_points[0].ravel()), tuple(place_points[1].ravel()), color, border_size) + cv2.line(image, tuple(place_points[1].ravel()), tuple(place_points[2].ravel()), color, border_size) + cv2.line(image, tuple(place_points[2].ravel()), tuple(place_points[3].ravel()), color, border_size) + cv2.line(image, tuple(place_points[3].ravel()), tuple(place_points[0].ravel()), color, border_size) - self.draw_axes(image, K, D, **draw_axes) + # Ignore errors due to out of field places: their coordinate are larger than int32 limitations. + except cv2.error: + pass - # Draw places if required - if draw_places is not None: + def draw(self, image: numpy.array, k: numpy.array, d: numpy.array, draw_axes: dict = None, draw_places: dict = None): + """Draw group axes and places. + + Parameters: + image: where to draw. + k: intrinsic camera parameters + d: camera distortion matrix + draw_axes: draw_axes parameters (if None, no axes drawn) + draw_places: draw_places parameters (if None, no places drawn) + """ - self.draw_places(image, K, D, **draw_places) + # Draw axes if required + if draw_axes is not None: + self.draw_axes(image, k, d, **draw_axes) - def to_obj(self, obj_filepath): - """Save group to .obj file.""" + # Draw places if required + if draw_places is not None: + self.draw_places(image, k, d, **draw_places) - with open(obj_filepath, 'w', encoding='utf-8') as file: + def to_obj(self, obj_filepath): + """Save group to .obj file.""" - file.write('# ArGaze OBJ File\n') - file.write('# http://achil.recherche.enac.fr/features/eye/argaze/\n') + with open(obj_filepath, 'w', encoding='utf-8') as file: - v_count = 0 + file.write('# ArGaze OBJ File\n') + file.write('# https://achil.recherche.enac.fr/features/eye/argaze/\n') - for p, (identifier, place) in enumerate(self.__places.items()): + v_count = 0 - file.write(f'o {self.__dictionary.name}#{identifier}_Marker\n') + for p, (identifier, place) in enumerate(self.__places.items()): - vertices = '' + file.write(f'o {self.__dictionary.name}#{identifier}_Marker\n') - # Write vertices in reverse order - for v in [3, 2, 1, 0]: + vertices = '' - file.write(f'v {" ".join(map(str, place.corners[v]))}\n') - v_count += 1 + # Write vertices in reverse order + for v in [3, 2, 1, 0]: + file.write(f'v {" ".join(map(str, place.corners[v]))}\n') + v_count += 1 - vertices += f' {v_count}' + vertices += f' {v_count}' - #file.write('s off\n') - file.write(f'f{vertices}\n') + # file.write('s off\n') + file.write(f'f{vertices}\n') diff --git a/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py b/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py index 12cbc54..459a03e 100644 --- a/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py +++ b/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py index b818dff..bb7bdbf 100644 --- a/src/argaze/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py index 2c8f003..0affe35 100644 --- a/src/argaze/AreaOfInterest/AOI2DScene.py +++ b/src/argaze/AreaOfInterest/AOI2DScene.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py index 1964d23..955b910 100644 --- a/src/argaze/AreaOfInterest/AOI3DScene.py +++ b/src/argaze/AreaOfInterest/AOI3DScene.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index 7da5bb5..fe4af2c 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 4e85aaf..fe7a5ac 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" @@ -894,12 +894,12 @@ class PipelineStepObject(): self.__name = name @property - def parent(self) -> object: + def parent(self) -> Self: """Get pipeline step object's parent object.""" return self.__parent @parent.setter - def parent(self, parent: object): + def parent(self, parent: Self): """Set layer's parent object.""" self.__parent = parent diff --git a/src/argaze/GazeAnalysis/Basic.py b/src/argaze/GazeAnalysis/Basic.py index ec98b30..cfe3eeb 100644 --- a/src/argaze/GazeAnalysis/Basic.py +++ b/src/argaze/GazeAnalysis/Basic.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py index 6847f44..5b51bbd 100644 --- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py +++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index a860e47..eb90a2a 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/Entropy.py b/src/argaze/GazeAnalysis/Entropy.py index 5bac43e..2f98d2c 100644 --- a/src/argaze/GazeAnalysis/Entropy.py +++ b/src/argaze/GazeAnalysis/Entropy.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/ExploreExploitRatio.py b/src/argaze/GazeAnalysis/ExploreExploitRatio.py index 3b2d53b..b907a5c 100644 --- a/src/argaze/GazeAnalysis/ExploreExploitRatio.py +++ b/src/argaze/GazeAnalysis/ExploreExploitRatio.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/FocusPointInside.py b/src/argaze/GazeAnalysis/FocusPointInside.py index 5d26650..361ea75 100644 --- a/src/argaze/GazeAnalysis/FocusPointInside.py +++ b/src/argaze/GazeAnalysis/FocusPointInside.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/KCoefficient.py b/src/argaze/GazeAnalysis/KCoefficient.py index 9980dfe..9e2f317 100644 --- a/src/argaze/GazeAnalysis/KCoefficient.py +++ b/src/argaze/GazeAnalysis/KCoefficient.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/LempelZivComplexity.py b/src/argaze/GazeAnalysis/LempelZivComplexity.py index 810dbba..696d343 100644 --- a/src/argaze/GazeAnalysis/LempelZivComplexity.py +++ b/src/argaze/GazeAnalysis/LempelZivComplexity.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/LinearRegression.py b/src/argaze/GazeAnalysis/LinearRegression.py index 5a823a1..df3fab2 100644 --- a/src/argaze/GazeAnalysis/LinearRegression.py +++ b/src/argaze/GazeAnalysis/LinearRegression.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/NGram.py b/src/argaze/GazeAnalysis/NGram.py index ca60734..fc7f2e4 100644 --- a/src/argaze/GazeAnalysis/NGram.py +++ b/src/argaze/GazeAnalysis/NGram.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/NearestNeighborIndex.py b/src/argaze/GazeAnalysis/NearestNeighborIndex.py index 81bab22..98a95a1 100644 --- a/src/argaze/GazeAnalysis/NearestNeighborIndex.py +++ b/src/argaze/GazeAnalysis/NearestNeighborIndex.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/TransitionMatrix.py b/src/argaze/GazeAnalysis/TransitionMatrix.py index 8012f5e..567bd39 100644 --- a/src/argaze/GazeAnalysis/TransitionMatrix.py +++ b/src/argaze/GazeAnalysis/TransitionMatrix.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index 78cc170..f5fb069 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 5777a8d..43c6cc7 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" @@ -29,1191 +29,1199 @@ from argaze.AreaOfInterest import AOIFeatures class GazePosition(tuple, DataFeatures.TimestampedObject): - """Define gaze position as a tuple of coordinates with precision. + """Define gaze position as a tuple of coordinates with precision. - Parameters: - precision: the radius of a circle around value where other same gaze position measurements could be. - message: a string to describe why the position is what it is. - """ + Parameters: + precision: the radius of a circle around value where other same gaze position measurements could be. + message: a string to describe why the position is what it is. + """ - def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): + def __new__(cls, position: tuple = (), precision: int | float = None, message: str = None, + timestamp: int | float = math.nan): - return tuple.__new__(cls, position) + return tuple.__new__(cls, position) - def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): + def __init__(self, position: tuple = (), precision: int | float = None, message: str = None, + timestamp: int | float = math.nan): - DataFeatures.TimestampedObject.__init__(self, timestamp) - self.__precision = precision - self.__message = message + DataFeatures.TimestampedObject.__init__(self, timestamp) + self.__precision = precision + self.__message = message - @property - def value(self): - """Get position's tuple value.""" - return tuple(self) + @property + def value(self): + """Get position's tuple value.""" + return tuple(self) - @property - def precision(self): - """Get position's precision.""" - return self.__precision + @property + def precision(self): + """Get position's precision.""" + return self.__precision - @property - def message(self): - """Get position's message.""" - return self.__message + @property + def message(self): + """Get position's message.""" + return self.__message - @classmethod - def from_dict(cls, position_data: dict) -> Self: + @classmethod + def from_dict(cls, position_data: dict) -> Self: - if 'value' in position_data.keys(): + if 'value' in position_data.keys(): - value = position_data.pop('value') - return GazePosition(value, **position_data) + value = position_data.pop('value') + return GazePosition(value, **position_data) - else: + else: - return GazePosition(**position_data) + return GazePosition(**position_data) - def __bool__(self) -> bool: - """Is the position value valid?""" - return len(self) > 0 + def __bool__(self) -> bool: + """Is the position value valid?""" + return len(self) > 0 - def __repr__(self): - """String representation""" + def __repr__(self): + """String representation""" - return json.dumps(DataFeatures.as_dict(self)) + return json.dumps(DataFeatures.as_dict(self)) - def __add__(self, position: Self) -> Self: - """Add position. + def __add__(self, position: Self) -> Self: + """Add position. - !!! note - The returned position precision is the maximal precision. + !!! note + The returned position precision is the maximal precision. - !!! note - The returned position timestamp is the self object timestamp. - """ - if self.__precision is not None and position.precision is not None: + !!! note + The returned position timestamp is the self object timestamp. + """ + if self.__precision is not None and position.precision is not None: - return GazePosition(tuple(numpy.array(self) + numpy.array(position)), precision = max(self.__precision, position.precision), timestamp = self.timestamp) + return GazePosition(tuple(numpy.array(self) + numpy.array(position)), + precision=max(self.__precision, position.precision), timestamp=self.timestamp) - else: + else: - return GazePosition(tuple(numpy.array(self) + numpy.array(position)), timestamp = self.timestamp) + return GazePosition(tuple(numpy.array(self) + numpy.array(position)), timestamp=self.timestamp) - __radd__ = __add__ + __radd__ = __add__ - def __sub__(self, position: Self) -> Self: - """Subtract position. + def __sub__(self, position: Self) -> Self: + """Subtract position. - !!! note - The returned position precision is the maximal precision. + !!! note + The returned position precision is the maximal precision. - !!! note - The returned position timestamp is the self object timestamp. - """ - if self.__precision is not None and position.precision is not None: + !!! note + The returned position timestamp is the self object timestamp. + """ + if self.__precision is not None and position.precision is not None: - return GazePosition(tuple(numpy.array(self) - numpy.array(position)), precision = max(self.__precision, position.precision), timestamp = self.timestamp) + return GazePosition(tuple(numpy.array(self) - numpy.array(position)), + precision=max(self.__precision, position.precision), timestamp=self.timestamp) - else: + else: - return GazePosition(tuple(numpy.array(self) - numpy.array(position)), timestamp = self.timestamp) + return GazePosition(tuple(numpy.array(self) - numpy.array(position)), timestamp=self.timestamp) - def __rsub__(self, position: Self) -> Self: - """Reversed subtract position. + def __rsub__(self, position: Self) -> Self: + """Reversed subtract position. - !!! note - The returned position precision is the maximal precision. + !!! note + The returned position precision is the maximal precision. - !!! note - The returned position timestamp is the self object timestamp. - """ - if self.__precision is not None and position.precision is not None: - - return GazePosition(tuple(numpy.array(position) - numpy.array(self)), precision = max(self.__precision, position.precision), timestamp = self.timestamp) + !!! note + The returned position timestamp is the self object timestamp. + """ + if self.__precision is not None and position.precision is not None: - else: + return GazePosition(tuple(numpy.array(position) - numpy.array(self)), + precision=max(self.__precision, position.precision), timestamp=self.timestamp) - return GazePosition(tuple(numpy.array(position) - numpy.array(self)), timestamp = self.timestamp) + else: - def __mul__(self, factor: int|float) -> Self: - """Multiply position by a factor. + return GazePosition(tuple(numpy.array(position) - numpy.array(self)), timestamp=self.timestamp) - !!! note - The returned position precision is also multiplied by the factor. + def __mul__(self, factor: int | float | tuple) -> Self: + """Multiply position by a factor. - !!! note - The returned position timestamp is the self object timestamp. - """ - return GazePosition(tuple(numpy.array(self) * factor), precision = self.__precision * factor if self.__precision is not None else None, timestamp = self.timestamp) + !!! note + The returned position precision is also multiplied by the factor. - def __pow__(self, factor: int|float) -> Self: - """Power position by a factor. + !!! note + The returned position timestamp is the self object timestamp. + """ + return GazePosition(tuple(numpy.array(self) * factor), precision=self.__precision * factor if self.__precision is not None else None, timestamp=self.timestamp) - !!! note - The returned position precision is also powered by the factor. + def __pow__(self, factor: int | float) -> Self: + """Power position by a factor. - !!! note - The returned position timestamp is the self object timestamp. - """ - return GazePosition(tuple(numpy.array(self) ** factor), precision = self.__precision ** factor if self.__precision is not None else None, timestamp = self.timestamp) + !!! note + The returned position precision is also powered by the factor. - def distance(self, gaze_position) -> float: - """Distance to another gaze positions.""" + !!! note + The returned position timestamp is the self object timestamp. + """ + return GazePosition(tuple(numpy.array(self) ** factor), + precision=self.__precision ** factor if self.__precision is not None else None, + timestamp=self.timestamp) - distance = (self[0] - gaze_position[0])**2 + (self[1] - gaze_position[1])**2 - distance = numpy.sqrt(distance) + def distance(self, gaze_position) -> float: + """Distance to another gaze positions.""" - return distance + distance = (self[0] - gaze_position[0]) ** 2 + (self[1] - gaze_position[1]) ** 2 + distance = numpy.sqrt(distance) - def overlap(self, gaze_position, both=False) -> float: - """Does this gaze position overlap another gaze position considering its precision? - Set both to True to test if the other gaze position overlaps this one too.""" + return distance - distance = numpy.sqrt(numpy.sum((self - gaze_position)**2)) + def overlap(self, gaze_position, both=False) -> float: + """Does this gaze position overlap another gaze position considering its precision? + Set both to True to test if the other gaze position overlaps this one too.""" - if both: - return distance < min(self.__precision, gaze_position.precision) - else: - return distance < self.__precision + distance = numpy.sqrt(numpy.sum((self - gaze_position) ** 2)) - def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True): - """Draw gaze position point and precision circle.""" + if both: + return distance < min(self.__precision, gaze_position.precision) + else: + return distance < self.__precision - if self: + def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True): + """Draw gaze position point and precision circle.""" - int_value = (int(self[0]), int(self[1])) + if self: - # Draw point at position if required - if color is not None: - cv2.circle(image, int_value, size, color, -1) + int_value = (int(self[0]), int(self[1])) + + # Draw point at position if required + if color is not None: + cv2.circle(image, int_value, size, color, -1) + + # Draw precision circle + if self.__precision is not None and draw_precision: + cv2.circle(image, int_value, round(self.__precision), color, 1) - # Draw precision circle - if self.__precision is not None and draw_precision: - cv2.circle(image, int_value, round(self.__precision), color, 1) class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): - """Handle timestamped gaze positions into a list.""" - - def __init__(self, gaze_positions=None): + """Handle timestamped gaze positions into a list.""" - if gaze_positions is None: - gaze_positions = [] + def __init__(self, gaze_positions=None): - DataFeatures.TimestampedObjectsList.__init__(self, GazePosition, gaze_positions) + if gaze_positions is None: + gaze_positions = [] - def values(self) -> list: - """Get all timestamped position values as list of tuple.""" - return [tuple(ts_position) for ts_position in self] + DataFeatures.TimestampedObjectsList.__init__(self, GazePosition, gaze_positions) - ''' Is it still needed as there is a TimestampedObjectsList.from_json method? - @classmethod - def from_json(self, json_filepath: str) -> TimeStampedGazePositions: - """Create a TimeStampedGazePositions from .json file.""" + def values(self) -> list: + """Get all timestamped position values as list of tuple.""" + return [tuple(ts_position) for ts_position in self] - with open(json_filepath, encoding='utf-8') as ts_positions_file: + ''' Is it still needed as there is a TimestampedObjectsList.from_json method? + @classmethod + def from_json(self, json_filepath: str) -> TimeStampedGazePositions: + """Create a TimeStampedGazePositions from .json file.""" - json_positions = json.load(ts_positions_file) + with open(json_filepath, encoding='utf-8') as ts_positions_file: - return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions}) - ''' + json_positions = json.load(ts_positions_file) - @classmethod - def from_dataframe(cls, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> Self: - """Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). + return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions}) + ''' - Parameters: - dataframe: - timestamp: specific timestamp column label. - x: specific x column label. - y: specific y column label. - precision: specific precision column label if exist. - message: specific message column label if exist. - """ + @classmethod + def from_dataframe(cls, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, + message: str = None) -> Self: + """Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). - # Copy columns - columns = (timestamp, x, y) + Parameters: + dataframe: + timestamp: specific timestamp column label. + x: specific x column label. + y: specific y column label. + precision: specific precision column label if exist. + message: specific message column label if exist. + """ - if precision is not None: + # Copy columns + columns = (timestamp, x, y) - columns += (precision,) + if precision is not None: + columns += (precision,) - if message is not None: + if message is not None: + columns += (message,) - columns += (message,) + df = dataframe.loc[:, columns] - df = dataframe.loc[:, columns] + # Merge x and y columns into one 'value' column + df['value'] = tuple(zip(df[x], df[y])) + df.drop(columns=[x, y], inplace=True, axis=1) - # Merge x and y columns into one 'value' column - df['value'] = tuple(zip(df[x], df[y])) - df.drop(columns=[x, y], inplace=True, axis=1) + # Replace tuple values containing NaN values by () + df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=True) - # Replace tuple values containing NaN values by () - df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=True) + # Handle precision data + if precision: - # Handle precision data - if precision: + # Rename precision column into 'precision' column + df.rename(columns={precision: 'precision'}, inplace=True) - # Rename precision column into 'precision' column - df.rename(columns={precision: 'precision'}, inplace=True) + else: - else: + # Append a None precision column + df['precision'] = df.apply(lambda row: None, axis=True) - # Append a None precision column - df['precision'] = df.apply(lambda row: None, axis=True) + # Handle message data + if message: - # Handle message data - if message: + # Rename message column into 'message' column + df.rename(columns={precision: 'message'}, inplace=True) - # Rename message column into 'message' column - df.rename(columns={precision: 'message'}, inplace=True) + else: - else: + # Append a None message column + df['message'] = df.apply(lambda row: None, axis=True) - # Append a None message column - df['message'] = df.apply(lambda row: None, axis=True) + # Rename timestamp column into 'timestamp' column + df.rename(columns={timestamp: 'timestamp'}, inplace=True) - # Rename timestamp column into 'timestamp' column - df.rename(columns={timestamp: 'timestamp'}, inplace=True) - - # Filter duplicate timestamps - df = df[df.timestamp.duplicated() == False] + # Filter duplicate timestamps + df = df[df.timestamp.duplicated() == False] + + # Create timestamped gaze positions + return TimeStampedGazePositions(df.apply( + lambda row: GazePosition(row.value, precision=row.precision, message=row.message, timestamp=row.timestamp), + axis=True)) - # Create timestamped gaze positions - return TimeStampedGazePositions(df.apply(lambda row: GazePosition(row.value, precision=row.precision, message=row.message, timestamp=row.timestamp), axis=True)) class GazePositionCalibrationFailed(Exception): - """Exception raised by GazePositionCalibrator.""" + """Exception raised by GazePositionCalibrator.""" - def __init__(self, message): + def __init__(self, message): + super().__init__(message) - super().__init__(message) class GazePositionCalibrator(DataFeatures.PipelineStepObject): - """Abstract class to define what should provide a gaze position calibrator algorithm.""" + """Abstract class to define what should provide a gaze position calibrator algorithm.""" - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + pass - pass + def store(self, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition): + """Store observed and expected gaze positions. - def store(self, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition): - """Store observed and expected gaze positions. + Parameters: + observed_gaze_position: where gaze position actually is + expected_gaze_position: where gaze position should be + """ - Parameters: - observed_gaze_position: where gaze position actually is - expected_gaze_position: where gaze position should be - """ + raise NotImplementedError('calibrate() method not implemented') - raise NotImplementedError('calibrate() method not implemented') + def reset(self): + """Reset observed and expected gaze positions.""" - def reset(self): - """Reset observed and expected gaze positions.""" + raise NotImplementedError('reset() method not implemented') - raise NotImplementedError('reset() method not implemented') + def calibrate(self) -> any: + """Process calibration from observed and expected gaze positions. - def calibrate(self) -> any: - """Process calibration from observed and expected gaze positions. + Returns: + calibration outputs: any data returned to assess calibration + """ - Returns: - calibration outputs: any data returned to assess calibration - """ + raise NotImplementedError('terminate() method not implemented') - raise NotImplementedError('terminate() method not implemented') + def apply(self, observed_gaze_position: GazePosition) -> GazePosition: + """Apply calibration onto observed gaze position. - def apply(self, observed_gaze_position: GazePosition) -> GazePosition: - """Apply calibration onto observed gaze position. + Parameters: + observed_gaze_position: where gaze position actually is - Parameters: - observed_gaze_position: where gaze position actually is + Returns: + expected_gaze_position: where gaze position should be if the calibrator is ready else, observed gaze position + """ - Returns: - expected_gaze_position: where gaze position should be if the calibrator is ready else, observed gaze position - """ + raise NotImplementedError('apply() method not implemented') - raise NotImplementedError('apply() method not implemented') + def draw(self, image: numpy.array, **kwargs): + """Draw calibration into image. + + Parameters: + image: where to draw + """ - def draw(self, image: numpy.array, **kwargs): - """Draw calibration into image. - - Parameters: - image: where to draw - """ + raise NotImplementedError('draw() method not implemented') - raise NotImplementedError('draw() method not implemented') + def is_calibrating(self) -> bool: + """Is the calibration running?""" - def is_calibrating(self) -> bool: - """Is the calibration running?""" + raise NotImplementedError('ready getter not implemented') - raise NotImplementedError('ready getter not implemented') class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): - """Define abstract gaze movement class as timestamped gaze positions list. - - !!! note - Gaze movement timestamp is always equal to its first position timestamp. + """Define abstract gaze movement class as timestamped gaze positions list. - Parameters: - positions: timestamp gaze positions. - finished: is the movement finished? - message: a string to describe why the movement is what it is. - """ + !!! note + Gaze movement timestamp is always equal to its first position timestamp. - def __new__(cls, positions: TimeStampedGazePositions = None, finished: bool = False, - message: str = None, timestamp: int|float = math.nan): + Parameters: + positions: timestamp gaze positions. + finished: is the movement finished? + message: a string to describe why the movement is what it is. + """ - # noinspection PyArgumentList - return TimeStampedGazePositions.__new__(cls, positions) + def __new__(cls, positions: TimeStampedGazePositions = None, finished: bool = False, + message: str = None, timestamp: int | float = math.nan): - def __init__(self, positions: TimeStampedGazePositions = None, finished: bool = False, - message: str = None, timestamp: int|float = math.nan): - """Initialize GazeMovement""" + # noinspection PyArgumentList + return TimeStampedGazePositions.__new__(cls, positions) - TimeStampedGazePositions.__init__(self, positions) - DataFeatures.TimestampedObject.__init__(self, timestamp) + def __init__(self, positions: TimeStampedGazePositions = None, finished: bool = False, + message: str = None, timestamp: int | float = math.nan): + """Initialize GazeMovement""" - self.__finished = finished - self.__message = message + TimeStampedGazePositions.__init__(self, positions) + DataFeatures.TimestampedObject.__init__(self, timestamp) - @property - def timestamp(self) -> int|float: - """Get first position timestamp.""" - if self: - return self[0].timestamp + self.__finished = finished + self.__message = message - def is_timestamped(self) -> bool: - """If first position exist, the movement is timestamped.""" - return bool(self) + @property + def timestamp(self) -> int | float: + """Get first position timestamp.""" + if self: + return self[0].timestamp - @timestamp.setter - def timestamp(self, timestamp: int|float): - """Block gaze movement timestamp setting.""" - raise('GazeMovement timestamp is first position timestamp.') + def is_timestamped(self) -> bool: + """If first position exist, the movement is timestamped.""" + return bool(self) - def is_finished(self) -> bool: - """Is the movement finished?""" - return self.__finished + @timestamp.setter + def timestamp(self, timestamp: int | float): + """Block gaze movement timestamp setting.""" + raise ('GazeMovement timestamp is first position timestamp.') - def finish(self) -> Self: - """Set gaze movement as finished""" - self.__finished = True - return self + def is_finished(self) -> bool: + """Is the movement finished?""" + return self.__finished - @property - def message(self): - """Get movement's message.""" - return self.__message + def finish(self) -> Self: + """Set gaze movement as finished""" + self.__finished = True + return self - @property - def amplitude(self): - """Get inferred amplitude from first and last positions.""" - if self: + @property + def message(self): + """Get movement's message.""" + return self.__message - return numpy.linalg.norm(self[0] - self[-1]) + @property + def amplitude(self): + """Get inferred amplitude from first and last positions.""" + if self: - else: + return numpy.linalg.norm(self[0] - self[-1]) - return 0 + else: - def __str__(self) -> str: - """String display""" + return 0 - if self: + def __str__(self) -> str: + """String display""" - output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.is_finished()}' + if self: - for position in self: + output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.is_finished()}' - output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}' + for position in self: + output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}' - else: + else: - output = f'{type(self)}' + output = f'{type(self)}' - return output + return output - def draw_positions(self, image: numpy.array, position_color: tuple = None, line_color: tuple = None): - """Draw gaze movement positions with line between each position. - - Parameters: - image: where to draw - position_color: color of position point - line_color: color of line between each position - """ + def draw_positions(self, image: numpy.array, position_color: tuple = None, line_color: tuple = None): + """Draw gaze movement positions with line between each position. + + Parameters: + image: where to draw + position_color: color of position point + line_color: color of line between each position + """ - positions = self.copy() + positions = self.copy() - while len(positions) >= 2: + while len(positions) >= 2: - start_gaze_position = positions.pop(0) - next_gaze_position = positions[0] + start_gaze_position = positions.pop(0) + next_gaze_position = positions[0] - # Draw line between positions if required - if line_color is not None: + # Draw line between positions if required + if line_color is not None: + cv2.line(image, (int(start_gaze_position[0]), int(start_gaze_position[1])), + (int(next_gaze_position[0]), int(next_gaze_position[1])), line_color, 1) - cv2.line(image, (int(start_gaze_position[0]), int(start_gaze_position[1])), (int(next_gaze_position[0]), int(next_gaze_position[1])), line_color, 1) + # Draw position if required + if position_color is not None: + start_gaze_position.draw(image, position_color, draw_precision=False) - # Draw position if required - if position_color is not None: + def draw(self, image: numpy.array, **kwargs): + """Draw gaze movement into image.""" - start_gaze_position.draw(image, position_color, draw_precision=False) + raise NotImplementedError('draw() method not implemented') - def draw(self, image: numpy.array, **kwargs): - """Draw gaze movement into image.""" - - raise NotImplementedError('draw() method not implemented') class Fixation(GazeMovement): - """Define abstract fixation as gaze movement.""" + """Define abstract fixation as gaze movement.""" - def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, + message: str = None, **kwargs): + super().__init__(positions, finished, message, **kwargs) - super().__init__(positions, finished, message, **kwargs) + self._focus = () - self._focus = () + @property + def focus(self) -> tuple: + """Get representative position of the fixation.""" + return self._focus - @property - def focus(self) -> tuple: - """Get representative position of the fixation.""" - return self._focus + @focus.setter + def focus(self, focus: tuple): + """Set representative position of the fixation.""" + self._focus = focus - @focus.setter - def focus(self, focus: tuple): - """Set representative position of the fixation.""" - self._focus = focus + def merge(self, fixation) -> Self: + """Merge another fixation into this fixation.""" - def merge(self, fixation) -> Self: - """Merge another fixation into this fixation.""" + raise NotImplementedError('merge() method not implemented') - raise NotImplementedError('merge() method not implemented') def is_fixation(gaze_movement): - """Is a gaze movement a fixation?""" + """Is a gaze movement a fixation?""" + + return type(gaze_movement).__bases__[0] == Fixation or type(gaze_movement) == Fixation - return type(gaze_movement).__bases__[0] == Fixation or type(gaze_movement) == Fixation class Saccade(GazeMovement): - """Define abstract saccade as gaze movement.""" + """Define abstract saccade as gaze movement.""" - def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, + message: str = None, **kwargs): + super().__init__(positions, finished, message, **kwargs) - super().__init__(positions, finished, message, **kwargs) def is_saccade(gaze_movement): - """Is a gaze movement a saccade?""" + """Is a gaze movement a saccade?""" + + return type(gaze_movement).__bases__[0] == Saccade or type(gaze_movement) == Saccade - return type(gaze_movement).__bases__[0] == Saccade or type(gaze_movement) == Saccade class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList): - """Handle timestamped gaze movements into a list""" + """Handle timestamped gaze movements into a list""" - def __init__(self, gaze_movements: list = []): + def __init__(self, gaze_movements: list = []): + DataFeatures.TimestampedObjectsList.__init__(self, GazeMovement, gaze_movements) - DataFeatures.TimestampedObjectsList.__init__(self, GazeMovement, gaze_movements) class GazeStatus(list, DataFeatures.TimestampedObject): - """Define gaze status as a list of 1 or 2 (index, GazeMovement) tuples. + """Define gaze status as a list of 1 or 2 (index, GazeMovement) tuples. - Parameters: - position: the position that the status represents. - """ + Parameters: + position: the position that the status represents. + """ - def __init__(self, position: GazePosition): + def __init__(self, position: GazePosition): + DataFeatures.TimestampedObject.__init__(self, timestamp=position.timestamp) - DataFeatures.TimestampedObject.__init__(self, timestamp=position.timestamp) + self.__position = position - self.__position = position + @property + def position(self) -> GazePosition: + """Get gaze status position.""" + return self.__position - @property - def position(self) -> GazePosition: - """Get gaze status position.""" - return self.__position + def append(self, movement_index: int, movement_type: type): + """Append movement index and type.""" - def append(self, movement_index: int, movement_type:type): - """Append movement index and type.""" + super().append((movement_index, movement_type)) - super().append((movement_index, movement_type)) class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList): - """Handle timestamped gaze status into a list.""" + """Handle timestamped gaze status into a list.""" - def __init__(self): + def __init__(self): + super().__init__(GazeStatus) - super().__init__(GazeStatus) class GazeMovementIdentifier(DataFeatures.PipelineStepObject): - """Abstract class to define what should provide a gaze movement identifier.""" + """Abstract class to define what should provide a gaze movement identifier.""" - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): - pass + pass - @DataFeatures.PipelineStepMethod - def identify(self, timestamped_gaze_position: GazePosition, terminate:bool=False) -> GazeMovement: - """Identify gaze movement from successive timestamped gaze positions. + @DataFeatures.PipelineStepMethod + def identify(self, timestamped_gaze_position: GazePosition, terminate: bool = False) -> GazeMovement: + """Identify gaze movement from successive timestamped gaze positions. - !!! warning "Mandatory" - Each identified gaze movement have to share its first/last gaze position with previous/next gaze movement. + !!! warning "Mandatory" + Each identified gaze movement have to share its first/last gaze position with previous/next gaze movement. - Parameters: - timestamped_gaze_position: new gaze position from where identification have to be done considering former gaze positions. - terminate: allows to notify identification algorithm that given gaze position will be the last one. - - Returns: - gaze_movement: identified gaze movement once it is finished otherwise it returns empty gaze movement at least. - """ + Parameters: + timestamped_gaze_position: new gaze position from where identification have to be done considering former gaze positions. + terminate: allows to notify identification algorithm that given gaze position will be the last one. + + Returns: + gaze_movement: identified gaze movement once it is finished otherwise it returns empty gaze movement at least. + """ - raise NotImplementedError('identify() method not implemented') + raise NotImplementedError('identify() method not implemented') - def current_gaze_movement(self) -> GazeMovement: - """Get the current identified gaze movement (finished or in progress) if it exists otherwise, an empty gaze movement.""" + def current_gaze_movement(self) -> GazeMovement: + """Get the current identified gaze movement (finished or in progress) if it exists otherwise, an empty gaze movement.""" - raise NotImplementedError('current_gaze_movement getter not implemented') + raise NotImplementedError('current_gaze_movement getter not implemented') - def current_fixation(self) -> Fixation: - """Get the current identified fixation (finished or in progress) if it exists otherwise, an empty gaze movement.""" + def current_fixation(self) -> Fixation: + """Get the current identified fixation (finished or in progress) if it exists otherwise, an empty gaze movement.""" - raise NotImplementedError('current_fixation getter not implemented') + raise NotImplementedError('current_fixation getter not implemented') - def current_saccade(self) -> Saccade: - """Get the current identified saccade (finished or in progress) if it exists otherwise, an empty gaze movement.""" + def current_saccade(self) -> Saccade: + """Get the current identified saccade (finished or in progress) if it exists otherwise, an empty gaze movement.""" - raise NotImplementedError('current_saccade getter not implemented') + raise NotImplementedError('current_saccade getter not implemented') - def browse(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[TimeStampedGazeMovements, TimeStampedGazeMovements, TimeStampedGazeStatus]: - """Identify fixations and saccades browsing timestamped gaze positions. + def browse(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[ + TimeStampedGazeMovements, TimeStampedGazeMovements, TimeStampedGazeStatus]: + """Identify fixations and saccades browsing timestamped gaze positions. - Returns: - timestamped_fixations: all fixations stored by timestamped. - timestamped_saccades: all saccades stored by timestamped. - timestamped_gaze_status: all gaze status stored by timestamped. - """ + Returns: + timestamped_fixations: all fixations stored by timestamped. + timestamped_saccades: all saccades stored by timestamped. + timestamped_gaze_status: all gaze status stored by timestamped. + """ - assert(type(ts_gaze_positions) == TimeStampedGazePositions) + assert (type(ts_gaze_positions) == TimeStampedGazePositions) - ts_fixations = TimeStampedGazeMovements() - ts_saccades = TimeStampedGazeMovements() - ts_status = TimeStampedGazeStatus() + ts_fixations = TimeStampedGazeMovements() + ts_saccades = TimeStampedGazeMovements() + ts_status = TimeStampedGazeStatus() - # Get last ts to terminate identification on last gaze position - last_ts = ts_gaze_positions[-1].timestamp + # Get last ts to terminate identification on last gaze position + last_ts = ts_gaze_positions[-1].timestamp - # Iterate on gaze positions - for gaze_position in ts_gaze_positions: + # Iterate on gaze positions + for gaze_position in ts_gaze_positions: - gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts)) + gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts)) - if gaze_movement: + if gaze_movement: - # First gaze movement position is always shared with previous gaze movement - for movement_position in gaze_movement: + # First gaze movement position is always shared with previous gaze movement + for movement_position in gaze_movement: - # Is a status already exist for this position? - gaze_status = ts_status.look_for(movement_position.timestamp) + # Is a status already exist for this position? + gaze_status = ts_status.look_for(movement_position.timestamp) - if not gaze_status: - - gaze_status = GazeStatus(movement_position) - ts_status.append(gaze_status) + if not gaze_status: + gaze_status = GazeStatus(movement_position) + ts_status.append(gaze_status) - gaze_status.append(len(ts_fixations), type(gaze_movement)) + gaze_status.append(len(ts_fixations), type(gaze_movement)) - # Store gaze movement into the appropriate list - if is_fixation(gaze_movement): + # Store gaze movement into the appropriate list + if is_fixation(gaze_movement): - ts_fixations.append(gaze_movement) + ts_fixations.append(gaze_movement) - elif is_saccade(gaze_movement): + elif is_saccade(gaze_movement): - ts_saccades.append(gaze_movement) + ts_saccades.append(gaze_movement) - return ts_fixations, ts_saccades, ts_status + return ts_fixations, ts_saccades, ts_status - def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[int|float, GazeMovement]: - """GazeMovement generator. + def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[int | float, GazeMovement]: + """GazeMovement generator. - Parameters: - ts_gaze_positions: timestamped gaze positions to process. + Parameters: + ts_gaze_positions: timestamped gaze positions to process. - Returns: - timestamp: first gaze position date of identified gaze movement - gaze_movement: identified gaze movement once it is finished - """ + Returns: + timestamp: first gaze position date of identified gaze movement + gaze_movement: identified gaze movement once it is finished + """ - assert(type(ts_gaze_positions) == TimeStampedGazePositions) + assert (type(ts_gaze_positions) == TimeStampedGazePositions) - # Get last ts to terminate identification on last gaze position - last_ts = ts_gaze_positions[-1] + # Get last ts to terminate identification on last gaze position + last_ts = ts_gaze_positions[-1] - # Iterate on gaze positions - for gaze_position in ts_gaze_positions: + # Iterate on gaze positions + for gaze_position in ts_gaze_positions: - gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts)) + gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts)) - if gaze_movement: + if gaze_movement: + yield gaze_movement - yield gaze_movement class ScanStepError(Exception): - """Exception raised at ScanStep creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade.""" + """Exception raised at ScanStep creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade.""" - def __init__(self, message): + def __init__(self, message): + super().__init__(message) - super().__init__(message) class ScanStep(): - """Define a scan step as a fixation and a consecutive saccade. + """Define a scan step as a fixation and a consecutive saccade. - Parameters: - first_fixation: a fixation that comes before the next saccade. - last_saccade: a saccade that comes after the previous fixation. - - !!! warning - Scan step have to start by a fixation and then end by a saccade. - """ + Parameters: + first_fixation: a fixation that comes before the next saccade. + last_saccade: a saccade that comes after the previous fixation. + + !!! warning + Scan step have to start by a fixation and then end by a saccade. + """ - def __init__(self, first_fixation: Fixation, last_saccade: Saccade): + def __init__(self, first_fixation: Fixation, last_saccade: Saccade): - self.__first_fixation = first_fixation - self.__last_saccade = last_saccade + self.__first_fixation = first_fixation + self.__last_saccade = last_saccade - # First movement have to be a fixation - if not is_fixation(self.__first_fixation): + # First movement have to be a fixation + if not is_fixation(self.__first_fixation): + raise ScanStepError('First step movement is not a fixation') - raise ScanStepError('First step movement is not a fixation') + # Last movement have to be a saccade + if not is_saccade(self.__last_saccade): + raise ScanStepError('Last step movement is not a saccade') - # Last movement have to be a saccade - if not is_saccade(self.__last_saccade): - - raise ScanStepError('Last step movement is not a saccade') + @property + def first_fixation(self): + """Get scan step first fixation.""" + return self.__first_fixation - @property - def first_fixation(self): - """Get scan step first fixation.""" - return self.__first_fixation + @property + def last_saccade(self): + """Get scan step last saccade.""" + return self.__last_saccade - @property - def last_saccade(self): - """Get scan step last saccade.""" - return self.__last_saccade - - @property - def fixation_duration(self) -> int|float: - """Time spent on AOI + @property + def fixation_duration(self) -> int | float: + """Time spent on AOI - Returns: - fixation duration - """ + Returns: + fixation duration + """ - return self.__first_fixation.duration + return self.__first_fixation.duration - @property - def duration(self) -> int|float: - """Time spent on AOI and time spent to go to next AOI + @property + def duration(self) -> int | float: + """Time spent on AOI and time spent to go to next AOI - Returns: - duration - """ + Returns: + duration + """ - return self.__first_fixation.duration + self.__last_saccade.duration + return self.__first_fixation.duration + self.__last_saccade.duration -class ScanPath(list): - """List of scan steps.""" - def __init__(self, duration_max: int|float = 0): - - super().__init__() +class ScanPath(list): + """List of scan steps.""" - self.__duration_max = duration_max - self.__last_fixation = None - self.__duration = 0 + def __init__(self, duration_max: int | float = 0): - @property - def duration_max(self) -> float: - """Duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration.""" - return self.__duration_max + super().__init__() - @duration_max.setter - def duration_max(self, duration_max: float): + self.__duration_max = duration_max + self.__last_fixation = None + self.__duration = 0 - self.__duration_max = duration_max + @property + def duration_max(self) -> float: + """Duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration.""" + return self.__duration_max - @property - def duration(self) -> int|float: - """Sum of all scan steps duration + @duration_max.setter + def duration_max(self, duration_max: float): - Returns: - duration - """ + self.__duration_max = duration_max - return self.__duration + @property + def duration(self) -> int | float: + """Sum of all scan steps duration - def __check_duration(self): - """Constrain path duration to maximal duration.""" + Returns: + duration + """ - if self.__duration_max > 0: + return self.__duration - while self.__duration > self.__duration_max: + def __check_duration(self): + """Constrain path duration to maximal duration.""" - oldest_step = self.pop(0) + if self.__duration_max > 0: - self.__duration -= oldest_step.duration + while self.__duration > self.__duration_max: + oldest_step = self.pop(0) - def append_saccade(self, saccade) -> ScanStep: - """Append new saccade to scan path and return last new scan step if one have been created.""" + self.__duration -= oldest_step.duration - # Ignore saccade if no fixation came before - if self.__last_fixation != None: + def append_saccade(self, saccade) -> ScanStep: + """Append new saccade to scan path and return last new scan step if one have been created.""" - try: + # Ignore saccade if no fixation came before + if self.__last_fixation != None: - # Edit new step - new_step = ScanStep(self.__last_fixation, saccade) + try: - # Append new step - super().append(new_step) + # Edit new step + new_step = ScanStep(self.__last_fixation, saccade) - # Update duration - self.__duration += new_step.duration + # Append new step + super().append(new_step) - # Constrain path duration to maximal duration - self.__check_duration() + # Update duration + self.__duration += new_step.duration - # Return new step - return new_step + # Constrain path duration to maximal duration + self.__check_duration() - finally: + # Return new step + return new_step - # Clear last fixation - self.__last_fixation = None + finally: - def append_fixation(self, fixation): - """Append new fixation to scan path. - !!! warning - Consecutive fixations are ignored keeping the last fixation""" + # Clear last fixation + self.__last_fixation = None - self.__last_fixation = fixation + def append_fixation(self, fixation): + """Append new fixation to scan path. + !!! warning + Consecutive fixations are ignored keeping the last fixation""" - def draw(self, image: numpy.array, draw_fixations: dict = None, draw_saccades: dict = None, deepness: int = 0): - """Draw scan path into image. + self.__last_fixation = fixation - Parameters: - image: where to draw - draw_fixations: Fixation.draw parameters (which depends on the loaded gaze movement identifier module, - if None, no fixation is drawn) - draw_saccades: Saccade.draw parameters (which depends on the loaded gaze movement identifier module, - if None, no saccade is drawn) - deepness: number of steps back to draw - """ + def draw(self, image: numpy.array, draw_fixations: dict = None, draw_saccades: dict = None, deepness: int = 0): + """Draw scan path into image. - for step in self[-deepness:]: + Parameters: + image: where to draw + draw_fixations: Fixation.draw parameters (which depends on the loaded gaze movement identifier module, + if None, no fixation is drawn) + draw_saccades: Saccade.draw parameters (which depends on the loaded gaze movement identifier module, + if None, no saccade is drawn) + deepness: number of steps back to draw + """ - # Draw fixation if required - if draw_fixations is not None: + for step in self[-deepness:]: - step.first_fixation.draw(image, **draw_fixations) + # Draw fixation if required + if draw_fixations is not None: + step.first_fixation.draw(image, **draw_fixations) - # Draw saccade if required - if draw_saccades is not None: + # Draw saccade if required + if draw_saccades is not None: + step.last_saccade.draw(image, **draw_saccades) - step.last_saccade.draw(image, **draw_saccades) class ScanPathAnalyzer(DataFeatures.PipelineStepObject): - """Abstract class to define what should provide a scan path analyzer.""" + """Abstract class to define what should provide a scan path analyzer.""" - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if + isinstance(value, property) and value.fset is None] - self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if isinstance(value, property) and value.fset is None] + def analysis(self) -> DataFeatures.DataDictionary: + """Get all scan path analyzer analysis as data dictionary.""" - def analysis(self) -> DataFeatures.DataDictionary: - """Get all scan path analyzer analysis as data dictionary.""" + return DataFeatures.DataDictionary({a: getattr(self, a) for a in self.__analysis}) - return DataFeatures.DataDictionary( {a: getattr(self, a) for a in self.__analysis} ) + @DataFeatures.PipelineStepMethod + def analyze(self, scan_path: ScanPath): + """Analyze scan path.""" - @DataFeatures.PipelineStepMethod - def analyze(self, scan_path: ScanPath): - """Analyze scan path.""" + raise NotImplementedError('analyze() method not implemented') - raise NotImplementedError('analyze() method not implemented') class AOIMatcher(DataFeatures.PipelineStepObject): - """Abstract class to define what should provide an AOI matcher algorithm.""" + """Abstract class to define what should provide an AOI matcher algorithm.""" - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + self.__exclude = [] - self.__exclude = [] + @property + def exclude(self): + """List of AOI to exclude from matching.""" + return self.__exclude - @property - def exclude(self): - """List of AOI to exclude from matching.""" - return self.__exclude + @exclude.setter + def exclude(self, exclude: list[str]): + self.__exclude = exclude - @exclude.setter - def exclude(self, exclude: list[str]): + def match(self, gaze_movement: GazeMovement, aoi_scene: AOIFeatures.AOIScene) -> tuple[ + str, AOIFeatures.AreaOfInterest]: + """Which AOI is looked in the scene?""" - self.__exclude = exclude - - def match(self, gaze_movement: GazeMovement, aoi_scene: AOIFeatures.AOIScene) -> tuple[str, AOIFeatures.AreaOfInterest]: - """Which AOI is looked in the scene?""" + raise NotImplementedError('match() method not implemented') - raise NotImplementedError('match() method not implemented') + def draw(self, image: numpy.array, aoi_scene: AOIFeatures.AOIScene): + """Draw matching into image. + + Parameters: + image: where to draw + aoi_scene: to refresh looked aoi if required + """ - def draw(self, image: numpy.array, aoi_scene: AOIFeatures.AOIScene): - """Draw matching into image. - - Parameters: - image: where to draw - aoi_scene: to refresh looked aoi if required - """ + raise NotImplementedError('draw() method not implemented') - raise NotImplementedError('draw() method not implemented') + def looked_aoi(self) -> AOIFeatures.AreaOfInterest: + """Get most likely looked aoi.""" - def looked_aoi(self) -> AOIFeatures.AreaOfInterest: - """Get most likely looked aoi.""" + raise NotImplementedError('looked_aoi() method not implemented') - raise NotImplementedError('looked_aoi() method not implemented') + def looked_aoi_name(self) -> str: + """Get most likely looked aoi name.""" + raise NotImplementedError('looked_aoi_name() method not implemented') - def looked_aoi_name(self) -> str: - """Get most likely looked aoi name.""" - raise NotImplementedError('looked_aoi_name() method not implemented') class AOIScanStepError(Exception): - """ - Exception raised at AOIScanStep creation if an aoi scan step doesn't start by a fixation or - doesn't end by a saccade. - """ + """ + Exception raised at AOIScanStep creation if an aoi scan step doesn't start by a fixation or + doesn't end by a saccade. + """ - def __init__(self, message, aoi=''): + def __init__(self, message, aoi=''): + super().__init__(message) - super().__init__(message) + self.aoi = aoi - self.aoi = aoi class AOIScanStep(): - """Define an aoi scan step as a set of successive gaze movements onto a same AOI. - - Parameters: - movements: all movements over an AOI and the last saccade that comes out. - aoi: AOI name - letter: AOI unique letter to ease sequence analysis. - - !!! warning - Aoi scan step have to start by a fixation and then end by a saccade. - """ - - def __init__(self, movements: TimeStampedGazeMovements, aoi: str = '', letter: str = ''): - - self.__movements = movements - self.__aoi = aoi - self.__letter = letter - - # First movement have to be a fixation - if not is_fixation(self.first_fixation): - - raise AOIScanStepError('First step movement is not a fixation', self.aoi) - - # Last movement have to be a saccade - if not is_saccade(self.last_saccade): - - raise AOIScanStepError('Last step movement is not a saccade', self.aoi) - - @property - def movements(self): - """Get AOI scan step movements.""" - return self.__movements - - @property - def aoi(self): - """Get AOI scan step aoi.""" - return self.__aoi - - @property - def letter(self): - """Get AOI scan step letter.""" - return self.__letter - - @property - def first_fixation(self): - """First fixation on AOI.""" - return self.movements[0] - - @property - def last_saccade(self): - """Last saccade that comes out AOI.""" - return self.movements[-1] - - @property - def fixation_duration(self) -> int|float: - """Time spent on AOI - - Returns: - fixation duration - """ - return self.last_saccade[0].timestamp - self.first_fixation[0].timestamp - - @property - def duration(self) -> int|float: - """Time spent on AOI and time spent to go to next AOI - - Returns: - duration - """ - return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp + """Define an aoi scan step as a set of successive gaze movements onto a same AOI. + + Parameters: + movements: all movements over an AOI and the last saccade that comes out. + aoi: AOI name + letter: AOI unique letter to ease sequence analysis. + + !!! warning + Aoi scan step have to start by a fixation and then end by a saccade. + """ + + def __init__(self, movements: TimeStampedGazeMovements, aoi: str = '', letter: str = ''): + + self.__movements = movements + self.__aoi = aoi + self.__letter = letter + + # First movement have to be a fixation + if not is_fixation(self.first_fixation): + raise AOIScanStepError('First step movement is not a fixation', self.aoi) + + # Last movement have to be a saccade + if not is_saccade(self.last_saccade): + raise AOIScanStepError('Last step movement is not a saccade', self.aoi) + + @property + def movements(self): + """Get AOI scan step movements.""" + return self.__movements + + @property + def aoi(self): + """Get AOI scan step aoi.""" + return self.__aoi + + @property + def letter(self): + """Get AOI scan step letter.""" + return self.__letter + + @property + def first_fixation(self): + """First fixation on AOI.""" + return self.movements[0] + + @property + def last_saccade(self): + """Last saccade that comes out AOI.""" + return self.movements[-1] + + @property + def fixation_duration(self) -> int | float: + """Time spent on AOI + + Returns: + fixation duration + """ + return self.last_saccade[0].timestamp - self.first_fixation[0].timestamp + + @property + def duration(self) -> int | float: + """Time spent on AOI and time spent to go to next AOI + + Returns: + duration + """ + return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp + # Define strings for outside AOI case OutsideAOI = 'GazeFeatures.OutsideAOI' -class AOIScanPath(list): - """List of aoi scan steps over successive aoi.""" - def __init__(self, expected_aoi: list[str] = [], duration_max: int|float = 0): +class AOIScanPath(list): + """List of aoi scan steps over successive aoi.""" - super().__init__() + def __init__(self, expected_aoi: list[str] = [], duration_max: int | float = 0): - self.__expected_aoi = expected_aoi - self.__duration_max = duration_max - self.__duration = 0 + super().__init__() - self.clear() + self.__expected_aoi = expected_aoi + self.__duration_max = duration_max + self.__duration = 0 - @property - def expected_aoi(self): - """List of all expected aoi.""" + self.clear() - return self.__expected_aoi + @property + def expected_aoi(self): + """List of all expected aoi.""" - @expected_aoi.setter - def expected_aoi(self, expected_aoi: list[str] = []): - """Edit list of all expected aoi. + return self.__expected_aoi - !!! warning - This will clear the AOIScanPath - """ + @expected_aoi.setter + def expected_aoi(self, expected_aoi: list[str] = []): + """Edit list of all expected aoi. - # Check expected aoi are not the same as previous ones - if len(expected_aoi) == len(self.__expected_aoi[1:]): + !!! warning + This will clear the AOIScanPath + """ - equal = [a == b for a, b in zip(expected_aoi, self.__expected_aoi[1:])] + # Check expected aoi are not the same as previous ones + if len(expected_aoi) == len(self.__expected_aoi[1:]): - if all(equal): + equal = [a == b for a, b in zip(expected_aoi, self.__expected_aoi[1:])] - return - - # Otherwise, update expected aoi - self.__expected_aoi = [OutsideAOI] - self.__expected_aoi += expected_aoi + if all(equal): + return - self.clear() + # Otherwise, update expected aoi + self.__expected_aoi = [OutsideAOI] + self.__expected_aoi += expected_aoi - @property - def duration_max(self) -> float: - """Duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration.""" - return self.__duration_max + self.clear() - @duration_max.setter - def duration_max(self, duration_max: float): + @property + def duration_max(self) -> float: + """Duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration.""" + return self.__duration_max - self.__duration_max = duration_max + @duration_max.setter + def duration_max(self, duration_max: float): - @property - def duration(self) -> float: - """Sum of all scan steps duration""" + self.__duration_max = duration_max - return self.__duration + @property + def duration(self) -> float: + """Sum of all scan steps duration""" - def __check_duration(self): - """Constrain path duration to maximal duration.""" + return self.__duration - if self.__duration_max > 0: + def __check_duration(self): + """Constrain path duration to maximal duration.""" - while self.__duration > self.__duration_max: + if self.__duration_max > 0: - oldest_step = self.pop(0) + while self.__duration > self.__duration_max: - self.__duration -= oldest_step.duration + oldest_step = self.pop(0) - # Edit transition matrix - if len(self) > 0: + self.__duration -= oldest_step.duration - # Decrement [index: source, columns: destination] value - self.__transition_matrix.loc[oldest_step.aoi, self[0].aoi,] -= 1 + # Edit transition matrix + if len(self) > 0: + # Decrement [index: source, columns: destination] value + self.__transition_matrix.loc[oldest_step.aoi, self[0].aoi,] -= 1 - def clear(self): - """Clear aoi scan steps list, letter sequence and transition matrix.""" + def clear(self): + """Clear aoi scan steps list, letter sequence and transition matrix.""" - super().clear() + super().clear() - # noinspection PyAttributeOutsideInit - self.__movements = TimeStampedGazeMovements() - # noinspection PyAttributeOutsideInit - self.__current_aoi = '' - # noinspection PyAttributeOutsideInit - self.__index = ord('A') - # noinspection PyAttributeOutsideInit - self.__aoi_letter = {} - # noinspection PyAttributeOutsideInit - self.__letter_aoi = {} + # noinspection PyAttributeOutsideInit + self.__movements = TimeStampedGazeMovements() + # noinspection PyAttributeOutsideInit + self.__current_aoi = '' + # noinspection PyAttributeOutsideInit + self.__index = ord('A') + # noinspection PyAttributeOutsideInit + self.__aoi_letter = {} + # noinspection PyAttributeOutsideInit + self.__letter_aoi = {} - size = len(self.__expected_aoi) - # noinspection PyAttributeOutsideInit - self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi, columns=self.__expected_aoi) + size = len(self.__expected_aoi) + # noinspection PyAttributeOutsideInit + self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi, + columns=self.__expected_aoi) - def __get_aoi_letter(self, aoi): + def __get_aoi_letter(self, aoi): - try : + try: - return self.__aoi_letter[aoi] + return self.__aoi_letter[aoi] - except KeyError: + except KeyError: - letter = chr(self.__index) - self.__aoi_letter[aoi] = letter - self.__index += 1 - return letter + letter = chr(self.__index) + self.__aoi_letter[aoi] = letter + self.__index += 1 + return letter - def get_letter_aoi(self, letter): - """Get which aoi is related to a unique letter.""" + def get_letter_aoi(self, letter): + """Get which aoi is related to a unique letter.""" - return self.__letter_aoi[letter] + return self.__letter_aoi[letter] - @property - def letter_sequence(self) -> str: - """Convert aoi scan path into a string with unique letter per aoi step.""" + @property + def letter_sequence(self) -> str: + """Convert aoi scan path into a string with unique letter per aoi step.""" - sequence = '' - for step in self: - sequence += step.letter + sequence = '' + for step in self: + sequence += step.letter - return sequence - - @property - def current_aoi(self): - """AOI name of aoi scan step under construction""" + return sequence - return self.__current_aoi + @property + def current_aoi(self): + """AOI name of aoi scan step under construction""" - @property - def transition_matrix(self) -> pandas.DataFrame: - """[Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) where indexes are transition departures and columns are transition destinations.""" + return self.__current_aoi - return self.__transition_matrix + @property + def transition_matrix(self) -> pandas.DataFrame: + """[Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) where indexes are transition departures and columns are transition destinations.""" - def append_saccade(self, saccade): - """Append new saccade to aoi scan path.""" + return self.__transition_matrix - # Ignore saccade if no fixation have been stored before - if len(self.__movements) > 0: + def append_saccade(self, saccade): + """Append new saccade to aoi scan path.""" - self.__movements.append(saccade) + # Ignore saccade if no fixation have been stored before + if len(self.__movements) > 0: + self.__movements.append(saccade) - def append_fixation(self, fixation, looked_aoi: str): - """Append new fixation to aoi scan path and return last new aoi scan step if one have been created. + def append_fixation(self, fixation, looked_aoi: str): + """Append new fixation to aoi scan path and return last new aoi scan step if one have been created. - !!! warning - It could raise AOIScanStepError - """ + !!! warning + It could raise AOIScanStepError + """ - # Replace None aoi by generic OutsideAOI name - if looked_aoi is None: + # Replace None aoi by generic OutsideAOI name + if looked_aoi is None: - looked_aoi = OutsideAOI + looked_aoi = OutsideAOI - # Raise error when aoi is not expected - elif looked_aoi not in self.__expected_aoi: + # Raise error when aoi is not expected + elif looked_aoi not in self.__expected_aoi: - raise AOIScanStepError('AOI not expected', looked_aoi) + raise AOIScanStepError('AOI not expected', looked_aoi) - # Is it fixation onto a new aoi? - if looked_aoi != self.__current_aoi and len(self.__movements) > 0: + # Is it fixation onto a new aoi? + if looked_aoi != self.__current_aoi and len(self.__movements) > 0: - try: + try: - # Edit unique letter per aoi - letter = self.__get_aoi_letter(self.__current_aoi) + # Edit unique letter per aoi + letter = self.__get_aoi_letter(self.__current_aoi) - # Remember which letter identify which aoi - self.__letter_aoi[letter] = self.__current_aoi + # Remember which letter identify which aoi + self.__letter_aoi[letter] = self.__current_aoi - # Edit new step - new_step = AOIScanStep(self.__movements, self.__current_aoi, letter) + # Edit new step + new_step = AOIScanStep(self.__movements, self.__current_aoi, letter) - # Edit transition matrix - if len(self) > 0: + # Edit transition matrix + if len(self) > 0: + # Increment [index: source, columns: destination] value + self.__transition_matrix.loc[self[-1].aoi, self.__current_aoi,] += 1 - # Increment [index: source, columns: destination] value - self.__transition_matrix.loc[self[-1].aoi, self.__current_aoi,] += 1 + # Append new step + super().append(new_step) - # Append new step - super().append(new_step) + # Update duration + self.__duration += new_step.duration - # Update duration - self.__duration += new_step.duration + # Constrain path duration to maximal duration + self.__check_duration() - # Constrain path duration to maximal duration - self.__check_duration() + # Return new step + return new_step - # Return new step - return new_step + finally: - finally: + # Clear movements + # noinspection PyAttributeOutsideInit + self.__movements = TimeStampedGazeMovements() - # Clear movements - # noinspection PyAttributeOutsideInit - self.__movements = TimeStampedGazeMovements() + # Append new fixation + self.__movements.append(fixation) - # Append new fixation - self.__movements.append(fixation) + # Remember new aoi + self.__current_aoi = looked_aoi + else: - # Remember new aoi - self.__current_aoi = looked_aoi - else: + # Append new fixation + self.__movements.append(fixation) - # Append new fixation - self.__movements.append(fixation) + # Remember aoi + # noinspection PyAttributeOutsideInit + self.__current_aoi = looked_aoi - # Remember aoi - # noinspection PyAttributeOutsideInit - self.__current_aoi = looked_aoi + return None - return None + def fixations_count(self): + """Get how many fixations are there in the scan path and how many fixation are there in each aoi.""" - def fixations_count(self): - """Get how many fixations are there in the scan path and how many fixation are there in each aoi.""" + scan_fixations_count = 0 + aoi_fixations_count = {aoi: 0 for aoi in self.__expected_aoi} - scan_fixations_count = 0 - aoi_fixations_count = {aoi: 0 for aoi in self.__expected_aoi} + for aoi_scan_step in self: + step_fixations_count = len(aoi_scan_step.movements) - 1 # -1: to ignore last saccade - for aoi_scan_step in self: + scan_fixations_count += step_fixations_count + aoi_fixations_count[aoi_scan_step.aoi] += step_fixations_count - step_fixations_count = len(aoi_scan_step.movements) - 1 # -1: to ignore last saccade + return scan_fixations_count, aoi_fixations_count - scan_fixations_count += step_fixations_count - aoi_fixations_count[aoi_scan_step.aoi] += step_fixations_count - - return scan_fixations_count, aoi_fixations_count class AOIScanPathAnalyzer(DataFeatures.PipelineStepObject): - """Abstract class to define what should provide an aoi scan path analyzer.""" - - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): + """Abstract class to define what should provide an aoi scan path analyzer.""" - self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if isinstance(value, property) and value.fset is None] + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if + isinstance(value, property) and value.fset is None] - def analysis(self) -> DataFeatures.DataDictionary: - """Get all aoi scan path analyzer analysis as data dictionary.""" + def analysis(self) -> DataFeatures.DataDictionary: + """Get all aoi scan path analyzer analysis as data dictionary.""" - return DataFeatures.DataDictionary( {a: getattr(self, a) for a in self.__analysis} ) + return DataFeatures.DataDictionary({a: getattr(self, a) for a in self.__analysis}) - @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: AOIScanPath): - """Analyze aoi scan path.""" + @DataFeatures.PipelineStepMethod + def analyze(self, aoi_scan_path: AOIScanPath): + """Analyze aoi scan path.""" - raise NotImplementedError('analyze() method not implemented') + raise NotImplementedError('analyze() method not implemented') diff --git a/src/argaze/PupilAnalysis/WorkloadIndex.py b/src/argaze/PupilAnalysis/WorkloadIndex.py index bced982..00995e9 100644 --- a/src/argaze/PupilAnalysis/WorkloadIndex.py +++ b/src/argaze/PupilAnalysis/WorkloadIndex.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/PupilFeatures.py b/src/argaze/PupilFeatures.py index c38d10a..e73176d 100644 --- a/src/argaze/PupilFeatures.py +++ b/src/argaze/PupilFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/__main__.py b/src/argaze/__main__.py index 15a78c1..3e1fe9e 100644 --- a/src/argaze/__main__.py +++ b/src/argaze/__main__.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py index 133809b..c04d20a 100644 --- a/src/argaze/utils/UtilsFeatures.py +++ b/src/argaze/utils/UtilsFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/utils/aruco_markers_group_export.py b/src/argaze/utils/aruco_markers_group_export.py index 46507b8..569ba6b 100644 --- a/src/argaze/utils/aruco_markers_group_export.py +++ b/src/argaze/utils/aruco_markers_group_export.py @@ -10,7 +10,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" @@ -27,199 +27,208 @@ from argaze import DataFeatures from argaze.ArUcoMarkers import ArUcoDetector, ArUcoOpticCalibrator, ArUcoMarkersGroup from argaze.utils import UtilsFeatures -def main(): - """ - Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder. - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('movie', metavar='MOVIE', type=str, default=None, help='movie path') - parser.add_argument('dictionary', metavar='DICTIONARY', type=str, default=None, help='expected ArUco markers dictionary') - parser.add_argument('size', metavar='SIZE', type=float, default=None, help='expected ArUco markers size (in cm)') - - parser.add_argument('-p', '--parameters', metavar='PARAMETERS', type=str, default=None, help='ArUco detector parameters file') - parser.add_argument('-op', '--optic_parameters', metavar='OPTIC_PARAMETERS', type=str, default=None, help='ArUco detector optic parameters file') - - parser.add_argument('-s', '--start', metavar='START', type=float, default=0., help='start time in second') - parser.add_argument('-o', '--output', metavar='OUTPUT', type=str, default='.', help='export folder path') - parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') - args = parser.parse_args() - - # Load movie - video_capture = cv2.VideoCapture(args.movie) +def main(): + """ + Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder. + """ - video_fps = video_capture.get(cv2.CAP_PROP_FPS) - image_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)) - image_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) + # Manage arguments + parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) + parser.add_argument('movie', metavar='MOVIE', type=str, default=None, help='movie path') + parser.add_argument('dictionary', metavar='DICTIONARY', type=str, default=None, + help='expected ArUco markers dictionary') + parser.add_argument('size', metavar='SIZE', type=float, default=None, help='expected ArUco markers size (in cm)') - # Edit ArUco detector configuration - configuration = { - "dictionary": args.dictionary - } + parser.add_argument('-p', '--parameters', metavar='PARAMETERS', type=str, default=None, + help='ArUco detector parameters file') + parser.add_argument('-op', '--optic_parameters', metavar='OPTIC_PARAMETERS', type=str, default=None, + help='ArUco detector optic parameters file') - if args.parameters: + parser.add_argument('-s', '--start', metavar='START', type=float, default=0., help='start time in second') + parser.add_argument('-o', '--output', metavar='OUTPUT', type=str, default='.', help='export folder path') + parser.add_argument('-v', '--verbose', action='store_true', default=False, + help='enable verbose mode to print information in console') - configuration["parameters"] = args.parameters + args = parser.parse_args() - if args.optic_parameters: + # Load movie + video_capture = cv2.VideoCapture(args.movie) - configuration["optic_parameters"] = args.optic_parameters + video_fps = video_capture.get(cv2.CAP_PROP_FPS) + image_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)) + image_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) - # Load ArUco detector configuration - aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration) + # Edit ArUco detector configuration + configuration = { + "dictionary": args.dictionary + } - if args.verbose: + if args.parameters: + configuration["parameters"] = args.parameters - print(aruco_detector) + if args.optic_parameters: + configuration["optic_parameters"] = args.optic_parameters - # Create empty ArUco scene - aruco_markers_group = None + # Load ArUco detector configuration + aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration) - # Edit draw parameters - draw_parameters = { - "color": [255, 255, 255], - "draw_axes": { - "thickness": 4 - } - } + if args.verbose: + print(aruco_detector) - # Create a window - cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE) + # Create empty ArUco scene + aruco_markers_group = None - # Init image selection - current_image_index = -1 - _, current_image = video_capture.read() - next_image_index = int(args.start * video_fps) - refresh = False + # Edit draw parameters + draw_parameters = { + "color": [255, 255, 255], + "draw_axes": { + "thickness": 4 + } + } - # Waiting for 'ctrl+C' interruption - with contextlib.suppress(KeyboardInterrupt): + # Create a window + cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE) - while True: + # Init image selection + current_image_index = -1 + _, current_image = video_capture.read() + next_image_index = int(args.start * video_fps) + refresh = False - # Select a new image and detect markers once - if next_image_index != current_image_index or refresh: + # Waiting for 'ctrl+C' interruption + with contextlib.suppress(KeyboardInterrupt): - video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index) + while True: - success, video_image = video_capture.read() + # Select a new image and detect markers once + if next_image_index != current_image_index or refresh: - video_height, video_width, _ = video_image.shape + video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index) - # Create default optic parameters adapted to frame size - if aruco_detector.optic_parameters is None: + success, video_image = video_capture.read() - # Note: The choice of 1000 for default focal length should be discussed... - aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height)) + video_height, video_width, _ = video_image.shape - if success: + # Create default optic parameters adapted to frame size + if aruco_detector.optic_parameters is None: + # Note: The choice of 1000 for default focal length should be discussed... + aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=( + video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), + width=video_width, height=video_height)) - # Refresh once - refresh = False + if success: - current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1 - current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC) + # Refresh once + refresh = False - try: + current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1 + current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC) - # Detect and project AR features - aruco_detector.detect_markers(video_image) + try: - # Estimate all detected markers pose - aruco_detector.estimate_markers_pose(args.size) + # Detect and project AR features + aruco_detector.detect_markers(video_image) - # Build aruco scene from detected markers - aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers()) + # Estimate all detected markers pose + aruco_detector.estimate_markers_pose(args.size) - # Detection succeeded - exception = None + # Build aruco scene from detected markers + aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, + aruco_detector.detected_markers()) - # Write errors - except Exception as e: + # Detection succeeded + exception = None - aruco_markers_group = None + # Write errors + except Exception as e: - exception = e + aruco_markers_group = None - # Draw detected markers - aruco_detector.draw_detected_markers(video_image, draw_parameters) + exception = e - # Write detected markers - cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + # Draw detected markers + aruco_detector.draw_detected_markers(video_image, draw_parameters) - # Write timing - cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + # Write detected markers + cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', + (20, video_height - 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - # Write exception - if exception is not None: + # Write timing + cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + # Write exception + if exception is not None: + cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, + (0, 255, 255), 1, cv2.LINE_AA) - # Write documentation - cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + # Write documentation + cv2.putText(video_image, f'<- previous image', (video_width - 500, video_height - 160), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'-> next image', (video_width - 500, video_height - 120), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'r: reload config', (video_width - 500, video_height - 80), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width - 500, video_height - 40), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - # Copy image - current_image = video_image.copy() + # Copy image + current_image = video_image.copy() - # Keep last image - else: + # Keep last image + else: - video_image = current_image.copy() + video_image = current_image.copy() - key_pressed = cv2.waitKey(10) + key_pressed = cv2.waitKey(10) - #if key_pressed != -1: - # print(key_pressed) + #if key_pressed != -1: + # print(key_pressed) - # Select previous image with left arrow - if key_pressed == 2: - next_image_index -= 1 + # Select previous image with left arrow + if key_pressed == 2: + next_image_index -= 1 - # Select next image with right arrow - if key_pressed == 3: - next_image_index += 1 + # Select next image with right arrow + if key_pressed == 3: + next_image_index += 1 - # Clip image index - if next_image_index < 0: - next_image_index = 0 + # Clip image index + if next_image_index < 0: + next_image_index = 0 - # r: reload configuration - if key_pressed == 114: + # r: reload configuration + if key_pressed == 114: + aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration) + refresh = True + print('Configuration reloaded') - aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration) - refresh = True - print('Configuration reloaded') + # Save selected marker edition using 'Ctrl + s' + if key_pressed == 19: - # Save selected marker edition using 'Ctrl + s' - if key_pressed == 19: + if aruco_markers_group: - if aruco_markers_group: + aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj') + print(f'ArUco markers saved into {args.output}') - aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj') - print(f'ArUco markers saved into {args.output}') + else: - else: + print(f'No ArUco markers to export') - print(f'No ArUco markers to export') + # Close window using 'Esc' key + if key_pressed == 27: + break - # Close window using 'Esc' key - if key_pressed == 27: - break + # Display video + cv2.imshow(aruco_detector.name, video_image) - # Display video - cv2.imshow(aruco_detector.name, video_image) + # Close movie capture + video_capture.release() - # Close movie capture - video_capture.release() + # Stop image display + cv2.destroyAllWindows() - # Stop image display - cv2.destroyAllWindows() if __name__ == '__main__': - - main()
\ No newline at end of file + main() diff --git a/src/argaze/utils/contexts/OpenCV.py b/src/argaze/utils/contexts/OpenCV.py index f89189d..20be1a4 100644 --- a/src/argaze/utils/contexts/OpenCV.py +++ b/src/argaze/utils/contexts/OpenCV.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" diff --git a/src/argaze/utils/contexts/PupilLabs.py b/src/argaze/utils/contexts/PupilLabs.py index 43fe47e..d814deb 100644 --- a/src/argaze/utils/contexts/PupilLabs.py +++ b/src/argaze/utils/contexts/PupilLabs.py @@ -1,5 +1,5 @@ """Handle network connection to Pupil Labs devices. Tested with Pupil Invisible. - Based on Pupil Labs' Realtime Python API. + Based on Pupil Labs' Realtime Python API. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Damien Mouratille" @@ -31,6 +31,7 @@ import cv2 from pupil_labs.realtime_api.simple import discover_one_device + class LiveStream(ArFeatures.ArContext): @DataFeatures.PipelineStepInit @@ -59,75 +60,71 @@ class LiveStream(ArFeatures.ArContext): logging.info('Device found. Stream loading.') # Open gaze stream - self.__gaze_thread = threading.Thread(target = self.__stream_gaze) + self.__gaze_thread = threading.Thread(target=self.__stream_gaze) logging.debug('> starting gaze thread...') self.__gaze_thread.start() - + # Open video stream - self.__video_thread = threading.Thread(target = self.__stream_video) + self.__video_thread = threading.Thread(target=self.__stream_video) logging.debug('> starting video thread...') self.__video_thread.start() - - return self - def __stream_gaze(self): """Stream gaze.""" logging.debug('Stream gaze from Pupil Device') while not self.__stop_event.is_set(): - + try: while True: gaze = self.__device.receive_gaze_datum() gaze_timestamp = int((gaze.timestamp_unix_seconds - self.__start_time) * 1e3) - + logging.debug('Gaze received at %i timestamp', gaze_timestamp) - + # When gaze position is valid if gaze.worn is True: - + self._process_gaze_position( - timestamp = gaze_timestamp, - x = int(gaze.x), - y = int(gaze.y)) + timestamp=gaze_timestamp, + x=int(gaze.x), + y=int(gaze.y)) else: # Process empty gaze position logging.debug('Not worn at %i timestamp', gaze_timestamp) - self._process_gaze_position(timestamp = gaze_timestamp) - + self._process_gaze_position(timestamp=gaze_timestamp) + except KeyboardInterrupt: pass - - + def __stream_video(self): """Stream video.""" logging.debug('Stream video from Pupil Device') while not self.__stop_event.is_set(): - + try: while True: scene_frame, frame_datetime = self.__device.receive_scene_video_frame() - + scene_timestamp = int((frame_datetime - self.__start_time) * 1e3) - + logging.debug('Video received at %i timestamp', scene_timestamp) - + self._process_camera_image( - timestamp = scene_timestamp, - image = scene_frame) - + timestamp=scene_timestamp, + image=scene_frame) + except KeyboardInterrupt: pass @@ -135,10 +132,10 @@ class LiveStream(ArFeatures.ArContext): def __exit__(self, exception_type, exception_value, exception_traceback): logging.debug('Pupil-Labs context stops...') - + # Close data stream self.__stop_event.set() - + # Stop streaming threading.Thread.join(self.__gaze_thread) - threading.Thread.join(self.__video_thread)
\ No newline at end of file + threading.Thread.join(self.__video_thread) diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py index 0fba2ff..0c2b8f9 100644 --- a/src/argaze/utils/contexts/TobiiProGlasses2.py +++ b/src/argaze/utils/contexts/TobiiProGlasses2.py @@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" @@ -75,13 +75,15 @@ DEFAULT_TOBII_IMAGE_PARAMETERS = { "draw_something": False } + # Define extra classes to support Tobii data parsing @dataclass class DirSig(): """Define dir sig data (dir sig).""" - dir: int # meaning ? - sig: int # meaning ? + dir: int # meaning ? + sig: int # meaning ? + @dataclass class PresentationTimeStamp(): @@ -90,6 +92,7 @@ class PresentationTimeStamp(): value: int """Pts value.""" + @dataclass class VideoTimeStamp(): """Define video time stamp (vts) data.""" @@ -100,20 +103,23 @@ class VideoTimeStamp(): offset: int """Primary time stamp value.""" + @dataclass class EventSynch(): """Define event synch (evts) data.""" - value: int # meaning ? + value: int # meaning ? """Evts value.""" + @dataclass class Event(): """Define event data (ets type tag).""" - ets: int # meaning ? + ets: int # meaning ? type: str - tag: str # dict ? + tag: str # dict ? + @dataclass class Accelerometer(): @@ -122,6 +128,7 @@ class Accelerometer(): value: numpy.array """Accelerometer value""" + @dataclass class Gyroscope(): """Define gyroscope data (gy).""" @@ -129,6 +136,7 @@ class Gyroscope(): value: numpy.array """Gyroscope value""" + @dataclass class PupilCenter(): """Define pupil center data (gidx pc eye).""" @@ -136,7 +144,8 @@ class PupilCenter(): validity: int index: int value: tuple[(float, float, float)] - eye: str # 'right' or 'left' + eye: str # 'right' or 'left' + @dataclass class PupilDiameter(): @@ -145,7 +154,8 @@ class PupilDiameter(): validity: int index: int value: float - eye: str # 'right' or 'left' + eye: str # 'right' or 'left' + @dataclass class GazeDirection(): @@ -154,7 +164,8 @@ class GazeDirection(): validity: int index: int value: tuple[(float, float, float)] - eye: str # 'right' or 'left' + eye: str # 'right' or 'left' + @dataclass class GazePosition(): @@ -162,9 +173,10 @@ class GazePosition(): validity: int index: int - l: str # ? + l: str # ? value: tuple[(float, float)] + @dataclass class GazePosition3D(): """Define gaze position 3D data (gidx gp3).""" @@ -173,6 +185,7 @@ class GazePosition3D(): index: int value: tuple[(float, float)] + @dataclass class MarkerPosition(): """Define marker data (marker3d marker2d).""" @@ -180,6 +193,7 @@ class MarkerPosition(): value_3d: tuple[(float, float, float)] value_2d: tuple[(float, float)] + class TobiiJsonDataParser(): def __init__(self): @@ -319,6 +333,7 @@ class TobiiJsonDataParser(): return MarkerPosition(data['marker3d'], data['marker2d']) + class LiveStream(ArFeatures.ArContext): @DataFeatures.PipelineStepInit @@ -343,14 +358,14 @@ class LiveStream(ArFeatures.ArContext): # Init protected attributes self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS} - + @property def address(self) -> str: """Network address where to find the device.""" return self.__address @address.setter - def address(self, address:str): + def address(self, address: str): self.__address = address @@ -358,7 +373,6 @@ class LiveStream(ArFeatures.ArContext): if "%" in self.__address: if sys.platform == "win32": - self.__address = self.__address.split("%")[0] # Define base url @@ -372,7 +386,7 @@ class LiveStream(ArFeatures.ArContext): self.__base_url = 'http://' + self.__address @property - def configuration(self)-> dict: + def configuration(self) -> dict: """Patch system configuration dictionary.""" return self.__configuration @@ -388,15 +402,14 @@ class LiveStream(ArFeatures.ArContext): return self.__project_name @project.setter - def project(self, project:str): - + def project(self, project: str): + self.__project_name = project def __bind_project(self): """Bind to a project or create one if it doesn't exist.""" if self.__project_name is None: - raise Exception(f'Project binding fails: setup project before.') self.__project_id = None @@ -409,7 +422,6 @@ class LiveStream(ArFeatures.ArContext): try: if project['pr_info']['Name'] == self.__project_name: - self.__project_id = project['pr_id'] logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id) @@ -420,13 +432,12 @@ class LiveStream(ArFeatures.ArContext): # The project doesn't exist, create one if self.__project_id is None: - logging.debug('> %s project doesn\'t exist', self.__project_name) data = { - 'pr_info' : { + 'pr_info': { 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD), - 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)), + 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)), 'Name': self.__project_name }, 'pr_created': self.__get_current_datetime() @@ -439,12 +450,12 @@ class LiveStream(ArFeatures.ArContext): logging.debug('> new %s project created: %s', self.__project_name, self.__project_id) @property - def participant(self)-> str: + def participant(self) -> str: """Participant name""" return self.__participant_name @participant.setter - def participant(self, participant:str): + def participant(self, participant: str): self.__participant_name = participant @@ -456,13 +467,11 @@ class LiveStream(ArFeatures.ArContext): """ if self.__participant_name is None: - raise Exception(f'Participant binding fails: setup participant before.') - if self.__project_id is None : - + if self.__project_id is None: raise Exception(f'Participant binding fails: bind to a project before') - + self.__participant_id = None # Check if participant exist @@ -473,7 +482,6 @@ class LiveStream(ArFeatures.ArContext): try: if participant['pa_info']['Name'] == self.__participant_name: - self.__participant_id = participant['pa_id'] logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id) @@ -484,15 +492,14 @@ class LiveStream(ArFeatures.ArContext): # The participant doesn't exist, create one if self.__participant_id is None: - logging.debug('> %s participant doesn\'t exist', self.__participant_name) data = { 'pa_project': self.__project_id, - 'pa_info': { + 'pa_info': { 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)), 'Name': self.__participant_name, - 'Notes': '' # TODO: set participant notes + 'Notes': '' # TODO: set participant notes }, 'pa_created': self.__get_current_datetime() } @@ -507,7 +514,7 @@ class LiveStream(ArFeatures.ArContext): def __enter__(self): logging.info('Tobii Pro Glasses 2 connexion starts...') - + # Update current configuration with configuration patch logging.debug('> updating configuration') @@ -527,7 +534,6 @@ class LiveStream(ArFeatures.ArContext): logging.info('Tobii Pro Glasses 2 configuration:') for key, value in configuration.items(): - logging.info('%s: %s', key, str(value)) # Store video stream info @@ -546,7 +552,6 @@ class LiveStream(ArFeatures.ArContext): # Bind to participant if required if self.__participant_name is not None: - logging.debug('> binding participant %s', self.__participant_name) self.__bind_participant() @@ -558,21 +563,22 @@ class LiveStream(ArFeatures.ArContext): # Open data stream self.__data_socket = self.__make_socket() - self.__data_thread = threading.Thread(target = self.__stream_data) + self.__data_thread = threading.Thread(target=self.__stream_data) logging.debug('> starting data thread...') self.__data_thread.start() # Open video stream self.__video_socket = self.__make_socket() - self.__video_thread = threading.Thread(target = self.__stream_video) + self.__video_thread = threading.Thread(target=self.__stream_video) logging.debug('> starting video thread...') self.__video_thread.start() # Keep connection alive - self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}" - self.__keep_alive_thread = threading.Thread(target = self.__keep_alive) + self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \"" + str( + uuid.uuid4()) + "\", \"op\": \"start\"}" + self.__keep_alive_thread = threading.Thread(target=self.__keep_alive) logging.debug('> starting keep alive thread...') self.__keep_alive_thread.start() @@ -583,7 +589,7 @@ class LiveStream(ArFeatures.ArContext): def __exit__(self, exception_type, exception_value, exception_traceback): logging.debug('%s.__exit__', DataFeatures.get_class_path(self)) - + # Close data stream self.__stop_event.set() @@ -612,7 +618,6 @@ class LiveStream(ArFeatures.ArContext): image = super().image(**kwargs) if draw_something: - cv2.putText(image, 'SOMETHING', (512, 512), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) return image @@ -623,10 +628,10 @@ class LiveStream(ArFeatures.ArContext): iptype = socket.AF_INET if ':' in self.__address: - iptype = socket.AF_INET6 - res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE) + res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, + socket.AI_PASSIVE) family, socktype, proto, canonname, sockaddr = res[0] new_socket = socket.socket(family, socktype, proto) @@ -635,13 +640,11 @@ class LiveStream(ArFeatures.ArContext): try: if iptype == socket.AF_INET6: - new_socket.setsockopt(socket.SOL_SOCKET, 25, 1) except socket.error as e: if e.errno == 1: - logging.error('Binding to a network interface is permitted only for root users.') return new_socket @@ -672,7 +675,6 @@ class LiveStream(ArFeatures.ArContext): # Store first timestamp if first_ts == 0: - first_ts = data_ts # Edit millisecond timestamp @@ -689,15 +691,15 @@ class LiveStream(ArFeatures.ArContext): # Process timestamped gaze position self._process_gaze_position( - timestamp = timestamp, - x = int(data_object.value[0] * self.__video_width), - y = int(data_object.value[1] * self.__video_height) ) + timestamp=timestamp, + x=int(data_object.value[0] * self.__video_width), + y=int(data_object.value[1] * self.__video_height)) else: # Process empty gaze position - self._process_gaze_position(timestamp = timestamp) - + self._process_gaze_position(timestamp=timestamp) + def __stream_video(self): """Stream video from dedicated socket.""" @@ -712,7 +714,7 @@ class LiveStream(ArFeatures.ArContext): self.__video_buffer_lock = threading.Lock() # Open video buffer reader - self.__video_buffer_read_thread = threading.Thread(target = self.__video_buffer_read) + self.__video_buffer_read_thread = threading.Thread(target=self.__video_buffer_read) logging.debug('> starting video buffer reader thread...') self.__video_buffer_read_thread.start() @@ -726,7 +728,6 @@ class LiveStream(ArFeatures.ArContext): # Quit if the video acquisition thread have been stopped if self.__stop_event.is_set(): - logging.debug('> stop event is set') break @@ -736,7 +737,6 @@ class LiveStream(ArFeatures.ArContext): # Store first timestamp if first_ts == 0: - first_ts = image.time # Edit millisecond timestamp @@ -762,7 +762,6 @@ class LiveStream(ArFeatures.ArContext): # Can't read image while it is locked while self.__video_buffer_lock.locked(): - # Check 10 times per frame time.sleep(1 / (10 * self.__video_fps)) @@ -782,7 +781,6 @@ class LiveStream(ArFeatures.ArContext): logging.debug('> read image at %i timestamp', timestamp) if len(self.__video_buffer) > 0: - logging.warning('skipping %i image', len(self.__video_buffer)) # Clear buffer @@ -790,9 +788,9 @@ class LiveStream(ArFeatures.ArContext): # Process camera image self._process_camera_image( - timestamp = timestamp, - image = image) - + timestamp=timestamp, + image=image) + except Exception as e: logging.warning('%s.__video_buffer_read: %s', DataFeatures.get_class_path(self), e) @@ -806,7 +804,6 @@ class LiveStream(ArFeatures.ArContext): logging.debug('%s.__keep_alive', DataFeatures.get_class_path(self)) while not self.__stop_event.is_set(): - self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) @@ -833,7 +830,7 @@ class LiveStream(ArFeatures.ArContext): return data - def __post_request(self, api_action, data = None, wait_for_response = True) -> any: + def __post_request(self, api_action, data=None, wait_for_response=True) -> any: """Send a POST request and get result back.""" url = self.__base_url + api_action @@ -845,7 +842,6 @@ class LiveStream(ArFeatures.ArContext): data = json.dumps(data) if wait_for_response is False: - threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start() return None @@ -863,7 +859,7 @@ class LiveStream(ArFeatures.ArContext): return res - def __wait_for_status(self, api_action, key, values, timeout = None) -> any: + def __wait_for_status(self, api_action, key, values, timeout=None) -> any: """Wait until a status matches given values.""" url = self.__base_url + api_action @@ -875,8 +871,8 @@ class LiveStream(ArFeatures.ArContext): req.add_header('Content-Type', 'application/json') try: - - response = urlopen(req, None, timeout = timeout) + + response = urlopen(req, None, timeout=timeout) except URLError as e: @@ -910,12 +906,10 @@ class LiveStream(ArFeatures.ArContext): status = self.calibration_status() while status == 'calibrating': - time.sleep(1) status = self.calibration_status() if status == 'uncalibrated' or status == 'stale' or status == 'failed': - raise Exception(f'Calibration {status}') # CALIBRATION @@ -931,11 +925,10 @@ class LiveStream(ArFeatures.ArContext): # Calibration have to be done for a project and a participant if project_id is None or participant_id is None: - raise Exception(f'Setup project and participant before') data = { - 'ca_project': project_id, + 'ca_project': project_id, 'ca_type': 'default', 'ca_participant': participant_id, 'ca_created': self.__get_current_datetime() @@ -954,11 +947,11 @@ class LiveStream(ArFeatures.ArContext): if self.__calibration_id is not None: - status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) + status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', + ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) # Forget calibration id if status != 'calibrating': - # noinspection PyAttributeOutsideInit self.__calibration_id = None @@ -970,10 +963,12 @@ class LiveStream(ArFeatures.ArContext): # RECORDING FEATURES - def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']): + def __wait_for_recording_status(self, recording_id, + status_array=['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', + 'stopped', 'done', 'stale', 'failed']): return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array) - def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str: + def create_recording(self, participant_name, recording_name='', recording_notes='') -> str: """Create a new recording. Returns: @@ -1001,7 +996,7 @@ class LiveStream(ArFeatures.ArContext): def start_recording(self, recording_id) -> bool: """Start recording on the Tobii interface's SD Card.""" - + self.__post_request('/api/recordings/' + recording_id + '/start') return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording' @@ -1044,14 +1039,14 @@ class LiveStream(ArFeatures.ArContext): # EVENTS AND EXPERIMENTAL VARIABLES - def __post_recording_data(self, event_type: str, event_tag = ''): + def __post_recording_data(self, event_type: str, event_tag=''): data = {'type': event_type, 'tag': event_tag} self.__post_request('/api/events', data, wait_for_response=False) - def send_event(self, event_type: str, event_value = None): + def send_event(self, event_type: str, event_value=None): self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value)) - def send_variable(self, variable_name: str, variable_value = None): + def send_variable(self, variable_name: str, variable_value=None): self.__post_recording_data(str(variable_name), str(variable_value)) # MISC @@ -1060,7 +1055,8 @@ class LiveStream(ArFeatures.ArContext): self.__get_request('/api/eject') def get_battery_info(self): - return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) ) + return ("Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % ( + float(self.get_battery_level()), float(self.get_battery_remaining_time()))) def get_battery_level(self): return self.get_battery_status()['level'] @@ -1087,7 +1083,7 @@ class LiveStream(ArFeatures.ArContext): return self.__get_request('/api/system/status') def get_storage_info(self): - return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) ) + return ("Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time())) def get_storage_remaining_time(self): return self.get_storage_status()['remaining_time'] @@ -1166,7 +1162,7 @@ class PostProcessing(ArFeatures.ArContext): # Init protected attributes self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS} - + @property def segment(self) -> str: """Path to segment folder.""" @@ -1202,7 +1198,7 @@ class PostProcessing(ArFeatures.ArContext): # Read segment info with open(os.path.join(self.__segment, TOBII_SEGMENT_INFO_FILENAME)) as info_file: - + try: info = json.load(info_file) @@ -1212,10 +1208,10 @@ class PostProcessing(ArFeatures.ArContext): raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}') # Constrain reading dates - self.__end = min(self.__end, int(info["seg_length"] * 1e3)) if self.__end != None else int(info["seg_length"] * 1e3) + self.__end = min(self.__end, int(info["seg_length"] * 1e3)) if self.__end != None else int( + info["seg_length"] * 1e3) if self.__start >= self.__end: - raise ValueError('Start reading timestamp is equal or greater than end reading timestamp.') # TODO: log various info @@ -1227,7 +1223,7 @@ class PostProcessing(ArFeatures.ArContext): self.__stop_event = threading.Event() # Open reading thread - self.__reading_thread = threading.Thread(target = self.__read) + self.__reading_thread = threading.Thread(target=self.__read) logging.debug('> starting reading thread...') self.__reading_thread.start() @@ -1236,7 +1232,7 @@ class PostProcessing(ArFeatures.ArContext): def __exit__(self, exception_type, exception_value, exception_traceback): logging.debug('%s.__exit__', DataFeatures.get_class_path(self)) - + # Close data stream self.__stop_event.set() @@ -1249,15 +1245,14 @@ class PostProcessing(ArFeatures.ArContext): for video_ts, video_image, data_list in self: if self.__stop_event.is_set(): - break logging.debug('> read image at %i timestamp', video_ts) # Process camera image self._process_camera_image( - timestamp = video_ts, - image = video_image) + timestamp=video_ts, + image=video_image) height, width, _ = video_image.shape @@ -1277,14 +1272,14 @@ class PostProcessing(ArFeatures.ArContext): # Process timestamped gaze position self._process_gaze_position( - timestamp = data_ts, - x = int(data_object.value[0] * width), - y = int(data_object.value[1] * height) ) + timestamp=data_ts, + x=int(data_object.value[0] * width), + y=int(data_object.value[1] * height)) else: # Process empty gaze position - self._process_gaze_position(timestamp = data_ts) + self._process_gaze_position(timestamp=data_ts) def __iter__(self): @@ -1304,7 +1299,6 @@ class PostProcessing(ArFeatures.ArContext): next_data_ts, next_data_object, next_data_object_type = self.__next_data() while next_data_ts < next_video_ts: - data_list.append((next_data_ts, next_data_object, next_data_object_type)) next_data_ts, next_data_object, next_data_object_type = self.__next_data() @@ -1321,14 +1315,12 @@ class PostProcessing(ArFeatures.ArContext): # Ignore before start timestamp if ts < self.__start: - return self.__next__() # Ignore images after end timestamp if self.__end != None: if ts >= self.__end: - raise StopIteration # Return millisecond timestamp and image @@ -1337,7 +1329,7 @@ class PostProcessing(ArFeatures.ArContext): def __next_data(self): data = json.loads(next(self.__data_file).decode('utf-8')) - + # Parse data status status = data.pop('s', -1) @@ -1357,7 +1349,6 @@ class PostProcessing(ArFeatures.ArContext): # Ignore data before first vts entry if self.__vts_ts == -1: - return self.__next_data() ts -= self.__vts_ts @@ -1365,15 +1356,13 @@ class PostProcessing(ArFeatures.ArContext): # Ignore timestamps out of the given time range if ts < self.__start * 1e3: - return self.__next_data() if ts >= self.__end * 1e3: - raise StopIteration # Parse data data_object, data_object_type = self.__parser.parse_data(status, data) # Return millisecond timestamp, data object and type - return ts * 1e-3, data_object, data_object_type
\ No newline at end of file + return ts * 1e-3, data_object, data_object_type diff --git a/src/argaze/utils/demo/recorders.py b/src/argaze/utils/demo/recorders.py index 0debc12..679e6f7 100644 --- a/src/argaze/utils/demo/recorders.py +++ b/src/argaze/utils/demo/recorders.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" |