From be8ec51b2d91b6156beeb77b528cb47ea039dfb3 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Tue, 11 Oct 2022 15:23:28 +0200 Subject: allowing sync and async data streaming. --- src/argaze/TobiiGlassesPro2/TobiiData.py | 114 ++++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 32 deletions(-) (limited to 'src') diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py index 975afce..903dcc7 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiData.py +++ b/src/argaze/TobiiGlassesPro2/TobiiData.py @@ -11,6 +11,8 @@ import queue from argaze import DataStructures from argaze.TobiiGlassesPro2 import TobiiNetworkInterface +from argaze.utils import MiscFeatures + @dataclass class DirSig(): """Define dir sig data (dir sig).""" @@ -287,6 +289,8 @@ class TobiiDataSegment(): class TobiiDataStream(threading.Thread): """Capture Tobii Glasses Pro 2 data stream in separate thread.""" + reading_callback = None + def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface): """Initialise thread super class as a deamon dedicated to data reception.""" @@ -296,7 +300,7 @@ class TobiiDataStream(threading.Thread): self.__network = network_interface self.__data_socket = self.__network.make_socket() - self.__data_queue = queue.Queue() + self.__data_queue = queue.Queue(50) # TODO : set queue size according technical reason self.__stop_event = threading.Event() self.__read_lock = threading.Lock() @@ -306,6 +310,8 @@ class TobiiDataStream(threading.Thread): self.__keep_alive_thread = threading.Thread(target = self.__keep_alive) self.__keep_alive_thread.daemon = True + self.__json_data_parser = TobiiJsonDataParser() + def __del__(self): """Stop data reception before destruction.""" @@ -338,24 +344,56 @@ class TobiiDataStream(threading.Thread): self.__data_socket.close() def run(self): - """Store received data into a queue for further reading.""" + """Managed received data for sync and async reading case. + - Sync: send data to callback function. + - Async: store data into a locked queue for further reading.""" while not self.__stop_event.isSet(): - # lock data queue access - self.__read_lock.acquire() + # Sync reading case + if self.reading_callback != None: + + # grab data + data = self.__network.grab_data(self.__data_socket) + + # decode data + json_data = json.loads(data.decode('utf-8')) + + # parse json into timestamped data object + data_ts, data_object, data_object_type = self.__parse_json_data(json_data) + + self.reading_callback(data_ts, data_object, data_object_type) + + # Async reading case + else: - # write in data queue - data = self.__network.grab_data(self.__data_socket) - json_data = json.loads(data.decode('utf-8')) - self.__data_queue.put(json_data) + # wait for queue reading + if self.__data_queue.full(): - # unlock data queue access - self.__read_lock.release() + # sleep 1 micro second + time.sleep(0.0001) + continue + + # lock data queue access + self.__read_lock.acquire() + + # grab data + data = self.__network.grab_data(self.__data_socket) + + # decode data + json_data = json.loads(data.decode('utf-8')) + + # write data in queue + self.__data_queue.put(json_data) + + # unlock data queue access + self.__read_lock.release() def read(self): - json_data_parser = TobiiJsonDataParser() + # no data to read + if self.__data_queue.empty(): + raise ValueError # create a dictionary of timestamped data buffers ts_data_buffer_dict = { @@ -377,39 +415,51 @@ class TobiiDataStream(threading.Thread): # if the data acquisition thread is not running if self.__stop_event.isSet(): return ts_data_buffer_dict - + # lock data queue access self.__read_lock.acquire() + # search for earliest timestamp in the queue + earliest_ts = 0 + # read data queue while not self.__data_queue.empty(): - json_data = self.__data_queue.get() - - # parse data status - status = json_data.pop('s', -1) - - # convert timestamp - ts = json_data.pop('ts') - - # keep first timestamp to offset all timestamps - if self.__first_ts == 0: - self.__first_ts = ts - - ts -= self.__first_ts + # parse json into timestamped data object + data_ts, data_object, data_object_type = self.__parse_json_data(self.__data_queue.get()) # ignore negative timestamp - if ts < 0: + if data_ts < 0: break - # convert json data into data object - data_object = json_data_parser.parse_data( status, json_data) - data_object_type = type(data_object).__name__ + # keep earliest timestamp + if data_ts > earliest_ts: + earliest_ts = data_ts # store data object into dedicated timestamped buffer - ts_data_buffer_dict[data_object_type][ts] = data_object - + ts_data_buffer_dict[data_object_type][data_ts] = data_object + # unlock data queue access self.__read_lock.release() - return ts, ts_data_buffer_dict + return earliest_ts, ts_data_buffer_dict + + def __parse_json_data(self, json_data): + + # parse data status + status = json_data.pop('s', -1) + + # convert timestamp + data_ts = json_data.pop('ts') + + # keep first timestamp to offset all timestamps + if self.__first_ts == 0: + self.__first_ts = data_ts + + data_ts -= self.__first_ts + + # convert json data into data object + data_object = self.__json_data_parser.parse_data(status, json_data) + data_object_type = type(data_object).__name__ + + return data_ts, data_object, data_object_type -- cgit v1.1