From 145219241449badc2f0da6449b717255e2dd70b2 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Fri, 20 May 2022 11:09:16 +0200 Subject: Refactoring tobii data parsing. --- src/argaze/TobiiGlassesPro2/TobiiData.py | 236 ++++++++++++++++++++++++++++--- 1 file changed, 215 insertions(+), 21 deletions(-) diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py index 1b6dacf..3a1c27b 100644 --- a/src/argaze/TobiiGlassesPro2/TobiiData.py +++ b/src/argaze/TobiiGlassesPro2/TobiiData.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from dataclasses import dataclass import threading import uuid import gzip @@ -10,7 +11,177 @@ import queue from argaze import DataStructures from argaze.TobiiGlassesPro2 import TobiiNetworkInterface -class TobiiDataSegment(DataStructures.DictObject): +@dataclass +class DirSig(): + """Define dir sig data (dir sig).""" + + dir: int # meaning ? + sig: int # meaning ? + +@dataclass +class PTS(): + """Define pts data (pts).""" + + value: int + +@dataclass +class EventSynch(): + """Define event synch data (evts).""" + + value: int # meaning ? + +@dataclass +class Event(): + """Define event data (ets type tag).""" + + ets: int # meaning ? + type: str + tag: str # dict ? + +@dataclass +class Accelerometer(): + """Define accelerometer data (ac).""" + + value: tuple((float, float, float)) + +@dataclass +class Gyroscope(): + """Define gyroscope data (gy).""" + + value: tuple((float, float, float)) + +@dataclass +class PupilCenter(): + """Define pupil center data (gidx pc eye).""" + + index: int + value: tuple((float, float, float)) + eye: str # 'right' or 'left' + +@dataclass +class PupilDiameter(): + """Define pupil diameter data (gidx pd eye).""" + + index: int + value: float + eye: str # 'right' or 'left' + +@dataclass +class GazeDirection(): + """Define gaze direction data (gidx gd eye).""" + + index: int + value: tuple((float, float, float)) + eye: str # 'right' or 'left' + +@dataclass +class GazePosition(): + """Define gaze position data (gidx l gp).""" + + index: int + l: str # ? + value: tuple((float, float)) + +@dataclass +class GazePosition3D(): + """Define gaze position 3D data (gidx gp3).""" + + index: int + value: tuple((float, float)) + +@dataclass +class MarkerPosition(): + """Define marker data (marker3d marker2d).""" + + value_3d: tuple((float, float, float)) + value_2d: tuple((float, float)) + +class TobiiJsonDataParser(): + + def parse_dir_sig(self, json_data): + + return DirSig(json_data['dir'], json_data['sig']) + + def parse_pts(self, json_data): + + return PTS(json_data['pts']) + + def parse_event_synch(self, json_data): + + return EventSynch(json_data['evts']) + + def parse_event(self, json_data): + + return Event(json_data['ets'], json_data['type'], json_data['tag']) + + def parse_accelerometer(self, json_data): + + return Accelerometer(json_data['ac']) + + def parse_gyroscope(self, json_data): + + return Gyroscope(json_data['gy']) + + def parse_pupil_center(self, gaze_index, json_data): + + return PupilCenter(gaze_index, json_data['pc'], json_data['eye']) + + def parse_pupil_diameter(self, gaze_index, json_data): + + return PupilDiameter(gaze_index, json_data['pd'], json_data['eye']) + + def parse_gaze_direction(self, gaze_index, json_data): + + return GazeDirection(gaze_index, json_data['gd'], json_data['eye']) + + def parse_gaze_position(self, gaze_index, json_data): + + return GazePosition(gaze_index, json_data['l'], json_data['gp']) + + def parse_gaze_position_3d(self, gaze_index, json_data): + + return GazePosition3D(gaze_index, json_data['gp3']) + + def parse_marker_position(self, json_data): + + return MarkerPosition(json_data['marker3d'], json_data['marker2d']) + + def parse_pupil_or_gaze(self, json_data): + + gaze_index = json_data.pop('gidx') + + # parse pupil or gaze data depending second json key + second_key = next(iter(json_data)) + + parse_map = { + 'pc': self.parse_pupil_center, + 'pd': self.parse_pupil_diameter, + 'gd': self.parse_gaze_direction, + 'l': self.parse_gaze_position, + 'gp3': self.parse_gaze_position_3d + } + + return parse_map[second_key](gaze_index, json_data) + + def parse_data(self, json_data): + + # parse data depending first json key + first_key = next(iter(json_data)) + + parse_map = { + 'dir': self.parse_dir_sig, + 'pts': self.parse_pts, + 'evts': self.parse_event_synch, + 'ets': self.parse_event, + 'ac': self.parse_accelerometer, + 'gy': self.parse_gyroscope, + 'gidx': self.parse_pupil_or_gaze, + 'marker3d': self.parse_marker_position + } + + return parse_map[first_key](json_data) + +class TobiiDataSegment(): """Handle Tobii Glasses Pro 2 segment data file.""" def __init__(self, segment_data_path, start_timestamp = 0, end_timestamp = None): @@ -21,7 +192,22 @@ class TobiiDataSegment(DataStructures.DictObject): self.__vts_offset = 0 self.__vts_ts = -1 - ts_data_buffer_dict = {} + self.__json_data_parser = TobiiJsonDataParser() + + self.__ts_data_buffer_dict = { + 'DirSig': DataStructures.TimeStampedBuffer(), + 'PTS': DataStructures.TimeStampedBuffer(), + 'EventSynch': DataStructures.TimeStampedBuffer(), + 'Event': DataStructures.TimeStampedBuffer(), + 'Accelerometer': DataStructures.TimeStampedBuffer(), + 'Gyroscope': DataStructures.TimeStampedBuffer(), + 'PupilCenter': DataStructures.TimeStampedBuffer(), + 'PupilDiameter': DataStructures.TimeStampedBuffer(), + 'GazeDirection': DataStructures.TimeStampedBuffer(), + 'GazePosition': DataStructures.TimeStampedBuffer(), + 'GazePosition3D': DataStructures.TimeStampedBuffer(), + 'MarkerPosition': DataStructures.TimeStampedBuffer() + } # define a decoder function def decode(json_data): @@ -57,15 +243,11 @@ class TobiiDataSegment(DataStructures.DictObject): return False # stop # convert json data into data object - data_object_type = '_'.join(json_data.keys()) - data_object = DataStructures.DictObject(data_object_type, **json_data) + data_object = self.__json_data_parser.parse_data(json_data) + data_object_type = type(data_object).__name__ - # append a dedicated timestamped buffer for each data object type - if data_object.get_type() not in ts_data_buffer_dict.keys(): - ts_data_buffer_dict[data_object.get_type()] = DataStructures.TimeStampedBuffer() - - # store data object into the timestamped buffer dedicated to its type - ts_data_buffer_dict[data_object.get_type()][ts] = data_object + # store data object into dedicated timestamped buffer + self.__ts_data_buffer_dict[data_object_type][ts] = data_object return True # continue @@ -76,11 +258,12 @@ class TobiiDataSegment(DataStructures.DictObject): if not json.loads(item.decode('utf-8'), object_hook=decode): break - super().__init__(type(self).__name__, **ts_data_buffer_dict) + def __getitem__(self, key): + return self.__ts_data_buffer_dict[key] def keys(self): """Get all registered data keys""" - return list(self.__dict__.keys())[3:-1] + return list(self.__ts_data_buffer_dict.keys()) def get_path(self): return self.__segment_data_path @@ -156,8 +339,23 @@ class TobiiDataStream(threading.Thread): def read(self): + json_data_parser = TobiiJsonDataParser() + # create a dictionary of timestamped data buffers - ts_data_buffer_dict = DataStructures.DictObject('TobiiDataStream', **{}) + ts_data_buffer_dict = { + 'DirSig': DataStructures.TimeStampedBuffer(), + 'PTS': DataStructures.TimeStampedBuffer(), + 'EventSynch': DataStructures.TimeStampedBuffer(), + 'Event': DataStructures.TimeStampedBuffer(), + 'Accelerometer': DataStructures.TimeStampedBuffer(), + 'Gyroscope': DataStructures.TimeStampedBuffer(), + 'PupilCenter': DataStructures.TimeStampedBuffer(), + 'PupilDiameter': DataStructures.TimeStampedBuffer(), + 'GazeDirection': DataStructures.TimeStampedBuffer(), + 'GazePosition': DataStructures.TimeStampedBuffer(), + 'GazePosition3D': DataStructures.TimeStampedBuffer(), + 'MarkerPosition': DataStructures.TimeStampedBuffer() + } # if the data acquisition thread is not running if self.__stop_event.isSet(): @@ -188,15 +386,11 @@ class TobiiDataStream(threading.Thread): break # convert json data into data object - data_object_type = '_'.join(json_data.keys()) - data_object = DataStructures.DictObject(data_object_type, **json_data) - - # append a dedicated timestamped buffer for each data object type - if data_object.get_type() not in ts_data_buffer_dict.keys(): - ts_data_buffer_dict.append(data_object.get_type(), DataStructures.TimeStampedBuffer()) + data_object = json_data_parser.parse_data(json_data) + data_object_type = type(data_object).__name__ - # store data object into the timestamped buffer dedicated to its type - ts_data_buffer_dict[data_object.get_type()][ts] = data_object + # store data object into dedicated timestamped buffer + ts_data_buffer_dict[data_object_type][ts] = data_object # unlock data queue access self.__read_lock.release() -- cgit v1.1