#!/usr/bin/env python

"""Tobii Glasses Pro 2 segment data loading and live data stream capture."""

import gzip  # FIX: was missing — gzip.open below raised NameError
import json
import queue
import threading
import time
import uuid

from argaze import DataStructures
from argaze.TobiiGlassesPro2 import TobiiNetworkInterface


class TobiiSegmentData(DataStructures.DictObject):
    """Handle Tobii Glasses Pro 2 segment data file."""

    def __init__(self, segment_data_path):
        """Load segment data from segment directory then parse and register each
        recorded dataflow as a TimeStampedBuffer member of the TobiiSegmentData
        instance.

        :param segment_data_path: path to the (gzip-compressed) segment data file.
        """

        self.__segment_data_path = segment_data_path
        self.__ts_start = 0

        ts_data_buffer_dict = {}

        # decoder used as json object_hook: routes each valid item into a
        # per-type timestamped buffer.
        def decode(json_item):

            # accept only valid data (e.g. with status value equal to 0)
            if json_item.pop('s', -1) == 0:

                # convert timestamp
                ts = json_item.pop('ts')

                # keep first timestamp to offset all timestamps
                if self.__ts_start == 0:
                    self.__ts_start = ts

                ts -= self.__ts_start

                # ignore negative timestamp
                if ts < 0:
                    return

                # convert json data into data object; the object type is the
                # concatenation of the remaining json keys (e.g. 'gp', 'gy')
                data_object_type = '_'.join(json_item.keys())
                data_object = DataStructures.DictObject(data_object_type, **json_item)

                # append a dedicated timestamped buffer for each data object type
                if data_object.get_type() not in ts_data_buffer_dict.keys():
                    ts_data_buffer_dict[data_object.get_type()] = DataStructures.TimeStampedBuffer()

                # store data object into the timestamped buffer dedicated to its type
                ts_data_buffer_dict[data_object.get_type()][ts] = data_object

        # start loading: the segment data file is gzip-compressed,
        # one json item per line
        with gzip.open(self.__segment_data_path) as f:
            for item in f:
                json.loads(item.decode('utf-8'), object_hook=decode)

        super().__init__(type(self).__name__, **ts_data_buffer_dict)

    def keys(self):
        """Get all registered data keys"""
        # skip the two private attributes stored first in __dict__ and the
        # trailing attribute appended by DictObject initialisation
        # NOTE(review): relies on dict insertion order — fragile if attributes
        # are added; verify against DataStructures.DictObject layout.
        return list(self.__dict__.keys())[2:-1]

    def get_path(self):
        """Return the segment data file path."""
        return self.__segment_data_path


class TobiiDataStream(threading.Thread):
    """Capture Tobii Glasses Pro 2 data stream in separate thread."""

    def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
        """Initialise thread super class as a deamon dedicated to data reception.

        :param network_interface: connected Tobii network interface used to
            create the data socket and grab incoming data.
        """

        threading.Thread.__init__(self)

        # FIX: original assigned `threading.Thread.daemon = True`, which
        # replaces the `daemon` property on the Thread CLASS for every thread
        # in the process; mark only this instance as daemonic.
        self.daemon = True

        self.__network = network_interface
        self.__data_socket = self.__network.make_socket()
        self.__data_queue = queue.Queue()

        self.__stop_event = threading.Event()
        self.__read_lock = threading.Lock()

        # pace the reception loop at the eye tracker sampling frequency
        # (assumes get_et_freq() returns a non-zero frequency — TODO confirm)
        self.__sleep = 1. / self.__network.get_et_freq()

        # prepare keep alive message
        self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
        self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
        self.__keep_alive_thread.daemon = True

    def __del__(self):
        """Stop data reception before destruction."""
        self.close()

    def __keep_alive(self):
        """Maintain connection."""
        while not self.__stop_event.is_set():
            self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg)
            time.sleep(1)

    def open(self):
        """Start data reception."""
        self.__keep_alive_thread.start()
        threading.Thread.start(self)

    def close(self):
        """Stop data reception definitively."""
        self.__stop_event.set()

        # wait for both threads to notice the stop event before closing the socket
        threading.Thread.join(self.__keep_alive_thread)
        threading.Thread.join(self)

        self.__data_socket.close()

    def run(self):
        """Store received data into a queue for further reading."""
        while not self.__stop_event.is_set():

            # wait one sampling period between grabs
            time.sleep(self.__sleep)

            # FIX: `with` guarantees the lock is released even if grab_data
            # raises; the original manual acquire/release could deadlock read().
            with self.__read_lock:
                data = self.__network.grab_data(self.__data_socket)
                self.__data_queue.put(data)

    def read(self):
        """Flush the data queue and return a DictObject holding one
        TimeStampedBuffer per received data object type."""

        # create a dictionary of timestamped data buffers
        ts_data_buffer_dict = DataStructures.DictObject('TobiiDataStream', **{})

        # if the data acquisition thread is not running
        if self.__stop_event.is_set():
            return ts_data_buffer_dict

        # FIX: `with` guarantees release on malformed json (json.loads raise);
        # the original manual acquire/release leaked the lock on exception.
        with self.__read_lock:

            # drain the data queue
            while not self.__data_queue.empty():

                data = self.__data_queue.get()
                json_item = json.loads(data.decode('utf-8'))

                # accept only valid data (e.g. with status value equal to 0)
                if json_item.pop('s', -1) == 0:

                    # convert timestamp
                    # NOTE(review): unlike TobiiSegmentData, live timestamps are
                    # NOT offset to the first sample (original offset code was
                    # commented out) — kept as-is.
                    ts = json_item.pop('ts')

                    # convert json data into data object
                    data_object_type = '_'.join(json_item.keys())
                    data_object = DataStructures.DictObject(data_object_type, **json_item)

                    # append a dedicated timestamped buffer for each data object type
                    if data_object.get_type() not in ts_data_buffer_dict.keys():
                        ts_data_buffer_dict.append(data_object.get_type(), DataStructures.TimeStampedBuffer())

                    # store data object into the timestamped buffer dedicated to its type
                    ts_data_buffer_dict[data_object.get_type()][ts] = data_object

        return ts_data_buffer_dict