diff options
-rw-r--r-- | src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py | 122 | ||||
-rw-r--r-- | src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py | 297 |
2 files changed, 0 insertions, 419 deletions
diff --git a/src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py b/src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py deleted file mode 100644 index 70190e2..0000000 --- a/src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py +++ /dev/null @@ -1,122 +0,0 @@ - #!/usr/bin/env python - -import argparse -import os - -from argaze import DataStructures, GazeFeatures -from argaze.ArUcoMarkers import ArUcoMarkersDictionary -from argaze.AreaOfInterest import * - -import cv2 as cv -import numpy - -from ivy.std_api import * - -def main(): - """ - Define AOI scene from a ArUco marker and bind to Ivy default bus to receive live look at pointer data. - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-y', '--ivy_bus', metavar='IVY_BUS', type=str, default='0.0.0.0:2010', help='Ivy bus ip and port') - parser.add_argument('-a', '--aoi_scene', metavar='AOI_SCENE', type=str, default='aoi3D_scene.obj', help='obj aoi scene filepath') - parser.add_argument('-d', '--dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL,DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)') - parser.add_argument('-m', '--marker_size', metavar='MKR', type=float, default=6, help='aruco marker size (cm)') - parser.add_argument('-i', '--marker_id', metavar='MARKER_ID', type=int, default=0, help='marker id to display') - args = parser.parse_args() - - # Enable Ivy bus - IvyInit(os.path.basename(__file__)) - IvyStart(args.ivy_bus) - - def on_looking_message(*args): - - look_at = numpy.fromstring(args[2].replace('[','').replace(']',''), dtype=float, count=2, sep=', ') - - visu_gaze_pixel = aoi2D_visu_scene[args[1]].looked_pixel(look_at) - - cv.circle(visu_frame, visu_gaze_pixel, 4, (0, 0, 255), -1) - - IvyBindMsg(on_looking_message, 'looking (.*) at (.*)') - - # Create AOIs 3D scene - aoi3D_scene = AOI3DScene.AOI3DScene() - aoi3D_scene.load(args.aoi_scene) - print(f'AOIs names: {aoi3D_scene.keys()}') - - # Create a visual scan visualisation frame - visu_width = 1920 - visu_height = 1080 - visu_ratio = visu_height - visu_frame = numpy.full((visu_height, visu_width, 3), 255, dtype=numpy.uint8) - - cv.imshow('Scene', visu_frame) - - # Project 3D scene on the reference frame - # TODO : center projection on a reference AOI - ref_aoi = 'Scene_Plan' - - # TODO: pass the reference AOI in argument - aoi3D_scene_rotation = numpy.array([[-numpy.pi, 0.0, 0.0]]) - aoi3D_scene_translation = numpy.array([[19.0, 8.0, 25.0]]) - - # Edit a projection matrix for the reference frame - K0 = numpy.array([[visu_ratio, 0.0, visu_width/2], [0.0, visu_ratio, visu_height/2], [0.0, 0.0, 1.0]]) - - aoi2D_visu_scene = aoi3D_scene.project(aoi3D_scene_translation, aoi3D_scene_rotation, K0) - - # Create aruco markers dictionary - aruco_markers_dict = ArUcoMarkersDictionary.ArUcoMarkersDictionary(args.dictionary) - - # Create aruco marker - marker_box = aoi2D_visu_scene['Marker_Plan'].bounding_box().astype(int) - marker_size = marker_box[2] - marker_box[0] - marker = aruco_markers_dict.create_marker(args.marker_id, int(marker_size[0])) - print(f'Creating Aruco marker {args.marker_id} from the {args.dictionary} dictionary') - - def draw_scene(): - - # Clear frame - visu_frame[:] = 255 - - # Display AOI 2D scene - for name, aoi in aoi2D_visu_scene.items(): - aoi.draw(visu_frame, (0, 0, 0)) - - # Display aruco marker - visu_frame[marker_box[0][1]:marker_box[2][1], marker_box[0][0]:marker_box[2][0], :] = marker - - # On mouse over : redraw scene and draw target - def on_mouse_event(event, x, y, flags, param): - - draw_scene() - - # Draw target - cv.circle(visu_frame, (x, y), 40, (0, 255, 255), -1) - - cv.setMouseCallback('Scene', on_mouse_event) - - # Screen display loop - try: - - draw_scene() - - while True: - - # Close window using 'Esc' key - if cv.waitKey(1) == 27: - break - - cv.imshow('Scene', visu_frame) - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - # Stop frame display - cv.destroyAllWindows() - -if __name__ == '__main__': - - main()
\ No newline at end of file diff --git a/src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py b/src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py deleted file mode 100644 index 0e9aeea..0000000 --- a/src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py +++ /dev/null @@ -1,297 +0,0 @@ - #!/usr/bin/env python - -import argparse -import os -import json - -from argaze import DataStructures, GazeFeatures -from argaze.TobiiGlassesPro2 import * -from argaze.ArUcoMarkers import ArUcoTracker, ArUcoCamera -from argaze.AreaOfInterest import * - -import cv2 as cv -import numpy - -from ivy.std_api import * - -def main(): - """ - Track any ArUco marker into Tobii Glasses Pro 2 camera video stream. - For each loaded AOI scene .obj file, position the scene virtually relatively to each detected ArUco markers and project the scene into camera frame. - Then, detect if Tobii gaze point is inside any AOI and send the look at pointer over Ivy default bus. - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default='192.168.1.10', help='tobii glasses ip') - parser.add_argument('-p', '--project_name', metavar='PROJECT_NAME', type=str, default=TobiiController.DEFAULT_PROJECT_NAME, help='project name') - parser.add_argument('-u', '--participant_name', metavar='PARTICIPANT_NAME', type=str, default=TobiiController.DEFAULT_PARTICIPANT_NAME, help='participant name') - parser.add_argument('-c', '--camera_calibration', metavar='CAM_CALIB', type=str, default='tobii_camera.json', help='json camera calibration filepath') - parser.add_argument('-y', '--ivy_bus', metavar='IVY_BUS', type=str, default='0.0.0.0:2010', help='Ivy bus ip and port') - parser.add_argument('-md', '--marker_dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL,DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)') - parser.add_argument('-ms', '--marker_size', metavar='MKR', type=float, default=6, help='aruco marker size (cm)') - parser.add_argument('-mi', '--marker_id_scene', metavar='MARKER_ID_SCENE', type=json.loads, help='{"marker": "aoi scene filepath"} dictionary') - args = parser.parse_args() - - # Manage markers id to track - if args.marker_id_scene == None: - print(f'Track any Aruco markers from the {args.marker_dictionary} dictionary') - else: - print(f'Track Aruco markers {args.marker_id_scene.keys()} from the {args.marker_dictionary} dictionary') - - # Enable Ivy bus - IvyInit(os.path.basename(__file__)) - IvyStart(args.ivy_bus) - - # Create tobii controller (with auto discovery network process if no ip argument is provided) - print("Looking for a Tobii Glasses Pro 2 device ...") - - try: - - tobii_controller = TobiiController.TobiiController(args.tobii_ip) - print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') - - except ConnectionError as e: - - print(e) - exit() - - # Setup camera at 25 fps to work on Full HD video stream - tobii_controller.set_scene_camera_freq_25() - - # Print current confirugration - print(f'Tobii Glasses Pro 2 configuration:') - for key, value in tobii_controller.get_configuration().items(): - print(f'\t{key}: {value}') - - # Calibrate tobii glasses - tobii_controller.calibrate() - - # Enable tobii data stream - tobii_data_stream = tobii_controller.enable_data_stream() - - # Enable tobii video stream - tobii_video_stream = tobii_controller.enable_video_stream() - - # create aruco camera - aruco_camera = ArUcoCamera.ArUcoCamera() - aruco_camera.load_calibration_file(args.camera_calibration) - - # Create aruco tracker - aruco_tracker = ArUcoTracker.ArUcoTracker(args.marker_dictionary, args.marker_size, aruco_camera) - - # Load AOI 3D scene for each marker - aoi3D_scenes = {} - - for marker_id, aoi_scene_filepath in args.marker_id_scene.items(): - - marker_id = int(marker_id) - - aoi3D_scenes[marker_id] = AOI3DScene.AOI3DScene() - aoi3D_scenes[marker_id].load(aoi_scene_filepath) - - print(f'AOI in {os.path.basename(aoi_scene_filepath)} scene related to marker #{marker_id}') - for aoi in aoi3D_scenes[marker_id].keys(): - print(f'\t{aoi}') - - def aoi3D_scene_selector(marker_id): - return aoi3D_scenes.get(marker_id, None) - - # Start streaming - tobii_controller.start_streaming() - - # Live video stream capture loop - try: - - past_gaze_positions = DataStructures.TimeStampedBuffer() - past_head_rotations = DataStructures.TimeStampedBuffer() - - head_moving = False - head_movement_last = 0. - - while tobii_video_stream.is_alive(): - - video_ts, video_frame = tobii_video_stream.read() - - # Copy video frame to edit visualisation on it without disrupting aruco tracking - visu_frame = video_frame.copy() - - # Process video and data frame - try: - - # Read data stream - data_stream = tobii_data_stream.read() - - # Store last received data - past_head_rotations.append(data_stream['Gyroscope']) - past_gaze_positions.append(data_stream['GazePosition']) - - # Get nearest head rotation before video timestamp and remove all head rotations before - _, nearest_head_rotation = tobii_ts_head_rotations.pop_first_until(video_ts) - - # Calculate head movement considering only head yaw and pitch - head_movement = numpy.array(nearest_head_rotation.value) - head_movement_px = head_movement.astype(int) - head_movement_norm = numpy.linalg.norm(head_movement[0:2]) - - # Draw movement vector - cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2)), (int(visu_frame.width/2) + head_movement_px[1], int(visu_frame.height/2) - head_movement_px[0]), (150, 150, 150), 3) - - # Head movement detection hysteresis - # TODO : pass the threshold value as argument - if not head_moving and head_movement_norm > 50: - head_moving = True - - if head_moving and head_movement_norm < 10: - head_moving = False - - # When head is moving, ArUco tracking could return bad pose estimation and so bad AOI scene projection - if head_moving: - raise AOIFeatures.AOISceneMissing('Head is moving') - - # Get nearest gaze position before video timestamp and remove all gaze positions before - _, nearest_gaze_position = tobii_ts_gaze_positions.pop_first_until(video_ts) - - # Ignore frame when gaze position is not valid - if nearest_gaze_position.validity == 1: - raise GazeFeatures.GazePositionMissing('Unvalid gaze position') - - gaze_position_pixel = GazeFeatures.GazePosition( (int(nearest_gaze_position.value[0] * visu_frame.width), int(nearest_gaze_position.value[1] * visu_frame.height)) ) - - # Draw gaze position - cv.circle(visu_frame.matrix, gaze_position_pixel, 2, (0, 255, 255), -1) - - # Get nearest gaze position 3D before video timestamp and remove all gaze positions before - _, nearest_gaze_position_3d = tobii_ts_gaze_positions_3d.pop_first_until(video_ts) - - # Ignore frame when gaze position 3D is not valid - if nearest_gaze_position_3d.validity == 1: - raise GazeFeatures.GazePositionMissing('Unvalid gaze position 3D') - - gaze_position_pixel = (int(nearest_gaze_position.value[0] * visu_frame.width), int(nearest_gaze_position.value[1] * visu_frame.height)) - - gaze_accuracy_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.ACCURACY)) * nearest_gaze_position_3d.value[2] - tobii_camera_hfov_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV / 2)) * nearest_gaze_position_3d.value[2] - - gaze_position_pixel.accuracy = round(visu_frame.width * float(gaze_accuracy_mm) / float(tobii_camera_hfov_mm)) - - # Draw gaze position and accuracy - cv.circle(visu_frame.matrix, gaze_position_pixel, 2, (0, 255, 255), -1) - cv.circle(visu_frame.matrix, gaze_position_pixel, gaze_position_pixel.accuracy, (0, 255, 255), 1) - - # Hide frame left and right borders before tracking to ignore markers outside focus area - cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width/6), int(video_frame.height)), (0, 0, 0), -1) - cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - 1/6)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1) - - # Track markers with pose estimation and draw them - aruco_tracker.track(video_frame.matrix) - aruco_tracker.draw(visu_frame.matrix) - - # When no marker is detected, no AOI scene projection can't be done - if aruco_tracker.get_markers_number() == 0: - raise AOIFeatures.AOISceneMissing('No marker detected') - - # Store aoi 2D video for further scene merging - aoi2D_dict = {} - - # Project 3D scenes related to each aruco markers - for (i, marker_id) in enumerate(aruco_tracker.get_markers_ids()): - - # Select 3D scene related to detected marker - aoi3D_scene = aoi3D_scene_selector(marker_id) - - if aoi3D_scene == None: - continue - - # Transform scene into camera referential - aoi3D_camera = aoi3D_scene.transform(aruco_tracker.get_marker_translation(i), aruco_tracker.get_marker_rotation(i)) - - # Get aoi inside vision cone field - cone_vision_height_cm = nearest_gaze_position_3d.value[2]/10 # cm - cone_vision_radius_cm = numpy.tan(numpy.deg2rad(tobii_visual_hfov / 2)) * cone_vision_height_cm - - aoi3D_inside, aoi3D_outside = aoi3D_camera.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) - - # Keep only aoi inside vision cone field - aoi3D_scene = aoi3D_scene.copy(exclude=aoi3D_outside.keys()) - - # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it - # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable. - aoi2D_video_scene = aoi3D_scene.project(aruco_tracker.get_marker_translation(i), aruco_tracker.get_marker_rotation(i), aruco_camera.get_K()) - - # Store each 2D aoi for further scene merging - for name, aoi in aoi2D_video_scene.items(): - - if name not in aoi2D_dict.keys(): - aoi2D_dict[name] = [] - - aoi2D_dict[name].append(aoi.clockwise()) - - # Merge all 2D aoi into a single 2D scene - aoi2D_merged_scene = AOI2DScene.AOI2DScene() - for name, aoi_array in aoi2D_dict.items(): - aoi2D_merged_scene[name] = numpy.sum(aoi_array, axis=0) / len(aoi_array) - - aoi2D_merged_scene.draw(visu_frame.matrix, gaze_position_pixel, gaze_position_pixel.accuracy, exclude=['Visualisation_Plan']) - - # When the merged scene is empty - if len(aoi2D_merged_scene.keys()) == 0: - raise AOIFeatures.AOISceneMissing('Scene is empty') - - # Send look at aoi pointer - for name, aoi in aoi2D_merged_scene.items(): - - if aoi.looked(video_gaze_pixel): - - # 4 corners aoi - if len(aoi) == 4: - IvySendMsg(f'looking {name} at {aoi.look_at(video_gaze_pixel)}') - else: - IvySendMsg(f'looking {name}') - - # Raised when gaze data can't be processed - except GazeFeatures.GazeDataMissing as e: - - cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Raised when aoi scene is missing - except AOIFeatures.AOISceneMissing as e: - - cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Raised when buffer is empty - except KeyError: - pass - - # Draw focus area - cv.rectangle(visu_frame.matrix, (int(video_frame.width/6), 0), (int(visu_frame.width*(1-1/6)), int(visu_frame.height)), (255, 150, 150), 1) - - # Draw center - cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1) - cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1) - - # Write stream timing - cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1) - cv.putText(visu_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Close window using 'Esc' key - if cv.waitKey(1) == 27: - break - - cv.imshow('Live Scene', visu_frame.matrix) - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - # Stop frame display - cv.destroyAllWindows() - - # Stop streaming - tobii_controller.stop_streaming() - -if __name__ == '__main__': - - main()
\ No newline at end of file |