diff options
-rw-r--r-- | src/argaze/ArUcoMarkers/ArUcoCube.py | 401 | ||||
-rw-r--r-- | src/argaze/ArUcoMarkers/ArUcoMarker.py | 48 | ||||
-rw-r--r-- | src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py | 12 | ||||
-rw-r--r-- | src/argaze/ArUcoMarkers/ArUcoTracker.py | 229 | ||||
-rw-r--r-- | src/argaze/ArUcoMarkers/__init__.py | 2 | ||||
-rw-r--r-- | src/argaze/TobiiGlassesPro2/TobiiData.py | 145 | ||||
-rw-r--r-- | src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py | 236 | ||||
-rw-r--r-- | src/argaze/TobiiGlassesPro2/__init__.py | 2 | ||||
-rw-r--r-- | src/argaze/utils/tobii_imu_calibrate.py | 214 | ||||
-rw-r--r-- | src/argaze/utils/tobii_stream_arcube_display.py | 302 | ||||
-rw-r--r-- | src/argaze/utils/tobii_stream_display.py | 24 |
11 files changed, 1391 insertions, 224 deletions
diff --git a/src/argaze/ArUcoMarkers/ArUcoCube.py b/src/argaze/ArUcoMarkers/ArUcoCube.py new file mode 100644 index 0000000..e2983df --- /dev/null +++ b/src/argaze/ArUcoMarkers/ArUcoCube.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python + +from dataclasses import dataclass, field +import json +import math +import itertools + +from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker + +import numpy +import cv2 as cv +import cv2.aruco as aruco + +@dataclass +class ArUcoCubeFace(): + """Define cube face pose and marker.""" + + translation: numpy.array + """Position in cube referential.""" + + rotation: numpy.array + """Rotation in cube referential.""" + + marker: dict + """ArUco marker linked to the face """ + +@dataclass +class ArUcoCube(): + """Define a cube with ArUco markers on each face and estimate its pose.""" + + dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary + """ArUco dictionary of cube markers.""" + + marker_size: int = field(init=False) + """Size of markers in centimeter.""" + + edge_size: int = field(init=False) + """Size of the cube edges in centimeter.""" + + faces: dict = field(init=False, default_factory=dict) + """All named faces of the cube and their ArUco markers.""" + + angle_tolerance: float = field(init=False) + """Angle error tolerance allowed to validate face pose in degree.""" + + distance_tolerance: float = field(init=False) + """Distance error tolerance allowed to validate face pose in centimeter.""" + + def __init__(self, configuration_filepath): + """Define cube from a .json file.""" + + with open(configuration_filepath) as configuration_file: + + # Deserialize .json + # TODO find a better way + configuration = json.load(configuration_file) + + # Load dictionary + self.dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(configuration['dictionary']) + + # Load marker size + self.marker_size = configuration['marker_size'] + + # Load edge size + self.edge_size = configuration['edge_size'] + + # Load faces + self.faces = {} + for name, face in configuration['faces'].items(): + marker = ArUcoMarker.ArUcoMarker(self.dictionary, face['marker'], self.marker_size) + self.faces[name] = ArUcoCubeFace(numpy.array(face['translation']).astype(numpy.float32), numpy.array(face['rotation']).astype(numpy.float32), marker) + + # Load angle tolerance + self.angle_tolerance = configuration['angle_tolerance'] + + # Load distance tolerance + self.distance_tolerance = configuration['distance_tolerance'] + + # Init pose data + self.__translation = numpy.zeros(3) + self.__rotation = numpy.zeros(3) + self.__succeded = False + self.__validity = 0 + + # Process markers ids to speed up further calculations + self.__identifier_cache = {} + for name, face in self.faces.items(): + self.__identifier_cache[face.marker.identifier] = name + + # Process each face pose to speed up further calculations + self.__translation_cache = {} + for name, face in self.faces.items(): + self.__translation_cache[name] = face.translation * self.edge_size / 2 + + # Process each face rotation matrix to speed up further calculations + self.__rotation_cache = {} + for name, face in self.faces.items(): + + # Create intrinsic rotation matrix + R = self.__make_rotation_matrix(*face.rotation) + + assert(self.__is_rotation_matrix(R)) + + # Store rotation matrix + self.__rotation_cache[name] = R + + # Process each axis-angle face combination to speed up further calculations + self.__angle_cache = {} + for (A_name, A_face), (B_name, B_face) in itertools.combinations(self.faces.items(), 2): + + A = self.__rotation_cache[A_name] + B = self.__rotation_cache[B_name] + + # Rotation matrix from A face to B face + AB = B.dot(A.T) + + assert(self.__is_rotation_matrix(AB)) + + # Calculate axis-angle representation of AB rotation matrix + angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2)) + + try: + self.__angle_cache[A_name][B_name] = angle + except: + self.__angle_cache[A_name] = {B_name: angle} + + try: + self.__angle_cache[B_name][A_name] = angle + except: + self.__angle_cache[B_name] = {A_name: angle} + + # Process distance between face combination to speed up further calculations + self.__distance_cache = numpy.linalg.norm(numpy.array([0, 0, self.edge_size/2]) - numpy.array([0, self.edge_size/2, 0])) + + def print_cache(self): + """Print pre-processed data.""" + + print('\nIdentifier cache:') + for i, name in self.__identifier_cache.items(): + print(f'- {i}: {name}') + + print('\nTranslation cache:') + for name, item in self.__translation_cache.items(): + print(f'- {name}: {item}') + + print('\nRotation cache:') + for name, item in self.__rotation_cache.items(): + print(f'- {name}:\n{item}') + + print('\nAngle cache:') + for A_name, A_angle_cache in self.__angle_cache.items(): + for B_name, angle in A_angle_cache.items(): + print(f'- {A_name}/{B_name}: {angle:3f}') + + print(f'\nDistance cache: {self.__distance_cache}') + + def __make_rotation_matrix(self, x, y, z): + + # Create rotation matrix around x axis + c = numpy.cos(numpy.deg2rad(x)) + s = numpy.sin(numpy.deg2rad(x)) + Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) + + # Create rotation matrix around y axis + c = numpy.cos(numpy.deg2rad(y)) + s = numpy.sin(numpy.deg2rad(y)) + Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) + + # Create rotation matrix around z axis + c = numpy.cos(numpy.deg2rad(z)) + s = numpy.sin(numpy.deg2rad(z)) + Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) + + # Return intrinsic rotation matrix + return Rx.dot(Ry.dot(Rz)) + + def __is_rotation_matrix(self, R): + """Checks if a matrix is a valid rotation matrix.""" + + I = numpy.identity(3, dtype = R.dtype) + return numpy.linalg.norm(I - numpy.dot(R.T, R)) < 1e-6 + + def __normalise_face_pose(self, name, face, F): + + # Transform face rotation into cube rotation vector + R = self.__rotation_cache[name] + rvec, _ = cv.Rodrigues(F.dot(R)) + + #print(f'{name} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}') + + # Transform face translation into cube translation vector + OF = face.translation + T = self.__translation_cache[name] + FC = F.dot(R.dot(T)) + + tvec = OF + FC + + #print(f'{name} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}') + + return rvec, tvec + + def estimate_pose(self, tracked_markers): + + # Init pose data + self.__translation = numpy.zeros(3) + self.__rotation = numpy.zeros(3) + self.__succeded = False + self.__validity = 0 + + # Don't try to estimate pose if there is no tracked markers + if len(tracked_markers) == 0: + + return self.get_pose() + + # Look for faces related to tracked markers + tracked_faces = {} + for (marker_id, marker) in tracked_markers.items(): + + try: + name = self.__identifier_cache[marker_id] + tracked_faces[name] = marker + + except KeyError: + continue + + #print('-------------- ArUcoCube pose estimation --------------') + + # Pose validity checking is'nt possible when only one face of the cube is tracked + if len(tracked_faces.keys()) == 1: + + # Get arcube pose from to the unique face pose + name, face = tracked_faces.popitem() + F, _ = cv.Rodrigues(face.rotation) + + self.__rotation, self.__translation = self.__normalise_face_pose(name,face, F) + self.__succeded = True + self.__validity = 1 + + #print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!') + #print(f'arcube rotation vector: {self.__rotation[0][0]:3f} {self.__rotation[1][0]:3f} {self.__rotation[2][0]:3f}') + #print(f'arcube translation vector: {self.__translation[0]:3f} {self.__translation[1]:3f} {self.__translation[2]:3f}') + + # Pose validity checking processes faces two by two + else: + + valid_faces = [] + valid_rvecs = [] + valid_tvecs = [] + + for (A_name, A_face), (B_name, B_face) in itertools.combinations(tracked_faces.items(), 2): + + #print(f'** {A_name} > {B_name}') + + # Get face rotation estimation + # Use rotation matrix instead of rotation vector + A, _ = cv.Rodrigues(A_face.rotation) + B, _ = cv.Rodrigues(B_face.rotation) + + # Rotation matrix from A face to B face + AB = B.dot(A.T) + + assert(self.__is_rotation_matrix(AB)) + + # Calculate axis-angles representation of AB rotation matrix + angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2)) + + #print('rotation angle:') + #print(angle) + + expected_angle = self.__angle_cache[A_name][B_name] + + #print('expected angle:') + #print(expected_angle) + + # Calculate distance between A face center and B face center + distance = numpy.linalg.norm(A_face.translation - B_face.translation) + expected_distance = self.__distance_cache + + # Check angle and distance according given tolerance then normalise face pose + valid_angle = math.isclose(angle, expected_angle, abs_tol=self.angle_tolerance) + valid_distance = math.isclose(distance, expected_distance, abs_tol=self.distance_tolerance) + + if valid_angle and valid_distance: + + if A_name not in valid_faces: + + # Remember this face is already validated + valid_faces.append(A_name) + + rvec, tvec = self.__normalise_face_pose(A_name, A_face, A) + + # Store normalised face pose + valid_rvecs.append(rvec) + valid_tvecs.append(tvec) + + if B_name not in valid_faces: + + # Remember this face is already validated + valid_faces.append(B_name) + + rvec, tvec = self.__normalise_face_pose(B_name, B_face, B) + + # Store normalised face pose + valid_rvecs.append(rvec) + valid_tvecs.append(tvec) + + if len(valid_faces) > 1: + + # Consider arcube rotation as the mean of all valid translations + # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before. + self.__rotation = numpy.mean(numpy.array(valid_rvecs), axis=0) + + # Consider arcube translation as the mean of all valid translations + self.__translation = numpy.mean(numpy.array(valid_tvecs), axis=0) + + #print(':::::::::::::::::::::::::::::::::::::::::::::::::::') + #print(f'arcube rotation vector: {self.__rotation[0][0]:3f} {self.__rotation[1][0]:3f} {self.__rotation[2][0]:3f}') + #print(f'arcube translation vector: {self.__translation[0]:3f} {self.__translation[1]:3f} {self.__translation[2]:3f}') + + self.__succeded = True + self.__validity = len(valid_faces) + + #print('----------------------------------------------------') + + return self.get_pose() + + def get_pose(self): + + return self.__translation, self.__rotation, self.__succeded, self.__validity + + def set_pose(self, tvec = numpy.array([]), rvec = numpy.array([])): + + if tvec.size == 3: + self.__translation = tvec + + if rvec.size == 3: + self.__rotation = rvec + + self.__succeded = True + self.__validity = 0 + + def draw(self, frame, K, D): + + l = self.edge_size / 2 + ll = self.edge_size + + # Select color according validity score + n = 95 * self.__validity if self.__validity < 2 else 0 + f = 159 * self.__validity if self.__validity < 2 else 255 + + try: + + # Draw axis + axisPoints = numpy.float32([[ll, 0, 0], [0, ll, 0], [0, 0, ll], [0, 0, 0]]).reshape(-1, 3) + axisPoints, _ = cv.projectPoints(axisPoints, self.__rotation, self.__translation, K, D) + axisPoints = axisPoints.astype(int) + + frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (n,n,f), 5) # X (red) + frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (n,f,n), 5) # Y (green) + frame = cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (f,n,n), 5) # Z (blue) + + # Draw left face + leftPoints = numpy.float32([[-l, l, l], [-l, -l, l], [-l, -l, -l], [-l, l, -l]]).reshape(-1, 3) + leftPoints, _ = cv.projectPoints(leftPoints, self.__rotation, self.__translation, K, D) + leftPoints = leftPoints.astype(int) + + frame = cv.line(frame, tuple(leftPoints[0].ravel()), tuple(leftPoints[1].ravel()), (n,n,f), 2) + frame = cv.line(frame, tuple(leftPoints[1].ravel()), tuple(leftPoints[2].ravel()), (n,n,f), 2) + frame = cv.line(frame, tuple(leftPoints[2].ravel()), tuple(leftPoints[3].ravel()), (n,n,f), 2) + frame = cv.line(frame, tuple(leftPoints[3].ravel()), tuple(leftPoints[0].ravel()), (n,n,f), 2) + + # Draw top face + topPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, l, -l], [l, l, -l]]).reshape(-1, 3) + topPoints, _ = cv.projectPoints(topPoints, self.__rotation, self.__translation, K, D) + topPoints = topPoints.astype(int) + + frame = cv.line(frame, tuple(topPoints[0].ravel()), tuple(topPoints[1].ravel()), (n,f,n), 2) + frame = cv.line(frame, tuple(topPoints[1].ravel()), tuple(topPoints[2].ravel()), (n,f,n), 2) + frame = cv.line(frame, tuple(topPoints[2].ravel()), tuple(topPoints[3].ravel()), (n,f,n), 2) + frame = cv.line(frame, tuple(topPoints[3].ravel()), tuple(topPoints[0].ravel()), (n,f,n), 2) + + # Draw front face + frontPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, -l, l], [l, -l, l]]).reshape(-1, 3) + frontPoints, _ = cv.projectPoints(frontPoints, self.__rotation, self.__translation, K, D) + frontPoints = frontPoints.astype(int) + + frame = cv.line(frame, tuple(frontPoints[0].ravel()), tuple(frontPoints[1].ravel()), (f,n,n), 2) + frame = cv.line(frame, tuple(frontPoints[1].ravel()), tuple(frontPoints[2].ravel()), (f,n,n), 2) + frame = cv.line(frame, tuple(frontPoints[2].ravel()), tuple(frontPoints[3].ravel()), (f,n,n), 2) + frame = cv.line(frame, tuple(frontPoints[3].ravel()), tuple(frontPoints[0].ravel()), (f,n,n), 2) + + except Exception as e: + + print(e) + print(self.__translation) + print(self.__rotation) + print(self.__succeded) + print(self.__validity) + print(axisPoints) + + return frame diff --git a/src/argaze/ArUcoMarkers/ArUcoMarker.py b/src/argaze/ArUcoMarkers/ArUcoMarker.py new file mode 100644 index 0000000..4f716bb --- /dev/null +++ b/src/argaze/ArUcoMarkers/ArUcoMarker.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +from dataclasses import dataclass, field + +from argaze.ArUcoMarkers import ArUcoMarkersDictionary + +import numpy +import cv2 as cv +import cv2.aruco as aruco + +@dataclass +class ArUcoMarker(): + """Define ArUco marker class.""" + + dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary + """ """ + + identifier: int + """ """ + + size: float + """Size of marker in centimeters.""" + + corners: numpy.array = field(init=False, repr=False) + """Estimated 2D corner positions in camera image referential.""" + + translation: numpy.array = field(init=False, repr=False) + """Estimated 3D center position in camera referential.""" + + rotation: numpy.array = field(init=False, repr=False) + """Estimated 3D marker rotation in camera referential.""" + + points: numpy.array = field(init=False, repr=False) + """Estimated 3D corners positions in camera referential.""" + + def center(self, i): + """Get 2D center position in camera image referential.""" + return self.corners[0].mean(axis=0) + + def draw(self, frame, K, D): + """Draw marker in frame.""" + + # Draw marker axis if pose has been estimated + if self.translation.size == 3 and self.rotation.size == 3: + + cv.drawFrameAxes(frame, K, D, self.rotation, self.translation, self.size) + + aruco.drawDetectedMarkers(frame, [self.corners], numpy.array([self.identifier]))
\ No newline at end of file diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py index 58b55b8..84a78b9 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py @@ -32,12 +32,14 @@ all_aruco_markers_dictionaries = { class ArUcoMarkersDictionary(): """Handle an ArUco markers dictionary.""" - def __init__(self, aruco_dictionary_name): + def __init__(self, name): - if all_aruco_markers_dictionaries.get(aruco_dictionary_name, None) is None: - raise NameError(f'Bad ArUco markers dictionary name: {aruco_dictionary_name}') + if all_aruco_markers_dictionaries.get(name, None) is None: + raise NameError(f'Bad ArUco markers dictionary name: {name}') - dict_name_split = aruco_dictionary_name.split('_') + self.name = name + + dict_name_split = name.split('_') self.__format = dict_name_split[1] @@ -77,7 +79,7 @@ class ArUcoMarkersDictionary(): self.__number = int(dict_name_split[2]) - self.__aruco_dict = aruco.Dictionary_get(all_aruco_markers_dictionaries[aruco_dictionary_name]) + self.__aruco_dict = aruco.Dictionary_get(all_aruco_markers_dictionaries[self.name]) def create_marker(self, i, dpi=300): """Create a marker image.""" diff --git a/src/argaze/ArUcoMarkers/ArUcoTracker.py b/src/argaze/ArUcoMarkers/ArUcoTracker.py index b887812..f5b1715 100644 --- a/src/argaze/ArUcoMarkers/ArUcoTracker.py +++ b/src/argaze/ArUcoMarkers/ArUcoTracker.py @@ -3,7 +3,7 @@ import json from collections import Counter -from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoCamera +from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoCamera import numpy import cv2 as cv @@ -45,14 +45,14 @@ ArUcoTrackerParameters = [ class ArUcoTracker(): """Track ArUco markers into a frame.""" - def __init__(self, aruco_dictionary_name: str, marker_length: float, camera: ArUcoCamera.ArUcoCamera): + def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary, marker_size: float, camera: ArUcoCamera.ArUcoCamera): """Define which markers library to track and their size""" # load ArUco markers dictionary - self.__aruco_dict = ArUcoMarkersDictionary.ArUcoMarkersDictionary(aruco_dictionary_name) + self.__dictionary = dictionary # define marker length in centimeter - self.__marker_length = marker_length + self.__marker_size = marker_size # define camera self.__camera = camera @@ -62,19 +62,8 @@ class ArUcoTracker(): self.__detector_parameters.cornerRefinementMethod = aruco.CORNER_REFINE_CONTOUR # to get a better pose estimation self.__detector_parameters_loaded = {} - # define tracked markers data - self.__markers_corners = [] - self.__markers_ids = [] - self.__rvecs = [] - self.__tvecs = [] - self.__points = [] - - # define rejected markers data - self.__rejected_markers_corners = [] - self.__rejected_markers_ids = [] - self.__rejected_rvecs = [] - self.__rejected_tvecs = [] - self.__rejected_points = [] + # init tracked markers data + self.__tracked_markers = {} # define tracked board data self.__board = None @@ -84,7 +73,7 @@ class ArUcoTracker(): # define track metrics data self.__track_count = 0 - self.__tracked_markers = [] + self.__tracked_ids = [] self.__rejected_markers = [] def load_configuration_file(self, configuration_filepath): @@ -115,98 +104,56 @@ class ArUcoTracker(): def track(self, frame, estimate_pose = True, check_rotation = False): """Track ArUco markers in frame.""" + self.__tracked_markers = {} + markers_corners, markers_ids, markers_rvecs, markers_tvecs, markers_points = [], [], [], [], [] + # DON'T MIRROR FRAME : it makes the markers detection to fail - # detect markers from gray picture - gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY) - self.__markers_corners, self.__markers_ids, rejectedPoints = aruco.detectMarkers(gray, self.__aruco_dict.get_markers(), parameters = self.__detector_parameters) - self.__rejected_markers_corners, __rejected_markers_ids = [], [] - - if len(self.__markers_corners) > 0 and estimate_pose: - - # markers pose estimation - self.__rvecs, self.__tvecs, self.__points = aruco.estimatePoseSingleMarkers(self.__markers_corners, self.__marker_length, self.__camera.get_K(), self.__camera.get_D()) - - # optional: check marker rotation as described in [this issue](https://github.com/opencv/opencv/issues/8813) - if check_rotation: - - valid_rotation_markers = [] - bad_rotation_markers = [] - for i, rvec in enumerate(self.__rvecs): - - tvec = self.__tvecs[i][0] - R, _ = cv.Rodrigues(rvec) - - zAxisPoint = (tvec.dot(R) + numpy.array([0., 0., 1.])).dot(R.T) - zAxis = zAxisPoint - tvec - - # TODO: How to describe the expected Z axis orientation ? - # In some situations, you can't provide such information. - - # !!! Here a description specific to SimOne cockpit !!! - zAxisExpectedDict = { - 1: numpy.array([0.5, 0.5, -1]), - 2: numpy.array([0.5, 0.5, -1]), - 3: numpy.array([1, -1, -1]), - 4: numpy.array([1, -1, -1]), - 5: numpy.array([1, -1, -1]), - 6: numpy.array([1, 1, -1]), - 7: numpy.array([1, 1, -1]), - 8: numpy.array([1, -1, -1]) - } - - zAxisExpected = zAxisExpectedDict[self.__markers_ids[i][0]] - - cosine_angle = numpy.dot(zAxis/numpy.linalg.norm(zAxis), zAxisExpected) - degree_angle = numpy.rad2deg(numpy.arccos(cosine_angle)) - ''' - print(self.__markers_ids[i][0]) - print('marker position: ', tvec) - print('zAxisPoint: ', zAxisPoint) - print('zAxis: ', zAxis) - print('zAxisExpected: ', zAxisExpected) - print('cosine_angle: ', cosine_angle) - print('degree_angle: ', degree_angle) - ''' - # Is the marker oriented as expected ? - if cosine_angle < 0 or cosine_angle > 1: - - #print('valid') - valid_rotation_markers.append(i) - - else: - - #print('bad') - bad_rotation_markers.append(i) - - # update track metrics - self.__rejected_markers.append(self.__markers_ids[i][0]) - - # keep markers with bad rotation - self.__rejected_markers_corners = tuple([self.__markers_corners[i] for i in bad_rotation_markers]) - self.__rejected_markers_ids = self.__markers_ids[bad_rotation_markers] - self.__rejected_rvecs = self.__rvecs[bad_rotation_markers] - self.__rejected_tvecs = self.__tvecs[bad_rotation_markers] - self.__rejected_points = self.__points[bad_rotation_markers] - - # keep markers with valid rotation - self.__markers_corners = tuple([self.__markers_corners[i] for i in valid_rotation_markers]) - self.__markers_ids = self.__markers_ids[valid_rotation_markers] - self.__rvecs = self.__rvecs[valid_rotation_markers] - self.__tvecs = self.__tvecs[valid_rotation_markers] - self.__points = self.__points[valid_rotation_markers] + # Track markers into gray picture + markers_corners, markers_ids, _ = aruco.detectMarkers(cv.cvtColor(frame, cv.COLOR_BGR2GRAY), self.__dictionary.get_markers(), parameters = self.__detector_parameters) - else: + if len(markers_corners) > 0: + + # Pose estimation is optional + if estimate_pose: + + markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(markers_corners, self.__marker_size, self.__camera.get_K(), self.__camera.get_D()) + + # Gather tracked markers data and update metrics + self.__track_count += 1 + + for i, marker_id in enumerate(markers_ids.T[0]): + + marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id, self.__marker_size) - self.__rvecs = [] - self.__tvecs = [] - self.__points = [] + marker.corners = markers_corners[i] - # update track metrics - self.__track_count += 1 - for marker_id in self.get_markers_ids(): - self.__tracked_markers.append(marker_id) + if estimate_pose: + marker.translation = markers_tvecs[i][0] + marker.rotation = markers_rvecs[i] + marker.points = markers_points[i] + self.__tracked_markers[marker_id] = marker + + self.__tracked_ids.append(marker_id) + + def get_tracked_markers(self): + """Access to tracked markers dictionary.""" + + return self.__tracked_markers + + def get_tracked_markers_number(self): + """Return tracked markers number.""" + + return len(list(self.__tracked_markers.keys())) + + def draw_tracked_markers(self, frame): + """Draw traked markers.""" + + for marker_id, marker in self.__tracked_markers.items(): + + marker.draw(frame, self.__camera.get_K(), self.__camera.get_D()) + def track_board(self, frame, board, expected_markers_number): """Track ArUco markers board in frame setting up the number of detected markers needed to agree detection.""" @@ -214,7 +161,7 @@ class ArUcoTracker(): # detect markers from gray picture gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY) - self.__markers_corners, self.__markers_ids, rejectedPoints = aruco.detectMarkers(gray, self.__aruco_dict.get_markers(), parameters = self.__detector_parameters) + self.__markers_corners, self.__markers_ids, rejectedPoints = aruco.detectMarkers(gray, self.__dictionary.get_markers(), parameters = self.__detector_parameters) # if all board markers are detected if self.get_markers_number() == expected_markers_number: @@ -229,33 +176,6 @@ class ArUcoTracker(): self.__board_corners = [] self.__board_corners_ids = [] - def draw(self, frame): - """Draw tracked markers in frame.""" - - # draw detected markers square - if len(self.__markers_corners) > 0: - - # draw marker axis if pose has been estimated - if len(self.__rvecs) > 0: - - for (i, marker_id) in enumerate(self.__markers_ids): - - cv.drawFrameAxes(frame, self.__camera.get_K(), self.__camera.get_D(), self.__rvecs[i], self.__tvecs[i], self.__marker_length) - - aruco.drawDetectedMarkers(frame, self.__markers_corners, self.__markers_ids) - - # draw rejected markers square - if len(self.__rejected_markers_corners) > 0: - - # draw marker axis if pose has been estimated - if len(self.__rejected_rvecs) > 0: - - for (i, marker_id) in enumerate(self.__rejected_markers_ids): - - cv.drawFrameAxes(frame, self.__camera.get_K(), self.__camera.get_D(), self.__rejected_rvecs[i], self.__rejected_tvecs[i], self.__marker_length) - - aruco.drawDetectedMarkers(frame, self.__rejected_markers_corners, self.__rejected_markers_ids, borderColor=(0, 255, 255)) - def draw_board(self, frame): """Draw tracked board corners in frame.""" @@ -266,52 +186,11 @@ class ArUcoTracker(): def reset_track_metrics(self): """Enable marker tracking metrics.""" self.__track_count = 0 - self.__tracked_markers = [] + self.__tracked_ids = [] def get_track_metrics(self): """Get marker tracking metrics.""" - return self.__track_count, Counter(self.__tracked_markers), Counter(self.__rejected_markers) - - def get_markers_dictionay(self): - """Get tracked aruco markers dictionary.""" - return self.__aruco_dict - - def get_markers_number(self): - """Get tracked markers number.""" - return len(self.__markers_corners) - - def get_markers_ids(self): - """Get tracked markers identifers.""" - if self.__markers_ids is not None: - return [i[0] for i in self.__markers_ids] - else: - return [] - - def get_marker_index(self, marker_id): - """Retreive marker index of a given merker id. - Raise ValueError if not found.""" - - return list(self.__markers_ids).index(marker_id) - - def get_marker_corners(self, i): - """Get marker i corners.""" - return self.__markers_corners[i][0] - - def get_marker_center(self, i): - """Get marker i center coordinates.""" - return self.__markers_corners[i][0].mean(axis=0) - - def get_marker_rotation(self, i): - """Get marker i rotation vector.""" - return self.__rvecs[i] - - def get_marker_translation(self, i): - """Get marker i translation vector.""" - return self.__tvecs[i] - - def get_marker_points(self, i): - """Get marker i points.""" - return self.__points[i] + return self.__track_count, Counter(self.__tracked_ids), Counter(self.__rejected_markers) def get_board_corners_number(self): """Get tracked board corners number.""" diff --git a/src/argaze/ArUcoMarkers/__init__.py b/src/argaze/ArUcoMarkers/__init__.py index 28386d2..3a74eeb 100644 --- a/src/argaze/ArUcoMarkers/__init__.py +++ b/src/argaze/ArUcoMarkers/__init__.py @@ -2,4 +2,4 @@ .. include:: README.md """ __docformat__ = "restructuredtext" -__all__ = ['ArUcoMarkersDictionary', 'ArUcoBoard', 'ArUcoCamera', 'ArUcoTracker']
\ No newline at end of file +__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoCamera', 'ArUcoTracker', 'ArUcoCube']
\ No newline at end of file diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py index 95fa947..71ce234 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiData.py +++ b/src/argaze/TobiiGlassesPro2/TobiiData.py @@ -13,6 +13,8 @@ from argaze.TobiiGlassesPro2 import TobiiNetworkInterface from argaze.utils import MiscFeatures +import numpy + @dataclass class DirSig(): """Define dir sig data (dir sig).""" @@ -50,13 +52,13 @@ class Event(): class Accelerometer(): """Define accelerometer data (ac).""" - value: tuple((float, float, float)) + value: numpy.array @dataclass class Gyroscope(): """Define gyroscope data (gy).""" - value: tuple((float, float, float)) + value: numpy.array @dataclass class PupilCenter(): @@ -286,66 +288,96 @@ class TobiiDataSegment(): def get_path(self): return self.__segment_data_path -class TobiiDataStream(threading.Thread): +class TobiiDataStream(): """Capture Tobii Glasses Pro 2 data stream in separate thread.""" reading_callback = None def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface): - """Initialise thread super class as a deamon dedicated to data reception.""" + """Initialise network connection and prepare data reception.""" - threading.Thread.__init__(self) - threading.Thread.daemon = True - + # Enable network connection self.__network = network_interface self.__data_socket = self.__network.make_socket() + # Data reception + self.__data_thread = None self.__data_queue = queue.Queue(50) # TODO : set queue size according technical reason - - self.__stop_event = threading.Event() - self.__read_lock = threading.Lock() + self.__json_data_parser = TobiiJsonDataParser() + self.__first_ts = 0 - # prepare keep alive message + # Data capture + self.__data_stream_selector = '' + self.__data_ts_buffer = None + self.__data_ts_buffer_size = 0 + + # Keep connection alive self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}" self.__keep_alive_thread = threading.Thread(target = self.__keep_alive) self.__keep_alive_thread.daemon = True - - self.__json_data_parser = TobiiJsonDataParser() + self.__keep_alive_thread.start() def __del__(self): - """Stop data reception before destruction.""" + """Stop data reception and network connection before destruction.""" - if self.is_alive(): - - self.close() + if self.__data_thread != None: + + threading.Thread.join(self.__data_thread) + self.__data_thread = None + + threading.Thread.join(self.__keep_alive_thread) + + self.__data_socket.close() def __keep_alive(self): - """Maintain connection.""" + """Maintain network connection and clear socket when is not read.""" - while not self.__stop_event.isSet(): + while True: + + # if no thread is reading data socket + if self.__data_thread == None: - self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg) + self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg) - time.sleep(1) + clear_count = 0 + while clear_count < 1000 and self.__data_thread == None: + + # Clear socket each milli second + time.sleep(0.001) + self.__network.grab_data(self.__data_socket) + clear_count += 1 + + else: + + self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg) + time.sleep(1) def open(self): """Start data reception.""" - self.__first_ts = 0 - self.__keep_alive_thread.start() - threading.Thread.start(self) + if self.__data_thread == None: + + self.__first_ts = 0 + + self.__data_thread = threading.Thread(target = self.__run) + self.__data_thread.daemon = True + + self.__stop_event = threading.Event() + self.__read_lock = threading.Lock() + + self.__data_thread.start() def close(self): """Stop data reception definitively.""" - self.__stop_event.set() + if self.__data_thread != None: - threading.Thread.join(self.__keep_alive_thread) - threading.Thread.join(self) + self.__stop_event.set() - self.__data_socket.close() + threading.Thread.join(self.__data_thread) + self.__data_thread = None - def run(self): + def __run(self): """Managed received data for sync and async reading case. - Sync: send data to callback function. - Async: store data into a locked queue for further reading.""" @@ -454,14 +486,59 @@ class TobiiDataStream(threading.Thread): # convert timestamp data_ts = json_data.pop('ts') + # convert json data into data object + data_object = self.__json_data_parser.parse_data(status, json_data) + data_object_type = type(data_object).__name__ + # keep first timestamp to offset all timestamps if self.__first_ts == 0: self.__first_ts = data_ts - + data_ts -= self.__first_ts - # convert json data into data object - data_object = self.__json_data_parser.parse_data(status, json_data) - data_object_type = type(data_object).__name__ - return data_ts, data_object, data_object_type + + def __capture_callback(self, data_ts, data_object, data_object_type): + """Store incoming data into timestamped buffer""" + + if data_object_type == self.__data_stream_selector: + + if len(self.__data_ts_buffer.keys()) < self.__data_ts_buffer_size: + + # update first timestamp if next timestamps are negative + if data_ts < 0: + self.__first_ts += data_ts + data_ts = 0 + + self.__data_ts_buffer[data_ts] = data_object + + def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500): + """Capture a data stream into buffer.""" + + # Prepare for data acquisition + self.__data_stream_selector = data_object_type + self.__data_ts_buffer = data_ts_buffer + self.__data_ts_buffer_size = sample_number + + # Subscribe to data stream + memo_reading_callback = self.reading_callback + self.reading_callback = self.__capture_callback + + # Start data reception + self.open() + + # Share data acquisition progress + buffer_size = 0 + while buffer_size < sample_number: + + time.sleep(0.1) + + buffer_size = len(self.__data_ts_buffer.keys()) + + yield buffer_size + + # Stop data reception + self.close() + + # Unsubscribe to tobii data stream + self.reading_callback = memo_reading_callback diff --git a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py new file mode 100644 index 0000000..a3c2bbd --- /dev/null +++ b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python + +import json +import time +import math + +from argaze import DataStructures +from argaze.TobiiGlassesPro2 import TobiiData + +import numpy +from scipy.optimize import curve_fit +import cv2 as cv + +EARTH_GRAVITY = -9.81 # m/s2 +EARTH_GRAVITY_VECTOR = [0, EARTH_GRAVITY, 0] + +class TobiiInertialMeasureUnit(): + """Ease Tobbi IMU data handling.""" + + def __init__(self): + """Define IMU calibration data.""" + + self.__gyroscope_offset = numpy.zeros(3) + self.__accelerometer_coefficients = numpy.array([[1., 0.], [1., 0.], [1., 0.]]) + + self.__plumb = numpy.array(EARTH_GRAVITY_VECTOR) + + self.reset_rotation() + self.reset_translation() + + def load_calibration_file(self, calibration_filepath): + """Load IMU calibration from a .json file.""" + + with open(calibration_filepath) as calibration_file: + + # Deserialize .json + # TODO find a better way + calibration_data = json.load(calibration_file) + + # Load calibration data + self.__gyroscope_offset = numpy.array(calibration_data['gyroscope_offset']) + self.__accelerometer_coefficients = numpy.array(calibration_data['accelerometer_coefficients']) + + def save_calibration_file(self, calibration_filepath): + """Save IMU calibration into .json file.""" + + calibration_data = { + 'gyroscope_offset': list(self.__gyroscope_offset), + 'accelerometer_coefficients': [list(self.__accelerometer_coefficients[0]), list(self.__accelerometer_coefficients[1]), list(self.__accelerometer_coefficients[2])] + } + + with open(calibration_filepath, 'w', encoding='utf-8') as calibration_file: + + json.dump(calibration_data, calibration_file, ensure_ascii=False, indent=4) + + def calibrate_gyroscope_offset(self, gyroscope_ts_buffer): + + # Consider gyroscope values without timestamps + gyroscope_values = [] + for ts, data_object in gyroscope_ts_buffer.items(): + gyroscope_values.append(data_object.value) + + # Calculate average value for each axis + gx_offset = numpy.mean(numpy.array(gyroscope_values)[:, 0]) + gy_offset = numpy.mean(numpy.array(gyroscope_values)[:, 1]) + gz_offset = numpy.mean(numpy.array(gyroscope_values)[:, 2]) + + # Store result + self.__gyroscope_offset = numpy.array([gx_offset, gy_offset, gz_offset]) + + return self.__gyroscope_offset + + def get_gyroscope_offset(self): + """Get gyroscope offset.""" + + return self.__gyroscope_offset + + def apply_gyroscope_offset(self, gyroscope_data_object): + """Remove gyroscope offset to given gyroscope data.""" + + return TobiiData.Gyroscope(gyroscope_data_object.value - self.__gyroscope_offset) + + def reset_rotation(self, ts = 0, gyroscope = numpy.zeros(3), rotation = numpy.zeros(3)): + """Reset rotation value before to start integration process.""" + + self.__last_gyroscope_ts = ts + self.__last_gyroscope = gyroscope + + self.__rotation = rotation + + def update_rotation(self, gyroscope_data_ts, gyroscope_data_object): + """Integrate timestamped gyroscope values to update rotation.""" + + # Convert deg/s into deg/ms + current_gyroscope = gyroscope_data_object.value * 1e-3 + + # Init gyroscope integration + if self.__last_gyroscope_ts == 0 and self.__last_gyroscope.all() == 0: + + self.reset_rotation(gyroscope_data_ts, current_gyroscope) + + # Calculate elapsed time in ms + delta_time = (gyroscope_data_ts - self.__last_gyroscope_ts) / 1e3 + + # Integrate gyroscope + self.__rotation += self.__last_gyroscope * delta_time + + # Store current as last + self.__last_gyroscope_ts = gyroscope_data_ts + self.__last_gyroscope = current_gyroscope + + def get_rotation(self): + """Return current rotation value.""" + + return self.__rotation + + def _accelerometer_linear_fit(self, x, a, b): + """Linear function for accelerometer axis correction.""" + return a * x + b + + def calibrate_accelerometer_axis_coefficients(self, axis, upward_ts_buffer, downward_ts_buffer, perpendicular_ts_buffer): + """Calibrate one accelerometer axis using three data set (upward/+1g, downward/-1g, perpendicular/0g) for linear fit.""" + + # Consider accelerometer axis values without timestamps + accelerometer_values = [] + expected_values = [] + + for (upward_ts, upward_data_object), (downward_ts, downward_data_object), (perpendicular_ts, perpendicular_data_object) in zip(upward_ts_buffer.items(), downward_ts_buffer.items(), perpendicular_ts_buffer.items()): + + accelerometer_values.append(upward_data_object.value[axis]) + expected_values.append(+EARTH_GRAVITY) + + accelerometer_values.append(downward_data_object.value[axis]) + expected_values.append(-EARTH_GRAVITY) + + accelerometer_values.append(perpendicular_data_object.value[axis]) + expected_values.append(0.0) + + # Find optimal coefficients according linear fit between accelerometer values and expected values + optimal_coefficients, _ = curve_fit(self._accelerometer_linear_fit, accelerometer_values, expected_values, maxfev = 10000) + + # Store results for the given axis + self.__accelerometer_coefficients[axis] = numpy.array(optimal_coefficients) + + def get_accelerometer_coefficients(self): + """Return accelerometer coefficients.""" + + return self.__accelerometer_coefficients + + def apply_accelerometer_coefficients(self, accelerometer_data_object): + """Add accelerometer offset to given accelerometer data.""" + + x = self._accelerometer_linear_fit(accelerometer_data_object.value[0], *self.__accelerometer_coefficients[0]) + y = self._accelerometer_linear_fit(accelerometer_data_object.value[1], *self.__accelerometer_coefficients[1]) + z = self._accelerometer_linear_fit(accelerometer_data_object.value[2], *self.__accelerometer_coefficients[2]) + + return TobiiData.Accelerometer(numpy.array([x, y , z])) + + def reset_translation(self, ts = 0, accelerometer = numpy.zeros(3), translation_speed = numpy.zeros(3), translation = numpy.zeros(3)): + """Reset translation value before to start integration process.""" + + print(f'> reset_translation: translation_speed={translation_speed}, translation={translation}') + + self.__last_accelerometer_ts = ts + self.__last_accelerometer = accelerometer + self.__last_translation_speed = translation_speed + + self.__translation_speed = translation_speed + self.__translation = translation + + print('\tself.__translation_speed(cm/s)=', self.__translation_speed) + print('\tself.__translation(cm)=', self.__translation) + + print('< reset_translation') + + def update_translation(self, accelerometer_data_ts, accelerometer_data_object): + """Integrate timestamped accelerometer values to update translation.""" + print('> update_translation: accelerometer_data_ts=', accelerometer_data_ts) + + # Convert m/s2 into cm/ms2 + current_accelerometer = accelerometer_data_object.value * 1e-4 + + print('\tcurrent_accelerometer(cm/ms2)=', current_accelerometer) + print('\tcurrent_accelerometer norm=', numpy.linalg.norm(current_accelerometer)) + + # Init accelerometer integration + if self.__last_accelerometer_ts == 0 and self.__last_accelerometer.all() == 0: + + self.reset_translation(ts = accelerometer_data_ts, accelerometer = current_accelerometer) + + # Calculate elapsed time in ms + delta_time = (accelerometer_data_ts - self.__last_accelerometer_ts) / 1e3 + + print('\tdelta_time=', delta_time) + + # Integrate accelerometer + self.__translation_speed += self.__last_accelerometer * delta_time + self.__translation += self.__last_translation_speed * delta_time + + print('\tself.__translation_speed(cm/s)=', self.__translation_speed) + print('\tself.__translation(cm)=', self.__translation) + + # Store current as last + self.__last_accelerometer = current_accelerometer + self.__last_accelerometer_ts = accelerometer_data_ts + self.__last_translation_speed = self.__translation_speed + + print('< update_translation') + + #else: + + # print('no valid head plumb') + + def get_translation(self): + """Return current translation speed and translation values.""" + + return self.__translation_speed, self.__translation + + def rotate_plumb(self, rvec): + """Rotate imu plumb to remove gravity effect in accelerometer data.""" + + C, _ = cv.Rodrigues(rvec) + self.__plumb = -C.dot(EARTH_GRAVITY_VECTOR) + + # Check plumb length + assert(math.isclose(numpy.linalg.norm(self.__plumb), math.fabs(EARTH_GRAVITY), abs_tol=1e-3)) + + def get_plumb(self): + """Return plumb vector.""" + + return self.__plumb + + def apply_plumb(self, accelerometer_data_object): + """Remove gravity along plumb vector to given accelerometer data.""" + + return TobiiData.Accelerometer(accelerometer_data_object.value - self.__plumb) diff --git a/src/argaze/TobiiGlassesPro2/__init__.py b/src/argaze/TobiiGlassesPro2/__init__.py index 4fc65bb..754e5a1 100644 --- a/src/argaze/TobiiGlassesPro2/__init__.py +++ b/src/argaze/TobiiGlassesPro2/__init__.py @@ -2,4 +2,4 @@ .. include:: README.md """ __docformat__ = "restructuredtext" -__all__ = ['TobiiEntities', 'TobiiNetworkInterface', 'TobiiController', 'TobiiData', 'TobiiVideo', 'TobiiSpecifications']
\ No newline at end of file +__all__ = ['TobiiEntities', 'TobiiNetworkInterface', 'TobiiController', 'TobiiData', 'TobiiVideo', 'TobiiSpecifications', 'TobiiInertialMeasureUnit']
\ No newline at end of file diff --git a/src/argaze/utils/tobii_imu_calibrate.py b/src/argaze/utils/tobii_imu_calibrate.py new file mode 100644 index 0000000..85472d9 --- /dev/null +++ b/src/argaze/utils/tobii_imu_calibrate.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python + +import argparse +import os +import time + +from argaze import DataStructures +from argaze.TobiiGlassesPro2 import TobiiController, TobiiInertialMeasureUnit +from argaze.utils import MiscFeatures + +import numpy +import matplotlib.pyplot as mpyplot +import matplotlib.patches as mpatches + +def main(): + """ + Calibrate Tobbi gyroscope and accelerometer sensors and finally outputs camera calibration data into a .json file. + + ### Reference: + - [Inertial Measure Unit calibration tutorial](https://makersportal.com/blog/calibration-of-an-inertial-measurement-unit-imu-with-raspberry-pi-part-ii) + """ + + # manage arguments + parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) + parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip') + parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath') + parser.add_argument('-n', '--sample_number', metavar='BUFFER_SIZE', type=int, default=500, help='number of samples to store into calibration buffer') + parser.add_argument('-o', '--output', metavar='OUT', type=str, default='imu.json', help='destination filepath') + args = parser.parse_args() + + # Create tobii controller (with auto discovery network process if no ip argument is provided) + print("Looking for a Tobii Glasses Pro 2 device ...") + + try: + + tobii_controller = TobiiController.TobiiController(args.tobii_ip) + print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') + + except ConnectionError as e: + + print(e) + exit() + + # Create tobii imu handler + tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit() + + # Load optional imu calibration file + if args.imu_calibration != None: + + tobii_imu.load_calibration_file(args.imu_calibration) + + # Enable tobii data stream + tobii_data_stream = tobii_controller.enable_data_stream() + + # Menu loop + try: + + while True: + + print('-' * 52) + menu_input = input('Tobii Inertial Measure Unit sensor calibration menu:\n\t\'a\' for accelerometer calibration.\n\t\'A\' for accelerometer visualisation.\n\t\'g\' for gyroscope calibration.\n\t\'G\' for gyroscope visualisation.\n\t\'p\' print current calibration.\n\t\'s\' save calibration.\n\t\'q\' quit calibration without saving.\n>') + + match menu_input: + + case 'a': + + axis = ['X', 'Y', 'Z'] + directions = ['upward', 'downward', 'perpendicular'] + + for i, axis in enumerate(axis): + + print(f'\nACCELEROMETER {axis} AXIS CALIBRATION') + + axis_buffers = {} + + for j, direction in enumerate(directions): + + input(f'\nKeep Tobii Glasses accelerometer {axis} axis {direction} then press \'Enter\' to start data acquisition.\n') + + # Initialise progress bar + MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) + + # Capture accelerometer data stream + data_ts_buffer = DataStructures.TimeStampedBuffer() + for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number): + + # Update progress Bar + MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) + + axis_buffers[direction] = data_ts_buffer + + tobii_imu.calibrate_accelerometer_axis_coefficients(i, axis_buffers['upward'], axis_buffers['downward'], axis_buffers['perpendicular']) + + accelerometer_coefficients = tobii_imu.get_accelerometer_coefficients() + + print(f'\n\nAccelerometer optimal linear fit coefficients over {progress} values for each axis:') + print('\tX coefficients: ', accelerometer_coefficients[0]) + print('\tY coefficients: ', accelerometer_coefficients[1]) + print('\tZ coefficients: ', accelerometer_coefficients[2]) + + case 'A': + + print('\nCAPTURE AND PLOT ACCELEROMETER STREAM') + + # Initialise progress bar + MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) + + # Capture accelerometer data stream + data_ts_buffer = DataStructures.TimeStampedBuffer() + for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number): + + # Update progress Bar + MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) + + # Edit figure + figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels + data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max + figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144) + + # Plot data + subplot = figure.add_subplot(111) + subplot.set_title('Accelerometer', loc='left') + patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) + subplot.legend(handles=patches, loc='upper left') + + # Display figure + mpyplot.show() + figure.clear() + + case 'g': + + print('\nGYROSCOPE CALIBRATION') + input('Keep Tobii Glasses steady then press \'Enter\' to start data acquisition.\n') + + # Initialise progress bar + MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) + + # Capture gyroscope data stream + data_ts_buffer = DataStructures.TimeStampedBuffer() + for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number): + + # Update progress Bar + MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) + + gyroscope_offset = tobii_imu.calibrate_gyroscope_offset(data_ts_buffer) + + print(f'\n\nGyroscope average over {progress} values for each axis:') + print('\tX offset: ', gyroscope_offset[0]) + print('\tY offset: ', gyroscope_offset[1]) + print('\tZ offset: ', gyroscope_offset[2]) + + case 'G': + + print('\nCAPTURE AND PLOT GYROSCOPE STREAM') + + # Initialise progress bar + MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) + + # Capture accelerometer data stream + data_ts_buffer = DataStructures.TimeStampedBuffer() + for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number): + + # Update progress Bar + MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) + + # Edit figure + figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels + data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max + figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144) + + # Plot data + subplot = figure.add_subplot(111) + subplot.set_title('Gyroscope', loc='left') + patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) + subplot.legend(handles=patches, loc='upper left') + + # Display figure + mpyplot.show() + figure.clear() + + case 'p': + + gyroscope_offset = tobii_imu.get_gyroscope_offset() + + print(f'\nGyroscope offset for each axis:') + print('\tX offset: ', gyroscope_offset[0]) + print('\tY offset: ', gyroscope_offset[1]) + print('\tZ offset: ', gyroscope_offset[2]) + + accelerometer_coefficients = tobii_imu.get_accelerometer_coefficients() + + print(f'\nAccelerometer optimal linear fit coefficients for each axis:') + print('\tX coefficients: ', accelerometer_coefficients[0]) + print('\tY coefficients: ', accelerometer_coefficients[1]) + print('\tZ coefficients: ', accelerometer_coefficients[2]) + + case 's': + + tobii_imu.save_calibration_file(args.output) + print(f'\nCalibration data exported into {args.output} file') + + break + + case 'q': + + break + + # exit on 'ctrl+C' interruption + except KeyboardInterrupt: + pass + +if __name__ == '__main__': + + main() diff --git a/src/argaze/utils/tobii_stream_arcube_display.py b/src/argaze/utils/tobii_stream_arcube_display.py new file mode 100644 index 0000000..a910c41 --- /dev/null +++ b/src/argaze/utils/tobii_stream_arcube_display.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python + +import argparse +import os, json +import math + +from argaze import DataStructures +from argaze import GazeFeatures +from argaze.TobiiGlassesPro2 import * +from argaze.ArUcoMarkers import * +from argaze.AreaOfInterest import * +from argaze.utils import MiscFeatures + +import cv2 as cv +import numpy + +def make_rotation_matrix(x, y, z): + + # Create rotation matrix around x axis + c = numpy.cos(numpy.deg2rad(x)) + s = numpy.sin(numpy.deg2rad(x)) + Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) + + # Create rotation matrix around y axis + c = numpy.cos(numpy.deg2rad(y)) + s = numpy.sin(numpy.deg2rad(y)) + Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) + + # Create rotation matrix around z axis + c = numpy.cos(numpy.deg2rad(z)) + s = numpy.sin(numpy.deg2rad(z)) + Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) + + # Return intrinsic rotation matrix + return Rx.dot(Ry.dot(Rz)) + +def main(): + """ + Track ArUcoCube into Tobii Glasses Pro 2 camera video stream. + """ + + # Manage arguments + parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) + parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip') + parser.add_argument('-c', '--camera_calibration', metavar='CAM_CALIB', type=str, default=None, help='json camera calibration filepath') + parser.add_argument('-p', '--aruco_tracker_configuration', metavar='TRACK_CONFIG', type=str, default=None, help='json aruco tracker configuration filepath') + parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath') + parser.add_argument('-ac', '--aruco_cube', metavar='ARUCO_CUBE', type=str, help='json aruco cube description filepath') + parser.add_argument('-s', '--aoi_scene', metavar='AOI_SCENE', type=str, help='obj aoi 3D scene description filepath') + parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction) + args = parser.parse_args() + + # Create tobii controller (with auto discovery network process if no ip argument is provided) + print('\nLooking for a Tobii Glasses Pro 2 device ...') + + try: + + tobii_controller = TobiiController.TobiiController(args.tobii_ip) + print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') + + except ConnectionError as e: + + print(e) + exit() + + # Enable tobii data stream + tobii_data_stream = tobii_controller.enable_data_stream() + + # Enable tobii video stream + tobii_video_stream = tobii_controller.enable_video_stream() + + # Load aruco cube description + aruco_cube = ArUcoCube.ArUcoCube(args.aruco_cube) + aruco_cube.print_cache() + + # Load AOI 3D scene centered onto aruco cube + aoi3D_scene = AOI3DScene.AOI3DScene() + aoi3D_scene.load(args.aoi_scene) + + print(f'\nAOI in {os.path.basename(args.aoi_scene)} scene related to ArCube:') + for aoi in aoi3D_scene.keys(): + print(f'\t{aoi}') + + # Create aruco camera + aruco_camera = ArUcoCamera.ArUcoCamera() + + # Load calibration file + if args.camera_calibration != None: + + aruco_camera.load_calibration_file(args.camera_calibration) + + else: + + raise UserWarning('.json camera calibration filepath required. Use -c option.') + + # Create aruco tracker + aruco_tracker = ArUcoTracker.ArUcoTracker(aruco_cube.dictionary, aruco_cube.marker_size, aruco_camera) + + # Load specific configuration file + if args.aruco_tracker_configuration != None: + + aruco_tracker.load_configuration_file(args.aruco_tracker_configuration) + + print(f'\nArUcoTracker configuration for markers detection:') + aruco_tracker.print_configuration() + + # Create tobii imu handler to track head pose changes when arcuco cube pose can't be estimated + # So, the resulting head pose is relative to last pose estimation + tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit() + + # Load optional imu calibration file + if args.imu_calibration != None: + + tobii_imu.load_calibration_file(args.imu_calibration) + + # Init data timestamped in millisecond + data_ts_ms = 0 + + # Assess temporal performance + loop_chrono = MiscFeatures.TimeProbe() + loop_ps = 0 + + def data_stream_callback(data_ts, data_object, data_object_type): + + nonlocal tobii_imu + nonlocal data_ts_ms + + data_ts_ms = data_ts / 1e3 + + match data_object_type: + + case 'Gyroscope': + + data_object = tobii_imu.apply_gyroscope_offset(data_object) + + tobii_imu.update_rotation(data_ts, data_object) + + case 'Accelerometer': + + print('raw accelerometer=', data_object.value) + + data_object = tobii_imu.apply_accelerometer_coefficients(data_object) + + print('corrected accelerometer=', data_object.value) + + print('current plumb=', tobii_imu.get_plumb()) + + data_object = tobii_imu.apply_plumb(data_object) + + print('corrected accelerometer - gravity=', data_object.value) + + tobii_imu.update_translation(data_ts, data_object) + + tobii_data_stream.reading_callback = data_stream_callback + + # Start streaming + tobii_controller.start_streaming() + + # Live video stream capture loop + try: + + # Assess loop performance + loop_chrono = MiscFeatures.TimeProbe() + fps = 0 + + # Track aruco cube pose + aruco_cube_tvec = numpy.zeros(3) + aruco_cube_rvec = numpy.zeros(3) + aruco_cube_success = False + aruco_cube_validity = False + aruco_cube_ts_ms = 0 + + while tobii_video_stream.is_alive(): + + # Read video stream + video_ts, video_frame = tobii_video_stream.read() + video_ts_ms = video_ts / 1e3 + + # Copy video frame to edit visualisation on it without disrupting aruco tracking + visu_frame = video_frame.copy() + + # Process video and data frame + try: + + # Track markers with pose estimation + aruco_tracker.track(video_frame.matrix) + + # Estimate cube pose from tracked markers + tvec, rvec, success, validity = aruco_cube.estimate_pose(aruco_tracker.get_tracked_markers()) + + # Cube pose estimation succeed and is validated by 2 faces at least + if success and validity >= 2: + + # Reset head rotation, translation and translation speed (cm/s) + # Note : head translation speed is also estimated thanks to accelerometer sensor (see upward) + tobii_imu.reset_rotation() + tobii_imu.reset_translation(translation_speed = (tvec - aruco_cube_tvec) / (video_ts_ms - aruco_cube_ts_ms)) + + # Update head plumb orientation + tobii_imu.rotate_plumb(rvec) + + # Store cube pose + aruco_cube_tvec = tvec + aruco_cube_rvec = rvec + aruco_cube_success = success + aruco_cube_validity = validity + aruco_cube_ts_ms = video_ts_ms + + # Cube pose estimation fails + elif aruco_cube_success: + + # Use tobii glasses inertial sensors to estimate cube pose from last estimated pose + + # Translate cube according head translation + head_translation_speed, head_translation = tobii_imu.get_translation() + new_tvec = aruco_cube_tvec + head_translation + + #if numpy.linalg.norm(head_rotation) > 0: + # print(f'X={head_rotation[0]:3f}, Y={head_rotation[1]:3f}, Z={head_rotation[2]:3f}') + + # Rotate cube around origin according head rotation + R = make_rotation_matrix(*tobii_imu.get_rotation()) + + # rotate tvec ??? + #new_tvec = aruco_cube_tvec.dot(R.T) + + # rotate rvec + C, _ = cv.Rodrigues(aruco_cube_rvec) + C = C.dot(R) + new_rvec, _ = cv.Rodrigues(C) + + # Set cube pose estimation + aruco_cube.set_pose(tvec = new_tvec, rvec = new_rvec) + + else: + + raise UserWarning('Cube pose estimation fails.') + + # Project AOI 3 scene onto camera frame + + # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it + # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable. + aoi2D_scene = aoi3D_scene.project(aruco_cube_tvec, aruco_cube_rvec, aruco_camera.get_K()) + + # Draw projected scene + aoi2D_scene.draw(visu_frame.matrix) + + # Draw markers pose estimation + aruco_tracker.draw_tracked_markers(visu_frame.matrix) + + # Draw cube pose estimation (without camera distorsion) + aruco_cube.draw(visu_frame.matrix, aruco_camera.get_K(), (0, 0, 0, 0)) + + # Warn about cube pose validity + if not aruco_cube_validity: + + raise UserWarning('Cube pose estimation is not validated.') + + # Write warning + except UserWarning as w: + + cv.rectangle(visu_frame.matrix, (0, 100), (600, 150), (127, 127, 127), -1) + cv.putText(visu_frame.matrix, str(w), (20, 140), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) + + # Assess loop performance + lap_time, lap_counter, elapsed_time = loop_chrono.lap() + + # Update fps each 10 loops + if lap_counter >= 10: + + loop_ps = 1e3 * lap_counter / elapsed_time + loop_chrono.restart() + + # Draw center + cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1) + cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1) + + # Write stream timing + cv.rectangle(visu_frame.matrix, (0, 0), (1100, 50), (63, 63, 63), -1) + cv.putText(visu_frame.matrix, f'Data stream time: {int(data_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) + cv.putText(visu_frame.matrix, f'Video delay: {int(data_ts_ms - video_ts_ms)} ms', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) + cv.putText(visu_frame.matrix, f'Fps: {int(loop_ps)}', (950, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) + + cv.imshow(f'Stream ArUcoCube', visu_frame.matrix) + + # Close window using 'Esc' key + if cv.waitKey(1) == 27: + break + + # Exit on 'ctrl+C' interruption + except KeyboardInterrupt: + pass + + # Stop frame display + cv.destroyAllWindows() + + # Stop streaming + tobii_controller.stop_streaming() + +if __name__ == '__main__': + + main()
\ No newline at end of file diff --git a/src/argaze/utils/tobii_stream_display.py b/src/argaze/utils/tobii_stream_display.py index 09360ef..f437de9 100644 --- a/src/argaze/utils/tobii_stream_display.py +++ b/src/argaze/utils/tobii_stream_display.py @@ -17,6 +17,7 @@ def main(): # Manage arguments parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip') + parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath') args = parser.parse_args() @@ -39,8 +40,16 @@ def main(): # Enable tobii video stream tobii_video_stream = tobii_controller.enable_video_stream() - # Init head movement - head_movement_px = numpy.array((0, 0)) + # Create tobii imu handler + tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit() + + # Load optional imu calibration file + if args.imu_calibration != None: + + tobii_imu.load_calibration_file(args.imu_calibration) + + # Init head rotation speed + head_rotation_speed = numpy.array((0, 0, 0)) # Init gaze position gaze_position_px = GazeFeatures.GazePosition((0, 0)) @@ -59,7 +68,7 @@ def main(): def data_stream_callback(data_ts, data_object, data_object_type): - nonlocal head_movement_px + nonlocal head_rotation_speed nonlocal gaze_position_px nonlocal data_ts_ms nonlocal gyroscope_chrono @@ -74,9 +83,8 @@ def main(): # Assess gyroscope stream performance gyroscope_chrono.lap() - # Calculate head movement considering only head yaw and pitch - head_movement = numpy.array(data_object.value) - head_movement_px = head_movement.astype(int) + # Apply imu calibration + head_rotation_speed = tobii_imu.apply_gyroscope_offset(data_object).value.astype(int) * 5 case 'GazePosition': @@ -131,8 +139,8 @@ def main(): gaze_ps = 1e3 * lap_counter / elapsed_time gaze_chrono.restart() - # Draw head movement - cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_movement_px[1], int(video_frame.height/2) - head_movement_px[0]), (150, 150, 150), 3) + # Draw head rotation speed considering only yaw and pitch values + cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_rotation_speed[1], int(video_frame.height/2) - head_rotation_speed[0]), (150, 150, 150), 3) # Draw gaze gaze_position_px.draw(video_frame.matrix) |