From 9ef5dfa8a1b90e81b5893b963a07fa048cdb3478 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 25 Jan 2023 10:36:02 +0100 Subject: Adding asynchronous data reading feature. --- src/argaze/TobiiGlassesPro2/TobiiData.py | 55 ++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py index ed570c9..0e28054 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiData.py +++ b/src/argaze/TobiiGlassesPro2/TobiiData.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from typing import Tuple +from typing import Tuple, TypeVar from dataclasses import dataclass import threading import uuid @@ -16,6 +16,9 @@ from argaze.utils import MiscFeatures import numpy +TobiiDataObjectType = TypeVar('TobiiDataObjectType', bound="TobiiDataObjectType") +# Type definition for type annotation convenience + @dataclass class DirSig(): """Define dir sig data (dir sig).""" @@ -318,7 +321,7 @@ class TobiiDataStream(): self.__json_data_parser = TobiiJsonDataParser() self.__first_ts = 0 - # Data subscription + # Sync reading data subscription self.reading_callbacks = [] self.__subcription_lock = threading.Lock() @@ -412,7 +415,7 @@ class TobiiDataStream(): # lock data subcription self.__subcription_lock.acquire() - # share incoming data to all subcribers + # share incoming data to all subscribers for callback in self.reading_callbacks: callback(data_ts, data_object, data_object_type) @@ -514,3 +517,49 @@ class TobiiDataStream(): # Unsubscribe to tobii data stream self.unsubscribe(self.__capture_callback) + + def __buffer_callback(self, data_ts, data_object, data_object_type): + + # Lock data queue access + self.__queue_lock.acquire() + + # Put data into the queue + self.__data_queue.put((data_ts, data_object, data_object_type)) + + # Unlock data queue access + self.__queue_lock.release() + + def read(self) -> Tuple[int, TobiiDataObjectType, str]: + """Iterate over incoming data buffer asynchronously.""" + + # Setup data buffering + self.__data_queue = queue.Queue() + self.__queue_lock = threading.Lock() + + # Subscribe to tobii data stream + self.subscribe(self.__buffer_callback) + + return self.__iter__() + + def __iter__(self): + + return self + + def __next__(self): + + # Wait for data + while self.__data_queue.empty(): + + time.sleep(0.0001) + continue + + # Lock data queue access + self.__queue_lock.acquire() + + # Get data from the queue + data_ts, data_object, data_object_type = self.__data_queue.get() + + # Unlock data queue access + self.__queue_lock.release() + + return data_ts, data_object, data_object_type -- cgit v1.1