""" Handle network connection to Tobii Pro Glasses 2 device. It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py). This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import collections import datetime import gzip import json import logging import math import os import socket import sys import threading import time import uuid from dataclasses import dataclass try: from urllib.parse import urlparse, urlencode from urllib.request import urlopen, Request from urllib.error import URLError, HTTPError except ImportError: from urlparse import urlparse from urllib import urlencode from urllib2 import urlopen, Request, HTTPError, URLError from argaze import ArFeatures, DataFeatures import numpy import cv2 import av socket.IPPROTO_IPV6 = 41 TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f' TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S' DEFAULT_PROJECT_NAME = 'DefaultProject' DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant' DEFAULT_RECORD_NAME = 'DefaultRecord' TOBII_PROJECTS_DIRNAME = "projects" TOBII_PROJECT_FILENAME = "project.json" TOBII_PARTICIPANTS_DIRNAME = "participants" TOBII_PARTICIPANT_FILENAME = "participant.json" TOBII_RECORDINGS_DIRNAME = "recordings" TOBII_RECORD_FILENAME = "recording.json" TOBII_SEGMENTS_DIRNAME = "segments" TOBII_SEGMENT_INFO_FILENAME = "segment.json" TOBII_SEGMENT_VIDEO_FILENAME = "fullstream.mp4" TOBII_SEGMENT_DATA_FILENAME = "livedata.json.gz" # Define default Tobii image_parameters values DEFAULT_TOBII_IMAGE_PARAMETERS = { "draw_something": False } # Define extra classes to support Tobii data parsing @dataclass class DirSig(): """Define dir sig data (dir sig).""" dir: int # meaning ? sig: int # meaning ? @dataclass class PresentationTimeStamp(): """Define presentation time stamp (pts) data.""" value: int """Pts value.""" @dataclass class VideoTimeStamp(): """Define video time stamp (vts) data.""" value: int """Vts value.""" offset: int """Primary time stamp value.""" @dataclass class EventSynch(): """Define event synch (evts) data.""" value: int # meaning ? """Evts value.""" @dataclass class Event(): """Define event data (ets type tag).""" ets: int # meaning ? type: str tag: str # dict ? @dataclass class Accelerometer(): """Define accelerometer data (ac).""" value: numpy.array """Accelerometer value""" @dataclass class Gyroscope(): """Define gyroscope data (gy).""" value: numpy.array """Gyroscope value""" @dataclass class PupilCenter(): """Define pupil center data (gidx pc eye).""" validity: int index: int value: tuple[(float, float, float)] eye: str # 'right' or 'left' @dataclass class PupilDiameter(): """Define pupil diameter data (gidx pd eye).""" validity: int index: int value: float eye: str # 'right' or 'left' @dataclass class GazeDirection(): """Define gaze direction data (gidx gd eye).""" validity: int index: int value: tuple[(float, float, float)] eye: str # 'right' or 'left' @dataclass class GazePosition(): """Define gaze position data (gidx l gp).""" validity: int index: int l: str # ? value: tuple[(float, float)] @dataclass class GazePosition3D(): """Define gaze position 3D data (gidx gp3).""" validity: int index: int value: tuple[(float, float)] @dataclass class MarkerPosition(): """Define marker data (marker3d marker2d).""" value_3d: tuple[(float, float, float)] value_2d: tuple[(float, float)] class TobiiJsonDataParser(): def __init__(self): self.__parse_data_map = { 'dir': self.__parse_dir_sig, 'pts': self.__parse_pts, 'vts': self.__parse_vts, 'evts': self.__parse_event_synch, 'ets': self.__parse_event, 'ac': self.__parse_accelerometer, 'gy': self.__parse_gyroscope, 'gidx': self.__parse_pupil_or_gaze, 'marker3d': self.__parse_marker_position } self.__parse_pupil_or_gaze_map = { 'pc': self.__parse_pupil_center, 'pd': self.__parse_pupil_diameter, 'gd': self.__parse_gaze_direction, 'l': self.__parse_gaze_position, 'gp3': self.__parse_gaze_position_3d } def parse_json(self, json_data) -> tuple[int, object, type]: """Parse JSON and return timestamp, object and type.""" data = json.loads(json_data.decode('utf-8')) # Parse data status status = data.pop('s', -1) # Parse timestamp data_ts = data.pop('ts') # Parse data depending first json key first_key = next(iter(data)) # Convert data into data object data_object = self.__parse_data_map[first_key](status, data) data_object_type = type(data_object).__name__ return data_ts, data_object, data_object_type def parse_data(self, status, data) -> tuple[object, type]: """Parse data and return object and type.""" # Parse data depending first json key first_key = next(iter(data)) # Convert data into data object data_object = self.__parse_data_map[first_key](status, data) data_object_type = type(data_object).__name__ return data_object, data_object_type def __parse_pupil_or_gaze(self, status, data): gaze_index = data.pop('gidx') # parse pupil or gaze data depending second json key second_key = next(iter(data)) return self.__parse_pupil_or_gaze_map[second_key](status, gaze_index, data) @staticmethod def __parse_dir_sig(status, data): return DirSig(data['dir'], data['sig']) @staticmethod def __parse_pts(status, data): return PresentationTimeStamp(data['pts']) @staticmethod def __parse_vts(status, data): # ts is not sent when recording try: ts = data['ts'] except KeyError: ts = -1 return VideoTimeStamp(data['vts'], ts) @staticmethod def __parse_event_synch(status, data): return EventSynch(data['evts']) @staticmethod def __parse_event(status, data): return Event(data['ets'], data['type'], data['tag']) @staticmethod def __parse_accelerometer(status, data): return Accelerometer(data['ac']) @staticmethod def __parse_gyroscope(status, data): return Gyroscope(data['gy']) @staticmethod def __parse_pupil_center(status, gaze_index, data): return PupilCenter(status, gaze_index, data['pc'], data['eye']) @staticmethod def __parse_pupil_diameter(status, gaze_index, data): return PupilDiameter(status, gaze_index, data['pd'], data['eye']) @staticmethod def __parse_gaze_direction(status, gaze_index, data): return GazeDirection(status, gaze_index, data['gd'], data['eye']) @staticmethod def __parse_gaze_position(status, gaze_index, data): return GazePosition(status, gaze_index, data['l'], data['gp']) @staticmethod def __parse_gaze_position_3d(status, gaze_index, data): return GazePosition3D(status, gaze_index, data['gp3']) @staticmethod def __parse_marker_position(status, data): return MarkerPosition(data['marker3d'], data['marker2d']) class LiveStream(ArFeatures.ArContext): @DataFeatures.PipelineStepInit def __init__(self, **kwargs): # Init ArContext class super().__init__() # Init private attributes self.__address = None self.__udpport = 49152 self.__project_name = None self.__project_id = None self.__participant_name = None self.__participant_id = None self.__configuration = {} self.__parser = TobiiJsonDataParser() # Init protected attributes self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS} @property def address(self) -> str: """Network address where to find the device.""" return self.__address @address.setter def address(self, address: str): self.__address = address # Remove part after % on under Windows if "%" in self.__address: if sys.platform == "win32": self.__address = self.__address.split("%")[0] # Define base url if ':' in self.__address: self.__base_url = f'http://[{self.__address}]' else: # noinspection PyAttributeOutsideInit self.__base_url = 'http://' + self.__address @property def configuration(self) -> dict: """Patch system configuration dictionary.""" return self.__configuration @configuration.setter @DataFeatures.PipelineStepAttributeSetter def configuration(self, configuration: dict): self.__configuration = configuration @property def project(self) -> str: """Project name.""" return self.__project_name @project.setter def project(self, project: str): self.__project_name = project def __bind_project(self): """Bind to a project or create one if it doesn't exist.""" if self.__project_name is None: raise Exception(f'Project binding fails: setup project before.') self.__project_id = None # Check if project exist projects = self.__get_request('/api/projects') for project in projects: try: if project['pr_info']['Name'] == self.__project_name: self.__project_id = project['pr_id'] logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id) except: pass # The project doesn't exist, create one if self.__project_id is None: logging.debug('> %s project doesn\'t exist', self.__project_name) data = { 'pr_info': { 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD), 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)), 'Name': self.__project_name }, 'pr_created': self.__get_current_datetime() } json_data = self.__post_request('/api/projects', data) self.__project_id = json_data['pr_id'] logging.debug('> new %s project created: %s', self.__project_name, self.__project_id) @property def participant(self) -> str: """Participant name""" return self.__participant_name @participant.setter def participant(self, participant: str): self.__participant_name = participant def __bind_participant(self): """Bind to a participant or create one if it doesn't exist. !!! warning Bind to a project before. """ if self.__participant_name is None: raise Exception(f'Participant binding fails: setup participant before.') if self.__project_id is None: raise Exception(f'Participant binding fails: bind to a project before') self.__participant_id = None # Check if participant exist participants = self.__get_request('/api/participants') for participant in participants: try: if participant['pa_info']['Name'] == self.__participant_name: self.__participant_id = participant['pa_id'] logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id) except: pass # The participant doesn't exist, create one if self.__participant_id is None: logging.debug('> %s participant doesn\'t exist', self.__participant_name) data = { 'pa_project': self.__project_id, 'pa_info': { 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)), 'Name': self.__participant_name, 'Notes': '' # TODO: set participant notes }, 'pa_created': self.__get_current_datetime() } json_data = self.__post_request('/api/participants', data) self.__participant_id = json_data['pa_id'] logging.debug('> new %s participant created: %s', self.__participant_name, self.__participant_id) @DataFeatures.PipelineStepEnter def __enter__(self): logging.info('Tobii Pro Glasses 2 connexion starts...') # Update current configuration with configuration patch logging.debug('> updating configuration') # Update current configuration with configuration patch if self.__configuration: logging.debug('> updating configuration') configuration = self.__post_request('/api/system/conf', self.__configuration) # Read current configuration else: logging.debug('> reading configuration') configuration = self.__get_request('/api/system/conf') # Log current configuration logging.info('Tobii Pro Glasses 2 configuration:') for key, value in configuration.items(): logging.info('%s: %s', key, str(value)) # Store video stream info self.__video_width = int(configuration['sys_sc_width']) self.__video_height = int(configuration['sys_sc_height']) self.__video_fps = float(configuration['sys_sc_fps']) # Bind to project if required if self.__project_name is not None: logging.debug('> binding project %s', self.__project_name) self.__bind_project() logging.info('Tobii Pro Glasses 2 project id: %s', self.__project_id) # Bind to participant if required if self.__participant_name is not None: logging.debug('> binding participant %s', self.__participant_name) self.__bind_participant() logging.info('Tobii Pro Glasses 2 participant id: %s', self.__participant_id) # Create stop event self.__stop_event = threading.Event() # Open data stream self.__data_socket = self.__make_socket() self.__data_thread = threading.Thread(target=self.__stream_data) logging.debug('> starting data thread...') self.__data_thread.start() # Open video stream self.__video_socket = self.__make_socket() self.__video_thread = threading.Thread(target=self.__stream_video) logging.debug('> starting video thread...') self.__video_thread.start() # Keep connection alive self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \"" + str( uuid.uuid4()) + "\", \"op\": \"start\"}" self.__keep_alive_thread = threading.Thread(target=self.__keep_alive) logging.debug('> starting keep alive thread...') self.__keep_alive_thread.start() return self @DataFeatures.PipelineStepExit def __exit__(self, exception_type, exception_value, exception_traceback): logging.debug('%s.__exit__', DataFeatures.get_class_path(self)) # Close data stream self.__stop_event.set() # Stop keeping connection alive threading.Thread.join(self.__keep_alive_thread) # Stop data streaming threading.Thread.join(self.__data_thread) # Stop video buffer reading threading.Thread.join(self.__video_buffer_read_thread) # Stop video streaming threading.Thread.join(self.__video_thread) @DataFeatures.PipelineStepImage def image(self, draw_something: bool = None, **kwargs: dict) -> numpy.array: """Get Tobii visualization. Parameters: draw_something: example kwargs: ArContext.image parameters """ # Get context image image = super().image(**kwargs) if draw_something: cv2.putText(image, 'SOMETHING', (512, 512), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) return image def __make_socket(self): """Create a socket to enable network communication.""" iptype = socket.AF_INET if ':' in self.__address: iptype = socket.AF_INET6 res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE) family, socktype, proto, canonname, sockaddr = res[0] new_socket = socket.socket(family, socktype, proto) new_socket.settimeout(5.0) try: if iptype == socket.AF_INET6: new_socket.setsockopt(socket.SOL_SOCKET, 25, 1) except socket.error as e: if e.errno == 1: logging.error('Binding to a network interface is permitted only for root users.') return new_socket def __stream_data(self): """Stream data from dedicated socket.""" logging.debug('%s.__stream_data', DataFeatures.get_class_path(self)) # First timestamp to offset all timestamps first_ts = 0 while not self.__stop_event.is_set(): try: json_data, _ = self.__data_socket.recvfrom(1024) except TimeoutError: logging.error('> timeout occurred while receiving data') continue if json_data is not None: # Parse json into timestamped data object data_ts, data_object, data_object_type = self.__parser.parse_json(json_data) # Store first timestamp if first_ts == 0: first_ts = data_ts # Edit millisecond timestamp timestamp = int((data_ts - first_ts) * 1e-3) match data_object_type: case 'GazePosition': logging.debug('> received %s at %i timestamp', data_object_type, timestamp) # When gaze position is valid if data_object.validity == 0: # Process timestamped gaze position self._process_gaze_position( timestamp=timestamp, x=int(data_object.value[0] * self.__video_width), y=int(data_object.value[1] * self.__video_height)) else: # Process empty gaze position self._process_gaze_position(timestamp=timestamp) def __stream_video(self): """Stream video from dedicated socket.""" logging.debug('%s.__stream_video', DataFeatures.get_class_path(self)) # Open video stream container = av.open(f'rtsp://{self.__address}:8554/live/scene', options={'rtsp_transport': 'tcp'}) self.__stream = container.streams.video[0] # Create a video buffer with a lock self.__video_buffer = collections.OrderedDict() self.__video_buffer_lock = threading.Lock() # Open video buffer reader self.__video_buffer_read_thread = threading.Thread(target=self.__video_buffer_read) logging.debug('> starting video buffer reader thread...') self.__video_buffer_read_thread.start() # First timestamp to offset all timestamps first_ts = 0 for image in container.decode(self.__stream): logging.debug('> new image decoded') # Quit if the video acquisition thread have been stopped if self.__stop_event.is_set(): logging.debug('> stop event is set') break if image is not None: if image.time is not None: # Store first timestamp if first_ts == 0: first_ts = image.time # Edit millisecond timestamp timestamp = int((image.time - first_ts) * 1e3) logging.debug('> store image at %i timestamp', timestamp) # Lock buffer access self.__video_buffer_lock.acquire() # Decode image and store it at time index self.__video_buffer[timestamp] = image.to_ndarray(format='bgr24') # Unlock buffer access self.__video_buffer_lock.release() def __video_buffer_read(self): """Read incoming buffered video images.""" logging.debug('%s.__video_buffer_read', DataFeatures.get_class_path(self)) while not self.__stop_event.is_set(): # Can't read image while it is locked while self.__video_buffer_lock.locked(): # Check 10 times per frame time.sleep(1 / (10 * self.__video_fps)) # Lock buffer access self.__video_buffer_lock.acquire() # Video buffer not empty if len(self.__video_buffer) > 0: logging.debug('> %i images in buffer', len(self.__video_buffer)) # Get last stored image try: timestamp, image = self.__video_buffer.popitem(last=True) logging.debug('> read image at %i timestamp', timestamp) if len(self.__video_buffer) > 0: logging.warning('skipping %i image', len(self.__video_buffer)) # Clear buffer self.__video_buffer = collections.OrderedDict() # Process camera image self._process_camera_image( timestamp=timestamp, image=image) except Exception as e: logging.warning('%s.__video_buffer_read: %s', DataFeatures.get_class_path(self), e) # Unlock buffer access self.__video_buffer_lock.release() def __keep_alive(self): """Maintain network connection.""" logging.debug('%s.__keep_alive', DataFeatures.get_class_path(self)) while not self.__stop_event.is_set(): self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) time.sleep(1) def __get_request(self, api_action) -> any: """Send a GET request and get data back.""" url = self.__base_url + api_action logging.debug('%s.__get_request %s', DataFeatures.get_class_path(self), url) res = urlopen(url).read() try: data = json.loads(res.decode('utf-8')) except json.JSONDecodeError: data = None logging.debug('%s.__get_request received %s', DataFeatures.get_class_path(self), data) return data def __post_request(self, api_action, data=None, wait_for_response=True) -> any: """Send a POST request and get result back.""" url = self.__base_url + api_action logging.debug('%s.__post_request %s', DataFeatures.get_class_path(self), url) req = Request(url) req.add_header('Content-Type', 'application/json') data = json.dumps(data) if wait_for_response is False: threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start() return None response = urlopen(req, data.encode('utf-8')) res = response.read() try: res = json.loads(res.decode('utf-8')) except: pass return res def __wait_for_status(self, api_action, key, values, timeout=None) -> any: """Wait until a status matches given values.""" url = self.__base_url + api_action running = True while running: req = Request(url) req.add_header('Content-Type', 'application/json') try: response = urlopen(req, None, timeout=timeout) except URLError as e: logging.error(e.reason) return -1 data = response.read() json_data = json.loads(data.decode('utf-8')) if json_data[key] in values: running = False time.sleep(1) return json_data[key] @staticmethod def __get_current_datetime(timeformat=TOBII_DATETIME_FORMAT): return datetime.datetime.now().replace(microsecond=0).strftime(timeformat) # CALIBRATION def calibrate(self, project_name, participant_name): """Handle whole Tobii glasses calibration process.""" # Start calibration self.calibration_start(project_name, participant_name) # While calibrating... status = self.calibration_status() while status == 'calibrating': time.sleep(1) status = self.calibration_status() if status == 'uncalibrated' or status == 'stale' or status == 'failed': raise Exception(f'Calibration {status}') # CALIBRATION def calibration_start(self, project_name, participant_name): """Start calibration process for project and participant.""" project_id = self.__get_project_id(project_name) participant_id = self.get_participant_id(participant_name) # Init calibration id self.__calibration_id = None # Calibration have to be done for a project and a participant if project_id is None or participant_id is None: raise Exception(f'Setup project and participant before') data = { 'ca_project': project_id, 'ca_type': 'default', 'ca_participant': participant_id, 'ca_created': self.__get_current_datetime() } # Request calibration json_data = self.__post_request('/api/calibrations', data) # noinspection PyAttributeOutsideInit self.__calibration_id = json_data['ca_id'] # Start calibration self.__post_request('/api/calibrations/' + self.__calibration_id + '/start') def calibration_status(self) -> str: """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed.""" if self.__calibration_id is not None: status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) # Forget calibration id if status != 'calibrating': # noinspection PyAttributeOutsideInit self.__calibration_id = None return status else: raise Exception(f'Start calibration before') # RECORDING FEATURES def __wait_for_recording_status(self, recording_id, status_array=['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']): return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array) def create_recording(self, participant_name, recording_name='', recording_notes='') -> str: """Create a new recording. Returns: recording id """ participant_id = self.get_participant_id(participant_name) if participant_id is None: raise NameError(f'{participant_name} participant doesn\'t exist') data = { 'rec_participant': participant_id, 'rec_info': { 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)), 'Name': recording_name, 'Notes': recording_notes }, 'rec_created': self.__get_current_datetime() } json_data = self.__post_request('/api/recordings', data) return json_data['rec_id'] def start_recording(self, recording_id) -> bool: """Start recording on the Tobii interface's SD Card.""" self.__post_request('/api/recordings/' + recording_id + '/start') return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording' def stop_recording(self, recording_id) -> bool: """Stop recording on the Tobii interface's SD Card.""" self.__post_request('/api/recordings/' + recording_id + '/stop') return self.__wait_for_recording_status(recording_id, ['done']) == "done" def pause_recording(self, recording_id) -> bool: """Pause recording on the Tobii interface's SD Card.""" self.__post_request('/api/recordings/' + recording_id + '/pause') return self.__wait_for_recording_status(recording_id, ['paused']) == "paused" def __get_recording_status(self): return self.get_status()['sys_recording'] def get_current_recording_id(self) -> str: """Get current recording id.""" return self.__get_recording_status()['rec_id'] @property def recording(self) -> bool: """Is it recording?""" rec_status = self.__get_recording_status() if rec_status != {}: if rec_status['rec_state'] == "recording": return True return False def get_recordings(self) -> str: """Get all recordings' id.""" return self.__get_request('/api/recordings') # EVENTS AND EXPERIMENTAL VARIABLES def __post_recording_data(self, event_type: str, event_tag=''): data = {'type': event_type, 'tag': event_tag} self.__post_request('/api/events', data, wait_for_response=False) def send_event(self, event_type: str, event_value=None): self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value)) def send_variable(self, variable_name: str, variable_value=None): self.__post_recording_data(str(variable_name), str(variable_value)) # MISC def eject_sd(self): self.__get_request('/api/eject') def get_battery_info(self): return ("Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % ( float(self.get_battery_level()), float(self.get_battery_remaining_time()))) def get_battery_level(self): return self.get_battery_status()['level'] def get_battery_remaining_time(self): return self.get_battery_status()['remaining_time'] def get_battery_status(self): return self.get_status()['sys_battery'] def get_et_freq(self): return self.get_configuration()['sys_et_freq'] def get_et_frequencies(self): return self.get_status()['sys_et']['frequencies'] def identify(self): self.__get_request('/api/identify') def get_configuration(self): return self.__get_request('/api/system/conf') def get_status(self): return self.__get_request('/api/system/status') def get_storage_info(self): return ("Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time())) def get_storage_remaining_time(self): return self.get_storage_status()['remaining_time'] def get_storage_status(self): return self.get_status()['sys_storage'] def get_scene_camera_freq(self): return self.get_configuration()['sys_sc_fps'] def set_et_freq_50(self): data = {'sys_et_freq': 50} json_data = self.__post_request('/api/system/conf', data) def set_et_freq_100(self): # May not be available. Check get_et_frequencies() first. data = {'sys_et_freq': 100} json_data = self.__post_request('/api/system/conf', data) def set_eye_camera_indoor_preset(self) -> str: data = {'sys_ec_preset': 'Indoor'} return self.__post_request('/api/system/conf', data) def set_eye_camera_outdoor_preset(self) -> str: data = {'sys_ec_preset': 'ClearWeather'} return self.__post_request('/api/system/conf', data) def set_scene_camera_auto_preset(self): data = {'sys_sc_preset': 'Auto'} json_data = self.__post_request('/api/system/conf', data) def set_scene_camera_gaze_preset(self): data = {'sys_sc_preset': 'GazeBasedExposure'} json_data = self.__post_request('/api/system/conf', data) def set_scene_camera_freq_25(self): data = {'sys_sc_fps': 25} json_data = self.__post_request('/api/system/conf/', data) def set_scene_camera_freq_50(self): data = {'sys_sc_fps': 50} json_data = self.__post_request('/api/system/conf/', data) class PostProcessing(ArFeatures.ArContext): @DataFeatures.PipelineStepInit def __init__(self, **kwargs): # Init ArContext class super().__init__() # Init private attributes self.__segment = None self.__start = math.nan self.__end = math.nan self.__parser = TobiiJsonDataParser() self.__data_counts_dict = { 'DirSig': 0, 'PresentationTimeStamp': 0, 'VideoTimeStamp': 0, 'EventSynch': 0, 'Event': 0, 'Accelerometer': 0, 'Gyroscope': 0, 'PupilCenter': 0, 'PupilDiameter': 0, 'GazeDirection': 0, 'GazePosition': 0, 'GazePosition3D': 0, 'MarkerPosition': 0 } self.__data_list = [] # Initialize synchronisation self.__sync_event = None self.__sync_event_unit = None self.__sync_event_factor = None self.__sync_data_ts = None self.__sync_ts = None self.__last_sync_data_ts = None self.__last_sync_ts = None self.__time_unit_factor = { "µs": 1e-3, "ms": 1, "s": 1e3 } # Initialize inconsistent timestamp monitoring self.__last_data_ts = None # Init protected attributes self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS} @property def segment(self) -> str: """Path to segment folder.""" return self.__segment @segment.setter def segment(self, segment: str): self.__segment = segment @property def start(self) -> float: """Start reading timestamp in millisecond.""" return self.__start @start.setter def start(self, start: float): self.__start = start @property def end(self) -> float: """End reading timestamp in millisecond.""" return self.__end @end.setter def end(self, end: float): self.__end = end @property def sync_event(self) -> str: """Optional event type dedicated to syncrhonize Tobii timestamps with external time source.""" return self.__sync_event @sync_event.setter def sync_event(self, sync_event: str): self.__sync_event = sync_event @property def sync_event_unit(self) -> str: """Define sync event unit for conversion purpose ('µs', 'ms' or 's')""" return self.__sync_event_unit @sync_event_unit.setter def sync_event_unit(self, sync_event_unit: str): self.__sync_event_unit = sync_event_unit self.__sync_event_factor = self.__time_unit_factor.get(sync_event_unit) @DataFeatures.PipelineStepEnter def __enter__(self): # Read segment info with open(os.path.join(self.__segment, TOBII_SEGMENT_INFO_FILENAME)) as info_file: try: info = json.load(info_file) except: raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}') # Constrain reading dates self.__end = min(self.__end, int(info["seg_length"] * 1e3)) if self.__end != None else int( info["seg_length"] * 1e3) if self.__start >= self.__end: raise ValueError('Start reading timestamp is equal or greater than end reading timestamp.') # TODO: log various info calibrated = bool(info["seg_calibrated"]) start_date = datetime.datetime.strptime(info["seg_t_start"], TOBII_DATETIME_FORMAT) stop_date = datetime.datetime.strptime(info["seg_t_stop"], TOBII_DATETIME_FORMAT) # Create stop event self.__stop_event = threading.Event() # Create pause event self.__pause_event = threading.Event() # Open reading thread self.__reading_thread = threading.Thread(target=self.__read) logging.debug('> starting reading thread...') self.__reading_thread.start() @DataFeatures.PipelineStepExit def __exit__(self, exception_type, exception_value, exception_traceback): logging.debug('%s.__exit__', DataFeatures.get_class_path(self)) # Close data stream self.__stop_event.set() # Stop reading thread threading.Thread.join(self.__reading_thread) def __read(self): """Iterate on video images and their related data.""" for video_ts, video_image, data_list in self: # Check pause event (and stop event) while self.__pause_event.is_set() and not self.__stop_event.is_set(): logging.debug('> reading is paused at %i', video_ts) self._process_camera_image(timestamp=video_ts, image=video_image) time.sleep(1) # Check stop event if self.__stop_event.is_set(): break logging.debug('> read image at %i timestamp', video_ts) # Process camera image self._process_camera_image(timestamp=video_ts, image=video_image) height, width, _ = video_image.shape logging.debug('> read %i data related to image', len(data_list)) # Process data for data_ts, data_object, data_object_type in data_list: # Check sync event first if required if self.__sync_event is not None: if data_object_type == 'Event': logging.debug('> reading %s event (%s) at %f ms', data_object.type, data_object.tag, data_ts) if data_object.type == self.__sync_event: # Store old sync data ts if self.__last_sync_data_ts is None and self.__sync_data_ts is not None: self.__last_sync_data_ts = self.__sync_data_ts # Store old sync ts if self.__last_sync_ts is None and self.__sync_ts is not None: self.__last_sync_ts = self.__sync_ts # Store sync event timestamp self.__sync_data_ts = data_ts self.__sync_ts = float(data_object.tag) * self.__sync_event_factor # Monitor delay between data ts and sync ts if self.__last_sync_data_ts is not None and self.__last_sync_ts is not None: diff_data_ts = self.__sync_data_ts - self.__last_sync_data_ts diff_sync_ts = (self.__sync_ts - self.__last_sync_ts) # Correct sync ts self.__sync_ts += diff_data_ts-diff_sync_ts if abs(diff_data_ts-diff_sync_ts) > 0: logging.info('Difference between data and sync event timestamps is %i ms', diff_data_ts-diff_sync_ts) # Don't process gaze positions if sync is required but sync event not happened yet if self.__sync_event is not None and self.__sync_ts is None: continue # Otherwise, synchronize timestamp with sync event else: data_ts = int(self.__sync_ts + data_ts - self.__sync_data_ts) # Catch inconstistent timestamps if self.__last_data_ts is not None: if self.__data_ts - self.__last_data_ts <= 0: logging.error('! %i gaze position more recent than the previous one', data_ts) last_data_ts = data_ts # Process gaze positions match data_object_type: case 'GazePosition': logging.debug('> reading %s at %i timestamp', data_object_type, data_ts) # When gaze position is valid if data_object.validity == 0: # Process timestamped gaze position self._process_gaze_position( timestamp=data_ts, x=int(data_object.value[0] * width), y=int(data_object.value[1] * height)) else: # Process empty gaze position self._process_gaze_position(timestamp=data_ts) def __iter__(self): self.__data_file = gzip.open(os.path.join(self.__segment, TOBII_SEGMENT_DATA_FILENAME)) self.__video_file = av.open(os.path.join(self.__segment, TOBII_SEGMENT_VIDEO_FILENAME)) self.__vts_offset = 0 self.__vts_ts = -1 return self def __next__(self): data_list = [] video_ts, image = self.__next_video_image() next_video_ts, next_video_image = self.__next_video_image() next_data_ts, next_data_object, next_data_object_type = self.__next_data() while next_data_ts < next_video_ts: data_list.append((next_data_ts, next_data_object, next_data_object_type)) next_data_ts, next_data_object, next_data_object_type = self.__next_data() output = video_ts, image, data_list video_ts, video_image = next_video_ts, next_video_image return output def __next_video_image(self): image = next(self.__video_file.decode(self.__video_file.streams.video[0])) ts = int(image.time * 1e3) # Ignore before start timestamp if ts < self.__start: return self.__next__() # Ignore images after end timestamp if self.__end != None: if ts >= self.__end: raise StopIteration # Return millisecond timestamp and image return ts, image.to_ndarray(format='bgr24') def __next_data(self): data = json.loads(next(self.__data_file).decode('utf-8')) # Parse data status status = data.pop('s', -1) # Convert timestamp ts = data.pop('ts') # Watch for vts data to offset timestamps try: self.__vts_offset = data['vts'] self.__vts_ts = ts # Store primary ts value to allow further reverse offset operation data['ts'] = ts except KeyError: pass # Ignore data before first vts entry if self.__vts_ts == -1: return self.__next_data() ts -= self.__vts_ts ts += self.__vts_offset # Ignore timestamps out of the given time range if ts < self.__start * 1e3: return self.__next_data() if ts >= self.__end * 1e3: raise StopIteration # Parse data data_object, data_object_type = self.__parser.parse_data(status, data) # Return millisecond timestamp, data object and type return ts * 1e-3, data_object, data_object_type @DataFeatures.PipelineStepMethod def pause(self): """Pause pipeline processing.""" self.__pause_event.set() def is_paused(self) -> bool: """Is pipeline processing paused?""" return self.__pause_event.is_set() @DataFeatures.PipelineStepMethod def resume(self): """Resume pipeline processing.""" self.__pause_event.clear()