From 18f52239ac7dfd7c879e72ca29b7765143ccb225 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 21 Mar 2024 08:49:26 +0100 Subject: Adding Providers folder to get a first TobiiProGlasses2 device. Adding util script to load configuration for worn device streaming. --- src/argaze/utils/Providers/TobiiProGlasses2.py | 917 +++++++++++++++++++++ src/argaze/utils/Providers/__init__.py | 4 + src/argaze/utils/__init__.py | 4 +- .../utils/aruco_camera_configuration_edit.py | 735 +++++++++++++++++ .../utils/demo_data/demo_aruco_markers_setup.json | 1 + src/argaze/utils/demo_data/provider_setup.json | 7 + src/argaze/utils/worn_device_stream.py | 110 +++ 7 files changed, 1776 insertions(+), 2 deletions(-) create mode 100644 src/argaze/utils/Providers/TobiiProGlasses2.py create mode 100644 src/argaze/utils/Providers/__init__.py create mode 100644 src/argaze/utils/aruco_camera_configuration_edit.py create mode 100644 src/argaze/utils/demo_data/provider_setup.json create mode 100644 src/argaze/utils/worn_device_stream.py (limited to 'src') diff --git a/src/argaze/utils/Providers/TobiiProGlasses2.py b/src/argaze/utils/Providers/TobiiProGlasses2.py new file mode 100644 index 0000000..c8cd81b --- /dev/null +++ b/src/argaze/utils/Providers/TobiiProGlasses2.py @@ -0,0 +1,917 @@ +""" Handle network connection to Tobii Pro Glasses 2 device. + It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py). + +This program is free software: you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation, either version 3 of the License, or (at your option) any later +version. +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +You should have received a copy of the GNU General Public License along with +this program. If not, see . +""" + +__author__ = "Théo de la Hogue" +__credits__ = [] +__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" +__license__ = "GPLv3" + +import sys +import socket +import threading +import json +import time +import datetime +import uuid +from dataclasses import dataclass + +try: + from urllib.parse import urlparse, urlencode + from urllib.request import urlopen, Request + from urllib.error import URLError, HTTPError + +except ImportError: + from urlparse import urlparse + from urllib import urlencode + from urllib2 import urlopen, Request, HTTPError, URLError + +from argaze import DataFeatures, GazeFeatures +from argaze.utils import UtilsFeatures + +import numpy + +socket.IPPROTO_IPV6 = 41 + +TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f' +TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S' + +DEFAULT_PROJECT_NAME = 'DefaultProject' +DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant' +DEFAULT_RECORD_NAME = 'DefaultRecord' + +class Provider(DataFeatures.PipelineInputProvider): + + def __init__(self, address: str = None, project: str = None, participant: str = None, **kwargs): + + # DEBUG + print('TobiiProGlasses2.__init__') + + # Init parent classes + DataFeatures.PipelineInputProvider.__init__(self, **kwargs) + ''' + self.__project_name = project + self.__participant_name = participant + + self.__udpport = 49152 + self.__address = address + + # Remove part after % on under Windows + if "%" in self.__address: + + if sys.platform == "win32": + + self.__address = self.__address.split("%")[0] + + # Define base url + if ':' in self.__address: + + self.__base_url = f'http://[{self.__address}]' + + else: + + self.__base_url = 'http://' + self.__address + + # Create Tobii data parser + self.__parser = TobiiJsonDataParser() + ''' + def __enter__(self): + + # DEBUG + print('TobiiProGlasses2.__enter__') + ''' + # Bind to project or create one if required + if self.__project_name is not None: + + self.set_project(self.__project_name) + + # Bind to participant or create one if required + if self.__participant_name is not None: + + self.set_participant(self.__project_name, self.__participant_name) + + # TODO: Configure Tobii device as required + + # Setup camera at 25 fps to work on Full HD video stream + self.set_scene_camera_freq_25() + + # Setup eye tracking at 50 fps + self.set_et_freq_50() + + # Update video stream dimension + self.__video_width = self.get_configuration()['sys_sc_width'] + self.__video_height = self.get_configuration()['sys_sc_height'] + + # Open data stream + self.__data_socket = self.__make_socket() + self.__data_thread = threading.Thread(target = self.__run) + self.__data_thread.daemon = True + self.__data_thread.start() + + # Keep connection alive + self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}" + self.__keep_alive_thread = threading.Thread(target = self.__keep_alive) + self.__keep_alive_thread.daemon = True + self.__keep_alive_thread.start() + + # Create stop event + self.__stop_event = threading.Event() + ''' + + return self + + def __exit__(self): + + # DEBUG + print('TobiiProGlasses2.__exit__') + ''' + # Close data stream + self.__stop_event.set() + + threading.Thread.join(self.__data_thread) + self.__data_thread = None + ''' +''' + def __make_socket(self): + """Create a socket to enable network communication.""" + + iptype = socket.AF_INET + + if ':' in self.__address: + + iptype = socket.AF_INET6 + + res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE) + family, socktype, proto, canonname, sockaddr = res[0] + new_socket = socket.socket(family, socktype, proto) + + new_socket.settimeout(5.0) + + try: + + if iptype == socket.AF_INET6: + + new_socket.setsockopt(socket.SOL_SOCKET, 25, 1) + + except socket.error as e: + + if e.errno == 1: + + print("Binding to a network interface is permitted only for root users.") + + return new_socket + + def __grab_data(self): + """Grab data from dedicated socket.""" + + while not self.__stop_event.is_set(): + + try: + + data, _ = self.__data_socket.recvfrom(1024) + + except TimeoutError: + + print("A timeout occurred while receiving data") + + if data is not None: + + # Parse json into timestamped data object + data_ts, data_object, data_object_type = self.__parser.parse(json_data) + + # Edit millisecond timestamp + timestamp = int(data_ts * 1e-3) + + match data_object_type: + + case 'GazePosition': + + # When gaze position is valid + if data_object.validity == 0: + + # Edit timestamped gaze position + timestamped_gaze_position = GazeFeatures.GazePosition((int(data_object.value[0] * self.__video_width), int(data_object.value[1] * self.__video_height)), timestamp=timestamp) + + # Send timestamp and gaze position coordinates + + # DEBUG + print('TobiiProGlasses2.__grab_data', timestamped_gaze_position) + #self.gaze_position_callback(timestamped_gaze_position) + + else: + + # Edit empty gaze position + empty_gaze_position = GazeFeatures.GazePosition(timestamp=timestamp) + + # DEBUG + print('TobiiProGlasses2.__grab_data', empty_gaze_position) + #self.gaze_position_callback(empty_gaze_position) + + def __keep_alive(self): + """Maintain network connection.""" + + while True: + + self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) + # TODO: send message to video socket + time.sleep(1) + + def __post_request(self, api_action, data=None, wait_for_response=True) -> str: + """Send a POST request and get result back.""" + + url = self.__base_url + api_action + req = Request(url) + req.add_header('Content-Type', 'application/json') + data = json.dumps(data) + + if wait_for_response is False: + threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start() + return None + + response = urlopen(req, data.encode('utf-8')) + res = response.read() + + try: + res = json.loads(res.decode('utf-8')) + except: + pass + + return res + + def __wait_for_status(self, api_action, key, values, timeout = None) -> any: + """Wait until a status matches given values.""" + + url = self.__base_url + api_action + running = True + + while running: + + req = Request(url) + req.add_header('Content-Type', 'application/json') + + try: + + response = urlopen(req, None, timeout = timeout) + + except URLError as e: + + logging.error(e.reason) + return -1 + + data = response.read() + json_data = json.loads(data.decode('utf-8')) + + if json_data[key] in values: + running = False + + time.sleep(1) + + return json_data[key] + + def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT): + + return datetime.datetime.now().replace(microsecond=0).strftime(timeformat) + + # PROJECT FEATURES + + def set_project(self, project_name = DEFAULT_PROJECT_NAME) -> str: + """Bind to a project or create one if it doesn't exist. + + Returns: + project id + """ + + project_id = self.get_project_id(project_name) + + if project_id is None: + + data = { + 'pr_info' : { + 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD), + 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, project_name)), + 'Name': project_name + }, + 'pr_created': self.__get_current_datetime() + } + + json_data = self.__post_request('/api/projects', data) + + return json_data['pr_id'] + + else: + + return project_id + + def get_project_id(self, project_name) -> str: + """Get project id.""" + + project_id = None + projects = self.__get_request('/api/projects') + + for project in projects: + + try: + if project['pr_info']['Name'] == project_name: + project_id = project['pr_id'] + except: + pass + + return project_id + + def get_projects(self) -> str: + """Get all projects id.""" + + return self.__get_request('/api/projects') + + # PARTICIPANT FEATURES + + def set_participant(self, project_name, participant_name, participant_notes = '') -> str: + """Bind to a participant into a project or create one if it doesn't exist. + + Returns: + participant id + """ + project_id = self.get_project_id(project_name) + participant_id = self.get_participant_id(participant_name) + + # Participant creation is done into a project + if project_id is None : + + raise Exception(f'Participant creation fails: setup project before') + + if participant_id is None: + + data = { + 'pa_project': project_id, + 'pa_info': { + 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)), + 'Name': participant_name, + 'Notes': participant_notes + }, + 'pa_created': self.__get_current_datetime() + } + + json_data = self.__post_request('/api/participants', data) + + return json_data['pa_id'] + + else: + + return participant_id + + def get_participant_id(self, participant_name) -> str: + """Get participant id.""" + + participant_id = None + participants = self.__get_request('/api/participants') + + for participant in participants: + + try: + if participant['pa_info']['Name'] == participant_name: + participant_id = participant['pa_id'] + + except: + pass + + return participant_id + + def get_participants(self) -> str: + """Get all participants id.""" + + return self.__get_request('/api/participants') + + # CALIBRATION + + def calibration_start(self, project_name, participant_name): + """Start calibration process for project and participant.""" + + project_id = self.get_project_id(project_name) + participant_id = self.get_participant_id(participant_name) + + # Init calibration id + self.__calibration_id = None + + # Calibration have to be done for a project and a participant + if project_id is None or participant_id is None: + + raise Exception(f'Setup project and participant before') + + data = { + 'ca_project': project_id, + 'ca_type': 'default', + 'ca_participant': participant_id, + 'ca_created': self.__get_current_datetime() + } + + # Request calibration + json_data = self.__post_request('/api/calibrations', data) + self.__calibration_id = json_data['ca_id'] + + # Start calibration + self.__post_request('/api/calibrations/' + self.__calibration_id + '/start') + + def calibration_status(self) -> str: + """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed.""" + + if self.__calibration_id is not None: + + status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) + + # Forget calibration id + if status != 'calibrating': + + self.__calibration_id = None + + return status + + else: + + raise Exception(f'Start calibration before') + + def calibrate(self, project_name, participant_name): + """Handle whole Tobii glasses calibration process.""" + + # Start calibration + self.calibration_start(project_name, participant_name) + + # While calibrating... + status = self.calibration_status() + + while status == 'calibrating': + + time.sleep(1) + status = self.calibration_status() + + if status == 'uncalibrated' or status == 'stale' or status == 'failed': + + raise Exception(f'Calibration {status}') + + # CALIBRATION + + def calibration_start(self, project_name, participant_name): + """Start calibration process for project and participant.""" + + project_id = self.get_project_id(project_name) + participant_id = self.get_participant_id(participant_name) + + # Init calibration id + self.__calibration_id = None + + # Calibration have to be done for a project and a participant + if project_id is None or participant_id is None: + + raise Exception(f'Setup project and participant before') + + data = { + 'ca_project': project_id, + 'ca_type': 'default', + 'ca_participant': participant_id, + 'ca_created': self.__get_current_datetime() + } + + # Request calibration + json_data = super().post_request('/api/calibrations', data) + self.__calibration_id = json_data['ca_id'] + + # Start calibration + super().post_request('/api/calibrations/' + self.__calibration_id + '/start') + + def calibration_status(self) -> str: + """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed.""" + + if self.__calibration_id is not None: + + status = super().wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) + + # Forget calibration id + if status != 'calibrating': + + self.__calibration_id = None + + return status + + else: + + raise Exception(f'Start calibration before') + + def calibrate(self, project_name, participant_name): + """Handle whole Tobii glasses calibration process.""" + + # Start calibration + self.calibration_start(project_name, participant_name) + + # While calibrating... + status = self.calibration_status() + + while status == 'calibrating': + + time.sleep(1) + status = self.calibration_status() + + if status == 'uncalibrated' or status == 'stale' or status == 'failed': + + raise Exception(f'Calibration {status}') + + # RECORDING FEATURES + + def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']): + return super().wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array) + + def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str: + """Create a new recording. + + Returns: + recording id + """ + + participant_id = self.get_participant_id(participant_name) + + if participant_id is None: + raise NameError(f'{participant_name} participant doesn\'t exist') + + data = { + 'rec_participant': participant_id, + 'rec_info': { + 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)), + 'Name': recording_name, + 'Notes': recording_notes + }, + 'rec_created': self.__get_current_datetime() + } + + json_data = super().post_request('/api/recordings', data) + + return json_data['rec_id'] + + def start_recording(self, recording_id) -> bool: + """Start recording on the Tobii interface's SD Card.""" + + super().post_request('/api/recordings/' + recording_id + '/start') + return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording' + + def stop_recording(self, recording_id) -> bool: + """Stop recording on the Tobii interface's SD Card.""" + + super().post_request('/api/recordings/' + recording_id + '/stop') + return self.__wait_for_recording_status(recording_id, ['done']) == "done" + + def pause_recording(self, recording_id) -> bool: + """Pause recording on the Tobii interface's SD Card.""" + + super().post_request('/api/recordings/' + recording_id + '/pause') + return self.__wait_for_recording_status(recording_id, ['paused']) == "paused" + + def __get_recording_status(self): + return self.get_status()['sys_recording'] + + def get_current_recording_id(self) -> str: + """Get current recording id.""" + + return self.__get_recording_status()['rec_id'] + + @property + def recording(self) -> bool: + """Is it recording?""" + + rec_status = self.__get_recording_status() + + if rec_status != {}: + if rec_status['rec_state'] == "recording": + return True + + return False + + def get_recordings(self) -> str: + """Get all recordings id.""" + + return super().get_request('/api/recordings') + + # EVENTS AND EXPERIMENTAL VARIABLES + + def __post_recording_data(self, event_type: str, event_tag = ''): + data = {'type': event_type, 'tag': event_tag} + super().post_request('/api/events', data, wait_for_response=False) + + def send_event(self, event_type: str, event_value = None): + self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value)) + + def send_variable(self, variable_name: str, variable_value = None): + self.__post_recording_data(str(variable_name), str(variable_value)) + + # MISC + + def eject_sd(self): + super().get_request('/api/eject') + + def get_battery_info(self): + return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) ) + + def get_battery_level(self): + return self.get_battery_status()['level'] + + def get_battery_remaining_time(self): + return self.get_battery_status()['remaining_time'] + + def get_battery_status(self): + return self.get_status()['sys_battery'] + + def get_et_freq(self): + return self.get_configuration()['sys_et_freq'] + + def get_et_frequencies(self): + return self.get_status()['sys_et']['frequencies'] + + def identify(self): + super().get_request('/api/identify') + + def get_address(self): + return self.address + + def get_configuration(self): + return super().get_request('/api/system/conf') + + def get_status(self): + return super().get_request('/api/system/status') + + def get_storage_info(self): + return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) ) + + def get_storage_remaining_time(self): + return self.get_storage_status()['remaining_time'] + + def get_storage_status(self): + return self.get_status()['sys_storage'] + + def get_scene_camera_freq(self): + return self.get_configuration()['sys_sc_fps'] + + def set_et_freq_50(self): + data = {'sys_et_freq': 50} + json_data = super().post_request('/api/system/conf', data) + + def set_et_freq_100(self): + # May not be available. Check get_et_frequencies() first. + data = {'sys_et_freq': 100} + json_data = super().post_request('/api/system/conf', data) + + def set_eye_camera_indoor_preset(self) -> str: + data = {'sys_ec_preset': 'Indoor'} + return super().post_request('/api/system/conf', data) + + def set_eye_camera_outdoor_preset(self) -> str: + data = {'sys_ec_preset': 'ClearWeather'} + return super().post_request('/api/system/conf', data) + + def set_scene_camera_auto_preset(self): + data = {'sys_sc_preset': 'Auto'} + json_data = super().post_request('/api/system/conf', data) + + def set_scene_camera_gaze_preset(self): + data = {'sys_sc_preset': 'GazeBasedExposure'} + json_data = super().post_request('/api/system/conf', data) + + def set_scene_camera_freq_25(self): + data = {'sys_sc_fps': 25} + json_data = super().post_request('/api/system/conf/', data) + + def set_scene_camera_freq_50(self): + data = {'sys_sc_fps': 50} + json_data = super().post_request('/api/system/conf/', data) + +# Define extra classes to support Tobii data parsing +@dataclass +class DirSig(): + """Define dir sig data (dir sig).""" + + dir: int # meaning ? + sig: int # meaning ? + +@dataclass +class PresentationTimeStamp(): + """Define presentation time stamp (pts) data.""" + + value: int + """Pts value.""" + +@dataclass +class VideoTimeStamp(): + """Define video time stamp (vts) data.""" + + value: int + """Vts value.""" + + offset: int + """Primary time stamp value.""" + +@dataclass +class EventSynch(): + """Define event synch (evts) data.""" + + value: int # meaning ? + """Evts value.""" + +@dataclass +class Event(): + """Define event data (ets type tag).""" + + ets: int # meaning ? + type: str + tag: str # dict ? + +@dataclass +class Accelerometer(): + """Define accelerometer data (ac).""" + + value: numpy.array + """Accelerometer value""" + +@dataclass +class Gyroscope(): + """Define gyroscope data (gy).""" + + value: numpy.array + """Gyroscope value""" + +@dataclass +class PupillCenter(): + """Define pupill center data (gidx pc eye).""" + + validity: int + index: int + value: tuple((float, float, float)) + eye: str # 'right' or 'left' + +@dataclass +class PupillDiameter(): + """Define pupill diameter data (gidx pd eye).""" + + validity: int + index: int + value: float + eye: str # 'right' or 'left' + +@dataclass +class GazeDirection(): + """Define gaze direction data (gidx gd eye).""" + + validity: int + index: int + value: tuple((float, float, float)) + eye: str # 'right' or 'left' + +@dataclass +class GazePosition(): + """Define gaze position data (gidx l gp).""" + + validity: int + index: int + l: str # ? + value: tuple((float, float)) + +@dataclass +class GazePosition3D(): + """Define gaze position 3D data (gidx gp3).""" + + validity: int + index: int + value: tuple((float, float)) + +@dataclass +class MarkerPosition(): + """Define marker data (marker3d marker2d).""" + + value_3d: tuple((float, float, float)) + value_2d: tuple((float, float)) + +class TobiiJsonDataParser(): + + def __init__(self): + + self.__first_ts = 0 + + self.__parse_data_map = { + 'dir': self.__parse_dir_sig, + 'pts': self.__parse_pts, + 'vts': self.__parse_vts, + 'evts': self.__parse_event_synch, + 'ets': self.__parse_event, + 'ac': self.__parse_accelerometer, + 'gy': self.__parse_gyroscope, + 'gidx': self.__parse_pupill_or_gaze, + 'marker3d': self.__parse_marker_position + } + + self.__parse_pupill_or_gaze_map = { + 'pc': self.__parse_pupill_center, + 'pd': self.__parse_pupill_diameter, + 'gd': self.__parse_gaze_direction, + 'l': self.__parse_gaze_position, + 'gp3': self.__parse_gaze_position_3d + } + + def parse(self, data): + + json_data = json.loads(data.decode('utf-8')) + + # Parse data status + status = json_data.pop('s', -1) + + # Parse timestamp + data_ts = json_data.pop('ts') + + # Parse data depending first json key + first_key = next(iter(json_data)) + + # Convert json data into data object + data_object = self.__parse_data_map[first_key](status, json_data) + data_object_type = type(data_object).__name__ + + # Keep first timestamp to offset all timestamps + if self.__first_ts == 0: + self.__first_ts = data_ts + + data_ts -= self.__first_ts + + return data_ts, data_object, data_object_type + + def __parse_pupill_or_gaze(self, status, json_data): + + gaze_index = json_data.pop('gidx') + + # parse pupill or gaze data depending second json key + second_key = next(iter(json_data)) + + return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, json_data) + + def __parse_dir_sig(self, status, json_data): + + return DirSig(json_data['dir'], json_data['sig']) + + def __parse_pts(self, status, json_data): + + return PresentationTimeStamp(json_data['pts']) + + def __parse_vts(self, status, json_data): + + # ts is not sent when recording + try: + + ts = json_data['ts'] + + except KeyError: + + ts = -1 + + return VideoTimeStamp(json_data['vts'], ts) + + def __parse_event_synch(self, status, json_data): + + return EventSynch(json_data['evts']) + + def __parse_event(self, status, json_data): + + return Event(json_data['ets'], json_data['type'], json_data['tag']) + + def __parse_accelerometer(self, status, json_data): + + return Accelerometer(json_data['ac']) + + def __parse_gyroscope(self, status, json_data): + + return Gyroscope(json_data['gy']) + + def __parse_pupill_center(self, status, gaze_index, json_data): + + return PupillCenter(status, gaze_index, json_data['pc'], json_data['eye']) + + def __parse_pupill_diameter(self, status, gaze_index, json_data): + + return PupillDiameter(status, gaze_index, json_data['pd'], json_data['eye']) + + def __parse_gaze_direction(self, status, gaze_index, json_data): + + return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye']) + + def __parse_gaze_position(self, status, gaze_index, json_data): + + return GazePosition(status, gaze_index, json_data['l'], json_data['gp']) + + def __parse_gaze_position_3d(self, status, gaze_index, json_data): + + return GazePosition3D(status, gaze_index, json_data['gp3']) + + def __parse_marker_position(self, status, json_data): + + return MarkerPosition(json_data['marker3d'], json_data['marker2d']) +''' \ No newline at end of file diff --git a/src/argaze/utils/Providers/__init__.py b/src/argaze/utils/Providers/__init__.py new file mode 100644 index 0000000..f80a694 --- /dev/null +++ b/src/argaze/utils/Providers/__init__.py @@ -0,0 +1,4 @@ +""" +Collection of device interfaces. +""" +__all__ = ['tobii_pro_glasses_2'] \ No newline at end of file diff --git a/src/argaze/utils/__init__.py b/src/argaze/utils/__init__.py index 0303fbc..4b7b4db 100644 --- a/src/argaze/utils/__init__.py +++ b/src/argaze/utils/__init__.py @@ -1,4 +1,4 @@ """ -Collection of command-line high level features scripts. +Miscelleaneous utilities. """ -__all__ = ['UtilsFeatures'] \ No newline at end of file +__all__ = ['UtilsFeatures', 'providers'] \ No newline at end of file diff --git a/src/argaze/utils/aruco_camera_configuration_edit.py b/src/argaze/utils/aruco_camera_configuration_edit.py new file mode 100644 index 0000000..686f25e --- /dev/null +++ b/src/argaze/utils/aruco_camera_configuration_edit.py @@ -0,0 +1,735 @@ +#!/usr/bin/env python + +""" + +This program is free software: you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation, either version 3 of the License, or (at your option) any later +version. +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +You should have received a copy of the GNU General Public License along with +this program. If not, see . +""" + +__author__ = "Théo de la Hogue" +__credits__ = [] +__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" +__license__ = "GPLv3" + +import argparse +import time +import itertools + +from argaze import ArFeatures, GazeFeatures +from argaze.AreaOfInterest import AOIFeatures +from argaze.ArUcoMarkers import ArUcoCamera +from argaze.utils import UtilsFeatures + +from tobiiproglasses2 import * + +import cv2 +import numpy + +def main(): + """ + Load ArUco camera configuration from .json file, detect ArUco markers into movie images and estimate scene pose. + Edit configuration to improve pose estimation. + """ + + # Manage arguments + parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) + parser.add_argument('movie', metavar='MOVIE', type=str, default=None, help='movie path') + parser.add_argument('configuration', metavar='CONFIGURATION', type=str, help='argaze configuration filepath') + + parser.add_argument('-s','--start', metavar='START', type=float, default=0., help='start time in second') + parser.add_argument('-o', '--output', metavar='OUT', type=str, default='edited_configuration.json', help='edited configuration file path') + parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') + + args = parser.parse_args() + + # Load configuration + aruco_camera = ArUcoCamera.ArUcoCamera.from_json(args.configuration) + + if args.verbose: + + print(aruco_camera) + + # Select first AR scene + ar_scene = list(aruco_camera.scenes.values())[0] + + # Create a window to display AR environment + cv2.namedWindow(aruco_camera.name, cv2.WINDOW_AUTOSIZE) + + # Init mouse interaction + pointer = (0, 0) + left_click = (0, 0) + right_click = (0, 0) + right_drag = (0, 0) + right_button = False + edit_trans = False # translate + edit_z = False + snap = False + draw_help = False + draw_grid = False + draw_cover = False + pose_mode = 0 + z_grid = 100. + + # Update pointer position + def on_mouse_event(event, x, y, flags, param): + + nonlocal pointer + nonlocal left_click + nonlocal right_click + nonlocal right_drag + nonlocal right_button + + # Update pointer + pointer = (x, y) + + # Update left_click + if event == cv2.EVENT_LBUTTONUP: + + left_click = pointer + + # Udpate right_button + elif event == cv2.EVENT_RBUTTONDOWN and not right_button: + + right_button = True + right_click = pointer + + elif event == cv2.EVENT_RBUTTONUP and right_button: + + right_button = False + + # Udpate right_drag + if right_button: + + right_drag = (pointer[0] - right_click[0], pointer[1] - right_click[1]) + + # Attach mouse callback to window + cv2.setMouseCallback(aruco_camera.name, on_mouse_event) + + # Enable movie video capture + video_capture = cv2.VideoCapture(args.movie) + + video_fps = video_capture.get(cv2.CAP_PROP_FPS) + video_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)) + video_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + # Enable exit signal handler + exit = UtilsFeatures.ExitSignalHandler() + + # Init image selection + current_image_index = -1 + _, current_image = video_capture.read() + next_image_index = int(args.start * video_fps) + refresh = False + + # Init marker selection + scene_markers = {} + selected_marker_id = -1 + hovered_marker_id = -1 + + # Init place edition + place_edit = {} + + while not exit.status(): + + # Edit fake gaze position from pointer + gaze_position = GazeFeatures.GazePosition(pointer, precision=2) + + # Reset info image + info_image = numpy.full((850, 1500, 3), 0, dtype=numpy.uint8) + + # Select a new image and detect markers once + if next_image_index != current_image_index or refresh or draw_cover: + + video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index) + + success, video_image = video_capture.read() + + if success: + + # Refresh once + refresh = False + + current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1 + current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC) + + # Keep central square + cv2.rectangle(video_image, (0, 0), (int((video_width-video_height)/2), int(video_height)), (0, 0, 0), -1) + cv2.rectangle(video_image, (int(video_width-(video_width-video_height)/2), 0), (int(video_width), int(video_height)), (0, 0, 0), -1) + + # Hide zone + if draw_cover: + + # Draw black circle under pointer + cv2.circle(video_image, pointer, 50, (0, 0, 0), -1) + + # Process video image + try: + + aruco_camera.watch(current_image_time, video_image) + exception = None + + except Exception as e: + + exception = e + + # Update video image + video_image = aruco_camera.image() + + # Write exception + if exception is not None: + + cv2.rectangle(video_image, (0, video_height-50), (video_width, video_height), (0, 0, 127), -1) + cv2.putText(video_image, f'{exception}', (20, video_height-10), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + # Draw gray panel on top + cv2.rectangle(video_image, (0, 0), (video_width, 50), (63, 63, 63), -1) + + # Draw camera calibration + if draw_grid: + + cv2.putText(video_image, f'Grid at {z_grid} cm', (500, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + aruco_camera.aruco_detector.optic_parameters.draw(video_image, video_width/10, video_height/10, z_grid, color=(127, 127, 127)) + + # Write timing + cv2.putText(video_image, f'Time: {int(current_image_time)} ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + # Copy image + current_image = video_image.copy() + + # Keep last image + else: + + video_image = current_image.copy() + ''' + # Handle scene marker selection on left click + if len(scene_markers) > 0: + + # Update selected and hovered markers id + selected_marker_id = -1 + hovered_marker_id = -1 + for (marker_id, marker) in scene_markers.items(): + + marker_aoi = marker.corners.reshape(4, 2).view(AOIFeatures.AreaOfInterest) + + # Select by left clicking on marker + if marker_aoi.contains_point(left_click): + + selected_marker_id = marker_id + + # Hover by pointing on marker + if marker_aoi.contains_point(pointer): + + hovered_marker_id = marker_id + + # Edit marker's color + color_list = list(itertools.permutations([0, 255, 255])) + + for i, m in scene_markers.items(): + + m.color = color_list[i%len(color_list)] + + if i == selected_marker_id or i == hovered_marker_id: + continue + + if hovered_marker_id > 0: + m.color = (0, 0, 0) + else: + m.color = (127, 127, 127) + + # Draw center + cv2.circle(video_image, m.center.astype(int), 5, m.color, -1) + + try: + + # A marker is selected + if selected_marker_id >= 0: + + try: + + # Retreive selected marker + selected_marker = scene_markers[selected_marker_id] + + # Write selected marker id + cv2.rectangle(info_image, (0, 0), (500, 50), selected_marker.color, -1) + cv2.putText(info_image, f'Selected marker #{selected_marker.identifier}', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.rectangle(info_image, (0, 50), (500, video_height), (255, 255, 255), -1) + + # Write selected marker rotation matrix + R = ArUcoScene.make_euler_rotation_vector(selected_marker.rotation) + cv2.putText(info_image, f'Rotation (camera axis)', (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[0]:.3f}', (40, 160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[1]:.3f}', (40, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[2]:.3f}', (40, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Write selected marker translation vector + T = selected_marker.translation + cv2.putText(info_image, f'Translation (camera axis):', (20, 320), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[0]:.3f}', (40, 360), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[1]:.3f}', (40, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[2]:.3f}', (40, 440), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Retreive selected marker place + selected_place = ar_scene.aruco_scene.places[selected_marker_id] + + # On right click + if right_button: + + pointer_delta_x, pointer_delta_y = right_drag[0] / video_width, right_drag[1] / video_height + + place_edit[selected_marker_id] = {'rotation': (0, 0, 0), 'translation': (0, 0, 0)} + + if edit_trans: + + # Edit place rotation + if edit_z: + place_edit[selected_marker_id]['rotation'] = (0, 0, -pointer_delta_y) + else: + place_edit[selected_marker_id]['rotation'] = (pointer_delta_y, pointer_delta_x, 0) + + else: + + # Edit place translation + if edit_z: + place_edit[selected_marker_id]['translation'] = (0, 0, pointer_delta_y) + else: + place_edit[selected_marker_id]['translation'] = (-pointer_delta_x, pointer_delta_y, 0) + + # Edit transformations + R = ArUcoScene.make_rotation_matrix(*place_edit[selected_marker_id]['rotation']).T + T = numpy.array(place_edit[selected_marker_id]['translation']) + + # Apply transformations + edited_place = ArUcoScene.Place(selected_place.translation + T, selected_place.rotation.dot(R), selected_marker) + + else: + + edited_place = selected_place + + # A marker is hovered while another is selected + if hovered_marker_id >= 0 and hovered_marker_id != selected_marker_id: + + # Retreive hovered marker + hovered_marker = scene_markers[hovered_marker_id] + + # Write hovered marker id + cv2.rectangle(info_image, (500, 0), (1000, 50), hovered_marker.color, -1) + cv2.putText(info_image, f'Hovered marker #{hovered_marker.identifier}', (520, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.rectangle(info_image, (500, 50), (1000, video_height), (255, 255, 255), -1) + + # Write hovered marker rotation matrix + R = ArUcoScene.make_euler_rotation_vector(hovered_marker.rotation) + cv2.putText(info_image, f'Rotation (camera axis)', (520, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[0]:.3f}', (540, 160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[1]:.3f}', (540, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[2]:.3f}', (540, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Write hovered marker translation vector + T = hovered_marker.translation + cv2.putText(info_image, f'Translation (camera axis):', (520, 320), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[0]:.3f}', (540, 360), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[1]:.3f}', (540, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[2]:.3f}', (540, 440), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Retreive hovered marker place + hovered_place = ar_scene.aruco_scene.places[hovered_marker_id] + + # Write hovered place rotation matrix + R = ArUcoScene.make_euler_rotation_vector(hovered_place.rotation) + cv2.putText(info_image, f'Rotation (scene axis):', (520, 500), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[0]:.3f}', (540, 540), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[1]:.3f}', (540, 580), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[2]:.3f}', (540, 620), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Write hovered place translation vector + T = hovered_place.translation + cv2.putText(info_image, f'Translation (scene axis):', (520, 700), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[0]:.3f}', (540, 740), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[1]:.3f}', (540, 780), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[2]:.3f}', (540, 820), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Rotation between markers and places + markers_rotation_matrix = hovered_marker.rotation.dot(selected_marker.rotation.T) + places_rotation_matrix = hovered_place.rotation.dot(selected_place.rotation.T) + + markers_rotation_vector = ArUcoScene.make_euler_rotation_vector(markers_rotation_matrix) + places_rotation_vector = ArUcoScene.make_euler_rotation_vector(places_rotation_matrix) + + # Translation info between markers and places + markers_translation = hovered_marker.translation - selected_marker.translation + places_translation = hovered_place.translation - selected_place.translation + + # Write selected/hovered markers id + cv2.rectangle(info_image, (1000, 0), (1500, 50), (63, 63, 63), -1) + cv2.putText(info_image, f'#{selected_marker.identifier} -> #{hovered_marker.identifier}', (1020, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.rectangle(info_image, (1000, 50), (1500, video_height), (190, 190, 190), -1) + + # Write selected/hovered markers rotation matrix + R = markers_rotation_vector + cv2.putText(info_image, f'Rotation (camera axis)', (1020, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[0]:.3f}', (1040, 160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[1]:.3f}', (1040, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[2]:.3f}', (1040, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Write selected/hovered markers translation vector + T = markers_translation + cv2.putText(info_image, f'Translation (camera axis):', (1020, 320), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[0]:.3f}', (1040, 360), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[1]:.3f}', (1040, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[2]:.3f}', (1040, 440), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Write selected/hovered places rotation matrix + R = places_rotation_vector + cv2.putText(info_image, f'Rotation (scene axis):', (1020, 500), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[0]:.3f}', (1040, 540), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[1]:.3f}', (1040, 580), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[2]:.3f}', (1040, 620), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Write selected/hovered places translation vector + T = places_translation + cv2.putText(info_image, f'Translation (scene axis):', (1020, 700), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[0]:.3f}', (1040, 740), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[1]:.3f}', (1040, 780), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[2]:.3f}', (1040, 820), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + if snap: + + # Snap once + snap = False + + print(f'******* SNAP {selected_marker_id} / {hovered_marker_id} *******') + + # Edit rotation transformation + #R = places_rotation_matrix.dot(rmat.T).dot(markers_rotation_matrix.T).dot(rmat) + + #rmat_places_rotation_vector = ArUcoScene.make_euler_rotation_vector(places_rotation_matrix.dot(rmat.T)) + rdiff = places_rotation_vector - markers_rotation_vector + R = ArUcoScene.make_rotation_matrix(*rdiff) + + print(f'markers_rotation_vector: {markers_rotation_vector}') + print(f'places_rotation_vector: {places_rotation_vector}') + print(f'rdiff: {rdiff}') + print(f'R: {ArUcoScene.make_euler_rotation_vector(R)}') + + # Edit translation transformation + T = (places_translation.dot(rmat.T) - markers_translation).dot(rmat) + + print(f'markers_translation: {markers_translation} ({numpy.linalg.norm(markers_translation)})') + print(f'places_translation: {places_translation} ({numpy.linalg.norm(places_translation)})') + print(f'T: {T} ({numpy.linalg.norm(T)})') + + # Apply transformations + edited_place = ArUcoScene.Place(selected_place.translation + T, selected_place.rotation, selected_marker) + + # Write edited place rotation matrix + R = ArUcoScene.make_euler_rotation_vector(edited_place.rotation) + cv2.putText(info_image, f'Rotation (scene axis):', (20, 500), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[0]:.3f}', (40, 540), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[1]:.3f}', (40, 580), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{R[2]:.3f}', (40, 620), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Write edited place translation vector + T = edited_place.translation + cv2.putText(info_image, f'Translation (scene axis):', (20, 700), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[0]:.3f}', (40, 740), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[1]:.3f}', (40, 780), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA) + cv2.putText(info_image, f'{T[2]:.3f}', (40, 820), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv2.LINE_AA) + + # Replace selected place by edited place + ar_scene.aruco_scene.places[selected_marker_id] = edited_place + + # Refresh places consistency + ar_scene.aruco_scene.init_places_consistency() + + # Estimate scene pose from each marker + cv2.putText(video_image, f'Single marker scene pose estimation', (20, video_height - 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + for i, m in scene_markers.items(): + + tvec, rmat = ar_scene.aruco_scene.estimate_pose_from_single_marker(m) + + # Project AOI scene into image according estimated pose + aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV) + + if i == selected_marker_id: + + # Draw AOI scene projection with gaze + aoi_scene_projection.draw_circlecast(video_image, gaze_position, 1, base_color=m.color, matching_color=(255, 255, 255)) + + else: + + # Draw AOI scene + aoi_scene_projection.draw(video_image, color=m.color) + + # Draw expected marker places + ar_scene.draw_places(video_image) + + # Catch missing selected marker + except KeyError: + + cv2.putText(video_image, f'Marker {selected_marker_id} not found', (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + + # No marker selected + else: + + cv2.putText(info_image, f'Left click on marker to select it', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + + if len(scene_markers) > 1: + + # Check markers consistency + consistent_markers, unconsistent_markers, unconsistencies = ar_scene.aruco_scene.check_markers_consistency(scene_markers, ar_scene.angle_tolerance, ar_scene.distance_tolerance) + + # No marker hovered + if hovered_marker_id < 0: + + # Set unconsistent marker color to red + for i, m in scene_markers.items(): + if i in list(unconsistent_markers.keys()) and i != hovered_marker_id: + m.color = (0, 0, 255) + + # Write unconsistencies + line = 0 + for i, (label, value) in enumerate(unconsistencies['rotation'].items()): + + current_rotation = value['current'] + expected_rotation = value['expected'] + + cv2.putText(info_image, f'Unconsistent rotation {label}: [{current_rotation[0]:.3f} {current_rotation[1]:.3f} {current_rotation[2]:.3f}]', (20, 120+line*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + line += 1 + + cv2.putText(info_image, f'Expected rotation {label}: [{expected_rotation[0]:.3f} {expected_rotation[1]:.3f} {expected_rotation[2]:.3f}]', (20, 120+line*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + line += 1 + + for i, (label, value) in enumerate(unconsistencies['translation'].items()): + + current_translation = value['current'] + expected_translation = value['expected'] + + cv2.putText(info_image, f'Unconsistent translation {label}: {current_translation:.3f}', (20, 120+ line*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA) + line += 1 + + cv2.putText(info_image, f'Expected translation {label}: {expected_translation:.3f}', (20, 120+ line*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + line += 1 + + # Force pose mode to single marker scene pose estimation + else: + + pose_mode = 0 + + # Single marker scene pose estimation + if pose_mode == 0: + + cv2.putText(video_image, f'Single marker scene pose estimation', (20, video_height - 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + for i, m in scene_markers.items(): + + tvec, rmat = ar_scene.aruco_scene.estimate_pose_from_single_marker(m) + + # Project AOI scene into image according estimated pose + aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV) + + # Draw AOI scene + aoi_scene_projection.draw(video_image, color=m.color) + + # Consistent markers scene pose estimation + if pose_mode == 1: + + cv2.putText(video_image, f'Consistent markers scene pose estimation', (20, video_height - 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + tvec, rmat = ar_scene.aruco_scene.estimate_pose_from_markers(consistent_markers) + + # Project AOI scene into image according estimated pose + aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV) + + # Draw AOI scene + aoi_scene_projection.draw(video_image, color=(255, 255, 255)) + + # ArUco marker axis scene pose estimation + elif pose_mode == 2: + + # Write pose estimation strategy + cv2.putText(video_image, f'ArUco marker axis scene pose estimation', (20, video_height - 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + for axis_name, axis_markers in ar_scene.aruco_axis.items(): + + try: + + origin_marker = scene_markers[axis_markers['origin_marker']] + horizontal_axis_marker = scene_markers[axis_markers['horizontal_axis_marker']] + vertical_axis_marker = scene_markers[axis_markers['vertical_axis_marker']] + + tvec, rmat = ar_scene.aruco_scene.estimate_pose_from_axis_markers(origin_marker, horizontal_axis_marker, vertical_axis_marker) + + # Project AOI scene into image according estimated pose + aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV) + + # Draw AOI scene + aoi_scene_projection.draw(video_image, color=(255, 255, 255)) + + break + + except: + pass + + # ArUco AOI scene building + elif pose_mode == 3: + + # Write pose estimation strategy + cv2.putText(video_image, f'ArUco AOI scene building', (20, video_height - 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + + try : + + # Try to build AOI scene from detected ArUco marker corners + aoi_scene_projection = ar_scene.build_aruco_aoi_scene(scene_markers) + + # Draw AOI scene + aoi_scene_projection.draw(video_image, color=(255, 255, 255)) + + except: + pass + + # Draw expected marker places + #ar_scene.draw_places(video_image) + + # Catch exceptions raised by estimate_pose and project methods + except (ArFeatures.PoseEstimationFailed) as e: + + cv2.rectangle(video_image, (0, 90), (700, 130), (127, 127, 127), -1) + cv2.putText(video_image, f'Error: {e}', (20, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + + # Draw image + cv2.imshow(aruco_camera.name, video_image) + ''' + + # Draw pointer + gaze_position.draw(video_image) + + # Write documentation + cv2.putText(video_image, f'Press \'h\' for help', (950, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + ''' + if draw_help: + + cv2.rectangle(video_image, (0, 50), (700, 300), (127, 127, 127), -1) + cv2.putText(video_image, f'> Left click on marker: select marker', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'> Left click on image: unselect marker', (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'> T: translate, R: rotate, Z: select axis', (20, 160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'> Right click and drag: edit axis', (20, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'> Ctrl + S: save environment', (20, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'> Backspace: reload environment', (20, 280), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + + # Write selected marker id + elif selected_marker_id >= 0: + + cv2.rectangle(video_image, (0, 50), (700, 90), (127, 127, 127), -1) + + # Select color + if edit_z: + str_axis = 'Z' + color_axis = (255, 0, 0) + else: + str_axis = 'XY' + color_axis = (0, 255, 255) + + if edit_trans: + cv2.putText(video_image, f'Rotate marker {selected_marker_id} around axis {str_axis}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, color_axis, 1, cv2.LINE_AA) + else: + cv2.putText(video_image, f'Translate marker {selected_marker_id} along axis {str_axis}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, color_axis, 1, cv2.LINE_AA) + ''' + key_pressed = cv2.waitKey(10) + + #if key_pressed != -1: + # print(key_pressed) + + # Select previous image with left arrow + if key_pressed == 2: + next_image_index -= 1 + + # Select next image with right arrow + if key_pressed == 3: + next_image_index += 1 + + # Clip image index + if next_image_index < 0: + next_image_index = 0 + + # Edit rotation with r key + if key_pressed == 114: + edit_trans = True + + # Edit translation with t key + if key_pressed == 116: + edit_trans = False + + # Switch Z axis edition + if key_pressed == 122: + edit_z = not edit_z + + # Snap hovered marker with s key + if key_pressed == 115: + snap = True + + # Switch help mode with h key + if key_pressed == 104: + draw_help = not draw_help + + # Switch grid mode with g key + if key_pressed == 103: + draw_grid = not draw_grid + refresh = True + + # Raise z grid with down arrow + if key_pressed == 0: + z_grid += 10. + refresh = True + + # Unraise z grid with up arrow + if key_pressed == 1: + z_grid -= 10. + refresh = True + + # Switch draw_cover mode with c key + if key_pressed == 99: + draw_cover = not draw_cover + + # Switch pose estimation mode with m key + if key_pressed == 109: + pose_mode += 1 + if pose_mode > 3: + pose_mode = 0 + + # Save selected marker edition using 'Ctrl + s' + if key_pressed == 19: + aruco_camera.to_json(args.output) + print(f'Environment saved into {args.output}') + + # Close window using 'Esc' key + if key_pressed == 27: + break + + # Reload configuration on 'Backspace' key + if key_pressed == 127: + aruco_camera = ArUcoCamera.ArUcoCamera.from_json(args.configuration) + print(f'Configuration reloaded from {args.configuration}') + refresh = True + + # Display video + cv2.imshow(aruco_camera.name, video_image) + + # Display info + cv2.imshow('Info', info_image) + + # Close movie capture + video_capture.release() + + # Stop image display + cv2.destroyAllWindows() + +if __name__ == '__main__': + + main() \ No newline at end of file diff --git a/src/argaze/utils/demo_data/demo_aruco_markers_setup.json b/src/argaze/utils/demo_data/demo_aruco_markers_setup.json index 37a30d4..0a306d1 100644 --- a/src/argaze/utils/demo_data/demo_aruco_markers_setup.json +++ b/src/argaze/utils/demo_data/demo_aruco_markers_setup.json @@ -1,6 +1,7 @@ { "name": "demo_camera", "size": [1280, 720], + "provider": "provider_setup.json", "aruco_detector": { "dictionary": "DICT_APRILTAG_16h5", "parameters": { diff --git a/src/argaze/utils/demo_data/provider_setup.json b/src/argaze/utils/demo_data/provider_setup.json new file mode 100644 index 0000000..d63f914 --- /dev/null +++ b/src/argaze/utils/demo_data/provider_setup.json @@ -0,0 +1,7 @@ +{ + "TobiiProGlasses2" : { + "address": "10.34.0.17", + "project": "MyProject", + "participant": "NewParticipant" + } +} \ No newline at end of file diff --git a/src/argaze/utils/worn_device_stream.py b/src/argaze/utils/worn_device_stream.py new file mode 100644 index 0000000..faa2543 --- /dev/null +++ b/src/argaze/utils/worn_device_stream.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python + +"""Load ArUcoCamera from a configuration file then, stream and process gaze positions and image from any worn eye-tracker device. + +This program is free software: you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation, either version 3 of the License, or (at your option) any later +version. +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +You should have received a copy of the GNU General Public License along with +this program. If not, see . +""" + +__author__ = "Théo de la Hogue" +__credits__ = [] +__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" +__license__ = "GPLv3" + +import argparse +import contextlib + +from argaze import GazeFeatures, DataFeatures +from argaze.ArUcoMarkers import ArUcoCamera + +import cv2 + +# Manage arguments +parser = argparse.ArgumentParser(description=__doc__.split('-')[0]) +parser.add_argument('configuration', metavar='CONFIGURATION', type=str, help='configuration filepath') +parser.add_argument('-p', '--patch', metavar='PATCH', type=str, help='configuration patch filepath') +parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') +args = parser.parse_args() + +def main(): + + # Load ArUcoCamera configuration + with ArUcoCamera.ArUcoCamera.from_json(args.configuration, args.patch) as aruco_camera: + + if args.verbose: + + print(aruco_camera) + + # DEBUG + print(dir(aruco_camera)) + + # Gaze position processing + def gaze_position_callback(timestamped_gaze_position: GazeFeatures.GazePosition): + + # Project gaze position into environment + try: + + aruco_camera.look(timestamped_gaze_position) + + # Handle exceptions + except Exception as e: + + print(e) + + # Attach gaze position callback to provider + aruco_camera.provider.attach(gaze_position_callback) + + # Image processing + def image_callback(timestamp: int|float, image): + + # Detect ArUco code and project ArScenes + try: + + # Watch ArUco markers into image and estimate camera pose + aruco_camera.watch(image, timestamp=timestamp) + + # Handle exceptions + except Exception as e: + + print(e) + + # Attach image callback to provider + aruco_camera.provider.attach(image_callback) + + # Waiting for 'ctrl+C' interruption + with contextlib.suppress(KeyboardInterrupt): + + # Visualisation loop + while True: + + # Display camera frame image + image = aruco_camera.image() + + cv2.imshow(aruco_camera.name, image) + + # Display each scene frames image + for scene_frame in aruco_camera.scene_frames(): + + cv2.imshow(scene_frame.name, scene_frame.image()) + + # Key interaction + key_pressed = cv2.waitKey(10) + + # Esc: close window + if key_pressed == 27: + + raise KeyboardInterrupt() + + # Stop frame display + cv2.destroyAllWindows() + +if __name__ == '__main__': + + main() \ No newline at end of file -- cgit v1.1