#!/usr/bin/env python

import threading
import time

from argaze.TobiiGlassesPro2 import TobiiController


class TobiiDataThread(threading.Thread):
    """Handle data reception in a separate thread.

    The thread polls the controller at the eye tracker frequency and stores
    every new accelerometer, gyroscope, gaze and pts sample in its own
    buffer. All buffer accesses are protected by ``read_lock``.

    NOTE: buffers are never trimmed, so they grow for as long as the thread
    runs.
    """

    def __init__(self, controller: TobiiController.TobiiController):
        """Initialise thread super class and prepare data reception."""

        super().__init__()

        self.stop_event = threading.Event()
        self.read_lock = threading.Lock()

        self.controller = controller

        # polling period is derived from the eye tracker frequency
        self.fps = self.controller.get_et_freq()
        self.sleep = 1. / self.fps

        self.__ac_buffer = []
        self.__gy_buffer = []
        self.__gp_buffer = []
        self.__pts_buffer = []

        # first valid timestamp received (0 means "not known yet")
        self.__start_ts = 0

    def __del__(self):
        # nothing to release explicitly
        pass

    def __get_ac(self, data) -> dict:
        """Build an accelerometer sample from a controller data frame."""

        ac_value = data['mems']['ac']['ac']
        ac_ts = data['mems']['ac']['ts']

        return {
            'TIMESTAMP': ac_ts,
            'TIME': (ac_ts - self.__start_ts) / 1000000.,
            'X': ac_value[0],
            'Y': ac_value[1],
            'Z': ac_value[2]
        }

    def __get_gy(self, data) -> dict:
        """Build a gyroscope sample from a controller data frame."""

        gy_value = data['mems']['gy']['gy']
        gy_ts = data['mems']['gy']['ts']

        return {
            'TIMESTAMP': gy_ts,
            'TIME': (gy_ts - self.__start_ts) / 1000000.,
            'X': gy_value[0],
            'Y': gy_value[1],
            'Z': gy_value[2]
        }

    def __get_gp(self, data) -> dict:
        """Build a gaze sample from a controller data frame."""

        gp_value = data['gp']['gp']
        gp_ts = data['gp']['ts']

        return {
            'TIMESTAMP': gp_ts,
            'TIME': (gp_ts - self.__start_ts) / 1000000.,
            'X': gp_value[0],
            'Y': gp_value[1]
        }

    def __get_pts(self, data) -> dict:
        """Build a pts sample from a controller data frame."""

        pts_value = data['pts']['pts']
        pts_ts = data['pts']['ts']

        return {
            'TIMESTAMP': pts_ts,
            'TIME': (pts_ts - self.__start_ts) / 1000000.,
            'PTS': pts_value
        }

    @staticmethod
    def __append_if_new(buffer: list, ts, build_sample, data) -> None:
        """Append a sample unless its timestamp is -1 or repeats the last one."""

        # ignore -1 timestamp
        if ts == -1:
            return

        # filter repetitions
        if not buffer or ts != buffer[-1]['TIMESTAMP']:
            buffer.append(build_sample(data))

    def __read_last(self, buffer: list) -> dict:
        """Return a copy of the latest sample of a buffer, or {} if empty."""

        # the emptiness check is done under the lock as well
        with self.read_lock:
            return buffer[-1].copy() if buffer else {}

    def __read_all(self, buffer: list) -> list:
        """Return a shallow copy of a buffer taken under the lock."""

        with self.read_lock:
            return buffer.copy()

    def run(self):
        """Data reception function."""

        while not self.stop_event.is_set():

            time.sleep(self.sleep)

            # 'with' guarantees the lock is released even if the controller
            # or a missing key raises, so readers can never be left blocked
            with self.read_lock:

                data = self.controller.get_data()

                # store only timestamped datas
                if 'pts' not in data or 'pts' not in data['pts']:
                    continue

                ac_ts = data['mems']['ac']['ts']
                gy_ts = data['mems']['gy']['ts']
                gp_ts = data['gp']['ts']
                pts_ts = data['pts']['ts']

                # get start timestamp
                if self.__start_ts == 0:

                    # ignore -1 timestamp
                    valid_ts = [ts for ts in (ac_ts, gy_ts, gp_ts, pts_ts) if ts > 0]

                    # wait for a later frame if no timestamp is valid yet
                    # (min() of an empty list would raise ValueError)
                    if valid_ts:
                        self.__start_ts = min(valid_ts)

                self.__append_if_new(self.__ac_buffer, ac_ts, self.__get_ac, data)
                self.__append_if_new(self.__gy_buffer, gy_ts, self.__get_gy, data)
                self.__append_if_new(self.__gp_buffer, gp_ts, self.__get_gp, data)
                self.__append_if_new(self.__pts_buffer, pts_ts, self.__get_pts, data)

    def read_accelerometer_data(self, timestamp: int = -1) -> dict:
        """Get accelerometer data at a given timestamp.

        NOTE: `timestamp` is currently ignored, the latest data is returned.

        **Returns:** accelerometer dictionary (empty if no data yet)
        ```
            {
                'TIMESTAMP': int,
                'TIME': float,
                'X': float,
                'Y': float,
                'Z': float
            }
        ```
        """

        # TODO : find closest timestamp data
        return self.__read_last(self.__ac_buffer)

    def read_accelerometer_buffer(self) -> list:
        """Get accelerometer data buffer.

        **Returns:** accelerometer dictionary array"""

        return self.__read_all(self.__ac_buffer)

    def read_gyroscope_data(self, timestamp: int = -1) -> dict:
        """Get gyroscope data at a given timestamp.

        NOTE: `timestamp` is currently ignored, the latest data is returned.

        **Returns:** gyroscope dictionary (empty if no data yet)
        ```
            {
                'TIMESTAMP': int,
                'TIME': float,
                'X': float,
                'Y': float,
                'Z': float
            }
        ```
        """

        # TODO : find closest timestamp data
        return self.__read_last(self.__gy_buffer)

    def read_gyroscope_buffer(self) -> list:
        """Get gyroscope data buffer.

        **Returns:** gyroscope dictionary array"""

        return self.__read_all(self.__gy_buffer)

    def read_gaze_data(self, timestamp: int = -1) -> dict:
        """Get gaze data at a given timestamp.

        NOTE: `timestamp` is currently ignored, the latest data is returned.

        **Returns:** gaze dictionary (empty if no data yet)
        ```
            {
                'TIMESTAMP': int,
                'TIME': float,
                'X': float,
                'Y': float
            }
        ```
        """

        # TODO : find closest timestamp data
        return self.__read_last(self.__gp_buffer)

    def read_gaze_buffer(self) -> list:
        """Get gaze data buffer.

        **Returns:** gaze dictionary array"""

        return self.__read_all(self.__gp_buffer)

    def read_pts_data(self, timestamp: int = -1) -> dict:
        """Get Presentation Time Stamp (pts) data at a given timestamp.

        NOTE: `timestamp` is currently ignored, the latest data is returned.

        **Returns:** pts dictionary (empty if no data yet)
        ```
            {
                'TIMESTAMP': int,
                'TIME': float,
                'PTS': int
            }
        ```
        """

        # TODO : find closest timestamp data
        return self.__read_last(self.__pts_buffer)

    def read_pts_buffer(self) -> list:
        """Get Presentation Time Stamp (pts) data buffer.

        **Returns:** pts dictionary array"""

        return self.__read_all(self.__pts_buffer)

    def stop(self):
        """Stop data reception definitively."""

        self.stop_event.set()

        # joining a thread that was never started raises RuntimeError
        if self.is_alive():
            threading.Thread.join(self)