aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/argaze/ArFeatures.py2
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoBoard.py5
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoDetector.py3
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py1
-rw-r--r--src/argaze/ArUcoMarkers/__init__.py6
-rw-r--r--src/argaze/AreaOfInterest/AOI2DScene.py31
-rw-r--r--src/argaze/AreaOfInterest/AOI3DScene.py4
-rw-r--r--src/argaze/AreaOfInterest/AOIFeatures.py845
-rw-r--r--src/argaze/AreaOfInterest/__init__.py2
-rw-r--r--src/argaze/DataFeatures.py47
-rw-r--r--src/argaze/GazeAnalysis/DispersionThresholdIdentification.py4
-rw-r--r--src/argaze/GazeAnalysis/KCoefficient.py2
-rw-r--r--src/argaze/GazeAnalysis/VelocityThresholdIdentification.py4
-rw-r--r--src/argaze/GazeAnalysis/__init__.py2
-rw-r--r--src/argaze/GazeFeatures.py25
-rw-r--r--src/argaze/PupilAnalysis/WorkloadIndex.py7
-rw-r--r--src/argaze/PupilAnalysis/__init__.py2
-rw-r--r--src/argaze/__init__.py3
-rw-r--r--src/argaze/__main__.py11
-rw-r--r--src/argaze/utils/UtilsFeatures.py3
-rw-r--r--src/argaze/utils/__init__.py2
-rw-r--r--src/argaze/utils/contexts/PupilLabs.py6
-rw-r--r--src/argaze/utils/contexts/TobiiProGlasses2.py76
-rw-r--r--src/argaze/utils/contexts/__init__.py2
24 files changed, 562 insertions, 533 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 7cc1b9d..56a941d 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -876,6 +876,7 @@ class ArScene(DataFeatures.PipelineStepObject):
Define abstract Augmented Reality scene with ArLayers and ArFrames inside.
"""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
"""Initialize ArScene"""
@@ -1303,6 +1304,7 @@ class ArContext(DataFeatures.PipelineStepObject):
Define class to ...
"""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
diff --git a/src/argaze/ArUcoMarkers/ArUcoBoard.py b/src/argaze/ArUcoMarkers/ArUcoBoard.py
index 74dad94..a6d8b02 100644
--- a/src/argaze/ArUcoMarkers/ArUcoBoard.py
+++ b/src/argaze/ArUcoMarkers/ArUcoBoard.py
@@ -17,6 +17,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
from dataclasses import dataclass, field
+from typing import Sequence
import cv2 as cv
import cv2.aruco as aruco
@@ -49,13 +50,13 @@ class ArUcoBoard():
self.model = aruco.CharucoBoard((self.columns, self.rows), self.square_size/100., self.marker_size/100., self.dictionary.markers)
@property
- def identifiers(self) -> list[int]:
+ def identifiers(self) -> Sequence[int]:
"""Get board markers identifiers."""
return self.model.getIds()
@property
- def size(self)-> int:
+ def size(self) -> Sequence[int]:
"""Get numbers of columns and rows."""
return self.model.getChessboardSize()
diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py
index ce7e38c..cd8ff20 100644
--- a/src/argaze/ArUcoMarkers/ArUcoDetector.py
+++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py
@@ -126,6 +126,7 @@ class DetectorParameters():
class ArUcoDetector(DataFeatures.PipelineStepObject):
"""OpenCV ArUco library wrapper."""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
"""Initialize ArUcoDetector."""
@@ -334,7 +335,7 @@ class Observer():
self.__detected_ids = []
@property
- def metrics(self) -> tuple[int, dict]:
+ def metrics(self) -> tuple[int, int, dict]:
"""Get marker detection metrics.
Returns:
diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py
index 568b251..a6f7b43 100644
--- a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py
+++ b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py
@@ -79,6 +79,7 @@ class ArUcoMarkersGroup(DataFeatures.PipelineStepObject):
Handle group of ArUco markers as one unique spatial entity and estimate its pose.
"""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
"""Initialize ArUcoMarkersGroup"""
diff --git a/src/argaze/ArUcoMarkers/__init__.py b/src/argaze/ArUcoMarkers/__init__.py
index 0ca48cc..b7b0bf8 100644
--- a/src/argaze/ArUcoMarkers/__init__.py
+++ b/src/argaze/ArUcoMarkers/__init__.py
@@ -1,4 +1,6 @@
"""
-Handle [OpenCV ArUco markers](https://docs.opencv.org/4.x/d5/dae/tutorial_aruco_detection.html): generate and detect markers, calibrate camera, describe scene, ...
+Handle [OpenCV ArUco markers](https://docs.opencv.org/4.x/d5/dae/tutorial_aruco_detection.html): generate and detect
+markers, calibrate camera, describe scene, ...
"""
-__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoOpticCalibrator', 'ArUcoDetector', 'ArUcoMarkersGroup', 'ArUcoCamera', 'ArUcoScene', 'utils'] \ No newline at end of file
+__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoOpticCalibrator', 'ArUcoDetector',
+ 'ArUcoMarkersGroup', 'ArUcoCamera', 'ArUcoScene', 'utils']
diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py
index 8283e2e..2c8f003 100644
--- a/src/argaze/AreaOfInterest/AOI2DScene.py
+++ b/src/argaze/AreaOfInterest/AOI2DScene.py
@@ -25,6 +25,7 @@ import cv2
import numpy
from xml.dom import minidom
+
class AOI2DScene(AOIFeatures.AOIScene):
"""Define AOI 2D scene."""
@@ -53,12 +54,11 @@ class AOI2DScene(AOIFeatures.AOIScene):
# Load SVG path
for path in description_file.getElementsByTagName('path'):
-
# Convert d-string into array
d_string = path.getAttribute('d')
- assert(d_string[0] == 'M')
- assert(d_string[-1] == 'Z')
+ assert (d_string[0] == 'M')
+ assert (d_string[-1] == 'Z')
points = [(float(x), float(y)) for x, y in [p.split(',') for p in d_string[1:-1].split('L')]]
@@ -66,7 +66,6 @@ class AOI2DScene(AOIFeatures.AOIScene):
# Load SVG rect
for rect in description_file.getElementsByTagName('rect'):
-
# Convert rect element into dict
rect_dict = {
"Rectangle": {
@@ -81,7 +80,6 @@ class AOI2DScene(AOIFeatures.AOIScene):
# Load SVG circle
for circle in description_file.getElementsByTagName('circle'):
-
# Convert circle element into dict
circle_dict = {
"Circle": {
@@ -95,7 +93,6 @@ class AOI2DScene(AOIFeatures.AOIScene):
# Load SVG ellipse
for ellipse in description_file.getElementsByTagName('ellipse'):
-
# Convert ellipse element into dict
ellipse_dict = {
"Ellipse": {
@@ -130,7 +127,7 @@ class AOI2DScene(AOIFeatures.AOIScene):
if draw_aoi:
aoi.draw(image, **draw_aoi)
- def raycast(self, pointer:tuple) -> tuple[str, "AOIFeatures.AreaOfInterest", bool]:
+ def raycast(self, pointer: tuple) -> tuple[str, "AOIFeatures.AreaOfInterest", bool]:
"""Iterate over aoi to know which aoi is matching the given pointer position.
Returns:
aoi name
@@ -139,30 +136,31 @@ class AOI2DScene(AOIFeatures.AOIScene):
"""
for name, aoi in self.items():
-
matching = aoi.contains_point(pointer)
yield name, aoi, matching
- def draw_raycast(self, image: numpy.array, pointer:tuple, exclude=[], base_color=(0, 0, 255), matching_color=(0, 255, 0)):
+ def draw_raycast(self, image: numpy.array, pointer: tuple, exclude=[], base_color=(0, 0, 255),
+ matching_color=(0, 255, 0)):
"""Draw AOI with their matching status."""
for name, aoi, matching in self.raycast(pointer):
if name in exclude:
continue
-
+
color = matching_color if matching else base_color
if matching:
-
top_left_corner_pixel = numpy.rint(aoi.clockwise()[0]).astype(int)
- cv2.putText(image, name, top_left_corner_pixel, cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, name, top_left_corner_pixel, cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1,
+ cv2.LINE_AA)
# Draw form
aoi.draw(image, color)
- def circlecast(self, center:tuple, radius:float) -> tuple[str, "AOIFeatures.AreaOfInterest", numpy.array, float, float]:
+ def circlecast(self, center: tuple, radius: float) -> tuple[
+ str, "AOIFeatures.AreaOfInterest", numpy.array, float, float]:
"""Iterate over areas to know which aoi is matched circle.
Returns:
aoi name
@@ -173,7 +171,6 @@ class AOI2DScene(AOIFeatures.AOIScene):
"""
for name, aoi in self.items():
-
matched_region, aoi_ratio, circle_ratio = aoi.circle_intersection(center, radius)
yield name, aoi, matched_region, aoi_ratio, circle_ratio
@@ -211,6 +208,7 @@ class AOI2DScene(AOIFeatures.AOIScene):
return aoi2D_scene
'''
+
def dimensionalize(self, rectangle_3d: AOIFeatures.AreaOfInterest, size: tuple) -> AOI3DScene.AOI3DScene:
"""
Convert to 3D scene considering it is inside of 3D rectangular frame.
@@ -223,8 +221,8 @@ class AOI2DScene(AOIFeatures.AOIScene):
AOI 3D scene
"""
- assert(rectangle_3d.dimension == 3)
- assert(rectangle_3d.points_number == 4)
+ assert (rectangle_3d.dimension == 3)
+ assert (rectangle_3d.points_number == 4)
# Vectorize outter_axis function
vfunc = numpy.vectorize(rectangle_3d.outter_axis)
@@ -233,7 +231,6 @@ class AOI2DScene(AOIFeatures.AOIScene):
aoi3D_scene = AOI3DScene.AOI3DScene()
for name, aoi2D in self.items():
-
X, Y = (aoi2D / size).T
aoi3D_scene[name] = numpy.array(vfunc(X, Y)).T.view(AOIFeatures.AreaOfInterest)
diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py
index 762dab0..1964d23 100644
--- a/src/argaze/AreaOfInterest/AOI3DScene.py
+++ b/src/argaze/AreaOfInterest/AOI3DScene.py
@@ -178,8 +178,8 @@ class AOI3DScene(AOIFeatures.AOIScene):
"""Get AOI which are inside and out a given cone field.
!!! note
- **By default**
- The cone have its tip at origin and its base oriented to positive Z axis.
+ **By default**
+ The cone have its tip at origin and its base oriented to positive Z axis.
Returns:
scene inside the cone
diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py
index 88c6feb..7da5bb5 100644
--- a/src/argaze/AreaOfInterest/AOIFeatures.py
+++ b/src/argaze/AreaOfInterest/AOIFeatures.py
@@ -31,634 +31,629 @@ from argaze import DataFeatures
class AreaOfInterest(numpy.ndarray):
- """Define Area Of Interest as an array of points of any dimension."""
+ """Define Area Of Interest as an array of points of any dimension."""
- def __new__(cls, points: numpy.array = numpy.empty(0)) -> Self:
- """View casting inheritance."""
+ def __new__(cls, points: numpy.array = numpy.empty(0)):
+ """View casting inheritance."""
- return numpy.array(points).view(AreaOfInterest)
+ return numpy.array(points).view(AreaOfInterest)
- def __repr__(self):
- """String representation"""
+ def __repr__(self):
+ """String representation"""
- return repr(self.tolist())
+ return repr(self.tolist())
- def __str__(self):
- """String display"""
+ def __str__(self):
+ """String display"""
- return repr(self.tolist())
+ return repr(self.tolist())
- @classmethod
- def from_dict(cls, aoi_data: dict) -> Self:
- """Load attributes from dictionary.
+ @classmethod
+ def from_dict(cls, aoi_data: dict) -> Self:
+ """Load attributes from dictionary.
- Parameters:
- aoi_data: dictionary with attributes to load
- """
+ Parameters:
+ aoi_data: dictionary with attributes to load
+ """
- # Get first and unique shape
- # TODO: allow multiple shapes to describe more complex AOI
- shape, shape_data = aoi_data.popitem()
+ # Get first and unique shape
+ # TODO: allow multiple shapes to describe more complex AOI
+ shape, shape_data = aoi_data.popitem()
- if shape == 'Rectangle':
+ if shape == 'Rectangle':
- x = shape_data.pop('x')
- y = shape_data.pop('y')
- width = shape_data.pop('width')
- height = shape_data.pop('height')
+ x = shape_data.pop('x')
+ y = shape_data.pop('y')
+ width = shape_data.pop('width')
+ height = shape_data.pop('height')
- points = [[x, y], [x+width, y], [x+width, y+height], [x, y+height]]
+ points = [[x, y], [x + width, y], [x + width, y + height], [x, y + height]]
- return AreaOfInterest(points)
+ return AreaOfInterest(points)
- elif shape == 'Circle':
+ elif shape == 'Circle':
- cx = shape_data.pop('cx')
- cy = shape_data.pop('cy')
- radius = shape_data.pop('radius')
+ cx = shape_data.pop('cx')
+ cy = shape_data.pop('cy')
+ radius = shape_data.pop('radius')
- # TODO: Use pygeos
- N = 32
- points = [(math.cos(2*math.pi / N*x) * radius + cx, math.sin(2*math.pi / N*x) * radius + cy) for x in range(0, N+1)]
+ # TODO: Use pygeos
+ N = 32
+ points = [(math.cos(2 * math.pi / N * x) * radius + cx, math.sin(2 * math.pi / N * x) * radius + cy) for x
+ in range(0, N + 1)]
- return AreaOfInterest(points)
+ return AreaOfInterest(points)
- elif shape == 'Ellipse':
+ elif shape == 'Ellipse':
- cx = shape_data.pop('cx')
- cy = shape_data.pop('cy')
- rx = shape_data.pop('rx')
- ry = shape_data.pop('ry')
+ cx = shape_data.pop('cx')
+ cy = shape_data.pop('cy')
+ rx = shape_data.pop('rx')
+ ry = shape_data.pop('ry')
- # TODO: Use pygeos
- N = 32
- points = [(math.cos(2*math.pi / N*x) * rx + cx, math.sin(2*math.pi / N*x) * ry + cy) for x in range(0, N+1)]
+ # TODO: Use pygeos
+ N = 32
+ points = [(math.cos(2 * math.pi / N * x) * rx + cx, math.sin(2 * math.pi / N * x) * ry + cy) for x in
+ range(0, N + 1)]
- @property
- def dimension(self) -> int:
- """Number of axis coding area points positions."""
- return self.shape[1]
+ @property
+ def dimension(self) -> int:
+ """Number of axis coding area points positions."""
+ return self.shape[1]
- @property
- def points_number(self) -> int:
- """Number of points defining the area."""
- return self.shape[0]
+ @property
+ def points_number(self) -> int:
+ """Number of points defining the area."""
+ return self.shape[0]
- def is_empty(self) -> bool:
- """Is AOI empty ?"""
- return self.shape[0] == 0
+ def is_empty(self) -> bool:
+ """Is AOI empty ?"""
+ return self.shape[0] == 0
- @property
- def bounds(self) -> numpy.array:
- """Get area's bounds."""
- min_bounds = numpy.min(self, axis=0)
- max_bounds = numpy.max(self, axis=0)
+ @property
+ def bounds(self) -> numpy.array:
+ """Get area's bounds."""
+ min_bounds = numpy.min(self, axis=0)
+ max_bounds = numpy.max(self, axis=0)
- return numpy.array([min_bounds, max_bounds])
+ return numpy.array([min_bounds, max_bounds])
- @property
- def center(self) -> numpy.array:
- """Center of mass."""
- return self.mean(axis=0)
+ @property
+ def center(self) -> numpy.array:
+ """Center of mass."""
+ return self.mean(axis=0)
- @property
- def size(self) -> numpy.array:
- """Get scene size."""
- min_bounds, max_bounds = self.bounds
+ @property
+ def size(self) -> numpy.array:
+ """Get scene size."""
+ min_bounds, max_bounds = self.bounds
- return max_bounds - min_bounds
+ return max_bounds - min_bounds
- @property
- def area(self) -> float:
- """Area of the polygon defined by aoi's points."""
- return Polygon(self).area
+ @property
+ def area(self) -> float:
+ """Area of the polygon defined by aoi's points."""
+ return Polygon(self).area
- @property
- def bounding_box(self) -> numpy.array:
- """Get area's bounding box.
- !!! warning
- Available for 2D AOI only."""
+ @property
+ def bounding_box(self) -> numpy.array:
+ """Get area's bounding box.
+ !!! warning
+ Available for 2D AOI only."""
- assert(self.points_number > 1)
- assert(self.dimension == 2)
+ assert (self.points_number > 1)
+ assert (self.dimension == 2)
- min_x, min_y = numpy.min(self, axis=0)
- max_x, max_y = numpy.max(self, axis=0)
+ min_x, min_y = numpy.min(self, axis=0)
+ max_x, max_y = numpy.max(self, axis=0)
- return numpy.array([(min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y)])
+ return numpy.array([(min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y)])
- def clockwise(self) -> Self:
- """Get area points in clockwise order.
- !!! warning
- Available for 2D AOI only."""
+ def clockwise(self) -> Self:
+ """Get area points in clockwise order.
+ !!! warning
+ Available for 2D AOI only."""
- assert(self.dimension == 2)
+ assert (self.dimension == 2)
- O = self.center
- OP = (self - O) / numpy.linalg.norm(self - O)
- angles = numpy.arctan2(OP[:, 1], OP[:, 0])
+ O = self.center
+ OP = (self - O) / numpy.linalg.norm(self - O)
+ angles = numpy.arctan2(OP[:, 1], OP[:, 0])
- return self[numpy.argsort(angles)]
+ return self[numpy.argsort(angles)]
- def contains_point(self, point: tuple) -> bool:
- """Is a point inside area?
- !!! warning
- Available for 2D AOI only.
- !!! danger
- The AOI points must be sorted in clockwise order."""
+ def contains_point(self, point: tuple) -> bool:
+ """Is a point inside area?
+ !!! warning
+ Available for 2D AOI only.
+ !!! danger
+ The AOI points must be sorted in clockwise order."""
- assert(self.dimension == 2)
- assert(len(point) == self.dimension)
+ assert (self.dimension == 2)
+ assert (len(point) == self.dimension)
- return mpath.Path(self).contains_points([point])[0]
+ return bool(mpath.Path(self).contains_points([point])[0])
- def inner_axis(self, x: float, y: float) -> tuple:
- """Transform a point coordinates from global axis to AOI axis.
- !!! warning
- Available for 2D AOI only.
- !!! danger
- The AOI points must be sorted in clockwise order."""
+ def inner_axis(self, x: float, y: float) -> tuple:
+ """Transform a point coordinates from global axis to AOI axis.
+ !!! warning
+ Available for 2D AOI only.
+ !!! danger
+ The AOI points must be sorted in clockwise order."""
- assert(self.dimension == 2)
+ assert (self.dimension == 2)
- Src = self
- Src_origin = Src[0]
- Src = (Src - Src_origin).reshape((len(Src)), 2).astype(numpy.float32)
+ Src = self
+ Src_origin = Src[0]
+ Src = (Src - Src_origin).reshape((len(Src)), 2).astype(numpy.float32)
- Dst = numpy.array([[0., 0.], [1., 0.], [1., 1.], [0., 1.]]).astype(numpy.float32)
+ Dst = numpy.array([[0., 0.], [1., 0.], [1., 1.], [0., 1.]]).astype(numpy.float32)
- P = cv2.getPerspectiveTransform(Src, Dst)
- X = numpy.append(numpy.array(numpy.array([x, y]) - Src_origin), [1.0]).astype(numpy.float32)
- Y = numpy.dot(P, X)
+ P = cv2.getPerspectiveTransform(Src, Dst)
+ X = numpy.append(numpy.array(numpy.array([x, y]) - Src_origin), [1.0]).astype(numpy.float32)
+ Y = numpy.dot(P, X)
- La = (Y/Y[2])[:-1]
+ La = (Y / Y[2])[:-1]
- return tuple(numpy.around(La, 4))
+ return tuple(numpy.around(La, 4))
- def outter_axis(self, x: float, y: float) -> tuple:
- """Transform a point coordinates from AOI axis to global axis.
- !!! danger
- The AOI points must be sorted in clockwise order.
- !!! danger
- The AOI must be a rectangle."""
+ def outter_axis(self, x: float, y: float) -> tuple:
+ """Transform a point coordinates from AOI axis to global axis.
+ !!! danger
+ The AOI points must be sorted in clockwise order.
+ !!! danger
+ The AOI must be a rectangle.
+ """
- # Origin point
- O = self[0]
+ # Origin point
+ O = self[0]
- # Horizontal axis vector
- H = self[1] - self[0]
+ # Horizontal axis vector
+ H = self[1] - self[0]
- # Vertical axis vector
- V = self[3] - self[0]
+ # Vertical axis vector
+ V = self[3] - self[0]
- return tuple(O + x * H + y * V)
+ return tuple(O + x * H + y * V)
- def circle_intersection(self, center: tuple, radius: float) -> tuple[numpy.array, float, float]:
- """Get intersection shape with a circle, intersection area / AOI area ratio and intersection area / circle area ratio.
- !!! warning
- Available for 2D AOI only.
- Returns:
- intersection shape
- intersection aoi ratio
- intersection circle ratio
- """
+ def circle_intersection(self, center: tuple, radius: float) -> tuple[numpy.array, float, float]:
+ """Get intersection shape with a circle, intersection area / AOI area ratio and intersection area / circle area ratio.
+ !!! warning
+ Available for 2D AOI only.
- assert(self.dimension == 2)
+ Returns:
+ intersection shape
+ intersection aoi ratio
+ intersection circle ratio
+ """
- self_polygon = Polygon(self)
- args_circle = Point(center).buffer(radius)
+ assert (self.dimension == 2)
- if self_polygon.intersects(args_circle):
+ self_polygon = Polygon(self)
+ args_circle = Point(center).buffer(radius)
- intersection = self_polygon.intersection(args_circle)
+ if self_polygon.intersects(args_circle):
- intersection_array = numpy.array([list(xy) for xy in intersection.exterior.coords[:]]).astype(numpy.float32).view(AreaOfInterest)
+ intersection = self_polygon.intersection(args_circle)
- return intersection_array, intersection.area / self_polygon.area, intersection.area / args_circle.area
+ intersection_array = numpy.array([list(xy) for xy in intersection.exterior.coords[:]]).astype(
+ numpy.float32).view(AreaOfInterest)
- else:
+ return intersection_array, intersection.area / self_polygon.area, intersection.area / args_circle.area
- empty_array = numpy.array([list([])]).astype(numpy.float32).view(AreaOfInterest)
+ else:
- return empty_array, 0., 0.
+ empty_array = numpy.array([list([])]).astype(numpy.float32).view(AreaOfInterest)
- def draw(self, image: numpy.array, color, border_size=1):
- """Draw 2D AOI into image.
- !!! warning
- Available for 2D AOI only."""
+ return empty_array, 0., 0.
- assert(self.dimension == 2)
+ def draw(self, image: numpy.array, color, border_size=1):
+ """Draw 2D AOI into image.
+ !!! warning
+ Available for 2D AOI only."""
- if len(self) > 1:
+ assert (self.dimension == 2)
- # Draw form
- pixels = numpy.rint(self).astype(int)
- cv2.line(image, pixels[-1], pixels[0], color, border_size)
- for A, B in zip(pixels, pixels[1:]):
- cv2.line(image, A, B, color, border_size)
+ if len(self) > 1:
- # Draw center
- center_pixel = numpy.rint(self.center).astype(int)
- cv2.circle(image, center_pixel, 1, color, -1)
+ # Draw form
+ pixels = numpy.rint(self).astype(int)
+ cv2.line(image, pixels[-1], pixels[0], color, border_size)
+ for A, B in zip(pixels, pixels[1:]):
+ cv2.line(image, A, B, color, border_size)
-class AOIScene():
- """Define AOI scene as a dictionary of AOI."""
-
- def __init__(self, dimension: int, areas: dict = None):
- """Initialisation."""
+ # Draw center
+ center_pixel = numpy.rint(self.center).astype(int)
+ cv2.circle(image, center_pixel, 1, color, -1)
- assert(dimension > 0)
- super().__init__()
+class AOIScene():
+ """Define AOI scene as a dictionary of AOI."""
- self.__dimension = dimension
- self.__areas = {}
+ def __init__(self, dimension: int, areas: dict = None):
+ """Initialisation."""
- # NEVER USE {} as default function argument
- if areas is not None:
+ assert (dimension > 0)
- for name, area in areas.items():
- self[name] = AreaOfInterest(area)
+ super().__init__()
- @classmethod
- def from_dict(cls, aoi_scene_data: dict) -> Self:
- """Load attributes from dictionary.
+ self.__dimension = dimension
+ self.__areas = {}
- Parameters:
- aoi_scene_data: dictionary with attributes to load
- """
+ # NEVER USE {} as default function argument
+ if areas is not None:
- # Load areas
- areas = {}
+ for name, area in areas.items():
+ self[name] = AreaOfInterest(area)
- for area_name, area_data in aoi_scene_data.items():
+ @classmethod
+ def from_dict(cls, aoi_scene_data: dict) -> Self:
+ """Load attributes from dictionary.
- if type(area_data) == list:
+ Parameters:
+ aoi_scene_data: dictionary with attributes to load
+ """
- areas[area_name] = AreaOfInterest(area_data)
+ # Load areas
+ areas = {}
- elif type(area_data) == dict:
+ for area_name, area_data in aoi_scene_data.items():
- areas[area_name] = AreaOfInterest.from_dict(area_data)
+ if type(area_data) == list:
- # Default dimension is 0
- dimension = 0
+ areas[area_name] = AreaOfInterest(area_data)
- # Guess dimension from first area dimension (default: 2)
- if len(areas) > 0:
+ elif type(area_data) == dict:
- dimension = list(areas.values())[0].dimension
+ areas[area_name] = AreaOfInterest.from_dict(area_data)
- return AOIScene(dimension = dimension, areas = areas)
+ # Default dimension is 0
+ dimension = 0
- @classmethod
- def from_json(cls, json_filepath: str) -> Self:
- """
- Load attributes from .json file.
+ # Guess dimension from first area dimension (default: 2)
+ if len(areas) > 0:
+ dimension = list(areas.values())[0].dimension
- Parameters:
- json_filepath: path to json file
- """
+ return AOIScene(dimension=dimension, areas=areas)
- with open(json_filepath) as configuration_file:
+ @classmethod
+ def from_json(cls, json_filepath: str) -> Self:
+ """
+ Load attributes from .json file.
- return AOIScene.from_dict(json.load(configuration_file))
+ Parameters:
+ json_filepath: path to json file
+ """
- def __getitem__(self, name) -> AreaOfInterest:
- """Get an AOI from the scene."""
+ with open(json_filepath) as configuration_file:
+ return AOIScene.from_dict(json.load(configuration_file))
- return AreaOfInterest(self.__areas[name])
+ def __getitem__(self, name) -> AreaOfInterest:
+ """Get an AOI from the scene."""
- def __setitem__(self, name, aoi: AreaOfInterest):
- """Add an AOI to the scene."""
+ return AreaOfInterest(self.__areas[name])
- assert(aoi.dimension == self.__dimension)
+ def __setitem__(self, name, aoi: AreaOfInterest):
+ """Add an AOI to the scene."""
- self.__areas[name] = AreaOfInterest(aoi)
+ assert (aoi.dimension == self.__dimension)
- # Expose area as an attribute of the class
- setattr(self, name, self.__areas[name])
+ self.__areas[name] = AreaOfInterest(aoi)
- def __delitem__(self, key):
- """Remove an AOI from the scene."""
+ # Expose area as an attribute of the class
+ setattr(self, name, self.__areas[name])
- del self.__areas[key]
+ def __delitem__(self, key):
+ """Remove an AOI from the scene."""
- # Stop area exposition as an attribute of the class
- delattr(self, key)
+ del self.__areas[key]
- def __or__(self, other):
- """Merge another scene using | operator."""
+ # Stop area exposition as an attribute of the class
+ delattr(self, key)
- assert(other.dimension == self.__dimension)
+ def __or__(self, other):
+ """Merge another scene using | operator."""
- merged_areas = dict(self.__areas)
- merged_areas.update(other.__areas)
+ assert (other.dimension == self.__dimension)
- return AOIScene(self.dimension, merged_areas)
+ merged_areas = dict(self.__areas)
+ merged_areas.update(other.__areas)
- def __ror__(self, other):
- """Merge another scene using | operator."""
+ return AOIScene(self.dimension, merged_areas)
- assert(other.dimension == self.__dimension)
+ def __ror__(self, other):
+ """Merge another scene using | operator."""
- merged_areas = dict(other.__areas)
- merged_areas.update(self.__areas)
+ assert (other.dimension == self.__dimension)
- return AOIScene(self.dimension, merged_areas)
+ merged_areas = dict(other.__areas)
+ merged_areas.update(self.__areas)
- def __ior__(self, other):
- """Merge scene with another scene in-place using |= operator."""
+ return AOIScene(self.dimension, merged_areas)
- assert(other.dimension == self.__dimension)
+ def __ior__(self, other):
+ """Merge scene with another scene in-place using |= operator."""
- self.__areas.update(other.__areas)
- self.__dict__.update(other.__areas)
+ assert (other.dimension == self.__dimension)
- return self
+ self.__areas.update(other.__areas)
+ self.__dict__.update(other.__areas)
- def __len__(self):
- """Get number of AOI into scene."""
- return len(self.__areas)
+ return self
- def __repr__(self):
- """String representation"""
+ def __len__(self):
+ """Get number of AOI into scene."""
+ return len(self.__areas)
- return str(self.__areas)
+ def __repr__(self):
+ """String representation"""
- def __add__(self, add_vector) -> Self:
- """Add vector to scene."""
+ return str(self.__areas)
- assert(len(add_vector) == self.__dimension)
+ def __add__(self, add_vector) -> Self:
+ """Add vector to scene."""
- for name, area in self.__areas.items():
-
- self.__areas[name] = self.__areas[name] + add_vector
+ assert (len(add_vector) == self.__dimension)
- return self
+ for name, area in self.__areas.items():
+ self.__areas[name] = self.__areas[name] + add_vector
- # Allow n + scene operation
- __radd__ = __add__
+ return self
- def __sub__(self, sub_vector) -> Self:
- """Sub vector to scene."""
+ # Allow n + scene operation
+ __radd__ = __add__
- assert(len(sub_vector) == self.__dimension)
+ def __sub__(self, sub_vector) -> Self:
+ """Sub vector to scene."""
- for name, area in self.__areas.items():
-
- self.__areas[name] = self.__areas[name] - sub_vector
+ assert (len(sub_vector) == self.__dimension)
- return self
+ for name, area in self.__areas.items():
+ self.__areas[name] = self.__areas[name] - sub_vector
- def __rsub__(self, rsub_vector) -> Self:
- """RSub vector to scene."""
+ return self
- assert(len(rsub_vector) == self.__dimension)
+ def __rsub__(self, rsub_vector) -> Self:
+ """RSub vector to scene."""
- for name, area in self.__areas.items():
-
- self.__areas[name] = rsub_vector - self.__areas[name]
+ assert (len(rsub_vector) == self.__dimension)
- return self
+ for name, area in self.__areas.items():
+ self.__areas[name] = rsub_vector - self.__areas[name]
- def __mul__(self, scale_vector) -> Self:
- """Scale scene by a vector."""
+ return self
- assert(len(scale_vector) == self.__dimension)
+ def __mul__(self, scale_vector) -> Self:
+ """Scale scene by a vector."""
- for name, area in self.__areas.items():
-
- self.__areas[name] = self.__areas[name] * scale_vector
+ assert (len(scale_vector) == self.__dimension)
- return self
+ for name, area in self.__areas.items():
+ self.__areas[name] = self.__areas[name] * scale_vector
- # Allow n * scene operation
- __rmul__ = __mul__
+ return self
- def __truediv__(self, div_vector) -> Self:
+ # Allow n * scene operation
+ __rmul__ = __mul__
- assert(len(div_vector) == self.__dimension)
+ def __truediv__(self, div_vector) -> Self:
- for name, area in self.__areas.items():
-
- self.__areas[name] = self.__areas[name] / div_vector
+ assert (len(div_vector) == self.__dimension)
- return self
+ for name, area in self.__areas.items():
+ self.__areas[name] = self.__areas[name] / div_vector
- def items(self) -> tuple[str, AreaOfInterest]:
- """Iterate over areas."""
+ return self
- return self.__areas.items()
+ def items(self) -> tuple[str, AreaOfInterest]:
+ """Iterate over areas."""
- def keys(self) -> list[str]:
- """Get areas name."""
+ return self.__areas.items()
- return self.__areas.keys()
+ def keys(self) -> list[str]:
+ """Get areas name."""
- @property
- def dimension(self) -> int:
- """Dimension of the AOI in scene."""
+ return list(self.__areas.keys())
- return self.__dimension
+ @property
+ def dimension(self) -> int:
+ """Dimension of the AOI in scene."""
- def expand(self) -> Self:
- """Add 1 dimension to the AOIs in scene."""
+ return self.__dimension
- new_areas = {}
+ def expand(self) -> Self:
+ """Add 1 dimension to the AOIs in scene."""
- for name, area in self.__areas.items():
+ new_areas = {}
- zeros = numpy.zeros((len(self.__areas[name]), 1))
- new_areas[name] = numpy.concatenate((self.__areas[name], zeros), axis=1)
+ for name, area in self.__areas.items():
+ zeros = numpy.zeros((len(self.__areas[name]), 1))
+ new_areas[name] = numpy.concatenate((self.__areas[name], zeros), axis=1)
- return AOIScene(dimension = self.__dimension + 1, areas = new_areas)
+ return AOIScene(dimension=self.__dimension + 1, areas=new_areas)
- @property
- def bounds(self) -> numpy.array:
- """Get scene's bounds."""
+ @property
+ def bounds(self) -> numpy.array:
+ """Get scene's bounds."""
- all_vertices = []
+ all_vertices = []
- for area in self.__areas.values():
- for vertice in area:
- all_vertices.append(vertice)
+ for area in self.__areas.values():
+ for vertice in area:
+ all_vertices.append(vertice)
- all_vertices = numpy.array(all_vertices) #.astype(numpy.float32)
+ all_vertices = numpy.array(all_vertices) #.astype(numpy.float32)
- min_bounds = numpy.min(all_vertices, axis=0)
- max_bounds = numpy.max(all_vertices, axis=0)
+ min_bounds = numpy.min(all_vertices, axis=0)
+ max_bounds = numpy.max(all_vertices, axis=0)
- return numpy.array([min_bounds, max_bounds])
+ return numpy.array([min_bounds, max_bounds])
- @property
- def center(self) -> numpy.array:
- """Get scene's center point."""
+ @property
+ def center(self) -> numpy.array:
+ """Get scene's center point."""
- min_bounds, max_bounds = self.bounds
+ min_bounds, max_bounds = self.bounds
- return (min_bounds + max_bounds) / 2
+ return (min_bounds + max_bounds) / 2
- @property
- def size(self) -> numpy.array:
- """Get scene size."""
+ @property
+ def size(self) -> numpy.array:
+ """Get scene size."""
- min_bounds, max_bounds = self.bounds
+ min_bounds, max_bounds = self.bounds
- return max_bounds - min_bounds
+ return max_bounds - min_bounds
- def copy(self, exclude: list=None) -> Self:
- """Copy scene partly excluding AOI by name."""
+ def copy(self, exclude: list = None) -> Self:
+ """Copy scene partly excluding AOI by name."""
- if exclude is None:
- exclude = []
+ if exclude is None:
+ exclude = []
- # noinspection PyArgumentList
- scene_copy = type(self)()
+ # noinspection PyArgumentList
+ scene_copy = type(self)()
- for name, area in self.__areas.items():
-
- if name not in exclude:
+ for name, area in self.__areas.items():
- scene_copy[name] = AreaOfInterest(area) #.astype(numpy.float32).view(AreaOfInterest)
+ if name not in exclude:
+ scene_copy[name] = AreaOfInterest(area) #.astype(numpy.float32).view(AreaOfInterest)
- return scene_copy
+ return scene_copy
- def clear(self):
- """Clear scene."""
+ def clear(self):
+ """Clear scene."""
- self.__areas.clear()
+ self.__areas.clear()
- def __str__(self) -> str:
- """
- String representation of pipeline step object.
-
- Returns:
- String representation
- """
+ def __str__(self) -> str:
+ """
+ String representation of pipeline step object.
+
+ Returns:
+ String representation
+ """
- output = ''
+ output = ''
- for name, area in self.__areas.items():
+ for name, area in self.__areas.items():
+ output += f'{Fore.BLUE}{Style.BRIGHT}{name}{Style.RESET_ALL} '
- output += f'{Fore.BLUE}{Style.BRIGHT}{name}{Style.RESET_ALL} '
-
- return output
+ return output
# noinspection PyAttributeOutsideInit
class Heatmap(DataFeatures.PipelineStepObject):
- """Define image to draw heatmap."""
-
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
+ """Define image to draw heatmap."""
- # Init private attributes
- self.__size = (1, 1)
- self.__buffer = 0
- self.__sigma = 0.05
+ # noinspection PyMissingConstructor
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
- @property
- def size(self) -> tuple[int, int]:
- """Size of heatmap image in pixels."""
- return self.__size
+ # Init private attributes
+ self.__size = (1, 1)
+ self.__buffer = 0
+ self.__sigma = 0.05
- # noinspection PyAttributeOutsideInit
- @size.setter
- def size(self, size: tuple[int, int]):
-
- self.__size = size
- # noinspection PyAttributeOutsideInit
- self.__rX, self.__rY = size
+ @property
+ def size(self) -> tuple[int, int]:
+ """Size of heatmap image in pixels."""
+ return self.__size
- # Init coordinates
- # noinspection PyAttributeOutsideInit
- self.__Sx = numpy.linspace(0., self.__rX/self.__rY, self.__rX)
- # noinspection PyAttributeOutsideInit
- self.__Sy = numpy.linspace(0., 1., self.__rY)
+ # noinspection PyAttributeOutsideInit
+ @size.setter
+ def size(self, size: tuple[int, int]):
- # Init heatmap image
- self.clear()
+ self.__size = size
+ # noinspection PyAttributeOutsideInit
+ self.__rX, self.__rY = size
- @property
- def sigma(self) -> float:
- """Point spread factor."""
- return self.__sigma
+ # Init coordinates
+ # noinspection PyAttributeOutsideInit
+ self.__Sx = numpy.linspace(0., self.__rX / self.__rY, self.__rX)
+ # noinspection PyAttributeOutsideInit
+ self.__Sy = numpy.linspace(0., 1., self.__rY)
- @sigma.setter
- def sigma(self, sigma: float):
+ # Init heatmap image
+ self.clear()
- self.__sigma = sigma
+ @property
+ def sigma(self) -> float:
+ """Point spread factor."""
+ return self.__sigma
- @property
- def buffer(self) -> int:
- """Size of heatmap buffer (0 means no buffering)."""
- return self.__buffer
+ @sigma.setter
+ def sigma(self, sigma: float):
- @buffer.setter
- def buffer(self, buffer: int):
+ self.__sigma = sigma
- self.__buffer = buffer
+ @property
+ def buffer(self) -> int:
+ """Size of heatmap buffer (0 means no buffering)."""
+ return self.__buffer
- def point_spread(self, point: tuple):
- """Draw gaussian point spread into image."""
+ @buffer.setter
+ def buffer(self, buffer: int):
- div = -2 * self.__sigma**2
+ self.__buffer = buffer
- x = point[0] / self.__rY # we use rY not rX !!!
- y = point[1] / self.__rY
+ def point_spread(self, point: tuple):
+ """Draw gaussian point spread into image."""
- dX2 = (self.__Sx - x)**2
- dY2 = (self.__Sy - y)**2
+ div = -2 * self.__sigma ** 2
- v_dX, v_dY = numpy.array(numpy.meshgrid(dX2, dY2)).reshape(2, -1)
+ x = point[0] / self.__rY # we use rY not rX !!!
+ y = point[1] / self.__rY
- return numpy.exp((v_dX + v_dY) / div).reshape(self.__rY, self.__rX)
+ dX2 = (self.__Sx - x) ** 2
+ dY2 = (self.__Sy - y) ** 2
- # noinspection PyAttributeOutsideInit
- def clear(self):
- """Clear heatmap image."""
+ v_dX, v_dY = numpy.array(numpy.meshgrid(dX2, dY2)).reshape(2, -1)
- # noinspection PyAttributeOutsideInit
- self.__point_spread_sum = numpy.zeros((self.__rY, self.__rX))
- # noinspection PyAttributeOutsideInit
- self.__point_spread_buffer = []
- # noinspection PyAttributeOutsideInit
- self.__point_spread_buffer_size = self.__buffer
+ return numpy.exp((v_dX + v_dY) / div).reshape(self.__rY, self.__rX)
- # noinspection PyAttributeOutsideInit
- @DataFeatures.PipelineStepMethod
- def update(self, point: tuple):
- """Update heatmap image."""
+ # noinspection PyAttributeOutsideInit
+ def clear(self):
+ """Clear heatmap image."""
- point_spread = self.point_spread(point)
+ # noinspection PyAttributeOutsideInit
+ self.__point_spread_sum = numpy.zeros((self.__rY, self.__rX))
+ # noinspection PyAttributeOutsideInit
+ self.__point_spread_buffer = []
+ # noinspection PyAttributeOutsideInit
+ self.__point_spread_buffer_size = self.__buffer
- # Sum point spread
- self.__point_spread_sum += point_spread
+ # noinspection PyAttributeOutsideInit
+ @DataFeatures.PipelineStepMethod
+ def update(self, point: tuple):
+ """Update heatmap image."""
- # If point spread buffering enabled
- if self.__buffer > 0:
+ point_spread = self.point_spread(point)
- self.__point_spread_buffer.append(point_spread)
+ # Sum point spread
+ self.__point_spread_sum += point_spread
- # Remove oldest point spread buffer image
- if len(self.__point_spread_buffer) > self.buffer:
+ # If point spread buffering enabled
+ if self.__buffer > 0:
+ self.__point_spread_buffer.append(point_spread)
- self.__point_spread_sum -= self.__point_spread_buffer.pop(0)
+ # Remove oldest point spread buffer image
+ if len(self.__point_spread_buffer) > self.buffer:
+ self.__point_spread_sum -= self.__point_spread_buffer.pop(0)
- # Edit heatmap
- gray = (255 * self.__point_spread_sum / numpy.max(self.__point_spread_sum)).astype(numpy.uint8)
- # noinspection PyAttributeOutsideInit
- self.__image = cv2.applyColorMap(gray, cv2.COLORMAP_JET)
+ # Edit heatmap
+ gray = (255 * self.__point_spread_sum / numpy.max(self.__point_spread_sum)).astype(numpy.uint8)
+ # noinspection PyAttributeOutsideInit
+ self.__image = cv2.applyColorMap(gray, cv2.COLORMAP_JET)
- @DataFeatures.PipelineStepImage
- def image(self):
- """Get heatmap image."""
- try:
+ @DataFeatures.PipelineStepImage
+ def image(self):
+ """Get heatmap image."""
+ try:
- return self.__image
+ return self.__image
- except AttributeError:
+ except AttributeError:
- return numpy.zeros((self.__rY, self.__rX, 3)).astype(numpy.uint8)
+ return numpy.zeros((self.__rY, self.__rX, 3)).astype(numpy.uint8)
diff --git a/src/argaze/AreaOfInterest/__init__.py b/src/argaze/AreaOfInterest/__init__.py
index a9bb2b2..084550b 100644
--- a/src/argaze/AreaOfInterest/__init__.py
+++ b/src/argaze/AreaOfInterest/__init__.py
@@ -1,4 +1,4 @@
"""
Manage Areas of Interest (AOI) and scenes for 2D and 3D environment.
"""
-__all__ = ['AOIFeatures', 'AOI2DScene', 'AOI3DScene'] \ No newline at end of file
+__all__ = ['AOIFeatures', 'AOI2DScene', 'AOI3DScene']
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index a7b0a48..4e85aaf 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -330,7 +330,11 @@ class TimestampedObjectsList(list):
Timestamped objects are considered to be stored according to their coming time.
"""
- def __init__(self, ts_object_type: type, ts_objects: list = []):
+ # noinspection PyMissingConstructor
+ def __init__(self, ts_object_type: type, ts_objects=None):
+
+ if ts_objects is None:
+ ts_objects = []
self.__object_type = ts_object_type
self.__object_properties = properties(self.__object_type)
@@ -396,9 +400,12 @@ class TimestampedObjectsList(list):
return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self]
@classmethod
- def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self:
+ def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=None) -> Self:
"""Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)."""
+ if exclude is None:
+ exclude = []
+
dataframe.drop(exclude, inplace=True, axis=True)
assert (dataframe.index.name == 'timestamp')
@@ -583,7 +590,7 @@ class SharedObject(TimestampedObject):
class TimestampedException(Exception, TimestampedObject):
"""Wrap exception to keep track of raising timestamp."""
- def __init__(self, exception=Exception, timestamp: int | float = math.nan):
+ def __init__(self, exception: Exception, timestamp: int | float = math.nan):
Exception.__init__(self, exception)
TimestampedObject.__init__(self, timestamp)
@@ -639,10 +646,10 @@ def PipelineStepInit(method):
def wrapper(self, **kwargs):
"""Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call.
- Parameters:
- self:
- kwargs: any arguments defined by PipelineStepMethodInit.
- """
+ Parameters:
+ self:
+ kwargs: any arguments defined by PipelineStepMethodInit.
+ """
# Init pipeline step object attributes
PipelineStepObject.__init__(self)
@@ -811,9 +818,8 @@ def PipelineStepDraw(method):
# noinspection PyAttributeOutsideInit
class PipelineStepObject():
+ """Define class to assess pipeline step methods execution time and observe them.
"""
- Define class to assess pipeline step methods execution time and observe them.
- """
__initialized = False
@@ -939,9 +945,9 @@ class PipelineStepObject():
def as_dict(self) -> dict:
"""Export PipelineStepObject attributes as dictionary.
- Returns:
- object_data: dictionary with pipeline step object attributes values.
- """
+ Returns:
+ object_data: dictionary with pipeline step object attributes values.
+ """
return {
"name": self.__name,
"observers": self.__observers
@@ -964,12 +970,11 @@ class PipelineStepObject():
#json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder)
def __str__(self) -> str:
+ """String representation of pipeline step object.
+
+ Returns:
+ String representation
"""
- String representation of pipeline step object.
-
- Returns:
- String representation
- """
logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '')
@@ -1064,7 +1069,7 @@ class PipelineStepObject():
yield name, getattr(self, name)
@property
- def children(self) -> object:
+ def children(self):
"""Iterate over children pipeline step objects."""
for name, value in self.properties:
@@ -1098,7 +1103,8 @@ def PipelineStepMethod(method):
PipelineStepMethod must have a timestamp as first argument.
"""
- def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, catch_exceptions: bool = True, **kwargs):
+ def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, catch_exceptions: bool = True,
+ **kwargs):
"""Wrap pipeline step method to measure execution time.
Parameters:
@@ -1117,7 +1123,8 @@ def PipelineStepMethod(method):
else:
- logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', get_class_path(self), method.__name__, type(args[0]).__name__)
+ logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.',
+ get_class_path(self), method.__name__, type(args[0]).__name__)
if unwrap:
return method(self, *args, **kwargs)
diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
index b8451f0..a860e47 100644
--- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
@@ -258,7 +258,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Always return empty gaze movement at least
return GazeFeatures.GazeMovement()
- def current_fixation(self) -> GazeFeatures.Fixation:
+ def current_fixation(self) -> GazeFeatures.GazeMovement:
if self.__fixation_positions:
@@ -267,7 +267,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Always return empty gaze movement at least
return GazeFeatures.GazeMovement()
- def current_saccade(self) -> GazeFeatures.Saccade:
+ def current_saccade(self) -> GazeFeatures.GazeMovement:
if len(self.__saccade_positions) > 1:
diff --git a/src/argaze/GazeAnalysis/KCoefficient.py b/src/argaze/GazeAnalysis/KCoefficient.py
index 7e3caab..9980dfe 100644
--- a/src/argaze/GazeAnalysis/KCoefficient.py
+++ b/src/argaze/GazeAnalysis/KCoefficient.py
@@ -97,7 +97,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer):
self.__K = 0
@DataFeatures.PipelineStepMethod
- def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath) -> float:
+ def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPath):
assert(len(aoi_scan_path) > 1)
diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
index d001688..78cc170 100644
--- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
@@ -250,7 +250,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Always return empty gaze movement at least
return GazeFeatures.GazeMovement()
- def current_fixation(self) -> GazeFeatures.Fixation:
+ def current_fixation(self) -> GazeFeatures.GazeMovement:
if self.__fixation_positions:
@@ -259,7 +259,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Always return empty gaze movement at least
return GazeFeatures.GazeMovement()
- def current_saccade(self) -> GazeFeatures.Saccade:
+ def current_saccade(self) -> GazeFeatures.GazeMovement:
if len(self.__saccade_positions) > 1:
diff --git a/src/argaze/GazeAnalysis/__init__.py b/src/argaze/GazeAnalysis/__init__.py
index f0ba9fd..2d05006 100644
--- a/src/argaze/GazeAnalysis/__init__.py
+++ b/src/argaze/GazeAnalysis/__init__.py
@@ -13,4 +13,4 @@ __all__ = [
'NearestNeighborIndex',
'ExploreExploitRatio',
'LinearRegression'
-] \ No newline at end of file
+]
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index dbeee61..5777a8d 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -93,11 +93,11 @@ class GazePosition(tuple, DataFeatures.TimestampedObject):
"""
if self.__precision is not None and position.precision is not None:
- return GazePosition(numpy.array(self) + numpy.array(position), precision = max(self.__precision, position.precision), timestamp=self.timestamp)
+ return GazePosition(tuple(numpy.array(self) + numpy.array(position)), precision = max(self.__precision, position.precision), timestamp = self.timestamp)
else:
- return GazePosition(numpy.array(self) + numpy.array(position), timestamp=self.timestamp)
+ return GazePosition(tuple(numpy.array(self) + numpy.array(position)), timestamp = self.timestamp)
__radd__ = __add__
@@ -112,11 +112,11 @@ class GazePosition(tuple, DataFeatures.TimestampedObject):
"""
if self.__precision is not None and position.precision is not None:
- return GazePosition(numpy.array(self) - numpy.array(position), precision = max(self.__precision, position.precision), timestamp=self.timestamp)
+ return GazePosition(tuple(numpy.array(self) - numpy.array(position)), precision = max(self.__precision, position.precision), timestamp = self.timestamp)
else:
- return GazePosition(numpy.array(self) - numpy.array(position), timestamp=self.timestamp)
+ return GazePosition(tuple(numpy.array(self) - numpy.array(position)), timestamp = self.timestamp)
def __rsub__(self, position: Self) -> Self:
"""Reversed subtract position.
@@ -129,11 +129,11 @@ class GazePosition(tuple, DataFeatures.TimestampedObject):
"""
if self.__precision is not None and position.precision is not None:
- return GazePosition(numpy.array(position) - numpy.array(self), precision = max(self.__precision, position.precision), timestamp=self.timestamp)
+ return GazePosition(tuple(numpy.array(position) - numpy.array(self)), precision = max(self.__precision, position.precision), timestamp = self.timestamp)
else:
- return GazePosition(numpy.array(position) - numpy.array(self), timestamp=self.timestamp)
+ return GazePosition(tuple(numpy.array(position) - numpy.array(self)), timestamp = self.timestamp)
def __mul__(self, factor: int|float) -> Self:
"""Multiply position by a factor.
@@ -144,7 +144,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject):
!!! note
The returned position timestamp is the self object timestamp.
"""
- return GazePosition(numpy.array(self) * factor, precision = self.__precision * factor if self.__precision is not None else None, timestamp=self.timestamp)
+ return GazePosition(tuple(numpy.array(self) * factor), precision = self.__precision * factor if self.__precision is not None else None, timestamp = self.timestamp)
def __pow__(self, factor: int|float) -> Self:
"""Power position by a factor.
@@ -155,7 +155,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject):
!!! note
The returned position timestamp is the self object timestamp.
"""
- return GazePosition(numpy.array(self) ** factor, precision = self.__precision ** factor if self.__precision is not None else None, timestamp=self.timestamp)
+ return GazePosition(tuple(numpy.array(self) ** factor), precision = self.__precision ** factor if self.__precision is not None else None, timestamp = self.timestamp)
def distance(self, gaze_position) -> float:
"""Distance to another gaze positions."""
@@ -291,6 +291,7 @@ class GazePositionCalibrationFailed(Exception):
class GazePositionCalibrator(DataFeatures.PipelineStepObject):
"""Abstract class to define what should provide a gaze position calibrator algorithm."""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
@@ -332,7 +333,7 @@ class GazePositionCalibrator(DataFeatures.PipelineStepObject):
raise NotImplementedError('apply() method not implemented')
- def draw(self, image: numpy.array):
+ def draw(self, image: numpy.array, **kwargs):
"""Draw calibration into image.
Parameters:
@@ -543,6 +544,7 @@ class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList):
class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
"""Abstract class to define what should provide a gaze movement identifier."""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
@@ -821,6 +823,7 @@ class ScanPath(list):
class ScanPathAnalyzer(DataFeatures.PipelineStepObject):
"""Abstract class to define what should provide a scan path analyzer."""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
@@ -840,6 +843,7 @@ class ScanPathAnalyzer(DataFeatures.PipelineStepObject):
class AOIMatcher(DataFeatures.PipelineStepObject):
"""Abstract class to define what should provide an AOI matcher algorithm."""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
@@ -1108,7 +1112,7 @@ class AOIScanPath(list):
self.__movements.append(saccade)
- def append_fixation(self, fixation, looked_aoi: str) -> bool:
+ def append_fixation(self, fixation, looked_aoi: str):
"""Append new fixation to aoi scan path and return last new aoi scan step if one have been created.
!!! warning
@@ -1197,6 +1201,7 @@ class AOIScanPath(list):
class AOIScanPathAnalyzer(DataFeatures.PipelineStepObject):
"""Abstract class to define what should provide an aoi scan path analyzer."""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
diff --git a/src/argaze/PupilAnalysis/WorkloadIndex.py b/src/argaze/PupilAnalysis/WorkloadIndex.py
index 99427fe..bced982 100644
--- a/src/argaze/PupilAnalysis/WorkloadIndex.py
+++ b/src/argaze/PupilAnalysis/WorkloadIndex.py
@@ -28,8 +28,13 @@ class PupilDiameterAnalyzer(PupilFeatures.PupilDiameterAnalyzer):
reference: base line value.
period: identification period length.
"""
+
+ @DataFeatures.PipelineStepInit
def __init__(self, reference: PupilFeatures.PupilDiameter, period: int|float = 1):
+ # Init PupilDiameterAnalyzer class
+ super().__init__()
+
assert(not math.isnan(reference))
self.__reference = reference
@@ -56,7 +61,7 @@ class PupilDiameterAnalyzer(PupilFeatures.PupilDiameterAnalyzer):
# Ignore non valid pupil diameter
if not math.isnan(pupil_diameter):
- return None
+ return math.nan
if pupil_diameter.timestamp - self.__last_ts >= self.__period:
diff --git a/src/argaze/PupilAnalysis/__init__.py b/src/argaze/PupilAnalysis/__init__.py
index c563968..73b15ee 100644
--- a/src/argaze/PupilAnalysis/__init__.py
+++ b/src/argaze/PupilAnalysis/__init__.py
@@ -1,4 +1,4 @@
"""
Class interface to work with various pupil analysis algorithms.
"""
-__all__ = ['WorkloadIndex'] \ No newline at end of file
+__all__ = ['WorkloadIndex']
diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py
index bd01eda..8c0068f 100644
--- a/src/argaze/__init__.py
+++ b/src/argaze/__init__.py
@@ -1,4 +1,5 @@
"""
ArGaze is divided in submodules dedicated to various specifics features.
"""
-__all__ = ['ArUcoMarkers','AreaOfInterest','ArFeatures','GazeFeatures','GazeAnalysis','PupilFeatures','PupilAnalysis','DataFeatures','utils'] \ No newline at end of file
+__all__ = ['ArUcoMarkers', 'AreaOfInterest', 'ArFeatures', 'GazeFeatures', 'GazeAnalysis', 'PupilFeatures',
+ 'PupilAnalysis', 'DataFeatures', 'utils']
diff --git a/src/argaze/__main__.py b/src/argaze/__main__.py
index 9adda75..15a78c1 100644
--- a/src/argaze/__main__.py
+++ b/src/argaze/__main__.py
@@ -28,23 +28,22 @@ import cv2
# Manage arguments
parser = argparse.ArgumentParser(description=__doc__.split('-')[0])
parser.add_argument('context_file', metavar='CONTEXT_FILE', type=str, help='JSON context filepath')
-parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console')
+parser.add_argument('-v', '--verbose', action='store_true', default=False,
+ help='enable verbose mode to print information in console')
args = parser.parse_args()
# Manage logging
-logging.basicConfig(format = '%(levelname)s: %(message)s', level = logging.DEBUG if args.verbose else logging.INFO)
+logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG if args.verbose else logging.INFO)
# Load context from JSON file
with from_json(args.context_file) as context:
# Loaded object must be a subclass of ArContext
if not issubclass(type(context), ArContext):
-
raise TypeError('Loaded object is not a subclass of ArContext')
if args.verbose:
-
print(context)
# Create a window to display context
@@ -63,15 +62,13 @@ with from_json(args.context_file) as context:
if issubclass(type(context.pipeline), ArCamera):
for scene_frame in context.pipeline.scene_frames():
-
cv2.imshow(scene_frame.name, scene_frame.image())
-
+
# Key interaction
key_pressed = cv2.waitKey(10)
# Esc: close window
if key_pressed == 27:
-
raise KeyboardInterrupt()
# Stop frame display
diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py
index 3f2ceda..133809b 100644
--- a/src/argaze/utils/UtilsFeatures.py
+++ b/src/argaze/utils/UtilsFeatures.py
@@ -117,7 +117,7 @@ class TimeProbe():
return lap_time * 1e3, self.__lap_counter, self.__elapsed_time * 1e3
- def end(self) -> float:
+ def end(self) -> tuple[float, int]:
"""
Stop chronometer
@@ -163,6 +163,7 @@ def PrintCallStack(method):
class FileWriter(DataFeatures.PipelineStepObject):
"""Write data into a file line by line."""
+ # noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
diff --git a/src/argaze/utils/__init__.py b/src/argaze/utils/__init__.py
index 2cee626..ed61966 100644
--- a/src/argaze/utils/__init__.py
+++ b/src/argaze/utils/__init__.py
@@ -1,4 +1,4 @@
"""
Miscellaneous utilities.
"""
-__all__ = ['UtilsFeatures', 'contexts'] \ No newline at end of file
+__all__ = ['UtilsFeatures', 'contexts']
diff --git a/src/argaze/utils/contexts/PupilLabs.py b/src/argaze/utils/contexts/PupilLabs.py
index 9265f2c..43fe47e 100644
--- a/src/argaze/utils/contexts/PupilLabs.py
+++ b/src/argaze/utils/contexts/PupilLabs.py
@@ -46,10 +46,10 @@ class LiveStream(ArFeatures.ArContext):
# Create stop event
self.__stop_event = threading.Event()
- # Init timestamp
+ # Init timestamp
self.__start_time = time.time()
-
- # Look for devices. Returns as soon as it has found the first device.
+
+ # Look for devices. Returns as soon as it has found the first device.
self.__device = discover_one_device(max_search_duration_seconds=10)
if self.__device is None:
diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py
index f83c1ac..0fba2ff 100644
--- a/src/argaze/utils/contexts/TobiiProGlasses2.py
+++ b/src/argaze/utils/contexts/TobiiProGlasses2.py
@@ -135,7 +135,7 @@ class PupilCenter():
validity: int
index: int
- value: tuple((float, float, float))
+ value: tuple[(float, float, float)]
eye: str # 'right' or 'left'
@dataclass
@@ -153,7 +153,7 @@ class GazeDirection():
validity: int
index: int
- value: tuple((float, float, float))
+ value: tuple[(float, float, float)]
eye: str # 'right' or 'left'
@dataclass
@@ -163,7 +163,7 @@ class GazePosition():
validity: int
index: int
l: str # ?
- value: tuple((float, float))
+ value: tuple[(float, float)]
@dataclass
class GazePosition3D():
@@ -171,14 +171,14 @@ class GazePosition3D():
validity: int
index: int
- value: tuple((float, float))
+ value: tuple[(float, float)]
@dataclass
class MarkerPosition():
"""Define marker data (marker3d marker2d)."""
- value_3d: tuple((float, float, float))
- value_2d: tuple((float, float))
+ value_3d: tuple[(float, float, float)]
+ value_2d: tuple[(float, float)]
class TobiiJsonDataParser():
@@ -245,15 +245,18 @@ class TobiiJsonDataParser():
return self.__parse_pupil_or_gaze_map[second_key](status, gaze_index, data)
- def __parse_dir_sig(self, status, data):
+ @staticmethod
+ def __parse_dir_sig(status, data):
return DirSig(data['dir'], data['sig'])
- def __parse_pts(self, status, data):
+ @staticmethod
+ def __parse_pts(status, data):
return PresentationTimeStamp(data['pts'])
- def __parse_vts(self, status, data):
+ @staticmethod
+ def __parse_vts(status, data):
# ts is not sent when recording
try:
@@ -266,43 +269,53 @@ class TobiiJsonDataParser():
return VideoTimeStamp(data['vts'], ts)
- def __parse_event_synch(self, status, data):
+ @staticmethod
+ def __parse_event_synch(status, data):
return EventSynch(data['evts'])
- def __parse_event(self, status, data):
+ @staticmethod
+ def __parse_event(status, data):
return Event(data['ets'], data['type'], data['tag'])
- def __parse_accelerometer(self, status, data):
+ @staticmethod
+ def __parse_accelerometer(status, data):
return Accelerometer(data['ac'])
- def __parse_gyroscope(self, status, data):
+ @staticmethod
+ def __parse_gyroscope(status, data):
return Gyroscope(data['gy'])
- def __parse_pupil_center(self, status, gaze_index, data):
+ @staticmethod
+ def __parse_pupil_center(status, gaze_index, data):
return PupilCenter(status, gaze_index, data['pc'], data['eye'])
- def __parse_pupil_diameter(self, status, gaze_index, data):
+ @staticmethod
+ def __parse_pupil_diameter(status, gaze_index, data):
return PupilDiameter(status, gaze_index, data['pd'], data['eye'])
- def __parse_gaze_direction(self, status, gaze_index, data):
+ @staticmethod
+ def __parse_gaze_direction(status, gaze_index, data):
return GazeDirection(status, gaze_index, data['gd'], data['eye'])
- def __parse_gaze_position(self, status, gaze_index, data):
+ @staticmethod
+ def __parse_gaze_position(status, gaze_index, data):
return GazePosition(status, gaze_index, data['l'], data['gp'])
- def __parse_gaze_position_3d(self, status, gaze_index, data):
+ @staticmethod
+ def __parse_gaze_position_3d(status, gaze_index, data):
return GazePosition3D(status, gaze_index, data['gp3'])
- def __parse_marker_position(self, status, data):
+ @staticmethod
+ def __parse_marker_position(status, data):
return MarkerPosition(data['marker3d'], data['marker2d'])
@@ -518,9 +531,9 @@ class LiveStream(ArFeatures.ArContext):
logging.info('%s: %s', key, str(value))
# Store video stream info
- self.__video_width = configuration['sys_sc_width']
- self.__video_height = configuration['sys_sc_height']
- self.__video_fps = configuration['sys_sc_fps']
+ self.__video_width = int(configuration['sys_sc_width'])
+ self.__video_height = int(configuration['sys_sc_height'])
+ self.__video_fps = float(configuration['sys_sc_fps'])
# Bind to project if required
if self.__project_name is not None:
@@ -799,7 +812,7 @@ class LiveStream(ArFeatures.ArContext):
time.sleep(1)
- def __get_request(self, api_action) -> str:
+ def __get_request(self, api_action) -> any:
"""Send a GET request and get data back."""
url = self.__base_url + api_action
@@ -820,7 +833,7 @@ class LiveStream(ArFeatures.ArContext):
return data
- def __post_request(self, api_action, data = None, wait_for_response = True) -> str:
+ def __post_request(self, api_action, data = None, wait_for_response = True) -> any:
"""Send a POST request and get result back."""
url = self.__base_url + api_action
@@ -880,7 +893,8 @@ class LiveStream(ArFeatures.ArContext):
return json_data[key]
- def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT):
+ @staticmethod
+ def __get_current_datetime(timeformat=TOBII_DATETIME_FORMAT):
return datetime.datetime.now().replace(microsecond=0).strftime(timeformat)
@@ -1164,22 +1178,22 @@ class PostProcessing(ArFeatures.ArContext):
self.__segment = segment
@property
- def start(self) -> int:
+ def start(self) -> float:
"""Start reading timestamp in millisecond."""
return self.__start
@start.setter
- def start(self, start: int):
+ def start(self, start: float):
self.__start = start
@property
- def end(self) -> int:
+ def end(self) -> float:
"""End reading timestamp in millisecond."""
return self.__end
@end.setter
- def end(self, end: int):
+ def end(self, end: float):
self.__end = end
@@ -1282,7 +1296,7 @@ class PostProcessing(ArFeatures.ArContext):
return self
- def __next__(self) -> tuple[int, numpy.array, list[tuple[int, object, str]]]:
+ def __next__(self):
data_list = []
video_ts, image = self.__next_video_image()
@@ -1300,7 +1314,7 @@ class PostProcessing(ArFeatures.ArContext):
return output
- def __next_video_image(self) -> tuple[int, numpy.array]:
+ def __next_video_image(self):
image = next(self.__video_file.decode(self.__video_file.streams.video[0]))
ts = int(image.time * 1e3)
diff --git a/src/argaze/utils/contexts/__init__.py b/src/argaze/utils/contexts/__init__.py
index b76cd8b..19b7533 100644
--- a/src/argaze/utils/contexts/__init__.py
+++ b/src/argaze/utils/contexts/__init__.py
@@ -1,4 +1,4 @@
"""
Collection of device interfaces.
"""
-__all__ = ['TobiiProGlasses2'] \ No newline at end of file
+__all__ = ['TobiiProGlasses2']