aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/argaze/utils/Providers/TobiiProGlasses2.py974
1 files changed, 560 insertions, 414 deletions
diff --git a/src/argaze/utils/Providers/TobiiProGlasses2.py b/src/argaze/utils/Providers/TobiiProGlasses2.py
index c8cd81b..8ab7417 100644
--- a/src/argaze/utils/Providers/TobiiProGlasses2.py
+++ b/src/argaze/utils/Providers/TobiiProGlasses2.py
@@ -18,6 +18,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
import sys
+import logging
import socket
import threading
import json
@@ -40,6 +41,7 @@ from argaze import DataFeatures, GazeFeatures
from argaze.utils import UtilsFeatures
import numpy
+import av
socket.IPPROTO_IPV6 = 41
@@ -50,20 +52,262 @@ DEFAULT_PROJECT_NAME = 'DefaultProject'
DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant'
DEFAULT_RECORD_NAME = 'DefaultRecord'
-class Provider(DataFeatures.PipelineInputProvider):
+# Define extra classes to support Tobii data parsing
+@dataclass
+class DirSig():
+ """Define dir sig data (dir sig)."""
+
+ dir: int # meaning ?
+ sig: int # meaning ?
+
+@dataclass
+class PresentationTimeStamp():
+ """Define presentation time stamp (pts) data."""
+
+ value: int
+ """Pts value."""
+
+@dataclass
+class VideoTimeStamp():
+ """Define video time stamp (vts) data."""
+
+ value: int
+ """Vts value."""
+
+ offset: int
+ """Primary time stamp value."""
+
+@dataclass
+class EventSynch():
+ """Define event synch (evts) data."""
+
+ value: int # meaning ?
+ """Evts value."""
+
+@dataclass
+class Event():
+ """Define event data (ets type tag)."""
+
+ ets: int # meaning ?
+ type: str
+ tag: str # dict ?
+
+@dataclass
+class Accelerometer():
+ """Define accelerometer data (ac)."""
+
+ value: numpy.array
+ """Accelerometer value"""
+
+@dataclass
+class Gyroscope():
+ """Define gyroscope data (gy)."""
+
+ value: numpy.array
+ """Gyroscope value"""
+
+@dataclass
+class PupillCenter():
+ """Define pupill center data (gidx pc eye)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float, float))
+ eye: str # 'right' or 'left'
+
+@dataclass
+class PupillDiameter():
+ """Define pupill diameter data (gidx pd eye)."""
+
+ validity: int
+ index: int
+ value: float
+ eye: str # 'right' or 'left'
+
+@dataclass
+class GazeDirection():
+ """Define gaze direction data (gidx gd eye)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float, float))
+ eye: str # 'right' or 'left'
+
+@dataclass
+class GazePosition():
+ """Define gaze position data (gidx l gp)."""
+
+ validity: int
+ index: int
+ l: str # ?
+ value: tuple((float, float))
+
+@dataclass
+class GazePosition3D():
+ """Define gaze position 3D data (gidx gp3)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float))
+
+@dataclass
+class MarkerPosition():
+ """Define marker data (marker3d marker2d)."""
+
+ value_3d: tuple((float, float, float))
+ value_2d: tuple((float, float))
+
+class TobiiJsonDataParser():
+
+ def __init__(self):
+
+ self.__first_ts = 0
+
+ self.__parse_data_map = {
+ 'dir': self.__parse_dir_sig,
+ 'pts': self.__parse_pts,
+ 'vts': self.__parse_vts,
+ 'evts': self.__parse_event_synch,
+ 'ets': self.__parse_event,
+ 'ac': self.__parse_accelerometer,
+ 'gy': self.__parse_gyroscope,
+ 'gidx': self.__parse_pupill_or_gaze,
+ 'marker3d': self.__parse_marker_position
+ }
+
+ self.__parse_pupill_or_gaze_map = {
+ 'pc': self.__parse_pupill_center,
+ 'pd': self.__parse_pupill_diameter,
+ 'gd': self.__parse_gaze_direction,
+ 'l': self.__parse_gaze_position,
+ 'gp3': self.__parse_gaze_position_3d
+ }
+
+ def parse(self, data):
+
+ json_data = json.loads(data.decode('utf-8'))
+
+ # Parse data status
+ status = json_data.pop('s', -1)
+
+ # Parse timestamp
+ data_ts = json_data.pop('ts')
+
+ # Parse data depending first json key
+ first_key = next(iter(json_data))
+
+ # Convert json data into data object
+ data_object = self.__parse_data_map[first_key](status, json_data)
+ data_object_type = type(data_object).__name__
+
+ # Keep first timestamp to offset all timestamps
+ if self.__first_ts == 0:
+ self.__first_ts = data_ts
+
+ data_ts -= self.__first_ts
+
+ return data_ts, data_object, data_object_type
+
+ def __parse_pupill_or_gaze(self, status, json_data):
+
+ gaze_index = json_data.pop('gidx')
+
+ # parse pupill or gaze data depending second json key
+ second_key = next(iter(json_data))
+
+ return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, json_data)
+
+ def __parse_dir_sig(self, status, json_data):
+
+ return DirSig(json_data['dir'], json_data['sig'])
- def __init__(self, address: str = None, project: str = None, participant: str = None, **kwargs):
+ def __parse_pts(self, status, json_data):
+
+ return PresentationTimeStamp(json_data['pts'])
+
+ def __parse_vts(self, status, json_data):
+
+ # ts is not sent when recording
+ try:
+
+ ts = json_data['ts']
+
+ except KeyError:
+
+ ts = -1
+
+ return VideoTimeStamp(json_data['vts'], ts)
+
+ def __parse_event_synch(self, status, json_data):
+
+ return EventSynch(json_data['evts'])
+
+ def __parse_event(self, status, json_data):
- # DEBUG
- print('TobiiProGlasses2.__init__')
+ return Event(json_data['ets'], json_data['type'], json_data['tag'])
+
+ def __parse_accelerometer(self, status, json_data):
+
+ return Accelerometer(json_data['ac'])
+
+ def __parse_gyroscope(self, status, json_data):
+
+ return Gyroscope(json_data['gy'])
+
+ def __parse_pupill_center(self, status, gaze_index, json_data):
+
+ return PupillCenter(status, gaze_index, json_data['pc'], json_data['eye'])
+
+ def __parse_pupill_diameter(self, status, gaze_index, json_data):
+
+ return PupillDiameter(status, gaze_index, json_data['pd'], json_data['eye'])
+
+ def __parse_gaze_direction(self, status, gaze_index, json_data):
+
+ return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye'])
+
+ def __parse_gaze_position(self, status, gaze_index, json_data):
+
+ return GazePosition(status, gaze_index, json_data['l'], json_data['gp'])
+
+ def __parse_gaze_position_3d(self, status, gaze_index, json_data):
+
+ return GazePosition3D(status, gaze_index, json_data['gp3'])
+
+ def __parse_marker_position(self, status, json_data):
+
+ return MarkerPosition(json_data['marker3d'], json_data['marker2d'])
+
+class Provider(DataFeatures.PipelineInputProvider):
+
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
# Init parent classes
- DataFeatures.PipelineInputProvider.__init__(self, **kwargs)
- '''
- self.__project_name = project
- self.__participant_name = participant
+ super().__init__()
+ # Init private attributes
+ self.__address = None
self.__udpport = 49152
+
+ self.__project_name = None
+ self.__project_id = None
+
+ self.__participant_name = None
+ self.__participant_id = None
+
+ self.__configuration = {}
+
+ self.__parser = TobiiJsonDataParser()
+
+ @property
+ def address(self) -> str:
+ """Network address where to find the device."""
+ return self.__address
+
+ @address.setter
+ def address(self, address:str):
+
self.__address = address
# Remove part after % on under Windows
@@ -82,66 +326,225 @@ class Provider(DataFeatures.PipelineInputProvider):
self.__base_url = 'http://' + self.__address
- # Create Tobii data parser
- self.__parser = TobiiJsonDataParser()
- '''
+ @property
+ def configuration(self)-> dict:
+ """Patch system configuration dictionary."""
+ return self.__configuration
+
+ @configuration.setter
+ def configuration(self, configuration:dict):
+
+ self.__configuration = configuration
+
+ @property
+ def project(self) -> str:
+ """Project name."""
+ return self.__project_name
+
+ @project.setter
+ def project(self, project:str):
+
+ self.__project_name = project
+
+ def __bind_project(self):
+ """Bind to a project or create one if it doesn't exist."""
+
+ if self.__project_name is None:
+
+ raise Exception(f'Project binding fails: setup project before.')
+
+ self.__project_id = None
+
+ # Check if project exist
+ projects = self.__get_request('/api/projects')
+
+ for project in projects:
+
+ try:
+
+ if project['pr_info']['Name'] == self.__project_name:
+
+ self.__project_id = project['pr_id']
+
+ logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id)
+
+ except:
+
+ pass
+
+ # The project doesn't exist, create one
+ if self.__project_id is None:
+
+ logging.debug('> %s project doesn\'t exist', self.__project_name)
+
+ data = {
+ 'pr_info' : {
+ 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)),
+ 'Name': self.__project_name
+ },
+ 'pr_created': self.__get_current_datetime()
+ }
+
+ json_data = self.__post_request('/api/projects', data)
+
+ self.__project_id = json_data['pr_id']
+
+ logging.debug('> new %s project created: %s', self.__project_name, self.__project_id)
+
+ @property
+ def participant(self)-> str:
+ """Participant name"""
+ return self.__participant_name
+
+ @participant.setter
+ def participant(self, participant:str):
+
+ self.__participant_name = participant
+
+ def __bind_participant(self):
+ """Bind to a participant or create one if it doesn't exist.
+
+ !!! warning
+ Bind to a project before.
+ """
+
+ if self.__participant_name is None:
+
+ raise Exception(f'Participant binding fails: setup participant before.')
+
+ if self.__project_id is None :
+
+ raise Exception(f'Participant binding fails: bind to a project before')
+
+ self.__participant_id = None
+
+ # Check if participant exist
+ participants = self.__get_request('/api/participants')
+
+ for participant in participants:
+
+ try:
+
+ if participant['pa_info']['Name'] == self.__participant_name:
+
+ self.__participant_id = participant['pa_id']
+
+ logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id)
+
+ except:
+
+ pass
+
+ # The participant doesn't exist, create one
+ if self.__participant_id is None:
+
+ logging.debug('> %s participant doesn\'t exist', self.__participant_name)
+
+ data = {
+ 'pa_project': self.__project_id,
+ 'pa_info': {
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)),
+ 'Name': self.__participant_name,
+ 'Notes': '' # TODO: set participant notes
+ },
+ 'pa_created': self.__get_current_datetime()
+ }
+
+ json_data = self.__post_request('/api/participants', data)
+
+ self.__participant_id = json_data['pa_id']
+
+ logging.debug('> new %s participant created: %s', self.__participant_name, self.__participant_id)
+
def __enter__(self):
- # DEBUG
- print('TobiiProGlasses2.__enter__')
- '''
- # Bind to project or create one if required
+ logging.info('Tobii Pro Glasses 2 connexion starts...')
+ logging.debug('TobiiProGlasses2.Provider.__enter__')
+
+ # Update current configuration with configuration patch
+ logging.debug('> updating configuration')
+
+ configuration = self.__get_request('/api/system/conf')
+
+ if self.__configuration:
+
+ configuration.update(self.__configuration)
+ configuration = self.__post_request('/api/system/conf', configuration)
+
+ # Log current configuration
+ logging.info('Tobii Pro Glasses 2 configuration:')
+
+ for key, value in configuration.items():
+
+ logging.info('%s: %s', key, str(value))
+
+ # Store video stream dimension
+ self.__video_width = configuration['sys_sc_width']
+ self.__video_height = configuration['sys_sc_height']
+
+ # Bind to project if required
if self.__project_name is not None:
- self.set_project(self.__project_name)
+ logging.debug('> binding project %s', self.__project_name)
- # Bind to participant or create one if required
- if self.__participant_name is not None:
+ self.__bind_project()
- self.set_participant(self.__project_name, self.__participant_name)
+ logging.info('Tobii Pro Glasses 2 project id: %s', self.__project_id)
- # TODO: Configure Tobii device as required
+ # Bind to participant if required
+ if self.__participant_name is not None:
- # Setup camera at 25 fps to work on Full HD video stream
- self.set_scene_camera_freq_25()
+ logging.debug('> binding participant %s', self.__participant_name)
- # Setup eye tracking at 50 fps
- self.set_et_freq_50()
+ self.__bind_participant()
- # Update video stream dimension
- self.__video_width = self.get_configuration()['sys_sc_width']
- self.__video_height = self.get_configuration()['sys_sc_height']
+ logging.info('Tobii Pro Glasses 2 participant id: %s', self.__participant_id)
+
+ # Create stop event
+ self.__stop_event = threading.Event()
# Open data stream
self.__data_socket = self.__make_socket()
- self.__data_thread = threading.Thread(target = self.__run)
+ self.__data_thread = threading.Thread(target = self.__stream_data)
self.__data_thread.daemon = True
+
+ logging.debug('> starting data thread...')
self.__data_thread.start()
+ # Open video stream
+ self.__video_socket = self.__make_socket()
+ self.__video_thread = threading.Thread(target = self.__stream_video)
+ self.__video_thread.daemon = True
+
+ logging.debug('> starting video thread...')
+ self.__video_thread.start()
+
# Keep connection alive
self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
self.__keep_alive_thread.daemon = True
- self.__keep_alive_thread.start()
- # Create stop event
- self.__stop_event = threading.Event()
- '''
+ logging.debug('> starting keep alive thread...')
+ self.__keep_alive_thread.start()
return self
- def __exit__(self):
+ def __exit__(self, exception_type, exception_value, exception_traceback):
- # DEBUG
- print('TobiiProGlasses2.__exit__')
- '''
+ logging.debug('TobiiProGlasses2.Provider.__exit__')
+
# Close data stream
self.__stop_event.set()
+ # Stop keeping connection alive
+ threading.Thread.join(self.__keep_alive_thread)
+ self.__keep_alive_thread = None
+
+ # Stop data streaming
threading.Thread.join(self.__data_thread)
self.__data_thread = None
- '''
-'''
+
def __make_socket(self):
"""Create a socket to enable network communication."""
@@ -167,12 +570,14 @@ class Provider(DataFeatures.PipelineInputProvider):
if e.errno == 1:
- print("Binding to a network interface is permitted only for root users.")
+ logging.error('Binding to a network interface is permitted only for root users.')
return new_socket
- def __grab_data(self):
- """Grab data from dedicated socket."""
+ def __stream_data(self):
+ """Stream data from dedicated socket."""
+
+ logging.debug('TobiiProGlasses2.Provider.__stream_data')
while not self.__stop_event.is_set():
@@ -182,12 +587,12 @@ class Provider(DataFeatures.PipelineInputProvider):
except TimeoutError:
- print("A timeout occurred while receiving data")
+ logging.error('> timeout occurred while receiving data')
if data is not None:
# Parse json into timestamped data object
- data_ts, data_object, data_object_type = self.__parser.parse(json_data)
+ data_ts, data_object, data_object_type = self.__parser.parse(data)
# Edit millisecond timestamp
timestamp = int(data_ts * 1e-3)
@@ -202,10 +607,8 @@ class Provider(DataFeatures.PipelineInputProvider):
# Edit timestamped gaze position
timestamped_gaze_position = GazeFeatures.GazePosition((int(data_object.value[0] * self.__video_width), int(data_object.value[1] * self.__video_height)), timestamp=timestamp)
- # Send timestamp and gaze position coordinates
-
# DEBUG
- print('TobiiProGlasses2.__grab_data', timestamped_gaze_position)
+ print('TobiiProGlasses2.__stream_data', timestamped_gaze_position)
#self.gaze_position_callback(timestamped_gaze_position)
else:
@@ -214,36 +617,118 @@ class Provider(DataFeatures.PipelineInputProvider):
empty_gaze_position = GazeFeatures.GazePosition(timestamp=timestamp)
# DEBUG
- print('TobiiProGlasses2.__grab_data', empty_gaze_position)
+ print('TobiiProGlasses2.__stream_data', empty_gaze_position)
#self.gaze_position_callback(empty_gaze_position)
-
+
+ def __stream_video(self):
+ """Stream video from dedicated socket."""
+
+ logging.debug('TobiiProGlasses2.Provider.__stream_video')
+
+ container = av.open(f'rtsp://{self.__address}:8554/live/scene', options={'rtsp_transport': 'tcp'})
+ self.__stream = container.streams.video[0]
+ #self.__buffer = collections.OrderedDict()
+
+ for image in container.decode(self.__stream):
+
+ logging.debug('> new image decoded')
+
+ # Quit if the video acquisition thread have been stopped
+ if self.__stop_event.is_set():
+
+ logging.debug('> stop event is set')
+ break
+
+ if image is not None:
+
+ timestamp = int(image.time * 1e6)
+
+ logging.debug('> image timestamp: %d', image.time)
+ '''
+ # Select callback reading mode
+ if len(self.reading_callbacks) > 0:
+
+ # Lock data subcription
+ self.__subcription_lock.acquire()
+
+ # Share incoming data to all subscribers
+ for callback in self.reading_callbacks:
+
+ callback(timestamp, image.to_ndarray(format='bgr24'))
+
+ # Unlock data subscription
+ self.__subcription_lock.release()
+
+ # Select buffer reading mode
+ else:
+
+ # Lock buffer access
+ self.__buffer_lock.acquire()
+
+ # Decoding image and storing at time index
+ self.__buffer[timestamp] = image.to_ndarray(format='bgr24')
+
+ # Unlock buffer access
+ self.__buffer_lock.release()
+ '''
def __keep_alive(self):
"""Maintain network connection."""
- while True:
+ logging.debug('TobiiProGlasses2.Provider.__keep_alive')
+
+ while not self.__stop_event.is_set():
self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
- # TODO: send message to video socket
+ self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
+
time.sleep(1)
- def __post_request(self, api_action, data=None, wait_for_response=True) -> str:
+ def __get_request(self, api_action) -> str:
+ """Send a GET request and get data back."""
+
+ url = self.__base_url + api_action
+
+ logging.debug('TobiiProGlasses2.Provider.__get_request %s', url)
+
+ res = urlopen(url).read()
+
+ try:
+
+ data = json.loads(res.decode('utf-8'))
+
+ except json.JSONDecodeError:
+
+ data = None
+
+ logging.debug('TobiiProGlasses2.Provider.__get_request received: %s', data)
+
+ return data
+
+ def __post_request(self, api_action, data = None, wait_for_response = True) -> str:
"""Send a POST request and get result back."""
+ logging.debug('TobiiProGlasses2.Provider.__post_request %s', api_action)
+
url = self.__base_url + api_action
req = Request(url)
req.add_header('Content-Type', 'application/json')
data = json.dumps(data)
if wait_for_response is False:
+
threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
+
return None
response = urlopen(req, data.encode('utf-8'))
res = response.read()
try:
+
res = json.loads(res.decode('utf-8'))
+
except:
+
pass
return res
@@ -282,121 +767,12 @@ class Provider(DataFeatures.PipelineInputProvider):
return datetime.datetime.now().replace(microsecond=0).strftime(timeformat)
- # PROJECT FEATURES
-
- def set_project(self, project_name = DEFAULT_PROJECT_NAME) -> str:
- """Bind to a project or create one if it doesn't exist.
-
- Returns:
- project id
- """
-
- project_id = self.get_project_id(project_name)
-
- if project_id is None:
-
- data = {
- 'pr_info' : {
- 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, project_name)),
- 'Name': project_name
- },
- 'pr_created': self.__get_current_datetime()
- }
-
- json_data = self.__post_request('/api/projects', data)
-
- return json_data['pr_id']
-
- else:
-
- return project_id
-
- def get_project_id(self, project_name) -> str:
- """Get project id."""
-
- project_id = None
- projects = self.__get_request('/api/projects')
-
- for project in projects:
-
- try:
- if project['pr_info']['Name'] == project_name:
- project_id = project['pr_id']
- except:
- pass
-
- return project_id
-
- def get_projects(self) -> str:
- """Get all projects id."""
-
- return self.__get_request('/api/projects')
-
- # PARTICIPANT FEATURES
-
- def set_participant(self, project_name, participant_name, participant_notes = '') -> str:
- """Bind to a participant into a project or create one if it doesn't exist.
-
- Returns:
- participant id
- """
- project_id = self.get_project_id(project_name)
- participant_id = self.get_participant_id(participant_name)
-
- # Participant creation is done into a project
- if project_id is None :
-
- raise Exception(f'Participant creation fails: setup project before')
-
- if participant_id is None:
-
- data = {
- 'pa_project': project_id,
- 'pa_info': {
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)),
- 'Name': participant_name,
- 'Notes': participant_notes
- },
- 'pa_created': self.__get_current_datetime()
- }
-
- json_data = self.__post_request('/api/participants', data)
-
- return json_data['pa_id']
-
- else:
-
- return participant_id
-
- def get_participant_id(self, participant_name) -> str:
- """Get participant id."""
-
- participant_id = None
- participants = self.__get_request('/api/participants')
-
- for participant in participants:
-
- try:
- if participant['pa_info']['Name'] == participant_name:
- participant_id = participant['pa_id']
-
- except:
- pass
-
- return participant_id
-
- def get_participants(self) -> str:
- """Get all participants id."""
-
- return self.__get_request('/api/participants')
-
# CALIBRATION
def calibration_start(self, project_name, participant_name):
"""Start calibration process for project and participant."""
- project_id = self.get_project_id(project_name)
+ project_id = self.__get_project_id(project_name)
participant_id = self.get_participant_id(participant_name)
# Init calibration id
@@ -462,7 +838,7 @@ class Provider(DataFeatures.PipelineInputProvider):
def calibration_start(self, project_name, participant_name):
"""Start calibration process for project and participant."""
- project_id = self.get_project_id(project_name)
+ project_id = self.__get_project_id(project_name)
participant_id = self.get_participant_id(participant_name)
# Init calibration id
@@ -481,18 +857,18 @@ class Provider(DataFeatures.PipelineInputProvider):
}
# Request calibration
- json_data = super().post_request('/api/calibrations', data)
+ json_data = self.__post_request('/api/calibrations', data)
self.__calibration_id = json_data['ca_id']
# Start calibration
- super().post_request('/api/calibrations/' + self.__calibration_id + '/start')
+ self.__post_request('/api/calibrations/' + self.__calibration_id + '/start')
def calibration_status(self) -> str:
"""Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed."""
if self.__calibration_id is not None:
- status = super().wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
+ status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
# Forget calibration id
if status != 'calibrating':
@@ -526,7 +902,7 @@ class Provider(DataFeatures.PipelineInputProvider):
# RECORDING FEATURES
def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
- return super().wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
+ return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str:
"""Create a new recording.
@@ -550,26 +926,26 @@ class Provider(DataFeatures.PipelineInputProvider):
'rec_created': self.__get_current_datetime()
}
- json_data = super().post_request('/api/recordings', data)
+ json_data = self.__post_request('/api/recordings', data)
return json_data['rec_id']
def start_recording(self, recording_id) -> bool:
"""Start recording on the Tobii interface's SD Card."""
- super().post_request('/api/recordings/' + recording_id + '/start')
+ self.__post_request('/api/recordings/' + recording_id + '/start')
return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
def stop_recording(self, recording_id) -> bool:
"""Stop recording on the Tobii interface's SD Card."""
- super().post_request('/api/recordings/' + recording_id + '/stop')
+ self.__post_request('/api/recordings/' + recording_id + '/stop')
return self.__wait_for_recording_status(recording_id, ['done']) == "done"
def pause_recording(self, recording_id) -> bool:
"""Pause recording on the Tobii interface's SD Card."""
- super().post_request('/api/recordings/' + recording_id + '/pause')
+ self.__post_request('/api/recordings/' + recording_id + '/pause')
return self.__wait_for_recording_status(recording_id, ['paused']) == "paused"
def __get_recording_status(self):
@@ -595,13 +971,13 @@ class Provider(DataFeatures.PipelineInputProvider):
def get_recordings(self) -> str:
"""Get all recordings id."""
- return super().get_request('/api/recordings')
+ return self.__get_request('/api/recordings')
# EVENTS AND EXPERIMENTAL VARIABLES
def __post_recording_data(self, event_type: str, event_tag = ''):
data = {'type': event_type, 'tag': event_tag}
- super().post_request('/api/events', data, wait_for_response=False)
+ self.__post_request('/api/events', data, wait_for_response=False)
def send_event(self, event_type: str, event_value = None):
self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
@@ -612,7 +988,7 @@ class Provider(DataFeatures.PipelineInputProvider):
# MISC
def eject_sd(self):
- super().get_request('/api/eject')
+ self.__get_request('/api/eject')
def get_battery_info(self):
return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
@@ -633,16 +1009,13 @@ class Provider(DataFeatures.PipelineInputProvider):
return self.get_status()['sys_et']['frequencies']
def identify(self):
- super().get_request('/api/identify')
-
- def get_address(self):
- return self.address
+ self.__get_request('/api/identify')
def get_configuration(self):
- return super().get_request('/api/system/conf')
+ return self.__get_request('/api/system/conf')
def get_status(self):
- return super().get_request('/api/system/status')
+ return self.__get_request('/api/system/status')
def get_storage_info(self):
return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) )
@@ -658,260 +1031,33 @@ class Provider(DataFeatures.PipelineInputProvider):
def set_et_freq_50(self):
data = {'sys_et_freq': 50}
- json_data = super().post_request('/api/system/conf', data)
+ json_data = self.__post_request('/api/system/conf', data)
def set_et_freq_100(self):
# May not be available. Check get_et_frequencies() first.
data = {'sys_et_freq': 100}
- json_data = super().post_request('/api/system/conf', data)
+ json_data = self.__post_request('/api/system/conf', data)
def set_eye_camera_indoor_preset(self) -> str:
data = {'sys_ec_preset': 'Indoor'}
- return super().post_request('/api/system/conf', data)
+ return self.__post_request('/api/system/conf', data)
def set_eye_camera_outdoor_preset(self) -> str:
data = {'sys_ec_preset': 'ClearWeather'}
- return super().post_request('/api/system/conf', data)
+ return self.__post_request('/api/system/conf', data)
def set_scene_camera_auto_preset(self):
data = {'sys_sc_preset': 'Auto'}
- json_data = super().post_request('/api/system/conf', data)
+ json_data = self.__post_request('/api/system/conf', data)
def set_scene_camera_gaze_preset(self):
data = {'sys_sc_preset': 'GazeBasedExposure'}
- json_data = super().post_request('/api/system/conf', data)
+ json_data = self.__post_request('/api/system/conf', data)
def set_scene_camera_freq_25(self):
data = {'sys_sc_fps': 25}
- json_data = super().post_request('/api/system/conf/', data)
+ json_data = self.__post_request('/api/system/conf/', data)
def set_scene_camera_freq_50(self):
data = {'sys_sc_fps': 50}
- json_data = super().post_request('/api/system/conf/', data)
-
-# Define extra classes to support Tobii data parsing
-@dataclass
-class DirSig():
- """Define dir sig data (dir sig)."""
-
- dir: int # meaning ?
- sig: int # meaning ?
-
-@dataclass
-class PresentationTimeStamp():
- """Define presentation time stamp (pts) data."""
-
- value: int
- """Pts value."""
-
-@dataclass
-class VideoTimeStamp():
- """Define video time stamp (vts) data."""
-
- value: int
- """Vts value."""
-
- offset: int
- """Primary time stamp value."""
-
-@dataclass
-class EventSynch():
- """Define event synch (evts) data."""
-
- value: int # meaning ?
- """Evts value."""
-
-@dataclass
-class Event():
- """Define event data (ets type tag)."""
-
- ets: int # meaning ?
- type: str
- tag: str # dict ?
-
-@dataclass
-class Accelerometer():
- """Define accelerometer data (ac)."""
-
- value: numpy.array
- """Accelerometer value"""
-
-@dataclass
-class Gyroscope():
- """Define gyroscope data (gy)."""
-
- value: numpy.array
- """Gyroscope value"""
-
-@dataclass
-class PupillCenter():
- """Define pupill center data (gidx pc eye)."""
-
- validity: int
- index: int
- value: tuple((float, float, float))
- eye: str # 'right' or 'left'
-
-@dataclass
-class PupillDiameter():
- """Define pupill diameter data (gidx pd eye)."""
-
- validity: int
- index: int
- value: float
- eye: str # 'right' or 'left'
-
-@dataclass
-class GazeDirection():
- """Define gaze direction data (gidx gd eye)."""
-
- validity: int
- index: int
- value: tuple((float, float, float))
- eye: str # 'right' or 'left'
-
-@dataclass
-class GazePosition():
- """Define gaze position data (gidx l gp)."""
-
- validity: int
- index: int
- l: str # ?
- value: tuple((float, float))
-
-@dataclass
-class GazePosition3D():
- """Define gaze position 3D data (gidx gp3)."""
-
- validity: int
- index: int
- value: tuple((float, float))
-
-@dataclass
-class MarkerPosition():
- """Define marker data (marker3d marker2d)."""
-
- value_3d: tuple((float, float, float))
- value_2d: tuple((float, float))
-
-class TobiiJsonDataParser():
-
- def __init__(self):
-
- self.__first_ts = 0
-
- self.__parse_data_map = {
- 'dir': self.__parse_dir_sig,
- 'pts': self.__parse_pts,
- 'vts': self.__parse_vts,
- 'evts': self.__parse_event_synch,
- 'ets': self.__parse_event,
- 'ac': self.__parse_accelerometer,
- 'gy': self.__parse_gyroscope,
- 'gidx': self.__parse_pupill_or_gaze,
- 'marker3d': self.__parse_marker_position
- }
-
- self.__parse_pupill_or_gaze_map = {
- 'pc': self.__parse_pupill_center,
- 'pd': self.__parse_pupill_diameter,
- 'gd': self.__parse_gaze_direction,
- 'l': self.__parse_gaze_position,
- 'gp3': self.__parse_gaze_position_3d
- }
-
- def parse(self, data):
-
- json_data = json.loads(data.decode('utf-8'))
-
- # Parse data status
- status = json_data.pop('s', -1)
-
- # Parse timestamp
- data_ts = json_data.pop('ts')
-
- # Parse data depending first json key
- first_key = next(iter(json_data))
-
- # Convert json data into data object
- data_object = self.__parse_data_map[first_key](status, json_data)
- data_object_type = type(data_object).__name__
-
- # Keep first timestamp to offset all timestamps
- if self.__first_ts == 0:
- self.__first_ts = data_ts
-
- data_ts -= self.__first_ts
-
- return data_ts, data_object, data_object_type
-
- def __parse_pupill_or_gaze(self, status, json_data):
-
- gaze_index = json_data.pop('gidx')
-
- # parse pupill or gaze data depending second json key
- second_key = next(iter(json_data))
-
- return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, json_data)
-
- def __parse_dir_sig(self, status, json_data):
-
- return DirSig(json_data['dir'], json_data['sig'])
-
- def __parse_pts(self, status, json_data):
-
- return PresentationTimeStamp(json_data['pts'])
-
- def __parse_vts(self, status, json_data):
-
- # ts is not sent when recording
- try:
-
- ts = json_data['ts']
-
- except KeyError:
-
- ts = -1
-
- return VideoTimeStamp(json_data['vts'], ts)
-
- def __parse_event_synch(self, status, json_data):
-
- return EventSynch(json_data['evts'])
-
- def __parse_event(self, status, json_data):
-
- return Event(json_data['ets'], json_data['type'], json_data['tag'])
-
- def __parse_accelerometer(self, status, json_data):
-
- return Accelerometer(json_data['ac'])
-
- def __parse_gyroscope(self, status, json_data):
-
- return Gyroscope(json_data['gy'])
-
- def __parse_pupill_center(self, status, gaze_index, json_data):
-
- return PupillCenter(status, gaze_index, json_data['pc'], json_data['eye'])
-
- def __parse_pupill_diameter(self, status, gaze_index, json_data):
-
- return PupillDiameter(status, gaze_index, json_data['pd'], json_data['eye'])
-
- def __parse_gaze_direction(self, status, gaze_index, json_data):
-
- return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye'])
-
- def __parse_gaze_position(self, status, gaze_index, json_data):
-
- return GazePosition(status, gaze_index, json_data['l'], json_data['gp'])
-
- def __parse_gaze_position_3d(self, status, gaze_index, json_data):
-
- return GazePosition3D(status, gaze_index, json_data['gp3'])
-
- def __parse_marker_position(self, status, json_data):
-
- return MarkerPosition(json_data['marker3d'], json_data['marker2d'])
-''' \ No newline at end of file
+ json_data = self.__post_request('/api/system/conf/', data)