From 1f544431d0ed2a874cd77c8d034e5f4978e9c92e Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 19 Oct 2022 10:44:23 +0200 Subject: Adding a first workaround ArCube concept. --- src/argaze/utils/tobii_stream_arcube_display.py | 420 ++++++++++++++++++++++++ 1 file changed, 420 insertions(+) create mode 100644 src/argaze/utils/tobii_stream_arcube_display.py diff --git a/src/argaze/utils/tobii_stream_arcube_display.py b/src/argaze/utils/tobii_stream_arcube_display.py new file mode 100644 index 0000000..c4a2930 --- /dev/null +++ b/src/argaze/utils/tobii_stream_arcube_display.py @@ -0,0 +1,420 @@ +#!/usr/bin/env python + +import argparse +import os, json + +from argaze import DataStructures +from argaze import GazeFeatures +from argaze.TobiiGlassesPro2 import * +from argaze.ArUcoMarkers import * +from argaze.AreaOfInterest import * +from argaze.utils import MiscFeatures + +import cv2 as cv +import numpy +import math +import itertools + +def isRotationMatrix(R): + """Checks if a matrix is a valid rotation matrix.""" + + I = numpy.identity(3, dtype = R.dtype) + return numpy.linalg.norm(I - numpy.dot(R.T, R)) < 1e-6 + +def draw_axis(img, rvec, tvec, K): + + points = numpy.float32([[6, 0, 0], [0, 6, 0], [0, 0, 6], [0, 0, 0]]).reshape(-1, 3) + axisPoints, _ = cv.projectPoints(points, rvec, tvec, K, (0, 0, 0, 0)) + axisPoints = axisPoints.astype(int) + + img = cv.line(img, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (255,0,0), 5) + img = cv.line(img, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (0,255,0), 5) + img = cv.line(img, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (0,0,255), 5) + + return img + +def main(): + """ + Track ArCube into Tobii Glasses Pro 2 camera video stream. + """ + + # Manage arguments + parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) + parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip') + parser.add_argument('-c', '--camera_calibration', metavar='CAM_CALIB', type=str, default=None, help='json camera calibration filepath') + parser.add_argument('-p', '--aruco_tracker_configuration', metavar='TRACK_CONFIG', type=str, default=None, help='json aruco tracker configuration filepath') + parser.add_argument('-ac', '--arcube', metavar='ARCUBE', type=str, help='json arcube description filepath') + parser.add_argument('-to', '--tolerance', metavar='TOLERANCE', type=float, default=1, help='arcube face pose estimation tolerance') + parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction) + args = parser.parse_args() + + # Load ArCube json description + with open(args.arcube) as arcube_file: + arcube = json.load(arcube_file) + + # Process each face translation vector to speed up further calculations + arcube_size = arcube['size'] + for face, distances in arcube['translations'].items(): + + # Create translation vector + T = numpy.array([distances['x'], distances['y'], distances['z']]) * arcube_size / 2 + + # Store translation vector + arcube['translations'][face]['vector'] = T + + print(f'*** {face}') + print('translation vector:') + print(T) + + # Process each face rotation matrix to speed up further calculations + for face, angles in arcube['rotations'].items(): + + # Create rotation matrix around x axis + c = numpy.cos(numpy.deg2rad(angles['x'])) + s = numpy.sin(numpy.deg2rad(angles['x'])) + Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) + + # Create rotation matrix around y axis + c = numpy.cos(numpy.deg2rad(angles['y'])) + s = numpy.sin(numpy.deg2rad(angles['y'])) + Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) + + # Create rotation matrix around z axis + c = numpy.cos(numpy.deg2rad(angles['z'])) + s = numpy.sin(numpy.deg2rad(angles['z'])) + Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) + + # Create intrinsic rotation matrix + R = Rx.dot(Ry.dot(Rz)) + + assert(isRotationMatrix(R)) + + # Store rotation matrix + arcube['rotations'][face]['matrix'] = R + + print(f'*** {face}') + print('rotation matrix:') + print(R) + + # Process each axis-angle face combination to speed up further calculations + for (A_face, A_item), (B_face, B_item) in itertools.combinations(arcube['rotations'].items(), 2): + + print(f'** {A_face} > {B_face}') + + A = A_item['matrix'] + B = B_item['matrix'] + + # Rotation matrix from A face to B face + AB = B.dot(A.T) + + assert(isRotationMatrix(AB)) + + # Calculate axis-angle representation of AB rotation matrix + angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2)) + + arcube['rotations'][A_face][B_face] = angle + + print('rotation angle:') + print(angle) + + # Manage ArCube markers id to track + arcube_ids = list(arcube['markers']['ids'].values()) + arcube_dictionary = arcube['markers']['dictionary'] + print(f'Track Aruco markers {arcube_ids} from the {arcube_dictionary} dictionary') + + # Create tobii controller (with auto discovery network process if no ip argument is provided) + print("Looking for a Tobii Glasses Pro 2 device ...") + + try: + + tobii_controller = TobiiController.TobiiController(args.tobii_ip) + print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') + + except ConnectionError as e: + + print(e) + exit() + + # Enable tobii data stream + tobii_data_stream = tobii_controller.enable_data_stream() + + # Enable tobii video stream + tobii_video_stream = tobii_controller.enable_video_stream() + + # Create aruco camera + aruco_camera = ArUcoCamera.ArUcoCamera() + + # Load calibration file + if args.camera_calibration != None: + + aruco_camera.load_calibration_file(args.camera_calibration) + + else: + + raise UserWarning('.json camera calibration filepath required. Use -c option.') + + # Create aruco tracker + aruco_tracker = ArUcoTracker.ArUcoTracker(arcube['markers']['dictionary'], arcube['markers']['size'], aruco_camera) + + # Load specific configuration file + if args.aruco_tracker_configuration != None: + + aruco_tracker.load_configuration_file(args.aruco_tracker_configuration) + + print(f'ArUcoTracker configuration for {aruco_tracker.get_markers_dictionay().get_markers_format()} markers detection:') + aruco_tracker.print_configuration() + + # Init head pose tracking + head_translation = numpy.array((0, 0, 0)) + head_rotation = numpy.array((0, 0, 0)) + + # Init data timestamped in millisecond + data_ts_ms = 0 + + # Assess temporal performance + loop_chrono = MiscFeatures.TimeProbe() + loop_ps = 0 + + def data_stream_callback(data_ts, data_object, data_object_type): + + nonlocal head_translation + nonlocal head_rotation + nonlocal data_ts_ms + + data_ts_ms = data_ts / 1e3 + + match data_object_type: + + case 'Accelerometer': + + # Integrate head translation over time + #head_translation += numpy.array(data_object.value) + pass + + case 'Gyroscope': + + # Integrate head rotation over time + #head_rotation += numpy.array(data_object.value) + pass + + tobii_data_stream.reading_callback = data_stream_callback + + # Start streaming + tobii_controller.start_streaming() + + # Live video stream capture loop + try: + + # Assess loop performance + loop_chrono = MiscFeatures.TimeProbe() + fps = 0 + + while tobii_video_stream.is_alive(): + + # Read video stream + video_ts, video_frame = tobii_video_stream.read() + video_ts_ms = video_ts / 1e3 + + # Copy video frame to edit visualisation on it without disrupting aruco tracking + visu_frame = video_frame.copy() + + # Process video and data frame + try: + + # Track markers with pose estimation and draw them + aruco_tracker.track(video_frame.matrix) + #aruco_tracker.draw(visu_frame.matrix) + + # Pose can't be estimated from markers + if aruco_tracker.get_markers_number() == 0: + + raise UserWarning('No marker detected') + + # Look for ArCube's faces among tracked markers and store their pose + arcube_tracked_faces = {} + for (face, marker_id) in arcube['markers']['ids'].items(): + + try: + marker_index = aruco_tracker.get_marker_index(marker_id) + + arcube_tracked_faces[face] = {} + arcube_tracked_faces[face]['rotation'] = aruco_tracker.get_marker_rotation(marker_index) + arcube_tracked_faces[face]['translation'] = aruco_tracker.get_marker_translation(marker_index)[0] + + except ValueError: + continue + + print('-------------- ArCube pose estimation --------------') + + # Pose validity check is'nt possible when only one face of the cube is tracked + if len(arcube_tracked_faces.keys()) == 1: + + # Get arcube pose from to the unique face pose + face, pose = arcube_tracked_faces.popitem() + + # Transform face rotation into cube rotation vector + F, _ = cv.Rodrigues(pose['rotation']) + R = arcube['rotations'][face]['matrix'] + arcube_rvec, _ = cv.Rodrigues(F.dot(R)) + + # Transform face translation into cube translation vector + OF = pose['translation'] + T = arcube['translations'][face]['vector'] + FC = F.dot(R.dot(T)) + + arcube_tvec = OF + FC + + print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!') + print(f'arcube rotation vector: {arcube_rvec[0][0]:3f} {arcube_rvec[1][0]:3f} {arcube_rvec[2][0]:3f}') + print(f'arcube translation vector: {arcube_tvec[0]:3f} {arcube_tvec[1]:3f} {arcube_tvec[2]:3f}') + + draw_axis(visu_frame.matrix, arcube_rvec, arcube_tvec, aruco_camera.get_K()) + + # Check faces pose validity two by two + else: + + arcube_valid_faces = [] + arcube_valid_rvecs = [] + arcube_valid_tvecs = [] + + for (A_face, A_pose), (B_face, B_pose) in itertools.combinations(arcube_tracked_faces.items(), 2): + + #print(f'** {A_face} > {B_face}') + + # Get face rotation estimation + # Use rotation matrix instead of rotation vector + A, _ = cv.Rodrigues(A_pose['rotation']) + B, _ = cv.Rodrigues(B_pose['rotation']) + + # Rotation matrix from A face to B face + AB = B.dot(A.T) + + assert(isRotationMatrix(AB)) + + # Calculate axis-angles representation of AB rotation matrix + angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2)) + + #print('rotation angle:') + #print(angle) + + try: + expected_angle = arcube['rotations'][A_face][B_face] + + except KeyError: + expected_angle = arcube['rotations'][B_face][A_face] + + #print('expected angle:') + #print(expected_angle) + + # Check angle according given tolerance then normalise face rotation + if math.isclose(angle, expected_angle, abs_tol=args.tolerance): + + if A_face not in arcube_valid_faces: + + # Remember this face is already validated + arcube_valid_faces.append(A_face) + + # Transform face rotation into cube rotation vector + R = arcube['rotations'][A_face]['matrix'] + rvec, _ = cv.Rodrigues(A.dot(R)) + + #print(f'{A_face} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}') + + # Transform face translation into cube translation vector + OA = A_pose['translation'] + T = arcube['translations'][A_face]['vector'] + AC = A.dot(R.dot(T)) + + tvec = OA + AC + + #print(f'{A_face} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}') + + # Store normalised face pose + arcube_valid_rvecs.append(rvec) + arcube_valid_tvecs.append(tvec) + + if B_face not in arcube_valid_faces: + + # Remember this face is already validated + arcube_valid_faces.append(B_face) + + # Normalise face rotation + R = arcube['rotations'][B_face]['matrix'] + rvec, _ = cv.Rodrigues(B.dot(R)) + + #print(f'{B_face} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}') + + # Normalise face translation + OB = B_pose['translation'] + T = arcube['translations'][B_face]['vector'] + BC = B.dot(R.dot(T)) + + tvec = OB + BC + + #print(f'{B_face} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}') + + # Store normalised face pose + arcube_valid_rvecs.append(rvec) + arcube_valid_tvecs.append(tvec) + + if len(arcube_valid_faces) > 1: + + # Consider arcube rotation as the mean of all valid translations + # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before. + arcube_rvec = numpy.mean(numpy.array(arcube_valid_rvecs), axis=0) + + # Consider arcube translation as the mean of all valid translations + arcube_tvec = numpy.mean(numpy.array(arcube_valid_tvecs), axis=0) + + print(':::::::::::::::::::::::::::::::::::::::::::::::::::') + print(f'arcube rotation vector: {arcube_rvec[0][0]:3f} {arcube_rvec[1][0]:3f} {arcube_rvec[2][0]:3f}') + print(f'arcube translation vector: {arcube_tvec[0]:3f} {arcube_tvec[1]:3f} {arcube_tvec[2]:3f}') + + draw_axis(visu_frame.matrix, arcube_rvec, arcube_tvec, aruco_camera.get_K()) + + print('----------------------------------------------------') + + # Write warning + except UserWarning as w: + + cv.rectangle(visu_frame.matrix, (0, 100), (500, 150), (127, 127, 127), -1) + cv.putText(visu_frame.matrix, str(w), (20, 140), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) + + # Assess loop performance + lap_time, lap_counter, elapsed_time = loop_chrono.lap() + + # Update fps each 10 loops + if lap_counter >= 10: + + loop_ps = 1e3 * lap_counter / elapsed_time + loop_chrono.restart() + + # Draw center + cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1) + cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1) + + # Write stream timing + cv.rectangle(visu_frame.matrix, (0, 0), (1100, 50), (63, 63, 63), -1) + cv.putText(visu_frame.matrix, f'Data stream time: {int(data_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) + cv.putText(visu_frame.matrix, f'Video delay: {int(data_ts_ms - video_ts_ms)} ms', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) + cv.putText(visu_frame.matrix, f'Fps: {int(loop_ps)}', (950, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) + + cv.imshow(f'Stream ArCube', visu_frame.matrix) + + # Close window using 'Esc' key + if cv.waitKey(1) == 27: + break + + # Exit on 'ctrl+C' interruption + except KeyboardInterrupt: + pass + + # Stop frame display + cv.destroyAllWindows() + + # Stop streaming + tobii_controller.stop_streaming() + +if __name__ == '__main__': + + main() \ No newline at end of file -- cgit v1.1