diff options
Diffstat (limited to 'src/argaze/utils/contexts/TobiiProGlasses2.py')
-rw-r--r-- | src/argaze/utils/contexts/TobiiProGlasses2.py | 1162 |
1 files changed, 1162 insertions, 0 deletions
diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py new file mode 100644 index 0000000..8b92fef --- /dev/null +++ b/src/argaze/utils/contexts/TobiiProGlasses2.py @@ -0,0 +1,1162 @@ +""" Handle network connection to Tobii Pro Glasses 2 device. + It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py). + +This program is free software: you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation, either version 3 of the License, or (at your option) any later +version. +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +You should have received a copy of the GNU General Public License along with +this program. If not, see <http://www.gnu.org/licenses/>. +""" + +__author__ = "Théo de la Hogue" +__credits__ = [] +__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" +__license__ = "GPLv3" + +import sys +import logging +import socket +import threading +import collections +import json +import time +import datetime +import uuid +from dataclasses import dataclass + +try: + from urllib.parse import urlparse, urlencode + from urllib.request import urlopen, Request + from urllib.error import URLError, HTTPError + +except ImportError: + from urlparse import urlparse + from urllib import urlencode + from urllib2 import urlopen, Request, HTTPError, URLError + +from argaze import ArFeatures, DataFeatures, GazeFeatures +from argaze.utils import UtilsFeatures + +import numpy +import cv2 +import av + +socket.IPPROTO_IPV6 = 41 + +TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f' +TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S' + +DEFAULT_PROJECT_NAME = 'DefaultProject' +DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant' +DEFAULT_RECORD_NAME = 'DefaultRecord' + +# Define default Tobii image_parameters values +DEFAULT_TOBII_IMAGE_PARAMETERS = { + "draw_something": False +} + +# Define extra classes to support Tobii data parsing +@dataclass +class DirSig(): + """Define dir sig data (dir sig).""" + + dir: int # meaning ? + sig: int # meaning ? + +@dataclass +class PresentationTimeStamp(): + """Define presentation time stamp (pts) data.""" + + value: int + """Pts value.""" + +@dataclass +class VideoTimeStamp(): + """Define video time stamp (vts) data.""" + + value: int + """Vts value.""" + + offset: int + """Primary time stamp value.""" + +@dataclass +class EventSynch(): + """Define event synch (evts) data.""" + + value: int # meaning ? + """Evts value.""" + +@dataclass +class Event(): + """Define event data (ets type tag).""" + + ets: int # meaning ? + type: str + tag: str # dict ? + +@dataclass +class Accelerometer(): + """Define accelerometer data (ac).""" + + value: numpy.array + """Accelerometer value""" + +@dataclass +class Gyroscope(): + """Define gyroscope data (gy).""" + + value: numpy.array + """Gyroscope value""" + +@dataclass +class PupillCenter(): + """Define pupill center data (gidx pc eye).""" + + validity: int + index: int + value: tuple((float, float, float)) + eye: str # 'right' or 'left' + +@dataclass +class PupillDiameter(): + """Define pupill diameter data (gidx pd eye).""" + + validity: int + index: int + value: float + eye: str # 'right' or 'left' + +@dataclass +class GazeDirection(): + """Define gaze direction data (gidx gd eye).""" + + validity: int + index: int + value: tuple((float, float, float)) + eye: str # 'right' or 'left' + +@dataclass +class GazePosition(): + """Define gaze position data (gidx l gp).""" + + validity: int + index: int + l: str # ? + value: tuple((float, float)) + +@dataclass +class GazePosition3D(): + """Define gaze position 3D data (gidx gp3).""" + + validity: int + index: int + value: tuple((float, float)) + +@dataclass +class MarkerPosition(): + """Define marker data (marker3d marker2d).""" + + value_3d: tuple((float, float, float)) + value_2d: tuple((float, float)) + +class TobiiJsonDataParser(): + + def __init__(self): + + self.__parse_data_map = { + 'dir': self.__parse_dir_sig, + 'pts': self.__parse_pts, + 'vts': self.__parse_vts, + 'evts': self.__parse_event_synch, + 'ets': self.__parse_event, + 'ac': self.__parse_accelerometer, + 'gy': self.__parse_gyroscope, + 'gidx': self.__parse_pupill_or_gaze, + 'marker3d': self.__parse_marker_position + } + + self.__parse_pupill_or_gaze_map = { + 'pc': self.__parse_pupill_center, + 'pd': self.__parse_pupill_diameter, + 'gd': self.__parse_gaze_direction, + 'l': self.__parse_gaze_position, + 'gp3': self.__parse_gaze_position_3d + } + + def parse(self, data): + + json_data = json.loads(data.decode('utf-8')) + + # Parse data status + status = json_data.pop('s', -1) + + # Parse timestamp + data_ts = json_data.pop('ts') + + # Parse data depending first json key + first_key = next(iter(json_data)) + + # Convert json data into data object + data_object = self.__parse_data_map[first_key](status, json_data) + data_object_type = type(data_object).__name__ + + return data_ts, data_object, data_object_type + + def __parse_pupill_or_gaze(self, status, json_data): + + gaze_index = json_data.pop('gidx') + + # parse pupill or gaze data depending second json key + second_key = next(iter(json_data)) + + return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, json_data) + + def __parse_dir_sig(self, status, json_data): + + return DirSig(json_data['dir'], json_data['sig']) + + def __parse_pts(self, status, json_data): + + return PresentationTimeStamp(json_data['pts']) + + def __parse_vts(self, status, json_data): + + # ts is not sent when recording + try: + + ts = json_data['ts'] + + except KeyError: + + ts = -1 + + return VideoTimeStamp(json_data['vts'], ts) + + def __parse_event_synch(self, status, json_data): + + return EventSynch(json_data['evts']) + + def __parse_event(self, status, json_data): + + return Event(json_data['ets'], json_data['type'], json_data['tag']) + + def __parse_accelerometer(self, status, json_data): + + return Accelerometer(json_data['ac']) + + def __parse_gyroscope(self, status, json_data): + + return Gyroscope(json_data['gy']) + + def __parse_pupill_center(self, status, gaze_index, json_data): + + return PupillCenter(status, gaze_index, json_data['pc'], json_data['eye']) + + def __parse_pupill_diameter(self, status, gaze_index, json_data): + + return PupillDiameter(status, gaze_index, json_data['pd'], json_data['eye']) + + def __parse_gaze_direction(self, status, gaze_index, json_data): + + return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye']) + + def __parse_gaze_position(self, status, gaze_index, json_data): + + return GazePosition(status, gaze_index, json_data['l'], json_data['gp']) + + def __parse_gaze_position_3d(self, status, gaze_index, json_data): + + return GazePosition3D(status, gaze_index, json_data['gp3']) + + def __parse_marker_position(self, status, json_data): + + return MarkerPosition(json_data['marker3d'], json_data['marker2d']) + +class LiveStream(ArFeatures.ArContext): + + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + # Init parent classes + super().__init__() + + # Init private attributes + self.__address = None + self.__udpport = 49152 + + self.__project_name = None + self.__project_id = None + + self.__participant_name = None + self.__participant_id = None + + self.__configuration = {} + + self.__parser = TobiiJsonDataParser() + + self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS} + + @property + def address(self) -> str: + """Network address where to find the device.""" + return self.__address + + @address.setter + def address(self, address:str): + + self.__address = address + + # Remove part after % on under Windows + if "%" in self.__address: + + if sys.platform == "win32": + + self.__address = self.__address.split("%")[0] + + # Define base url + if ':' in self.__address: + + self.__base_url = f'http://[{self.__address}]' + + else: + + self.__base_url = 'http://' + self.__address + + @property + def configuration(self)-> dict: + """Patch system configuration dictionary.""" + return self.__configuration + + @configuration.setter + @DataFeatures.PipelineStepAttributeSetter + def configuration(self, configuration: dict): + + self.__configuration = configuration + + @property + def project(self) -> str: + """Project name.""" + return self.__project_name + + @project.setter + def project(self, project:str): + + self.__project_name = project + + def __bind_project(self): + """Bind to a project or create one if it doesn't exist.""" + + if self.__project_name is None: + + raise Exception(f'Project binding fails: setup project before.') + + self.__project_id = None + + # Check if project exist + projects = self.__get_request('/api/projects') + + for project in projects: + + try: + + if project['pr_info']['Name'] == self.__project_name: + + self.__project_id = project['pr_id'] + + logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id) + + except: + + pass + + # The project doesn't exist, create one + if self.__project_id is None: + + logging.debug('> %s project doesn\'t exist', self.__project_name) + + data = { + 'pr_info' : { + 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD), + 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)), + 'Name': self.__project_name + }, + 'pr_created': self.__get_current_datetime() + } + + json_data = self.__post_request('/api/projects', data) + + self.__project_id = json_data['pr_id'] + + logging.debug('> new %s project created: %s', self.__project_name, self.__project_id) + + @property + def participant(self)-> str: + """Participant name""" + return self.__participant_name + + @participant.setter + def participant(self, participant:str): + + self.__participant_name = participant + + def __bind_participant(self): + """Bind to a participant or create one if it doesn't exist. + + !!! warning + Bind to a project before. + """ + + if self.__participant_name is None: + + raise Exception(f'Participant binding fails: setup participant before.') + + if self.__project_id is None : + + raise Exception(f'Participant binding fails: bind to a project before') + + self.__participant_id = None + + # Check if participant exist + participants = self.__get_request('/api/participants') + + for participant in participants: + + try: + + if participant['pa_info']['Name'] == self.__participant_name: + + self.__participant_id = participant['pa_id'] + + logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id) + + except: + + pass + + # The participant doesn't exist, create one + if self.__participant_id is None: + + logging.debug('> %s participant doesn\'t exist', self.__participant_name) + + data = { + 'pa_project': self.__project_id, + 'pa_info': { + 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)), + 'Name': self.__participant_name, + 'Notes': '' # TODO: set participant notes + }, + 'pa_created': self.__get_current_datetime() + } + + json_data = self.__post_request('/api/participants', data) + + self.__participant_id = json_data['pa_id'] + + logging.debug('> new %s participant created: %s', self.__participant_name, self.__participant_id) + + @DataFeatures.PipelineStepEnter + def __enter__(self): + + logging.info('Tobii Pro Glasses 2 connexion starts...') + + # Update current configuration with configuration patch + logging.debug('> updating configuration') + + # Update current configuration with configuration patch + if self.__configuration: + + logging.debug('> updating configuration') + configuration = self.__post_request('/api/system/conf', self.__configuration) + + # Read current configuration + else: + + logging.debug('> reading configuration') + configuration = self.__get_request('/api/system/conf') + + # Log current configuration + logging.info('Tobii Pro Glasses 2 configuration:') + + for key, value in configuration.items(): + + logging.info('%s: %s', key, str(value)) + + # Store video stream info + self.__video_width = configuration['sys_sc_width'] + self.__video_height = configuration['sys_sc_height'] + self.__video_fps = configuration['sys_sc_fps'] + + # Bind to project if required + if self.__project_name is not None: + + logging.debug('> binding project %s', self.__project_name) + + self.__bind_project() + + logging.info('Tobii Pro Glasses 2 project id: %s', self.__project_id) + + # Bind to participant if required + if self.__participant_name is not None: + + logging.debug('> binding participant %s', self.__participant_name) + + self.__bind_participant() + + logging.info('Tobii Pro Glasses 2 participant id: %s', self.__participant_id) + + # Create stop event + self.__stop_event = threading.Event() + + # Open data stream + self.__data_socket = self.__make_socket() + self.__data_thread = threading.Thread(target = self.__stream_data) + + logging.debug('> starting data thread...') + self.__data_thread.start() + + # Open video stream + self.__video_socket = self.__make_socket() + self.__video_thread = threading.Thread(target = self.__stream_video) + + logging.debug('> starting video thread...') + self.__video_thread.start() + + # Keep connection alive + self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}" + self.__keep_alive_thread = threading.Thread(target = self.__keep_alive) + + logging.debug('> starting keep alive thread...') + self.__keep_alive_thread.start() + + return self + + @DataFeatures.PipelineStepExit + def __exit__(self, exception_type, exception_value, exception_traceback): + + logging.debug('%s.__exit__', type(self).__name__) + + # Close data stream + self.__stop_event.set() + + # Stop keeping connection alive + threading.Thread.join(self.__keep_alive_thread) + + # Stop data streaming + threading.Thread.join(self.__data_thread) + + # Stop video buffer reading + threading.Thread.join(self.__video_buffer_read_thread) + + # Stop video streaming + threading.Thread.join(self.__video_thread) + + def __image(self, draw_something: bool, **kwargs: dict) -> numpy.array: + """Get Tobbi visualisation. + + Parameters: + kwargs: ArContext.image parameters + """ + + # Get context image + image = super().image(**kwargs) + + if draw_something: + + cv2.putText(image, 'SOMETHING', (512, 512), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + return image + + def image(self, **kwargs: dict) -> numpy.array: + """ + Get Tobbi visualisation. + + Parameters: + kwargs: LiveStream.__image parameters + """ + + # Use image_parameters attribute if no kwargs + if kwargs: + + return self.__image(**kwargs) + + return self.__image(**self._image_parameters) + + def __make_socket(self): + """Create a socket to enable network communication.""" + + iptype = socket.AF_INET + + if ':' in self.__address: + + iptype = socket.AF_INET6 + + res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE) + family, socktype, proto, canonname, sockaddr = res[0] + new_socket = socket.socket(family, socktype, proto) + + new_socket.settimeout(5.0) + + try: + + if iptype == socket.AF_INET6: + + new_socket.setsockopt(socket.SOL_SOCKET, 25, 1) + + except socket.error as e: + + if e.errno == 1: + + logging.error('Binding to a network interface is permitted only for root users.') + + return new_socket + + def __stream_data(self): + """Stream data from dedicated socket.""" + + logging.debug('%s.__stream_data', type(self).__name__) + + # First timestamp to offset all timestamps + first_ts = 0 + + while not self.__stop_event.is_set(): + + try: + + data, _ = self.__data_socket.recvfrom(1024) + + except TimeoutError: + + logging.error('> timeout occurred while receiving data') + continue + + if data is not None: + + # Parse json into timestamped data object + data_ts, data_object, data_object_type = self.__parser.parse(data) + + # Store first timestamp + if first_ts == 0: + + first_ts = data_ts + + # Edit millisecond timestamp + timestamp = int((data_ts - first_ts) * 1e-3) + + match data_object_type: + + case 'GazePosition': + + logging.debug('> received %s at %i timestamp', data_object_type, timestamp) + + # When gaze position is valid + if data_object.validity == 0: + + # Process timestamped gaze position + self._process_gaze_position( + timestamp = timestamp, + x = int(data_object.value[0] * self.__video_width), + y = int(data_object.value[1] * self.__video_height) ) + + else: + + # Process empty gaze position + self._process_gaze_position(timestamp = timestamp) + + def __stream_video(self): + """Stream video from dedicated socket.""" + + logging.debug('%s.__stream_video', type(self).__name__) + + # Open video stream + container = av.open(f'rtsp://{self.__address}:8554/live/scene', options={'rtsp_transport': 'tcp'}) + self.__stream = container.streams.video[0] + + # Create a video buffer with a lock + self.__video_buffer = collections.OrderedDict() + self.__video_buffer_lock = threading.Lock() + + # Open video buffer reader + self.__video_buffer_read_thread = threading.Thread(target = self.__video_buffer_read) + + logging.debug('> starting video buffer reader thread...') + self.__video_buffer_read_thread.start() + + # First timestamp to offset all timestamps + first_ts = 0 + + for image in container.decode(self.__stream): + + logging.debug('> new image decoded') + + # Quit if the video acquisition thread have been stopped + if self.__stop_event.is_set(): + + logging.debug('> stop event is set') + break + + if image is not None: + + if image.time is not None: + + # Store first timestamp + if first_ts == 0: + + first_ts = image.time + + # Edit millisecond timestamp + timestamp = int((image.time - first_ts) * 1e3) + + logging.debug('> store image at %i timestamp', timestamp) + + # Lock buffer access + self.__video_buffer_lock.acquire() + + # Decode image and store it at time index + self.__video_buffer[timestamp] = image.to_ndarray(format='bgr24') + + # Unlock buffer access + self.__video_buffer_lock.release() + + def __video_buffer_read(self): + """Read incoming buffered video images.""" + + logging.debug('%s.__video_buffer_read', type(self).__name__) + + while not self.__stop_event.is_set(): + + # Can't read image while it is locked + while self.__video_buffer_lock.locked(): + + # Check 10 times per frame + time.sleep(1 / (10 * self.__video_fps)) + + # Lock buffer access + self.__video_buffer_lock.acquire() + + # Video buffer not empty + if len(self.__video_buffer) > 0: + + logging.debug('> %i images in buffer', len(self.__video_buffer)) + + # Get last stored image + try: + + timestamp, image = self.__video_buffer.popitem(last=True) + + logging.debug('> read image at %i timestamp', timestamp) + + if len(self.__video_buffer) > 0: + + logging.warning('skipping %i image', len(self.__video_buffer)) + + # Clear buffer + self.__video_buffer = collections.OrderedDict() + + # Process camera image + self._process_camera_image( + timestamp = timestamp, + image = image) + + except Exception as e: + + logging.warning('%s.__video_buffer_read: %s', type(self).__name__, e) + + # Unlock buffer access + self.__video_buffer_lock.release() + + def __keep_alive(self): + """Maintain network connection.""" + + logging.debug('%s.__keep_alive', type(self).__name__) + + while not self.__stop_event.is_set(): + + self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) + self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) + + time.sleep(1) + + def __get_request(self, api_action) -> str: + """Send a GET request and get data back.""" + + url = self.__base_url + api_action + + logging.debug('%s.__get_request %s', type(self).__name__, url) + + res = urlopen(url).read() + + try: + + data = json.loads(res.decode('utf-8')) + + except json.JSONDecodeError: + + data = None + + logging.debug('%s.__get_request received %s', type(self).__name__, data) + + return data + + def __post_request(self, api_action, data = None, wait_for_response = True) -> str: + """Send a POST request and get result back.""" + + url = self.__base_url + api_action + + logging.debug('%s.__post_request %s', type(self).__name__, url) + + req = Request(url) + req.add_header('Content-Type', 'application/json') + data = json.dumps(data) + + if wait_for_response is False: + + threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start() + + return None + + response = urlopen(req, data.encode('utf-8')) + res = response.read() + + try: + + res = json.loads(res.decode('utf-8')) + + except: + + pass + + return res + + def __wait_for_status(self, api_action, key, values, timeout = None) -> any: + """Wait until a status matches given values.""" + + url = self.__base_url + api_action + running = True + + while running: + + req = Request(url) + req.add_header('Content-Type', 'application/json') + + try: + + response = urlopen(req, None, timeout = timeout) + + except URLError as e: + + logging.error(e.reason) + return -1 + + data = response.read() + json_data = json.loads(data.decode('utf-8')) + + if json_data[key] in values: + running = False + + time.sleep(1) + + return json_data[key] + + def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT): + + return datetime.datetime.now().replace(microsecond=0).strftime(timeformat) + + # CALIBRATION + + def calibration_start(self, project_name, participant_name): + """Start calibration process for project and participant.""" + + project_id = self.__get_project_id(project_name) + participant_id = self.get_participant_id(participant_name) + + # Init calibration id + self.__calibration_id = None + + # Calibration have to be done for a project and a participant + if project_id is None or participant_id is None: + + raise Exception(f'Setup project and participant before') + + data = { + 'ca_project': project_id, + 'ca_type': 'default', + 'ca_participant': participant_id, + 'ca_created': self.__get_current_datetime() + } + + # Request calibration + json_data = self.__post_request('/api/calibrations', data) + self.__calibration_id = json_data['ca_id'] + + # Start calibration + self.__post_request('/api/calibrations/' + self.__calibration_id + '/start') + + def calibration_status(self) -> str: + """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed.""" + + if self.__calibration_id is not None: + + status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) + + # Forget calibration id + if status != 'calibrating': + + self.__calibration_id = None + + return status + + else: + + raise Exception(f'Start calibration before') + + def calibrate(self, project_name, participant_name): + """Handle whole Tobii glasses calibration process.""" + + # Start calibration + self.calibration_start(project_name, participant_name) + + # While calibrating... + status = self.calibration_status() + + while status == 'calibrating': + + time.sleep(1) + status = self.calibration_status() + + if status == 'uncalibrated' or status == 'stale' or status == 'failed': + + raise Exception(f'Calibration {status}') + + # CALIBRATION + + def calibration_start(self, project_name, participant_name): + """Start calibration process for project and participant.""" + + project_id = self.__get_project_id(project_name) + participant_id = self.get_participant_id(participant_name) + + # Init calibration id + self.__calibration_id = None + + # Calibration have to be done for a project and a participant + if project_id is None or participant_id is None: + + raise Exception(f'Setup project and participant before') + + data = { + 'ca_project': project_id, + 'ca_type': 'default', + 'ca_participant': participant_id, + 'ca_created': self.__get_current_datetime() + } + + # Request calibration + json_data = self.__post_request('/api/calibrations', data) + self.__calibration_id = json_data['ca_id'] + + # Start calibration + self.__post_request('/api/calibrations/' + self.__calibration_id + '/start') + + def calibration_status(self) -> str: + """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed.""" + + if self.__calibration_id is not None: + + status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) + + # Forget calibration id + if status != 'calibrating': + + self.__calibration_id = None + + return status + + else: + + raise Exception(f'Start calibration before') + + def calibrate(self, project_name, participant_name): + """Handle whole Tobii glasses calibration process.""" + + # Start calibration + self.calibration_start(project_name, participant_name) + + # While calibrating... + status = self.calibration_status() + + while status == 'calibrating': + + time.sleep(1) + status = self.calibration_status() + + if status == 'uncalibrated' or status == 'stale' or status == 'failed': + + raise Exception(f'Calibration {status}') + + # RECORDING FEATURES + + def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']): + return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array) + + def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str: + """Create a new recording. + + Returns: + recording id + """ + + participant_id = self.get_participant_id(participant_name) + + if participant_id is None: + raise NameError(f'{participant_name} participant doesn\'t exist') + + data = { + 'rec_participant': participant_id, + 'rec_info': { + 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)), + 'Name': recording_name, + 'Notes': recording_notes + }, + 'rec_created': self.__get_current_datetime() + } + + json_data = self.__post_request('/api/recordings', data) + + return json_data['rec_id'] + + def start_recording(self, recording_id) -> bool: + """Start recording on the Tobii interface's SD Card.""" + + self.__post_request('/api/recordings/' + recording_id + '/start') + return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording' + + def stop_recording(self, recording_id) -> bool: + """Stop recording on the Tobii interface's SD Card.""" + + self.__post_request('/api/recordings/' + recording_id + '/stop') + return self.__wait_for_recording_status(recording_id, ['done']) == "done" + + def pause_recording(self, recording_id) -> bool: + """Pause recording on the Tobii interface's SD Card.""" + + self.__post_request('/api/recordings/' + recording_id + '/pause') + return self.__wait_for_recording_status(recording_id, ['paused']) == "paused" + + def __get_recording_status(self): + return self.get_status()['sys_recording'] + + def get_current_recording_id(self) -> str: + """Get current recording id.""" + + return self.__get_recording_status()['rec_id'] + + @property + def recording(self) -> bool: + """Is it recording?""" + + rec_status = self.__get_recording_status() + + if rec_status != {}: + if rec_status['rec_state'] == "recording": + return True + + return False + + def get_recordings(self) -> str: + """Get all recordings id.""" + + return self.__get_request('/api/recordings') + + # EVENTS AND EXPERIMENTAL VARIABLES + + def __post_recording_data(self, event_type: str, event_tag = ''): + data = {'type': event_type, 'tag': event_tag} + self.__post_request('/api/events', data, wait_for_response=False) + + def send_event(self, event_type: str, event_value = None): + self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value)) + + def send_variable(self, variable_name: str, variable_value = None): + self.__post_recording_data(str(variable_name), str(variable_value)) + + # MISC + + def eject_sd(self): + self.__get_request('/api/eject') + + def get_battery_info(self): + return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) ) + + def get_battery_level(self): + return self.get_battery_status()['level'] + + def get_battery_remaining_time(self): + return self.get_battery_status()['remaining_time'] + + def get_battery_status(self): + return self.get_status()['sys_battery'] + + def get_et_freq(self): + return self.get_configuration()['sys_et_freq'] + + def get_et_frequencies(self): + return self.get_status()['sys_et']['frequencies'] + + def identify(self): + self.__get_request('/api/identify') + + def get_configuration(self): + return self.__get_request('/api/system/conf') + + def get_status(self): + return self.__get_request('/api/system/status') + + def get_storage_info(self): + return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) ) + + def get_storage_remaining_time(self): + return self.get_storage_status()['remaining_time'] + + def get_storage_status(self): + return self.get_status()['sys_storage'] + + def get_scene_camera_freq(self): + return self.get_configuration()['sys_sc_fps'] + + def set_et_freq_50(self): + data = {'sys_et_freq': 50} + json_data = self.__post_request('/api/system/conf', data) + + def set_et_freq_100(self): + # May not be available. Check get_et_frequencies() first. + data = {'sys_et_freq': 100} + json_data = self.__post_request('/api/system/conf', data) + + def set_eye_camera_indoor_preset(self) -> str: + data = {'sys_ec_preset': 'Indoor'} + return self.__post_request('/api/system/conf', data) + + def set_eye_camera_outdoor_preset(self) -> str: + data = {'sys_ec_preset': 'ClearWeather'} + return self.__post_request('/api/system/conf', data) + + def set_scene_camera_auto_preset(self): + data = {'sys_sc_preset': 'Auto'} + json_data = self.__post_request('/api/system/conf', data) + + def set_scene_camera_gaze_preset(self): + data = {'sys_sc_preset': 'GazeBasedExposure'} + json_data = self.__post_request('/api/system/conf', data) + + def set_scene_camera_freq_25(self): + data = {'sys_sc_fps': 25} + json_data = self.__post_request('/api/system/conf/', data) + + def set_scene_camera_freq_50(self): + data = {'sys_sc_fps': 50} + json_data = self.__post_request('/api/system/conf/', data) |