aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2023-01-24 16:56:02 +0100
committerThéo de la Hogue2023-01-24 16:56:02 +0100
commit2b035cad4f3af6409c380e2a3a373e9a47e574a8 (patch)
tree6f4b610efe11dabbbe4071994970e601f4cca291
parent768fce136c412e128ec5a527d5ec11c8fdd47d7f (diff)
downloadargaze-2b035cad4f3af6409c380e2a3a373e9a47e574a8.zip
argaze-2b035cad4f3af6409c380e2a3a373e9a47e574a8.tar.gz
argaze-2b035cad4f3af6409c380e2a3a373e9a47e574a8.tar.bz2
argaze-2b035cad4f3af6409c380e2a3a373e9a47e574a8.tar.xz
Removing data stream queue reading mecanism. Adding running property. Allowing multiple callback subscriptions.
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiData.py164
1 files changed, 55 insertions, 109 deletions
diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py
index 4cb4d22..ed570c9 100644
--- a/src/argaze/TobiiGlassesPro2/TobiiData.py
+++ b/src/argaze/TobiiGlassesPro2/TobiiData.py
@@ -304,10 +304,7 @@ class TobiiDataSegment():
return self.__path
class TobiiDataStream():
- """Capture Tobii Glasses Pro 2 data stream in separate thread."""
-
- reading_callback = None
- """Reading callback function to get incoming (data_ts, data_object, data_object_type) back."""
+ """Handle Tobii Glasses Pro 2 data stream in separate thread."""
def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
"""Initialise network connection and prepare data reception."""
@@ -318,14 +315,12 @@ class TobiiDataStream():
# Data reception
self.__data_thread = None
- self.__data_queue = queue.Queue(50) # TODO : set queue size according technical reason
self.__json_data_parser = TobiiJsonDataParser()
self.__first_ts = 0
- # Data capture
- self.__data_stream_selector = ''
- self.__data_ts_buffer = None
- self.__data_ts_buffer_size = 0
+ # Data subscription
+ self.reading_callbacks = []
+ self.__subcription_lock = threading.Lock()
# Keep connection alive
self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
@@ -379,8 +374,7 @@ class TobiiDataStream():
self.__data_thread.daemon = True
self.__stop_event = threading.Event()
- self.__read_lock = threading.Lock()
-
+
self.__data_thread.start()
def close(self):
@@ -393,6 +387,12 @@ class TobiiDataStream():
threading.Thread.join(self.__data_thread)
self.__data_thread = None
+ @property
+ def running(self) -> bool:
+ """Is tobii data streaming running?"""
+
+ return self.__data_thread != None
+
def __run(self):
"""Managed received data for sync and async reading case.
- Sync: send data to callback function.
@@ -400,105 +400,49 @@ class TobiiDataStream():
while not self.__stop_event.isSet():
- # Sync reading case
- if self.reading_callback != None:
+ # grab data
+ data = self.__network.grab_data(self.__data_socket)
- # grab data
- data = self.__network.grab_data(self.__data_socket)
+ # decode data
+ json_data = json.loads(data.decode('utf-8'))
- # decode data
- json_data = json.loads(data.decode('utf-8'))
+ # parse json into timestamped data object
+ data_ts, data_object, data_object_type = self.__parse_json_data(json_data)
- # parse json into timestamped data object
- data_ts, data_object, data_object_type = self.__parse_json_data(json_data)
+ # lock data subcription
+ self.__subcription_lock.acquire()
- self.reading_callback(data_ts, data_object, data_object_type)
+ # share incoming data to all subcribers
+ for callback in self.reading_callbacks:
- # Async reading case
- else:
+ callback(data_ts, data_object, data_object_type)
- # wait for queue reading
- if self.__data_queue.full():
+ # unlock data subscription
+ self.__subcription_lock.release()
- # sleep 1 micro second
- time.sleep(0.0001)
- continue
+ def subscribe(self, reading_callback):
+ """Pass reading callback function to get incoming (data_ts, data_object, data_object_type) back."""
- # lock data queue access
- self.__read_lock.acquire()
+ # lock data subcription
+ self.__subcription_lock.acquire()
- # grab data
- data = self.__network.grab_data(self.__data_socket)
+ # append callback
+ self.reading_callbacks.append(reading_callback)
- # decode data
- json_data = json.loads(data.decode('utf-8'))
+ # unlock data subscription
+ self.__subcription_lock.release()
- # write data in queue
- self.__data_queue.put(json_data)
+ def unsubscribe(self, reading_callback):
+ """Remove reading callback function to stop data reception."""
- # unlock data queue access
- self.__read_lock.release()
+ # lock data subcription
+ self.__subcription_lock.acquire()
- def read(self) -> Tuple[int, dict]:
- """Read incoming timestamped data.
+ # remove callback
+ self.reading_callbacks.remove(reading_callback)
- * **Returns:**
- - timestamp
- - dictionary of TimeStampedBuffer for each data.
- """
-
- # no data to read
- if self.__data_queue.empty():
- raise ValueError
-
- # create a dictionary of timestamped data buffers
- ts_data_buffer_dict = {
- 'DirSig': DataStructures.TimeStampedBuffer(),
- 'PresentationTimeStamp': DataStructures.TimeStampedBuffer(),
- 'VideoTimeStamp': DataStructures.TimeStampedBuffer(),
- 'EventSynch': DataStructures.TimeStampedBuffer(),
- 'Event': DataStructures.TimeStampedBuffer(),
- 'Accelerometer': DataStructures.TimeStampedBuffer(),
- 'Gyroscope': DataStructures.TimeStampedBuffer(),
- 'PupilCenter': DataStructures.TimeStampedBuffer(),
- 'PupilDiameter': DataStructures.TimeStampedBuffer(),
- 'GazeDirection': DataStructures.TimeStampedBuffer(),
- 'GazePosition': DataStructures.TimeStampedBuffer(),
- 'GazePosition3D': DataStructures.TimeStampedBuffer(),
- 'MarkerPosition': DataStructures.TimeStampedBuffer()
- }
-
- # if the data acquisition thread is not running
- if self.__stop_event.isSet():
- return ts_data_buffer_dict
-
- # lock data queue access
- self.__read_lock.acquire()
-
- # search for earliest timestamp in the queue
- earliest_ts = 0
-
- # read data queue
- while not self.__data_queue.empty():
-
- # parse json into timestamped data object
- data_ts, data_object, data_object_type = self.__parse_json_data(self.__data_queue.get())
-
- # ignore negative timestamp
- if data_ts < 0:
- break
-
- # keep earliest timestamp
- if data_ts > earliest_ts:
- earliest_ts = data_ts
-
- # store data object into dedicated timestamped buffer
- ts_data_buffer_dict[data_object_type][data_ts] = data_object
-
- # unlock data queue access
- self.__read_lock.release()
-
- return earliest_ts, ts_data_buffer_dict
+ # unlock data subscription
+ self.__subcription_lock.release()
def __parse_json_data(self, json_data):
@@ -521,7 +465,6 @@ class TobiiDataStream():
return data_ts, data_object, data_object_type
def __capture_callback(self, data_ts, data_object, data_object_type):
- """Store incoming data into timestamped buffer"""
if data_object_type == self.__data_stream_selector:
@@ -535,23 +478,25 @@ class TobiiDataStream():
self.__data_ts_buffer[data_ts] = data_object
def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500) -> int:
- """Capture a data stream into buffer.
+ """Start data stream capture.
- * **Returns:**
- - buffer size each 100 ms
+ * **Returns: each 100 ms**
+ - buffer size
"""
# Prepare for data acquisition
- self.__data_stream_selector = data_object_type
self.__data_ts_buffer = data_ts_buffer
+ self.__data_stream_selector = data_object_type
self.__data_ts_buffer_size = sample_number
- # Subscribe to data stream
- memo_reading_callback = self.reading_callback
- self.reading_callback = self.__capture_callback
+ # Subscribe to tobii data stream
+ self.subscribe(self.__capture_callback)
- # Start data reception
- self.open()
+ # Start data stream if needed
+ close_after = False
+ if not self.running:
+ self.open()
+ close_after = True
# Share data acquisition progress
buffer_size = 0
@@ -563,8 +508,9 @@ class TobiiDataStream():
yield buffer_size
- # Stop data reception
- self.close()
+ # Stop data sream if needed
+ if close_after:
+ self.close()
# Unsubscribe to tobii data stream
- self.reading_callback = memo_reading_callback
+ self.unsubscribe(self.__capture_callback)