diff options
24 files changed, 562 insertions, 533 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 7cc1b9d..56a941d 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -876,6 +876,7 @@ class ArScene(DataFeatures.PipelineStepObject): Define abstract Augmented Reality scene with ArLayers and ArFrames inside. """ + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): """Initialize ArScene""" @@ -1303,6 +1304,7 @@ class ArContext(DataFeatures.PipelineStepObject): Define class to ... """ + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): diff --git a/src/argaze/ArUcoMarkers/ArUcoBoard.py b/src/argaze/ArUcoMarkers/ArUcoBoard.py index 74dad94..a6d8b02 100644 --- a/src/argaze/ArUcoMarkers/ArUcoBoard.py +++ b/src/argaze/ArUcoMarkers/ArUcoBoard.py @@ -17,6 +17,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" from dataclasses import dataclass, field +from typing import Sequence import cv2 as cv import cv2.aruco as aruco @@ -49,13 +50,13 @@ class ArUcoBoard(): self.model = aruco.CharucoBoard((self.columns, self.rows), self.square_size/100., self.marker_size/100., self.dictionary.markers) @property - def identifiers(self) -> list[int]: + def identifiers(self) -> Sequence[int]: """Get board markers identifiers.""" return self.model.getIds() @property - def size(self)-> int: + def size(self) -> Sequence[int]: """Get numbers of columns and rows.""" return self.model.getChessboardSize() diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py index ce7e38c..cd8ff20 100644 --- a/src/argaze/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py @@ -126,6 +126,7 @@ class DetectorParameters(): class ArUcoDetector(DataFeatures.PipelineStepObject): """OpenCV ArUco library wrapper.""" + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): """Initialize ArUcoDetector.""" @@ -334,7 +335,7 @@ class Observer(): self.__detected_ids = [] @property - def metrics(self) -> tuple[int, dict]: + def metrics(self) -> tuple[int, int, dict]: """Get marker detection metrics. Returns: diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py index 568b251..a6f7b43 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py @@ -79,6 +79,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject): Handle group of ArUco markers as one unique spatial entity and estimate its pose. """ + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): """Initialize ArUcoMarkersGroup""" diff --git a/src/argaze/ArUcoMarkers/__init__.py b/src/argaze/ArUcoMarkers/__init__.py index 0ca48cc..b7b0bf8 100644 --- a/src/argaze/ArUcoMarkers/__init__.py +++ b/src/argaze/ArUcoMarkers/__init__.py @@ -1,4 +1,6 @@ """ -Handle [OpenCV ArUco markers](https://docs.opencv.org/4.x/d5/dae/tutorial_aruco_detection.html): generate and detect markers, calibrate camera, describe scene, ... +Handle [OpenCV ArUco markers](https://docs.opencv.org/4.x/d5/dae/tutorial_aruco_detection.html): generate and detect +markers, calibrate camera, describe scene, ... """ -__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoOpticCalibrator', 'ArUcoDetector', 'ArUcoMarkersGroup', 'ArUcoCamera', 'ArUcoScene', 'utils']
\ No newline at end of file +__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoOpticCalibrator', 'ArUcoDetector', + 'ArUcoMarkersGroup', 'ArUcoCamera', 'ArUcoScene', 'utils'] diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py index 8283e2e..2c8f003 100644 --- a/src/argaze/AreaOfInterest/AOI2DScene.py +++ b/src/argaze/AreaOfInterest/AOI2DScene.py @@ -25,6 +25,7 @@ import cv2 import numpy from xml.dom import minidom + class AOI2DScene(AOIFeatures.AOIScene): """Define AOI 2D scene.""" @@ -53,12 +54,11 @@ class AOI2DScene(AOIFeatures.AOIScene): # Load SVG path for path in description_file.getElementsByTagName('path'): - # Convert d-string into array d_string = path.getAttribute('d') - assert(d_string[0] == 'M') - assert(d_string[-1] == 'Z') + assert (d_string[0] == 'M') + assert (d_string[-1] == 'Z') points = [(float(x), float(y)) for x, y in [p.split(',') for p in d_string[1:-1].split('L')]] @@ -66,7 +66,6 @@ class AOI2DScene(AOIFeatures.AOIScene): # Load SVG rect for rect in description_file.getElementsByTagName('rect'): - # Convert rect element into dict rect_dict = { "Rectangle": { @@ -81,7 +80,6 @@ class AOI2DScene(AOIFeatures.AOIScene): # Load SVG circle for circle in description_file.getElementsByTagName('circle'): - # Convert circle element into dict circle_dict = { "Circle": { @@ -95,7 +93,6 @@ class AOI2DScene(AOIFeatures.AOIScene): # Load SVG ellipse for ellipse in description_file.getElementsByTagName('ellipse'): - # Convert ellipse element into dict ellipse_dict = { "Ellipse": { @@ -130,7 +127,7 @@ class AOI2DScene(AOIFeatures.AOIScene): if draw_aoi: aoi.draw(image, **draw_aoi) - def raycast(self, pointer:tuple) -> tuple[str, "AOIFeatures.AreaOfInterest", bool]: + def raycast(self, pointer: tuple) -> tuple[str, "AOIFeatures.AreaOfInterest", bool]: """Iterate over aoi to know which aoi is matching the given pointer position. Returns: aoi name @@ -139,30 +136,31 @@ class AOI2DScene(AOIFeatures.AOIScene): """ for name, aoi in self.items(): - matching = aoi.contains_point(pointer) yield name, aoi, matching - def draw_raycast(self, image: numpy.array, pointer:tuple, exclude=[], base_color=(0, 0, 255), matching_color=(0, 255, 0)): + def draw_raycast(self, image: numpy.array, pointer: tuple, exclude=[], base_color=(0, 0, 255), + matching_color=(0, 255, 0)): """Draw AOI with their matching status.""" for name, aoi, matching in self.raycast(pointer): if name in exclude: continue - + color = matching_color if matching else base_color if matching: - top_left_corner_pixel = numpy.rint(aoi.clockwise()[0]).astype(int) - cv2.putText(image, name, top_left_corner_pixel, cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, name, top_left_corner_pixel, cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, + cv2.LINE_AA) # Draw form aoi.draw(image, color) - def circlecast(self, center:tuple, radius:float) -> tuple[str, "AOIFeatures.AreaOfInterest", numpy.array, float, float]: + def circlecast(self, center: tuple, radius: float) -> tuple[ + str, "AOIFeatures.AreaOfInterest", numpy.array, float, float]: """Iterate over areas to know which aoi is matched circle. Returns: aoi name @@ -173,7 +171,6 @@ class AOI2DScene(AOIFeatures.AOIScene): """ for name, aoi in self.items(): - matched_region, aoi_ratio, circle_ratio = aoi.circle_intersection(center, radius) yield name, aoi, matched_region, aoi_ratio, circle_ratio @@ -211,6 +208,7 @@ class AOI2DScene(AOIFeatures.AOIScene): return aoi2D_scene ''' + def dimensionalize(self, rectangle_3d: AOIFeatures.AreaOfInterest, size: tuple) -> AOI3DScene.AOI3DScene: """ Convert to 3D scene considering it is inside of 3D rectangular frame. @@ -223,8 +221,8 @@ class AOI2DScene(AOIFeatures.AOIScene): AOI 3D scene """ - assert(rectangle_3d.dimension == 3) - assert(rectangle_3d.points_number == 4) + assert (rectangle_3d.dimension == 3) + assert (rectangle_3d.points_number == 4) # Vectorize outter_axis function vfunc = numpy.vectorize(rectangle_3d.outter_axis) @@ -233,7 +231,6 @@ class AOI2DScene(AOIFeatures.AOIScene): aoi3D_scene = AOI3DScene.AOI3DScene() for name, aoi2D in self.items(): - X, Y = (aoi2D / size).T aoi3D_scene[name] = numpy.array(vfunc(X, Y)).T.view(AOIFeatures.AreaOfInterest) diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py index 762dab0..1964d23 100644 --- a/src/argaze/AreaOfInterest/AOI3DScene.py +++ b/src/argaze/AreaOfInterest/AOI3DScene.py @@ -178,8 +178,8 @@ class AOI3DScene(AOIFeatures.AOIScene): """Get AOI which are inside and out a given cone field. !!! note - **By default** - The cone have its tip at origin and its base oriented to positive Z axis. + **By default** + The cone have its tip at origin and its base oriented to positive Z axis. Returns: scene inside the cone diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index 88c6feb..7da5bb5 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -31,634 +31,629 @@ from argaze import DataFeatures class AreaOfInterest(numpy.ndarray): - """Define Area Of Interest as an array of points of any dimension.""" + """Define Area Of Interest as an array of points of any dimension.""" - def __new__(cls, points: numpy.array = numpy.empty(0)) -> Self: - """View casting inheritance.""" + def __new__(cls, points: numpy.array = numpy.empty(0)): + """View casting inheritance.""" - return numpy.array(points).view(AreaOfInterest) + return numpy.array(points).view(AreaOfInterest) - def __repr__(self): - """String representation""" + def __repr__(self): + """String representation""" - return repr(self.tolist()) + return repr(self.tolist()) - def __str__(self): - """String display""" + def __str__(self): + """String display""" - return repr(self.tolist()) + return repr(self.tolist()) - @classmethod - def from_dict(cls, aoi_data: dict) -> Self: - """Load attributes from dictionary. + @classmethod + def from_dict(cls, aoi_data: dict) -> Self: + """Load attributes from dictionary. - Parameters: - aoi_data: dictionary with attributes to load - """ + Parameters: + aoi_data: dictionary with attributes to load + """ - # Get first and unique shape - # TODO: allow multiple shapes to describe more complex AOI - shape, shape_data = aoi_data.popitem() + # Get first and unique shape + # TODO: allow multiple shapes to describe more complex AOI + shape, shape_data = aoi_data.popitem() - if shape == 'Rectangle': + if shape == 'Rectangle': - x = shape_data.pop('x') - y = shape_data.pop('y') - width = shape_data.pop('width') - height = shape_data.pop('height') + x = shape_data.pop('x') + y = shape_data.pop('y') + width = shape_data.pop('width') + height = shape_data.pop('height') - points = [[x, y], [x+width, y], [x+width, y+height], [x, y+height]] + points = [[x, y], [x + width, y], [x + width, y + height], [x, y + height]] - return AreaOfInterest(points) + return AreaOfInterest(points) - elif shape == 'Circle': + elif shape == 'Circle': - cx = shape_data.pop('cx') - cy = shape_data.pop('cy') - radius = shape_data.pop('radius') + cx = shape_data.pop('cx') + cy = shape_data.pop('cy') + radius = shape_data.pop('radius') - # TODO: Use pygeos - N = 32 - points = [(math.cos(2*math.pi / N*x) * radius + cx, math.sin(2*math.pi / N*x) * radius + cy) for x in range(0, N+1)] + # TODO: Use pygeos + N = 32 + points = [(math.cos(2 * math.pi / N * x) * radius + cx, math.sin(2 * math.pi / N * x) * radius + cy) for x + in range(0, N + 1)] - return AreaOfInterest(points) + return AreaOfInterest(points) - elif shape == 'Ellipse': + elif shape == 'Ellipse': - cx = shape_data.pop('cx') - cy = shape_data.pop('cy') - rx = shape_data.pop('rx') - ry = shape_data.pop('ry') + cx = shape_data.pop('cx') + cy = shape_data.pop('cy') + rx = shape_data.pop('rx') + ry = shape_data.pop('ry') - # TODO: Use pygeos - N = 32 - points = [(math.cos(2*math.pi / N*x) * rx + cx, math.sin(2*math.pi / N*x) * ry + cy) for x in range(0, N+1)] + # TODO: Use pygeos + N = 32 + points = [(math.cos(2 * math.pi / N * x) * rx + cx, math.sin(2 * math.pi / N * x) * ry + cy) for x in + range(0, N + 1)] - @property - def dimension(self) -> int: - """Number of axis coding area points positions.""" - return self.shape[1] + @property + def dimension(self) -> int: + """Number of axis coding area points positions.""" + return self.shape[1] - @property - def points_number(self) -> int: - """Number of points defining the area.""" - return self.shape[0] + @property + def points_number(self) -> int: + """Number of points defining the area.""" + return self.shape[0] - def is_empty(self) -> bool: - """Is AOI empty ?""" - return self.shape[0] == 0 + def is_empty(self) -> bool: + """Is AOI empty ?""" + return self.shape[0] == 0 - @property - def bounds(self) -> numpy.array: - """Get area's bounds.""" - min_bounds = numpy.min(self, axis=0) - max_bounds = numpy.max(self, axis=0) + @property + def bounds(self) -> numpy.array: + """Get area's bounds.""" + min_bounds = numpy.min(self, axis=0) + max_bounds = numpy.max(self, axis=0) - return numpy.array([min_bounds, max_bounds]) + return numpy.array([min_bounds, max_bounds]) - @property - def center(self) -> numpy.array: - """Center of mass.""" - return self.mean(axis=0) + @property + def center(self) -> numpy.array: + """Center of mass.""" + return self.mean(axis=0) - @property - def size(self) -> numpy.array: - """Get scene size.""" - min_bounds, max_bounds = self.bounds + @property + def size(self) -> numpy.array: + """Get scene size.""" + min_bounds, max_bounds = self.bounds - return max_bounds - min_bounds + return max_bounds - min_bounds - @property - def area(self) -> float: - """Area of the polygon defined by aoi's points.""" - return Polygon(self).area + @property + def area(self) -> float: + """Area of the polygon defined by aoi's points.""" + return Polygon(self).area - @property - def bounding_box(self) -> numpy.array: - """Get area's bounding box. - !!! warning - Available for 2D AOI only.""" + @property + def bounding_box(self) -> numpy.array: + """Get area's bounding box. + !!! warning + Available for 2D AOI only.""" - assert(self.points_number > 1) - assert(self.dimension == 2) + assert (self.points_number > 1) + assert (self.dimension == 2) - min_x, min_y = numpy.min(self, axis=0) - max_x, max_y = numpy.max(self, axis=0) + min_x, min_y = numpy.min(self, axis=0) + max_x, max_y = numpy.max(self, axis=0) - return numpy.array([(min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y)]) + return numpy.array([(min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y)]) - def clockwise(self) -> Self: - """Get area points in clockwise order. - !!! warning - Available for 2D AOI only.""" + def clockwise(self) -> Self: + """Get area points in clockwise order. + !!! warning + Available for 2D AOI only.""" - assert(self.dimension == 2) + assert (self.dimension == 2) - O = self.center - OP = (self - O) / numpy.linalg.norm(self - O) - angles = numpy.arctan2(OP[:, 1], OP[:, 0]) + O = self.center + OP = (self - O) / numpy.linalg.norm(self - O) + angles = numpy.arctan2(OP[:, 1], OP[:, 0]) - return self[numpy.argsort(angles)] + return self[numpy.argsort(angles)] - def contains_point(self, point: tuple) -> bool: - """Is a point inside area? - !!! warning - Available for 2D AOI only. - !!! danger - The AOI points must be sorted in clockwise order.""" + def contains_point(self, point: tuple) -> bool: + """Is a point inside area? + !!! warning + Available for 2D AOI only. + !!! danger + The AOI points must be sorted in clockwise order.""" - assert(self.dimension == 2) - assert(len(point) == self.dimension) + assert (self.dimension == 2) + assert (len(point) == self.dimension) - return mpath.Path(self).contains_points([point])[0] + return bool(mpath.Path(self).contains_points([point])[0]) - def inner_axis(self, x: float, y: float) -> tuple: - """Transform a point coordinates from global axis to AOI axis. - !!! warning - Available for 2D AOI only. - !!! danger - The AOI points must be sorted in clockwise order.""" + def inner_axis(self, x: float, y: float) -> tuple: + """Transform a point coordinates from global axis to AOI axis. + !!! warning + Available for 2D AOI only. + !!! danger + The AOI points must be sorted in clockwise order.""" - assert(self.dimension == 2) + assert (self.dimension == 2) - Src = self - Src_origin = Src[0] - Src = (Src - Src_origin).reshape((len(Src)), 2).astype(numpy.float32) + Src = self + Src_origin = Src[0] + Src = (Src - Src_origin).reshape((len(Src)), 2).astype(numpy.float32) - Dst = numpy.array([[0., 0.], [1., 0.], [1., 1.], [0., 1.]]).astype(numpy.float32) + Dst = numpy.array([[0., 0.], [1., 0.], [1., 1.], [0., 1.]]).astype(numpy.float32) - P = cv2.getPerspectiveTransform(Src, Dst) - X = numpy.append(numpy.array(numpy.array([x, y]) - Src_origin), [1.0]).astype(numpy.float32) - Y = numpy.dot(P, X) + P = cv2.getPerspectiveTransform(Src, Dst) + X = numpy.append(numpy.array(numpy.array([x, y]) - Src_origin), [1.0]).astype(numpy.float32) + Y = numpy.dot(P, X) - La = (Y/Y[2])[:-1] + La = (Y / Y[2])[:-1] - return tuple(numpy.around(La, 4)) + return tuple(numpy.around(La, 4)) - def outter_axis(self, x: float, y: float) -> tuple: - """Transform a point coordinates from AOI axis to global axis. - !!! danger - The AOI points must be sorted in clockwise order. - !!! danger - The AOI must be a rectangle.""" + def outter_axis(self, x: float, y: float) -> tuple: + """Transform a point coordinates from AOI axis to global axis. + !!! danger + The AOI points must be sorted in clockwise order. + !!! danger + The AOI must be a rectangle. + """ - # Origin point - O = self[0] + # Origin point + O = self[0] - # Horizontal axis vector - H = self[1] - self[0] + # Horizontal axis vector + H = self[1] - self[0] - # Vertical axis vector - V = self[3] - self[0] + # Vertical axis vector + V = self[3] - self[0] - return tuple(O + x * H + y * V) + return tuple(O + x * H + y * V) - def circle_intersection(self, center: tuple, radius: float) -> tuple[numpy.array, float, float]: - """Get intersection shape with a circle, intersection area / AOI area ratio and intersection area / circle area ratio. - !!! warning - Available for 2D AOI only. - Returns: - intersection shape - intersection aoi ratio - intersection circle ratio - """ + def circle_intersection(self, center: tuple, radius: float) -> tuple[numpy.array, float, float]: + """Get intersection shape with a circle, intersection area / AOI area ratio and intersection area / circle area ratio. + !!! warning + Available for 2D AOI only. - assert(self.dimension == 2) + Returns: + intersection shape + intersection aoi ratio + intersection circle ratio + """ - self_polygon = Polygon(self) - args_circle = Point(center).buffer(radius) + assert (self.dimension == 2) - if self_polygon.intersects(args_circle): + self_polygon = Polygon(self) + args_circle = Point(center).buffer(radius) - intersection = self_polygon.intersection(args_circle) + if self_polygon.intersects(args_circle): - intersection_array = numpy.array([list(xy) for xy in intersection.exterior.coords[:]]).astype(numpy.float32).view(AreaOfInterest) + intersection = self_polygon.intersection(args_circle) - return intersection_array, intersection.area / self_polygon.area, intersection.area / args_circle.area + intersection_array = numpy.array([list(xy) for xy in intersection.exterior.coords[:]]).astype( + numpy.float32).view(AreaOfInterest) - else: + return intersection_array, intersection.area / self_polygon.area, intersection.area / args_circle.area - empty_array = numpy.array([list([])]).astype(numpy.float32).view(AreaOfInterest) + else: - return empty_array, 0., 0. + empty_array = numpy.array([list([])]).astype(numpy.float32).view(AreaOfInterest) - def draw(self, image: numpy.array, color, border_size=1): - """Draw 2D AOI into image. - !!! warning - Available for 2D AOI only.""" + return empty_array, 0., 0. - assert(self.dimension == 2) + def draw(self, image: numpy.array, color, border_size=1): + """Draw 2D AOI into image. + !!! warning + Available for 2D AOI only.""" - if len(self) > 1: + assert (self.dimension == 2) - # Draw form - pixels = numpy.rint(self).astype(int) - cv2.line(image, pixels[-1], pixels[0], color, border_size) - for A, B in zip(pixels, pixels[1:]): - cv2.line(image, A, B, color, border_size) + if len(self) > 1: - # Draw center - center_pixel = numpy.rint(self.center).astype(int) - cv2.circle(image, center_pixel, 1, color, -1) + # Draw form + pixels = numpy.rint(self).astype(int) + cv2.line(image, pixels[-1], pixels[0], color, border_size) + for A, B in zip(pixels, pixels[1:]): + cv2.line(image, A, B, color, border_size) -class AOIScene(): - """Define AOI scene as a dictionary of AOI.""" - - def __init__(self, dimension: int, areas: dict = None): - """Initialisation.""" + # Draw center + center_pixel = numpy.rint(self.center).astype(int) + cv2.circle(image, center_pixel, 1, color, -1) - assert(dimension > 0) - super().__init__() +class AOIScene(): + """Define AOI scene as a dictionary of AOI.""" - self.__dimension = dimension - self.__areas = {} + def __init__(self, dimension: int, areas: dict = None): + """Initialisation.""" - # NEVER USE {} as default function argument - if areas is not None: + assert (dimension > 0) - for name, area in areas.items(): - self[name] = AreaOfInterest(area) + super().__init__() - @classmethod - def from_dict(cls, aoi_scene_data: dict) -> Self: - """Load attributes from dictionary. + self.__dimension = dimension + self.__areas = {} - Parameters: - aoi_scene_data: dictionary with attributes to load - """ + # NEVER USE {} as default function argument + if areas is not None: - # Load areas - areas = {} + for name, area in areas.items(): + self[name] = AreaOfInterest(area) - for area_name, area_data in aoi_scene_data.items(): + @classmethod + def from_dict(cls, aoi_scene_data: dict) -> Self: + """Load attributes from dictionary. - if type(area_data) == list: + Parameters: + aoi_scene_data: dictionary with attributes to load + """ - areas[area_name] = AreaOfInterest(area_data) + # Load areas + areas = {} - elif type(area_data) == dict: + for area_name, area_data in aoi_scene_data.items(): - areas[area_name] = AreaOfInterest.from_dict(area_data) + if type(area_data) == list: - # Default dimension is 0 - dimension = 0 + areas[area_name] = AreaOfInterest(area_data) - # Guess dimension from first area dimension (default: 2) - if len(areas) > 0: + elif type(area_data) == dict: - dimension = list(areas.values())[0].dimension + areas[area_name] = AreaOfInterest.from_dict(area_data) - return AOIScene(dimension = dimension, areas = areas) + # Default dimension is 0 + dimension = 0 - @classmethod - def from_json(cls, json_filepath: str) -> Self: - """ - Load attributes from .json file. + # Guess dimension from first area dimension (default: 2) + if len(areas) > 0: + dimension = list(areas.values())[0].dimension - Parameters: - json_filepath: path to json file - """ + return AOIScene(dimension=dimension, areas=areas) - with open(json_filepath) as configuration_file: + @classmethod + def from_json(cls, json_filepath: str) -> Self: + """ + Load attributes from .json file. - return AOIScene.from_dict(json.load(configuration_file)) + Parameters: + json_filepath: path to json file + """ - def __getitem__(self, name) -> AreaOfInterest: - """Get an AOI from the scene.""" + with open(json_filepath) as configuration_file: + return AOIScene.from_dict(json.load(configuration_file)) - return AreaOfInterest(self.__areas[name]) + def __getitem__(self, name) -> AreaOfInterest: + """Get an AOI from the scene.""" - def __setitem__(self, name, aoi: AreaOfInterest): - """Add an AOI to the scene.""" + return AreaOfInterest(self.__areas[name]) - assert(aoi.dimension == self.__dimension) + def __setitem__(self, name, aoi: AreaOfInterest): + """Add an AOI to the scene.""" - self.__areas[name] = AreaOfInterest(aoi) + assert (aoi.dimension == self.__dimension) - # Expose area as an attribute of the class - setattr(self, name, self.__areas[name]) + self.__areas[name] = AreaOfInterest(aoi) - def __delitem__(self, key): - """Remove an AOI from the scene.""" + # Expose area as an attribute of the class + setattr(self, name, self.__areas[name]) - del self.__areas[key] + def __delitem__(self, key): + """Remove an AOI from the scene.""" - # Stop area exposition as an attribute of the class - delattr(self, key) + del self.__areas[key] - def __or__(self, other): - """Merge another scene using | operator.""" + # Stop area exposition as an attribute of the class + delattr(self, key) - assert(other.dimension == self.__dimension) + def __or__(self, other): + """Merge another scene using | operator.""" - merged_areas = dict(self.__areas) - merged_areas.update(other.__areas) + assert (other.dimension == self.__dimension) - return AOIScene(self.dimension, merged_areas) + merged_areas = dict(self.__areas) + merged_areas.update(other.__areas) - def __ror__(self, other): - """Merge another scene using | operator.""" + return AOIScene(self.dimension, merged_areas) - assert(other.dimension == self.__dimension) + def __ror__(self, other): + """Merge another scene using | operator.""" - merged_areas = dict(other.__areas) - merged_areas.update(self.__areas) + assert (other.dimension == self.__dimension) - return AOIScene(self.dimension, merged_areas) + merged_areas = dict(other.__areas) + merged_areas.update(self.__areas) - def __ior__(self, other): - """Merge scene with another scene in-place using |= operator.""" + return AOIScene(self.dimension, merged_areas) - assert(other.dimension == self.__dimension) + def __ior__(self, other): + """Merge scene with another scene in-place using |= operator.""" - self.__areas.update(other.__areas) - self.__dict__.update(other.__areas) + assert (other.dimension == self.__dimension) - return self + self.__areas.update(other.__areas) + self.__dict__.update(other.__areas) - def __len__(self): - """Get number of AOI into scene.""" - return len(self.__areas) + return self - def __repr__(self): - """String representation""" + def __len__(self): + """Get number of AOI into scene.""" + return len(self.__areas) - return str(self.__areas) + def __repr__(self): + """String representation""" - def __add__(self, add_vector) -> Self: - """Add vector to scene.""" + return str(self.__areas) - assert(len(add_vector) == self.__dimension) + def __add__(self, add_vector) -> Self: + """Add vector to scene.""" - for name, area in self.__areas.items(): - - self.__areas[name] = self.__areas[name] + add_vector + assert (len(add_vector) == self.__dimension) - return self + for name, area in self.__areas.items(): + self.__areas[name] = self.__areas[name] + add_vector - # Allow n + scene operation - __radd__ = __add__ + return self - def __sub__(self, sub_vector) -> Self: - """Sub vector to scene.""" + # Allow n + scene operation + __radd__ = __add__ - assert(len(sub_vector) == self.__dimension) + def __sub__(self, sub_vector) -> Self: + """Sub vector to scene.""" - for name, area in self.__areas.items(): - - self.__areas[name] = self.__areas[name] - sub_vector + assert (len(sub_vector) == self.__dimension) - return self + for name, area in self.__areas.items(): + self.__areas[name] = self.__areas[name] - sub_vector - def __rsub__(self, rsub_vector) -> Self: - """RSub vector to scene.""" + return self - assert(len(rsub_vector) == self.__dimension) + def __rsub__(self, rsub_vector) -> Self: + """RSub vector to scene.""" - for name, area in self.__areas.items(): - - self.__areas[name] = rsub_vector - self.__areas[name] + assert (len(rsub_vector) == self.__dimension) - return self + for name, area in self.__areas.items(): + self.__areas[name] = rsub_vector - self.__areas[name] - def __mul__(self, scale_vector) -> Self: - """Scale scene by a vector.""" + return self - assert(len(scale_vector) == self.__dimension) + def __mul__(self, scale_vector) -> Self: + """Scale scene by a vector.""" - for name, area in self.__areas.items(): - - self.__areas[name] = self.__areas[name] * scale_vector + assert (len(scale_vector) == self.__dimension) - return self + for name, area in self.__areas.items(): + self.__areas[name] = self.__areas[name] * scale_vector - # Allow n * scene operation - __rmul__ = __mul__ + return self - def __truediv__(self, div_vector) -> Self: + # Allow n * scene operation + __rmul__ = __mul__ - assert(len(div_vector) == self.__dimension) + def __truediv__(self, div_vector) -> Self: - for name, area in self.__areas.items(): - - self.__areas[name] = self.__areas[name] / div_vector + assert (len(div_vector) == self.__dimension) - return self + for name, area in self.__areas.items(): + self.__areas[name] = self.__areas[name] / div_vector - def items(self) -> tuple[str, AreaOfInterest]: - """Iterate over areas.""" + return self - return self.__areas.items() + def items(self) -> tuple[str, AreaOfInterest]: + """Iterate over areas.""" - def keys(self) -> list[str]: - """Get areas name.""" + return self.__areas.items() - return self.__areas.keys() + def keys(self) -> list[str]: + """Get areas name.""" - @property - def dimension(self) -> int: - """Dimension of the AOI in scene.""" + return list(self.__areas.keys()) - return self.__dimension + @property + def dimension(self) -> int: + """Dimension of the AOI in scene.""" - def expand(self) -> Self: - """Add 1 dimension to the AOIs in scene.""" + return self.__dimension - new_areas = {} + def expand(self) -> Self: + """Add 1 dimension to the AOIs in scene.""" - for name, area in self.__areas.items(): + new_areas = {} - zeros = numpy.zeros((len(self.__areas[name]), 1)) - new_areas[name] = numpy.concatenate((self.__areas[name], zeros), axis=1) + for name, area in self.__areas.items(): + zeros = numpy.zeros((len(self.__areas[name]), 1)) + new_areas[name] = numpy.concatenate((self.__areas[name], zeros), axis=1) - return AOIScene(dimension = self.__dimension + 1, areas = new_areas) + return AOIScene(dimension=self.__dimension + 1, areas=new_areas) - @property - def bounds(self) -> numpy.array: - """Get scene's bounds.""" + @property + def bounds(self) -> numpy.array: + """Get scene's bounds.""" - all_vertices = [] + all_vertices = [] - for area in self.__areas.values(): - for vertice in area: - all_vertices.append(vertice) + for area in self.__areas.values(): + for vertice in area: + all_vertices.append(vertice) - all_vertices = numpy.array(all_vertices) #.astype(numpy.float32) + all_vertices = numpy.array(all_vertices) #.astype(numpy.float32) - min_bounds = numpy.min(all_vertices, axis=0) - max_bounds = numpy.max(all_vertices, axis=0) + min_bounds = numpy.min(all_vertices, axis=0) + max_bounds = numpy.max(all_vertices, axis=0) - return numpy.array([min_bounds, max_bounds]) + return numpy.array([min_bounds, max_bounds]) - @property - def center(self) -> numpy.array: - """Get scene's center point.""" + @property + def center(self) -> numpy.array: + """Get scene's center point.""" - min_bounds, max_bounds = self.bounds + min_bounds, max_bounds = self.bounds - return (min_bounds + max_bounds) / 2 + return (min_bounds + max_bounds) / 2 - @property - def size(self) -> numpy.array: - """Get scene size.""" + @property + def size(self) -> numpy.array: + """Get scene size.""" - min_bounds, max_bounds = self.bounds + min_bounds, max_bounds = self.bounds - return max_bounds - min_bounds + return max_bounds - min_bounds - def copy(self, exclude: list=None) -> Self: - """Copy scene partly excluding AOI by name.""" + def copy(self, exclude: list = None) -> Self: + """Copy scene partly excluding AOI by name.""" - if exclude is None: - exclude = [] + if exclude is None: + exclude = [] - # noinspection PyArgumentList - scene_copy = type(self)() + # noinspection PyArgumentList + scene_copy = type(self)() - for name, area in self.__areas.items(): - - if name not in exclude: + for name, area in self.__areas.items(): - scene_copy[name] = AreaOfInterest(area) #.astype(numpy.float32).view(AreaOfInterest) + if name not in exclude: + scene_copy[name] = AreaOfInterest(area) #.astype(numpy.float32).view(AreaOfInterest) - return scene_copy + return scene_copy - def clear(self): - """Clear scene.""" + def clear(self): + """Clear scene.""" - self.__areas.clear() + self.__areas.clear() - def __str__(self) -> str: - """ - String representation of pipeline step object. - - Returns: - String representation - """ + def __str__(self) -> str: + """ + String representation of pipeline step object. + + Returns: + String representation + """ - output = '' + output = '' - for name, area in self.__areas.items(): + for name, area in self.__areas.items(): + output += f'{Fore.BLUE}{Style.BRIGHT}{name}{Style.RESET_ALL} ' - output += f'{Fore.BLUE}{Style.BRIGHT}{name}{Style.RESET_ALL} ' - - return output + return output # noinspection PyAttributeOutsideInit class Heatmap(DataFeatures.PipelineStepObject): - """Define image to draw heatmap.""" - - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): + """Define image to draw heatmap.""" - # Init private attributes - self.__size = (1, 1) - self.__buffer = 0 - self.__sigma = 0.05 + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): - @property - def size(self) -> tuple[int, int]: - """Size of heatmap image in pixels.""" - return self.__size + # Init private attributes + self.__size = (1, 1) + self.__buffer = 0 + self.__sigma = 0.05 - # noinspection PyAttributeOutsideInit - @size.setter - def size(self, size: tuple[int, int]): - - self.__size = size - # noinspection PyAttributeOutsideInit - self.__rX, self.__rY = size + @property + def size(self) -> tuple[int, int]: + """Size of heatmap image in pixels.""" + return self.__size - # Init coordinates - # noinspection PyAttributeOutsideInit - self.__Sx = numpy.linspace(0., self.__rX/self.__rY, self.__rX) - # noinspection PyAttributeOutsideInit - self.__Sy = numpy.linspace(0., 1., self.__rY) + # noinspection PyAttributeOutsideInit + @size.setter + def size(self, size: tuple[int, int]): - # Init heatmap image - self.clear() + self.__size = size + # noinspection PyAttributeOutsideInit + self.__rX, self.__rY = size - @property - def sigma(self) -> float: - """Point spread factor.""" - return self.__sigma + # Init coordinates + # noinspection PyAttributeOutsideInit + self.__Sx = numpy.linspace(0., self.__rX / self.__rY, self.__rX) + # noinspection PyAttributeOutsideInit + self.__Sy = numpy.linspace(0., 1., self.__rY) - @sigma.setter - def sigma(self, sigma: float): + # Init heatmap image + self.clear() - self.__sigma = sigma + @property + def sigma(self) -> float: + """Point spread factor.""" + return self.__sigma - @property - def buffer(self) -> int: - """Size of heatmap buffer (0 means no buffering).""" - return self.__buffer + @sigma.setter + def sigma(self, sigma: float): - @buffer.setter - def buffer(self, buffer: int): + self.__sigma = sigma - self.__buffer = buffer + @property + def buffer(self) -> int: + """Size of heatmap buffer (0 means no buffering).""" + return self.__buffer - def point_spread(self, point: tuple): - """Draw gaussian point spread into image.""" + @buffer.setter + def buffer(self, buffer: int): - div = -2 * self.__sigma**2 + self.__buffer = buffer - x = point[0] / self.__rY # we use rY not rX !!! - y = point[1] / self.__rY + def point_spread(self, point: tuple): + """Draw gaussian point spread into image.""" - dX2 = (self.__Sx - x)**2 - dY2 = (self.__Sy - y)**2 + div = -2 * self.__sigma ** 2 - v_dX, v_dY = numpy.array(numpy.meshgrid(dX2, dY2)).reshape(2, -1) + x = point[0] / self.__rY # we use rY not rX !!! + y = point[1] / self.__rY - return numpy.exp((v_dX + v_dY) / div).reshape(self.__rY, self.__rX) + dX2 = (self.__Sx - x) ** 2 + dY2 = (self.__Sy - y) ** 2 - # noinspection PyAttributeOutsideInit - def clear(self): - """Clear heatmap image.""" + v_dX, v_dY = numpy.array(numpy.meshgrid(dX2, dY2)).reshape(2, -1) - # noinspection PyAttributeOutsideInit - self.__point_spread_sum = numpy.zeros((self.__rY, self.__rX)) - # noinspection PyAttributeOutsideInit - self.__point_spread_buffer = [] - # noinspection PyAttributeOutsideInit - self.__point_spread_buffer_size = self.__buffer + return numpy.exp((v_dX + v_dY) / div).reshape(self.__rY, self.__rX) - # noinspection PyAttributeOutsideInit - @DataFeatures.PipelineStepMethod - def update(self, point: tuple): - """Update heatmap image.""" + # noinspection PyAttributeOutsideInit + def clear(self): + """Clear heatmap image.""" - point_spread = self.point_spread(point) + # noinspection PyAttributeOutsideInit + self.__point_spread_sum = numpy.zeros((self.__rY, self.__rX)) + # noinspection PyAttributeOutsideInit + self.__point_spread_buffer = [] + # noinspection PyAttributeOutsideInit + self.__point_spread_buffer_size = self.__buffer - # Sum point spread - self.__point_spread_sum += point_spread + # noinspection PyAttributeOutsideInit + @DataFeatures.PipelineStepMethod + def update(self, point: tuple): + """Update heatmap image.""" - # If point spread buffering enabled - if self.__buffer > 0: + point_spread = self.point_spread(point) - self.__point_spread_buffer.append(point_spread) + # Sum point spread + self.__point_spread_sum += point_spread - # Remove oldest point spread buffer image - if len(self.__point_spread_buffer) > self.buffer: + # If point spread buffering enabled + if self.__buffer > 0: + self.__point_spread_buffer.append(point_spread) - self.__point_spread_sum -= self.__point_spread_buffer.pop(0) + # Remove oldest point spread buffer image + if len(self.__point_spread_buffer) > self.buffer: + self.__point_spread_sum -= self.__point_spread_buffer.pop(0) - # Edit heatmap - gray = (255 * self.__point_spread_sum / numpy.max(self.__point_spread_sum)).astype(numpy.uint8) - # noinspection PyAttributeOutsideInit - self.__image = cv2.applyColorMap(gray, cv2.COLORMAP_JET) + # Edit heatmap + gray = (255 * self.__point_spread_sum / numpy.max(self.__point_spread_sum)).astype(numpy.uint8) + # noinspection PyAttributeOutsideInit + self.__image = cv2.applyColorMap(gray, cv2.COLORMAP_JET) - @DataFeatures.PipelineStepImage - def image(self): - """Get heatmap image.""" - try: + @DataFeatures.PipelineStepImage + def image(self): + """Get heatmap image.""" + try: - return self.__image + return self.__image - except AttributeError: + except AttributeError: - return numpy.zeros((self.__rY, self.__rX, 3)).astype(numpy.uint8) + return numpy.zeros((self.__rY, self.__rX, 3)).astype(numpy.uint8) diff --git a/src/argaze/AreaOfInterest/__init__.py b/src/argaze/AreaOfInterest/__init__.py index a9bb2b2..084550b 100644 --- a/src/argaze/AreaOfInterest/__init__.py +++ b/src/argaze/AreaOfInterest/__init__.py @@ -1,4 +1,4 @@ """ Manage Areas of Interest (AOI) and scenes for 2D and 3D environment. """ -__all__ = ['AOIFeatures', 'AOI2DScene', 'AOI3DScene']
\ No newline at end of file +__all__ = ['AOIFeatures', 'AOI2DScene', 'AOI3DScene'] diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index a7b0a48..4e85aaf 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -330,7 +330,11 @@ class TimestampedObjectsList(list): Timestamped objects are considered to be stored according to their coming time. """ - def __init__(self, ts_object_type: type, ts_objects: list = []): + # noinspection PyMissingConstructor + def __init__(self, ts_object_type: type, ts_objects=None): + + if ts_objects is None: + ts_objects = [] self.__object_type = ts_object_type self.__object_properties = properties(self.__object_type) @@ -396,9 +400,12 @@ class TimestampedObjectsList(list): return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] @classmethod - def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self: + def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=None) -> Self: """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" + if exclude is None: + exclude = [] + dataframe.drop(exclude, inplace=True, axis=True) assert (dataframe.index.name == 'timestamp') @@ -583,7 +590,7 @@ class SharedObject(TimestampedObject): class TimestampedException(Exception, TimestampedObject): """Wrap exception to keep track of raising timestamp.""" - def __init__(self, exception=Exception, timestamp: int | float = math.nan): + def __init__(self, exception: Exception, timestamp: int | float = math.nan): Exception.__init__(self, exception) TimestampedObject.__init__(self, timestamp) @@ -639,10 +646,10 @@ def PipelineStepInit(method): def wrapper(self, **kwargs): """Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call. - Parameters: - self: - kwargs: any arguments defined by PipelineStepMethodInit. - """ + Parameters: + self: + kwargs: any arguments defined by PipelineStepMethodInit. + """ # Init pipeline step object attributes PipelineStepObject.__init__(self) @@ -811,9 +818,8 @@ def PipelineStepDraw(method): # noinspection PyAttributeOutsideInit class PipelineStepObject(): + """Define class to assess pipeline step methods execution time and observe them. """ - Define class to assess pipeline step methods execution time and observe them. - """ __initialized = False @@ -939,9 +945,9 @@ class PipelineStepObject(): def as_dict(self) -> dict: """Export PipelineStepObject attributes as dictionary. - Returns: - object_data: dictionary with pipeline step object attributes values. - """ + Returns: + object_data: dictionary with pipeline step object attributes values. + """ return { "name": self.__name, "observers": self.__observers @@ -964,12 +970,11 @@ class PipelineStepObject(): #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder) def __str__(self) -> str: + """String representation of pipeline step object. + + Returns: + String representation """ - String representation of pipeline step object. - - Returns: - String representation - """ logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '') @@ -1064,7 +1069,7 @@ class PipelineStepObject(): yield name, getattr(self, name) @property - def children(self) -> object: + def children(self): """Iterate over children pipeline step objects.""" for name, value in self.properties: @@ -1098,7 +1103,8 @@ def PipelineStepMethod(method): PipelineStepMethod must have a timestamp as first argument. """ - def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, catch_exceptions: bool = True, **kwargs): + def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, catch_exceptions: bool = True, + **kwargs): """Wrap pipeline step method to measure execution time. Parameters: @@ -1117,7 +1123,8 @@ def PipelineStepMethod(method): else: - logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', get_class_path(self), method.__name__, type(args[0]).__name__) + logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', + get_class_path(self), method.__name__, type(args[0]).__name__) if unwrap: return method(self, *args, **kwargs) diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index b8451f0..a860e47 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -258,7 +258,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Always return empty gaze movement at least return GazeFeatures.GazeMovement() - def current_fixation(self) -> GazeFeatures.Fixation: + def current_fixation(self) -> GazeFeatures.GazeMovement: if self.__fixation_positions: @@ -267,7 +267,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Always return empty gaze movement at least return GazeFeatures.GazeMovement() - def current_saccade(self) -> GazeFeatures.Saccade: + def current_saccade(self) -> GazeFeatures.GazeMovement: if len(self.__saccade_positions) > 1: diff --git a/src/argaze/GazeAnalysis/KCoefficient.py b/src/argaze/GazeAnalysis/KCoefficient.py index 7e3caab..9980dfe 100644 --- a/src/argaze/GazeAnalysis/KCoefficient.py +++ b/src/argaze/GazeAnalysis/KCoefficient.py @@ -97,7 +97,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__K = 0 @DataFeatures.PipelineStepMethod - def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath) -> float: + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index d001688..78cc170 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -250,7 +250,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Always return empty gaze movement at least return GazeFeatures.GazeMovement() - def current_fixation(self) -> GazeFeatures.Fixation: + def current_fixation(self) -> GazeFeatures.GazeMovement: if self.__fixation_positions: @@ -259,7 +259,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Always return empty gaze movement at least return GazeFeatures.GazeMovement() - def current_saccade(self) -> GazeFeatures.Saccade: + def current_saccade(self) -> GazeFeatures.GazeMovement: if len(self.__saccade_positions) > 1: diff --git a/src/argaze/GazeAnalysis/__init__.py b/src/argaze/GazeAnalysis/__init__.py index f0ba9fd..2d05006 100644 --- a/src/argaze/GazeAnalysis/__init__.py +++ b/src/argaze/GazeAnalysis/__init__.py @@ -13,4 +13,4 @@ __all__ = [ 'NearestNeighborIndex', 'ExploreExploitRatio', 'LinearRegression' -]
\ No newline at end of file +] diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index dbeee61..5777a8d 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -93,11 +93,11 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): """ if self.__precision is not None and position.precision is not None: - return GazePosition(numpy.array(self) + numpy.array(position), precision = max(self.__precision, position.precision), timestamp=self.timestamp) + return GazePosition(tuple(numpy.array(self) + numpy.array(position)), precision = max(self.__precision, position.precision), timestamp = self.timestamp) else: - return GazePosition(numpy.array(self) + numpy.array(position), timestamp=self.timestamp) + return GazePosition(tuple(numpy.array(self) + numpy.array(position)), timestamp = self.timestamp) __radd__ = __add__ @@ -112,11 +112,11 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): """ if self.__precision is not None and position.precision is not None: - return GazePosition(numpy.array(self) - numpy.array(position), precision = max(self.__precision, position.precision), timestamp=self.timestamp) + return GazePosition(tuple(numpy.array(self) - numpy.array(position)), precision = max(self.__precision, position.precision), timestamp = self.timestamp) else: - return GazePosition(numpy.array(self) - numpy.array(position), timestamp=self.timestamp) + return GazePosition(tuple(numpy.array(self) - numpy.array(position)), timestamp = self.timestamp) def __rsub__(self, position: Self) -> Self: """Reversed subtract position. @@ -129,11 +129,11 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): """ if self.__precision is not None and position.precision is not None: - return GazePosition(numpy.array(position) - numpy.array(self), precision = max(self.__precision, position.precision), timestamp=self.timestamp) + return GazePosition(tuple(numpy.array(position) - numpy.array(self)), precision = max(self.__precision, position.precision), timestamp = self.timestamp) else: - return GazePosition(numpy.array(position) - numpy.array(self), timestamp=self.timestamp) + return GazePosition(tuple(numpy.array(position) - numpy.array(self)), timestamp = self.timestamp) def __mul__(self, factor: int|float) -> Self: """Multiply position by a factor. @@ -144,7 +144,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): !!! note The returned position timestamp is the self object timestamp. """ - return GazePosition(numpy.array(self) * factor, precision = self.__precision * factor if self.__precision is not None else None, timestamp=self.timestamp) + return GazePosition(tuple(numpy.array(self) * factor), precision = self.__precision * factor if self.__precision is not None else None, timestamp = self.timestamp) def __pow__(self, factor: int|float) -> Self: """Power position by a factor. @@ -155,7 +155,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): !!! note The returned position timestamp is the self object timestamp. """ - return GazePosition(numpy.array(self) ** factor, precision = self.__precision ** factor if self.__precision is not None else None, timestamp=self.timestamp) + return GazePosition(tuple(numpy.array(self) ** factor), precision = self.__precision ** factor if self.__precision is not None else None, timestamp = self.timestamp) def distance(self, gaze_position) -> float: """Distance to another gaze positions.""" @@ -291,6 +291,7 @@ class GazePositionCalibrationFailed(Exception): class GazePositionCalibrator(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a gaze position calibrator algorithm.""" + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): @@ -332,7 +333,7 @@ class GazePositionCalibrator(DataFeatures.PipelineStepObject): raise NotImplementedError('apply() method not implemented') - def draw(self, image: numpy.array): + def draw(self, image: numpy.array, **kwargs): """Draw calibration into image. Parameters: @@ -543,6 +544,7 @@ class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList): class GazeMovementIdentifier(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a gaze movement identifier.""" + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): @@ -821,6 +823,7 @@ class ScanPath(list): class ScanPathAnalyzer(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a scan path analyzer.""" + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): @@ -840,6 +843,7 @@ class ScanPathAnalyzer(DataFeatures.PipelineStepObject): class AOIMatcher(DataFeatures.PipelineStepObject): """Abstract class to define what should provide an AOI matcher algorithm.""" + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): @@ -1108,7 +1112,7 @@ class AOIScanPath(list): self.__movements.append(saccade) - def append_fixation(self, fixation, looked_aoi: str) -> bool: + def append_fixation(self, fixation, looked_aoi: str): """Append new fixation to aoi scan path and return last new aoi scan step if one have been created. !!! warning @@ -1197,6 +1201,7 @@ class AOIScanPath(list): class AOIScanPathAnalyzer(DataFeatures.PipelineStepObject): """Abstract class to define what should provide an aoi scan path analyzer.""" + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): diff --git a/src/argaze/PupilAnalysis/WorkloadIndex.py b/src/argaze/PupilAnalysis/WorkloadIndex.py index 99427fe..bced982 100644 --- a/src/argaze/PupilAnalysis/WorkloadIndex.py +++ b/src/argaze/PupilAnalysis/WorkloadIndex.py @@ -28,8 +28,13 @@ class PupilDiameterAnalyzer(PupilFeatures.PupilDiameterAnalyzer): reference: base line value. period: identification period length. """ + + @DataFeatures.PipelineStepInit def __init__(self, reference: PupilFeatures.PupilDiameter, period: int|float = 1): + # Init PupilDiameterAnalyzer class + super().__init__() + assert(not math.isnan(reference)) self.__reference = reference @@ -56,7 +61,7 @@ class PupilDiameterAnalyzer(PupilFeatures.PupilDiameterAnalyzer): # Ignore non valid pupil diameter if not math.isnan(pupil_diameter): - return None + return math.nan if pupil_diameter.timestamp - self.__last_ts >= self.__period: diff --git a/src/argaze/PupilAnalysis/__init__.py b/src/argaze/PupilAnalysis/__init__.py index c563968..73b15ee 100644 --- a/src/argaze/PupilAnalysis/__init__.py +++ b/src/argaze/PupilAnalysis/__init__.py @@ -1,4 +1,4 @@ """ Class interface to work with various pupil analysis algorithms. """ -__all__ = ['WorkloadIndex']
\ No newline at end of file +__all__ = ['WorkloadIndex'] diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py index bd01eda..8c0068f 100644 --- a/src/argaze/__init__.py +++ b/src/argaze/__init__.py @@ -1,4 +1,5 @@ """ ArGaze is divided in submodules dedicated to various specifics features. """ -__all__ = ['ArUcoMarkers','AreaOfInterest','ArFeatures','GazeFeatures','GazeAnalysis','PupilFeatures','PupilAnalysis','DataFeatures','utils']
\ No newline at end of file +__all__ = ['ArUcoMarkers', 'AreaOfInterest', 'ArFeatures', 'GazeFeatures', 'GazeAnalysis', 'PupilFeatures', + 'PupilAnalysis', 'DataFeatures', 'utils'] diff --git a/src/argaze/__main__.py b/src/argaze/__main__.py index 9adda75..15a78c1 100644 --- a/src/argaze/__main__.py +++ b/src/argaze/__main__.py @@ -28,23 +28,22 @@ import cv2 # Manage arguments parser = argparse.ArgumentParser(description=__doc__.split('-')[0]) parser.add_argument('context_file', metavar='CONTEXT_FILE', type=str, help='JSON context filepath') -parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') +parser.add_argument('-v', '--verbose', action='store_true', default=False, + help='enable verbose mode to print information in console') args = parser.parse_args() # Manage logging -logging.basicConfig(format = '%(levelname)s: %(message)s', level = logging.DEBUG if args.verbose else logging.INFO) +logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG if args.verbose else logging.INFO) # Load context from JSON file with from_json(args.context_file) as context: # Loaded object must be a subclass of ArContext if not issubclass(type(context), ArContext): - raise TypeError('Loaded object is not a subclass of ArContext') if args.verbose: - print(context) # Create a window to display context @@ -63,15 +62,13 @@ with from_json(args.context_file) as context: if issubclass(type(context.pipeline), ArCamera): for scene_frame in context.pipeline.scene_frames(): - cv2.imshow(scene_frame.name, scene_frame.image()) - + # Key interaction key_pressed = cv2.waitKey(10) # Esc: close window if key_pressed == 27: - raise KeyboardInterrupt() # Stop frame display diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py index 3f2ceda..133809b 100644 --- a/src/argaze/utils/UtilsFeatures.py +++ b/src/argaze/utils/UtilsFeatures.py @@ -117,7 +117,7 @@ class TimeProbe(): return lap_time * 1e3, self.__lap_counter, self.__elapsed_time * 1e3 - def end(self) -> float: + def end(self) -> tuple[float, int]: """ Stop chronometer @@ -163,6 +163,7 @@ def PrintCallStack(method): class FileWriter(DataFeatures.PipelineStepObject): """Write data into a file line by line.""" + # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): diff --git a/src/argaze/utils/__init__.py b/src/argaze/utils/__init__.py index 2cee626..ed61966 100644 --- a/src/argaze/utils/__init__.py +++ b/src/argaze/utils/__init__.py @@ -1,4 +1,4 @@ """ Miscellaneous utilities. """ -__all__ = ['UtilsFeatures', 'contexts']
\ No newline at end of file +__all__ = ['UtilsFeatures', 'contexts'] diff --git a/src/argaze/utils/contexts/PupilLabs.py b/src/argaze/utils/contexts/PupilLabs.py index 9265f2c..43fe47e 100644 --- a/src/argaze/utils/contexts/PupilLabs.py +++ b/src/argaze/utils/contexts/PupilLabs.py @@ -46,10 +46,10 @@ class LiveStream(ArFeatures.ArContext): # Create stop event self.__stop_event = threading.Event() - # Init timestamp + # Init timestamp self.__start_time = time.time() - - # Look for devices. Returns as soon as it has found the first device. + + # Look for devices. Returns as soon as it has found the first device. self.__device = discover_one_device(max_search_duration_seconds=10) if self.__device is None: diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py index f83c1ac..0fba2ff 100644 --- a/src/argaze/utils/contexts/TobiiProGlasses2.py +++ b/src/argaze/utils/contexts/TobiiProGlasses2.py @@ -135,7 +135,7 @@ class PupilCenter(): validity: int index: int - value: tuple((float, float, float)) + value: tuple[(float, float, float)] eye: str # 'right' or 'left' @dataclass @@ -153,7 +153,7 @@ class GazeDirection(): validity: int index: int - value: tuple((float, float, float)) + value: tuple[(float, float, float)] eye: str # 'right' or 'left' @dataclass @@ -163,7 +163,7 @@ class GazePosition(): validity: int index: int l: str # ? - value: tuple((float, float)) + value: tuple[(float, float)] @dataclass class GazePosition3D(): @@ -171,14 +171,14 @@ class GazePosition3D(): validity: int index: int - value: tuple((float, float)) + value: tuple[(float, float)] @dataclass class MarkerPosition(): """Define marker data (marker3d marker2d).""" - value_3d: tuple((float, float, float)) - value_2d: tuple((float, float)) + value_3d: tuple[(float, float, float)] + value_2d: tuple[(float, float)] class TobiiJsonDataParser(): @@ -245,15 +245,18 @@ class TobiiJsonDataParser(): return self.__parse_pupil_or_gaze_map[second_key](status, gaze_index, data) - def __parse_dir_sig(self, status, data): + @staticmethod + def __parse_dir_sig(status, data): return DirSig(data['dir'], data['sig']) - def __parse_pts(self, status, data): + @staticmethod + def __parse_pts(status, data): return PresentationTimeStamp(data['pts']) - def __parse_vts(self, status, data): + @staticmethod + def __parse_vts(status, data): # ts is not sent when recording try: @@ -266,43 +269,53 @@ class TobiiJsonDataParser(): return VideoTimeStamp(data['vts'], ts) - def __parse_event_synch(self, status, data): + @staticmethod + def __parse_event_synch(status, data): return EventSynch(data['evts']) - def __parse_event(self, status, data): + @staticmethod + def __parse_event(status, data): return Event(data['ets'], data['type'], data['tag']) - def __parse_accelerometer(self, status, data): + @staticmethod + def __parse_accelerometer(status, data): return Accelerometer(data['ac']) - def __parse_gyroscope(self, status, data): + @staticmethod + def __parse_gyroscope(status, data): return Gyroscope(data['gy']) - def __parse_pupil_center(self, status, gaze_index, data): + @staticmethod + def __parse_pupil_center(status, gaze_index, data): return PupilCenter(status, gaze_index, data['pc'], data['eye']) - def __parse_pupil_diameter(self, status, gaze_index, data): + @staticmethod + def __parse_pupil_diameter(status, gaze_index, data): return PupilDiameter(status, gaze_index, data['pd'], data['eye']) - def __parse_gaze_direction(self, status, gaze_index, data): + @staticmethod + def __parse_gaze_direction(status, gaze_index, data): return GazeDirection(status, gaze_index, data['gd'], data['eye']) - def __parse_gaze_position(self, status, gaze_index, data): + @staticmethod + def __parse_gaze_position(status, gaze_index, data): return GazePosition(status, gaze_index, data['l'], data['gp']) - def __parse_gaze_position_3d(self, status, gaze_index, data): + @staticmethod + def __parse_gaze_position_3d(status, gaze_index, data): return GazePosition3D(status, gaze_index, data['gp3']) - def __parse_marker_position(self, status, data): + @staticmethod + def __parse_marker_position(status, data): return MarkerPosition(data['marker3d'], data['marker2d']) @@ -518,9 +531,9 @@ class LiveStream(ArFeatures.ArContext): logging.info('%s: %s', key, str(value)) # Store video stream info - self.__video_width = configuration['sys_sc_width'] - self.__video_height = configuration['sys_sc_height'] - self.__video_fps = configuration['sys_sc_fps'] + self.__video_width = int(configuration['sys_sc_width']) + self.__video_height = int(configuration['sys_sc_height']) + self.__video_fps = float(configuration['sys_sc_fps']) # Bind to project if required if self.__project_name is not None: @@ -799,7 +812,7 @@ class LiveStream(ArFeatures.ArContext): time.sleep(1) - def __get_request(self, api_action) -> str: + def __get_request(self, api_action) -> any: """Send a GET request and get data back.""" url = self.__base_url + api_action @@ -820,7 +833,7 @@ class LiveStream(ArFeatures.ArContext): return data - def __post_request(self, api_action, data = None, wait_for_response = True) -> str: + def __post_request(self, api_action, data = None, wait_for_response = True) -> any: """Send a POST request and get result back.""" url = self.__base_url + api_action @@ -880,7 +893,8 @@ class LiveStream(ArFeatures.ArContext): return json_data[key] - def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT): + @staticmethod + def __get_current_datetime(timeformat=TOBII_DATETIME_FORMAT): return datetime.datetime.now().replace(microsecond=0).strftime(timeformat) @@ -1164,22 +1178,22 @@ class PostProcessing(ArFeatures.ArContext): self.__segment = segment @property - def start(self) -> int: + def start(self) -> float: """Start reading timestamp in millisecond.""" return self.__start @start.setter - def start(self, start: int): + def start(self, start: float): self.__start = start @property - def end(self) -> int: + def end(self) -> float: """End reading timestamp in millisecond.""" return self.__end @end.setter - def end(self, end: int): + def end(self, end: float): self.__end = end @@ -1282,7 +1296,7 @@ class PostProcessing(ArFeatures.ArContext): return self - def __next__(self) -> tuple[int, numpy.array, list[tuple[int, object, str]]]: + def __next__(self): data_list = [] video_ts, image = self.__next_video_image() @@ -1300,7 +1314,7 @@ class PostProcessing(ArFeatures.ArContext): return output - def __next_video_image(self) -> tuple[int, numpy.array]: + def __next_video_image(self): image = next(self.__video_file.decode(self.__video_file.streams.video[0])) ts = int(image.time * 1e3) diff --git a/src/argaze/utils/contexts/__init__.py b/src/argaze/utils/contexts/__init__.py index b76cd8b..19b7533 100644 --- a/src/argaze/utils/contexts/__init__.py +++ b/src/argaze/utils/contexts/__init__.py @@ -1,4 +1,4 @@ """ Collection of device interfaces. """ -__all__ = ['TobiiProGlasses2']
\ No newline at end of file +__all__ = ['TobiiProGlasses2'] |