diff options
24 files changed, 226 insertions, 472 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index dc34992..9892b6a 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -16,12 +16,11 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple, Any, Iterator, Union +from typing import Iterator, Union import json import os import sys import importlib -from inspect import getmembers import threading import time @@ -33,18 +32,6 @@ from argaze.utils import Providers import numpy import cv2 -ArLayerType = TypeVar('ArLayer', bound="ArLayer") -# Type definition for type annotation convenience - -ArFrameType = TypeVar('ArFrame', bound="ArFrame") -# Type definition for type annotation convenience - -ArSceneType = TypeVar('ArScene', bound="ArScene") -# Type definition for type annotation convenience - -ArCameraType = TypeVar('ArCamera', bound="ArCamera") -# Type definition for type annotation convenience - class PoseEstimationFailed(Exception): """ Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies. @@ -990,7 +977,7 @@ class ArScene(DataFeatures.PipelineStepObject): } @DataFeatures.PipelineStepMethod - def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array, any]: + def estimate_pose(self, detected_features: any) -> tuple[numpy.array, numpy.array, any]: """Define abstract estimate scene pose method. Parameters: diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 38b3e36..9fc6117 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -16,7 +16,7 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple +from typing import Self import json import os import time @@ -28,9 +28,6 @@ from argaze.AreaOfInterest import AOI2DScene import cv2 import numpy -ArUcoCameraType = TypeVar('ArUcoCamera', bound="ArUcoCamera") -# Type definition for type annotation convenience - # Define default ArUcoCamera image_paremeters values DEFAULT_ARUCOCAMERA_IMAGE_PARAMETERS = { "draw_detected_markers": { diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py index 9a1431d..3a2423f 100644 --- a/src/argaze/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py @@ -16,7 +16,7 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple +from typing import Self import json import os from collections import Counter @@ -29,21 +29,6 @@ import numpy import cv2 as cv from cv2 import aruco -ArUcoMarkerDictionaryType = TypeVar('ArUcoMarkerDictionary', bound="ArUcoMarkerDictionary") -# Type definition for type annotation convenience - -ArUcoMarkerType = TypeVar('ArUcoMarker', bound="ArUcoMarker") -# Type definition for type annotation convenience - -OpticParametersType = TypeVar('OpticParameters', bound="OpticParameters") -# Type definition for type annotation convenience - -DetectorParametersType = TypeVar('DetectorParameters', bound="DetectorParameters") -# Type definition for type annotation convenience - -ArUcoDetectorType = TypeVar('ArUcoDetector', bound="ArUcoDetector") -# Type definition for type annotation convenience - class DetectorParameters(): """Wrapper class around ArUco marker detector parameters. @@ -102,7 +87,7 @@ class DetectorParameters(): return getattr(self.__parameters, parameter) @classmethod - def from_json(self, json_filepath) -> DetectorParametersType: + def from_json(self, json_filepath) -> DetectorParameters: """Load detector parameters from .json file.""" with open(json_filepath) as configuration_file: @@ -142,22 +127,17 @@ class DetectorParameters(): class ArUcoDetector(DataFeatures.PipelineStepObject): """OpenCV ArUco library wrapper.""" - def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary = None, optic_parameters: ArUcoOpticCalibrator.OpticParameters = None, parameters: DetectorParameters = None, **kwargs): - """Initialize ArUcoDetector. - - Parameters: - dictionary: ArUco markers dictionary to detect. - optic_parameters: Optic parameters to use for ArUco detection into image. - parameters: ArUco detector parameters. - """ + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + """Initialize ArUcoDetector.""" # Init parent class - super().__init__(**kwargs) + super().__init__() # Init private attributes - self.__dictionary = dictionary - self.__optic_parameters = optic_parameters - self.__parameters = parameters + self.__dictionary = None + self.__optic_parameters = None + self.__parameters = None # Init detected markers data self.__detected_markers = {} @@ -170,101 +150,36 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): @property def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary: - """Get aruco detector's dictionary object.""" + """ArUco markers dictionary to detect.""" return self.__dictionary + + @dictionary.setter + @DataFeatures.PipelineStepAttributeSetter + def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary): + + self.__dictionary = dictionary @property def optic_parameters(self) -> ArUcoOpticCalibrator.OpticParameters: - """Get aruco detector's opetic parameters object.""" + """Optic parameters to use for ArUco detection into image.""" return self.__optic_parameters @optic_parameters.setter - def optic_parameters(self, value: ArUcoOpticCalibrator.OpticParameters): - """Set aruco detector's opetic parameters object.""" - self.__optic_parameters = value + @DataFeatures.PipelineStepAttributeSetter + def optic_parameters(self, optic_parameters: ArUcoOpticCalibrator.OpticParameters): + + self.__optic_parameters = optic_parameters @property def parameters(self) -> DetectorParameters: - """Get aruco detector's parameters object.""" + """ArUco detector parameters.""" return self.__parameters @parameters.setter - def parameters(self, value: DetectorParameters): - """Set aruco detector's parameters object.""" - self.__parameters = value - - @classmethod - def from_dict(cls, aruco_detector_data: dict, working_directory: str) -> ArUcoDetectorType: - """Load ArUcoDetector attributes from dictionary. - - Parameters: - aruco_detector_data: dictionary with attributes to load - working_directory: folder path where to load files when a dictionary value is a relative filepath. - """ - - # Load ArUco dictionary - dictionary_value = aruco_detector_data.pop('dictionary') - - # str: dictionary name - if type(dictionary_value) == str: + @DataFeatures.PipelineStepAttributeSetter + def parameters(self, parameters: DetectorParameters): - new_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(dictionary_value) - - # dict: - else: - - new_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(**dictionary_value) - - # Load optic parameters - try: - optic_parameters_value = aruco_detector_data.pop('optic_parameters') - - # str: relative path to .json file - if type(optic_parameters_value) == str: - - optic_parameters_value = os.path.join(working_directory, optic_parameters_value) - new_optic_parameters = ArUcoOpticCalibrator.OpticParameters.from_json(optic_parameters_value) - - # dict: - else: - - new_optic_parameters = ArUcoOpticCalibrator.OpticParameters(**optic_parameters_value) - - except KeyError: - - new_optic_parameters = None - - # Load ArUco detector parameters - try: - - # Check detector parameters value type - parameters_value = aruco_detector_data.pop('parameters') - - # str: relative path to .json file - if type(parameters_value) == str: - - parameters_value = os.path.join(working_directory, parameters_value) - new_parameters = DetectorParameters.from_json(parameters_value) - - # dict: - else: - - new_parameters = DetectorParameters(**parameters_value) - - except KeyError: - - new_parameters = DetectorParameters() - - # Load temporary pipeline step object from aruco_detector_data then export it as dict - temp_pipeline_step_object_data = DataFeatures.PipelineStepObject.from_dict(aruco_detector_data, working_directory).as_dict() - - # Create aruco detector - return ArUcoDetector( \ - new_dictionary, \ - new_optic_parameters, \ - new_parameters, \ - **temp_pipeline_step_object_data \ - ) + self.__parameters = parameters @DataFeatures.PipelineStepMethod def detect_markers(self, image: numpy.array): @@ -342,7 +257,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): marker.size = size marker.points = markers_points.reshape(4, 3).dot(marker.rotation) - marker.translation - def detected_markers(self) -> dict[ArUcoMarkerType]: + def detected_markers(self) -> dict[ArUcoMarker.ArUcoMarker]: """Access to detected markers dictionary.""" return self.__detected_markers @@ -415,15 +330,18 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): class Observer(DataFeatures.PipelineStepObserver): """Define ArUcoDetector observer to count how many times detection succeeded and how many times markers are detected.""" - def __init__(self): + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): """Initialize marker detection metrics.""" + super().__init__() + self.__try_count = 0 self.__success_count = 0 self.__detected_ids = [] @property - def metrics(self) -> Tuple[int, dict]: + def metrics(self) -> tuple[int, dict]: """Get marker detection metrics. Returns: diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py index 71efc77..f02b179 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py @@ -16,8 +16,7 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar -from dataclasses import dataclass, field +from typing import Self import cv2 as cv import cv2.aruco as aruco @@ -48,26 +47,26 @@ all_aruco_markers_dictionaries = { } """Dictionnary to list all built-in ArUco markers dictionaries from OpenCV ArUco package.""" -ArUcoMarkerType = TypeVar('ArUcoMarker', bound="ArUcoMarker") -# Type definition for type annotation convenience - -@dataclass(frozen=True) class ArUcoMarkersDictionary(): """Handle an ArUco markers dictionary.""" - name: str = field(default='DICT_ARUCO_ORIGINAL') - """Dictionary name""" + def __init__(self, name: str = 'DICT_ARUCO_ORIGINAL'): + + self.__name = name - def __post_init__(self): + if all_aruco_markers_dictionaries.get(self.__name, None) is None: + raise NameError(f'Bad ArUco markers dictionary name: {self.__name}') - if all_aruco_markers_dictionaries.get(self.name, None) is None: - raise NameError(f'Bad ArUco markers dictionary name: {self.name}') + @property + def name(self): + """Dictionary name""" + + return self.__name def __str__(self) -> str: """String display""" output = f'{self.name}\n' - return output @property @@ -145,7 +144,7 @@ class ArUcoMarkersDictionary(): return int(dict_name_split[2]) - def create_marker(self, i, size) -> ArUcoMarkerType: + def create_marker(self, i, size) -> Self: """Create a marker.""" if i >= 0 and i < self.number: diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py index 7231384..1723f1c 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py @@ -16,8 +16,7 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple -from dataclasses import dataclass, field +from typing import Self import json import math import itertools @@ -35,9 +34,6 @@ T0 = numpy.array([0., 0., 0.]) R0 = numpy.array([[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]]) """Define no rotation matrix.""" -ArUcoMarkersGroupType = TypeVar('ArUcoMarkersGroup', bound="ArUcoMarkersGroup") -# Type definition for type annotation convenience - def make_rotation_matrix(x, y, z): # Create rotation matrix around x axis @@ -205,7 +201,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): } @classmethod - def from_obj(self, obj_filepath: str) -> ArUcoMarkersGroupType: + def from_obj(self, obj_filepath: str) -> Self: """Load ArUco markers group from .obj file. !!! note @@ -322,7 +318,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): return ArUcoMarkersGroup(new_dictionary, new_places) - def filter_markers(self, detected_markers: dict) -> Tuple[dict, dict]: + def filter_markers(self, detected_markers: dict) -> tuple[dict, dict]: """Sort markers belonging to the group from given detected markers dict (cf ArUcoDetector.detect_markers()). Returns: @@ -345,7 +341,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): return group_markers, remaining_markers - def estimate_pose_from_markers_corners(self, markers: dict, K: numpy.array, D: numpy.array) -> Tuple[bool, numpy.array, numpy.array]: + def estimate_pose_from_markers_corners(self, markers: dict, K: numpy.array, D: numpy.array) -> tuple[bool, numpy.array, numpy.array]: """Estimate pose from markers corners and places corners. Parameters: diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py index 5000833..999fd6f 100644 --- a/src/argaze/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -16,7 +16,6 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple import json import os @@ -27,9 +26,6 @@ from argaze.AreaOfInterest import AOI2DScene import cv2 import numpy -ArUcoSceneType = TypeVar('ArUcoScene', bound="ArUcoScene") -# Type definition for type annotation convenience - class ArUcoScene(ArFeatures.ArScene): """ Define an ArScene based on an ArUcoMarkersGroup description. @@ -82,58 +78,8 @@ class ArUcoScene(ArFeatures.ArScene): self.__aruco_markers_group.parent = self - @classmethod - def from_dict(cls, aruco_scene_data: dict, working_directory: str = None) -> ArUcoSceneType: - """ - Load ArUcoScene from dictionary. - - Parameters: - aruco_scene_data: dictionary - working_directory: folder path where to load files when a dictionary value is a relative filepath. - """ - - # Load aruco markers group - try: - - # Check aruco_markers_group value type - aruco_markers_group_value = aruco_scene_data.pop('aruco_markers_group') - - # str: relative path to description file - if type(aruco_markers_group_value) == str: - - filepath = os.path.join(working_directory, aruco_markers_group_value) - file_format = filepath.split('.')[-1] - - # JSON file format - if file_format == 'json': - - new_aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup.from_json(filepath) - - # OBJ file format - elif file_format == 'obj': - - new_aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup.from_obj(filepath) - - # dict: - else: - - new_aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(**aruco_markers_group_value) - - except KeyError: - - new_aruco_markers_group = None - - # Load temporary scene from aruco_scene_data then export it as dict - temp_scene_data = ArFeatures.ArScene.from_dict(aruco_scene_data, working_directory).as_dict() - - # Create new aruco scene using temporary ar scene values - return ArUcoScene( \ - aruco_markers_group = new_aruco_markers_group, \ - **temp_scene_data \ - ) - @DataFeatures.PipelineStepMethod - def estimate_pose(self, detected_markers: dict) -> Tuple[numpy.array, numpy.array, dict]: + def estimate_pose(self, detected_markers: dict) -> tuple[numpy.array, numpy.array, dict]: """Estimate scene pose from detected ArUco markers. Parameters: diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py index 0af1c63..4a2a2be 100644 --- a/src/argaze/AreaOfInterest/AOI2DScene.py +++ b/src/argaze/AreaOfInterest/AOI2DScene.py @@ -16,7 +16,7 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple +from typing import Self from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures, AOI3DScene @@ -26,12 +26,6 @@ import cv2 import numpy from xml.dom import minidom -AOI2DSceneType = TypeVar('AOI2DScene', bound="AOI2DScene") -# Type definition for type annotation convenience - -AOI3DSceneType = TypeVar('AOI3DScene', bound="AOI3DScene") -# Type definition for type annotation convenience - class AOI2DScene(AOIFeatures.AOIScene): """Define AOI 2D scene.""" @@ -40,7 +34,7 @@ class AOI2DScene(AOIFeatures.AOIScene): super().__init__(2, aoi_2d) @classmethod - def from_svg(self, svg_filepath: str) -> AOI2DSceneType: + def from_svg(self, svg_filepath: str) -> Self: """ Load areas from .svg file. @@ -132,7 +126,7 @@ class AOI2DScene(AOIFeatures.AOIScene): if draw_aoi: aoi.draw(image, **draw_aoi) - def raycast(self, pointer:tuple) -> Tuple[str, "AOIFeatures.AreaOfInterest", bool]: + def raycast(self, pointer:tuple) -> tuple[str, "AOIFeatures.AreaOfInterest", bool]: """Iterate over aoi to know which aoi is matching the given pointer position. Returns: aoi name @@ -164,7 +158,7 @@ class AOI2DScene(AOIFeatures.AOIScene): # Draw form aoi.draw(image, color) - def circlecast(self, center:tuple, radius:float) -> Tuple[str, "AOIFeatures.AreaOfInterest", numpy.array, float, float]: + def circlecast(self, center:tuple, radius:float) -> tuple[str, "AOIFeatures.AreaOfInterest", numpy.array, float, float]: """Iterate over areas to know which aoi is matched circle. Returns: aoi name @@ -181,7 +175,7 @@ class AOI2DScene(AOIFeatures.AOIScene): yield name, aoi, matched_region, aoi_ratio, circle_ratio '''DEPRECATED: but maybe still usefull? - def reframe(self, aoi: AOIFeatures.AreaOfInterest, size: tuple) -> AOI2DSceneType: + def reframe(self, aoi: AOIFeatures.AreaOfInterest, size: tuple) -> AOI2DScene: """ Reframe whole scene to a scene bounded by a 4 vertices 2D AOI. @@ -213,7 +207,7 @@ class AOI2DScene(AOIFeatures.AOIScene): return aoi2D_scene ''' - def dimensionalize(self, rectangle_3d: AOIFeatures.AreaOfInterest, size: tuple) -> AOI3DSceneType: + def dimensionalize(self, rectangle_3d: AOIFeatures.AreaOfInterest, size: tuple) -> AOI3DScene.AOI3DScene: """ Convert to 3D scene considering it is inside of 3D rectangular frame. diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py index e9ca687..83180a8 100644 --- a/src/argaze/AreaOfInterest/AOI3DScene.py +++ b/src/argaze/AreaOfInterest/AOI3DScene.py @@ -16,7 +16,7 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple +from typing import Self import math import re @@ -38,12 +38,6 @@ K0 = numpy.array([[1., 0., 0.], [0., 1., 0.], [0., 0., 0.]]) D0 = numpy.array([0.0, 0.0, 0.0, 0.0, 0.0]) """Define default optic distorsion coefficients vector.""" -AOI3DSceneType = TypeVar('AOI3DScene', bound="AOI3DScene") -# Type definition for type annotation convenience - -AOI2DSceneType = TypeVar('AOI2DScene', bound="AOI2DScene") -# Type definition for type annotation convenience - class AOI3DScene(AOIFeatures.AOIScene): """Define AOI 3D scene.""" @@ -52,7 +46,7 @@ class AOI3DScene(AOIFeatures.AOIScene): super().__init__(3, aoi_3d) @classmethod - def from_obj(self, obj_filepath: str) -> AOI3DSceneType: + def from_obj(self, obj_filepath: str) -> Self: """Load AOI3D scene from .obj file.""" aoi_3d = {} @@ -160,7 +154,7 @@ class AOI3DScene(AOIFeatures.AOIScene): '''DEPRECATED: but maybe still usefull? @property - def orthogonal_projection(self) -> AOI2DSceneType: + def orthogonal_projection(self) -> AOI2DScene.AOI2DScene: """ Orthogonal projection of whole scene. @@ -180,7 +174,7 @@ class AOI3DScene(AOIFeatures.AOIScene): return self.project(tvec, rvec, K) ''' - def vision_cone(self, cone_radius, cone_height, cone_tip=[0., 0., 0.], cone_direction=[0., 0., 1.]) -> Tuple[AOI3DSceneType, AOI3DSceneType]: + def vision_cone(self, cone_radius, cone_height, cone_tip=[0., 0., 0.], cone_direction=[0., 0., 1.]) -> tuple[Self, Self]: """Get AOI which are inside and out a given cone field. !!! note @@ -221,7 +215,7 @@ class AOI3DScene(AOIFeatures.AOIScene): return aoi3D_scene_inside, aoi3D_scene_outside - def project(self, T: numpy.array = T0, R: numpy.array = R0, K: numpy.array = K0, D: numpy.array = D0) -> AOI2DSceneType: + def project(self, T: numpy.array = T0, R: numpy.array = R0, K: numpy.array = K0, D: numpy.array = D0): """Project 3D scene onto 2D scene according translation, rotation and optical parameters. Parameters: @@ -249,7 +243,7 @@ class AOI3DScene(AOIFeatures.AOIScene): return aoi2D_scene - def transform(self, T: numpy.array = T0, R: numpy.array = R0) -> AOI3DSceneType: + def transform(self, T: numpy.array = T0, R: numpy.array = R0) -> Self: """Translate and/or rotate 3D scene. Parameters: diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index 31da8f4..7374e83 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -16,8 +16,7 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple -from dataclasses import dataclass, field +from typing import Self import json import os import math @@ -31,13 +30,10 @@ from shapely.geometry import Polygon from shapely.geometry.point import Point from colorama import Style, Fore -AreaOfInterestType = TypeVar('AreaOfInterest', bound="AreaOfInterest") -# Type definition for type annotation convenience - class AreaOfInterest(numpy.ndarray): """Define Area Of Interest as an array of points of any dimension.""" - def __new__(cls, points: numpy.array = numpy.empty(0)) -> AreaOfInterestType: + def __new__(cls, points: numpy.array = numpy.empty(0)) -> Self: """View casting inheritance.""" return numpy.array(points).view(AreaOfInterest) @@ -53,7 +49,7 @@ class AreaOfInterest(numpy.ndarray): return repr(self.tolist()) @classmethod - def from_dict(cls, aoi_data: dict, working_directory: str = None) -> AreaOfInterestType: + def from_dict(cls, aoi_data: dict, working_directory: str = None) -> Self: """Load attributes from dictionary. Parameters: @@ -152,7 +148,7 @@ class AreaOfInterest(numpy.ndarray): return numpy.array([(min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y)]) - def clockwise(self) -> AreaOfInterestType: + def clockwise(self) -> Self: """Get area points in clockwise order. !!! warning Available for 2D AOI only.""" @@ -218,7 +214,7 @@ class AreaOfInterest(numpy.ndarray): return tuple(O + x * H + y * V) - def circle_intersection(self, center: tuple, radius: float) -> Tuple[numpy.array, float, float]: + def circle_intersection(self, center: tuple, radius: float) -> tuple[numpy.array, float, float]: """Get intersection shape with a circle, intersection area / AOI area ratio and intersection area / circle area ratio. !!! warning Available for 2D AOI only. @@ -267,9 +263,6 @@ class AreaOfInterest(numpy.ndarray): center_pixel = numpy.rint(self.center).astype(int) cv2.circle(image, center_pixel, 1, color, -1) -AOISceneType = TypeVar('AOIScene', bound="AOIScene") -# Type definition for type annotation convenience - class AOIScene(): """Define AOI scene as a dictionary of AOI.""" @@ -290,7 +283,7 @@ class AOIScene(): self[name] = AreaOfInterest(area) @classmethod - def from_dict(cls, aoi_scene_data: dict, working_directory: str = None) -> AOISceneType: + def from_dict(cls, aoi_scene_data: dict, working_directory: str = None) -> Self: """Load attributes from dictionary. Parameters: @@ -322,7 +315,7 @@ class AOIScene(): return AOIScene(dimension = dimension, areas = areas) @classmethod - def from_json(self, json_filepath: str) -> AOISceneType: + def from_json(self, json_filepath: str) -> Self: """ Load attributes from .json file. @@ -399,7 +392,7 @@ class AOIScene(): return str(self.__areas) - def __add__(self, add_vector) -> AOISceneType: + def __add__(self, add_vector) -> Self: """Add vector to scene.""" assert(len(add_vector) == self.__dimension) @@ -413,7 +406,7 @@ class AOIScene(): # Allow n + scene operation __radd__ = __add__ - def __sub__(self, sub_vector) -> AOISceneType: + def __sub__(self, sub_vector) -> Self: """Sub vector to scene.""" assert(len(sub_vector) == self.__dimension) @@ -424,7 +417,7 @@ class AOIScene(): return self - def __rsub__(self, rsub_vector) -> AOISceneType: + def __rsub__(self, rsub_vector) -> Self: """RSub vector to scene.""" assert(len(rsub_vector) == self.__dimension) @@ -435,7 +428,7 @@ class AOIScene(): return self - def __mul__(self, scale_vector) -> AOISceneType: + def __mul__(self, scale_vector) -> Self: """Scale scene by a vector.""" assert(len(scale_vector) == self.__dimension) @@ -449,7 +442,7 @@ class AOIScene(): # Allow n * scene operation __rmul__ = __mul__ - def __truediv__(self, div_vector) -> AOISceneType: + def __truediv__(self, div_vector) -> Self: assert(len(div_vector) == self.__dimension) @@ -459,7 +452,7 @@ class AOIScene(): return self - def items(self) -> Tuple[str, AreaOfInterest]: + def items(self) -> tuple[str, AreaOfInterest]: """Iterate over areas.""" return self.__areas.items() @@ -475,7 +468,7 @@ class AOIScene(): return self.__dimension - def expand(self) -> AOISceneType: + def expand(self) -> Self: """Add 1 dimension to the AOIs in scene.""" new_areas = {} @@ -520,7 +513,7 @@ class AOIScene(): return max_bounds - min_bounds - def copy(self, exclude=[]) -> AOISceneType: + def copy(self, exclude=[]) -> Self: """Copy scene partly excluding AOI by name.""" scene_copy = type(self)() @@ -554,9 +547,6 @@ class AOIScene(): return output -HeatmapType = TypeVar('Heatmap', bound="Heatmap") -# Type definition for type annotation convenience - class Heatmap(DataFeatures.PipelineStepObject): """Define image to draw heatmap.""" diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 415b5c2..ff88693 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -16,15 +16,13 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple, Any +from typing import Self import os import sys import traceback import importlib -from inspect import getmembers, getmodule import collections import json -import ast import bisect import threading import math @@ -37,15 +35,6 @@ import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches from colorama import Style, Fore -TimestampType = TypeVar('Timestamp', int, float) -"""Type definition for timestamp as integer or float value.""" - -TimestampedObjectType = TypeVar('TimestampedObject', bound="TimestampedObject") -# Type definition for type annotation convenience - -TimestampedObjectsListType = TypeVar('TimestampedObjectsList', bound="TimestampedObjectsList") -# Type definition for type annotation convenience - def module_path(obj) -> str: """ Get object module path. @@ -210,7 +199,7 @@ class TimestampedObjectsList(list): """Get object type handled by the list.""" return self.__object_type - def append(self, ts_object: TimestampedObjectType|dict): + def append(self, ts_object: TimestampedObject|dict): """Append timestamped object.""" # Convert dict into GazePosition @@ -231,7 +220,7 @@ class TimestampedObjectsList(list): super().append(ts_object) - def look_for(self, timestamp: TimestampType) -> TimestampedObjectType: + def look_for(self, timestamp: int|float) -> TimestampedObject: """Look for object at given timestamp.""" for ts_object in self: @@ -239,7 +228,7 @@ class TimestampedObjectsList(list): return ts_object - def __add__(self, ts_objects: list = []) -> TimestampedObjectsListType: + def __add__(self, ts_objects: list = []) -> Self: """Append timestamped objects list.""" for ts_object in ts_objects: @@ -268,7 +257,7 @@ class TimestampedObjectsList(list): return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] @classmethod - def from_dataframe(self, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> TimestampedObjectsListType: + def from_dataframe(self, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self: """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" dataframe.drop(exclude, inplace=True, axis=True) @@ -329,7 +318,7 @@ class TimestampedObjectsList(list): return df @classmethod - def from_json(self, ts_object_type: type, json_filepath: str) -> TimestampedObjectsListType: + def from_json(self, ts_object_type: type, json_filepath: str) -> Self: """Create a TimestampedObjectsList from .json file.""" with open(json_filepath, encoding='utf-8') as ts_objects_file: @@ -353,7 +342,7 @@ class TimestampedObjectsList(list): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) - def pop_last_until(self, timestamp: TimestampType) -> TimestampedObjectType: + def pop_last_until(self, timestamp: int|float) -> TimestampedObject: """Pop all item until a given timestamped value and return the first after.""" # get last item before given timestamp @@ -365,7 +354,7 @@ class TimestampedObjectsList(list): return self[0] - def pop_last_before(self, timestamp: TimestampType) -> TimestampedObjectType: + def pop_last_before(self, timestamp: int|float) -> TimestampedObject: """Pop all item before a given timestamped value and return the last one.""" # get last item before given timestamp @@ -379,7 +368,7 @@ class TimestampedObjectsList(list): return poped_value - def get_first_from(self, timestamp: TimestampType) -> TimestampedObjectType: + def get_first_from(self, timestamp: int|float) -> TimestampedObject: """Retreive first item timestamp from a given timestamp value.""" ts_list = self.timestamps() @@ -393,7 +382,7 @@ class TimestampedObjectsList(list): raise KeyError(f'No data stored after {timestamp} timestamp.') - def get_last_before(self, timestamp: TimestampType) -> TimestampedObjectType: + def get_last_before(self, timestamp: int|float) -> TimestampedObject: """Retreive last item timestamp before a given timestamp value.""" ts_list = self.timestamps() @@ -407,7 +396,7 @@ class TimestampedObjectsList(list): raise KeyError(f'No data stored before {timestamp} timestamp.') - def get_last_until(self, timestamp: TimestampType) -> TimestampedObjectType: + def get_last_until(self, timestamp: int|float) -> TimestampedObject: """Retreive last item timestamp until a given timestamp value.""" ts_list = self.timestamps() @@ -463,7 +452,7 @@ def PipelineStepInit(method): """Wrap pipeline step init method to update PipelineStepObject attributes with arguments after init call. Parameters: - kwargs: Any arguments defined by PipelineStepMethodInit. + kwargs: any arguments defined by PipelineStepMethodInit. """ method(self, **kwargs) @@ -813,7 +802,7 @@ class PipelineStepObject(): return tabs @property - def properties(self) -> Tuple[name, any]: + def properties(self) -> tuple[name, any]: """Iterate over pipeline step properties values.""" for name, item in self.__class__.__dict__.items(): @@ -857,9 +846,9 @@ def PipelineStepMethod(method): """Wrap pipeline step method to measure execution time. Parameters: - args: Any arguments defined by PipelineStepMethod. - timestamp: Optional method call timestamp (unit does'nt matter) if first args parameter is not a TimestampedObject instance. - unwrap: Extra arguments used in wrapper function to call wrapped method directly. + args: any arguments defined by PipelineStepMethod. + timestamp: optional method call timestamp (unit does'nt matter) if first args parameter is not a TimestampedObject instance. + unwrap: extra arguments used in wrapper function to call wrapped method directly. """ if timestamp is None and len(args) > 0: diff --git a/src/argaze/GazeAnalysis/Basic.py b/src/argaze/GazeAnalysis/Basic.py index 7c020d7..74426de 100644 --- a/src/argaze/GazeAnalysis/Basic.py +++ b/src/argaze/GazeAnalysis/Basic.py @@ -34,7 +34,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__step_fixation_durations_average = 0 @DataFeatures.PipelineStepMethod - def analyze(self, scan_path: GazeFeatures.ScanPathType): + def analyze(self, scan_path: GazeFeatures.ScanPath): self.__path_duration = scan_path.duration @@ -80,7 +80,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__aoi_fixation_distribution = {} @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: GazeFeatures.ScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.ScanPath): self.__path_duration = aoi_scan_path.duration diff --git a/src/argaze/GazeAnalysis/Entropy.py b/src/argaze/GazeAnalysis/Entropy.py index e241feb..c1cddd6 100644 --- a/src/argaze/GazeAnalysis/Entropy.py +++ b/src/argaze/GazeAnalysis/Entropy.py @@ -57,7 +57,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__transition_matrix_analyzer = transition_matrix_analyzer @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/ExploreExploitRatio.py b/src/argaze/GazeAnalysis/ExploreExploitRatio.py index 14b3c9d..44addd3 100644 --- a/src/argaze/GazeAnalysis/ExploreExploitRatio.py +++ b/src/argaze/GazeAnalysis/ExploreExploitRatio.py @@ -49,7 +49,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__short_fixation_duration_threshold = short_fixation_duration_threshold @DataFeatures.PipelineStepMethod - def analyze(self, scan_path: GazeFeatures.ScanPathType): + def analyze(self, scan_path: GazeFeatures.ScanPath): assert(len(scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/KCoefficient.py b/src/argaze/GazeAnalysis/KCoefficient.py index 9084bd9..9bed17c 100644 --- a/src/argaze/GazeAnalysis/KCoefficient.py +++ b/src/argaze/GazeAnalysis/KCoefficient.py @@ -38,7 +38,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__K = 0 @DataFeatures.PipelineStepMethod - def analyze(self, scan_path: GazeFeatures.ScanPathType): + def analyze(self, scan_path: GazeFeatures.ScanPath): assert(len(scan_path) > 1) @@ -95,7 +95,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__K = 0 @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType) -> float: + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath) -> float: assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/LempelZivComplexity.py b/src/argaze/GazeAnalysis/LempelZivComplexity.py index 57a82d4..fc50991 100644 --- a/src/argaze/GazeAnalysis/LempelZivComplexity.py +++ b/src/argaze/GazeAnalysis/LempelZivComplexity.py @@ -39,7 +39,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__lempel_ziv_complexity = 0 @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/NGram.py b/src/argaze/GazeAnalysis/NGram.py index d302f07..ac5a0dd 100644 --- a/src/argaze/GazeAnalysis/NGram.py +++ b/src/argaze/GazeAnalysis/NGram.py @@ -17,12 +17,8 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple, Any -from dataclasses import dataclass, field - from argaze import GazeFeatures, DataFeatures -@dataclass class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): """Implementation of N-Gram algorithm as proposed in: @@ -62,7 +58,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__n_max = n_max @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/NearestNeighborIndex.py b/src/argaze/GazeAnalysis/NearestNeighborIndex.py index d874352..615643e 100644 --- a/src/argaze/GazeAnalysis/NearestNeighborIndex.py +++ b/src/argaze/GazeAnalysis/NearestNeighborIndex.py @@ -50,7 +50,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__size = size @DataFeatures.PipelineStepMethod - def analyze(self, scan_path: GazeFeatures.ScanPathType): + def analyze(self, scan_path: GazeFeatures.ScanPath): assert(len(scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/TransitionMatrix.py b/src/argaze/GazeAnalysis/TransitionMatrix.py index b91599b..16cb56e 100644 --- a/src/argaze/GazeAnalysis/TransitionMatrix.py +++ b/src/argaze/GazeAnalysis/TransitionMatrix.py @@ -40,7 +40,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__transition_matrix_density = 0. @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index e63ad46..9ea877a 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -16,13 +16,10 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar, Tuple, Any -from dataclasses import dataclass, field +from typing import Self import math -import ast import json import importlib -from inspect import getmembers from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures @@ -32,9 +29,6 @@ import numpy import pandas import cv2 -GazePositionType = TypeVar('GazePosition', bound="GazePosition") -# Type definition for type annotation convenience - class GazePosition(tuple, DataFeatures.TimestampedObject): """Define gaze position as a tuple of coordinates with precision. @@ -69,7 +63,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): return self.__message @classmethod - def from_dict(self, position_data: dict) -> GazePositionType: + def from_dict(self, position_data: dict) -> Self: if 'value' in position_data.keys(): @@ -89,7 +83,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): return json.dumps(DataFeatures.as_dict(self)) - def __add__(self, position: GazePositionType) -> GazePositionType: + def __add__(self, position: Self) -> Self: """Add position. !!! note @@ -108,7 +102,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): __radd__ = __add__ - def __sub__(self, position: GazePositionType) -> GazePositionType: + def __sub__(self, position: Self) -> Self: """Substract position. !!! note @@ -125,7 +119,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): return GazePosition(numpy.array(self) - numpy.array(position), timestamp=self.timestamp) - def __rsub__(self, position: GazePositionType) -> GazePositionType: + def __rsub__(self, position: Self) -> Self: """Reversed substract position. !!! note @@ -142,7 +136,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): return GazePosition(numpy.array(position) - numpy.array(self), timestamp=self.timestamp) - def __mul__(self, factor: int|float) -> GazePositionType: + def __mul__(self, factor: int|float) -> Self: """Multiply position by a factor. !!! note @@ -153,7 +147,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): """ return GazePosition(numpy.array(self) * factor, precision = self.__precision * factor if self.__precision is not None else None, timestamp=self.timestamp) - def __pow__(self, factor: int|float) -> GazePositionType: + def __pow__(self, factor: int|float) -> Self: """Power position by a factor. !!! note @@ -198,9 +192,6 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): if self.__precision is not None and draw_precision: cv2.circle(image, int_value, round(self.__precision), color, 1) -TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions") -# Type definition for type annotation convenience - class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): """Handle timestamped gaze positions into a list.""" @@ -214,8 +205,8 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): ''' Is it still needed as there is a TimestampedObjectsList.from_json method? @classmethod - def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType: - """Create a TimeStampedGazePositionsType from .json file.""" + def from_json(self, json_filepath: str) -> TimeStampedGazePositions: + """Create a TimeStampedGazePositions from .json file.""" with open(json_filepath, encoding='utf-8') as ts_positions_file: @@ -225,7 +216,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): ''' @classmethod - def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> TimeStampedGazePositionsType: + def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> Self: """Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). Parameters: @@ -294,9 +285,6 @@ class GazePositionCalibrationFailed(Exception): super().__init__(message) -GazePositionCalibratorType = TypeVar('GazePositionCalibrator', bound="GazePositionCalibrator") -# Type definition for type annotation convenience - class GazePositionCalibrator(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a gaze position calibrator algorithm.""" @@ -320,7 +308,7 @@ class GazePositionCalibrator(DataFeatures.PipelineStepObject): raise NotImplementedError('reset() method not implemented') - def calibrate(self) -> Any: + def calibrate(self) -> any: """Process calibration from observed and expected gaze positions. Returns: @@ -329,7 +317,7 @@ class GazePositionCalibrator(DataFeatures.PipelineStepObject): raise NotImplementedError('terminate() method not implemented') - def apply(self, observed_gaze_position: GazePosition) -> GazePositionType: + def apply(self, observed_gaze_position: GazePosition) -> GazePosition: """Apply calibration onto observed gaze position. Parameters: @@ -355,9 +343,6 @@ class GazePositionCalibrator(DataFeatures.PipelineStepObject): raise NotImplementedError('ready getter not implemented') -GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement") -# Type definition for type annotation convenience - class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): """Define abstract gaze movement class as timestamped gaze positions list. @@ -402,7 +387,7 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): """Is the movement finished?""" return self.__finished - def finish(self) -> GazeMovementType: + def finish(self) -> Self: """Set gaze movement as finished""" self.__finished = True return self @@ -470,9 +455,6 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): raise NotImplementedError('draw() method not implemented') -FixationType = TypeVar('Fixation', bound="Fixation") -# Type definition for type annotation convenience - class Fixation(GazeMovement): """Define abstract fixation as gaze movement.""" @@ -492,7 +474,7 @@ class Fixation(GazeMovement): """Set representative position of the fixation.""" self._focus = focus - def merge(self, fixation) -> FixationType: + def merge(self, fixation) -> Self: """Merge another fixation into this fixation.""" raise NotImplementedError('merge() method not implemented') @@ -502,9 +484,6 @@ def is_fixation(gaze_movement): return type(gaze_movement).__bases__[0] == Fixation or type(gaze_movement) == Fixation -SaccadeType = TypeVar('Saccade', bound="Saccade") -# Type definition for type annotation convenience - class Saccade(GazeMovement): """Define abstract saccade as gaze movement.""" @@ -517,9 +496,6 @@ def is_saccade(gaze_movement): return type(gaze_movement).__bases__[0] == Saccade or type(gaze_movement) == Saccade -TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeStampedGazeMovements") -# Type definition for type annotation convenience - class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList): """Handle timestamped gaze movements into a list""" @@ -527,11 +503,8 @@ class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList): DataFeatures.TimestampedObjectsList.__init__(self, GazeMovement, gaze_movements) -GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus") -# Type definition for type annotation convenience - class GazeStatus(list, DataFeatures.TimestampedObject): - """Define gaze status as a list of 1 or 2 (index, GazeMovementType) tuples. + """Define gaze status as a list of 1 or 2 (index, GazeMovement) tuples. Parameters: position: the position that the status represents. @@ -553,9 +526,6 @@ class GazeStatus(list, DataFeatures.TimestampedObject): super().append((movement_index, movement_type)) -TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus") -# Type definition for type annotation convenience - class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList): """Handle timestamped gaze status into a list.""" @@ -572,7 +542,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): super().__init__() @DataFeatures.PipelineStepMethod - def identify(self, timestamped_gaze_position: GazePosition, terminate:bool=False) -> GazeMovementType: + def identify(self, timestamped_gaze_position: GazePosition, terminate:bool=False) -> GazeMovement: """Identify gaze movement from successive timestamped gaze positions. !!! warning "Mandatory" @@ -588,22 +558,22 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): raise NotImplementedError('identify() method not implemented') - def current_gaze_movement(self) -> GazeMovementType: + def current_gaze_movement(self) -> GazeMovement: """Get the current identified gaze movement (finished or in progress) if it exists otherwise, an empty gaze movement.""" raise NotImplementedError('current_gaze_movement getter not implemented') - def current_fixation(self) -> FixationType: + def current_fixation(self) -> Fixation: """Get the current identified fixation (finished or in progress) if it exists otherwise, an empty gaze movement.""" raise NotImplementedError('current_fixation getter not implemented') - def current_saccade(self) -> SaccadeType: + def current_saccade(self) -> Saccade: """Get the current identified saccade (finished or in progress) if it exists otherwise, an empty gaze movement.""" raise NotImplementedError('current_saccade getter not implemented') - def browse(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[TimeStampedGazeMovementsType, TimeStampedGazeMovementsType, TimeStampedGazeStatusType]: + def browse(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[TimeStampedGazeMovements, TimeStampedGazeMovements, TimeStampedGazeStatus]: """Identify fixations and saccades browsing timestamped gaze positions. Returns: @@ -652,7 +622,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): return ts_fixations, ts_saccades, ts_status - def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[int|float, GazeMovementType]: + def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[int|float, GazeMovement]: """GazeMovement generator. Parameters: @@ -677,9 +647,6 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): yield gaze_movement -ScanStepType = TypeVar('ScanStep', bound="ScanStep") -# Type definition for type annotation convenience - class ScanStepError(Exception): """Exception raised at ScanStep creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade.""" @@ -743,9 +710,6 @@ class ScanStep(): return self.__first_fixation.duration + self.__last_saccade.duration -ScanPathType = TypeVar('ScanPathType', bound="ScanPathType") -# Type definition for type annotation convenience - class ScanPath(list): """List of scan steps. @@ -783,7 +747,7 @@ class ScanPath(list): self.__duration -= oldest_step.duration - def append_saccade(self, saccade) -> ScanStepType: + def append_saccade(self, saccade) -> ScanStep: """Append new saccade to scan path and return last new scan step if one have been created.""" # Ignore saccade if no fixation came before @@ -855,7 +819,7 @@ class ScanPathAnalyzer(DataFeatures.PipelineStepObject): return DataFeatures.DataDictionary( {a: getattr(self, a) for a in self.__analysis} ) @DataFeatures.PipelineStepMethod - def analyze(self, scan_path: ScanPathType): + def analyze(self, scan_path: ScanPath): """Analyze scan path.""" raise NotImplementedError('analyze() method not implemented') @@ -880,7 +844,7 @@ class AOIMatcher(DataFeatures.PipelineStepObject): self.__exclude = exclude - def match(self, aoi_scene: AOIFeatures.AOIScene, gaze_movement: GazeMovement) -> Tuple[str, AOIFeatures.AreaOfInterest]: + def match(self, aoi_scene: AOIFeatures.AOIScene, gaze_movement: GazeMovement) -> tuple[str, AOIFeatures.AreaOfInterest]: """Which AOI is looked in the scene?""" raise NotImplementedError('match() method not implemented') @@ -904,9 +868,6 @@ class AOIMatcher(DataFeatures.PipelineStepObject): """Get most likely looked aoi name.""" raise NotImplementedError('looked_aoi_name() method not implemented') -AOIScanStepType = TypeVar('AOIScanStep', bound="AOIScanStep") -# Type definition for type annotation convenience - class AOIScanStepError(Exception): """Exception raised at AOIScanStep creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade.""" @@ -987,9 +948,6 @@ class AOIScanStep(): """ return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp -AOIScanPathType = TypeVar('AOIScanPathType', bound="AOIScanPathType") -# Type definition for type annotation convenience - # Define strings for outside AOI case OutsideAOI = 'GazeFeatures.OutsideAOI' @@ -1213,7 +1171,7 @@ class AOIScanPathAnalyzer(DataFeatures.PipelineStepObject): return DataFeatures.DataDictionary( {a: getattr(self, a) for a in self.__analysis} ) @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: AOIScanPathType): + def analyze(self, aoi_scan_path: AOIScanPath): """Analyze aoi scan path.""" raise NotImplementedError('analyze() method not implemented') diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupillAnalysis/WorkloadIndex.py index aef56f9..23c3bab 100644 --- a/src/argaze/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze/PupillAnalysis/WorkloadIndex.py @@ -16,7 +16,6 @@ __credits__ = ["Jean-Paul Imbert"] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar import math from argaze import DataFeatures, PupillFeatures diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py index 2c150d4..70ffb31 100644 --- a/src/argaze/PupillFeatures.py +++ b/src/argaze/PupillFeatures.py @@ -16,15 +16,11 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import TypeVar import json import math from argaze import DataFeatures -PupillDiameterType = TypeVar('PupillDiameter', bound="PupillDiameter") -# Type definition for type annotation convenience - class PupillDiameter(float, DataFeatures.TimestampedObject): """Define pupill diameter as a single float value. @@ -44,9 +40,6 @@ class PupillDiameter(float, DataFeatures.TimestampedObject): """Get pupill diameter value.""" return float(self) -TimeStampedPupillDiametersType = TypeVar('TimeStampedPupillDiameters', bound="TimeStampedPupillDiameters") -# Type definition for type annotation convenience - class TimeStampedPupillDiameters(DataFeatures.TimestampedObjectsList): """Handle timestamped pupill diamters into a list.""" @@ -58,7 +51,7 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a pupill diameter analyser.""" @DataFeatures.PipelineStepMethod - def analyze(self, pupill_diameter: PupillDiameterType) -> any: + def analyze(self, pupill_diameter: PupillDiameter) -> any: """Analyze pupill diameter from successive timestamped pupill diameters.""" raise NotImplementedError('analyze() method not implemented') diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py index a315def..26a63ea 100644 --- a/src/argaze/utils/UtilsFeatures.py +++ b/src/argaze/utils/UtilsFeatures.py @@ -16,7 +16,6 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Tuple import time import types import traceback @@ -121,7 +120,7 @@ class TimeProbe(): self.__lap_counter = 0 self.__elapsed_time = 0 - def lap(self) -> Tuple[float, int, float]: + def lap(self) -> tuple[float, int, float]: """ Get lap info. diff --git a/src/argaze/utils/demo_aruco_markers_run.py b/src/argaze/utils/demo_aruco_markers_run.py index d43e6a9..f5bc756 100644 --- a/src/argaze/utils/demo_aruco_markers_run.py +++ b/src/argaze/utils/demo_aruco_markers_run.py @@ -41,95 +41,95 @@ args = parser.parse_args() def main(): # Load ArUcoCamera - aruco_camera = ArUcoCamera.ArUcoCamera.from_json(args.configuration) + with ArUcoCamera.ArUcoCamera.from_json(args.configuration) as aruco_camera: - if args.verbose: + if args.verbose: - print(aruco_camera) + print(aruco_camera) - # Create a window to display ArUcoCamera - cv2.namedWindow(aruco_camera.name, cv2.WINDOW_AUTOSIZE) + # Create a window to display ArUcoCamera + cv2.namedWindow(aruco_camera.name, cv2.WINDOW_AUTOSIZE) - # Init timestamp - start_time = time.time() + # Init timestamp + start_time = time.time() - # Prepare gaze analysis assessment - call_chrono = UtilsFeatures.TimeProbe() - call_chrono.start() + # Prepare gaze analysis assessment + call_chrono = UtilsFeatures.TimeProbe() + call_chrono.start() - gaze_positions_frequency = 0 - gaze_analysis_time = 0 + gaze_positions_frequency = 0 + gaze_analysis_time = 0 - # Fake gaze position with mouse pointer - def on_mouse_event(event, x, y, flags, param): + # Fake gaze position with mouse pointer + def on_mouse_event(event, x, y, flags, param): - nonlocal gaze_positions_frequency - nonlocal gaze_analysis_time - - # Assess gaze analysis - lap_time, nb_laps, elapsed_time = call_chrono.lap() + nonlocal gaze_positions_frequency + nonlocal gaze_analysis_time + + # Assess gaze analysis + lap_time, nb_laps, elapsed_time = call_chrono.lap() - if elapsed_time > 1e3: + if elapsed_time > 1e3: - gaze_positions_frequency = nb_laps - call_chrono.restart() + gaze_positions_frequency = nb_laps + call_chrono.restart() - # Edit millisecond timestamp - timestamp = int((time.time() - start_time) * 1e3) + # Edit millisecond timestamp + timestamp = int((time.time() - start_time) * 1e3) - #try: + #try: - # Project gaze position into camera - aruco_camera.look(GazeFeatures.GazePosition((x, y), timestamp=timestamp)) + # Project gaze position into camera + aruco_camera.look(GazeFeatures.GazePosition((x, y), timestamp=timestamp)) - # Assess gaze analysis - gaze_analysis_time = aruco_camera.execution_times['look'] + # Assess gaze analysis + gaze_analysis_time = aruco_camera.execution_times['look'] - #except Exception as e: + #except Exception as e: - # print(e) - # gaze_analysis_time = 0 + # print(e) + # gaze_analysis_time = 0 - # Attach mouse callback to window - cv2.setMouseCallback(aruco_camera.name, on_mouse_event) + # Attach mouse callback to window + cv2.setMouseCallback(aruco_camera.name, on_mouse_event) - # Prepare video fps assessment - video_fps = 0 - video_chrono = UtilsFeatures.TimeProbe() - video_chrono.start() + # Prepare video fps assessment + video_fps = 0 + video_chrono = UtilsFeatures.TimeProbe() + video_chrono.start() - # Prepare visualisation time assessment - visualisation_time = 0 + # Prepare visualisation time assessment + visualisation_time = 0 - # Enable camera video capture into separate thread - video_capture = cv2.VideoCapture(int(args.source) if args.source.isdecimal() else args.source) + # Enable camera video capture into separate thread + video_capture = cv2.VideoCapture(int(args.source) if args.source.isdecimal() else args.source) - # Waiting for 'ctrl+C' interruption - with contextlib.suppress(KeyboardInterrupt): + # Waiting for 'ctrl+C' interruption + with contextlib.suppress(KeyboardInterrupt): - # Assess capture time - capture_start = time.time() + # Assess capture time + capture_start = time.time() - # Capture images - while video_capture.isOpened(): + # Capture images + while video_capture.isOpened(): - # Read video image - success, video_image = video_capture.read() + # Read video image + success, video_image = video_capture.read() - # Assess capture time - capture_time = int((time.time() - capture_start) * 1e3) + # Assess capture time + capture_time = int((time.time() - capture_start) * 1e3) - if success: + if success: - # Assess video fps - lap_time, nb_laps, elapsed_time = video_chrono.lap() + # Assess video fps + lap_time, nb_laps, elapsed_time = video_chrono.lap() - if elapsed_time > 1e3: + if elapsed_time > 1e3: - video_fps = nb_laps - video_chrono.restart() + video_fps = nb_laps + video_chrono.restart() - try: + #try: # Detect and project AR features aruco_camera.watch(video_image, timestamp=capture_time) @@ -137,58 +137,58 @@ def main(): # Detection suceeded exception = None - # Write errors - except Exception as e: + # Write errors + #except Exception as e: - exception = e + # exception = e - # Assess visualisation time - visualisation_start = time.time() + # Assess visualisation time + visualisation_start = time.time() - # Get ArUcoCamera frame image - aruco_camera_image = aruco_camera.image() + # Get ArUcoCamera frame image + aruco_camera_image = aruco_camera.image() - # Get execution times - detection_time = aruco_camera.aruco_detector.execution_times['detect_markers'] - projection_time = aruco_camera.execution_times['watch'] - detection_time + # Get execution times + detection_time = aruco_camera.aruco_detector.execution_times['detect_markers'] + projection_time = aruco_camera.execution_times['watch'] - detection_time - # Write time info - cv2.rectangle(aruco_camera_image, (0, 0), (aruco_camera.size[0], 100), (63, 63, 63), -1) - cv2.putText(aruco_camera_image, f'{video_fps} FPS | Capture {capture_time}ms | Detection {int(detection_time)}ms | Projection {int(projection_time)}ms | Visualisation {visualisation_time}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - cv2.putText(aruco_camera_image, f'{gaze_positions_frequency} gaze positions/s | Gaze analysis {gaze_analysis_time:.2f}ms', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + # Write time info + cv2.rectangle(aruco_camera_image, (0, 0), (aruco_camera.size[0], 100), (63, 63, 63), -1) + cv2.putText(aruco_camera_image, f'{video_fps} FPS | Capture {capture_time}ms | Detection {int(detection_time)}ms | Projection {int(projection_time)}ms | Visualisation {visualisation_time}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(aruco_camera_image, f'{gaze_positions_frequency} gaze positions/s | Gaze analysis {gaze_analysis_time:.2f}ms', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - # Handle exceptions - if exception is not None: + # Handle exceptions + if exception is not None: - cv2.rectangle(aruco_camera_image, (0, 100), (aruco_camera.size[0], 80), (127, 127, 127), -1) - cv2.putText(aruco_camera_image, f'error: {exception}', (20, 140), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.rectangle(aruco_camera_image, (0, 100), (aruco_camera.size[0], 80), (127, 127, 127), -1) + cv2.putText(aruco_camera_image, f'error: {exception}', (20, 140), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - # Write hint - cv2.putText(aruco_camera_image, 'Mouve mouse pointer over gray rectangle area', (20, aruco_camera.size[1]-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + # Write hint + cv2.putText(aruco_camera_image, 'Mouve mouse pointer over gray rectangle area', (20, aruco_camera.size[1]-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - # Display ArUcoCamera frame image - cv2.imshow(aruco_camera.name, aruco_camera_image) + # Display ArUcoCamera frame image + cv2.imshow(aruco_camera.name, aruco_camera_image) - # Draw and display each scene frames - for scene_frame in aruco_camera.scene_frames(): + # Draw and display each scene frames + for scene_frame in aruco_camera.scene_frames(): - # Display scene frame - cv2.imshow(f'{scene_frame.parent.name}:{scene_frame.name}', scene_frame.image()) + # Display scene frame + cv2.imshow(f'{scene_frame.parent.name}:{scene_frame.name}', scene_frame.image()) - else: + else: - # Assess visualisation time - visualisation_start = time.time() + # Assess visualisation time + visualisation_start = time.time() - # Stop by pressing 'Esc' key - # NOTE: on MacOS, cv2.waitKey(1) waits ~40ms - if cv2.waitKey(1) == 27: + # Stop by pressing 'Esc' key + # NOTE: on MacOS, cv2.waitKey(1) waits ~40ms + if cv2.waitKey(1) == 27: - # Close camera video capture - video_capture.release() + # Close camera video capture + video_capture.release() - # Assess visualisation time - visualisation_time = int((time.time() - visualisation_start) * 1e3) + # Assess visualisation time + visualisation_time = int((time.time() - visualisation_start) * 1e3) # Stop image display cv2.destroyAllWindows() diff --git a/src/argaze/utils/demo_data/demo_aruco_markers_setup.json b/src/argaze/utils/demo_data/demo_aruco_markers_setup.json index 7c0dd2f..9a95524 100644 --- a/src/argaze/utils/demo_data/demo_aruco_markers_setup.json +++ b/src/argaze/utils/demo_data/demo_aruco_markers_setup.json @@ -1,7 +1,6 @@ { "name": "demo_camera", "size": [1280, 720], - "provider": "provider_setup.json", "aruco_detector": { "dictionary": "DICT_APRILTAG_16h5", "parameters": { |