From 0485c264560b4b5a549d23c9414d0c3341a19438 Mon Sep 17 00:00:00 2001
From: Théo de la Hogue
Date: Thu, 28 Mar 2024 01:51:16 +0100
Subject: Adding PostProcessing context.

---
 src/argaze/utils/contexts/TobiiProGlasses2.py | 360 +++++++++++++++++++++++---
 1 file changed, 318 insertions(+), 42 deletions(-)

diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py
index 8b92fef..6b7236b 100644
--- a/src/argaze/utils/contexts/TobiiProGlasses2.py
+++ b/src/argaze/utils/contexts/TobiiProGlasses2.py
@@ -18,12 +18,15 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
 __license__ = "GPLv3"
 
 import sys
+import os
 import logging
 import socket
 import threading
 import collections
 import json
 import time
+import math
+import gzip
 import datetime
 import uuid
 from dataclasses import dataclass
@@ -54,9 +57,23 @@ DEFAULT_PROJECT_NAME = 'DefaultProject'
 DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant'
 DEFAULT_RECORD_NAME = 'DefaultRecord'
 
+TOBII_PROJECTS_DIRNAME = "projects"
+TOBII_PROJECT_FILENAME = "project.json"
+
+TOBII_PARTICIPANTS_DIRNAME = "participants"
+TOBII_PARTICIPANT_FILENAME = "participant.json"
+
+TOBII_RECORDINGS_DIRNAME = "recordings"
+TOBII_RECORD_FILENAME = "recording.json"
+
+TOBII_SEGMENTS_DIRNAME = "segments"
+TOBII_SEGMENT_INFO_FILENAME = "segment.json"
+TOBII_SEGMENT_VIDEO_FILENAME = "fullstream.mp4"
+TOBII_SEGMENT_DATA_FILENAME = "livedata.json.gz"
+
 # Define default Tobii image_parameters values
 DEFAULT_TOBII_IMAGE_PARAMETERS = {
-    "draw_something": False
+    "draw_something": False
 }
 
 # Define extra classes to support Tobii data parsing
@@ -188,94 +205,107 @@ class TobiiJsonDataParser():
             'gp3': self.__parse_gaze_position_3d
         }
 
-    def parse(self, data):
+    def parse_json(self, json_data) -> tuple[int, object, type]:
+        """Parse JSON and return timestamp, object and type."""
 
-        json_data = json.loads(data.decode('utf-8'))
+        data = json.loads(json_data.decode('utf-8'))
 
         # Parse data status
-        status = json_data.pop('s', -1)
+        status = data.pop('s', -1)
 
         # Parse timestamp
-        data_ts = json_data.pop('ts')
+        data_ts = data.pop('ts')
 
         # Parse data depending first json key
-        first_key = next(iter(json_data))
+        first_key = next(iter(data))
 
-        # Convert json data into data object
-        data_object = self.__parse_data_map[first_key](status, json_data)
+        # Convert data into data object
+        data_object = self.__parse_data_map[first_key](status, data)
         data_object_type = type(data_object).__name__
 
         return data_ts, data_object, data_object_type
 
-    def __parse_pupill_or_gaze(self, status, json_data):
+    def parse_data(self, status, data) -> tuple[object, type]:
+        """Parse data and return object and type."""
+
+        # Parse data depending first json key
+        first_key = next(iter(data))
+
+        # Convert data into data object
+        data_object = self.__parse_data_map[first_key](status, data)
+        data_object_type = type(data_object).__name__
+
+        return data_object, data_object_type
 
-        gaze_index = json_data.pop('gidx')
+    def __parse_pupill_or_gaze(self, status, data):
+
+        gaze_index = data.pop('gidx')
 
         # parse pupill or gaze data depending second json key
-        second_key = next(iter(json_data))
+        second_key = next(iter(data))
 
-        return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, json_data)
+        return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, data)
 
-    def __parse_dir_sig(self, status, json_data):
+    def __parse_dir_sig(self, status, data):
 
-        return DirSig(json_data['dir'], json_data['sig'])
+        return DirSig(data['dir'], data['sig'])
 
-    def __parse_pts(self, status, json_data):
+    def __parse_pts(self, status, data):
 
-        return PresentationTimeStamp(json_data['pts'])
+        return PresentationTimeStamp(data['pts'])
 
-    def __parse_vts(self, status, json_data):
+    def __parse_vts(self, status, data):
 
         # ts is not sent when recording
         try:
 
-            ts = json_data['ts']
+            ts = data['ts']
 
         except KeyError:
 
             ts = -1
 
-        return VideoTimeStamp(json_data['vts'], ts)
+        return VideoTimeStamp(data['vts'], ts)
 
-    def __parse_event_synch(self, status, json_data):
+    def __parse_event_synch(self, status, data):
 
-        return EventSynch(json_data['evts'])
+        return EventSynch(data['evts'])
 
-    def __parse_event(self, status, json_data):
+    def __parse_event(self, status, data):
 
-        return Event(json_data['ets'], json_data['type'], json_data['tag'])
+        return Event(data['ets'], data['type'], data['tag'])
 
-    def __parse_accelerometer(self, status, json_data):
+    def __parse_accelerometer(self, status, data):
 
-        return Accelerometer(json_data['ac'])
+        return Accelerometer(data['ac'])
 
-    def __parse_gyroscope(self, status, json_data):
+    def __parse_gyroscope(self, status, data):
 
-        return Gyroscope(json_data['gy'])
+        return Gyroscope(data['gy'])
 
-    def __parse_pupill_center(self, status, gaze_index, json_data):
+    def __parse_pupill_center(self, status, gaze_index, data):
 
-        return PupillCenter(status, gaze_index, json_data['pc'], json_data['eye'])
+        return PupillCenter(status, gaze_index, data['pc'], data['eye'])
 
-    def __parse_pupill_diameter(self, status, gaze_index, json_data):
+    def __parse_pupill_diameter(self, status, gaze_index, data):
 
-        return PupillDiameter(status, gaze_index, json_data['pd'], json_data['eye'])
+        return PupillDiameter(status, gaze_index, data['pd'], data['eye'])
 
-    def __parse_gaze_direction(self, status, gaze_index, json_data):
+    def __parse_gaze_direction(self, status, gaze_index, data):
 
-        return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye'])
+        return GazeDirection(status, gaze_index, data['gd'], data['eye'])
 
-    def __parse_gaze_position(self, status, gaze_index, json_data):
+    def __parse_gaze_position(self, status, gaze_index, data):
 
-        return GazePosition(status, gaze_index, json_data['l'], json_data['gp'])
+        return GazePosition(status, gaze_index, data['l'], data['gp'])
 
-    def __parse_gaze_position_3d(self, status, gaze_index, json_data):
+    def __parse_gaze_position_3d(self, status, gaze_index, data):
 
-        return GazePosition3D(status, gaze_index, json_data['gp3'])
+        return GazePosition3D(status, gaze_index, data['gp3'])
 
-    def __parse_marker_position(self, status, json_data):
+    def __parse_marker_position(self, status, data):
 
-        return MarkerPosition(json_data['marker3d'], json_data['marker2d'])
+        return MarkerPosition(data['marker3d'], data['marker2d'])
 
 
 class LiveStream(ArFeatures.ArContext):
@@ -627,17 +657,17 @@ class LiveStream(ArFeatures.ArContext):
 
             try:
 
-                data, _ = self.__data_socket.recvfrom(1024)
+                json_data, _ = self.__data_socket.recvfrom(1024)
 
             except TimeoutError:
 
                 logging.error('> timeout occurred while receiving data')
                 continue
 
-            if data is not None:
+            if json_data is not None:
 
                 # Parse json into timestamped data object
-                data_ts, data_object, data_object_type = self.__parser.parse(data)
+                data_ts, data_object, data_object_type = self.__parser.parse_json(json_data)
 
                 # Store first timestamp
                 if first_ts == 0:
@@ -1160,3 +1190,249 @@ class LiveStream(ArFeatures.ArContext):
     def set_scene_camera_freq_50(self):
         data = {'sys_sc_fps': 50}
         json_data = self.__post_request('/api/system/conf/', data)
+
+
+class PostProcessing(ArFeatures.ArContext):
+
+    @DataFeatures.PipelineStepInit
+    def __init__(self, **kwargs):
+
+        # Init parent classes
+        super().__init__()
+
+        # Init private attributes
+        self.__segment = None
+        self.__start = math.nan
+        self.__end = math.nan
+        self.__parser = TobiiJsonDataParser()
+
+        self.__data_counts_dict = {
+            'DirSig': 0,
+            'PresentationTimeStamp': 0,
+            'VideoTimeStamp': 0,
+            'EventSynch': 0,
+            'Event': 0,
+            'Accelerometer': 0,
+            'Gyroscope': 0,
+            'PupillCenter': 0,
+            'PupillDiameter': 0,
+            'GazeDirection': 0,
+            'GazePosition': 0,
+            'GazePosition3D': 0,
+            'MarkerPosition': 0
+        }
+
+        self.__data_list = []
+
+        self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS}
+
+    @property
+    def segment(self) -> str:
+        """Path to segment folder."""
+        return self.__segment
+
+    @segment.setter
+    def segment(self, segment: str):
+
+        self.__segment = segment
+
+    @property
+    def start(self) -> int:
+        """Start reading timestamp in milliseconds."""
+        return self.__start
+
+    @start.setter
+    def start(self, start: int):
+
+        self.__start = start
+
+    @property
+    def end(self) -> int:
+        """End reading timestamp in milliseconds."""
+        return self.__end
+
+    @end.setter
+    def end(self, end: int):
+
+        self.__end = end
+
+    @DataFeatures.PipelineStepEnter
+    def __enter__(self):
+
+        # Read segment info
+        with open(os.path.join(self.__segment, TOBII_SEGMENT_INFO_FILENAME)) as info_file:
+
+            try:
+
+                info = json.load(info_file)
+
+            except:
+
+                raise RuntimeError(f'JSON fails to load {self.__segment}/{TOBII_SEGMENT_INFO_FILENAME}')
+
+        # Constrain reading dates
+        self.__end = int(info["seg_length"] * 1e3) if math.isnan(self.__end) else min(self.__end, int(info["seg_length"] * 1e3))
+
+        if math.isnan(self.__start):
+
+            self.__start = 0
+
+        if self.__start >= self.__end:
+
+            raise ValueError('Start reading timestamp is equal or greater than end reading timestamp.')
+
+        # TODO: log various info
+        calibrated = bool(info["seg_calibrated"])
+        start_date = datetime.datetime.strptime(info["seg_t_start"], TOBII_DATETIME_FORMAT)
+        stop_date = datetime.datetime.strptime(info["seg_t_stop"], TOBII_DATETIME_FORMAT)
+
+        # Create stop event
+        self.__stop_event = threading.Event()
+
+        # Open reading thread
+        self.__reading_thread = threading.Thread(target = self.__read)
+
+        logging.debug('> starting reading thread...')
+        self.__reading_thread.start()
+
+    @DataFeatures.PipelineStepExit
+    def __exit__(self, exception_type, exception_value, exception_traceback):
+
+        logging.debug('%s.__exit__', type(self).__name__)
+
+        # Close data stream
+        self.__stop_event.set()
+
+        # Stop reading thread
+        threading.Thread.join(self.__reading_thread)
+
+    def __read(self):
+        """Iterate on video images and their related data."""
+
+        for video_ts, video_image, data_list in self:
+
+            if self.__stop_event.is_set():
+
+                break
+
+            logging.debug('> read image at %i timestamp', video_ts)
+
+            # Process camera image
+            self._process_camera_image(
+                timestamp = video_ts,
+                image = video_image)
+
+            height, width, _ = video_image.shape
+
+            logging.debug('> read %i data related to image', len(data_list))
+
+            # Process data
+            for data_ts, data_object, data_object_type in data_list:
+
+                match data_object_type:
+
+                    case 'GazePosition':
+
+                        logging.debug('> reading %s at %i timestamp', data_object_type, data_ts)
+
+                        # When gaze position is valid
+                        if data_object.validity == 0:
+
+                            # Process timestamped gaze position
+                            self._process_gaze_position(
+                                timestamp = data_ts,
+                                x = int(data_object.value[0] * width),
+                                y = int(data_object.value[1] * height) )
+
+                        else:
+
+                            # Process empty gaze position
+                            self._process_gaze_position(timestamp = data_ts)
+
+    def __iter__(self):
+
+        self.__data_file = gzip.open(os.path.join(self.__segment, TOBII_SEGMENT_DATA_FILENAME))
+        self.__video_file = av.open(os.path.join(self.__segment, TOBII_SEGMENT_VIDEO_FILENAME))
+
+        self.__vts_offset = 0
+        self.__vts_ts = -1
+
+        # Buffer read-ahead video image and data so they are not discarded between iterations
+        self.__buffered_video = None
+        self.__buffered_data = None
+
+        return self
+
+    def __next__(self) -> tuple[int, numpy.array, list[tuple[int, object, str]]]:
+
+        data_list = []
+
+        # Get current video image from buffer or decode a new one
+        video_ts, image = self.__buffered_video if self.__buffered_video is not None else self.__next_video_image()
+
+        # Decode next video image to know when the current image ends
+        next_video_ts, next_video_image = self.__next_video_image()
+
+        # Get next data from buffer or parse a new one
+        next_data_ts, next_data_object, next_data_object_type = self.__buffered_data if self.__buffered_data is not None else self.__next_data()
+
+        # Collect all data timestamped before the next video image
+        while next_data_ts < next_video_ts:
+
+            data_list.append((next_data_ts, next_data_object, next_data_object_type))
+            next_data_ts, next_data_object, next_data_object_type = self.__next_data()
+
+        # Keep read-ahead video image and data for the next iteration
+        self.__buffered_video = next_video_ts, next_video_image
+        self.__buffered_data = next_data_ts, next_data_object, next_data_object_type
+
+        return video_ts, image, data_list
+
+    def __next_video_image(self) -> tuple[int, numpy.array]:
+
+        image = next(self.__video_file.decode(self.__video_file.streams.video[0]))
+        ts = int(image.time * 1e3)
+
+        # Ignore images before start timestamp
+        if ts < self.__start:
+
+            return self.__next_video_image()
+
+        # Ignore images after end timestamp
+        if ts >= self.__end:
+
+            raise StopIteration
+
+        # Return millisecond timestamp and image
+        return ts, image.to_ndarray(format='bgr24')
+
+    def __next_data(self):
+
+        data = json.loads(next(self.__data_file).decode('utf-8'))
+
+        # Parse data status
+        status = data.pop('s', -1)
+
+        # Convert timestamp
+        ts = data.pop('ts')
+
+        # Watch for vts data to offset timestamps
+        try:
+
+            self.__vts_offset = data['vts']
+            self.__vts_ts = ts
+
+            # Store primary ts value to allow further reverse offset operation
+            data['ts'] = ts
+
+        except KeyError:
+
+            pass
+
+        # Ignore data before first vts entry
+        if self.__vts_ts == -1:
+
+            return self.__next_data()
+
+        ts -= self.__vts_ts
+        ts += self.__vts_offset
+
+        # Ignore timestamps out of the given time range
+        if ts < self.__start * 1e3:
+
+            return self.__next_data()
+
+        if ts >= self.__end * 1e3:
+
+            raise StopIteration
+
+        # Parse data
+        data_object, data_object_type = self.__parser.parse_data(status, data)
+
+        # Return millisecond timestamp, data object and type
+        return ts * 1e-3, data_object, data_object_type
\ No newline at end of file
--
cgit v1.1
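
For reference, here is a minimal usage sketch of the PostProcessing context added by this patch, driving it directly through the iterator protocol it defines. The segment path and the start/end values are hypothetical, the context is instantiated by hand rather than through an ArGaze pipeline configuration, and the gaze processing normally wired by ArFeatures.ArContext is omitted; this is only an illustration of the intended reading loop, not part of the patch.

from argaze.utils.contexts.TobiiProGlasses2 import PostProcessing

# Hypothetical path to a Tobii Pro Glasses 2 segment folder containing
# segment.json, fullstream.mp4 and livedata.json.gz.
segment_path = '/path/to/recording/segments/1'

context = PostProcessing()
context.segment = segment_path
context.start = 0       # start reading timestamp in milliseconds
context.end = 10000     # end reading timestamp in milliseconds

# Each iteration yields a video image timestamp in milliseconds, the image as a
# BGR numpy array and the list of (timestamp, data object, type name) tuples
# parsed before the next image.
for video_ts, video_image, data_list in context:

    for data_ts, data_object, data_object_type in data_list:

        if data_object_type == 'GazePosition':

            print(video_ts, data_ts, data_object)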