aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2022-10-19 10:44:23 +0200
committerThéo de la Hogue2022-10-19 10:44:23 +0200
commit1f544431d0ed2a874cd77c8d034e5f4978e9c92e (patch)
tree0c50858dbc13b7dbe693228c61e5efc5d65eaf8d
parent26455e3b0ca43a2000e0807b3ccfc81e99828348 (diff)
downloadargaze-1f544431d0ed2a874cd77c8d034e5f4978e9c92e.zip
argaze-1f544431d0ed2a874cd77c8d034e5f4978e9c92e.tar.gz
argaze-1f544431d0ed2a874cd77c8d034e5f4978e9c92e.tar.bz2
argaze-1f544431d0ed2a874cd77c8d034e5f4978e9c92e.tar.xz
Adding a first workaround ArCube concept.
-rw-r--r--src/argaze/utils/tobii_stream_arcube_display.py420
1 files changed, 420 insertions, 0 deletions
diff --git a/src/argaze/utils/tobii_stream_arcube_display.py b/src/argaze/utils/tobii_stream_arcube_display.py
new file mode 100644
index 0000000..c4a2930
--- /dev/null
+++ b/src/argaze/utils/tobii_stream_arcube_display.py
@@ -0,0 +1,420 @@
+#!/usr/bin/env python
+
+import argparse
+import os, json
+
+from argaze import DataStructures
+from argaze import GazeFeatures
+from argaze.TobiiGlassesPro2 import *
+from argaze.ArUcoMarkers import *
+from argaze.AreaOfInterest import *
+from argaze.utils import MiscFeatures
+
+import cv2 as cv
+import numpy
+import math
+import itertools
+
+def isRotationMatrix(R):
+ """Checks if a matrix is a valid rotation matrix."""
+
+ I = numpy.identity(3, dtype = R.dtype)
+ return numpy.linalg.norm(I - numpy.dot(R.T, R)) < 1e-6
+
+def draw_axis(img, rvec, tvec, K):
+
+ points = numpy.float32([[6, 0, 0], [0, 6, 0], [0, 0, 6], [0, 0, 0]]).reshape(-1, 3)
+ axisPoints, _ = cv.projectPoints(points, rvec, tvec, K, (0, 0, 0, 0))
+ axisPoints = axisPoints.astype(int)
+
+ img = cv.line(img, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (255,0,0), 5)
+ img = cv.line(img, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (0,255,0), 5)
+ img = cv.line(img, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (0,0,255), 5)
+
+ return img
+
+def main():
+ """
+ Track ArCube into Tobii Glasses Pro 2 camera video stream.
+ """
+
+ # Manage arguments
+ parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
+ parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
+ parser.add_argument('-c', '--camera_calibration', metavar='CAM_CALIB', type=str, default=None, help='json camera calibration filepath')
+ parser.add_argument('-p', '--aruco_tracker_configuration', metavar='TRACK_CONFIG', type=str, default=None, help='json aruco tracker configuration filepath')
+ parser.add_argument('-ac', '--arcube', metavar='ARCUBE', type=str, help='json arcube description filepath')
+ parser.add_argument('-to', '--tolerance', metavar='TOLERANCE', type=float, default=1, help='arcube face pose estimation tolerance')
+ parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction)
+ args = parser.parse_args()
+
+ # Load ArCube json description
+ with open(args.arcube) as arcube_file:
+ arcube = json.load(arcube_file)
+
+ # Process each face translation vector to speed up further calculations
+ arcube_size = arcube['size']
+ for face, distances in arcube['translations'].items():
+
+ # Create translation vector
+ T = numpy.array([distances['x'], distances['y'], distances['z']]) * arcube_size / 2
+
+ # Store translation vector
+ arcube['translations'][face]['vector'] = T
+
+ print(f'*** {face}')
+ print('translation vector:')
+ print(T)
+
+ # Process each face rotation matrix to speed up further calculations
+ for face, angles in arcube['rotations'].items():
+
+ # Create rotation matrix around x axis
+ c = numpy.cos(numpy.deg2rad(angles['x']))
+ s = numpy.sin(numpy.deg2rad(angles['x']))
+ Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]])
+
+ # Create rotation matrix around y axis
+ c = numpy.cos(numpy.deg2rad(angles['y']))
+ s = numpy.sin(numpy.deg2rad(angles['y']))
+ Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
+
+ # Create rotation matrix around z axis
+ c = numpy.cos(numpy.deg2rad(angles['z']))
+ s = numpy.sin(numpy.deg2rad(angles['z']))
+ Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
+
+ # Create intrinsic rotation matrix
+ R = Rx.dot(Ry.dot(Rz))
+
+ assert(isRotationMatrix(R))
+
+ # Store rotation matrix
+ arcube['rotations'][face]['matrix'] = R
+
+ print(f'*** {face}')
+ print('rotation matrix:')
+ print(R)
+
+ # Process each axis-angle face combination to speed up further calculations
+ for (A_face, A_item), (B_face, B_item) in itertools.combinations(arcube['rotations'].items(), 2):
+
+ print(f'** {A_face} > {B_face}')
+
+ A = A_item['matrix']
+ B = B_item['matrix']
+
+ # Rotation matrix from A face to B face
+ AB = B.dot(A.T)
+
+ assert(isRotationMatrix(AB))
+
+ # Calculate axis-angle representation of AB rotation matrix
+ angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2))
+
+ arcube['rotations'][A_face][B_face] = angle
+
+ print('rotation angle:')
+ print(angle)
+
+ # Manage ArCube markers id to track
+ arcube_ids = list(arcube['markers']['ids'].values())
+ arcube_dictionary = arcube['markers']['dictionary']
+ print(f'Track Aruco markers {arcube_ids} from the {arcube_dictionary} dictionary')
+
+ # Create tobii controller (with auto discovery network process if no ip argument is provided)
+ print("Looking for a Tobii Glasses Pro 2 device ...")
+
+ try:
+
+ tobii_controller = TobiiController.TobiiController(args.tobii_ip)
+ print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
+
+ except ConnectionError as e:
+
+ print(e)
+ exit()
+
+ # Enable tobii data stream
+ tobii_data_stream = tobii_controller.enable_data_stream()
+
+ # Enable tobii video stream
+ tobii_video_stream = tobii_controller.enable_video_stream()
+
+ # Create aruco camera
+ aruco_camera = ArUcoCamera.ArUcoCamera()
+
+ # Load calibration file
+ if args.camera_calibration != None:
+
+ aruco_camera.load_calibration_file(args.camera_calibration)
+
+ else:
+
+ raise UserWarning('.json camera calibration filepath required. Use -c option.')
+
+ # Create aruco tracker
+ aruco_tracker = ArUcoTracker.ArUcoTracker(arcube['markers']['dictionary'], arcube['markers']['size'], aruco_camera)
+
+ # Load specific configuration file
+ if args.aruco_tracker_configuration != None:
+
+ aruco_tracker.load_configuration_file(args.aruco_tracker_configuration)
+
+ print(f'ArUcoTracker configuration for {aruco_tracker.get_markers_dictionay().get_markers_format()} markers detection:')
+ aruco_tracker.print_configuration()
+
+ # Init head pose tracking
+ head_translation = numpy.array((0, 0, 0))
+ head_rotation = numpy.array((0, 0, 0))
+
+ # Init data timestamped in millisecond
+ data_ts_ms = 0
+
+ # Assess temporal performance
+ loop_chrono = MiscFeatures.TimeProbe()
+ loop_ps = 0
+
+ def data_stream_callback(data_ts, data_object, data_object_type):
+
+ nonlocal head_translation
+ nonlocal head_rotation
+ nonlocal data_ts_ms
+
+ data_ts_ms = data_ts / 1e3
+
+ match data_object_type:
+
+ case 'Accelerometer':
+
+ # Integrate head translation over time
+ #head_translation += numpy.array(data_object.value)
+ pass
+
+ case 'Gyroscope':
+
+ # Integrate head rotation over time
+ #head_rotation += numpy.array(data_object.value)
+ pass
+
+ tobii_data_stream.reading_callback = data_stream_callback
+
+ # Start streaming
+ tobii_controller.start_streaming()
+
+ # Live video stream capture loop
+ try:
+
+ # Assess loop performance
+ loop_chrono = MiscFeatures.TimeProbe()
+ fps = 0
+
+ while tobii_video_stream.is_alive():
+
+ # Read video stream
+ video_ts, video_frame = tobii_video_stream.read()
+ video_ts_ms = video_ts / 1e3
+
+ # Copy video frame to edit visualisation on it without disrupting aruco tracking
+ visu_frame = video_frame.copy()
+
+ # Process video and data frame
+ try:
+
+ # Track markers with pose estimation and draw them
+ aruco_tracker.track(video_frame.matrix)
+ #aruco_tracker.draw(visu_frame.matrix)
+
+ # Pose can't be estimated from markers
+ if aruco_tracker.get_markers_number() == 0:
+
+ raise UserWarning('No marker detected')
+
+ # Look for ArCube's faces among tracked markers and store their pose
+ arcube_tracked_faces = {}
+ for (face, marker_id) in arcube['markers']['ids'].items():
+
+ try:
+ marker_index = aruco_tracker.get_marker_index(marker_id)
+
+ arcube_tracked_faces[face] = {}
+ arcube_tracked_faces[face]['rotation'] = aruco_tracker.get_marker_rotation(marker_index)
+ arcube_tracked_faces[face]['translation'] = aruco_tracker.get_marker_translation(marker_index)[0]
+
+ except ValueError:
+ continue
+
+ print('-------------- ArCube pose estimation --------------')
+
+ # Pose validity check is'nt possible when only one face of the cube is tracked
+ if len(arcube_tracked_faces.keys()) == 1:
+
+ # Get arcube pose from to the unique face pose
+ face, pose = arcube_tracked_faces.popitem()
+
+ # Transform face rotation into cube rotation vector
+ F, _ = cv.Rodrigues(pose['rotation'])
+ R = arcube['rotations'][face]['matrix']
+ arcube_rvec, _ = cv.Rodrigues(F.dot(R))
+
+ # Transform face translation into cube translation vector
+ OF = pose['translation']
+ T = arcube['translations'][face]['vector']
+ FC = F.dot(R.dot(T))
+
+ arcube_tvec = OF + FC
+
+ print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
+ print(f'arcube rotation vector: {arcube_rvec[0][0]:3f} {arcube_rvec[1][0]:3f} {arcube_rvec[2][0]:3f}')
+ print(f'arcube translation vector: {arcube_tvec[0]:3f} {arcube_tvec[1]:3f} {arcube_tvec[2]:3f}')
+
+ draw_axis(visu_frame.matrix, arcube_rvec, arcube_tvec, aruco_camera.get_K())
+
+ # Check faces pose validity two by two
+ else:
+
+ arcube_valid_faces = []
+ arcube_valid_rvecs = []
+ arcube_valid_tvecs = []
+
+ for (A_face, A_pose), (B_face, B_pose) in itertools.combinations(arcube_tracked_faces.items(), 2):
+
+ #print(f'** {A_face} > {B_face}')
+
+ # Get face rotation estimation
+ # Use rotation matrix instead of rotation vector
+ A, _ = cv.Rodrigues(A_pose['rotation'])
+ B, _ = cv.Rodrigues(B_pose['rotation'])
+
+ # Rotation matrix from A face to B face
+ AB = B.dot(A.T)
+
+ assert(isRotationMatrix(AB))
+
+ # Calculate axis-angles representation of AB rotation matrix
+ angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2))
+
+ #print('rotation angle:')
+ #print(angle)
+
+ try:
+ expected_angle = arcube['rotations'][A_face][B_face]
+
+ except KeyError:
+ expected_angle = arcube['rotations'][B_face][A_face]
+
+ #print('expected angle:')
+ #print(expected_angle)
+
+ # Check angle according given tolerance then normalise face rotation
+ if math.isclose(angle, expected_angle, abs_tol=args.tolerance):
+
+ if A_face not in arcube_valid_faces:
+
+ # Remember this face is already validated
+ arcube_valid_faces.append(A_face)
+
+ # Transform face rotation into cube rotation vector
+ R = arcube['rotations'][A_face]['matrix']
+ rvec, _ = cv.Rodrigues(A.dot(R))
+
+ #print(f'{A_face} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}')
+
+ # Transform face translation into cube translation vector
+ OA = A_pose['translation']
+ T = arcube['translations'][A_face]['vector']
+ AC = A.dot(R.dot(T))
+
+ tvec = OA + AC
+
+ #print(f'{A_face} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}')
+
+ # Store normalised face pose
+ arcube_valid_rvecs.append(rvec)
+ arcube_valid_tvecs.append(tvec)
+
+ if B_face not in arcube_valid_faces:
+
+ # Remember this face is already validated
+ arcube_valid_faces.append(B_face)
+
+ # Normalise face rotation
+ R = arcube['rotations'][B_face]['matrix']
+ rvec, _ = cv.Rodrigues(B.dot(R))
+
+ #print(f'{B_face} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}')
+
+ # Normalise face translation
+ OB = B_pose['translation']
+ T = arcube['translations'][B_face]['vector']
+ BC = B.dot(R.dot(T))
+
+ tvec = OB + BC
+
+ #print(f'{B_face} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}')
+
+ # Store normalised face pose
+ arcube_valid_rvecs.append(rvec)
+ arcube_valid_tvecs.append(tvec)
+
+ if len(arcube_valid_faces) > 1:
+
+ # Consider arcube rotation as the mean of all valid translations
+ # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before.
+ arcube_rvec = numpy.mean(numpy.array(arcube_valid_rvecs), axis=0)
+
+ # Consider arcube translation as the mean of all valid translations
+ arcube_tvec = numpy.mean(numpy.array(arcube_valid_tvecs), axis=0)
+
+ print(':::::::::::::::::::::::::::::::::::::::::::::::::::')
+ print(f'arcube rotation vector: {arcube_rvec[0][0]:3f} {arcube_rvec[1][0]:3f} {arcube_rvec[2][0]:3f}')
+ print(f'arcube translation vector: {arcube_tvec[0]:3f} {arcube_tvec[1]:3f} {arcube_tvec[2]:3f}')
+
+ draw_axis(visu_frame.matrix, arcube_rvec, arcube_tvec, aruco_camera.get_K())
+
+ print('----------------------------------------------------')
+
+ # Write warning
+ except UserWarning as w:
+
+ cv.rectangle(visu_frame.matrix, (0, 100), (500, 150), (127, 127, 127), -1)
+ cv.putText(visu_frame.matrix, str(w), (20, 140), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
+
+ # Assess loop performance
+ lap_time, lap_counter, elapsed_time = loop_chrono.lap()
+
+ # Update fps each 10 loops
+ if lap_counter >= 10:
+
+ loop_ps = 1e3 * lap_counter / elapsed_time
+ loop_chrono.restart()
+
+ # Draw center
+ cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1)
+ cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1)
+
+ # Write stream timing
+ cv.rectangle(visu_frame.matrix, (0, 0), (1100, 50), (63, 63, 63), -1)
+ cv.putText(visu_frame.matrix, f'Data stream time: {int(data_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'Video delay: {int(data_ts_ms - video_ts_ms)} ms', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'Fps: {int(loop_ps)}', (950, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+
+ cv.imshow(f'Stream ArCube', visu_frame.matrix)
+
+ # Close window using 'Esc' key
+ if cv.waitKey(1) == 27:
+ break
+
+ # Exit on 'ctrl+C' interruption
+ except KeyboardInterrupt:
+ pass
+
+ # Stop frame display
+ cv.destroyAllWindows()
+
+ # Stop streaming
+ tobii_controller.stop_streaming()
+
+if __name__ == '__main__':
+
+ main() \ No newline at end of file