aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/utils/contexts/TobiiProGlasses2.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/argaze/utils/contexts/TobiiProGlasses2.py')
-rw-r--r--src/argaze/utils/contexts/TobiiProGlasses2.py1162
1 files changed, 1162 insertions, 0 deletions
diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py
new file mode 100644
index 0000000..8b92fef
--- /dev/null
+++ b/src/argaze/utils/contexts/TobiiProGlasses2.py
@@ -0,0 +1,1162 @@
+""" Handle network connection to Tobii Pro Glasses 2 device.
+ It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py).
+
+This program is free software: you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation, either version 3 of the License, or (at your option) any later
+version.
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+You should have received a copy of the GNU General Public License along with
+this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+__author__ = "Théo de la Hogue"
+__credits__ = []
+__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
+__license__ = "GPLv3"
+
+import sys
+import logging
+import socket
+import threading
+import collections
+import json
+import time
+import datetime
+import uuid
+from dataclasses import dataclass
+
+try:
+ from urllib.parse import urlparse, urlencode
+ from urllib.request import urlopen, Request
+ from urllib.error import URLError, HTTPError
+
+except ImportError:
+ from urlparse import urlparse
+ from urllib import urlencode
+ from urllib2 import urlopen, Request, HTTPError, URLError
+
+from argaze import ArFeatures, DataFeatures, GazeFeatures
+from argaze.utils import UtilsFeatures
+
+import numpy
+import cv2
+import av
+
+socket.IPPROTO_IPV6 = 41
+
+TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'
+TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S'
+
+DEFAULT_PROJECT_NAME = 'DefaultProject'
+DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant'
+DEFAULT_RECORD_NAME = 'DefaultRecord'
+
+# Define default Tobii image_parameters values
+DEFAULT_TOBII_IMAGE_PARAMETERS = {
+ "draw_something": False
+}
+
+# Define extra classes to support Tobii data parsing
+@dataclass
+class DirSig():
+ """Define dir sig data (dir sig)."""
+
+ dir: int # meaning ?
+ sig: int # meaning ?
+
+@dataclass
+class PresentationTimeStamp():
+ """Define presentation time stamp (pts) data."""
+
+ value: int
+ """Pts value."""
+
+@dataclass
+class VideoTimeStamp():
+ """Define video time stamp (vts) data."""
+
+ value: int
+ """Vts value."""
+
+ offset: int
+ """Primary time stamp value."""
+
+@dataclass
+class EventSynch():
+ """Define event synch (evts) data."""
+
+ value: int # meaning ?
+ """Evts value."""
+
+@dataclass
+class Event():
+ """Define event data (ets type tag)."""
+
+ ets: int # meaning ?
+ type: str
+ tag: str # dict ?
+
+@dataclass
+class Accelerometer():
+ """Define accelerometer data (ac)."""
+
+ value: numpy.array
+ """Accelerometer value"""
+
+@dataclass
+class Gyroscope():
+ """Define gyroscope data (gy)."""
+
+ value: numpy.array
+ """Gyroscope value"""
+
+@dataclass
+class PupillCenter():
+ """Define pupill center data (gidx pc eye)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float, float))
+ eye: str # 'right' or 'left'
+
+@dataclass
+class PupillDiameter():
+ """Define pupill diameter data (gidx pd eye)."""
+
+ validity: int
+ index: int
+ value: float
+ eye: str # 'right' or 'left'
+
+@dataclass
+class GazeDirection():
+ """Define gaze direction data (gidx gd eye)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float, float))
+ eye: str # 'right' or 'left'
+
+@dataclass
+class GazePosition():
+ """Define gaze position data (gidx l gp)."""
+
+ validity: int
+ index: int
+ l: str # ?
+ value: tuple((float, float))
+
+@dataclass
+class GazePosition3D():
+ """Define gaze position 3D data (gidx gp3)."""
+
+ validity: int
+ index: int
+ value: tuple((float, float))
+
+@dataclass
+class MarkerPosition():
+ """Define marker data (marker3d marker2d)."""
+
+ value_3d: tuple((float, float, float))
+ value_2d: tuple((float, float))
+
+class TobiiJsonDataParser():
+
+ def __init__(self):
+
+ self.__parse_data_map = {
+ 'dir': self.__parse_dir_sig,
+ 'pts': self.__parse_pts,
+ 'vts': self.__parse_vts,
+ 'evts': self.__parse_event_synch,
+ 'ets': self.__parse_event,
+ 'ac': self.__parse_accelerometer,
+ 'gy': self.__parse_gyroscope,
+ 'gidx': self.__parse_pupill_or_gaze,
+ 'marker3d': self.__parse_marker_position
+ }
+
+ self.__parse_pupill_or_gaze_map = {
+ 'pc': self.__parse_pupill_center,
+ 'pd': self.__parse_pupill_diameter,
+ 'gd': self.__parse_gaze_direction,
+ 'l': self.__parse_gaze_position,
+ 'gp3': self.__parse_gaze_position_3d
+ }
+
+ def parse(self, data):
+
+ json_data = json.loads(data.decode('utf-8'))
+
+ # Parse data status
+ status = json_data.pop('s', -1)
+
+ # Parse timestamp
+ data_ts = json_data.pop('ts')
+
+ # Parse data depending first json key
+ first_key = next(iter(json_data))
+
+ # Convert json data into data object
+ data_object = self.__parse_data_map[first_key](status, json_data)
+ data_object_type = type(data_object).__name__
+
+ return data_ts, data_object, data_object_type
+
+ def __parse_pupill_or_gaze(self, status, json_data):
+
+ gaze_index = json_data.pop('gidx')
+
+ # parse pupill or gaze data depending second json key
+ second_key = next(iter(json_data))
+
+ return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, json_data)
+
+ def __parse_dir_sig(self, status, json_data):
+
+ return DirSig(json_data['dir'], json_data['sig'])
+
+ def __parse_pts(self, status, json_data):
+
+ return PresentationTimeStamp(json_data['pts'])
+
+ def __parse_vts(self, status, json_data):
+
+ # ts is not sent when recording
+ try:
+
+ ts = json_data['ts']
+
+ except KeyError:
+
+ ts = -1
+
+ return VideoTimeStamp(json_data['vts'], ts)
+
+ def __parse_event_synch(self, status, json_data):
+
+ return EventSynch(json_data['evts'])
+
+ def __parse_event(self, status, json_data):
+
+ return Event(json_data['ets'], json_data['type'], json_data['tag'])
+
+ def __parse_accelerometer(self, status, json_data):
+
+ return Accelerometer(json_data['ac'])
+
+ def __parse_gyroscope(self, status, json_data):
+
+ return Gyroscope(json_data['gy'])
+
+ def __parse_pupill_center(self, status, gaze_index, json_data):
+
+ return PupillCenter(status, gaze_index, json_data['pc'], json_data['eye'])
+
+ def __parse_pupill_diameter(self, status, gaze_index, json_data):
+
+ return PupillDiameter(status, gaze_index, json_data['pd'], json_data['eye'])
+
+ def __parse_gaze_direction(self, status, gaze_index, json_data):
+
+ return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye'])
+
+ def __parse_gaze_position(self, status, gaze_index, json_data):
+
+ return GazePosition(status, gaze_index, json_data['l'], json_data['gp'])
+
+ def __parse_gaze_position_3d(self, status, gaze_index, json_data):
+
+ return GazePosition3D(status, gaze_index, json_data['gp3'])
+
+ def __parse_marker_position(self, status, json_data):
+
+ return MarkerPosition(json_data['marker3d'], json_data['marker2d'])
+
+class LiveStream(ArFeatures.ArContext):
+
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+
+ # Init parent classes
+ super().__init__()
+
+ # Init private attributes
+ self.__address = None
+ self.__udpport = 49152
+
+ self.__project_name = None
+ self.__project_id = None
+
+ self.__participant_name = None
+ self.__participant_id = None
+
+ self.__configuration = {}
+
+ self.__parser = TobiiJsonDataParser()
+
+ self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS}
+
+ @property
+ def address(self) -> str:
+ """Network address where to find the device."""
+ return self.__address
+
+ @address.setter
+ def address(self, address:str):
+
+ self.__address = address
+
+ # Remove part after % on under Windows
+ if "%" in self.__address:
+
+ if sys.platform == "win32":
+
+ self.__address = self.__address.split("%")[0]
+
+ # Define base url
+ if ':' in self.__address:
+
+ self.__base_url = f'http://[{self.__address}]'
+
+ else:
+
+ self.__base_url = 'http://' + self.__address
+
+ @property
+ def configuration(self)-> dict:
+ """Patch system configuration dictionary."""
+ return self.__configuration
+
+ @configuration.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def configuration(self, configuration: dict):
+
+ self.__configuration = configuration
+
+ @property
+ def project(self) -> str:
+ """Project name."""
+ return self.__project_name
+
+ @project.setter
+ def project(self, project:str):
+
+ self.__project_name = project
+
+ def __bind_project(self):
+ """Bind to a project or create one if it doesn't exist."""
+
+ if self.__project_name is None:
+
+ raise Exception(f'Project binding fails: setup project before.')
+
+ self.__project_id = None
+
+ # Check if project exist
+ projects = self.__get_request('/api/projects')
+
+ for project in projects:
+
+ try:
+
+ if project['pr_info']['Name'] == self.__project_name:
+
+ self.__project_id = project['pr_id']
+
+ logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id)
+
+ except:
+
+ pass
+
+ # The project doesn't exist, create one
+ if self.__project_id is None:
+
+ logging.debug('> %s project doesn\'t exist', self.__project_name)
+
+ data = {
+ 'pr_info' : {
+ 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)),
+ 'Name': self.__project_name
+ },
+ 'pr_created': self.__get_current_datetime()
+ }
+
+ json_data = self.__post_request('/api/projects', data)
+
+ self.__project_id = json_data['pr_id']
+
+ logging.debug('> new %s project created: %s', self.__project_name, self.__project_id)
+
+ @property
+ def participant(self)-> str:
+ """Participant name"""
+ return self.__participant_name
+
+ @participant.setter
+ def participant(self, participant:str):
+
+ self.__participant_name = participant
+
+ def __bind_participant(self):
+ """Bind to a participant or create one if it doesn't exist.
+
+ !!! warning
+ Bind to a project before.
+ """
+
+ if self.__participant_name is None:
+
+ raise Exception(f'Participant binding fails: setup participant before.')
+
+ if self.__project_id is None :
+
+ raise Exception(f'Participant binding fails: bind to a project before')
+
+ self.__participant_id = None
+
+ # Check if participant exist
+ participants = self.__get_request('/api/participants')
+
+ for participant in participants:
+
+ try:
+
+ if participant['pa_info']['Name'] == self.__participant_name:
+
+ self.__participant_id = participant['pa_id']
+
+ logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id)
+
+ except:
+
+ pass
+
+ # The participant doesn't exist, create one
+ if self.__participant_id is None:
+
+ logging.debug('> %s participant doesn\'t exist', self.__participant_name)
+
+ data = {
+ 'pa_project': self.__project_id,
+ 'pa_info': {
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)),
+ 'Name': self.__participant_name,
+ 'Notes': '' # TODO: set participant notes
+ },
+ 'pa_created': self.__get_current_datetime()
+ }
+
+ json_data = self.__post_request('/api/participants', data)
+
+ self.__participant_id = json_data['pa_id']
+
+ logging.debug('> new %s participant created: %s', self.__participant_name, self.__participant_id)
+
+ @DataFeatures.PipelineStepEnter
+ def __enter__(self):
+
+ logging.info('Tobii Pro Glasses 2 connexion starts...')
+
+ # Update current configuration with configuration patch
+ logging.debug('> updating configuration')
+
+ # Update current configuration with configuration patch
+ if self.__configuration:
+
+ logging.debug('> updating configuration')
+ configuration = self.__post_request('/api/system/conf', self.__configuration)
+
+ # Read current configuration
+ else:
+
+ logging.debug('> reading configuration')
+ configuration = self.__get_request('/api/system/conf')
+
+ # Log current configuration
+ logging.info('Tobii Pro Glasses 2 configuration:')
+
+ for key, value in configuration.items():
+
+ logging.info('%s: %s', key, str(value))
+
+ # Store video stream info
+ self.__video_width = configuration['sys_sc_width']
+ self.__video_height = configuration['sys_sc_height']
+ self.__video_fps = configuration['sys_sc_fps']
+
+ # Bind to project if required
+ if self.__project_name is not None:
+
+ logging.debug('> binding project %s', self.__project_name)
+
+ self.__bind_project()
+
+ logging.info('Tobii Pro Glasses 2 project id: %s', self.__project_id)
+
+ # Bind to participant if required
+ if self.__participant_name is not None:
+
+ logging.debug('> binding participant %s', self.__participant_name)
+
+ self.__bind_participant()
+
+ logging.info('Tobii Pro Glasses 2 participant id: %s', self.__participant_id)
+
+ # Create stop event
+ self.__stop_event = threading.Event()
+
+ # Open data stream
+ self.__data_socket = self.__make_socket()
+ self.__data_thread = threading.Thread(target = self.__stream_data)
+
+ logging.debug('> starting data thread...')
+ self.__data_thread.start()
+
+ # Open video stream
+ self.__video_socket = self.__make_socket()
+ self.__video_thread = threading.Thread(target = self.__stream_video)
+
+ logging.debug('> starting video thread...')
+ self.__video_thread.start()
+
+ # Keep connection alive
+ self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
+ self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
+
+ logging.debug('> starting keep alive thread...')
+ self.__keep_alive_thread.start()
+
+ return self
+
+ @DataFeatures.PipelineStepExit
+ def __exit__(self, exception_type, exception_value, exception_traceback):
+
+ logging.debug('%s.__exit__', type(self).__name__)
+
+ # Close data stream
+ self.__stop_event.set()
+
+ # Stop keeping connection alive
+ threading.Thread.join(self.__keep_alive_thread)
+
+ # Stop data streaming
+ threading.Thread.join(self.__data_thread)
+
+ # Stop video buffer reading
+ threading.Thread.join(self.__video_buffer_read_thread)
+
+ # Stop video streaming
+ threading.Thread.join(self.__video_thread)
+
+ def __image(self, draw_something: bool, **kwargs: dict) -> numpy.array:
+ """Get Tobbi visualisation.
+
+ Parameters:
+ kwargs: ArContext.image parameters
+ """
+
+ # Get context image
+ image = super().image(**kwargs)
+
+ if draw_something:
+
+ cv2.putText(image, 'SOMETHING', (512, 512), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+
+ return image
+
+ def image(self, **kwargs: dict) -> numpy.array:
+ """
+ Get Tobbi visualisation.
+
+ Parameters:
+ kwargs: LiveStream.__image parameters
+ """
+
+ # Use image_parameters attribute if no kwargs
+ if kwargs:
+
+ return self.__image(**kwargs)
+
+ return self.__image(**self._image_parameters)
+
+ def __make_socket(self):
+ """Create a socket to enable network communication."""
+
+ iptype = socket.AF_INET
+
+ if ':' in self.__address:
+
+ iptype = socket.AF_INET6
+
+ res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)
+ family, socktype, proto, canonname, sockaddr = res[0]
+ new_socket = socket.socket(family, socktype, proto)
+
+ new_socket.settimeout(5.0)
+
+ try:
+
+ if iptype == socket.AF_INET6:
+
+ new_socket.setsockopt(socket.SOL_SOCKET, 25, 1)
+
+ except socket.error as e:
+
+ if e.errno == 1:
+
+ logging.error('Binding to a network interface is permitted only for root users.')
+
+ return new_socket
+
+ def __stream_data(self):
+ """Stream data from dedicated socket."""
+
+ logging.debug('%s.__stream_data', type(self).__name__)
+
+ # First timestamp to offset all timestamps
+ first_ts = 0
+
+ while not self.__stop_event.is_set():
+
+ try:
+
+ data, _ = self.__data_socket.recvfrom(1024)
+
+ except TimeoutError:
+
+ logging.error('> timeout occurred while receiving data')
+ continue
+
+ if data is not None:
+
+ # Parse json into timestamped data object
+ data_ts, data_object, data_object_type = self.__parser.parse(data)
+
+ # Store first timestamp
+ if first_ts == 0:
+
+ first_ts = data_ts
+
+ # Edit millisecond timestamp
+ timestamp = int((data_ts - first_ts) * 1e-3)
+
+ match data_object_type:
+
+ case 'GazePosition':
+
+ logging.debug('> received %s at %i timestamp', data_object_type, timestamp)
+
+ # When gaze position is valid
+ if data_object.validity == 0:
+
+ # Process timestamped gaze position
+ self._process_gaze_position(
+ timestamp = timestamp,
+ x = int(data_object.value[0] * self.__video_width),
+ y = int(data_object.value[1] * self.__video_height) )
+
+ else:
+
+ # Process empty gaze position
+ self._process_gaze_position(timestamp = timestamp)
+
+ def __stream_video(self):
+ """Stream video from dedicated socket."""
+
+ logging.debug('%s.__stream_video', type(self).__name__)
+
+ # Open video stream
+ container = av.open(f'rtsp://{self.__address}:8554/live/scene', options={'rtsp_transport': 'tcp'})
+ self.__stream = container.streams.video[0]
+
+ # Create a video buffer with a lock
+ self.__video_buffer = collections.OrderedDict()
+ self.__video_buffer_lock = threading.Lock()
+
+ # Open video buffer reader
+ self.__video_buffer_read_thread = threading.Thread(target = self.__video_buffer_read)
+
+ logging.debug('> starting video buffer reader thread...')
+ self.__video_buffer_read_thread.start()
+
+ # First timestamp to offset all timestamps
+ first_ts = 0
+
+ for image in container.decode(self.__stream):
+
+ logging.debug('> new image decoded')
+
+ # Quit if the video acquisition thread have been stopped
+ if self.__stop_event.is_set():
+
+ logging.debug('> stop event is set')
+ break
+
+ if image is not None:
+
+ if image.time is not None:
+
+ # Store first timestamp
+ if first_ts == 0:
+
+ first_ts = image.time
+
+ # Edit millisecond timestamp
+ timestamp = int((image.time - first_ts) * 1e3)
+
+ logging.debug('> store image at %i timestamp', timestamp)
+
+ # Lock buffer access
+ self.__video_buffer_lock.acquire()
+
+ # Decode image and store it at time index
+ self.__video_buffer[timestamp] = image.to_ndarray(format='bgr24')
+
+ # Unlock buffer access
+ self.__video_buffer_lock.release()
+
+ def __video_buffer_read(self):
+ """Read incoming buffered video images."""
+
+ logging.debug('%s.__video_buffer_read', type(self).__name__)
+
+ while not self.__stop_event.is_set():
+
+ # Can't read image while it is locked
+ while self.__video_buffer_lock.locked():
+
+ # Check 10 times per frame
+ time.sleep(1 / (10 * self.__video_fps))
+
+ # Lock buffer access
+ self.__video_buffer_lock.acquire()
+
+ # Video buffer not empty
+ if len(self.__video_buffer) > 0:
+
+ logging.debug('> %i images in buffer', len(self.__video_buffer))
+
+ # Get last stored image
+ try:
+
+ timestamp, image = self.__video_buffer.popitem(last=True)
+
+ logging.debug('> read image at %i timestamp', timestamp)
+
+ if len(self.__video_buffer) > 0:
+
+ logging.warning('skipping %i image', len(self.__video_buffer))
+
+ # Clear buffer
+ self.__video_buffer = collections.OrderedDict()
+
+ # Process camera image
+ self._process_camera_image(
+ timestamp = timestamp,
+ image = image)
+
+ except Exception as e:
+
+ logging.warning('%s.__video_buffer_read: %s', type(self).__name__, e)
+
+ # Unlock buffer access
+ self.__video_buffer_lock.release()
+
+ def __keep_alive(self):
+ """Maintain network connection."""
+
+ logging.debug('%s.__keep_alive', type(self).__name__)
+
+ while not self.__stop_event.is_set():
+
+ self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
+ self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
+
+ time.sleep(1)
+
+ def __get_request(self, api_action) -> str:
+ """Send a GET request and get data back."""
+
+ url = self.__base_url + api_action
+
+ logging.debug('%s.__get_request %s', type(self).__name__, url)
+
+ res = urlopen(url).read()
+
+ try:
+
+ data = json.loads(res.decode('utf-8'))
+
+ except json.JSONDecodeError:
+
+ data = None
+
+ logging.debug('%s.__get_request received %s', type(self).__name__, data)
+
+ return data
+
+ def __post_request(self, api_action, data = None, wait_for_response = True) -> str:
+ """Send a POST request and get result back."""
+
+ url = self.__base_url + api_action
+
+ logging.debug('%s.__post_request %s', type(self).__name__, url)
+
+ req = Request(url)
+ req.add_header('Content-Type', 'application/json')
+ data = json.dumps(data)
+
+ if wait_for_response is False:
+
+ threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
+
+ return None
+
+ response = urlopen(req, data.encode('utf-8'))
+ res = response.read()
+
+ try:
+
+ res = json.loads(res.decode('utf-8'))
+
+ except:
+
+ pass
+
+ return res
+
+ def __wait_for_status(self, api_action, key, values, timeout = None) -> any:
+ """Wait until a status matches given values."""
+
+ url = self.__base_url + api_action
+ running = True
+
+ while running:
+
+ req = Request(url)
+ req.add_header('Content-Type', 'application/json')
+
+ try:
+
+ response = urlopen(req, None, timeout = timeout)
+
+ except URLError as e:
+
+ logging.error(e.reason)
+ return -1
+
+ data = response.read()
+ json_data = json.loads(data.decode('utf-8'))
+
+ if json_data[key] in values:
+ running = False
+
+ time.sleep(1)
+
+ return json_data[key]
+
+ def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT):
+
+ return datetime.datetime.now().replace(microsecond=0).strftime(timeformat)
+
+ # CALIBRATION
+
+ def calibration_start(self, project_name, participant_name):
+ """Start calibration process for project and participant."""
+
+ project_id = self.__get_project_id(project_name)
+ participant_id = self.get_participant_id(participant_name)
+
+ # Init calibration id
+ self.__calibration_id = None
+
+ # Calibration have to be done for a project and a participant
+ if project_id is None or participant_id is None:
+
+ raise Exception(f'Setup project and participant before')
+
+ data = {
+ 'ca_project': project_id,
+ 'ca_type': 'default',
+ 'ca_participant': participant_id,
+ 'ca_created': self.__get_current_datetime()
+ }
+
+ # Request calibration
+ json_data = self.__post_request('/api/calibrations', data)
+ self.__calibration_id = json_data['ca_id']
+
+ # Start calibration
+ self.__post_request('/api/calibrations/' + self.__calibration_id + '/start')
+
+ def calibration_status(self) -> str:
+ """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed."""
+
+ if self.__calibration_id is not None:
+
+ status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
+
+ # Forget calibration id
+ if status != 'calibrating':
+
+ self.__calibration_id = None
+
+ return status
+
+ else:
+
+ raise Exception(f'Start calibration before')
+
+ def calibrate(self, project_name, participant_name):
+ """Handle whole Tobii glasses calibration process."""
+
+ # Start calibration
+ self.calibration_start(project_name, participant_name)
+
+ # While calibrating...
+ status = self.calibration_status()
+
+ while status == 'calibrating':
+
+ time.sleep(1)
+ status = self.calibration_status()
+
+ if status == 'uncalibrated' or status == 'stale' or status == 'failed':
+
+ raise Exception(f'Calibration {status}')
+
+ # CALIBRATION
+
+ def calibration_start(self, project_name, participant_name):
+ """Start calibration process for project and participant."""
+
+ project_id = self.__get_project_id(project_name)
+ participant_id = self.get_participant_id(participant_name)
+
+ # Init calibration id
+ self.__calibration_id = None
+
+ # Calibration have to be done for a project and a participant
+ if project_id is None or participant_id is None:
+
+ raise Exception(f'Setup project and participant before')
+
+ data = {
+ 'ca_project': project_id,
+ 'ca_type': 'default',
+ 'ca_participant': participant_id,
+ 'ca_created': self.__get_current_datetime()
+ }
+
+ # Request calibration
+ json_data = self.__post_request('/api/calibrations', data)
+ self.__calibration_id = json_data['ca_id']
+
+ # Start calibration
+ self.__post_request('/api/calibrations/' + self.__calibration_id + '/start')
+
+ def calibration_status(self) -> str:
+ """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed."""
+
+ if self.__calibration_id is not None:
+
+ status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
+
+ # Forget calibration id
+ if status != 'calibrating':
+
+ self.__calibration_id = None
+
+ return status
+
+ else:
+
+ raise Exception(f'Start calibration before')
+
+ def calibrate(self, project_name, participant_name):
+ """Handle whole Tobii glasses calibration process."""
+
+ # Start calibration
+ self.calibration_start(project_name, participant_name)
+
+ # While calibrating...
+ status = self.calibration_status()
+
+ while status == 'calibrating':
+
+ time.sleep(1)
+ status = self.calibration_status()
+
+ if status == 'uncalibrated' or status == 'stale' or status == 'failed':
+
+ raise Exception(f'Calibration {status}')
+
+ # RECORDING FEATURES
+
+ def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
+ return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
+
+ def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str:
+ """Create a new recording.
+
+ Returns:
+ recording id
+ """
+
+ participant_id = self.get_participant_id(participant_name)
+
+ if participant_id is None:
+ raise NameError(f'{participant_name} participant doesn\'t exist')
+
+ data = {
+ 'rec_participant': participant_id,
+ 'rec_info': {
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)),
+ 'Name': recording_name,
+ 'Notes': recording_notes
+ },
+ 'rec_created': self.__get_current_datetime()
+ }
+
+ json_data = self.__post_request('/api/recordings', data)
+
+ return json_data['rec_id']
+
+ def start_recording(self, recording_id) -> bool:
+ """Start recording on the Tobii interface's SD Card."""
+
+ self.__post_request('/api/recordings/' + recording_id + '/start')
+ return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
+
+ def stop_recording(self, recording_id) -> bool:
+ """Stop recording on the Tobii interface's SD Card."""
+
+ self.__post_request('/api/recordings/' + recording_id + '/stop')
+ return self.__wait_for_recording_status(recording_id, ['done']) == "done"
+
+ def pause_recording(self, recording_id) -> bool:
+ """Pause recording on the Tobii interface's SD Card."""
+
+ self.__post_request('/api/recordings/' + recording_id + '/pause')
+ return self.__wait_for_recording_status(recording_id, ['paused']) == "paused"
+
+ def __get_recording_status(self):
+ return self.get_status()['sys_recording']
+
+ def get_current_recording_id(self) -> str:
+ """Get current recording id."""
+
+ return self.__get_recording_status()['rec_id']
+
+ @property
+ def recording(self) -> bool:
+ """Is it recording?"""
+
+ rec_status = self.__get_recording_status()
+
+ if rec_status != {}:
+ if rec_status['rec_state'] == "recording":
+ return True
+
+ return False
+
+ def get_recordings(self) -> str:
+ """Get all recordings id."""
+
+ return self.__get_request('/api/recordings')
+
+ # EVENTS AND EXPERIMENTAL VARIABLES
+
+ def __post_recording_data(self, event_type: str, event_tag = ''):
+ data = {'type': event_type, 'tag': event_tag}
+ self.__post_request('/api/events', data, wait_for_response=False)
+
+ def send_event(self, event_type: str, event_value = None):
+ self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
+
+ def send_variable(self, variable_name: str, variable_value = None):
+ self.__post_recording_data(str(variable_name), str(variable_value))
+
+ # MISC
+
+ def eject_sd(self):
+ self.__get_request('/api/eject')
+
+ def get_battery_info(self):
+ return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
+
+ def get_battery_level(self):
+ return self.get_battery_status()['level']
+
+ def get_battery_remaining_time(self):
+ return self.get_battery_status()['remaining_time']
+
+ def get_battery_status(self):
+ return self.get_status()['sys_battery']
+
+ def get_et_freq(self):
+ return self.get_configuration()['sys_et_freq']
+
+ def get_et_frequencies(self):
+ return self.get_status()['sys_et']['frequencies']
+
+ def identify(self):
+ self.__get_request('/api/identify')
+
+ def get_configuration(self):
+ return self.__get_request('/api/system/conf')
+
+ def get_status(self):
+ return self.__get_request('/api/system/status')
+
+ def get_storage_info(self):
+ return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) )
+
+ def get_storage_remaining_time(self):
+ return self.get_storage_status()['remaining_time']
+
+ def get_storage_status(self):
+ return self.get_status()['sys_storage']
+
+ def get_scene_camera_freq(self):
+ return self.get_configuration()['sys_sc_fps']
+
+ def set_et_freq_50(self):
+ data = {'sys_et_freq': 50}
+ json_data = self.__post_request('/api/system/conf', data)
+
+ def set_et_freq_100(self):
+ # May not be available. Check get_et_frequencies() first.
+ data = {'sys_et_freq': 100}
+ json_data = self.__post_request('/api/system/conf', data)
+
+ def set_eye_camera_indoor_preset(self) -> str:
+ data = {'sys_ec_preset': 'Indoor'}
+ return self.__post_request('/api/system/conf', data)
+
+ def set_eye_camera_outdoor_preset(self) -> str:
+ data = {'sys_ec_preset': 'ClearWeather'}
+ return self.__post_request('/api/system/conf', data)
+
+ def set_scene_camera_auto_preset(self):
+ data = {'sys_sc_preset': 'Auto'}
+ json_data = self.__post_request('/api/system/conf', data)
+
+ def set_scene_camera_gaze_preset(self):
+ data = {'sys_sc_preset': 'GazeBasedExposure'}
+ json_data = self.__post_request('/api/system/conf', data)
+
+ def set_scene_camera_freq_25(self):
+ data = {'sys_sc_fps': 25}
+ json_data = self.__post_request('/api/system/conf/', data)
+
+ def set_scene_camera_freq_50(self):
+ data = {'sys_sc_fps': 50}
+ json_data = self.__post_request('/api/system/conf/', data)