1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
|
#!/usr/bin/env python
import threading
import uuid
import time
import copy
from argaze import DataStructures
from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
import cv2 as cv
import av
import numpy
class TobiiVideoFrame(DataStructures.DictObject):
"""Define tobii video frame"""
def __init__(self, matrix, width, height):
super().__init__(type(self).__name__, **{'matrix': matrix, 'width': width, 'height': height})
class TobiiVideoSegment():
"""Handle Tobii Glasses Pro 2 segment video file."""
def __init__(self, segment_video_path):
"""Load segment video from segment directory"""
self.__segment_video_path = segment_video_path
self.__container = av.open(self.__segment_video_path)
self.__stream = self.__container.streams.video[0]
self.__width = int(cv.VideoCapture(self.__segment_video_path).get(cv.CAP_PROP_FRAME_WIDTH))
self.__height = int(cv.VideoCapture(self.__segment_video_path).get(cv.CAP_PROP_FRAME_HEIGHT))
self.__vts_data_buffer = None
def get_path(self):
return self.__segment_video_path
def get_duration(self):
return float(self.__stream.duration * self.__stream.time_base)
def get_frame_number(self):
return self.__stream.frames
def get_width(self):
return self.__width
def get_height(self):
return self.__height
def get_stream(self):
return self.__stream
def frames(self, vts_data_buffer = None):
"""Access to frame iterator and optionnaly setup vide / data timestamp synchronisation through vts data buffer."""
self.__vts_data_buffer = vts_data_buffer
# Enable video / data timestamp synchronisation
if self.__vts_data_buffer != None:
self.__vts_ts, self.__vts = self.__vts_data_buffer.pop_first()
self.__vts_offset = (self.__vts_ts - self.__vts.vts)
return self.__iter__()
def __iter__(self):
# start decoding
self.__container.decode(self.__stream)
return self
def __next__(self):
frame = self.__container.decode(self.__stream).__next__()
video_ts = int(frame.time * 1000000)
# If video / data synchronisation is active
if self.__vts_data_buffer != None:
if video_ts > self.__vts.vts:
if len(self.__vts_data_buffer) > 0:
self.__vts_ts, self.__vts = self.__vts_data_buffer.pop_first()
self.__vts_offset = (self.__vts_ts - self.__vts.vts)
video_ts += self.__vts_offset
# return micro second timestamp and frame data
return video_ts, TobiiVideoFrame(frame.to_ndarray(format='bgr24'), frame.width, frame.height)
class TobiiVideoStream(threading.Thread):
"""Capture Tobii Glasses Pro 2 video camera stream."""
def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
"""Initialise video stream reception."""
threading.Thread.__init__(self)
threading.Thread.daemon = True
self.__network = network_interface
self.__video_socket = self.__network.make_socket()
self.__stop_event = threading.Event()
self.__read_lock = threading.Lock()
self.__frame_tuple = None
# prepare keep alive message
self.__keep_alive_msg = "{\"type\": \"live.video.unicast\",\"key\": \""+ str(uuid.uuid4()) +"_video\", \"op\": \"start\"}"
self.__keep_alive_thread = threading.Timer(0, self.__keep_alive)
self.__keep_alive_thread.daemon = True
def __del__(self):
"""Stop data reception before destruction."""
self.close()
def __keep_alive(self):
"""Maintain connection."""
while not self.__stop_event.isSet():
self.__network.send_keep_alive_msg(self.__video_socket, self.__keep_alive_msg)
time.sleep(1)
def open(self):
"""Start data reception."""
self.__keep_alive_thread.start()
threading.Thread.start(self)
def close(self):
"""Stop data reception definitively."""
self.__stop_event.set()
threading.Thread.join(self.__keep_alive_thread)
threading.Thread.join(self)
self.__video_socket.close()
def run(self):
"""Store frame for further reading."""
container = av.open(f'rtsp://{self.__network.get_address()}:8554/live/scene', options={'rtsp_transport': 'tcp'})
stream = container.streams.video[0]
for frame in container.decode(stream):
# quit if the video acquisition thread have been stopped
if self.__stop_event.isSet():
break
# lock frame access
self.__read_lock.acquire()
# store frame time, matrix, width, height and pts into a tuple
self.__frame_tuple = (frame.time, frame.to_ndarray(format='bgr24'), frame.width, frame.height)
# unlock frame access
self.__read_lock.release()
def read(self):
# if the video acquisition thread have been stopped or isn't started
if self.__stop_event.isSet() or self.__frame_tuple == None:
return -1, TobiiVideoFrame(numpy.zeros((1, 1, 3), numpy.uint8), 1, 1)
# lock frame access
self.__read_lock.acquire()
# copy frame tuple
frame_tuple = copy.deepcopy(self.__frame_tuple)
# unlock frame access
self.__read_lock.release()
return int(frame_tuple[0] * 1000000), TobiiVideoFrame(frame_tuple[1], frame_tuple[2], frame_tuple[3])
class TobiiVideoOutput():
"""Export a video file at the same format than a given referent stream."""
# TODO : Make a generic video managment to handle video from any device (not only Tobii)
def __init__(self, output_video_path: str, referent_stream: av.stream.Stream):
"""Create a video file"""
self.__output_video_path = output_video_path
self.__container = av.open(self.__output_video_path, 'w')
self.__stream = self.__container.add_stream(\
referent_stream.codec_context.name, \
width=referent_stream.codec_context.width, \
height=referent_stream.codec_context.height, \
rate=referent_stream.codec_context.framerate, \
gop_size=referent_stream.codec_context.gop_size, \
pix_fmt=referent_stream.codec_context.pix_fmt, \
bit_rate=referent_stream.codec_context.bit_rate)
def get_path(self):
return self.__output_video_path
def write(self, frame):
"""Write a frame into the output video file"""
formated_frame = av.VideoFrame.from_ndarray(frame, format='bgr24')
formated_frame.reformat(format=self.__stream.codec_context.pix_fmt, interpolation=None)
self.__container.mux(self.__stream.encode(formated_frame))
def close(self):
"""End the writing of the video file"""
self.__container.mux(self.__stream.encode())
self.__container.close()
|