From 2b035cad4f3af6409c380e2a3a373e9a47e574a8 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Tue, 24 Jan 2023 16:56:02 +0100 Subject: Removing data stream queue reading mecanism. Adding running property. Allowing multiple callback subscriptions. --- src/argaze/TobiiGlassesPro2/TobiiData.py | 164 +++++++++++-------------------- 1 file changed, 55 insertions(+), 109 deletions(-) (limited to 'src') diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py index 4cb4d22..ed570c9 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiData.py +++ b/src/argaze/TobiiGlassesPro2/TobiiData.py @@ -304,10 +304,7 @@ class TobiiDataSegment(): return self.__path class TobiiDataStream(): - """Capture Tobii Glasses Pro 2 data stream in separate thread.""" - - reading_callback = None - """Reading callback function to get incoming (data_ts, data_object, data_object_type) back.""" + """Handle Tobii Glasses Pro 2 data stream in separate thread.""" def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface): """Initialise network connection and prepare data reception.""" @@ -318,14 +315,12 @@ class TobiiDataStream(): # Data reception self.__data_thread = None - self.__data_queue = queue.Queue(50) # TODO : set queue size according technical reason self.__json_data_parser = TobiiJsonDataParser() self.__first_ts = 0 - # Data capture - self.__data_stream_selector = '' - self.__data_ts_buffer = None - self.__data_ts_buffer_size = 0 + # Data subscription + self.reading_callbacks = [] + self.__subcription_lock = threading.Lock() # Keep connection alive self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}" @@ -379,8 +374,7 @@ class TobiiDataStream(): self.__data_thread.daemon = True self.__stop_event = threading.Event() - self.__read_lock = threading.Lock() - + self.__data_thread.start() def close(self): @@ -393,6 +387,12 @@ class TobiiDataStream(): threading.Thread.join(self.__data_thread) self.__data_thread = None + @property + def running(self) -> bool: + """Is tobii data streaming running?""" + + return self.__data_thread != None + def __run(self): """Managed received data for sync and async reading case. - Sync: send data to callback function. @@ -400,105 +400,49 @@ class TobiiDataStream(): while not self.__stop_event.isSet(): - # Sync reading case - if self.reading_callback != None: + # grab data + data = self.__network.grab_data(self.__data_socket) - # grab data - data = self.__network.grab_data(self.__data_socket) + # decode data + json_data = json.loads(data.decode('utf-8')) - # decode data - json_data = json.loads(data.decode('utf-8')) + # parse json into timestamped data object + data_ts, data_object, data_object_type = self.__parse_json_data(json_data) - # parse json into timestamped data object - data_ts, data_object, data_object_type = self.__parse_json_data(json_data) + # lock data subcription + self.__subcription_lock.acquire() - self.reading_callback(data_ts, data_object, data_object_type) + # share incoming data to all subcribers + for callback in self.reading_callbacks: - # Async reading case - else: + callback(data_ts, data_object, data_object_type) - # wait for queue reading - if self.__data_queue.full(): + # unlock data subscription + self.__subcription_lock.release() - # sleep 1 micro second - time.sleep(0.0001) - continue + def subscribe(self, reading_callback): + """Pass reading callback function to get incoming (data_ts, data_object, data_object_type) back.""" - # lock data queue access - self.__read_lock.acquire() + # lock data subcription + self.__subcription_lock.acquire() - # grab data - data = self.__network.grab_data(self.__data_socket) + # append callback + self.reading_callbacks.append(reading_callback) - # decode data - json_data = json.loads(data.decode('utf-8')) + # unlock data subscription + self.__subcription_lock.release() - # write data in queue - self.__data_queue.put(json_data) + def unsubscribe(self, reading_callback): + """Remove reading callback function to stop data reception.""" - # unlock data queue access - self.__read_lock.release() + # lock data subcription + self.__subcription_lock.acquire() - def read(self) -> Tuple[int, dict]: - """Read incoming timestamped data. + # remove callback + self.reading_callbacks.remove(reading_callback) - * **Returns:** - - timestamp - - dictionary of TimeStampedBuffer for each data. - """ - - # no data to read - if self.__data_queue.empty(): - raise ValueError - - # create a dictionary of timestamped data buffers - ts_data_buffer_dict = { - 'DirSig': DataStructures.TimeStampedBuffer(), - 'PresentationTimeStamp': DataStructures.TimeStampedBuffer(), - 'VideoTimeStamp': DataStructures.TimeStampedBuffer(), - 'EventSynch': DataStructures.TimeStampedBuffer(), - 'Event': DataStructures.TimeStampedBuffer(), - 'Accelerometer': DataStructures.TimeStampedBuffer(), - 'Gyroscope': DataStructures.TimeStampedBuffer(), - 'PupilCenter': DataStructures.TimeStampedBuffer(), - 'PupilDiameter': DataStructures.TimeStampedBuffer(), - 'GazeDirection': DataStructures.TimeStampedBuffer(), - 'GazePosition': DataStructures.TimeStampedBuffer(), - 'GazePosition3D': DataStructures.TimeStampedBuffer(), - 'MarkerPosition': DataStructures.TimeStampedBuffer() - } - - # if the data acquisition thread is not running - if self.__stop_event.isSet(): - return ts_data_buffer_dict - - # lock data queue access - self.__read_lock.acquire() - - # search for earliest timestamp in the queue - earliest_ts = 0 - - # read data queue - while not self.__data_queue.empty(): - - # parse json into timestamped data object - data_ts, data_object, data_object_type = self.__parse_json_data(self.__data_queue.get()) - - # ignore negative timestamp - if data_ts < 0: - break - - # keep earliest timestamp - if data_ts > earliest_ts: - earliest_ts = data_ts - - # store data object into dedicated timestamped buffer - ts_data_buffer_dict[data_object_type][data_ts] = data_object - - # unlock data queue access - self.__read_lock.release() - - return earliest_ts, ts_data_buffer_dict + # unlock data subscription + self.__subcription_lock.release() def __parse_json_data(self, json_data): @@ -521,7 +465,6 @@ class TobiiDataStream(): return data_ts, data_object, data_object_type def __capture_callback(self, data_ts, data_object, data_object_type): - """Store incoming data into timestamped buffer""" if data_object_type == self.__data_stream_selector: @@ -535,23 +478,25 @@ class TobiiDataStream(): self.__data_ts_buffer[data_ts] = data_object def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500) -> int: - """Capture a data stream into buffer. + """Start data stream capture. - * **Returns:** - - buffer size each 100 ms + * **Returns: each 100 ms** + - buffer size """ # Prepare for data acquisition - self.__data_stream_selector = data_object_type self.__data_ts_buffer = data_ts_buffer + self.__data_stream_selector = data_object_type self.__data_ts_buffer_size = sample_number - # Subscribe to data stream - memo_reading_callback = self.reading_callback - self.reading_callback = self.__capture_callback + # Subscribe to tobii data stream + self.subscribe(self.__capture_callback) - # Start data reception - self.open() + # Start data stream if needed + close_after = False + if not self.running: + self.open() + close_after = True # Share data acquisition progress buffer_size = 0 @@ -563,8 +508,9 @@ class TobiiDataStream(): yield buffer_size - # Stop data reception - self.close() + # Stop data sream if needed + if close_after: + self.close() # Unsubscribe to tobii data stream - self.reading_callback = memo_reading_callback + self.unsubscribe(self.__capture_callback) -- cgit v1.1