From 182f646402573e86343791b3f34778fd9f8a29e8 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 3 Nov 2022 00:50:20 +0100 Subject: Clearing data_socket when no thread is reading it. --- src/argaze/TobiiGlassesPro2/TobiiData.py | 75 ++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py index d8ddc60..71ce234 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiData.py +++ b/src/argaze/TobiiGlassesPro2/TobiiData.py @@ -300,22 +300,23 @@ class TobiiDataStream(): self.__network = network_interface self.__data_socket = self.__network.make_socket() - # Keep connection alive - self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}" - self.__keep_alive_thread = threading.Thread(target = self.__keep_alive) - self.__keep_alive_thread.daemon = True - self.__keep_alive_thread.start() - # Data reception self.__data_thread = None self.__data_queue = queue.Queue(50) # TODO : set queue size according technical reason self.__json_data_parser = TobiiJsonDataParser() + self.__first_ts = 0 # Data capture self.__data_stream_selector = '' - self.__data_ts_buffer = DataStructures.TimeStampedBuffer() + self.__data_ts_buffer = None self.__data_ts_buffer_size = 0 + # Keep connection alive + self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}" + self.__keep_alive_thread = threading.Thread(target = self.__keep_alive) + self.__keep_alive_thread.daemon = True + self.__keep_alive_thread.start() + def __del__(self): """Stop data reception and network connection before destruction.""" @@ -329,27 +330,41 @@ class TobiiDataStream(): self.__data_socket.close() def __keep_alive(self): - """Maintain network connection.""" + """Maintain network connection and clear socket when is not read.""" while True: - self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg) + # if no thread is reading data socket + if self.__data_thread == None: + + self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg) + + clear_count = 0 + while clear_count < 1000 and self.__data_thread == None: + + # Clear socket each milli second + time.sleep(0.001) + self.__network.grab_data(self.__data_socket) + clear_count += 1 + + else: - time.sleep(1) + self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg) + time.sleep(1) def open(self): """Start data reception.""" if self.__data_thread == None: + self.__first_ts = 0 + self.__data_thread = threading.Thread(target = self.__run) self.__data_thread.daemon = True self.__stop_event = threading.Event() self.__read_lock = threading.Lock() - self.__first_ts = 0 - self.__data_thread.start() def close(self): @@ -471,34 +486,39 @@ class TobiiDataStream(): # convert timestamp data_ts = json_data.pop('ts') + # convert json data into data object + data_object = self.__json_data_parser.parse_data(status, json_data) + data_object_type = type(data_object).__name__ + # keep first timestamp to offset all timestamps if self.__first_ts == 0: self.__first_ts = data_ts - + data_ts -= self.__first_ts - # convert json data into data object - data_object = self.__json_data_parser.parse_data(status, json_data) - data_object_type = type(data_object).__name__ - return data_ts, data_object, data_object_type def __capture_callback(self, data_ts, data_object, data_object_type): """Store incoming data into timestamped buffer""" if data_object_type == self.__data_stream_selector: - + if len(self.__data_ts_buffer.keys()) < self.__data_ts_buffer_size: - self.__data_ts_buffer[data_ts / 1e3] = data_object + # update first timestamp if next timestamps are negative + if data_ts < 0: + self.__first_ts += data_ts + data_ts = 0 - def capture(self, data_object_type = '', buffer_size = 500): + self.__data_ts_buffer[data_ts] = data_object + + def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500): """Capture a data stream into buffer.""" # Prepare for data acquisition self.__data_stream_selector = data_object_type - self.__data_ts_buffer = DataStructures.TimeStampedBuffer() - self.__data_ts_buffer_size = buffer_size + self.__data_ts_buffer = data_ts_buffer + self.__data_ts_buffer_size = sample_number # Subscribe to data stream memo_reading_callback = self.reading_callback @@ -508,20 +528,17 @@ class TobiiDataStream(): self.open() # Share data acquisition progress - data_size = 0 - while data_size < buffer_size: + buffer_size = 0 + while buffer_size < sample_number: time.sleep(0.1) - data_size = len(self.__data_ts_buffer.keys()) + buffer_size = len(self.__data_ts_buffer.keys()) - yield data_size, self.__data_ts_buffer + yield buffer_size # Stop data reception self.close() # Unsubscribe to tobii data stream self.reading_callback = memo_reading_callback - - # Return captured data - return data_size, self.__data_ts_buffer -- cgit v1.1