Diffstat (limited to 'src')
-rw-r--r--  src/argaze/ArUcoMarkers/ArUcoCube.py  401
-rw-r--r--  src/argaze/ArUcoMarkers/ArUcoMarker.py  48
-rw-r--r--  src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py  12
-rw-r--r--  src/argaze/ArUcoMarkers/ArUcoTracker.py  229
-rw-r--r--  src/argaze/ArUcoMarkers/__init__.py  2
-rw-r--r--  src/argaze/TobiiGlassesPro2/TobiiData.py  145
-rw-r--r--  src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py  236
-rw-r--r--  src/argaze/TobiiGlassesPro2/__init__.py  2
-rw-r--r--  src/argaze/utils/tobii_imu_calibrate.py  214
-rw-r--r--  src/argaze/utils/tobii_stream_arcube_display.py  302
-rw-r--r--  src/argaze/utils/tobii_stream_display.py  24
11 files changed, 1391 insertions, 224 deletions
diff --git a/src/argaze/ArUcoMarkers/ArUcoCube.py b/src/argaze/ArUcoMarkers/ArUcoCube.py
new file mode 100644
index 0000000..e2983df
--- /dev/null
+++ b/src/argaze/ArUcoMarkers/ArUcoCube.py
@@ -0,0 +1,401 @@
+#!/usr/bin/env python
+
+from dataclasses import dataclass, field
+import json
+import math
+import itertools
+
+from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker
+
+import numpy
+import cv2 as cv
+import cv2.aruco as aruco
+
+@dataclass
+class ArUcoCubeFace():
+ """Define cube face pose and marker."""
+
+ translation: numpy.array
+ """Position in cube referential."""
+
+ rotation: numpy.array
+ """Rotation in cube referential."""
+
+ marker: dict
+ """ArUco marker linked to the face """
+
+@dataclass
+class ArUcoCube():
+ """Define a cube with ArUco markers on each face and estimate its pose."""
+
+ dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary
+ """ArUco dictionary of cube markers."""
+
+ marker_size: int = field(init=False)
+ """Size of markers in centimeter."""
+
+ edge_size: int = field(init=False)
+ """Size of the cube edges in centimeter."""
+
+ faces: dict = field(init=False, default_factory=dict)
+ """All named faces of the cube and their ArUco markers."""
+
+ angle_tolerance: float = field(init=False)
+ """Angle error tolerance allowed to validate face pose in degree."""
+
+ distance_tolerance: float = field(init=False)
+ """Distance error tolerance allowed to validate face pose in centimeter."""
+
+ def __init__(self, configuration_filepath):
+ """Define cube from a .json file."""
+
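+ # Expected .json structure (illustrative values, to adapt to the printed cube):
+ # {
+ #     "dictionary": "DICT_4X4_50",
+ #     "marker_size": 5,
+ #     "edge_size": 6,
+ #     "faces": {
+ #         "front": {"marker": 0, "translation": [0, 0, 1], "rotation": [0, 0, 0]},
+ #         ...
+ #     },
+ #     "angle_tolerance": 10,
+ #     "distance_tolerance": 1
+ # }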
+ with open(configuration_filepath) as configuration_file:
+
+ # Deserialize .json
+ # TODO find a better way
+ configuration = json.load(configuration_file)
+
+ # Load dictionary
+ self.dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(configuration['dictionary'])
+
+ # Load marker size
+ self.marker_size = configuration['marker_size']
+
+ # Load edge size
+ self.edge_size = configuration['edge_size']
+
+ # Load faces
+ self.faces = {}
+ for name, face in configuration['faces'].items():
+ marker = ArUcoMarker.ArUcoMarker(self.dictionary, face['marker'], self.marker_size)
+ self.faces[name] = ArUcoCubeFace(numpy.array(face['translation']).astype(numpy.float32), numpy.array(face['rotation']).astype(numpy.float32), marker)
+
+ # Load angle tolerance
+ self.angle_tolerance = configuration['angle_tolerance']
+
+ # Load distance tolerance
+ self.distance_tolerance = configuration['distance_tolerance']
+
+ # Init pose data
+ self.__translation = numpy.zeros(3)
+ self.__rotation = numpy.zeros(3)
+ self.__succeded = False
+ self.__validity = 0
+
+ # Process markers ids to speed up further calculations
+ self.__identifier_cache = {}
+ for name, face in self.faces.items():
+ self.__identifier_cache[face.marker.identifier] = name
+
+ # Process each face pose to speed up further calculations
+ self.__translation_cache = {}
+ for name, face in self.faces.items():
+ self.__translation_cache[name] = face.translation * self.edge_size / 2
+
+ # Process each face rotation matrix to speed up further calculations
+ self.__rotation_cache = {}
+ for name, face in self.faces.items():
+
+ # Create intrinsic rotation matrix
+ R = self.__make_rotation_matrix(*face.rotation)
+
+ assert(self.__is_rotation_matrix(R))
+
+ # Store rotation matrix
+ self.__rotation_cache[name] = R
+
+ # Process each axis-angle face combination to speed up further calculations
+ self.__angle_cache = {}
+ for (A_name, A_face), (B_name, B_face) in itertools.combinations(self.faces.items(), 2):
+
+ A = self.__rotation_cache[A_name]
+ B = self.__rotation_cache[B_name]
+
+ # Rotation matrix from A face to B face
+ AB = B.dot(A.T)
+
+ assert(self.__is_rotation_matrix(AB))
+
+ # Calculate axis-angle representation of AB rotation matrix
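+ # (for any rotation matrix R, trace(R) = 1 + 2*cos(angle))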
+ angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2))
+
+ try:
+ self.__angle_cache[A_name][B_name] = angle
+ except:
+ self.__angle_cache[A_name] = {B_name: angle}
+
+ try:
+ self.__angle_cache[B_name][A_name] = angle
+ except:
+ self.__angle_cache[B_name] = {A_name: angle}
+
+ # Process distance between face combination to speed up further calculations
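+ # Adjacent face centers lie edge_size/2 from the cube center along two perpendicular axes, so their distance is edge_size/sqrt(2)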
+ self.__distance_cache = numpy.linalg.norm(numpy.array([0, 0, self.edge_size/2]) - numpy.array([0, self.edge_size/2, 0]))
+
+ def print_cache(self):
+ """Print pre-processed data."""
+
+ print('\nIdentifier cache:')
+ for i, name in self.__identifier_cache.items():
+ print(f'- {i}: {name}')
+
+ print('\nTranslation cache:')
+ for name, item in self.__translation_cache.items():
+ print(f'- {name}: {item}')
+
+ print('\nRotation cache:')
+ for name, item in self.__rotation_cache.items():
+ print(f'- {name}:\n{item}')
+
+ print('\nAngle cache:')
+ for A_name, A_angle_cache in self.__angle_cache.items():
+ for B_name, angle in A_angle_cache.items():
+ print(f'- {A_name}/{B_name}: {angle:3f}')
+
+ print(f'\nDistance cache: {self.__distance_cache}')
+
+ def __make_rotation_matrix(self, x, y, z):
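+ """Return intrinsic rotation matrix built from angles in degrees (R = Rx.Ry.Rz)."""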
+
+ # Create rotation matrix around x axis
+ c = numpy.cos(numpy.deg2rad(x))
+ s = numpy.sin(numpy.deg2rad(x))
+ Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]])
+
+ # Create rotation matrix around y axis
+ c = numpy.cos(numpy.deg2rad(y))
+ s = numpy.sin(numpy.deg2rad(y))
+ Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
+
+ # Create rotation matrix around z axis
+ c = numpy.cos(numpy.deg2rad(z))
+ s = numpy.sin(numpy.deg2rad(z))
+ Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
+
+ # Return intrinsic rotation matrix
+ return Rx.dot(Ry.dot(Rz))
+
+ def __is_rotation_matrix(self, R):
+ """Checks if a matrix is a valid rotation matrix."""
+
+ I = numpy.identity(3, dtype = R.dtype)
+ return numpy.linalg.norm(I - numpy.dot(R.T, R)) < 1e-6
+
+ def __normalise_face_pose(self, name, face, F):
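+ """Convert an estimated face pose into the cube pose, using the pre-processed face rotation and translation."""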
+
+ # Transform face rotation into cube rotation vector
+ R = self.__rotation_cache[name]
+ rvec, _ = cv.Rodrigues(F.dot(R))
+
+ #print(f'{name} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}')
+
+ # Transform face translation into cube translation vector
+ OF = face.translation
+ T = self.__translation_cache[name]
+ FC = F.dot(R.dot(T))
+
+ tvec = OF + FC
+
+ #print(f'{name} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}')
+
+ return rvec, tvec
+
+ def estimate_pose(self, tracked_markers):
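+ """Estimate cube pose from tracked markers, cross-validating face poses when more than one face is tracked."""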
+
+ # Init pose data
+ self.__translation = numpy.zeros(3)
+ self.__rotation = numpy.zeros(3)
+ self.__succeded = False
+ self.__validity = 0
+
+ # Don't try to estimate pose if there are no tracked markers
+ if len(tracked_markers) == 0:
+
+ return self.get_pose()
+
+ # Look for faces related to tracked markers
+ tracked_faces = {}
+ for (marker_id, marker) in tracked_markers.items():
+
+ try:
+ name = self.__identifier_cache[marker_id]
+ tracked_faces[name] = marker
+
+ except KeyError:
+ continue
+
+ #print('-------------- ArUcoCube pose estimation --------------')
+
+ # Pose validity checking isn't possible when only one face of the cube is tracked
+ if len(tracked_faces.keys()) == 1:
+
+ # Get arcube pose from the unique face pose
+ name, face = tracked_faces.popitem()
+ F, _ = cv.Rodrigues(face.rotation)
+
+ self.__rotation, self.__translation = self.__normalise_face_pose(name, face, F)
+ self.__succeded = True
+ self.__validity = 1
+
+ #print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
+ #print(f'arcube rotation vector: {self.__rotation[0][0]:3f} {self.__rotation[1][0]:3f} {self.__rotation[2][0]:3f}')
+ #print(f'arcube translation vector: {self.__translation[0]:3f} {self.__translation[1]:3f} {self.__translation[2]:3f}')
+
+ # Pose validity checking processes faces two by two
+ else:
+
+ valid_faces = []
+ valid_rvecs = []
+ valid_tvecs = []
+
+ for (A_name, A_face), (B_name, B_face) in itertools.combinations(tracked_faces.items(), 2):
+
+ #print(f'** {A_name} > {B_name}')
+
+ # Get face rotation estimation
+ # Use rotation matrix instead of rotation vector
+ A, _ = cv.Rodrigues(A_face.rotation)
+ B, _ = cv.Rodrigues(B_face.rotation)
+
+ # Rotation matrix from A face to B face
+ AB = B.dot(A.T)
+
+ assert(self.__is_rotation_matrix(AB))
+
+ # Calculate axis-angle representation of AB rotation matrix
+ angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2))
+
+ #print('rotation angle:')
+ #print(angle)
+
+ expected_angle = self.__angle_cache[A_name][B_name]
+
+ #print('expected angle:')
+ #print(expected_angle)
+
+ # Calculate distance between A face center and B face center
+ distance = numpy.linalg.norm(A_face.translation - B_face.translation)
+ expected_distance = self.__distance_cache
+
+ # Check angle and distance according to given tolerances, then normalise face pose
+ valid_angle = math.isclose(angle, expected_angle, abs_tol=self.angle_tolerance)
+ valid_distance = math.isclose(distance, expected_distance, abs_tol=self.distance_tolerance)
+
+ if valid_angle and valid_distance:
+
+ if A_name not in valid_faces:
+
+ # Remember this face is already validated
+ valid_faces.append(A_name)
+
+ rvec, tvec = self.__normalise_face_pose(A_name, A_face, A)
+
+ # Store normalised face pose
+ valid_rvecs.append(rvec)
+ valid_tvecs.append(tvec)
+
+ if B_name not in valid_faces:
+
+ # Remember this face is already validated
+ valid_faces.append(B_name)
+
+ rvec, tvec = self.__normalise_face_pose(B_name, B_face, B)
+
+ # Store normalised face pose
+ valid_rvecs.append(rvec)
+ valid_tvecs.append(tvec)
+
+ if len(valid_faces) > 1:
+
+ # Consider arcube rotation as the mean of all valid rotations
+ # !!! WARNING !!! This is a rough hack: averaging rotations is a complex problem that requires a well-defined distance metric to be done properly.
+ self.__rotation = numpy.mean(numpy.array(valid_rvecs), axis=0)
+
+ # Consider arcube translation as the mean of all valid translations
+ self.__translation = numpy.mean(numpy.array(valid_tvecs), axis=0)
+
+ #print(':::::::::::::::::::::::::::::::::::::::::::::::::::')
+ #print(f'arcube rotation vector: {self.__rotation[0][0]:3f} {self.__rotation[1][0]:3f} {self.__rotation[2][0]:3f}')
+ #print(f'arcube translation vector: {self.__translation[0]:3f} {self.__translation[1]:3f} {self.__translation[2]:3f}')
+
+ self.__succeded = True
+ self.__validity = len(valid_faces)
+
+ #print('----------------------------------------------------')
+
+ return self.get_pose()
+
+ def get_pose(self):
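+ """Return estimated translation and rotation vectors, success flag and validity score."""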
+
+ return self.__translation, self.__rotation, self.__succeded, self.__validity
+
+ def set_pose(self, tvec = numpy.array([]), rvec = numpy.array([])):
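+ """Set cube pose from external translation and rotation vectors (e.g. estimated from inertial data)."""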
+
+ if tvec.size == 3:
+ self.__translation = tvec
+
+ if rvec.size == 3:
+ self.__rotation = rvec
+
+ self.__succeded = True
+ self.__validity = 0
+
+ def draw(self, frame, K, D):
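+ """Draw cube axes and faces into frame using camera intrinsic matrix K and distortion coefficients D."""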
+
+ l = self.edge_size / 2
+ ll = self.edge_size
+
+ # Select color according validity score
+ n = 95 * self.__validity if self.__validity < 2 else 0
+ f = 159 * self.__validity if self.__validity < 2 else 255
+
+ try:
+
+ # Draw axis
+ axisPoints = numpy.float32([[ll, 0, 0], [0, ll, 0], [0, 0, ll], [0, 0, 0]]).reshape(-1, 3)
+ axisPoints, _ = cv.projectPoints(axisPoints, self.__rotation, self.__translation, K, D)
+ axisPoints = axisPoints.astype(int)
+
+ frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (n,n,f), 5) # X (red)
+ frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (n,f,n), 5) # Y (green)
+ frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (f,n,n), 5) # Z (blue)
+
+ # Draw left face
+ leftPoints = numpy.float32([[-l, l, l], [-l, -l, l], [-l, -l, -l], [-l, l, -l]]).reshape(-1, 3)
+ leftPoints, _ = cv.projectPoints(leftPoints, self.__rotation, self.__translation, K, D)
+ leftPoints = leftPoints.astype(int)
+
+ frame = cv.line(frame, tuple(leftPoints[0].ravel()), tuple(leftPoints[1].ravel()), (n,n,f), 2)
+ frame = cv.line(frame, tuple(leftPoints[1].ravel()), tuple(leftPoints[2].ravel()), (n,n,f), 2)
+ frame = cv.line(frame, tuple(leftPoints[2].ravel()), tuple(leftPoints[3].ravel()), (n,n,f), 2)
+ frame = cv.line(frame, tuple(leftPoints[3].ravel()), tuple(leftPoints[0].ravel()), (n,n,f), 2)
+
+ # Draw top face
+ topPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, l, -l], [l, l, -l]]).reshape(-1, 3)
+ topPoints, _ = cv.projectPoints(topPoints, self.__rotation, self.__translation, K, D)
+ topPoints = topPoints.astype(int)
+
+ frame = cv.line(frame, tuple(topPoints[0].ravel()), tuple(topPoints[1].ravel()), (n,f,n), 2)
+ frame = cv.line(frame, tuple(topPoints[1].ravel()), tuple(topPoints[2].ravel()), (n,f,n), 2)
+ frame = cv.line(frame, tuple(topPoints[2].ravel()), tuple(topPoints[3].ravel()), (n,f,n), 2)
+ frame = cv.line(frame, tuple(topPoints[3].ravel()), tuple(topPoints[0].ravel()), (n,f,n), 2)
+
+ # Draw front face
+ frontPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, -l, l], [l, -l, l]]).reshape(-1, 3)
+ frontPoints, _ = cv.projectPoints(frontPoints, self.__rotation, self.__translation, K, D)
+ frontPoints = frontPoints.astype(int)
+
+ frame = cv.line(frame, tuple(frontPoints[0].ravel()), tuple(frontPoints[1].ravel()), (f,n,n), 2)
+ frame = cv.line(frame, tuple(frontPoints[1].ravel()), tuple(frontPoints[2].ravel()), (f,n,n), 2)
+ frame = cv.line(frame, tuple(frontPoints[2].ravel()), tuple(frontPoints[3].ravel()), (f,n,n), 2)
+ frame = cv.line(frame, tuple(frontPoints[3].ravel()), tuple(frontPoints[0].ravel()), (f,n,n), 2)
+
+ except Exception as e:
+
+ print(e)
+ print(self.__translation)
+ print(self.__rotation)
+ print(self.__succeded)
+ print(self.__validity)
+ print(axisPoints)
+
+ return frame
diff --git a/src/argaze/ArUcoMarkers/ArUcoMarker.py b/src/argaze/ArUcoMarkers/ArUcoMarker.py
new file mode 100644
index 0000000..4f716bb
--- /dev/null
+++ b/src/argaze/ArUcoMarkers/ArUcoMarker.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+
+from dataclasses import dataclass, field
+
+from argaze.ArUcoMarkers import ArUcoMarkersDictionary
+
+import numpy
+import cv2 as cv
+import cv2.aruco as aruco
+
+@dataclass
+class ArUcoMarker():
+ """Define ArUco marker class."""
+
+ dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary
+ """ """
+
+ identifier: int
+ """ """
+
+ size: float
+ """Size of marker in centimeters."""
+
+ corners: numpy.array = field(init=False, repr=False)
+ """Estimated 2D corner positions in camera image referential."""
+
+ translation: numpy.array = field(init=False, repr=False)
+ """Estimated 3D center position in camera referential."""
+
+ rotation: numpy.array = field(init=False, repr=False)
+ """Estimated 3D marker rotation in camera referential."""
+
+ points: numpy.array = field(init=False, repr=False)
+ """Estimated 3D corners positions in camera referential."""
+
+ def center(self, i):
+ """Get 2D center position in camera image referential."""
+ return self.corners[0].mean(axis=0)
+
+ def draw(self, frame, K, D):
+ """Draw marker in frame."""
+
+ # Draw marker axis if pose has been estimated
+ if self.translation.size == 3 and self.rotation.size == 3:
+
+ cv.drawFrameAxes(frame, K, D, self.rotation, self.translation, self.size)
+
+ aruco.drawDetectedMarkers(frame, [self.corners], numpy.array([self.identifier]))
\ No newline at end of file
diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py
index 58b55b8..84a78b9 100644
--- a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py
+++ b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py
@@ -32,12 +32,14 @@ all_aruco_markers_dictionaries = {
class ArUcoMarkersDictionary():
"""Handle an ArUco markers dictionary."""
- def __init__(self, aruco_dictionary_name):
+ def __init__(self, name):
- if all_aruco_markers_dictionaries.get(aruco_dictionary_name, None) is None:
- raise NameError(f'Bad ArUco markers dictionary name: {aruco_dictionary_name}')
+ if all_aruco_markers_dictionaries.get(name, None) is None:
+ raise NameError(f'Bad ArUco markers dictionary name: {name}')
- dict_name_split = aruco_dictionary_name.split('_')
+ self.name = name
+
+ dict_name_split = name.split('_')
self.__format = dict_name_split[1]
@@ -77,7 +79,7 @@ class ArUcoMarkersDictionary():
self.__number = int(dict_name_split[2])
- self.__aruco_dict = aruco.Dictionary_get(all_aruco_markers_dictionaries[aruco_dictionary_name])
+ self.__aruco_dict = aruco.Dictionary_get(all_aruco_markers_dictionaries[self.name])
def create_marker(self, i, dpi=300):
"""Create a marker image."""
diff --git a/src/argaze/ArUcoMarkers/ArUcoTracker.py b/src/argaze/ArUcoMarkers/ArUcoTracker.py
index b887812..f5b1715 100644
--- a/src/argaze/ArUcoMarkers/ArUcoTracker.py
+++ b/src/argaze/ArUcoMarkers/ArUcoTracker.py
@@ -3,7 +3,7 @@
import json
from collections import Counter
-from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoCamera
+from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoCamera
import numpy
import cv2 as cv
@@ -45,14 +45,14 @@ ArUcoTrackerParameters = [
class ArUcoTracker():
"""Track ArUco markers into a frame."""
- def __init__(self, aruco_dictionary_name: str, marker_length: float, camera: ArUcoCamera.ArUcoCamera):
+ def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary, marker_size: float, camera: ArUcoCamera.ArUcoCamera):
"""Define which markers library to track and their size"""
# load ArUco markers dictionary
- self.__aruco_dict = ArUcoMarkersDictionary.ArUcoMarkersDictionary(aruco_dictionary_name)
+ self.__dictionary = dictionary
# define marker length in centimeter
- self.__marker_length = marker_length
+ self.__marker_size = marker_size
# define camera
self.__camera = camera
@@ -62,19 +62,8 @@ class ArUcoTracker():
self.__detector_parameters.cornerRefinementMethod = aruco.CORNER_REFINE_CONTOUR # to get a better pose estimation
self.__detector_parameters_loaded = {}
- # define tracked markers data
- self.__markers_corners = []
- self.__markers_ids = []
- self.__rvecs = []
- self.__tvecs = []
- self.__points = []
-
- # define rejected markers data
- self.__rejected_markers_corners = []
- self.__rejected_markers_ids = []
- self.__rejected_rvecs = []
- self.__rejected_tvecs = []
- self.__rejected_points = []
+ # init tracked markers data
+ self.__tracked_markers = {}
# define tracked board data
self.__board = None
@@ -84,7 +73,7 @@ class ArUcoTracker():
# define track metrics data
self.__track_count = 0
- self.__tracked_markers = []
+ self.__tracked_ids = []
self.__rejected_markers = []
def load_configuration_file(self, configuration_filepath):
@@ -115,98 +104,56 @@ class ArUcoTracker():
def track(self, frame, estimate_pose = True, check_rotation = False):
"""Track ArUco markers in frame."""
+ self.__tracked_markers = {}
+ markers_corners, markers_ids, markers_rvecs, markers_tvecs, markers_points = [], [], [], [], []
+
# DON'T MIRROR FRAME : it makes the markers detection to fail
- # detect markers from gray picture
- gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
- self.__markers_corners, self.__markers_ids, rejectedPoints = aruco.detectMarkers(gray, self.__aruco_dict.get_markers(), parameters = self.__detector_parameters)
- self.__rejected_markers_corners, __rejected_markers_ids = [], []
-
- if len(self.__markers_corners) > 0 and estimate_pose:
-
- # markers pose estimation
- self.__rvecs, self.__tvecs, self.__points = aruco.estimatePoseSingleMarkers(self.__markers_corners, self.__marker_length, self.__camera.get_K(), self.__camera.get_D())
-
- # optional: check marker rotation as described in [this issue](https://github.com/opencv/opencv/issues/8813)
- if check_rotation:
-
- valid_rotation_markers = []
- bad_rotation_markers = []
- for i, rvec in enumerate(self.__rvecs):
-
- tvec = self.__tvecs[i][0]
- R, _ = cv.Rodrigues(rvec)
-
- zAxisPoint = (tvec.dot(R) + numpy.array([0., 0., 1.])).dot(R.T)
- zAxis = zAxisPoint - tvec
-
- # TODO: How to describe the expected Z axis orientation ?
- # In some situations, you can't provide such information.
-
- # !!! Here a description specific to SimOne cockpit !!!
- zAxisExpectedDict = {
- 1: numpy.array([0.5, 0.5, -1]),
- 2: numpy.array([0.5, 0.5, -1]),
- 3: numpy.array([1, -1, -1]),
- 4: numpy.array([1, -1, -1]),
- 5: numpy.array([1, -1, -1]),
- 6: numpy.array([1, 1, -1]),
- 7: numpy.array([1, 1, -1]),
- 8: numpy.array([1, -1, -1])
- }
-
- zAxisExpected = zAxisExpectedDict[self.__markers_ids[i][0]]
-
- cosine_angle = numpy.dot(zAxis/numpy.linalg.norm(zAxis), zAxisExpected)
- degree_angle = numpy.rad2deg(numpy.arccos(cosine_angle))
- '''
- print(self.__markers_ids[i][0])
- print('marker position: ', tvec)
- print('zAxisPoint: ', zAxisPoint)
- print('zAxis: ', zAxis)
- print('zAxisExpected: ', zAxisExpected)
- print('cosine_angle: ', cosine_angle)
- print('degree_angle: ', degree_angle)
- '''
- # Is the marker oriented as expected ?
- if cosine_angle < 0 or cosine_angle > 1:
-
- #print('valid')
- valid_rotation_markers.append(i)
-
- else:
-
- #print('bad')
- bad_rotation_markers.append(i)
-
- # update track metrics
- self.__rejected_markers.append(self.__markers_ids[i][0])
-
- # keep markers with bad rotation
- self.__rejected_markers_corners = tuple([self.__markers_corners[i] for i in bad_rotation_markers])
- self.__rejected_markers_ids = self.__markers_ids[bad_rotation_markers]
- self.__rejected_rvecs = self.__rvecs[bad_rotation_markers]
- self.__rejected_tvecs = self.__tvecs[bad_rotation_markers]
- self.__rejected_points = self.__points[bad_rotation_markers]
-
- # keep markers with valid rotation
- self.__markers_corners = tuple([self.__markers_corners[i] for i in valid_rotation_markers])
- self.__markers_ids = self.__markers_ids[valid_rotation_markers]
- self.__rvecs = self.__rvecs[valid_rotation_markers]
- self.__tvecs = self.__tvecs[valid_rotation_markers]
- self.__points = self.__points[valid_rotation_markers]
+ # Track markers into gray picture
+ markers_corners, markers_ids, _ = aruco.detectMarkers(cv.cvtColor(frame, cv.COLOR_BGR2GRAY), self.__dictionary.get_markers(), parameters = self.__detector_parameters)
- else:
+ if len(markers_corners) > 0:
+
+ # Pose estimation is optional
+ if estimate_pose:
+
+ markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(markers_corners, self.__marker_size, self.__camera.get_K(), self.__camera.get_D())
+
+ # Gather tracked markers data and update metrics
+ self.__track_count += 1
+
+ for i, marker_id in enumerate(markers_ids.T[0]):
+
+ marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id, self.__marker_size)
- self.__rvecs = []
- self.__tvecs = []
- self.__points = []
+ marker.corners = markers_corners[i]
- # update track metrics
- self.__track_count += 1
- for marker_id in self.get_markers_ids():
- self.__tracked_markers.append(marker_id)
+ if estimate_pose:
+ marker.translation = markers_tvecs[i][0]
+ marker.rotation = markers_rvecs[i]
+ marker.points = markers_points[i]
+ self.__tracked_markers[marker_id] = marker
+
+ self.__tracked_ids.append(marker_id)
+
+ def get_tracked_markers(self):
+ """Access to tracked markers dictionary."""
+
+ return self.__tracked_markers
+
+ def get_tracked_markers_number(self):
+ """Return tracked markers number."""
+
+ return len(list(self.__tracked_markers.keys()))
+
+ def draw_tracked_markers(self, frame):
+ """Draw traked markers."""
+
+ for marker_id, marker in self.__tracked_markers.items():
+
+ marker.draw(frame, self.__camera.get_K(), self.__camera.get_D())
+
def track_board(self, frame, board, expected_markers_number):
"""Track ArUco markers board in frame setting up the number of detected markers needed to agree detection."""
@@ -214,7 +161,7 @@ class ArUcoTracker():
# detect markers from gray picture
gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
- self.__markers_corners, self.__markers_ids, rejectedPoints = aruco.detectMarkers(gray, self.__aruco_dict.get_markers(), parameters = self.__detector_parameters)
+ self.__markers_corners, self.__markers_ids, rejectedPoints = aruco.detectMarkers(gray, self.__dictionary.get_markers(), parameters = self.__detector_parameters)
# if all board markers are detected
if self.get_markers_number() == expected_markers_number:
@@ -229,33 +176,6 @@ class ArUcoTracker():
self.__board_corners = []
self.__board_corners_ids = []
- def draw(self, frame):
- """Draw tracked markers in frame."""
-
- # draw detected markers square
- if len(self.__markers_corners) > 0:
-
- # draw marker axis if pose has been estimated
- if len(self.__rvecs) > 0:
-
- for (i, marker_id) in enumerate(self.__markers_ids):
-
- cv.drawFrameAxes(frame, self.__camera.get_K(), self.__camera.get_D(), self.__rvecs[i], self.__tvecs[i], self.__marker_length)
-
- aruco.drawDetectedMarkers(frame, self.__markers_corners, self.__markers_ids)
-
- # draw rejected markers square
- if len(self.__rejected_markers_corners) > 0:
-
- # draw marker axis if pose has been estimated
- if len(self.__rejected_rvecs) > 0:
-
- for (i, marker_id) in enumerate(self.__rejected_markers_ids):
-
- cv.drawFrameAxes(frame, self.__camera.get_K(), self.__camera.get_D(), self.__rejected_rvecs[i], self.__rejected_tvecs[i], self.__marker_length)
-
- aruco.drawDetectedMarkers(frame, self.__rejected_markers_corners, self.__rejected_markers_ids, borderColor=(0, 255, 255))
-
def draw_board(self, frame):
"""Draw tracked board corners in frame."""
@@ -266,52 +186,11 @@ class ArUcoTracker():
def reset_track_metrics(self):
"""Enable marker tracking metrics."""
self.__track_count = 0
- self.__tracked_markers = []
+ self.__tracked_ids = []
def get_track_metrics(self):
"""Get marker tracking metrics."""
- return self.__track_count, Counter(self.__tracked_markers), Counter(self.__rejected_markers)
-
- def get_markers_dictionay(self):
- """Get tracked aruco markers dictionary."""
- return self.__aruco_dict
-
- def get_markers_number(self):
- """Get tracked markers number."""
- return len(self.__markers_corners)
-
- def get_markers_ids(self):
- """Get tracked markers identifers."""
- if self.__markers_ids is not None:
- return [i[0] for i in self.__markers_ids]
- else:
- return []
-
- def get_marker_index(self, marker_id):
- """Retreive marker index of a given merker id.
- Raise ValueError if not found."""
-
- return list(self.__markers_ids).index(marker_id)
-
- def get_marker_corners(self, i):
- """Get marker i corners."""
- return self.__markers_corners[i][0]
-
- def get_marker_center(self, i):
- """Get marker i center coordinates."""
- return self.__markers_corners[i][0].mean(axis=0)
-
- def get_marker_rotation(self, i):
- """Get marker i rotation vector."""
- return self.__rvecs[i]
-
- def get_marker_translation(self, i):
- """Get marker i translation vector."""
- return self.__tvecs[i]
-
- def get_marker_points(self, i):
- """Get marker i points."""
- return self.__points[i]
+ return self.__track_count, Counter(self.__tracked_ids), Counter(self.__rejected_markers)
def get_board_corners_number(self):
"""Get tracked board corners number."""
diff --git a/src/argaze/ArUcoMarkers/__init__.py b/src/argaze/ArUcoMarkers/__init__.py
index 28386d2..3a74eeb 100644
--- a/src/argaze/ArUcoMarkers/__init__.py
+++ b/src/argaze/ArUcoMarkers/__init__.py
@@ -2,4 +2,4 @@
.. include:: README.md
"""
__docformat__ = "restructuredtext"
-__all__ = ['ArUcoMarkersDictionary', 'ArUcoBoard', 'ArUcoCamera', 'ArUcoTracker']
\ No newline at end of file
+__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoCamera', 'ArUcoTracker', 'ArUcoCube']
\ No newline at end of file
diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py
index 95fa947..71ce234 100644
--- a/src/argaze/TobiiGlassesPro2/TobiiData.py
+++ b/src/argaze/TobiiGlassesPro2/TobiiData.py
@@ -13,6 +13,8 @@ from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
from argaze.utils import MiscFeatures
+import numpy
+
@dataclass
class DirSig():
"""Define dir sig data (dir sig)."""
@@ -50,13 +52,13 @@ class Event():
class Accelerometer():
"""Define accelerometer data (ac)."""
- value: tuple((float, float, float))
+ value: numpy.array
@dataclass
class Gyroscope():
"""Define gyroscope data (gy)."""
- value: tuple((float, float, float))
+ value: numpy.array
@dataclass
class PupilCenter():
@@ -286,66 +288,96 @@ class TobiiDataSegment():
def get_path(self):
return self.__segment_data_path
-class TobiiDataStream(threading.Thread):
+class TobiiDataStream():
"""Capture Tobii Glasses Pro 2 data stream in separate thread."""
reading_callback = None
def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
- """Initialise thread super class as a deamon dedicated to data reception."""
+ """Initialise network connection and prepare data reception."""
- threading.Thread.__init__(self)
- threading.Thread.daemon = True
-
+ # Enable network connection
self.__network = network_interface
self.__data_socket = self.__network.make_socket()
+ # Data reception
+ self.__data_thread = None
self.__data_queue = queue.Queue(50) # TODO : set queue size according technical reason
-
- self.__stop_event = threading.Event()
- self.__read_lock = threading.Lock()
+ self.__json_data_parser = TobiiJsonDataParser()
+ self.__first_ts = 0
- # prepare keep alive message
+ # Data capture
+ self.__data_stream_selector = ''
+ self.__data_ts_buffer = None
+ self.__data_ts_buffer_size = 0
+
+ # Keep connection alive
self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
self.__keep_alive_thread.daemon = True
-
- self.__json_data_parser = TobiiJsonDataParser()
+ self.__keep_alive_thread.start()
def __del__(self):
- """Stop data reception before destruction."""
+ """Stop data reception and network connection before destruction."""
- if self.is_alive():
-
- self.close()
+ if self.__data_thread != None:
+
+ threading.Thread.join(self.__data_thread)
+ self.__data_thread = None
+
+ threading.Thread.join(self.__keep_alive_thread)
+
+ self.__data_socket.close()
def __keep_alive(self):
- """Maintain connection."""
+ """Maintain network connection and clear socket when is not read."""
- while not self.__stop_event.isSet():
+ while True:
+
+ # if no thread is reading data socket
+ if self.__data_thread == None:
- self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg)
+ self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg)
- time.sleep(1)
+ clear_count = 0
+ while clear_count < 1000 and self.__data_thread == None:
+
+ # Clear socket each milli second
+ time.sleep(0.001)
+ self.__network.grab_data(self.__data_socket)
+ clear_count += 1
+
+ else:
+
+ self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg)
+ time.sleep(1)
def open(self):
"""Start data reception."""
- self.__first_ts = 0
- self.__keep_alive_thread.start()
- threading.Thread.start(self)
+ if self.__data_thread == None:
+
+ self.__first_ts = 0
+
+ self.__data_thread = threading.Thread(target = self.__run)
+ self.__data_thread.daemon = True
+
+ self.__stop_event = threading.Event()
+ self.__read_lock = threading.Lock()
+
+ self.__data_thread.start()
def close(self):
"""Stop data reception definitively."""
- self.__stop_event.set()
+ if self.__data_thread != None:
- threading.Thread.join(self.__keep_alive_thread)
- threading.Thread.join(self)
+ self.__stop_event.set()
- self.__data_socket.close()
+ threading.Thread.join(self.__data_thread)
+ self.__data_thread = None
- def run(self):
+ def __run(self):
"""Managed received data for sync and async reading case.
- Sync: send data to callback function.
- Async: store data into a locked queue for further reading."""
@@ -454,14 +486,59 @@ class TobiiDataStream(threading.Thread):
# convert timestamp
data_ts = json_data.pop('ts')
+ # convert json data into data object
+ data_object = self.__json_data_parser.parse_data(status, json_data)
+ data_object_type = type(data_object).__name__
+
# keep first timestamp to offset all timestamps
if self.__first_ts == 0:
self.__first_ts = data_ts
-
+
data_ts -= self.__first_ts
- # convert json data into data object
- data_object = self.__json_data_parser.parse_data(status, json_data)
- data_object_type = type(data_object).__name__
-
return data_ts, data_object, data_object_type
+
+ def __capture_callback(self, data_ts, data_object, data_object_type):
+ """Store incoming data into timestamped buffer"""
+
+ if data_object_type == self.__data_stream_selector:
+
+ if len(self.__data_ts_buffer.keys()) < self.__data_ts_buffer_size:
+
+ # update first timestamp if next timestamps are negative
+ if data_ts < 0:
+ self.__first_ts += data_ts
+ data_ts = 0
+
+ self.__data_ts_buffer[data_ts] = data_object
+
+ def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500):
+ """Capture a data stream into buffer."""
+
+ # Prepare for data acquisition
+ self.__data_stream_selector = data_object_type
+ self.__data_ts_buffer = data_ts_buffer
+ self.__data_ts_buffer_size = sample_number
+
+ # Subscribe to data stream
+ memo_reading_callback = self.reading_callback
+ self.reading_callback = self.__capture_callback
+
+ # Start data reception
+ self.open()
+
+ # Share data acquisition progress
+ buffer_size = 0
+ while buffer_size < sample_number:
+
+ time.sleep(0.1)
+
+ buffer_size = len(self.__data_ts_buffer.keys())
+
+ yield buffer_size
+
+ # Stop data reception
+ self.close()
+
+ # Unsubscribe from tobii data stream
+ self.reading_callback = memo_reading_callback
diff --git a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py
new file mode 100644
index 0000000..a3c2bbd
--- /dev/null
+++ b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py
@@ -0,0 +1,236 @@
+#!/usr/bin/env python
+
+import json
+import time
+import math
+
+from argaze import DataStructures
+from argaze.TobiiGlassesPro2 import TobiiData
+
+import numpy
+from scipy.optimize import curve_fit
+import cv2 as cv
+
+EARTH_GRAVITY = -9.81 # m/s2
+EARTH_GRAVITY_VECTOR = [0, EARTH_GRAVITY, 0]
+
+class TobiiInertialMeasureUnit():
+ """Ease Tobbi IMU data handling."""
+
+ def __init__(self):
+ """Define IMU calibration data."""
+
+ self.__gyroscope_offset = numpy.zeros(3)
+ self.__accelerometer_coefficients = numpy.array([[1., 0.], [1., 0.], [1., 0.]])
+
+ self.__plumb = numpy.array(EARTH_GRAVITY_VECTOR)
+
+ self.reset_rotation()
+ self.reset_translation()
+
+ def load_calibration_file(self, calibration_filepath):
+ """Load IMU calibration from a .json file."""
+
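+ # Expected .json structure (as written by save_calibration_file):
+ # {
+ #     "gyroscope_offset": [gx, gy, gz],
+ #     "accelerometer_coefficients": [[ax, bx], [ay, by], [az, bz]]
+ # }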
+ with open(calibration_filepath) as calibration_file:
+
+ # Deserialize .json
+ # TODO find a better way
+ calibration_data = json.load(calibration_file)
+
+ # Load calibration data
+ self.__gyroscope_offset = numpy.array(calibration_data['gyroscope_offset'])
+ self.__accelerometer_coefficients = numpy.array(calibration_data['accelerometer_coefficients'])
+
+ def save_calibration_file(self, calibration_filepath):
+ """Save IMU calibration into .json file."""
+
+ calibration_data = {
+ 'gyroscope_offset': list(self.__gyroscope_offset),
+ 'accelerometer_coefficients': [list(self.__accelerometer_coefficients[0]), list(self.__accelerometer_coefficients[1]), list(self.__accelerometer_coefficients[2])]
+ }
+
+ with open(calibration_filepath, 'w', encoding='utf-8') as calibration_file:
+
+ json.dump(calibration_data, calibration_file, ensure_ascii=False, indent=4)
+
+ def calibrate_gyroscope_offset(self, gyroscope_ts_buffer):
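+ """Compute gyroscope offset as the per-axis mean of a buffer of samples captured while the device stays still."""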
+
+ # Consider gyroscope values without timestamps
+ gyroscope_values = []
+ for ts, data_object in gyroscope_ts_buffer.items():
+ gyroscope_values.append(data_object.value)
+
+ # Calculate average value for each axis
+ gx_offset = numpy.mean(numpy.array(gyroscope_values)[:, 0])
+ gy_offset = numpy.mean(numpy.array(gyroscope_values)[:, 1])
+ gz_offset = numpy.mean(numpy.array(gyroscope_values)[:, 2])
+
+ # Store result
+ self.__gyroscope_offset = numpy.array([gx_offset, gy_offset, gz_offset])
+
+ return self.__gyroscope_offset
+
+ def get_gyroscope_offset(self):
+ """Get gyroscope offset."""
+
+ return self.__gyroscope_offset
+
+ def apply_gyroscope_offset(self, gyroscope_data_object):
+ """Remove gyroscope offset to given gyroscope data."""
+
+ return TobiiData.Gyroscope(gyroscope_data_object.value - self.__gyroscope_offset)
+
+ def reset_rotation(self, ts = 0, gyroscope = numpy.zeros(3), rotation = numpy.zeros(3)):
+ """Reset rotation value before to start integration process."""
+
+ self.__last_gyroscope_ts = ts
+ self.__last_gyroscope = gyroscope
+
+ self.__rotation = rotation
+
+ def update_rotation(self, gyroscope_data_ts, gyroscope_data_object):
+ """Integrate timestamped gyroscope values to update rotation."""
+
+ # Convert deg/s into deg/ms
+ current_gyroscope = gyroscope_data_object.value * 1e-3
+
+ # Init gyroscope integration
+ if self.__last_gyroscope_ts == 0 and self.__last_gyroscope.all() == 0:
+
+ self.reset_rotation(gyroscope_data_ts, current_gyroscope)
+
+ # Calculate elapsed time in ms
+ delta_time = (gyroscope_data_ts - self.__last_gyroscope_ts) / 1e3
+
+ # Integrate gyroscope
+ self.__rotation += self.__last_gyroscope * delta_time
+
+ # Store current as last
+ self.__last_gyroscope_ts = gyroscope_data_ts
+ self.__last_gyroscope = current_gyroscope
+
+ def get_rotation(self):
+ """Return current rotation value."""
+
+ return self.__rotation
+
+ def _accelerometer_linear_fit(self, x, a, b):
+ """Linear function for accelerometer axis correction."""
+ return a * x + b
+
+ def calibrate_accelerometer_axis_coefficients(self, axis, upward_ts_buffer, downward_ts_buffer, perpendicular_ts_buffer):
+ """Calibrate one accelerometer axis using three data set (upward/+1g, downward/-1g, perpendicular/0g) for linear fit."""
+
+ # Consider accelerometer axis values without timestamps
+ accelerometer_values = []
+ expected_values = []
+
+ for (upward_ts, upward_data_object), (downward_ts, downward_data_object), (perpendicular_ts, perpendicular_data_object) in zip(upward_ts_buffer.items(), downward_ts_buffer.items(), perpendicular_ts_buffer.items()):
+
+ accelerometer_values.append(upward_data_object.value[axis])
+ expected_values.append(+EARTH_GRAVITY)
+
+ accelerometer_values.append(downward_data_object.value[axis])
+ expected_values.append(-EARTH_GRAVITY)
+
+ accelerometer_values.append(perpendicular_data_object.value[axis])
+ expected_values.append(0.0)
+
+ # Find optimal coefficients according to a linear fit between accelerometer values and expected values
+ optimal_coefficients, _ = curve_fit(self._accelerometer_linear_fit, accelerometer_values, expected_values, maxfev = 10000)
+
+ # Store results for the given axis
+ self.__accelerometer_coefficients[axis] = numpy.array(optimal_coefficients)
+
+ def get_accelerometer_coefficients(self):
+ """Return accelerometer coefficients."""
+
+ return self.__accelerometer_coefficients
+
+ def apply_accelerometer_coefficients(self, accelerometer_data_object):
+ """Add accelerometer offset to given accelerometer data."""
+
+ x = self._accelerometer_linear_fit(accelerometer_data_object.value[0], *self.__accelerometer_coefficients[0])
+ y = self._accelerometer_linear_fit(accelerometer_data_object.value[1], *self.__accelerometer_coefficients[1])
+ z = self._accelerometer_linear_fit(accelerometer_data_object.value[2], *self.__accelerometer_coefficients[2])
+
+ return TobiiData.Accelerometer(numpy.array([x, y , z]))
+
+ def reset_translation(self, ts = 0, accelerometer = numpy.zeros(3), translation_speed = numpy.zeros(3), translation = numpy.zeros(3)):
+ """Reset translation value before to start integration process."""
+
+ print(f'> reset_translation: translation_speed={translation_speed}, translation={translation}')
+
+ self.__last_accelerometer_ts = ts
+ self.__last_accelerometer = accelerometer
+ self.__last_translation_speed = translation_speed
+
+ self.__translation_speed = translation_speed
+ self.__translation = translation
+
+ print('\tself.__translation_speed(cm/s)=', self.__translation_speed)
+ print('\tself.__translation(cm)=', self.__translation)
+
+ print('< reset_translation')
+
+ def update_translation(self, accelerometer_data_ts, accelerometer_data_object):
+ """Integrate timestamped accelerometer values to update translation."""
+ print('> update_translation: accelerometer_data_ts=', accelerometer_data_ts)
+
+ # Convert m/s2 into cm/ms2
+ current_accelerometer = accelerometer_data_object.value * 1e-4
+
+ print('\tcurrent_accelerometer(cm/ms2)=', current_accelerometer)
+ print('\tcurrent_accelerometer norm=', numpy.linalg.norm(current_accelerometer))
+
+ # Init accelerometer integration
+ if self.__last_accelerometer_ts == 0 and self.__last_accelerometer.all() == 0:
+
+ self.reset_translation(ts = accelerometer_data_ts, accelerometer = current_accelerometer)
+
+ # Calculate elapsed time in ms
+ delta_time = (accelerometer_data_ts - self.__last_accelerometer_ts) / 1e3
+
+ print('\tdelta_time=', delta_time)
+
+ # Integrate accelerometer
+ self.__translation_speed += self.__last_accelerometer * delta_time
+ self.__translation += self.__last_translation_speed * delta_time
+
+ print('\tself.__translation_speed(cm/s)=', self.__translation_speed)
+ print('\tself.__translation(cm)=', self.__translation)
+
+ # Store current as last
+ self.__last_accelerometer = current_accelerometer
+ self.__last_accelerometer_ts = accelerometer_data_ts
+ self.__last_translation_speed = self.__translation_speed
+
+ print('< update_translation')
+
+ #else:
+
+ # print('no valid head plumb')
+
+ def get_translation(self):
+ """Return current translation speed and translation values."""
+
+ return self.__translation_speed, self.__translation
+
+ def rotate_plumb(self, rvec):
+ """Rotate imu plumb to remove gravity effect in accelerometer data."""
+
+ C, _ = cv.Rodrigues(rvec)
+ self.__plumb = -C.dot(EARTH_GRAVITY_VECTOR)
+
+ # Check plumb length
+ assert(math.isclose(numpy.linalg.norm(self.__plumb), math.fabs(EARTH_GRAVITY), abs_tol=1e-3))
+
+ def get_plumb(self):
+ """Return plumb vector."""
+
+ return self.__plumb
+
+ def apply_plumb(self, accelerometer_data_object):
+ """Remove gravity along plumb vector to given accelerometer data."""
+
+ return TobiiData.Accelerometer(accelerometer_data_object.value - self.__plumb)
diff --git a/src/argaze/TobiiGlassesPro2/__init__.py b/src/argaze/TobiiGlassesPro2/__init__.py
index 4fc65bb..754e5a1 100644
--- a/src/argaze/TobiiGlassesPro2/__init__.py
+++ b/src/argaze/TobiiGlassesPro2/__init__.py
@@ -2,4 +2,4 @@
.. include:: README.md
"""
__docformat__ = "restructuredtext"
-__all__ = ['TobiiEntities', 'TobiiNetworkInterface', 'TobiiController', 'TobiiData', 'TobiiVideo', 'TobiiSpecifications']
\ No newline at end of file
+__all__ = ['TobiiEntities', 'TobiiNetworkInterface', 'TobiiController', 'TobiiData', 'TobiiVideo', 'TobiiSpecifications', 'TobiiInertialMeasureUnit']
\ No newline at end of file
diff --git a/src/argaze/utils/tobii_imu_calibrate.py b/src/argaze/utils/tobii_imu_calibrate.py
new file mode 100644
index 0000000..85472d9
--- /dev/null
+++ b/src/argaze/utils/tobii_imu_calibrate.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+
+import argparse
+import os
+import time
+
+from argaze import DataStructures
+from argaze.TobiiGlassesPro2 import TobiiController, TobiiInertialMeasureUnit
+from argaze.utils import MiscFeatures
+
+import numpy
+import matplotlib.pyplot as mpyplot
+import matplotlib.patches as mpatches
+
+def main():
+ """
+ Calibrate Tobii gyroscope and accelerometer sensors and output the IMU calibration data into a .json file.
+
+ ### Reference:
+ - [Inertial Measure Unit calibration tutorial](https://makersportal.com/blog/calibration-of-an-inertial-measurement-unit-imu-with-raspberry-pi-part-ii)
+ """
+
+ # manage arguments
+ parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
+ parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
+ parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath')
+ parser.add_argument('-n', '--sample_number', metavar='BUFFER_SIZE', type=int, default=500, help='number of samples to store into calibration buffer')
+ parser.add_argument('-o', '--output', metavar='OUT', type=str, default='imu.json', help='destination filepath')
+ args = parser.parse_args()
+
+ # Create tobii controller (with auto discovery network process if no ip argument is provided)
+ print("Looking for a Tobii Glasses Pro 2 device ...")
+
+ try:
+
+ tobii_controller = TobiiController.TobiiController(args.tobii_ip)
+ print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
+
+ except ConnectionError as e:
+
+ print(e)
+ exit()
+
+ # Create tobii imu handler
+ tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()
+
+ # Load optional imu calibration file
+ if args.imu_calibration != None:
+
+ tobii_imu.load_calibration_file(args.imu_calibration)
+
+ # Enable tobii data stream
+ tobii_data_stream = tobii_controller.enable_data_stream()
+
+ # Menu loop
+ try:
+
+ while True:
+
+ print('-' * 52)
+ menu_input = input('Tobii Inertial Measure Unit sensor calibration menu:\n\t\'a\' for accelerometer calibration.\n\t\'A\' for accelerometer visualisation.\n\t\'g\' for gyroscope calibration.\n\t\'G\' for gyroscope visualisation.\n\t\'p\' print current calibration.\n\t\'s\' save calibration.\n\t\'q\' quit calibration without saving.\n>')
+
+ match menu_input:
+
+ case 'a':
+
+ axes = ['X', 'Y', 'Z']
+ directions = ['upward', 'downward', 'perpendicular']
+
+ for i, axis in enumerate(axes):
+
+ print(f'\nACCELEROMETER {axis} AXIS CALIBRATION')
+
+ axis_buffers = {}
+
+ for j, direction in enumerate(directions):
+
+ input(f'\nKeep Tobii Glasses accelerometer {axis} axis {direction} then press \'Enter\' to start data acquisition.\n')
+
+ # Initialise progress bar
+ MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
+
+ # Capture accelerometer data stream
+ data_ts_buffer = DataStructures.TimeStampedBuffer()
+ for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number):
+
+ # Update progress Bar
+ MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
+
+ axis_buffers[direction] = data_ts_buffer
+
+ tobii_imu.calibrate_accelerometer_axis_coefficients(i, axis_buffers['upward'], axis_buffers['downward'], axis_buffers['perpendicular'])
+
+ accelerometer_coefficients = tobii_imu.get_accelerometer_coefficients()
+
+ print(f'\n\nAccelerometer optimal linear fit coefficients over {progress} values for each axis:')
+ print('\tX coefficients: ', accelerometer_coefficients[0])
+ print('\tY coefficients: ', accelerometer_coefficients[1])
+ print('\tZ coefficients: ', accelerometer_coefficients[2])
+
+ case 'A':
+
+ print('\nCAPTURE AND PLOT ACCELEROMETER STREAM')
+
+ # Initialise progress bar
+ MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
+
+ # Capture accelerometer data stream
+ data_ts_buffer = DataStructures.TimeStampedBuffer()
+ for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number):
+
+ # Update progress Bar
+ MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
+
+ # Edit figure
+ figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
+ data_sample = 8064 # 56 inches * 144 dpi = 8064 samples can be displayed at most
+ figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144)
+
+ # Plot data
+ subplot = figure.add_subplot(111)
+ subplot.set_title('Accelerometer', loc='left')
+ patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
+ subplot.legend(handles=patches, loc='upper left')
+
+ # Display figure
+ mpyplot.show()
+ figure.clear()
+
+ case 'g':
+
+ print('\nGYROSCOPE CALIBRATION')
+ input('Keep Tobii Glasses steady then press \'Enter\' to start data acquisition.\n')
+
+ # Initialise progress bar
+ MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
+
+ # Capture gyroscope data stream
+ data_ts_buffer = DataStructures.TimeStampedBuffer()
+ for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number):
+
+ # Update progress Bar
+ MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
+
+ gyroscope_offset = tobii_imu.calibrate_gyroscope_offset(data_ts_buffer)
+
+ print(f'\n\nGyroscope average over {progress} values for each axis:')
+ print('\tX offset: ', gyroscope_offset[0])
+ print('\tY offset: ', gyroscope_offset[1])
+ print('\tZ offset: ', gyroscope_offset[2])
+
+ case 'G':
+
+ print('\nCAPTURE AND PLOT GYROSCOPE STREAM')
+
+ # Initialise progress bar
+ MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
+
+ # Capture accelerometer data stream
+ data_ts_buffer = DataStructures.TimeStampedBuffer()
+ for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number):
+
+ # Update progress Bar
+ MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
+
+ # Edit figure
+ figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
+ data_sample = 8064 # 56 inches * 144 dpi = 8064 samples can be displayed at most
+ figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144)
+
+ # Plot data
+ subplot = figure.add_subplot(111)
+ subplot.set_title('Gyroscope', loc='left')
+ patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
+ subplot.legend(handles=patches, loc='upper left')
+
+ # Display figure
+ mpyplot.show()
+ figure.clear()
+
+ case 'p':
+
+ gyroscope_offset = tobii_imu.get_gyroscope_offset()
+
+ print(f'\nGyroscope offset for each axis:')
+ print('\tX offset: ', gyroscope_offset[0])
+ print('\tY offset: ', gyroscope_offset[1])
+ print('\tZ offset: ', gyroscope_offset[2])
+
+ accelerometer_coefficients = tobii_imu.get_accelerometer_coefficients()
+
+ print(f'\nAccelerometer optimal linear fit coefficients for each axis:')
+ print('\tX coefficients: ', accelerometer_coefficients[0])
+ print('\tY coefficients: ', accelerometer_coefficients[1])
+ print('\tZ coefficients: ', accelerometer_coefficients[2])
+
+ case 's':
+
+ tobii_imu.save_calibration_file(args.output)
+ print(f'\nCalibration data exported into {args.output} file')
+
+ break
+
+ case 'q':
+
+ break
+
+ # exit on 'ctrl+C' interruption
+ except KeyboardInterrupt:
+ pass
+
+if __name__ == '__main__':
+
+ main()
diff --git a/src/argaze/utils/tobii_stream_arcube_display.py b/src/argaze/utils/tobii_stream_arcube_display.py
new file mode 100644
index 0000000..a910c41
--- /dev/null
+++ b/src/argaze/utils/tobii_stream_arcube_display.py
@@ -0,0 +1,302 @@
+#!/usr/bin/env python
+
+import argparse
+import os, json
+import math
+
+from argaze import DataStructures
+from argaze import GazeFeatures
+from argaze.TobiiGlassesPro2 import *
+from argaze.ArUcoMarkers import *
+from argaze.AreaOfInterest import *
+from argaze.utils import MiscFeatures
+
+import cv2 as cv
+import numpy
+
+def make_rotation_matrix(x, y, z):
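+ """Return intrinsic rotation matrix built from angles in degrees (R = Rx.Ry.Rz)."""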
+
+ # Create rotation matrix around x axis
+ c = numpy.cos(numpy.deg2rad(x))
+ s = numpy.sin(numpy.deg2rad(x))
+ Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]])
+
+ # Create rotation matrix around y axis
+ c = numpy.cos(numpy.deg2rad(y))
+ s = numpy.sin(numpy.deg2rad(y))
+ Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
+
+ # Create rotation matrix around z axis
+ c = numpy.cos(numpy.deg2rad(z))
+ s = numpy.sin(numpy.deg2rad(z))
+ Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
+
+ # Return intrinsic rotation matrix
+ return Rx.dot(Ry.dot(Rz))
+
+def main():
+ """
+ Track ArUcoCube into Tobii Glasses Pro 2 camera video stream.
+ """
+
+ # Manage arguments
+ parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
+ parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
+ parser.add_argument('-c', '--camera_calibration', metavar='CAM_CALIB', type=str, default=None, help='json camera calibration filepath')
+ parser.add_argument('-p', '--aruco_tracker_configuration', metavar='TRACK_CONFIG', type=str, default=None, help='json aruco tracker configuration filepath')
+ parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath')
+ parser.add_argument('-ac', '--aruco_cube', metavar='ARUCO_CUBE', type=str, help='json aruco cube description filepath')
+ parser.add_argument('-s', '--aoi_scene', metavar='AOI_SCENE', type=str, help='obj aoi 3D scene description filepath')
+ parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction)
+ args = parser.parse_args()
+
+ # Create tobii controller (with auto discovery network process if no ip argument is provided)
+ print('\nLooking for a Tobii Glasses Pro 2 device ...')
+
+ try:
+
+ tobii_controller = TobiiController.TobiiController(args.tobii_ip)
+ print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
+
+ except ConnectionError as e:
+
+ print(e)
+ exit()
+
+ # Enable tobii data stream
+ tobii_data_stream = tobii_controller.enable_data_stream()
+
+ # Enable tobii video stream
+ tobii_video_stream = tobii_controller.enable_video_stream()
+
+ # Load aruco cube description
+ aruco_cube = ArUcoCube.ArUcoCube(args.aruco_cube)
+ aruco_cube.print_cache()
+
+ # Load AOI 3D scene centered onto aruco cube
+ aoi3D_scene = AOI3DScene.AOI3DScene()
+ aoi3D_scene.load(args.aoi_scene)
+
+ print(f'\nAOI in {os.path.basename(args.aoi_scene)} scene related to ArCube:')
+ for aoi in aoi3D_scene.keys():
+ print(f'\t{aoi}')
+
+ # Create aruco camera
+ aruco_camera = ArUcoCamera.ArUcoCamera()
+
+ # Load calibration file
+ if args.camera_calibration != None:
+
+ aruco_camera.load_calibration_file(args.camera_calibration)
+
+ else:
+
+ raise UserWarning('.json camera calibration filepath required. Use -c option.')
+
+ # Create aruco tracker
+ aruco_tracker = ArUcoTracker.ArUcoTracker(aruco_cube.dictionary, aruco_cube.marker_size, aruco_camera)
+
+ # Load specific configuration file
+ if args.aruco_tracker_configuration != None:
+
+ aruco_tracker.load_configuration_file(args.aruco_tracker_configuration)
+
+ print(f'\nArUcoTracker configuration for markers detection:')
+ aruco_tracker.print_configuration()
+
+ # Create tobii imu handler to track head pose changes when aruco cube pose can't be estimated
+ # So, the resulting head pose is relative to last pose estimation
+ tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()
+
+ # Load optional imu calibration file
+ if args.imu_calibration != None:
+
+ tobii_imu.load_calibration_file(args.imu_calibration)
+
+ # Init data timestamped in millisecond
+ data_ts_ms = 0
+
+ # Assess temporal performance
+ loop_chrono = MiscFeatures.TimeProbe()
+ loop_ps = 0
+
+ def data_stream_callback(data_ts, data_object, data_object_type):
+
+ nonlocal tobii_imu
+ nonlocal data_ts_ms
+
+ data_ts_ms = data_ts / 1e3
+
+ match data_object_type:
+
+ case 'Gyroscope':
+
+ data_object = tobii_imu.apply_gyroscope_offset(data_object)
+
+ tobii_imu.update_rotation(data_ts, data_object)
+
+ case 'Accelerometer':
+
+ print('raw accelerometer=', data_object.value)
+
+ data_object = tobii_imu.apply_accelerometer_coefficients(data_object)
+
+ print('corrected accelerometer=', data_object.value)
+
+ print('current plumb=', tobii_imu.get_plumb())
+
+ data_object = tobii_imu.apply_plumb(data_object)
+
+ print('corrected accelerometer - gravity=', data_object.value)
+
+ tobii_imu.update_translation(data_ts, data_object)
+
+ tobii_data_stream.reading_callback = data_stream_callback
+
+ # Start streaming
+ tobii_controller.start_streaming()
+
+ # Live video stream capture loop
+ try:
+
+ # Assess loop performance
+ loop_chrono = MiscFeatures.TimeProbe()
+ loop_ps = 0
+
+ # Track aruco cube pose
+ aruco_cube_tvec = numpy.zeros(3)
+ aruco_cube_rvec = numpy.zeros(3)
+ aruco_cube_success = False
+ aruco_cube_validity = False
+ aruco_cube_ts_ms = 0
+
+ while tobii_video_stream.is_alive():
+
+ # Read video stream
+ video_ts, video_frame = tobii_video_stream.read()
+ video_ts_ms = video_ts / 1e3
+
+ # Copy video frame so visualisation can be drawn on it without disrupting aruco tracking
+ visu_frame = video_frame.copy()
+
+ # Process video and data frame
+ try:
+
+ # Track markers with pose estimation
+ aruco_tracker.track(video_frame.matrix)
+
+ # Estimate cube pose from tracked markers
+ tvec, rvec, success, validity = aruco_cube.estimate_pose(aruco_tracker.get_tracked_markers())
+
+ # Cube pose estimation succeeds and is validated by at least 2 faces
+ if success and validity >= 2:
+
+ # Reset head rotation, translation and translation speed (cm/s)
+ # Note: head translation speed is also estimated from the accelerometer sensor (see the data stream callback above)
+ tobii_imu.reset_rotation()
+ tobii_imu.reset_translation(translation_speed = (tvec - aruco_cube_tvec) / (video_ts_ms - aruco_cube_ts_ms))
+
+ # Update head plumb orientation
+ tobii_imu.rotate_plumb(rvec)
+
+ # Store cube pose
+ aruco_cube_tvec = tvec
+ aruco_cube_rvec = rvec
+ aruco_cube_success = success
+ aruco_cube_validity = validity
+ aruco_cube_ts_ms = video_ts_ms
+
+ # Cube pose estimation failed: fall back on inertial sensors
+ elif aruco_cube_success:
+
+ # Use tobii glasses inertial sensors to estimate cube pose from the last estimated pose
+
+ # Translate cube according to head translation
+ head_translation_speed, head_translation = tobii_imu.get_translation()
+ new_tvec = aruco_cube_tvec + head_translation
+
+ #if numpy.linalg.norm(head_rotation) > 0:
+ # print(f'X={head_rotation[0]:3f}, Y={head_rotation[1]:3f}, Z={head_rotation[2]:3f}')
+
+ # Rotate cube around origin according to head rotation
+ R = make_rotation_matrix(*tobii_imu.get_rotation())
+
+ # rotate tvec ???
+ #new_tvec = aruco_cube_tvec.dot(R.T)
+
+ # rotate rvec
+ C, _ = cv.Rodrigues(aruco_cube_rvec)
+ C = C.dot(R)
+ new_rvec, _ = cv.Rodrigues(C)
+
+ # Set cube pose estimation
+ aruco_cube.set_pose(tvec = new_tvec, rvec = new_rvec)
+
+ else:
+
+ raise UserWarning('Cube pose estimation failed.')
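For reference, the inertial fallback above composes the last estimated cube rotation with the head rotation reported by the gyroscope, working in matrix form via cv.Rodrigues. A minimal sketch of that composition, assuming make_rotation_matrix builds a rotation matrix from Euler angles in degrees (the X-Y-Z order below is an assumption):

import numpy
import cv2 as cv

def compose_rotation(last_rvec, euler_deg):
    # Convert the last Rodrigues vector into a 3x3 rotation matrix
    C, _ = cv.Rodrigues(numpy.asarray(last_rvec, dtype=numpy.float32))
    # Build the incremental head rotation matrix from Euler angles (axis order assumed)
    rx, ry, rz = numpy.radians(euler_deg)
    Rx = numpy.array([[1, 0, 0], [0, numpy.cos(rx), -numpy.sin(rx)], [0, numpy.sin(rx), numpy.cos(rx)]])
    Ry = numpy.array([[numpy.cos(ry), 0, numpy.sin(ry)], [0, 1, 0], [-numpy.sin(ry), 0, numpy.cos(ry)]])
    Rz = numpy.array([[numpy.cos(rz), -numpy.sin(rz), 0], [numpy.sin(rz), numpy.cos(rz), 0], [0, 0, 1]])
    # Compose and convert back to a Rodrigues vector, as the script does
    new_rvec, _ = cv.Rodrigues(C.dot(Rz @ Ry @ Rx))
    return new_rvec

print(compose_rotation([0., 0., 0.], [0., 10., 0.]).ravel())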
+
+ # Project AOI 3D scene onto the camera frame
+
+ # DON'T APPLY CAMERA DISTORTION: it would project points which are far outside the frame back into it
+ # This hack isn't realistic but, as the gaze will mainly focus on centered AOI where distortion is low, it is acceptable.
+ aoi2D_scene = aoi3D_scene.project(aruco_cube_tvec, aruco_cube_rvec, aruco_camera.get_K())
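A quick way to see what "projection without distortion" means here: cv.projectPoints with zero distortion coefficients applies only the pinhole model. A minimal sketch with made-up calibration and point values (not taken from the script):

import numpy
import cv2 as cv

points_3d = numpy.array([[0., 0., 0.], [5., 0., 0.], [0., 5., 0.]])
K = numpy.array([[600., 0., 320.], [0., 600., 240.], [0., 0., 1.]])
rvec = numpy.zeros(3)
tvec = numpy.array([0., 0., 50.])

# Passing zeros as distortion coefficients keeps the projection purely pinhole
points_2d, _ = cv.projectPoints(points_3d, rvec, tvec, K, numpy.zeros(5))
print(points_2d.reshape(-1, 2))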
+
+ # Draw projected scene
+ aoi2D_scene.draw(visu_frame.matrix)
+
+ # Draw markers pose estimation
+ aruco_tracker.draw_tracked_markers(visu_frame.matrix)
+
+ # Draw cube pose estimation (without camera distortion)
+ aruco_cube.draw(visu_frame.matrix, aruco_camera.get_K(), (0, 0, 0, 0))
+
+ # Warn about cube pose validity
+ if not aruco_cube_validity:
+
+ raise UserWarning('Cube pose estimation is not validated.')
+
+ # Write warning
+ except UserWarning as w:
+
+ cv.rectangle(visu_frame.matrix, (0, 100), (600, 150), (127, 127, 127), -1)
+ cv.putText(visu_frame.matrix, str(w), (20, 140), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
+
+ # Assess loop performance
+ lap_time, lap_counter, elapsed_time = loop_chrono.lap()
+
+ # Update FPS every 10 loops
+ if lap_counter >= 10:
+
+ loop_ps = 1e3 * lap_counter / elapsed_time
+ loop_chrono.restart()
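Assuming TimeProbe reports elapsed_time in milliseconds (consistent with the 1e3 factor), 10 laps measured over e.g. 400 ms would give 1e3 * 10 / 400 = 25 loops per second.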
+
+ # Draw center
+ cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1)
+ cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1)
+
+ # Write stream timing
+ cv.rectangle(visu_frame.matrix, (0, 0), (1100, 50), (63, 63, 63), -1)
+ cv.putText(visu_frame.matrix, f'Data stream time: {int(data_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'Video delay: {int(data_ts_ms - video_ts_ms)} ms', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'Fps: {int(loop_ps)}', (950, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+
+ cv.imshow('Stream ArUcoCube', visu_frame.matrix)
+
+ # Close window using 'Esc' key
+ if cv.waitKey(1) == 27:
+ break
+
+ # Exit on 'ctrl+C' interruption
+ except KeyboardInterrupt:
+ pass
+
+ # Stop frame display
+ cv.destroyAllWindows()
+
+ # Stop streaming
+ tobii_controller.stop_streaming()
+
+if __name__ == '__main__':
+
+ main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_stream_display.py b/src/argaze/utils/tobii_stream_display.py
index 09360ef..f437de9 100644
--- a/src/argaze/utils/tobii_stream_display.py
+++ b/src/argaze/utils/tobii_stream_display.py
@@ -17,6 +17,7 @@ def main():
# Manage arguments
parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
+ parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath')
args = parser.parse_args()
@@ -39,8 +40,16 @@ def main():
# Enable tobii video stream
tobii_video_stream = tobii_controller.enable_video_stream()
- # Init head movement
- head_movement_px = numpy.array((0, 0))
+ # Create tobii imu handler
+ tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()
+
+ # Load optional imu calibration file
+ if args.imu_calibration is not None:
+
+ tobii_imu.load_calibration_file(args.imu_calibration)
+
+ # Init head rotation speed
+ head_rotation_speed = numpy.array((0, 0, 0))
# Init gaze position
gaze_position_px = GazeFeatures.GazePosition((0, 0))
@@ -59,7 +68,7 @@ def main():
def data_stream_callback(data_ts, data_object, data_object_type):
- nonlocal head_movement_px
+ nonlocal head_rotation_speed
nonlocal gaze_position_px
nonlocal data_ts_ms
nonlocal gyroscope_chrono
@@ -74,9 +83,8 @@ def main():
# Assess gyroscope stream performance
gyroscope_chrono.lap()
- # Calculate head movement considering only head yaw and pitch
- head_movement = numpy.array(data_object.value)
- head_movement_px = head_movement.astype(int)
+ # Apply imu gyroscope offset calibration (value scaled for drawing)
+ head_rotation_speed = tobii_imu.apply_gyroscope_offset(data_object).value.astype(int) * 5
case 'GazePosition':
@@ -131,8 +139,8 @@ def main():
gaze_ps = 1e3 * lap_counter / elapsed_time
gaze_chrono.restart()
- # Draw head movement
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_movement_px[1], int(video_frame.height/2) - head_movement_px[0]), (150, 150, 150), 3)
+ # Draw head rotation speed considering only yaw and pitch values
+ cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_rotation_speed[1], int(video_frame.height/2) - head_rotation_speed[0]), (150, 150, 150), 3)
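The drawn vector maps the second gyroscope component (assumed to be yaw) to the horizontal image axis and the first one (assumed to be pitch) to the vertical axis; for example, a value of (20, -40, 0) would draw a line 40 px to the left and 20 px up from the frame centre.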
# Draw gaze
gaze_position_px.draw(video_frame.matrix)