aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2022-10-11 15:23:28 +0200
committerThéo de la Hogue2022-10-11 15:23:28 +0200
commitbe8ec51b2d91b6156beeb77b528cb47ea039dfb3 (patch)
tree655054258828754736e237df98f208ed385c49f3
parent8634cbf28592564599e22b2468bd166c7e8a9419 (diff)
downloadargaze-be8ec51b2d91b6156beeb77b528cb47ea039dfb3.zip
argaze-be8ec51b2d91b6156beeb77b528cb47ea039dfb3.tar.gz
argaze-be8ec51b2d91b6156beeb77b528cb47ea039dfb3.tar.bz2
argaze-be8ec51b2d91b6156beeb77b528cb47ea039dfb3.tar.xz
allowing sync and async data streaming.
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiData.py114
1 files changed, 82 insertions, 32 deletions
diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py
index 975afce..903dcc7 100644
--- a/src/argaze/TobiiGlassesPro2/TobiiData.py
+++ b/src/argaze/TobiiGlassesPro2/TobiiData.py
@@ -11,6 +11,8 @@ import queue
from argaze import DataStructures
from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
+from argaze.utils import MiscFeatures
+
@dataclass
class DirSig():
"""Define dir sig data (dir sig)."""
@@ -287,6 +289,8 @@ class TobiiDataSegment():
class TobiiDataStream(threading.Thread):
"""Capture Tobii Glasses Pro 2 data stream in separate thread."""
+ reading_callback = None
+
def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
"""Initialise thread super class as a deamon dedicated to data reception."""
@@ -296,7 +300,7 @@ class TobiiDataStream(threading.Thread):
self.__network = network_interface
self.__data_socket = self.__network.make_socket()
- self.__data_queue = queue.Queue()
+ self.__data_queue = queue.Queue(50) # TODO : set queue size according technical reason
self.__stop_event = threading.Event()
self.__read_lock = threading.Lock()
@@ -306,6 +310,8 @@ class TobiiDataStream(threading.Thread):
self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
self.__keep_alive_thread.daemon = True
+ self.__json_data_parser = TobiiJsonDataParser()
+
def __del__(self):
"""Stop data reception before destruction."""
@@ -338,24 +344,56 @@ class TobiiDataStream(threading.Thread):
self.__data_socket.close()
def run(self):
- """Store received data into a queue for further reading."""
+ """Managed received data for sync and async reading case.
+ - Sync: send data to callback function.
+ - Async: store data into a locked queue for further reading."""
while not self.__stop_event.isSet():
- # lock data queue access
- self.__read_lock.acquire()
+ # Sync reading case
+ if self.reading_callback != None:
+
+ # grab data
+ data = self.__network.grab_data(self.__data_socket)
+
+ # decode data
+ json_data = json.loads(data.decode('utf-8'))
+
+ # parse json into timestamped data object
+ data_ts, data_object, data_object_type = self.__parse_json_data(json_data)
+
+ self.reading_callback(data_ts, data_object, data_object_type)
+
+ # Async reading case
+ else:
- # write in data queue
- data = self.__network.grab_data(self.__data_socket)
- json_data = json.loads(data.decode('utf-8'))
- self.__data_queue.put(json_data)
+ # wait for queue reading
+ if self.__data_queue.full():
- # unlock data queue access
- self.__read_lock.release()
+ # sleep 1 micro second
+ time.sleep(0.0001)
+ continue
+
+ # lock data queue access
+ self.__read_lock.acquire()
+
+ # grab data
+ data = self.__network.grab_data(self.__data_socket)
+
+ # decode data
+ json_data = json.loads(data.decode('utf-8'))
+
+ # write data in queue
+ self.__data_queue.put(json_data)
+
+ # unlock data queue access
+ self.__read_lock.release()
def read(self):
- json_data_parser = TobiiJsonDataParser()
+ # no data to read
+ if self.__data_queue.empty():
+ raise ValueError
# create a dictionary of timestamped data buffers
ts_data_buffer_dict = {
@@ -377,39 +415,51 @@ class TobiiDataStream(threading.Thread):
# if the data acquisition thread is not running
if self.__stop_event.isSet():
return ts_data_buffer_dict
-
+
# lock data queue access
self.__read_lock.acquire()
+ # search for earliest timestamp in the queue
+ earliest_ts = 0
+
# read data queue
while not self.__data_queue.empty():
- json_data = self.__data_queue.get()
-
- # parse data status
- status = json_data.pop('s', -1)
-
- # convert timestamp
- ts = json_data.pop('ts')
-
- # keep first timestamp to offset all timestamps
- if self.__first_ts == 0:
- self.__first_ts = ts
-
- ts -= self.__first_ts
+ # parse json into timestamped data object
+ data_ts, data_object, data_object_type = self.__parse_json_data(self.__data_queue.get())
# ignore negative timestamp
- if ts < 0:
+ if data_ts < 0:
break
- # convert json data into data object
- data_object = json_data_parser.parse_data( status, json_data)
- data_object_type = type(data_object).__name__
+ # keep earliest timestamp
+ if data_ts > earliest_ts:
+ earliest_ts = data_ts
# store data object into dedicated timestamped buffer
- ts_data_buffer_dict[data_object_type][ts] = data_object
-
+ ts_data_buffer_dict[data_object_type][data_ts] = data_object
+
# unlock data queue access
self.__read_lock.release()
- return ts, ts_data_buffer_dict
+ return earliest_ts, ts_data_buffer_dict
+
+ def __parse_json_data(self, json_data):
+
+ # parse data status
+ status = json_data.pop('s', -1)
+
+ # convert timestamp
+ data_ts = json_data.pop('ts')
+
+ # keep first timestamp to offset all timestamps
+ if self.__first_ts == 0:
+ self.__first_ts = data_ts
+
+ data_ts -= self.__first_ts
+
+ # convert json data into data object
+ data_object = self.__json_data_parser.parse_data(status, json_data)
+ data_object_type = type(data_object).__name__
+
+ return data_ts, data_object, data_object_type