aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/utils/tobii_stream_arscene_display.py
blob: e7a3bfb80da834e12cddeeb213b96abff37b0bf8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python

import argparse
import os, json

from argaze import *
from argaze.TobiiGlassesPro2 import *
from argaze.ArUcoMarkers import *
from argaze.AreaOfInterest import *
from argaze.utils import MiscFeatures

import cv2 as cv
import numpy

def _parse_debug_flag(value):
    """Convert the raw string given to the --debug option into a boolean.

    argparse hands the command line text to the ``type`` callable, and
    ``bool('False')`` is True because any non-empty string is truthy: with
    ``type=bool`` the option could never be switched off explicitly.

    Usual "false" spellings (case-insensitive) and the empty string give False.
    Any other text gives True, which is what ``type=bool`` returned before, so
    existing invocations keep working.
    """
    return value.strip().lower() not in ('', '0', 'false', 'f', 'no', 'n', 'off')


def main():
    """
    Detect ArUcoScene into Tobii Glasses Pro 2 camera video stream.

    Connects to the glasses, loads an ArEnvironment from a json file, then for
    each frame read from the video stream: masks the left and right borders,
    detects ArUco markers, estimates the pose of the first scene of the
    environment, and shows a copy of the frame with scene axis, places, AOI
    and markers drawn on it.

    The loop ends when the video stream is no longer alive, when 'Esc' is
    pressed in the window or on 'ctrl+C'. The window is destroyed and streaming
    is stopped on every exit path, including unexpected exceptions (which are
    still propagated to the caller).

    Command line options:
        -t / --tobii_ip: glasses ip (passed as None to the controller if missing).
        -p / --env_path: json argaze environment filepath.
        -b / --borders: width (in percent of frame width) of each masked border.
        -d / --debug: print the loaded environment.

    Raises:
        SystemExit: when the connection to the device fails (the error is
            printed first), or from argparse on --help / invalid arguments.
    """

    # Manage arguments
    # NOTE: the description is a literal instead of main.__doc__ so that it still
    # exists when docstrings are stripped (python -OO).
    parser = argparse.ArgumentParser(description='Detect ArUcoScene into Tobii Glasses Pro 2 camera video stream.')
    parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
    parser.add_argument('-p', '--env_path', metavar='ENVIRONMENT_PATH', type=str, default=None, help='json argaze environment filepath')
    # argparse %-formats help strings: a literal percent sign has to be written '%%'
    parser.add_argument('-b', '--borders', metavar='BORDERS', type=float, default=16.666, help='define left and right borders mask (%%) to not detect aruco out of these borders')
    # nargs='?' with const=True lets a bare '-d' enable debug; '-d <value>' is still accepted
    parser.add_argument('-d', '--debug', metavar='DEBUG', type=_parse_debug_flag, nargs='?', const=True, default=False, help='Enable visualisation and console outputs')
    args = parser.parse_args()

    # Create tobii controller (with auto discovery network process if no ip argument is provided)
    print("Looking for a Tobii Glasses Pro 2 device ...")

    try:

        tobii_controller = TobiiController.TobiiController(args.tobii_ip)
        print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')

    except ConnectionError as e:

        print(e)

        # Same exit status as the former exit() call, without relying on the site module helper
        raise SystemExit

    # Setup camera at 25 fps to work on Full HD video stream
    tobii_controller.set_scene_camera_freq_25()

    # Print current configuration
    print('Tobii Glasses Pro 2 configuration:')
    for key, value in tobii_controller.get_configuration().items():
        print(f'\t{key}: {value}')

    # Enable tobii data stream (the returned stream is not read by this script)
    tobii_data_stream = tobii_controller.enable_data_stream()

    # Enable tobii video stream
    tobii_video_stream = tobii_controller.enable_video_stream()

    # Load ArEnvironment
    ar_env = ArFeatures.ArEnvironment.from_json(args.env_path)

    if args.debug:
        print(ar_env)

    # Work with first scene only
    _, ar_scene = next(iter(ar_env.items()))

    # Start streaming
    tobii_controller.start_streaming()

    # Live video stream capture loop
    try:

        # Assess loop performance
        loop_chrono = MiscFeatures.TimeProbe()
        fps = 0

        while tobii_video_stream.is_alive():

            # Read video stream
            video_ts, video_frame = tobii_video_stream.read()

            # Copy video frame to edit visualisation on it without disrupting aruco detection
            visu_frame = video_frame.copy()

            # Hide frame left and right borders before detection to ignore markers outside focus area
            cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width*args.borders/100), int(video_frame.height)), (0, 0, 0), -1)
            cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - args.borders/100)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1)

            # Process video and data frame
            try:

                # Detect aruco markers into frame
                ar_env.aruco_detector.detect_markers(video_frame.matrix)

                # Estimate markers poses
                ar_env.aruco_detector.estimate_markers_pose()

                # Estimate scene pose from ArUco markers into frame.
                tvec, rmat, _ = ar_scene.estimate_pose(ar_env.aruco_detector.detected_markers)

                # Project AOI scene into frame according estimated pose
                aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV)

                # Draw scene axis
                ar_scene.draw_axis(visu_frame.matrix)

                # Draw scene places
                ar_scene.draw_places(visu_frame.matrix)

                # Draw AOI
                aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255))

                # Draw detected markers
                ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)

            # Catch exceptions raised by estimate_pose and project methods
            except (ArFeatures.PoseEstimationFailed, ArFeatures.SceneProjectionFailed) as e:

                # Draw detected markers
                ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)

                # Write the failure message on the displayed frame and keep looping
                cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1)
                cv.putText(visu_frame.matrix, str(e), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)

            # Assess loop performance
            lap_time, lap_counter, elapsed_time = loop_chrono.lap()

            # Update fps each 10 loops
            # NOTE(review): the 1e3 factor presumes elapsed_time is in milliseconds - confirm against TimeProbe
            if lap_counter >= 10:

                fps = 1e3 * lap_counter / elapsed_time
                loop_chrono.restart()

            # Write stream timing
            # NOTE(review): video_ts is scaled by 1e-3 and labelled 'ms', so it is presumably in microseconds - confirm
            cv.rectangle(visu_frame.matrix, (0, 0), (700, 50), (63, 63, 63), -1)
            cv.putText(visu_frame.matrix, f'Video stream time: {int(video_ts*1e-3)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
            cv.putText(visu_frame.matrix, f'Fps: {int(fps)}', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)

            cv.imshow('Stream ArUco AOI', visu_frame.matrix)

            # Close window using 'Esc' key
            if cv.waitKey(1) == 27:
                break

    # Exit on 'ctrl+C' interruption
    except KeyboardInterrupt:
        pass

    # Release display and device whatever the way the loop ended
    finally:

        # Stop frame display
        cv.destroyAllWindows()

        # Stop streaming
        tobii_controller.stop_streaming()
    
# Run the display loop only when this file is executed as a script (not on import).
if __name__ == "__main__":
    main()