diff options
author | Théo de la Hogue | 2022-11-09 13:45:49 +0100 |
---|---|---|
committer | Théo de la Hogue | 2022-11-09 13:45:49 +0100 |
commit | d56194d29c792a375abf306f22254e2ab51f11f6 (patch) | |
tree | 61cc6784db2230cee05cfb49de9d91e07d4dc116 | |
parent | a2701988bb9eea04ee83de20b7e71caccbe5b5af (diff) | |
download | argaze-d56194d29c792a375abf306f22254e2ab51f11f6.zip argaze-d56194d29c792a375abf306f22254e2ab51f11f6.tar.gz argaze-d56194d29c792a375abf306f22254e2ab51f11f6.tar.bz2 argaze-d56194d29c792a375abf306f22254e2ab51f11f6.tar.xz |
Large documentation improvement.
30 files changed, 762 insertions, 596 deletions
@@ -4,11 +4,11 @@ An open-source python toolkit to deal with dynamic Areas Of Interest (AOI) and g The ArGaze toolkit provides some generics data structures and algorithms to build AR environement with dynamic AOI and so allow gaze tracking with mobil eye tracker devices. It is divided in submodules dedicated to various specifics features: -* ArUcoMarkers: ArUco markers generator, traking, camera calibration, ... -* AreaOfInterest: Area Of Interest (AOI) scene management for 2D and 3D environment. -* TobiiGlassesPro2: A gaze tracking device interface. -* GazeFeatures: Generic gaze data structures definitions. -* utils: Collection of command-line high level features scripts based on ArGaze toolkit. +* `argaze.ArUcoMarkers`: ArUco markers generator, traking, camera calibration, ... +* `argaze.AreaOfInterest`: Area Of Interest (AOI) scene management for 2D and 3D environment. +* `argaze.GazeFeatures`: Generic gaze data structures definitions. +* `argaze.TobiiGlassesPro2`: A gaze tracking device interface. +* `argaze.utils`: Collection of command-line high level features scripts based on ArGaze toolkit. ## Installation @@ -32,11 +32,12 @@ python -m build pip install ./dist/argaze-VERSION.whl ``` -*As Argaze library developper, you should prefer to install the package in developer mode to test live code changes:* +.. note:: As Argaze library developper + *You should prefer to install the package in developer mode to test live code changes:* -``` -pip install -e . -``` + ``` + pip install -e . + ``` ## Documentation @@ -46,7 +47,7 @@ The [wiki](https://git.recherche.enac.fr/projects/argaze/wiki) provides many exp ### Cookbook -The **utils** submodule is a good place to get ready made code examples. +The `argaze.utils` submodule is a good place to get ready made code examples. ### Code @@ -65,8 +66,9 @@ pip install pdoc pdoc -o ./doc ./src/argaze/ ``` -*As Argaze library developper, you should prefer to create a local html server to watch live documentation changes:* +.. note:: As Argaze library developper + *You should prefer to create a local html server to watch live documentation changes:* -``` -pdoc ./src/argaze/ -```
\ No newline at end of file + ``` + pdoc ./src/argaze/ + ```
\ No newline at end of file diff --git a/src/argaze/ArUcoMarkers/ArUcoBoard.py b/src/argaze/ArUcoMarkers/ArUcoBoard.py index eae9c48..baf373f 100644 --- a/src/argaze/ArUcoMarkers/ArUcoBoard.py +++ b/src/argaze/ArUcoMarkers/ArUcoBoard.py @@ -9,58 +9,58 @@ import cv2.aruco as aruco class ArUcoBoard(): """Calibration chess board with ArUco markers inside.""" - def __init__(self, aruco_dictionary_name: str, columns: int, rows: int, square_size: float, marker_size: float): + def __init__(self, aruco_dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary, columns: int, rows: int, square_size: float, marker_size: float): """Create columns x rows chess board with ArUco markers at given size in centimeters.""" - # load ArUco markers dictionary - self.__aruco_dict = ArUcoMarkersDictionary.ArUcoMarkersDictionary(aruco_dictionary_name) + # Store ArUco markers dictionary + self.__aruco_dict = aruco_dictionary - # store property + # Store properties self.__columns = columns self.__rows = rows self.__square_size = square_size # in cm self.__marker_size = marker_size # in cm - # create board model - self.__board = aruco.CharucoBoard_create(self.__columns, self.__rows, self.__square_size/100., self.__marker_size/100., self.__aruco_dict.get_markers()) + # Create board model + self.__board = aruco.CharucoBoard_create(self.__columns, self.__rows, self.__square_size/100., self.__marker_size/100., self.__aruco_dict.markers) def __del__(self): pass - def get_model(self): - """Get the board model. - **Returns:** aruco.CharucoBoard""" + @property + def model(self) -> aruco.CharucoBoard: + """Get the board model.""" return self.__board - def get_ids(self): - """Get board markers ids. - **Returns:** list""" + @property + def ids(self) -> list[int]: + """Get board markers ids.""" return self.__board.ids - def get_size(self): - """Get numbers of columns and rows. - **Returns:** int""" + @property + def size(self)-> int: + """Get numbers of columns and rows.""" return self.__board.getChessboardSize() - def get_markers_number(self): - """Get number of markers. - **Returns:** int""" + @property + def markers_number(self) -> int: + """Get number of markers.""" return len(self.__board.ids) - def get_corners_number(self): - """Get number of corners. - **Returns:** int""" + @property + def corners_number(self) -> int: + """Get number of corners.""" return (self.__board.getChessboardSize()[0] - 1 ) * (self.__board.getChessboardSize()[1] - 1) def export(self, destination_folder: str, dpi: int): """Save a picture of the calibration board.""" - output_filename = f'board_{self.__columns*self.__square_size}cmx{self.__rows*self.__square_size}cm_markers_{self.__aruco_dict.get_markers_format()}_{self.__marker_size}cm.png' + output_filename = f'board_{self.__columns*self.__square_size}cmx{self.__rows*self.__square_size}cm_markers_{self.__aruco_dict.format}_{self.__marker_size}cm.png' dimension = [int(e * self.__board.getSquareLength() * 254 * dpi) for e in self.__board.getChessboardSize()] # 1 meter = 254 inches diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index aa7426c..b63743d 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -48,19 +48,22 @@ class ArUcoCamera(): json.dump(calibration_data, calibration_file, ensure_ascii=False, indent=4) - def get_rms(self): - """Get Root Mean Square (rms) error. - **Returns:** float""" + @property + def rms(self) -> float: + """Get Root Mean Square (rms) error.""" + return self.__rms - def get_K(self): - """Get camera matrix. - **Returns:** numpy.array""" + @property + def K(self) -> numpy.array: + """Get camera matrix.""" + return self.__K - def get_D(self): - """Get camera distorsion coefficients. - **Returns:** numpy.array""" + @property + def D(self) -> numpy.array: + """Get camera distorsion coefficients.""" + return self.__D def calibrate(self, board, frame_width, frame_height): @@ -69,7 +72,7 @@ class ArUcoCamera(): if self.__corners_set_number > 0: self.__dimensions = [frame_width, frame_height] - self.__rms, self.__K, self.__D, r, t = aruco.calibrateCameraCharuco(self.__corners_set, self.__corners_set_ids, board.get_model(), self.__dimensions, None, None) + self.__rms, self.__K, self.__D, r, t = aruco.calibrateCameraCharuco(self.__corners_set, self.__corners_set_ids, board.model, self.__dimensions, None, None) def reset_calibration_data(self): """Clear all calibration data.""" @@ -85,9 +88,9 @@ class ArUcoCamera(): self.__corners_set.append(corners) self.__corners_set_ids.append(corners_ids) - def get_calibration_data_count(self): - """Get how much calibration data are stored. - **Returns:** int""" + @property + def calibration_data_count(self) -> int: + """Get how much calibration data are stored.""" return self.__corners_set_number diff --git a/src/argaze/ArUcoMarkers/ArUcoCube.py b/src/argaze/ArUcoMarkers/ArUcoCube.py index 51ea314..3d889da 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCube.py +++ b/src/argaze/ArUcoMarkers/ArUcoCube.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import Tuple from dataclasses import dataclass, field import json import math @@ -201,7 +202,13 @@ class ArUcoCube(): def estimate_pose(self, tracked_markers): """Estimate cube pose from tracked markers (cf ArUcoTracker.track()) - **Returns:** numpy.array, numpy.array, bool, int""" + + * **Returns:** + - translation vector + - rotation vector + - pose estimation success status + - the number of faces used to estimate the pose as validity score + """ # Init pose data self.__translation = numpy.zeros(3) @@ -326,27 +333,52 @@ class ArUcoCube(): return self.get_pose() - def get_pose(self): - """Get cube pose. - **Returns:** numpy.array, numpy.array, bool, int""" + @property + def translation(self) -> numpy.array: + """Access to cube translation vector. - return self.__translation, self.__rotation, self.__succeded, self.__validity + .. warning:: + Setting cube translation vector implies succeded status to be True and validity score to be 0.""" - def set_pose(self, tvec = numpy.array([]), rvec = numpy.array([])): - """Set cube pose.""" + return self.__translation - if tvec.size == 3: - self.__translation = tvec + @translation.setter + def translation(self, tvec): - if rvec.size == 3: - self.__rotation = rvec + self.__translation = tvec + self.__succeded = True + self.__validity = 0 + + @property + def rotation(self) -> numpy.array: + """Access to cube rotation vector. + + .. warning:: + Setting cube rotation vector implies succeded status to be True and validity score to be 0.""" + + return self.__translation + + @rotation.setter + def rotation(self, rvec): + self.__rotation = rvec self.__succeded = True self.__validity = 0 + @property + def succeded(self) -> bool: + """Access to cube pose estimation succeded status.""" + + return self.__succeded + + @property + def validity(self) -> int: + """Access to cube pose estimation validity score.""" + + return self.__validity + def draw(self, frame, K, D, draw_faces=True): - """Draw cube axis and faces. - **Returns:** frame""" + """Draw cube axis and faces.""" l = self.edge_size / 2 ll = self.edge_size @@ -362,9 +394,9 @@ class ArUcoCube(): axisPoints, _ = cv.projectPoints(axisPoints, self.__rotation, self.__translation, K, D) axisPoints = axisPoints.astype(int) - frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (n,n,f), 5) # X (red) - frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (n,f,n), 5) # Y (green) - frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (f,n,n), 5) # Z (blue) + cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (n,n,f), 5) # X (red) + cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (n,f,n), 5) # Y (green) + cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (f,n,n), 5) # Z (blue) if draw_faces: @@ -373,30 +405,30 @@ class ArUcoCube(): leftPoints, _ = cv.projectPoints(leftPoints, self.__rotation, self.__translation, K, D) leftPoints = leftPoints.astype(int) - frame = cv.line(frame, tuple(leftPoints[0].ravel()), tuple(leftPoints[1].ravel()), (n,n,f), 2) - frame = cv.line(frame, tuple(leftPoints[1].ravel()), tuple(leftPoints[2].ravel()), (n,n,f), 2) - frame = cv.line(frame, tuple(leftPoints[2].ravel()), tuple(leftPoints[3].ravel()), (n,n,f), 2) - frame = cv.line(frame, tuple(leftPoints[3].ravel()), tuple(leftPoints[0].ravel()), (n,n,f), 2) + cv.line(frame, tuple(leftPoints[0].ravel()), tuple(leftPoints[1].ravel()), (n,n,f), 2) + cv.line(frame, tuple(leftPoints[1].ravel()), tuple(leftPoints[2].ravel()), (n,n,f), 2) + cv.line(frame, tuple(leftPoints[2].ravel()), tuple(leftPoints[3].ravel()), (n,n,f), 2) + cv.line(frame, tuple(leftPoints[3].ravel()), tuple(leftPoints[0].ravel()), (n,n,f), 2) # Draw top face topPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, l, -l], [l, l, -l]]).reshape(-1, 3) topPoints, _ = cv.projectPoints(topPoints, self.__rotation, self.__translation, K, D) topPoints = topPoints.astype(int) - frame = cv.line(frame, tuple(topPoints[0].ravel()), tuple(topPoints[1].ravel()), (n,f,n), 2) - frame = cv.line(frame, tuple(topPoints[1].ravel()), tuple(topPoints[2].ravel()), (n,f,n), 2) - frame = cv.line(frame, tuple(topPoints[2].ravel()), tuple(topPoints[3].ravel()), (n,f,n), 2) - frame = cv.line(frame, tuple(topPoints[3].ravel()), tuple(topPoints[0].ravel()), (n,f,n), 2) + cv.line(frame, tuple(topPoints[0].ravel()), tuple(topPoints[1].ravel()), (n,f,n), 2) + cv.line(frame, tuple(topPoints[1].ravel()), tuple(topPoints[2].ravel()), (n,f,n), 2) + cv.line(frame, tuple(topPoints[2].ravel()), tuple(topPoints[3].ravel()), (n,f,n), 2) + cv.line(frame, tuple(topPoints[3].ravel()), tuple(topPoints[0].ravel()), (n,f,n), 2) # Draw front face frontPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, -l, l], [l, -l, l]]).reshape(-1, 3) frontPoints, _ = cv.projectPoints(frontPoints, self.__rotation, self.__translation, K, D) frontPoints = frontPoints.astype(int) - frame = cv.line(frame, tuple(frontPoints[0].ravel()), tuple(frontPoints[1].ravel()), (f,n,n), 2) - frame = cv.line(frame, tuple(frontPoints[1].ravel()), tuple(frontPoints[2].ravel()), (f,n,n), 2) - frame = cv.line(frame, tuple(frontPoints[2].ravel()), tuple(frontPoints[3].ravel()), (f,n,n), 2) - frame = cv.line(frame, tuple(frontPoints[3].ravel()), tuple(frontPoints[0].ravel()), (f,n,n), 2) + cv.line(frame, tuple(frontPoints[0].ravel()), tuple(frontPoints[1].ravel()), (f,n,n), 2) + cv.line(frame, tuple(frontPoints[1].ravel()), tuple(frontPoints[2].ravel()), (f,n,n), 2) + cv.line(frame, tuple(frontPoints[2].ravel()), tuple(frontPoints[3].ravel()), (f,n,n), 2) + cv.line(frame, tuple(frontPoints[3].ravel()), tuple(frontPoints[0].ravel()), (f,n,n), 2) except Exception as e: @@ -406,5 +438,3 @@ class ArUcoCube(): print(self.__succeded) print(self.__validity) print(axisPoints) - - return frame diff --git a/src/argaze/ArUcoMarkers/ArUcoMarker.py b/src/argaze/ArUcoMarkers/ArUcoMarker.py index 28f1f31..9d4a53e 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarker.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarker.py @@ -25,17 +25,17 @@ class ArUcoMarker(): """Estimated 2D corner positions in camera image referential.""" translation: numpy.array = field(init=False, repr=False) - """Estimated 3D center position in camera referential.""" + """Estimated 3D center position in camera world referential.""" rotation: numpy.array = field(init=False, repr=False) - """Estimated 3D marker rotation in camera referential.""" + """Estimated 3D marker rotation in camera world referential.""" points: numpy.array = field(init=False, repr=False) - """Estimated 3D corners positions in camera referential.""" + """Estimated 3D corners positions in camera world referential.""" - def center(self, i): - """Get 2D center position in camera image referential. - **Returns:** numpy.array""" + @property + def center(self, i) -> numpy.array: + """Get 2D center position in camera image referential.""" return self.corners[0].mean(axis=0) diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py index ce68703..3b23acb 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py @@ -27,7 +27,7 @@ all_aruco_markers_dictionaries = { 'DICT_APRILTAG_36h10': aruco.DICT_APRILTAG_36h10, 'DICT_APRILTAG_36h11': aruco.DICT_APRILTAG_36h11 } -"""Dictionnary to list all built-in ArUco markers dictionaries from OpenCV package.""" +"""Dictionnary to list all built-in ArUco markers dictionaries from OpenCV ArUco package.""" class ArUcoMarkersDictionary(): """Handle an ArUco markers dictionary.""" @@ -81,30 +81,29 @@ class ArUcoMarkersDictionary(): self.__aruco_dict = aruco.Dictionary_get(all_aruco_markers_dictionaries[self.name]) - def create_marker(self, i, dpi=300): - """Create a marker image. - **Returns:** numpy.array""" + def create_marker(self, i, dpi=300) -> numpy.array: + """Create a marker image.""" marker = numpy.zeros((dpi, dpi, 1), dtype="uint8") aruco.drawMarker(self.__aruco_dict, i, dpi, marker, 1) return numpy.repeat(marker, 3).reshape(dpi, dpi, 3) - def get_markers(self): - """Get all markers from dictionary. - **Returns:** aruco.Dictionary""" + @property + def markers(self)-> aruco.Dictionary: + """Get all markers from dictionary.""" return self.__aruco_dict - def get_markers_format(self): - """Get markers format. - **Returns:** str""" + @property + def format(self) -> str: + """Get markers format.""" return self.__format - def get_markers_number(self): - """Get markers format. - **Returns:** int""" + @property + def size(self) -> int: + """Get number of markers inside dictionary.""" return self.__number @@ -115,8 +114,6 @@ class ArUcoMarkersDictionary(): output_filename = f'marker_{self.__format}_{i}.png' - print(destination_folder, output_filename) - # create marker marker = self.create_marker(i, dpi) diff --git a/src/argaze/ArUcoMarkers/ArUcoTracker.py b/src/argaze/ArUcoMarkers/ArUcoTracker.py index f7ea23a..ef095f2 100644 --- a/src/argaze/ArUcoMarkers/ArUcoTracker.py +++ b/src/argaze/ArUcoMarkers/ArUcoTracker.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import TypeVar, Tuple import json from collections import Counter @@ -42,6 +43,9 @@ ArUcoTrackerParameters = [ ] """All parameters are detailled on [opencv page](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html)""" +ArUcoMarker = TypeVar('ArUcoMarker', bound="ArUcoMarker") +# Type definition for type annotation convenience + class ArUcoTracker(): """Track ArUco markers into a frame.""" @@ -74,7 +78,6 @@ class ArUcoTracker(): # define track metrics data self.__track_count = 0 self.__tracked_ids = [] - self.__rejected_markers = [] def load_configuration_file(self, configuration_filepath): """Load aruco detection parameters from .json file.""" @@ -102,22 +105,24 @@ class ArUcoTracker(): print(f'\t{parameter}: {getattr(self.__detector_parameters, parameter)}') def track(self, frame, estimate_pose = True, check_rotation = False): - """Track ArUco markers in frame.""" + """Track ArUco markers in frame. + + .. danger:: DON'T MIRROR FRAME + It makes the markers detection to fail. + """ self.__tracked_markers = {} markers_corners, markers_ids, markers_rvecs, markers_tvecs, markers_points = [], [], [], [], [] - # DON'T MIRROR FRAME : it makes the markers detection to fail - # Track markers into gray picture - markers_corners, markers_ids, _ = aruco.detectMarkers(cv.cvtColor(frame, cv.COLOR_BGR2GRAY), self.__dictionary.get_markers(), parameters = self.__detector_parameters) + markers_corners, markers_ids, _ = aruco.detectMarkers(cv.cvtColor(frame, cv.COLOR_BGR2GRAY), self.__dictionary.markers, parameters = self.__detector_parameters) if len(markers_corners) > 0: # Pose estimation is optional if estimate_pose: - markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(markers_corners, self.__marker_size, self.__camera.get_K(), self.__camera.get_D()) + markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(markers_corners, self.__marker_size, self.__camera.K, self.__camera.D) # Gather tracked markers data and update metrics self.__track_count += 1 @@ -137,15 +142,15 @@ class ArUcoTracker(): self.__tracked_ids.append(marker_id) - def get_tracked_markers(self): - """Access to tracked markers dictionary. - **Returns:** dict""" + @property + def tracked_markers(self) -> dict[ArUcoMarker]: + """Access to tracked markers dictionary.""" return self.__tracked_markers - def get_tracked_markers_number(self): - """Return tracked markers number. - **Returns:** int""" + @property + def tracked_markers_number(self) -> int: + """Return tracked markers number.""" return len(list(self.__tracked_markers.keys())) @@ -154,22 +159,24 @@ class ArUcoTracker(): for marker_id, marker in self.__tracked_markers.items(): - marker.draw(frame, self.__camera.get_K(), self.__camera.get_D()) + marker.draw(frame, self.__camera.K, self.__camera.D) def track_board(self, frame, board, expected_markers_number): - """Track ArUco markers board in frame setting up the number of detected markers needed to agree detection.""" + """Track ArUco markers board in frame setting up the number of detected markers needed to agree detection. - # DON'T MIRROR FRAME : it makes the markers detection to fail + .. danger:: DON'T MIRROR FRAME + It makes the markers detection to fail. + """ # detect markers from gray picture gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY) - self.__markers_corners, self.__markers_ids, rejectedPoints = aruco.detectMarkers(gray, self.__dictionary.get_markers(), parameters = self.__detector_parameters) + markers_corners, markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, parameters = self.__detector_parameters) # if all board markers are detected - if self.get_markers_number() == expected_markers_number: + if len(markers_corners) == expected_markers_number: self.__board = board - self.__board_corners_number, self.__board_corners, self.__board_corners_ids = aruco.interpolateCornersCharuco(self.__markers_corners, self.__markers_ids, gray, self.__board.get_model()) + self.__board_corners_number, self.__board_corners, self.__board_corners_ids = aruco.interpolateCornersCharuco(markers_corners, markers_ids, gray, self.__board.model) else: @@ -183,7 +190,7 @@ class ArUcoTracker(): if self.__board != None: - cv.drawChessboardCorners(frame, ((self.__board.get_size()[0] - 1 ), (self.__board.get_size()[1] - 1)), self.__board_corners, True) + cv.drawChessboardCorners(frame, ((self.__board.size[0] - 1 ), (self.__board.size[1] - 1)), self.__board_corners, True) def reset_track_metrics(self): """Enable marker tracking metrics.""" @@ -191,27 +198,30 @@ class ArUcoTracker(): self.__track_count = 0 self.__tracked_ids = [] - def get_track_metrics(self): + @property + def track_metrics(self) -> Tuple[int, dict]: """Get marker tracking metrics. - **Returns:** int, list""" + * **Returns:** + - number of track function call + - dict with number of tracking detection for each marker ids""" - return self.__track_count, Counter(self.__tracked_ids), Counter(self.__rejected_markers) + return self.__track_count, Counter(self.__tracked_ids) - def get_board_corners_number(self): - """Get tracked board corners number. - **Returns:** int""" + @property + def board_corners_number(self) -> int: + """Get tracked board corners number.""" return self.__board_corners_number - def get_board_corners_ids(self): - """Get tracked board corners identifiers. - **Returns:** list""" + @property + def board_corners_ids(self) -> list[int]: + """Get tracked board corners identifiers.""" return self.__board_corners_ids - def get_board_corners(self): - """Get tracked board corners. - **Returns:** list""" + @property + def board_corners(self) -> list: + """Get tracked board corners.""" return self.__board_corners diff --git a/src/argaze/ArUcoMarkers/utils/tobiiglassespro2_hd.json b/src/argaze/ArUcoMarkers/utils/tobiiglassespro2_hd.json new file mode 100644 index 0000000..d21295a --- /dev/null +++ b/src/argaze/ArUcoMarkers/utils/tobiiglassespro2_hd.json @@ -0,0 +1,33 @@ +{ + "rms": 0.6688921504088245, + "dimensions": [ + 1920, + 1080 + ], + "camera matrix": [ + [ + 1135.6524381415752, + 0.0, + 956.0685325355497 + ], + [ + 0.0, + 1135.9272506869524, + 560.059099810324 + ], + [ + 0.0, + 0.0, + 1.0 + ] + ], + "distortion coefficients": [ + [ + 0.01655492265003404, + 0.1985524264972037, + 0.002129965902489484, + -0.0019528582922179365, + -0.5792910353639452 + ] + ] +}
\ No newline at end of file diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py index c669def..99bf8a9 100644 --- a/src/argaze/AreaOfInterest/AOI2DScene.py +++ b/src/argaze/AreaOfInterest/AOI2DScene.py @@ -1,5 +1,7 @@ #!/usr/bin/env python +from typing import Tuple + from argaze import DataStructures from argaze.AreaOfInterest import AOIFeatures from argaze import GazeFeatures @@ -27,9 +29,13 @@ class AOI2DScene(AOIFeatures.AOIScene): aoi.draw(frame, color) - def raycast(self, gaze_position): + def raycast(self, gaze_position) -> Tuple[str, "AOIFeatures.AreaOfInterest", bool]: """Iterate over aoi to know which aoi is looked considering only gaze position value. - **Returns:** str, AreaOfInterest, bool""" + * **Returns:** + - aoi name + - aoi object + - looked status + """ for name, aoi in self.items(): @@ -55,9 +61,15 @@ class AOI2DScene(AOIFeatures.AOIScene): # Draw form aoi.draw(frame, color) - def regioncast(self, gaze_position): + def regioncast(self, gaze_position) -> Tuple[str, "AOIFeatures.AreaOfInterest", numpy.array, float, float]: """Iterate over areas to know which aoi is looked considering gaze position value and its accuracy. - **Returns:** str, AreaOfInterest, numpy array, float, float""" + * **Returns:** + - aoi name + - aoi object + - looked region points + - ratio of looked region relatively to aoi + - ratio of looked region relatively to gaze position accuracy + """ for name, aoi in self.items(): diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py index 0ee4442..37bc1e2 100644 --- a/src/argaze/AreaOfInterest/AOI3DScene.py +++ b/src/argaze/AreaOfInterest/AOI3DScene.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import TypeVar, Tuple from dataclasses import dataclass, field import math import re @@ -23,6 +24,12 @@ K0 = numpy.array([[1., 0., 1.], [0., 1., 1.], [0., 0., 1.]]) D0 = numpy.array([0.0, 0.0, 0.0, 0.0, 0.0]) """Define a zero distorsion matrix.""" +Self = TypeVar('Self', bound="AOI3DScene") +# Type definition for type annotation convenience + +AOI2DScene = TypeVar('AOI2DScene', bound="AOI2DScene") +# Type definition for type annotation convenience + @dataclass class AOI3DScene(AOIFeatures.AOIScene): """Define AOI 3D scene.""" @@ -134,10 +141,16 @@ class AOI3DScene(AOIFeatures.AOIScene): file.write('s off\n') file.write(vertices_ids + '\n') - def vision_cone(self, cone_radius, cone_height, cone_tip=[0., 0., 0.], cone_direction=[0., 0., 1.]): + def vision_cone(self, cone_radius, cone_height, cone_tip=[0., 0., 0.], cone_direction=[0., 0., 1.]) -> Tuple[Self, Self]: """Get AOI which are inside and out a given cone field. - By default, the cone have the tip at origin and the base oriented to positive Z axis. - **Returns:** AOI3DScene, AOI3DScene""" + + .. note:: By default + The cone have its tip at origin and its base oriented to positive Z axis. + + * **Returns:** + - scene inside of the cone + - scene outside of the cone + """ # define cone tip and direction as numpy array cone_tip = numpy.array(cone_tip).astype(numpy.float32) @@ -168,9 +181,8 @@ class AOI3DScene(AOIFeatures.AOIScene): return aoi3D_scene_inside, aoi3D_scene_outside - def project(self, T=T0, R=R0, K=K0, D=D0): - """Project 3D scene onto 2D scene according translation, rotation and optical parameters. - **Returns:** AOI2DScene""" + def project(self, T=T0, R=R0, K=K0, D=D0) -> AOI2DScene: + """Project 3D scene onto 2D scene according translation, rotation and optical parameters.""" aoi2D_scene = AOI2DScene.AOI2DScene() @@ -184,9 +196,8 @@ class AOI3DScene(AOIFeatures.AOIScene): return aoi2D_scene - def transform(self, T=T0, R=D0): - """Translate and/or rotate 3D scene. - **Returns:** AOI3DScene""" + def transform(self, T=T0, R=D0) -> Self: + """Translate and/or rotate 3D scene.""" aoi3D_scene = AOI3DScene() diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index bc6e732..9c04095 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -1,6 +1,7 @@ #!/usr/bin/env python from dataclasses import dataclass, field +from typing import TypeVar, Tuple from argaze import DataStructures, GazeFeatures @@ -10,37 +11,40 @@ import numpy from shapely.geometry import Polygon from shapely.geometry.point import Point +SelfArea = TypeVar('Self', bound="AreaOfInterest") +# Type definition for type annotation convenience + @dataclass class AreaOfInterest(numpy.ndarray): """Define 2D/3D Area Of Interest as an array of points.""" - def dimension(self): - """Number of coordinates coding area points positions. - **Returns:** int""" + @property + def dimension(self) -> int: + """Number of axis coding area points positions.""" return self.shape[1] - def bounding_box(self): - """Get area's bounding box. - **Returns:** numpy.array""" + @property + def bounding_box(self) -> numpy.array: + """Get area's bounding box.""" min_x, min_y = numpy.min(self, axis=0) max_x, max_y = numpy.max(self, axis=0) return numpy.array([(min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y)]) - def center(self): - """Center of mass. - **Returns:** numpy.array""" + @property + def center(self) -> numpy.array: + """Center of mass.""" + return self.mean(axis=0) - def clockwise(self): + def clockwise(self) -> SelfArea: """Get area points in clocwise order. - Available for 2D AOI only. - **Returns:** numpy.array""" + .. warning:: + Available for 2D AOI only.""" - if self.dimension() != 2: - raise RuntimeError(f'Bad area dimension ({self.dimension()})') + assert(self.dimension() == 2) O = self.center() OP = (self - O) / numpy.linalg.norm(self - O) @@ -48,23 +52,21 @@ class AreaOfInterest(numpy.ndarray): return self[numpy.argsort(angles)] - def looked(self, gaze_position): + def looked(self, gaze_position) -> bool: """Is gaze position inside area? - Available for 2D AOI only. - **Returns:** bool""" + .. warning:: + Available for 2D AOI only.""" - if self.dimension() != 2: - raise RuntimeError(f'Bad area dimension ({self.dimension()})') + assert(self.dimension() == 2) return mpath.Path(self).contains_points([tuple(gaze_position)])[0] - def look_at(self, pixel_position): + def look_at(self, pixel_position) -> numpy.array: """Get where the area is looked using perpespective transformation. - Available for 2D AOI only. - **Returns:** list""" + .. warning:: + Available for 2D AOI only.""" - if self.dimension() != 2: - raise RuntimeError(f'Bad area dimension ({self.dimension()})') + assert(self.dimension() == 2) Src = self.clockwise() Src_origin = Src[0] @@ -80,12 +82,12 @@ class AreaOfInterest(numpy.ndarray): return numpy.around(La, 4).tolist() - def looked_pixel(self, look_at): + def looked_pixel(self, look_at) -> numpy.array: """Get which pixel is looked inside 2D AOI. - **Returns:** numpy.array""" + .. warning:: + Available for 2D AOI only.""" - if self.dimension() != 2: - raise RuntimeError(f'Bad area dimension ({self.dimension()})') + assert(self.dimension() == 2) Src = numpy.array([[0., 0.], [1., 0.], [1., 1.], [0., 1.]]).astype(numpy.float32) @@ -101,13 +103,12 @@ class AreaOfInterest(numpy.ndarray): return numpy.rint(Lp).astype(int).tolist() - def looked_region(self, gaze_position): + def looked_region(self, gaze_position) -> Tuple[numpy.array, float, float]: """Get intersection shape with gaze accuracy circle as the looked area, (looked area / AOI area) and (looked area / gaze accuracy circle area). - Available for 2D AOI only. - **Returns:** numpy.array, float, float""" + .. warning:: + Available for 2D AOI only.""" - if self.dimension() != 2: - raise RuntimeError(f'Bad area dimension ({self.dimension()})') + assert(self.dimension() == 2) self_polygon = Polygon(self) gaze_circle = Point(gaze_position).buffer(gaze_position.accuracy) @@ -127,10 +128,11 @@ class AreaOfInterest(numpy.ndarray): return empty_array, 0., 0. def draw(self, frame, color, border_size=1): - """Draw 2D AOI into frame.""" + """Draw 2D AOI into frame. + .. warning:: + Available for 2D AOI only.""" - if self.dimension() != 2: - raise RuntimeError(f'Bad area dimension ({self.dimension()})') + assert(self.dimension() == 2) if len(self) > 1: @@ -144,6 +146,9 @@ class AreaOfInterest(numpy.ndarray): center_pixel = numpy.rint(self.center()).astype(int) cv.circle(frame, center_pixel, 1, color, -1) +SelfScene = TypeVar('Self', bound="AOIScene") +# Type definition for type annotation convenience + @dataclass class AOIScene(): """Define 2D/3D AOI scene.""" @@ -154,10 +159,10 @@ class AOIScene(): areas: dict = field(init=False, default_factory=dict) """All aois in the scene.""" - def __getitem__(self, key): + def __getitem__(self, name) -> AreaOfInterest: """Get an aoi from the scene.""" - return numpy.array(self.areas[key]).astype(numpy.float32).view(AreaOfInterest) + return numpy.array(self.areas[name]).astype(numpy.float32).view(AreaOfInterest) def __setitem__(self, name, aoi: AreaOfInterest): """Add an aoi to the scene.""" @@ -169,22 +174,20 @@ class AOIScene(): del self.areas[key] - def items(self): - """Iterate over areas. - **Returns:** str, AreaOfInterest""" + def items(self) -> Tuple[str, AreaOfInterest]: + """Iterate over areas.""" for name, area in self.areas.items(): yield name, numpy.array(area).astype(numpy.float32).view(AreaOfInterest) - def keys(self): - """Get areas name. - **Returns:** dict_keys""" + def keys(self) -> list[str]: + """Get areas name.""" return self.areas.keys() - def bounds(self): - """Get scene's bounds. - **Returns:** numpy.array""" + @property + def bounds(self) -> numpy.array: + """Get scene's bounds.""" all_vertices = [] @@ -199,25 +202,24 @@ class AOIScene(): return numpy.array([min_bounds, max_bounds]) - def center(self): - """Get scene's center point. - **Returns:** numpy.array""" + @property + def center(self) -> numpy.array: + """Get scene's center point.""" min_bounds, max_bounds = self.bounds() return (min_bounds + max_bounds) / 2 - def size(self): - """Get scene size. - **Returns:** numpy.array""" + @property + def size(self) -> numpy.array: + """Get scene size.""" min_bounds, max_bounds = self.bounds() return max_bounds - min_bounds - def copy(self, exclude=[]): - """Copy scene partly excluding aoi by name. - **Returns:** AOIScene""" + def copy(self, exclude=[]) -> SelfScene: + """Copy scene partly excluding aoi by name.""" scene_copy = type(self)() @@ -240,11 +242,10 @@ class EmptyAOIScene(AOIScene): class TimeStampedAOIScenes(DataStructures.TimeStampedBuffer): """Define timestamped buffer to store AOI scenes in time.""" - def __setitem__(self, key, value): + def __setitem__(self, ts, scene): """Force value to inherit from AOIScene.""" - if type(value).__bases__[0] != AOIScene: - raise ValueError(f'value must inherit from AOIScene') + assert(type(value).__bases__[0] == AOIScene) - super().__setitem__(key, value) + super().__setitem__(ts, scene) diff --git a/src/argaze/DataStructures.py b/src/argaze/DataStructures.py index 44f64cc..e93dc9f 100644 --- a/src/argaze/DataStructures.py +++ b/src/argaze/DataStructures.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import TypeVar, Tuple import collections import json import bisect @@ -8,6 +9,15 @@ import pandas import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches +TimeStamp = TypeVar('TimeStamp', int, float) +"""Type definition for timestamp as integer or float values.""" + +Data = TypeVar('Data') +"""Type definition for data to store anything in time.""" + +Self = TypeVar('Self', bound="TimeStampedBuffer") +# Type definition for type annotation convenience + class TimeStampedBuffer(collections.OrderedDict): """Ordered dictionary to handle timestamped data. ``` @@ -17,42 +27,49 @@ class TimeStampedBuffer(collections.OrderedDict): ... } ``` + + .. warning:: + Timestamps must be numbers. + + .. warning:: + Timestamps are not sorted by any order. """ def __new__(cls, args = None): + """Inheritance""" + return super(TimeStampedBuffer, cls).__new__(cls) - def __setitem__(self, key: float, value): - """Force key to be a number""" - if type(key) != int and type(key) != float: - raise KeyError('key must be a number') + def __setitem__(self, ts: TimeStamp, data: Data): + """Store data at given timestamp.""" + + assert(type(ts) == int or type(ts) == float) - super().__setitem__(key, value) + super().__setitem__(ts, data) def __str__(self): + """String representation""" + return json.dumps(self, default=vars) - def append(self, timestamped_buffer): + def append(self, timestamped_buffer: Self): """Append a timestamped buffer.""" for ts, value in timestamped_buffer.items(): self[ts] = value - def get_first(self): - """Easing access to first item. - **Returns:** data""" + def get_first(self) -> Tuple[TimeStamp, Data]: + """Easing access to first item.""" return list(self.items())[0] - def pop_first(self): - """Easing FIFO access mode. - **Returns:** data""" + def pop_first(self) -> Tuple[TimeStamp, Data]: + """Easing FIFO access mode.""" return self.popitem(last=False) - def pop_first_until(self, ts): - """Pop all item until a given timestamped value and return the last poped item. - **Returns:** data""" + def pop_first_until(self, ts: TimeStamp) -> Tuple[TimeStamp, Data]: + """Pop all item until a given timestamped value and return the last poped item.""" # get last timestamp before given timestamp earliest_ts = self.get_last_before(ts) @@ -68,21 +85,18 @@ class TimeStampedBuffer(collections.OrderedDict): return popep_ts, poped_value - def get_last(self): - """Easing access to last item. - **Returns:** data""" + def get_last(self) -> Tuple[TimeStamp, Data]: + """Easing access to last item.""" return list(self.items())[-1] - def pop_last(self): - """Easing FIFO access mode. - **Returns:** data""" + def pop_last(self) -> Tuple[TimeStamp, Data]: + """Easing FIFO access mode.""" return self.popitem(last=True) - def get_last_before(self, ts): - """Retreive last item timestamp before a given timestamp value. - **Returns:** data""" + def get_last_before(self, ts) -> Tuple[TimeStamp, Data] | None: + """Retreive last item timestamp before a given timestamp value.""" ts_list = list(self.keys()) last_before_index = bisect.bisect_left(ts_list, ts) - 1 @@ -101,12 +115,12 @@ class TimeStampedBuffer(collections.OrderedDict): try: with open(filepath, 'w', encoding='utf-8') as jsonfile: json.dump(self, jsonfile, ensure_ascii = False, default=vars) + except: raise RuntimeError(f'Can\' write {filepath}') - def as_dataframe(self, exclude=[], split={}): - """Convert buffer as pandas dataframe. Timestamped values must be stored as dictionary where each keys will be related to a column. - **Returns:** pandas.Dataframe""" + def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: + """Convert buffer as pandas dataframe. Timestamped values must be stored as dictionary where each keys will be related to a column.""" df = pandas.DataFrame.from_dict(self.values()) df.drop(exclude, inplace=True, axis=True) @@ -124,15 +138,13 @@ class TimeStampedBuffer(collections.OrderedDict): """Write buffer content into a csv file.""" try: - self.as_dataframe(exclude=exclude).to_csv(filepath, index=True) except: raise RuntimeError(f'Can\' write {filepath}') - def plot(self, names=[], colors=[], split={}, samples=None): - """Plot data into time chart. - **Returns:** list""" + def plot(self, names=[], colors=[], split={}, samples=None) -> list: + """Plot data into time chart.""" df = self.as_dataframe(split=split) legend_patches = [] diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 23364ae..678ddb8 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -14,41 +14,46 @@ import cv2 as cv class GazePosition(): """Define gaze position as a tuple of coordinates with accuracy.""" - value: tuple + value: tuple[int | float] = (0, 0) """Positon value.""" accuracy: float = 0. """Positon accuracy.""" - def __getitem__(self, key): - """Get a position coordinate.""" + def __getitem__(self, axis: int) -> int | float: + """Get position value along a particular axis.""" - return self.value[key] + return self.value[axis] - def __setitem__(self, key, coord): - """Set a position coordinate.""" + def __setitem__(self, axis, axis_value: int | float): + """Set position value along a particular axis.""" - self.value[key] = coord + self.value[axis] = axis_value + + def __iter__(self) -> iter: + """Iterate over each position value axis.""" - def __iter__(self): return iter(self.value) - def __len__(self): + def __len__(self) -> int: + """Number of axis in position value.""" + return len(self.value) def __array__(self): + """Cast as numpy array.""" return numpy.array(self.value) - def valid(self): - """Is the accuracy greater than 0 ? - **Returns:** bool""" + @property + def valid(self) -> bool: + """Is the accuracy greater than 0 ?""" return self.accuracy >= 0 def draw(self, frame, color=(0, 255, 255)): """Draw gaze position point and accuracy circle.""" - if self.valid(): + if self.valid: # Draw point at position cv.circle(frame, self.value, 2, color, -1) @@ -328,15 +333,13 @@ class VisualScanGenerator(): def __iter__(self): raise NotImplementedError('__iter__() method not implemented') - def steps(self): - """Get visual scan steps. - **Returns:** list""" + def steps(self) -> list: + """Get visual scan steps.""" return self.visual_scan_steps - def as_dataframe(self): - """Convert buffer as pandas dataframe. - **Returns:** pandas.Dataframe""" + def as_dataframe(self) -> pandas.DataFrame: + """Convert buffer as pandas dataframe.""" df = pandas.DataFrame.from_dict(self.visual_scan_steps) df.set_index('timestamp', inplace=True) diff --git a/src/argaze/TobiiGlassesPro2/README.md b/src/argaze/TobiiGlassesPro2/README.md index 881363d..f13490a 100644 --- a/src/argaze/TobiiGlassesPro2/README.md +++ b/src/argaze/TobiiGlassesPro2/README.md @@ -1,13 +1,14 @@ Class interface to handle Tobbi Glasses Pro 2 device. This work is greatly inspired by the David de Tommaso and Agnieszka Wykowska [TobiiGlassesPySuite](https://arxiv.org/pdf/1912.09142.pdf). -* [Tobii Glasses Pro 2 device user manual](https://www.tobiipro.com/siteassets/tobii-pro/user-manuals/tobii-pro-glasses-2-user-manual.pdf). +.. note:: + Read [Tobii Glasses Pro 2 device user manual](https://www.tobiipro.com/siteassets/tobii-pro/user-manuals/tobii-pro-glasses-2-user-manual.pdf). ## Utils -Print **A4_calibration_target.pdf** onto A4 paper sheet to get calibration target at expected dimension. +* Print **A4_calibration_target.pdf** onto A4 paper sheet to get calibration target at expected dimension. -Load **imu.json** file with argaze utils **tobii_imu_calibrate.py** script with -i option. This is an example file to illustrate how to load Inertial Measure Unit (IMU) calibration parameters. +* Load **imu.json** file with argaze utils **tobii_imu_calibrate.py** script with -i option. This is an example file to illustrate how to load Inertial Measure Unit (IMU) calibration parameters. ## Local network configuration diff --git a/src/argaze/TobiiGlassesPro2/TobiiController.py b/src/argaze/TobiiGlassesPro2/TobiiController.py index 21db697..31bd3d8 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiController.py +++ b/src/argaze/TobiiGlassesPro2/TobiiController.py @@ -58,18 +58,16 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): # STREAMING FEATURES - def enable_data_stream(self): - """Enable Tobii Glasses Pro 2 data streaming. - **Returns:** TobiiDataStream""" + def enable_data_stream(self) -> "TobiiData.TobiiDataStream": + """Enable Tobii Glasses Pro 2 data streaming.""" if self.__data_stream == None: self.__data_stream = TobiiData.TobiiDataStream(self) return self.__data_stream - def enable_video_stream(self): - """Enable Tobii Glasses Pro 2 video camera streaming. - **Returns:** TobiiVideoStream""" + def enable_video_stream(self) -> "TobiiVideo.TobiiVideoStream": + """Enable Tobii Glasses Pro 2 video camera streaming.""" if self.__video_stream == None: self.__video_stream = TobiiVideo.TobiiVideoStream(self) @@ -96,9 +94,12 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): # PROJECT FEATURES - def set_project(self, project_name = DEFAULT_PROJECT_NAME): + def set_project(self, project_name = DEFAULT_PROJECT_NAME) -> str: """Bind to a project or create one if it doesn't exist. - **Returns:** str""" + + * **Returns:** + - project id + """ project_id = self.get_project_id(project_name) @@ -121,9 +122,8 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): return project_id - def get_project_id(self, project_name): - """Get project id. - **Returns:** str""" + def get_project_id(self, project_name) -> str: + """Get project id.""" project_id = None projects = super().get_request('/api/projects') @@ -138,17 +138,19 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): return project_id - def get_projects(self): - """Get all projects id. - **Returns:** json str""" + def get_projects(self) -> str: + """Get all projects id.""" return super().get_request('/api/projects') # PARTICIPANT FEATURES - def set_participant(self, project_id, participant_name = DEFAULT_PARTICIPANT_NAME, participant_notes = ''): + def set_participant(self, project_id, participant_name = DEFAULT_PARTICIPANT_NAME, participant_notes = '') -> str: """Bind to a participant or create one if it doesn't exist. - **Returns:** str""" + + * **Returns:** + - participant id + """ participant_id = self.get_participant_id(participant_name) @@ -172,9 +174,8 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): return participant_id - def get_participant_id(self, participant_name): - """Get participant id. - **Returns:** str""" + def get_participant_id(self, participant_name) -> str: + """Get participant id.""" participant_id = None participants = super().get_request('/api/participants') @@ -190,9 +191,8 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): return participant_id - def get_participants(self): - """Get all participants id. - **Returns:** json str""" + def get_participants(self) -> str: + """Get all participants id.""" return super().get_request('/api/participants') @@ -226,9 +226,12 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']): return super().wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array) - def create_recording(self, participant_name, recording_notes = ''): + def create_recording(self, participant_name, recording_notes = '') -> str: """Create a new recording. - **Returns:** str""" + + * **Returns:** + - recording id + """ participant_id = self.get_participant_id(participant_name) @@ -252,23 +255,20 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): return json_data['rec_id'] - def start_recording(self, recording_id): - """Start recording on the Tobii interface's SD Card. - **Returns:** bool""" + def start_recording(self, recording_id) -> bool: + """Start recording on the Tobii interface's SD Card.""" super().post_request('/api/recordings/' + recording_id + '/start') return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording' - def stop_recording(self, recording_id): - """Stop recording on the Tobii interface's SD Card. - **Returns:** bool""" + def stop_recording(self, recording_id) -> bool: + """Stop recording on the Tobii interface's SD Card.""" super().post_request('/api/recordings/' + recording_id + '/stop') return self.__wait_for_recording_status(recording_id, ['done']) == "done" - def pause_recording(self, recording_id): - """Pause recording on the Tobii interface's SD Card. - **Returns:** bool""" + def pause_recording(self, recording_id) -> bool: + """Pause recording on the Tobii interface's SD Card.""" super().post_request('/api/recordings/' + recording_id + '/pause') return self.__wait_for_recording_status(recording_id, ['paused']) == "paused" @@ -276,15 +276,14 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): def __get_recording_status(self): return self.get_status()['sys_recording'] - def get_current_recording_id(self): - """Get current recording id. - **Returns:** str""" + def get_current_recording_id(self) -> str: + """Get current recording id.""" return self.__get_recording_status()['rec_id'] - def is_recording(self): - """Is it recording? - **Returns:** bool""" + @property + def recording(self) -> bool: + """Is it recording?""" rec_status = self.__get_recording_status() @@ -294,9 +293,8 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): return False - def get_recordings(self): - """Get all recordings id. - **Returns:** json str""" + def get_recordings(self) -> str: + """Get all recordings id.""" return super().get_request('/api/recordings') @@ -364,7 +362,7 @@ class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): json_data = super().post_request('/api/system/conf', data) def set_et_freq_100(self): - """May not be available. Check get_et_frequencies() first.""" + # May not be available. Check get_et_frequencies() first. data = {'sys_et_freq': 100} json_data = super().post_request('/api/system/conf', data) diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py index f1338a4..ac24fb3 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiData.py +++ b/src/argaze/TobiiGlassesPro2/TobiiData.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import Tuple from dataclasses import dataclass import threading import uuid @@ -116,7 +117,7 @@ class MarkerPosition(): value_3d: tuple((float, float, float)) value_2d: tuple((float, float)) -class __TobiiJsonDataParser(): +class TobiiJsonDataParser(): def parse_dir_sig(self, status, json_data): @@ -212,12 +213,12 @@ class TobiiDataSegment(): def __init__(self, segment_data_path, start_timestamp = 0, end_timestamp = None): """Load segment data from segment directory then parse and register each recorded dataflow as a TimeStampedBuffer member of the TobiiSegmentData instance.""" - self.__segment_data_path = segment_data_path + self.__path = segment_data_path self.__vts_offset = 0 self.__vts_ts = -1 - self.__json_data_parser = __TobiiJsonDataParser() + self.__json_data_parser = TobiiJsonDataParser() self.__ts_data_buffer_dict = { 'DirSig': DataStructures.TimeStampedBuffer(), @@ -277,7 +278,7 @@ class TobiiDataSegment(): return True # continue # start loading - with gzip.open(self.__segment_data_path) as f: + with gzip.open(self.__path) as f: for item in f: if not json.loads(item.decode('utf-8'), object_hook=decode): @@ -286,17 +287,16 @@ class TobiiDataSegment(): def __getitem__(self, key): return self.__ts_data_buffer_dict[key] - def keys(self): - """Get all registered data keys. - **Returns:** dict_keys""" + def keys(self) -> list[str]: + """Get all registered data keys.""" return list(self.__ts_data_buffer_dict.keys()) - def get_path(self): - """Get segment data path. - **Returns:** str""" + @property + def path(self) -> str: + """Get segment data path.""" - return self.__segment_data_path + return self.__path class TobiiDataStream(): """Capture Tobii Glasses Pro 2 data stream in separate thread.""" @@ -314,7 +314,7 @@ class TobiiDataStream(): # Data reception self.__data_thread = None self.__data_queue = queue.Queue(50) # TODO : set queue size according technical reason - self.__json_data_parser = __TobiiJsonDataParser() + self.__json_data_parser = TobiiJsonDataParser() self.__first_ts = 0 # Data capture @@ -434,9 +434,13 @@ class TobiiDataStream(): # unlock data queue access self.__read_lock.release() - def read(self): + def read(self) -> Tuple[int, dict]: """Read incoming timestamped data. - **Returns:** int, dict""" + + * **Returns:** + - timestamp + - dictionary of TimeStampedBuffer for each data. + """ # no data to read if self.__data_queue.empty(): @@ -525,8 +529,12 @@ class TobiiDataStream(): self.__data_ts_buffer[data_ts] = data_object - def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500): - """Capture a data stream into buffer.""" + def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500) -> int: + """Capture a data stream into buffer. + + * **Returns:** + - buffer size each 100 ms + """ # Prepare for data acquisition self.__data_stream_selector = data_object_type diff --git a/src/argaze/TobiiGlassesPro2/TobiiEntities.py b/src/argaze/TobiiGlassesPro2/TobiiEntities.py index f64328f..81c227f 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiEntities.py +++ b/src/argaze/TobiiGlassesPro2/TobiiEntities.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import TypeVar import datetime import json import os @@ -26,6 +27,9 @@ TOBII_SEGMENT_INFO_FILENAME = "segment.json" TOBII_SEGMENT_VIDEO_FILENAME = "fullstream.mp4" TOBII_SEGMENT_DATA_FILENAME = "livedata.json.gz" +Datetime = TypeVar('datetime', bound="datetime") +# Type definition for type annotation convenience + class TobiiSegment: """Handle Tobii Glasses Pro 2 segment info.""" @@ -33,14 +37,14 @@ class TobiiSegment: """Load segment info from segment directory. Optionnaly select a time range in microsecond.""" - self.__segment_id = os.path.basename(segment_path) - self.__segment_path = segment_path + self.__id = os.path.basename(segment_path) + self.__path = segment_path - with open(os.path.join(self.__segment_path, TOBII_SEGMENT_INFO_FILENAME)) as f: + with open(os.path.join(self.__path, TOBII_SEGMENT_INFO_FILENAME)) as f: try: item = json.load(f) except: - raise RuntimeError(f'JSON fails to load {self.__segment_path}/{TOBII_SEGMENT_INFO_FILENAME}') + raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}') self.__start_timestamp = start_timestamp self.__end_timestamp = min(end_timestamp, int(item["seg_length"] * 1e6)) if end_timestamp != None else int(item["seg_length"] * 1e6) @@ -53,57 +57,56 @@ class TobiiSegment: self.__start_date = datetime.datetime.strptime(item["seg_t_start"], TOBII_DATETIME_FORMAT) self.__stop_date = datetime.datetime.strptime(item["seg_t_stop"], TOBII_DATETIME_FORMAT) - def get_path(self): - """Get segment path. - **Returns:** str""" + @property + def path(self) -> str: + """Get segment path.""" - return self.__segment_path + return self.__path - def get_id(self): - """Get segment id. - **Returns:** str""" - return self.__segment_id + @property + def id(self) -> str: + """Get segment id.""" + return self.__id - def get_start_timestamp(self): - """Get the timestamp where the segment loading starts. - **Returns:** int""" + @property + def start_timestamp(self) -> int: + """Get the timestamp where the segment loading starts.""" return self.__start_timestamp - def get_end_timestamp(self): - """Get the timestamp where the segment loading ends. - **Returns:** int""" + @property + def end_timestamp(self) -> int: + """Get the timestamp where the segment loading ends.""" return self.__end_timestamp - def get_start_date(self): - """Get the date when the segment has started. - **Returns:** datetime""" + @property + def start_date(self) -> Datetime: + """Get the date when the segment has started.""" return self.__start_date - def get_stop_date(self): - """Get the date when the segment has stopped. - **Returns:** datetime""" + @property + def stop_date(self) -> Datetime: + """Get the date when the segment has stopped.""" return self.__stop_date - def is_calibrated(self): - """Is the segment has been calibrated? - **Returns:** bool""" + @property + def calibrated(self) -> bool: + """Is the segment has been calibrated?""" + return self.__calibrated - def load_data(self): - """Load recorded data stream. - **Returns:** TobiiData.TobiiDataSegment""" + def load_data(self) -> "TobiiData.TobiiDataSegment": + """Load recorded data stream.""" - return TobiiData.TobiiDataSegment(os.path.join(self.__segment_path, TOBII_SEGMENT_DATA_FILENAME), self.__start_timestamp, self.__end_timestamp) + return TobiiData.TobiiDataSegment(os.path.join(self.__path, TOBII_SEGMENT_DATA_FILENAME), self.__start_timestamp, self.__end_timestamp) - def load_video(self): - """Load recorded video stream. - **Returns:** TobiiData.TobiiVideoSegment""" + def load_video(self) -> "TobiiVideo.TobiiVideoSegment": + """Load recorded video stream.""" - return TobiiVideo.TobiiVideoSegment(os.path.join(self.__segment_path, TOBII_SEGMENT_VIDEO_FILENAME), self.__start_timestamp, self.__end_timestamp) + return TobiiVideo.TobiiVideoSegment(os.path.join(self.__path, TOBII_SEGMENT_VIDEO_FILENAME), self.__start_timestamp, self.__end_timestamp) class TobiiRecording: """Handle Tobii Glasses Pro 2 recording info and segments.""" @@ -111,89 +114,88 @@ class TobiiRecording: def __init__(self, recording_path): """Load recording info from recording directory.""" - self.__recording_id = os.path.basename(recording_path) - self.__recording_path = recording_path + self.__id = os.path.basename(recording_path) + self.__path = recording_path - self.__project_path = os.path.dirname(os.path.dirname(os.path.abspath(self.__recording_path))) + self.__path = os.path.dirname(os.path.dirname(os.path.abspath(self.__path))) - with open(os.path.join(self.__recording_path, TOBII_RECORD_FILENAME)) as f: + with open(os.path.join(self.__path, TOBII_RECORD_FILENAME)) as f: try: item = json.load(f) except: - raise RuntimeError(f'JSON fails to load {self.__recording_path}/{TOBII_RECORD_FILENAME}') + raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_RECORD_FILENAME}') - self.__recording_name = item["rec_info"]["Name"] - self.__recording_created = datetime.datetime.strptime(item["rec_created"], TOBII_DATETIME_FORMAT) + self.__name = item["rec_info"]["Name"] + self.__creation_date = datetime.datetime.strptime(item["rec_created"], TOBII_DATETIME_FORMAT) - self.__recording_length = int(item["rec_length"]) - self.__recording_et_samples = int(item["rec_et_samples"]) - self.__recording_et_valid_samples = int(item["rec_et_valid_samples"]) - - self.__recording_segments = int(item["rec_segments"]) + self.__length = int(item["rec_length"]) + self.__et_samples = int(item["rec_et_samples"]) + self.__et_samples_valid = int(item["rec_et_valid_samples"]) - self.__project = TobiiProject(self.__project_path) - self.__participant = TobiiParticipant(self.__recording_path) + self.__segments = int(item["rec_segments"]) - def get_path(self): - """Get recording path. - **Returns:** str""" + self.__project = TobiiProject(self.__path) + self.__participant = TobiiParticipant(self.__path) - return self.__recording_path + @property + def path(self) -> str: + """Get recording path.""" - def get_id(self): - """Get recording id. - **Returns:** str""" + return self.__path - return self.__recording_id + @property + def id(self) -> str: + """Get recording id.""" + return self.__id - def get_name(self): - """Get recording name. - **Returns:** str""" + @property + def name(self) -> str: + """Get recording name.""" - return self.__recording_name + return self.__name - def get_creation_date(self): - """Get date when the recording has been done. - **Returns:** datetime""" + @property + def creation_date(self) -> Datetime: + """Get date when the recording has been done.""" - return self.__recording_created + return self.__creation_date - def get_length(self): - """Get record duration in second. - **Returns:** int""" + @property + def length(self): + """Get record duration in second.""" - return self.__recording_length + return self.__length - def get_et_samples(self): - """Get numbers of recorded eye tracker samples. - **Returns:** int""" + @property + def eyetracker_samples(self) -> int: + """Get numbers of recorded eye tracker samples.""" - return self.__recording_et_samples + return self.__et_samples - def get_et_valid_samples(self): - """Get numbers of recorded eye tracker valid samples. - **Returns:** int""" + @property + def eyetracker_samples_valid(self) -> int: + """Get numbers of recorded eye tracker valid samples.""" - return self.__recording_et_valid_samples + return self.__et_samples_valid - def get_project(self): - """Get project to which it belongs. - **Returns:** TobiiProject""" + @property + def project(self) -> "TobiiProject": + """Get project to which it belongs.""" return self.__project - def get_participant(self): - """Get participant to which it belongs. - **Returns:** TobiiParticipant""" + @property + def participant(self) -> "TobiiParticipant": + """Get participant to which it belongs.""" return self.__participant - def get_segments(self): - """Get all recorded segments. - **Returns:** list of TobiiSegment""" + @property + def segments(self) -> list["TobiiSegment"]: + """Get all recorded segments.""" all_segments = [] - segments_path = os.path.join(self.__recording_path, TOBII_SEGMENTS_DIRNAME) + segments_path = os.path.join(self.__path, TOBII_SEGMENTS_DIRNAME) for item in os.listdir(segments_path): segment_path = os.path.join(segments_path, item) @@ -208,34 +210,33 @@ class TobiiParticipant: def __init__(self, participant_path): """Load participant data from path""" - self.__participant_id = os.path.basename(participant_path) - self.__participant_path = participant_path + self.__id = os.path.basename(participant_path) + self.__path = participant_path - with open(os.path.join(self.__participant_path, TOBII_PARTICIPANT_FILENAME)) as f: + with open(os.path.join(self.__path, TOBII_PARTICIPANT_FILENAME)) as f: try: item = json.load(f) except: raise RuntimeError(f'JSON fails to load {source_dir}/{TOBII_PARTICIPANT_FILENAME}') - self.__participant_name = item["pa_info"]["Name"] + self.__name = item["pa_info"]["Name"] - def get_path(self): - """Get participant path. - **Returns:** str""" + @property + def path(self) -> str: + """Get participant path.""" - return self.__participant_path + return self.__path - def get_id(self): - """Get participant id. - **Returns:** str""" + @property + def id(self) -> str: + """Get participant id.""" + return self.__id - return self.__participant_id + @property + def name(self) -> str: + """Get participant name.""" - def get_name(self): - """Get participant name. - **Returns:** str""" - - return self.__participant_name + return self.__name class TobiiProject: """Handle Tobii Glasses Pro 2 project data.""" @@ -243,52 +244,51 @@ class TobiiProject: def __init__(self, project_path): """Load project data from projects directory and project id.""" - self.__project_id = os.path.basename(project_path) - self.__project_path = project_path + self.__id = os.path.basename(project_path) + self.__path = project_path - with open(os.path.join(self.__project_path, TOBII_PROJECT_FILENAME)) as f: + with open(os.path.join(self.__path, TOBII_PROJECT_FILENAME)) as f: try: item = json.load(f) except: - raise RuntimeError(f'JSON fails to load {self.__project_path}/{TOBII_PROJECT_FILENAME}') + raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_PROJECT_FILENAME}') - self.__project_created = datetime.datetime.strptime(item["pr_created"], TOBII_DATETIME_FORMAT) + self.__creation_date = datetime.datetime.strptime(item["pr_created"], TOBII_DATETIME_FORMAT) try: - self.__project_name = item["pr_info"]["Name"] + self.__name = item["pr_info"]["Name"] except: - self.__project_name = None - - def get_path(self): - """Get project path. - **Returns:** str""" + self.__name = None - return self.__project_path + @property + def path(self) -> str: + """Get project path.""" - def get_creation_date(self): - """Get date when the project has been created. - **Returns:** datetime""" + return self.__path - return self.__project_created + @property + def id(self) -> str: + """Get project id.""" + return self.__id - def get_id(self): - """Get project id. - **Returns:** str""" + @property + def name(self) -> str: + """Get project name.""" - return self.__project_id + return self.__name - def get_name(self): - """Get project name. - **Returns:** str""" + @property + def creation_date(self) -> Datetime: + """Get date when the project has been created.""" - return self.__project_name + return self.__creation_date - def get_participants(self): - """Get all participants. - **Returns:** list of TobiiParticipant""" + @property + def participants(self) -> list["TobiiParticipant"]: + """Get all participants.""" all_participants = [] - participants_path = os.path.join(self.__project_path, TOBII_PARTICIPANTS_DIRNAME) + participants_path = os.path.join(self.__path, TOBII_PARTICIPANTS_DIRNAME) for item in os.listdir(participants_path): participant_path = os.path.join(participants_path, item) @@ -297,12 +297,11 @@ class TobiiProject: return all_participants - def get_recordings(self): - """Get all recordings. - **Returns:** list of TobiiRecording""" + def recordings(self) -> list["TobiiRecording"]: + """Get all recordings.""" all_recordings = [] - recordings_path = os.path.join(self.__project_path, TOBII_RECORDINGS_DIRNAME) + recordings_path = os.path.join(self.__path, TOBII_RECORDINGS_DIRNAME) for item in os.listdir(recordings_path): recording_path = os.path.join(recordings_path, item) @@ -317,20 +316,20 @@ class TobiiDrive: def __init__(self, drive_path): """Load drive data from drive directory path.""" - self.__drive_path = drive_path + self.__path = drive_path - def get_path(self): - """Get drive path. - **Returns:** str""" + @property + def path(self) -> str: + """Get drive path.""" - return self.__drive_path + return self.__path - def get_projects(self): - """Get all projects. - **Returns:** list of TobiiProject""" + @property + def projects(self) -> list["TobiiProject"]: + """Get all projects.""" all_projects = [] - projects_path = os.path.join(self.__drive_path, TOBII_PROJECTS_DIRNAME) + projects_path = os.path.join(self.__path, TOBII_PROJECTS_DIRNAME) for item in os.listdir(projects_path): project_path = os.path.join(projects_path, item) diff --git a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py index e41055f..e37a1de 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py +++ b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import Tuple import json import time import math @@ -63,7 +64,7 @@ class TobiiInertialMeasureUnit(): json.dump(calibration_data, calibration_file, ensure_ascii=False, indent=4) - def calibrate_gyroscope_offset(self, gyroscope_ts_buffer): + def calibrate_gyroscope_offset(self, gyroscope_ts_buffer) -> numpy.array: """Calibrate gyroscope offset from a timestamped gyroscope buffer. **Returns:** numpy.array""" @@ -82,15 +83,14 @@ class TobiiInertialMeasureUnit(): return self.__gyroscope_offset - def get_gyroscope_offset(self): - """Get gyroscope offset. - **Returns:** numpy.array""" + @property + def gyroscope_offset(self) -> numpy.array: + """Get gyroscope offset.""" return self.__gyroscope_offset - def apply_gyroscope_offset(self, gyroscope_data_object): - """Remove gyroscope offset to given gyroscope data. - **Returns:** TobiiData.Gyroscope""" + def apply_gyroscope_offset(self, gyroscope_data_object: TobiiData.Gyroscope) -> "TobiiData.Gyroscope": + """Remove gyroscope offset to given gyroscope data.""" return TobiiData.Gyroscope(gyroscope_data_object.value - self.__gyroscope_offset) @@ -123,9 +123,9 @@ class TobiiInertialMeasureUnit(): self.__last_gyroscope_ts = gyroscope_data_ts self.__last_gyroscope = current_gyroscope - def get_rotation(self): - """Return current rotation value (euler angles in degree). - **Returns:** numpy.array""" + @property + def rotation(self) -> numpy.array: + """Return current rotation value (euler angles in degree).""" return self.__rotation @@ -157,15 +157,14 @@ class TobiiInertialMeasureUnit(): # Store results for the given axis self.__accelerometer_coefficients[axis] = numpy.array(optimal_coefficients) - def get_accelerometer_coefficients(self): - """Return accelerometer coefficients. - **Returns:** numpy.array""" + @property + def accelerometer_coefficients(self) -> numpy.array: + """Return accelerometer coefficients.""" return self.__accelerometer_coefficients - def apply_accelerometer_coefficients(self, accelerometer_data_object): - """Add accelerometer offset to given accelerometer data. - **Returns:** TobiiData.Accelerometer""" + def apply_accelerometer_coefficients(self, accelerometer_data_object: TobiiData.Accelerometer) -> "TobiiData.Accelerometer": + """Add accelerometer offset to given accelerometer data.""" x = self._accelerometer_linear_fit(accelerometer_data_object.value[0], *self.__accelerometer_coefficients[0]) y = self._accelerometer_linear_fit(accelerometer_data_object.value[1], *self.__accelerometer_coefficients[1]) @@ -183,6 +182,7 @@ class TobiiInertialMeasureUnit(): def update_translation(self, accelerometer_data_ts, accelerometer_data_object): """Integrate timestamped accelerometer values to update translation.""" + print('> update_translation: accelerometer_data_ts=', accelerometer_data_ts) # Convert m/s2 into cm/ms2 @@ -221,9 +221,14 @@ class TobiiInertialMeasureUnit(): # print('no valid head plumb') - def get_translation(self): + @property + def translation(self) -> Tuple[numpy.array, numpy.array]: """Return current translation speed and translation values. - **Returns:** numpy.array, numpy array""" + + * **Returns:** + - translation speed vector + - translation vector + """ return self.__translation_speed, self.__translation @@ -236,14 +241,13 @@ class TobiiInertialMeasureUnit(): # Check plumb length assert(math.isclose(numpy.linalg.norm(self.__plumb), math.fabs(EARTH_GRAVITY), abs_tol=1e-3)) - def get_plumb(self): - """Return plumb vector. - **Returns:** numpy.array""" + @property + def plumb(self) -> numpy.array: + """Return plumb vector.""" return self.__plumb - def apply_plumb(self, accelerometer_data_object): - """Remove gravity along plumb vector to given accelerometer data. - **Returns:** TobiiData.Accelerometer""" + def apply_plumb(self, accelerometer_data_object: TobiiData.Accelerometer) -> "TobiiData.Accelerometer": + """Remove gravity along plumb vector to given accelerometer data.""" return TobiiData.Accelerometer(accelerometer_data_object.value - self.__plumb) diff --git a/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py b/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py index 3d7d1e1..1576975 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py +++ b/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py @@ -1,3 +1,4 @@ +from typing import TypeVar, Any import logging import sys import socket @@ -28,6 +29,9 @@ except ImportError: socket.IPPROTO_IPV6 = 41 +Socket = TypeVar('socket', bound="socket") +# Type definition for type annotation convenience + class TobiiNetworkInterface(): """Handle network connection to Tobii glasses Pro 2 device. It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py).""" @@ -63,9 +67,8 @@ class TobiiNetworkInterface(): self.__peer = (self.address, self.udpport) - def make_socket(self): - """Create a socket to enable network communication. - **Returns:** socket""" + def make_socket(self) -> Socket: + """Create a socket to enable network communication.""" iptype = socket.AF_INET @@ -137,9 +140,8 @@ class TobiiNetworkInterface(): return (None, None) - def get_request(self, api_action): - """Send a GET request and get data back. - **Returns:** json str""" + def get_request(self, api_action) -> str: + """Send a GET request and get data back.""" url = self.base_url + api_action res = urlopen(url).read() @@ -151,9 +153,8 @@ class TobiiNetworkInterface(): return data - def post_request(self, api_action, data=None, wait_for_response=True): - """Send a POST request and get result back. - **Returns:** json str""" + def post_request(self, api_action, data=None, wait_for_response=True) -> str: + """Send a POST request and get result back.""" url = self.base_url + api_action req = Request(url) @@ -183,9 +184,8 @@ class TobiiNetworkInterface(): res = socket.sendto(msg.encode('utf-8'), self.__peer) - def grab_data(self, socket): - """Read incoming socket data. - **Returns:** bytes""" + def grab_data(self, socket) -> bytes: + """Read incoming socket data.""" try: data, address = socket.recvfrom(1024) @@ -195,9 +195,8 @@ class TobiiNetworkInterface(): logging.error("A timeout occurred while receiving data") - def wait_for_status(self, api_action, key, values, timeout = None): - """Wait until a status matches given values. - **Returns:** status value""" + def wait_for_status(self, api_action, key, values, timeout = None) -> Any: + """Wait until a status matches given values.""" url = self.base_url + api_action running = True diff --git a/src/argaze/TobiiGlassesPro2/TobiiVideo.py b/src/argaze/TobiiGlassesPro2/TobiiVideo.py index 1e0899e..cf1d995 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiVideo.py +++ b/src/argaze/TobiiGlassesPro2/TobiiVideo.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import TypeVar, Tuple from dataclasses import dataclass, field import threading import uuid @@ -13,6 +14,12 @@ import cv2 as cv import av import numpy +Self = TypeVar('Self', bound="TobiiVideoFrame") +# Type definition for type annotation convenience + +Stream = TypeVar('av.stream', bound="av.stream") +# Type definition for type annotation convenience + @dataclass class TobiiVideoFrame(): """Define tobii video frame""" @@ -31,9 +38,8 @@ class TobiiVideoFrame(): self.height, self.width = self.matrix.shape[:2] - def copy(self): - """Copy tobii video frame. - **Returns:** TobiiVideoFrame""" + def copy(self) -> Self: + """Copy tobii video frame.""" return TobiiVideoFrame(self.matrix.copy()) @@ -43,13 +49,13 @@ class TobiiVideoSegment(): def __init__(self, segment_video_path, start_timestamp:int = 0, end_timestamp:int = None): """Load segment video from segment directory""" - self.__segment_video_path = segment_video_path + self.__path = segment_video_path - self.__container = av.open(self.__segment_video_path) + self.__container = av.open(self.__path) self.__stream = self.__container.streams.video[0] - self.__width = int(cv.VideoCapture(self.__segment_video_path).get(cv.CAP_PROP_FRAME_WIDTH)) - self.__height = int(cv.VideoCapture(self.__segment_video_path).get(cv.CAP_PROP_FRAME_HEIGHT)) + self.__width = int(cv.VideoCapture(self.__path).get(cv.CAP_PROP_FRAME_WIDTH)) + self.__height = int(cv.VideoCapture(self.__path).get(cv.CAP_PROP_FRAME_HEIGHT)) self.__start_timestamp = start_timestamp self.__end_timestamp = end_timestamp @@ -57,42 +63,41 @@ class TobiiVideoSegment(): # position at the given start time self.__container.seek(self.__start_timestamp) - def get_path(self): - """Get video segment path. - **Returns:** str""" + @property + def path(self) -> str: + """Get video segment path.""" - return self.__segment_video_path + return self.__path - def get_duration(self): - """Duration in microsecond. - **Returns:** int""" + @property + def duration(self) -> int: + """Duration in microsecond.""" if self.__end_timestamp == None: return int((self.__stream.duration * self.__stream.time_base) * 1e6) - self.__start_timestamp else: return self.__end_timestamp - self.__start_timestamp - def get_width(self): - """Video width dimension. - **Returns:** int""" + @property + def width(self) -> int: + """Video width dimension.""" return self.__width - def get_height(self): - """Video height dimension. - **Returns:** int""" + @property + def height(self) -> int: + """Video height dimension.""" return self.__height - def get_stream(self): - """Video stream. - **Returns:** av stream""" + @property + def stream(self) -> Stream: + """Video stream.""" return self.__stream - def get_frame(self, i): - """Access to a frame. - **Returns:** int, TobiiVideoFrame""" + def get_frame(self, i) -> Tuple[int, "TobiiVideoFrame"]: + """Access to a frame.""" if i < 0: ValueError('Frame index must be a positive integer.') @@ -116,9 +121,8 @@ class TobiiVideoSegment(): # return micro second timestamp and frame data return video_ts, TobiiVideoFrame(frame.to_ndarray(format='bgr24')) - def frames(self): - """Access to frame iterator. - **Returns:** int, TobiiVideoFrame""" + def frames(self) -> Tuple[int, "TobiiVideoFrame"]: + """Access to frame iterator.""" return self.__iter__() @@ -223,9 +227,8 @@ class TobiiVideoStream(threading.Thread): # unlock frame access self.__read_lock.release() - def read(self): - """Read incoming video frames. - **Returns:** int, TobiiVideoFrame""" + def read(self) -> Tuple[int, "TobiiVideoFrame"]: + """Read incoming video frames.""" # if the video acquisition thread have been stopped or isn't started if self.__stop_event.isSet() or self.__frame_tuple == None: @@ -249,8 +252,8 @@ class TobiiVideoOutput(): def __init__(self, output_video_path: str, referent_stream: av.stream.Stream): """Create a video file""" - self.__output_video_path = output_video_path - self.__container = av.open(self.__output_video_path, 'w') + self.__path = output_video_path + self.__container = av.open(self.__path, 'w') self.__stream = self.__container.add_stream(\ referent_stream.codec_context.name, \ width=referent_stream.codec_context.width, \ @@ -260,11 +263,11 @@ class TobiiVideoOutput(): pix_fmt=referent_stream.codec_context.pix_fmt, \ bit_rate=referent_stream.codec_context.bit_rate) - def get_path(self): - """Get video file path. - **Returns:** str""" + @property + def path(self) -> str: + """Get video file path.""" - return self.__output_video_path + return self.__path def write(self, frame): """Write a frame into the output video file""" diff --git a/src/argaze/TobiiGlassesPro2/__init__.py b/src/argaze/TobiiGlassesPro2/__init__.py index 754e5a1..50fb742 100644 --- a/src/argaze/TobiiGlassesPro2/__init__.py +++ b/src/argaze/TobiiGlassesPro2/__init__.py @@ -2,4 +2,4 @@ .. include:: README.md """ __docformat__ = "restructuredtext" -__all__ = ['TobiiEntities', 'TobiiNetworkInterface', 'TobiiController', 'TobiiData', 'TobiiVideo', 'TobiiSpecifications', 'TobiiInertialMeasureUnit']
\ No newline at end of file +__all__ = ['TobiiEntities', 'TobiiController', 'TobiiNetworkInterface', 'TobiiData', 'TobiiVideo', 'TobiiInertialMeasureUnit', 'TobiiSpecifications',]
\ No newline at end of file diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py index a57d280..57ccd2c 100644 --- a/src/argaze/__init__.py +++ b/src/argaze/__init__.py @@ -2,4 +2,4 @@ .. include:: ../../README.md """ __docformat__ = "restructuredtext" -__all__ = ['utils','GazeFeatures','AreaOfInterest','TobiiGlassesPro2','ArUcoMarkers','DataStructures']
\ No newline at end of file +__all__ = ['utils','ArUcoMarkers','AreaOfInterest','GazeFeatures','DataStructures','TobiiGlassesPro2']
\ No newline at end of file diff --git a/src/argaze/utils/README.md b/src/argaze/utils/README.md index 58a892d..fd1c332 100644 --- a/src/argaze/utils/README.md +++ b/src/argaze/utils/README.md @@ -1,39 +1,47 @@ Collection of command-line high level features based on ArGaze toolkit. -Use -help to get arguments documentation. - ## Ready-to-use commands -Consider that all inline commands below needs to be executed into ArGaze root folder. +.. note:: + *Consider that all inline commands below needs to be executed into ArGaze root folder.* + +.. note:: + *Use -h option to get command arguments documentation.* -- Ask for command help (replace UTILS_SCRIPT) +### ArUco factory + +- Export all markers from DICT_APRILTAG_16h5 dictionary at 300 dpi into an export/markers folder: ``` -python ./src/argaze/utils/UTILS_SCRIPT.py -h +python ./src/argaze/utils/aruco_markers_export.py -o export/markers -d DICT_APRILTAG_16h5 ``` -- Export 50 4x4 markers at 300 dpi into an export/markers folder: +- Export a 7 columns and 5 rows calibration board made of 5cm squares with 3cm markers from DICT_APRILTAG_16h5 dictionary at 50 dpi into an export folder: ``` -python ./src/argaze/utils/aruco_markers_export.py -o export/markers +python ./src/argaze/utils/aruco_calibration_board_export.py 7 5 5 3 -o export -d DICT_APRILTAG_16h5 ``` -- Export a 7 columns and 5 rows calibration board with 5cm squares and 3cm markers inside at 50 dpi into an export folder: +### Tobii calibration + +- Calibrate Tobii Glasses Pro 2 camera (-t IP_ADDRESS) using a 7 columns and 5 rows calibration board made of 5cm squares with 3cm markers from DICT_APRILTAG_16h5 dictionary. Then, export its optical parameters into an tobii_camera.json file: ``` -python ./src/argaze/utils/aruco_calibration_board_export.py 7 5 5 3 -o export +python ./src/argaze/utils/tobii_camera_calibrate.py 7 5 5 3 -t IP_ADDRESS -d DICT_APRILTAG_16h5 -o export/tobii_camera.json ``` -- Calibrate Tobii Glasses Pro 2 camera (-t IP_ADDRESS) using a 7 columns and 5 rows calibration board with 5cm squares and 3cm markers inside. Then, export its optical parameters into an tobii_camera.json file: +- Calibrate Tobii Glasses Pro 2 inertial measure unit (-t IP_ADDRESS) then, export calibration parameters into an imu.json file: ``` -python ./src/argaze/utils/tobii_camera_calibrate.py 7 5 5 3 -t IP_ADDRESS -o export/tobii_camera.json +python ./src/argaze/utils/tobii_imu_calibrate.py -t IP_ADDRESS -o export/imu.json ``` -- Display Tobii Glasses Pro 2 camera video stream (-t IP_ADDRESS) with a live gaze pointer: +### Tobii session + +- Display Tobii Glasses Pro 2 camera video stream (-t IP_ADDRESS) with a live gaze pointer. Loading calibration file to display inertial sensors data: ``` -python ./src/argaze/utils/tobii_stream_display.py -t IP_ADDRESS +python ./src/argaze/utils/tobii_stream_display.py -t IP_ADDRESS -i export/imu.json ``` - Record a Tobii Glasses Pro 2 'myProject' session for a 'myUser' participant on Tobii interface's SD card (-t IP_ADDRESS): @@ -42,6 +50,8 @@ python ./src/argaze/utils/tobii_stream_display.py -t IP_ADDRESS python ./src/argaze/utils/tobii_segment_record.py -t IP_ADDRESS -p myProject -u myUser ``` +### Tobii drive + - Explore Tobii Glasses Pro 2 interface's SD Card (-d DRIVE_PATH, -p PROJECT_PATH, -r RECORDING_PATH, -s SEGMENT_PATH): ``` @@ -60,6 +70,8 @@ python ./src/argaze/utils/tobii_sdcard_explore.py -r RECORDING_PATH python ./src/argaze/utils/tobii_sdcard_explore.py -s SEGMENT_PATH ``` +### Tobii post-processing + - Replay a time range selection (-r IN OUT) Tobii Glasses Pro 2 session (-s SEGMENT_PATH) synchronizing video and some data together: ``` @@ -72,20 +84,15 @@ python ./src/argaze/utils/tobii_segment_display.py -s SEGMENT_PATH -r IN OUT python ./src/argaze/utils/tobii_segment_gaze_movements_export.py -s SEGMENT_PATH -r IN OUT ``` -- Track ArUco markers into a Tobii camera video segment (-s SEGMENT_PATH) into a time range selection (-r IN OUT). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame. Export aoi video and data as a aruco_aoi.csv, aruco_aoi.mp4 files: -``` -python ./src/argaze/utils/tobii_segment_aruco_aoi_export.py -s SEGMENT_PATH -c export/tobii_camera.json -r IN OUT -ms 5 -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}' -``` +### Tobii with ArUco -- Track ArUco markers into Tobii camera video stream (-t IP_ADDRESS). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame. Then, detect if Tobii gaze point is inside any AOI and send the look at pointer over Ivy default bus: +- Track ArUco markers into Tobii camera video stream (-t IP_ADDRESS). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame: ``` -python ./src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py -t IP_ADDRESS -c export/tobii_camera.json -ms 5 -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}' +python ./src/argaze/utils/tobii_stream_aruco_aoi_display.py -t IP_ADDRESS -c export/tobii_camera.json -ms 5 -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}' ``` -- Define AOI scene from a ArUco marker (-a AOI_SCENE) and bind to Ivy default bus to receive live look at pointer data.: - +- Track ArUco markers into a Tobii camera video segment (-s SEGMENT_PATH) into a time range selection (-r IN OUT). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame. Export aoi video and data as a aruco_aoi.csv, aruco_aoi.mp4 files: ``` -python ./src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py.py -a AOI_SCENE -i MARKERS_ID +python ./src/argaze/utils/tobii_segment_aruco_aoi_export.py -s SEGMENT_PATH -c export/tobii_camera.json -r IN OUT -ms 5 -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}' ``` - diff --git a/src/argaze/utils/aruco_calibration_board_export.py b/src/argaze/utils/aruco_calibration_board_export.py index 6d925bd..42d75c1 100644 --- a/src/argaze/utils/aruco_calibration_board_export.py +++ b/src/argaze/utils/aruco_calibration_board_export.py @@ -3,31 +3,31 @@ import argparse import os -from argaze.ArUcoMarkers import ArUcoBoard +from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoBoard def main(): """Generates ArUco board to calibrate a camera.""" - # manage arguments + # Manage arguments parser = argparse.ArgumentParser(description=main.__doc__) parser.add_argument('columns', metavar='COLS_NUMBER', type=int, default=7, help='number of columns') parser.add_argument('rows', metavar='ROWS_NUMBER', type=int, default=5, help='number of rows') parser.add_argument('square_size', metavar='SQUARE_SIZE', type=int, default=5, help='square size (cm)') parser.add_argument('marker_size', metavar='MARKER_SIZE', type=int, default=3, help='marker size (cm)') parser.add_argument('-o', '--output', metavar='OUT', type=str, default='.', help='destination path') - parser.add_argument('-d', '--dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL,DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)') + parser.add_argument('-d', '--dictionary', metavar='DICT', type=ArUcoMarkersDictionary.ArUcoMarkersDictionary, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL,DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)') parser.add_argument('-r', '--resolution', metavar='RES', type=int, default=50, help='picture resolution in dpi') args = parser.parse_args() - # manage destination folder + # Manage destination folder if not os.path.exists(args.output): os.makedirs(args.output) print(f'{args.output} folder created') - # create aruco board + # Create aruco board aruco_board = ArUcoBoard.ArUcoBoard(args.dictionary, args.columns, args.rows, args.square_size, args.marker_size) - # export aruco board + # Export aruco board aruco_board.export(args.output, args.resolution) if __name__ == '__main__': diff --git a/src/argaze/utils/tobii_camera_calibrate.py b/src/argaze/utils/tobii_camera_calibrate.py index 61fc56c..2994c36 100644 --- a/src/argaze/utils/tobii_camera_calibrate.py +++ b/src/argaze/utils/tobii_camera_calibrate.py @@ -5,7 +5,7 @@ import os import time from argaze.TobiiGlassesPro2 import TobiiController, TobiiVideo -from argaze.ArUcoMarkers import ArUcoBoard, ArUcoTracker, ArUcoCamera +from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoBoard, ArUcoTracker, ArUcoCamera import cv2 as cv @@ -16,7 +16,7 @@ def main(): - Export and print a calibration board using - Place the calibration board in order to view it entirely on screen and move the camera in many configurations (orientation and distance) : the script will automatically take pictures. Do this step with a good lighting and a clear background. - Once enough pictures have been captured (~20), press Esc key then, wait for the camera calibration processing. - - Finally, check rms parameter: it should be between 0. and 1. if the calibration suceeded (lower is better). + - Finally, check rms parameter: it should be between 0. and 1. if the calibration succeeded (lower is better). ### Reference: - [Camera calibration using ArUco marker tutorial](https://automaticaddison.com/how-to-perform-camera-calibration-using-opencv/) @@ -28,14 +28,25 @@ def main(): parser.add_argument('rows', metavar='ROWS_NUMBER', type=int, default=5, help='number of rows') parser.add_argument('square_size', metavar='SQUARE_SIZE', type=float, default=5, help='square size (cm)') parser.add_argument('marker_size', metavar='MARKER_SIZE', type=float, default=3, help='marker size (cm)') - parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default='192.168.1.10', help='tobii glasses ip') + parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip') parser.add_argument('-o', '--output', metavar='OUT', type=str, default='camera.json', help='destination filepath') - parser.add_argument('-d', '--dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL,DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)') + parser.add_argument('-d', '--dictionary', metavar='DICT', type=ArUcoMarkersDictionary.ArUcoMarkersDictionary, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL, DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)') args = parser.parse_args() - # Create tobii controller - tobii_controller = TobiiController.TobiiController(args.tobii_ip, 'myProject', 'mySelf') + # Create tobii controller (with auto discovery network process if no ip argument is provided) + print("Looking for a Tobii Glasses Pro 2 device ...") + try: + + tobii_controller = TobiiController.TobiiController(args.tobii_ip) + print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') + + except ConnectionError as e: + + print(e) + exit() + + # Setup camera at 25 fps to work on Full HD video stream tobii_controller.set_video_freq_25() # Enable tobii video stream @@ -56,10 +67,10 @@ def main(): print("Camera calibration starts") print("Waiting for calibration board...") - expected_markers_number = aruco_board.get_markers_number() - expected_corners_number = aruco_board.get_corners_number() + expected_markers_number = aruco_board.markers_number + expected_corners_number = aruco_board.corners_number - # capture loop + # Capture loop try: while tobii_video_stream.is_alive(): @@ -71,20 +82,20 @@ def main(): aruco_tracker.track_board(video_frame.matrix, aruco_board, expected_markers_number) # draw only markers - aruco_tracker.draw(video_frame.matrix) + aruco_tracker.draw_tracked_markers(video_frame.matrix) # draw current calibration data count - cv.putText(video_frame.matrix, f'Capture: {aruco_camera.get_calibration_data_count()}', (50, 50), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv.LINE_AA) + cv.putText(video_frame.matrix, f'Capture: {aruco_camera.calibration_data_count}', (50, 50), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv.LINE_AA) cv.imshow('Tobii Camera Calibration', video_frame.matrix) # if all board corners are detected - if aruco_tracker.get_board_corners_number() == expected_corners_number: + if aruco_tracker.board_corners_number == expected_corners_number: # draw board corners to notify a capture is done aruco_tracker.draw_board(video_frame.matrix) # append data - aruco_camera.store_calibration_data(aruco_tracker.get_board_corners(), aruco_tracker.get_board_corners_ids()) + aruco_camera.store_calibration_data(aruco_tracker.board_corners, aruco_tracker.board_corners_ids) cv.imshow('Tobii Camera Calibration', video_frame.matrix) @@ -106,10 +117,10 @@ def main(): aruco_camera.calibrate(aruco_board, video_frame.width, video_frame.height) print('\nCalibration succeeded!') - print(f'\nRMS:\n{aruco_camera.get_rms()}') + print(f'\nRMS:\n{aruco_camera.rms}') print(f'\nDimensions:\n{video_frame.width}x{video_frame.height}') - print(f'\nCamera matrix:\n{aruco_camera.get_K()}') - print(f'\nDistortion coefficients:\n{aruco_camera.get_D()}') + print(f'\nCamera matrix:\n{aruco_camera.K}') + print(f'\nDistortion coefficients:\n{aruco_camera.D}') aruco_camera.save_calibration_file(args.output) diff --git a/src/argaze/utils/tobii_imu_calibrate.py b/src/argaze/utils/tobii_imu_calibrate.py index 85472d9..c9e4813 100644 --- a/src/argaze/utils/tobii_imu_calibrate.py +++ b/src/argaze/utils/tobii_imu_calibrate.py @@ -91,7 +91,7 @@ def main(): tobii_imu.calibrate_accelerometer_axis_coefficients(i, axis_buffers['upward'], axis_buffers['downward'], axis_buffers['perpendicular']) - accelerometer_coefficients = tobii_imu.get_accelerometer_coefficients() + accelerometer_coefficients = tobii_imu.accelerometer_coefficients print(f'\n\nAccelerometer optimal linear fit coefficients over {progress} values for each axis:') print('\tX coefficients: ', accelerometer_coefficients[0]) @@ -180,14 +180,14 @@ def main(): case 'p': - gyroscope_offset = tobii_imu.get_gyroscope_offset() + gyroscope_offset = tobii_imu.gyroscope_offset print(f'\nGyroscope offset for each axis:') print('\tX offset: ', gyroscope_offset[0]) print('\tY offset: ', gyroscope_offset[1]) print('\tZ offset: ', gyroscope_offset[2]) - accelerometer_coefficients = tobii_imu.get_accelerometer_coefficients() + accelerometer_coefficients = tobii_imu.accelerometer_coefficients print(f'\nAccelerometer optimal linear fit coefficients for each axis:') print('\tX coefficients: ', accelerometer_coefficients[0]) diff --git a/src/argaze/utils/tobii_segment_record.py b/src/argaze/utils/tobii_segment_record.py index a45727b..a312aa5 100644 --- a/src/argaze/utils/tobii_segment_record.py +++ b/src/argaze/utils/tobii_segment_record.py @@ -20,8 +20,21 @@ def main(): parser.add_argument('-u', '--participant_name', metavar='PARTICIPANT_NAME', type=str, default=TobiiController.DEFAULT_PARTICIPANT_NAME, help='participant name') args = parser.parse_args() - # Create tobii controller - tobii_controller = TobiiController.TobiiController(args.tobii_ip, args.project_name, args.participant_name) + # Create tobii controller (with auto discovery network process if no ip argument is provided) + print("Looking for a Tobii Glasses Pro 2 device ...") + + try: + + tobii_controller = TobiiController.TobiiController(ip_address = args.tobii_ip, project_name = args.project_name, participant_name = args.participant_name) + print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') + + except ConnectionError as e: + + print(e) + exit() + + # Setup camera at 25 fps to work on Full HD video stream + tobii_controller.set_video_freq_25() # Calibrate tobii glasses tobii_controller.calibrate() @@ -31,7 +44,7 @@ def main(): # Start recording tobii_controller.start_recording(recording_id) - print('Recording started') + print(f'Recording {recording_id} started') # Define loop last_battery_level = 0 @@ -71,7 +84,7 @@ def main(): # Stop recording tobii_controller.stop_recording(recording_id) - print('Recording stopped') + print(f'Recording {recording_id} stopped') if __name__ == '__main__': diff --git a/src/argaze/utils/tobii_stream_arcube_display.py b/src/argaze/utils/tobii_stream_arcube_display.py index cc7e70f..aea547f 100644 --- a/src/argaze/utils/tobii_stream_arcube_display.py +++ b/src/argaze/utils/tobii_stream_arcube_display.py @@ -64,6 +64,9 @@ def main(): print(e) exit() + # Setup camera at 25 fps to work on Full HD video stream + tobii_controller.set_video_freq_25() + # Enable tobii data stream tobii_data_stream = tobii_controller.enable_data_stream() diff --git a/src/argaze/utils/tobii_stream_aruco_aoi_display.py b/src/argaze/utils/tobii_stream_aruco_aoi_display.py index b9e669e..ea699d4 100644 --- a/src/argaze/utils/tobii_stream_aruco_aoi_display.py +++ b/src/argaze/utils/tobii_stream_aruco_aoi_display.py @@ -49,6 +49,9 @@ def main(): print(e) exit() + # Setup camera at 25 fps to work on Full HD video stream + tobii_controller.set_video_freq_25() + # Enable tobii data stream tobii_data_stream = tobii_controller.enable_data_stream() diff --git a/src/argaze/utils/tobii_stream_display.py b/src/argaze/utils/tobii_stream_display.py index f437de9..d8dbc97 100644 --- a/src/argaze/utils/tobii_stream_display.py +++ b/src/argaze/utils/tobii_stream_display.py @@ -34,6 +34,9 @@ def main(): print(e) exit() + # Setup camera at 25 fps to work on Full HD video stream + tobii_controller.set_video_freq_25() + # Enable tobii data stream tobii_data_stream = tobii_controller.enable_data_stream() @@ -49,7 +52,7 @@ def main(): tobii_imu.load_calibration_file(args.imu_calibration) # Init head rotation speed - head_rotation_speed = numpy.array((0, 0, 0)) + head_rotation_speed = numpy.zeros(3).astype(int) # Init gaze position gaze_position_px = GazeFeatures.GazePosition((0, 0)) @@ -87,15 +90,15 @@ def main(): head_rotation_speed = tobii_imu.apply_gyroscope_offset(data_object).value.astype(int) * 5 case 'GazePosition': - + # Assess gaze position stream performance gaze_chrono.lap() # Ignore frame when gaze position is not valid if data_object.validity == 0: - + gaze_position_px = GazeFeatures.GazePosition( (int(data_object.value[0] * video_frame.width), int(data_object.value[1] * video_frame.height)) ) - + case 'GazePosition3D': # Ignore frame when gaze position 3D is not valid |