aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py122
-rw-r--r--src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py297
2 files changed, 0 insertions, 419 deletions
diff --git a/src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py b/src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py
deleted file mode 100644
index 70190e2..0000000
--- a/src/argaze/utils/tobii_stream_aruco_aoi_ivy_application.py
+++ /dev/null
@@ -1,122 +0,0 @@
- #!/usr/bin/env python
-
-import argparse
-import os
-
-from argaze import DataStructures, GazeFeatures
-from argaze.ArUcoMarkers import ArUcoMarkersDictionary
-from argaze.AreaOfInterest import *
-
-import cv2 as cv
-import numpy
-
-from ivy.std_api import *
-
-def main():
- """
- Define AOI scene from a ArUco marker and bind to Ivy default bus to receive live look at pointer data.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-y', '--ivy_bus', metavar='IVY_BUS', type=str, default='0.0.0.0:2010', help='Ivy bus ip and port')
- parser.add_argument('-a', '--aoi_scene', metavar='AOI_SCENE', type=str, default='aoi3D_scene.obj', help='obj aoi scene filepath')
- parser.add_argument('-d', '--dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL,DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)')
- parser.add_argument('-m', '--marker_size', metavar='MKR', type=float, default=6, help='aruco marker size (cm)')
- parser.add_argument('-i', '--marker_id', metavar='MARKER_ID', type=int, default=0, help='marker id to display')
- args = parser.parse_args()
-
- # Enable Ivy bus
- IvyInit(os.path.basename(__file__))
- IvyStart(args.ivy_bus)
-
- def on_looking_message(*args):
-
- look_at = numpy.fromstring(args[2].replace('[','').replace(']',''), dtype=float, count=2, sep=', ')
-
- visu_gaze_pixel = aoi2D_visu_scene[args[1]].looked_pixel(look_at)
-
- cv.circle(visu_frame, visu_gaze_pixel, 4, (0, 0, 255), -1)
-
- IvyBindMsg(on_looking_message, 'looking (.*) at (.*)')
-
- # Create AOIs 3D scene
- aoi3D_scene = AOI3DScene.AOI3DScene()
- aoi3D_scene.load(args.aoi_scene)
- print(f'AOIs names: {aoi3D_scene.keys()}')
-
- # Create a visual scan visualisation frame
- visu_width = 1920
- visu_height = 1080
- visu_ratio = visu_height
- visu_frame = numpy.full((visu_height, visu_width, 3), 255, dtype=numpy.uint8)
-
- cv.imshow('Scene', visu_frame)
-
- # Project 3D scene on the reference frame
- # TODO : center projection on a reference AOI
- ref_aoi = 'Scene_Plan'
-
- # TODO: pass the reference AOI in argument
- aoi3D_scene_rotation = numpy.array([[-numpy.pi, 0.0, 0.0]])
- aoi3D_scene_translation = numpy.array([[19.0, 8.0, 25.0]])
-
- # Edit a projection matrix for the reference frame
- K0 = numpy.array([[visu_ratio, 0.0, visu_width/2], [0.0, visu_ratio, visu_height/2], [0.0, 0.0, 1.0]])
-
- aoi2D_visu_scene = aoi3D_scene.project(aoi3D_scene_translation, aoi3D_scene_rotation, K0)
-
- # Create aruco markers dictionary
- aruco_markers_dict = ArUcoMarkersDictionary.ArUcoMarkersDictionary(args.dictionary)
-
- # Create aruco marker
- marker_box = aoi2D_visu_scene['Marker_Plan'].bounding_box().astype(int)
- marker_size = marker_box[2] - marker_box[0]
- marker = aruco_markers_dict.create_marker(args.marker_id, int(marker_size[0]))
- print(f'Creating Aruco marker {args.marker_id} from the {args.dictionary} dictionary')
-
- def draw_scene():
-
- # Clear frame
- visu_frame[:] = 255
-
- # Display AOI 2D scene
- for name, aoi in aoi2D_visu_scene.items():
- aoi.draw(visu_frame, (0, 0, 0))
-
- # Display aruco marker
- visu_frame[marker_box[0][1]:marker_box[2][1], marker_box[0][0]:marker_box[2][0], :] = marker
-
- # On mouse over : redraw scene and draw target
- def on_mouse_event(event, x, y, flags, param):
-
- draw_scene()
-
- # Draw target
- cv.circle(visu_frame, (x, y), 40, (0, 255, 255), -1)
-
- cv.setMouseCallback('Scene', on_mouse_event)
-
- # Screen display loop
- try:
-
- draw_scene()
-
- while True:
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- cv.imshow('Scene', visu_frame)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py b/src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py
deleted file mode 100644
index 0e9aeea..0000000
--- a/src/argaze/utils/tobii_stream_aruco_aoi_ivy_controller.py
+++ /dev/null
@@ -1,297 +0,0 @@
- #!/usr/bin/env python
-
-import argparse
-import os
-import json
-
-from argaze import DataStructures, GazeFeatures
-from argaze.TobiiGlassesPro2 import *
-from argaze.ArUcoMarkers import ArUcoTracker, ArUcoCamera
-from argaze.AreaOfInterest import *
-
-import cv2 as cv
-import numpy
-
-from ivy.std_api import *
-
-def main():
- """
- Track any ArUco marker into Tobii Glasses Pro 2 camera video stream.
- For each loaded AOI scene .obj file, position the scene virtually relatively to each detected ArUco markers and project the scene into camera frame.
- Then, detect if Tobii gaze point is inside any AOI and send the look at pointer over Ivy default bus.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default='192.168.1.10', help='tobii glasses ip')
- parser.add_argument('-p', '--project_name', metavar='PROJECT_NAME', type=str, default=TobiiController.DEFAULT_PROJECT_NAME, help='project name')
- parser.add_argument('-u', '--participant_name', metavar='PARTICIPANT_NAME', type=str, default=TobiiController.DEFAULT_PARTICIPANT_NAME, help='participant name')
- parser.add_argument('-c', '--camera_calibration', metavar='CAM_CALIB', type=str, default='tobii_camera.json', help='json camera calibration filepath')
- parser.add_argument('-y', '--ivy_bus', metavar='IVY_BUS', type=str, default='0.0.0.0:2010', help='Ivy bus ip and port')
- parser.add_argument('-md', '--marker_dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL,DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)')
- parser.add_argument('-ms', '--marker_size', metavar='MKR', type=float, default=6, help='aruco marker size (cm)')
- parser.add_argument('-mi', '--marker_id_scene', metavar='MARKER_ID_SCENE', type=json.loads, help='{"marker": "aoi scene filepath"} dictionary')
- args = parser.parse_args()
-
- # Manage markers id to track
- if args.marker_id_scene == None:
- print(f'Track any Aruco markers from the {args.marker_dictionary} dictionary')
- else:
- print(f'Track Aruco markers {args.marker_id_scene.keys()} from the {args.marker_dictionary} dictionary')
-
- # Enable Ivy bus
- IvyInit(os.path.basename(__file__))
- IvyStart(args.ivy_bus)
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Print current confirugration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Calibrate tobii glasses
- tobii_controller.calibrate()
-
- # Enable tobii data stream
- tobii_data_stream = tobii_controller.enable_data_stream()
-
- # Enable tobii video stream
- tobii_video_stream = tobii_controller.enable_video_stream()
-
- # create aruco camera
- aruco_camera = ArUcoCamera.ArUcoCamera()
- aruco_camera.load_calibration_file(args.camera_calibration)
-
- # Create aruco tracker
- aruco_tracker = ArUcoTracker.ArUcoTracker(args.marker_dictionary, args.marker_size, aruco_camera)
-
- # Load AOI 3D scene for each marker
- aoi3D_scenes = {}
-
- for marker_id, aoi_scene_filepath in args.marker_id_scene.items():
-
- marker_id = int(marker_id)
-
- aoi3D_scenes[marker_id] = AOI3DScene.AOI3DScene()
- aoi3D_scenes[marker_id].load(aoi_scene_filepath)
-
- print(f'AOI in {os.path.basename(aoi_scene_filepath)} scene related to marker #{marker_id}')
- for aoi in aoi3D_scenes[marker_id].keys():
- print(f'\t{aoi}')
-
- def aoi3D_scene_selector(marker_id):
- return aoi3D_scenes.get(marker_id, None)
-
- # Start streaming
- tobii_controller.start_streaming()
-
- # Live video stream capture loop
- try:
-
- past_gaze_positions = DataStructures.TimeStampedBuffer()
- past_head_rotations = DataStructures.TimeStampedBuffer()
-
- head_moving = False
- head_movement_last = 0.
-
- while tobii_video_stream.is_alive():
-
- video_ts, video_frame = tobii_video_stream.read()
-
- # Copy video frame to edit visualisation on it without disrupting aruco tracking
- visu_frame = video_frame.copy()
-
- # Process video and data frame
- try:
-
- # Read data stream
- data_stream = tobii_data_stream.read()
-
- # Store last received data
- past_head_rotations.append(data_stream['Gyroscope'])
- past_gaze_positions.append(data_stream['GazePosition'])
-
- # Get nearest head rotation before video timestamp and remove all head rotations before
- _, nearest_head_rotation = tobii_ts_head_rotations.pop_first_until(video_ts)
-
- # Calculate head movement considering only head yaw and pitch
- head_movement = numpy.array(nearest_head_rotation.value)
- head_movement_px = head_movement.astype(int)
- head_movement_norm = numpy.linalg.norm(head_movement[0:2])
-
- # Draw movement vector
- cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2)), (int(visu_frame.width/2) + head_movement_px[1], int(visu_frame.height/2) - head_movement_px[0]), (150, 150, 150), 3)
-
- # Head movement detection hysteresis
- # TODO : pass the threshold value as argument
- if not head_moving and head_movement_norm > 50:
- head_moving = True
-
- if head_moving and head_movement_norm < 10:
- head_moving = False
-
- # When head is moving, ArUco tracking could return bad pose estimation and so bad AOI scene projection
- if head_moving:
- raise AOIFeatures.AOISceneMissing('Head is moving')
-
- # Get nearest gaze position before video timestamp and remove all gaze positions before
- _, nearest_gaze_position = tobii_ts_gaze_positions.pop_first_until(video_ts)
-
- # Ignore frame when gaze position is not valid
- if nearest_gaze_position.validity == 1:
- raise GazeFeatures.GazePositionMissing('Unvalid gaze position')
-
- gaze_position_pixel = GazeFeatures.GazePosition( (int(nearest_gaze_position.value[0] * visu_frame.width), int(nearest_gaze_position.value[1] * visu_frame.height)) )
-
- # Draw gaze position
- cv.circle(visu_frame.matrix, gaze_position_pixel, 2, (0, 255, 255), -1)
-
- # Get nearest gaze position 3D before video timestamp and remove all gaze positions before
- _, nearest_gaze_position_3d = tobii_ts_gaze_positions_3d.pop_first_until(video_ts)
-
- # Ignore frame when gaze position 3D is not valid
- if nearest_gaze_position_3d.validity == 1:
- raise GazeFeatures.GazePositionMissing('Unvalid gaze position 3D')
-
- gaze_position_pixel = (int(nearest_gaze_position.value[0] * visu_frame.width), int(nearest_gaze_position.value[1] * visu_frame.height))
-
- gaze_accuracy_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.ACCURACY)) * nearest_gaze_position_3d.value[2]
- tobii_camera_hfov_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV / 2)) * nearest_gaze_position_3d.value[2]
-
- gaze_position_pixel.accuracy = round(visu_frame.width * float(gaze_accuracy_mm) / float(tobii_camera_hfov_mm))
-
- # Draw gaze position and accuracy
- cv.circle(visu_frame.matrix, gaze_position_pixel, 2, (0, 255, 255), -1)
- cv.circle(visu_frame.matrix, gaze_position_pixel, gaze_position_pixel.accuracy, (0, 255, 255), 1)
-
- # Hide frame left and right borders before tracking to ignore markers outside focus area
- cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width/6), int(video_frame.height)), (0, 0, 0), -1)
- cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - 1/6)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1)
-
- # Track markers with pose estimation and draw them
- aruco_tracker.track(video_frame.matrix)
- aruco_tracker.draw(visu_frame.matrix)
-
- # When no marker is detected, no AOI scene projection can't be done
- if aruco_tracker.get_markers_number() == 0:
- raise AOIFeatures.AOISceneMissing('No marker detected')
-
- # Store aoi 2D video for further scene merging
- aoi2D_dict = {}
-
- # Project 3D scenes related to each aruco markers
- for (i, marker_id) in enumerate(aruco_tracker.get_markers_ids()):
-
- # Select 3D scene related to detected marker
- aoi3D_scene = aoi3D_scene_selector(marker_id)
-
- if aoi3D_scene == None:
- continue
-
- # Transform scene into camera referential
- aoi3D_camera = aoi3D_scene.transform(aruco_tracker.get_marker_translation(i), aruco_tracker.get_marker_rotation(i))
-
- # Get aoi inside vision cone field
- cone_vision_height_cm = nearest_gaze_position_3d.value[2]/10 # cm
- cone_vision_radius_cm = numpy.tan(numpy.deg2rad(tobii_visual_hfov / 2)) * cone_vision_height_cm
-
- aoi3D_inside, aoi3D_outside = aoi3D_camera.vision_cone(cone_vision_radius_cm, cone_vision_height_cm)
-
- # Keep only aoi inside vision cone field
- aoi3D_scene = aoi3D_scene.copy(exclude=aoi3D_outside.keys())
-
- # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it
- # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable.
- aoi2D_video_scene = aoi3D_scene.project(aruco_tracker.get_marker_translation(i), aruco_tracker.get_marker_rotation(i), aruco_camera.get_K())
-
- # Store each 2D aoi for further scene merging
- for name, aoi in aoi2D_video_scene.items():
-
- if name not in aoi2D_dict.keys():
- aoi2D_dict[name] = []
-
- aoi2D_dict[name].append(aoi.clockwise())
-
- # Merge all 2D aoi into a single 2D scene
- aoi2D_merged_scene = AOI2DScene.AOI2DScene()
- for name, aoi_array in aoi2D_dict.items():
- aoi2D_merged_scene[name] = numpy.sum(aoi_array, axis=0) / len(aoi_array)
-
- aoi2D_merged_scene.draw(visu_frame.matrix, gaze_position_pixel, gaze_position_pixel.accuracy, exclude=['Visualisation_Plan'])
-
- # When the merged scene is empty
- if len(aoi2D_merged_scene.keys()) == 0:
- raise AOIFeatures.AOISceneMissing('Scene is empty')
-
- # Send look at aoi pointer
- for name, aoi in aoi2D_merged_scene.items():
-
- if aoi.looked(video_gaze_pixel):
-
- # 4 corners aoi
- if len(aoi) == 4:
- IvySendMsg(f'looking {name} at {aoi.look_at(video_gaze_pixel)}')
- else:
- IvySendMsg(f'looking {name}')
-
- # Raised when gaze data can't be processed
- except GazeFeatures.GazeDataMissing as e:
-
- cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Raised when aoi scene is missing
- except AOIFeatures.AOISceneMissing as e:
-
- cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Raised when buffer is empty
- except KeyError:
- pass
-
- # Draw focus area
- cv.rectangle(visu_frame.matrix, (int(video_frame.width/6), 0), (int(visu_frame.width*(1-1/6)), int(visu_frame.height)), (255, 150, 150), 1)
-
- # Draw center
- cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1)
- cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Write stream timing
- cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(visu_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- cv.imshow('Live Scene', visu_frame.matrix)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
- # Stop streaming
- tobii_controller.stop_streaming()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file