From 6b8e7ae63a63a2854613b98db2fdeb079026748e Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Tue, 28 Mar 2023 11:53:14 +0200 Subject: Moving Tobii support into a dedicated repository. --- src/argaze/ArFeatures.py | 2 +- src/argaze/ArUcoMarkers/README.md | 6 +- src/argaze/TobiiGlassesPro2/README.md | 89 ---- src/argaze/TobiiGlassesPro2/TobiiController.py | 392 -------------- src/argaze/TobiiGlassesPro2/TobiiData.py | 565 -------------------- src/argaze/TobiiGlassesPro2/TobiiEntities.py | 334 ------------ .../TobiiGlassesPro2/TobiiInertialMeasureUnit.py | 253 --------- .../TobiiGlassesPro2/TobiiNetworkInterface.py | 226 -------- src/argaze/TobiiGlassesPro2/TobiiSpecifications.py | 15 - src/argaze/TobiiGlassesPro2/TobiiVideo.py | 283 ---------- src/argaze/TobiiGlassesPro2/__init__.py | 5 - .../utils/A4_calibration_target.pdf | Bin 1965 -> 0 bytes src/argaze/TobiiGlassesPro2/utils/imu.json | 21 - src/argaze/__init__.py | 2 +- src/argaze/utils/README.md | 75 +-- src/argaze/utils/tobii_camera_calibrate.py | 140 ----- src/argaze/utils/tobii_imu_calibrate.py | 214 -------- src/argaze/utils/tobii_sdcard_explore.py | 83 --- src/argaze/utils/tobii_segment_arscene_edit.py | 381 -------------- src/argaze/utils/tobii_segment_arscene_export.py | 306 ----------- src/argaze/utils/tobii_segment_data_plot_export.py | 141 ----- src/argaze/utils/tobii_segment_display.py | 150 ------ .../utils/tobii_segment_gaze_metrics_export.py | 242 --------- .../utils/tobii_segment_gaze_movements_export.py | 570 --------------------- src/argaze/utils/tobii_segment_record.py | 96 ---- src/argaze/utils/tobii_stream_arscene_display.py | 154 ------ src/argaze/utils/tobii_stream_display.py | 218 -------- 27 files changed, 8 insertions(+), 4955 deletions(-) delete mode 100644 src/argaze/TobiiGlassesPro2/README.md delete mode 100644 src/argaze/TobiiGlassesPro2/TobiiController.py delete mode 100644 src/argaze/TobiiGlassesPro2/TobiiData.py delete mode 100644 src/argaze/TobiiGlassesPro2/TobiiEntities.py delete mode 100644 src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py delete mode 100644 src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py delete mode 100644 src/argaze/TobiiGlassesPro2/TobiiSpecifications.py delete mode 100644 src/argaze/TobiiGlassesPro2/TobiiVideo.py delete mode 100644 src/argaze/TobiiGlassesPro2/__init__.py delete mode 100644 src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf delete mode 100644 src/argaze/TobiiGlassesPro2/utils/imu.json delete mode 100644 src/argaze/utils/tobii_camera_calibrate.py delete mode 100644 src/argaze/utils/tobii_imu_calibrate.py delete mode 100644 src/argaze/utils/tobii_sdcard_explore.py delete mode 100644 src/argaze/utils/tobii_segment_arscene_edit.py delete mode 100644 src/argaze/utils/tobii_segment_arscene_export.py delete mode 100644 src/argaze/utils/tobii_segment_data_plot_export.py delete mode 100644 src/argaze/utils/tobii_segment_display.py delete mode 100644 src/argaze/utils/tobii_segment_gaze_metrics_export.py delete mode 100644 src/argaze/utils/tobii_segment_gaze_movements_export.py delete mode 100644 src/argaze/utils/tobii_segment_record.py delete mode 100644 src/argaze/utils/tobii_stream_arscene_display.py delete mode 100644 src/argaze/utils/tobii_stream_display.py (limited to 'src') diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index d8a9f31..21a7d6a 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -102,7 +102,7 @@ class ArEnvironment(): return output class PoseEstimationFailed(Exception): - """Exception raised 
by ArScene project method when the pose can't be estimated due to unconsistencies."""
+    """Exception raised by ArScene estimate_pose method when the pose can't be estimated due to inconsistencies."""
 
     def __init__(self, message, unconsistencies=None):
 
diff --git a/src/argaze/ArUcoMarkers/README.md b/src/argaze/ArUcoMarkers/README.md
index bdc8f9e..931ee4b 100644
--- a/src/argaze/ArUcoMarkers/README.md
+++ b/src/argaze/ArUcoMarkers/README.md
@@ -7,8 +7,6 @@ Here is more [about ArUco markers dictionaries](https://docs.opencv.org/3.4/d9/d
 
 ## Utils
 
-Print **A3_board_35cmx25cm_markers_4X4_3cm.pdf** onto A3 paper sheet to get board at expected dimensions.
+Print **A3_DICT_ARUCO_ORIGINAL_3cm_35cmx25cm.pdf** onto an A3 paper sheet to get the board at its expected dimensions.
 
-Print **A4_markers_4x4_3cm.pdf** onto A4 paper sheet to get markers at expected dimensions.
-
-Load **detecter_configuration.json** file with argaze utils **tobii_segment_aruco_aoi_export.py** script with -p option. This is an example file to illustrate how to setup [ArUco markers detection parameters](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html).
\ No newline at end of file
+Print **A4_DICT_ARUCO_ORIGINAL_3cm_0-9.pdf** onto an A4 paper sheet to get the markers at their expected dimensions.
diff --git a/src/argaze/TobiiGlassesPro2/README.md b/src/argaze/TobiiGlassesPro2/README.md
deleted file mode 100644
index f13490a..0000000
--- a/src/argaze/TobiiGlassesPro2/README.md
+++ /dev/null
@@ -1,89 +0,0 @@
-Class interface to handle the Tobii Glasses Pro 2 device.
-This work is greatly inspired by David de Tommaso and Agnieszka Wykowska's [TobiiGlassesPySuite](https://arxiv.org/pdf/1912.09142.pdf).
-
-.. note::
-	Read the [Tobii Glasses Pro 2 device user manual](https://www.tobiipro.com/siteassets/tobii-pro/user-manuals/tobii-pro-glasses-2-user-manual.pdf).
-
-## Utils
-
-* Print **A4_calibration_target.pdf** onto an A4 paper sheet to get the calibration target at its expected dimensions.
-
-* Load the **imu.json** file with the argaze utils **tobii_imu_calibrate.py** script with the -i option. This is an example file to illustrate how to load Inertial Measurement Unit (IMU) calibration parameters.
-
-## Local network configuration
-
-If the Tobii Glasses aren't connected to a router, here is how to configure a local DHCP server to enable IPv4 device connection.
-
-### Linux (Ubuntu)
-
-* Set up a static eth0 interface
-
-**/etc/network/interfaces**
-
-```
-auto eth0
-iface eth0 inet static
-	address 192.168.1.1
-	netmask 255.255.255.0
-	network 192.168.1.0
-	gateway 192.168.1.254
-```
-
-* Install the DHCP server:
-
-```
-sudo apt-get install isc-dhcp-server
-```
-
-* Set up the DHCP server:
-
-**/etc/default/isc-dhcp-server**
-
-```
-# On what interfaces should the DHCP server (dhcpd) serve DHCP requests?
-INTERFACESv4="eth0" -INTERFACESv6="" -``` - -**/etc/dhcp/dhcpd.conf** - -``` -# NECESSARY TO BE A DHCP SERVER -authoritative; - -# DHCP CONFIGURATION INFORMATION -default-lease-time 43200; -max-lease-time 86400; -server-name "dhcpserver.robotron.lan"; - -# DNS SERVERS DHCP WILL PUSH TO CLIENTS -option domain-name-servers 192.168.1.1; - -# SEARCH DOMAINS DHCP WILL PUSH TO CLIENTS -option domain-name "robotron.lan"; - -# DHCP STATIC IP ASSIGNMENTS FILE -include "/etc/dhcp/master.conf"; - -# SUBNET FOR IP ADDRESSES MANUALLY/STATICALLY ASSIGNED ONLY -subnet 192.168.1.0 netmask 255.255.255.0 { - option broadcast-address 192.168.1.255; - option subnet-mask 255.255.255.0; - option routers 192.168.1.254; -} -``` - -**/etc/dhcp/master.conf** - -``` -# Static IP assignments -## SUBNET - 192.168.1.0/24 -host tobiiglasses { hardware ethernet 74:fe:48:34:7c:92; fixed-address 192.168.1.10; } -``` -Replace 74:fe:48:34:7c:92 by the correct MAC address. - -* Monitor DHCP server activity: - -``` -journalctl | grep -Ei 'dhcp' -``` diff --git a/src/argaze/TobiiGlassesPro2/TobiiController.py b/src/argaze/TobiiGlassesPro2/TobiiController.py deleted file mode 100644 index 39f4e46..0000000 --- a/src/argaze/TobiiGlassesPro2/TobiiController.py +++ /dev/null @@ -1,392 +0,0 @@ -#!/usr/bin/env python - -import datetime -import uuid - -from argaze.TobiiGlassesPro2 import TobiiNetworkInterface, TobiiData, TobiiVideo - -TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f' -TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S' - -DEFAULT_PROJECT_NAME = 'DefaultProject' -DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant' -DEFAULT_RECORD_NAME = 'DefaultRecord' - -class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface): - """Handle Tobii glasses Pro 2 device using network interface. 
- It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py).""" - - project_name = None - """Project name.""" - - project_id = None - """Project identifier.""" - - participant_name = None - """Participant name.""" - - participant_id = None - """Participant identifier.""" - - calibration_id = None - """Calibration identifier.""" - - def __init__(self, ip_address = None, project_name = DEFAULT_PROJECT_NAME, participant_name = DEFAULT_PARTICIPANT_NAME): - """Create a project, a participant and start calibration.""" - - super().__init__(ip_address) - - # bind to project or create one if it doesn't exist - self.project_name = project_name - self.project_id = self.set_project(self.project_name) - - # bind to participant or create one if it doesn't exist - self.participant_name = participant_name - self.participant_id = self.set_participant(self.project_id, self.participant_name) - - self.__recording_index = 0 - - self.__data_stream = None - self.__video_stream = None - - self.__record_event_thread = None - - super().wait_for_status('/api/system/status', 'sys_status', ['ok']) == 'ok' - - def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT): - return datetime.datetime.now().replace(microsecond=0).strftime(timeformat) - - # STREAMING FEATURES - - def enable_data_stream(self) -> "TobiiData.TobiiDataStream": - """Enable Tobii Glasses Pro 2 data streaming.""" - - if self.__data_stream == None: - self.__data_stream = TobiiData.TobiiDataStream(self) - - return self.__data_stream - - def enable_video_stream(self) -> "TobiiVideo.TobiiVideoStream": - """Enable Tobii Glasses Pro 2 video camera streaming.""" - - if self.__video_stream == None: - self.__video_stream = TobiiVideo.TobiiVideoStream(self) - - return self.__video_stream - - def start_streaming(self): - """Start data and/or video streaming.""" - - if self.__data_stream != None: - self.__data_stream.open() - - if self.__video_stream != None: - self.__video_stream.open() - - def stop_streaming(self): - """Stop data and/or video streaming.""" - - if self.__data_stream != None: - self.__data_stream.close() - - if self.__video_stream != None: - self.__video_stream.close() - - # PROJECT FEATURES - - def set_project(self, project_name = DEFAULT_PROJECT_NAME) -> str: - """Bind to a project or create one if it doesn't exist. - - * **Returns:** - - project id - """ - - project_id = self.get_project_id(project_name) - - if project_id is None: - - data = { - 'pr_info' : { - 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD), - 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, project_name)), - 'Name': project_name - }, - 'pr_created': self.__get_current_datetime() - } - - json_data = super().post_request('/api/projects', data) - - return json_data['pr_id'] - - else: - - return project_id - - def get_project_id(self, project_name) -> str: - """Get project id.""" - - project_id = None - projects = super().get_request('/api/projects') - - for project in projects: - - try: - if project['pr_info']['Name'] == project_name: - project_id = project['pr_id'] - except: - pass - - return project_id - - def get_projects(self) -> str: - """Get all projects id.""" - - return super().get_request('/api/projects') - - # PARTICIPANT FEATURES - - def set_participant(self, project_id, participant_name = DEFAULT_PARTICIPANT_NAME, participant_notes = '') -> str: - """Bind to a participant or create one if it doesn't exist. 
-
-        * **Returns:**
-            - participant id
-        """
-
-        participant_id = self.get_participant_id(participant_name)
-
-        if participant_id is None:
-
-            data = {
-                'pa_project': project_id,
-                'pa_info': {
-                    'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.participant_name)),
-                    'Name': self.participant_name,
-                    'Notes': participant_notes
-                },
-                'pa_created': self.__get_current_datetime()
-            }
-
-            json_data = super().post_request('/api/participants', data)
-
-            return json_data['pa_id']
-
-        else:
-
-            return participant_id
-
-    def get_participant_id(self, participant_name) -> str:
-        """Get participant id."""
-
-        participant_id = None
-        participants = super().get_request('/api/participants')
-
-        for participant in participants:
-
-            try:
-                if participant['pa_info']['Name'] == participant_name:
-                    participant_id = participant['pa_id']
-
-            except:
-                pass
-
-        return participant_id
-
-    def get_participants(self) -> str:
-        """Get all participant ids."""
-
-        return super().get_request('/api/participants')
-
-    # CALIBRATION
-
-    def calibrate(self):
-        """Start Tobii Glasses calibration for current project and participant."""
-
-        input('Position Tobii Glasses calibration target then press \'Enter\' to start calibration.')
-
-        data = {
-            'ca_project': self.project_id,
-            'ca_type': 'default',
-            'ca_participant': self.participant_id,
-            'ca_created': self.__get_current_datetime()
-        }
-
-        json_data = super().post_request('/api/calibrations', data)
-
-        self.calibration_id = json_data['ca_id']
-
-        super().post_request('/api/calibrations/' + self.calibration_id + '/start')
-
-        status = super().wait_for_status('/api/calibrations/' + self.calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
-
-        if status == 'uncalibrated' or status == 'stale' or status == 'failed':
-            raise RuntimeError(f'Tobii calibration {self.calibration_id} {status}')
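As a quick orientation for the calibration and recording API of this controller, here is a minimal usage sketch (not part of the original sources); the IP address and the project, participant and event names are hypothetical placeholders:

```python
from argaze.TobiiGlassesPro2 import TobiiController

# Bind to (or create) a project and a participant on the device
controller = TobiiController.TobiiController('192.168.1.10', 'MyProject', 'MyParticipant')

# Interactive calibration: raises RuntimeError if the device reports a failure
controller.calibrate()

# Record on the device SD card, tagging an event while recording
recording_id = controller.create_recording('MyParticipant')

if controller.start_recording(recording_id):

    controller.send_event('trial_start', 1)
    # ... run the experiment here ...
    controller.stop_recording(recording_id)
```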
-    # RECORDING FEATURES
-
-    def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
-        return super().wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
-
-    def create_recording(self, participant_name, recording_notes = '') -> str:
-        """Create a new recording.
-
-        * **Returns:**
-            - recording id
-        """
-
-        participant_id = self.get_participant_id(participant_name)
-
-        if participant_id is None:
-            raise NameError(f'{participant_name} participant doesn\'t exist')
-
-        self.__recording_index += 1
-        recording_name = f'Recording_{self.__recording_index}'
-
-        data = {
-            'rec_participant': participant_id,
-            'rec_info': {
-                'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)),
-                'Name': recording_name,
-                'Notes': recording_notes
-            },
-            'rec_created': self.__get_current_datetime()
-        }
-
-        json_data = super().post_request('/api/recordings', data)
-
-        return json_data['rec_id']
-
-    def start_recording(self, recording_id) -> bool:
-        """Start recording on the Tobii interface's SD Card."""
-
-        super().post_request('/api/recordings/' + recording_id + '/start')
-        return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
-
-    def stop_recording(self, recording_id) -> bool:
-        """Stop recording on the Tobii interface's SD Card."""
-
-        super().post_request('/api/recordings/' + recording_id + '/stop')
-        return self.__wait_for_recording_status(recording_id, ['done']) == "done"
-
-    def pause_recording(self, recording_id) -> bool:
-        """Pause recording on the Tobii interface's SD Card."""
-
-        super().post_request('/api/recordings/' + recording_id + '/pause')
-        return self.__wait_for_recording_status(recording_id, ['paused']) == "paused"
-
-    def __get_recording_status(self):
-        return self.get_status()['sys_recording']
-
-    def get_current_recording_id(self) -> str:
-        """Get current recording id."""
-
-        return self.__get_recording_status()['rec_id']
-
-    @property
-    def recording(self) -> bool:
-        """Is it recording?"""
-
-        rec_status = self.__get_recording_status()
-
-        if rec_status != {}:
-            if rec_status['rec_state'] == "recording":
-                return True
-
-        return False
-
-    def get_recordings(self) -> str:
-        """Get all recording ids."""
-
-        return super().get_request('/api/recordings')
-
-    # EVENTS AND EXPERIMENTAL VARIABLES
-
-    def __post_recording_data(self, event_type: str, event_tag = ''):
-        data = {'type': event_type, 'tag': event_tag}
-        super().post_request('/api/events', data, wait_for_response=False)
-
-    def send_event(self, event_type: str, event_value = None):
-        self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
-
-    def send_variable(self, variable_name: str, variable_value = None):
-        self.__post_recording_data(str(variable_name), str(variable_value))
-
-    # MISC
-
-    def eject_sd(self):
-        super().get_request('/api/eject')
-
-    def get_battery_info(self):
-        return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
-
-    def get_battery_level(self):
-        return self.get_battery_status()['level']
-
-    def get_battery_remaining_time(self):
-        return self.get_battery_status()['remaining_time']
-
-    def get_battery_status(self):
-        return self.get_status()['sys_battery']
-
-    def get_et_freq(self):
-        return self.get_configuration()['sys_et_freq']
-
-    def get_et_frequencies(self):
-        return self.get_status()['sys_et']['frequencies']
-
-    def identify(self):
-        super().get_request('/api/identify')
-
-    def get_address(self):
-        return self.address
-
-    def get_configuration(self):
-        return super().get_request('/api/system/conf')
-
-    def get_status(self):
-        return super().get_request('/api/system/status')
-
-    def get_storage_info(self):
-        return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_storage_remaining_time()) )
-    def get_storage_remaining_time(self):
-        return self.get_storage_status()['remaining_time']
-
-    def get_storage_status(self):
-        return self.get_status()['sys_storage']
-
-    def get_scene_camera_freq(self):
-        return self.get_configuration()['sys_sc_fps']
-
-    def set_et_freq_50(self):
-        data = {'sys_et_freq': 50}
-        json_data = super().post_request('/api/system/conf', data)
-
-    def set_et_freq_100(self):
-        # May not be available. Check get_et_frequencies() first.
-        data = {'sys_et_freq': 100}
-        json_data = super().post_request('/api/system/conf', data)
-
-    def set_eye_camera_indoor_preset(self) -> str:
-        data = {'sys_ec_preset': 'Indoor'}
-        return super().post_request('/api/system/conf', data)
-
-    def set_eye_camera_outdoor_preset(self) -> str:
-        data = {'sys_ec_preset': 'ClearWeather'}
-        return super().post_request('/api/system/conf', data)
-
-    def set_scene_camera_auto_preset(self):
-        data = {'sys_sc_preset': 'Auto'}
-        json_data = super().post_request('/api/system/conf', data)
-
-    def set_scene_camera_gaze_preset(self):
-        data = {'sys_sc_preset': 'GazeBasedExposure'}
-        json_data = super().post_request('/api/system/conf', data)
-
-    def set_scene_camera_freq_25(self):
-        data = {'sys_sc_fps': 25}
-        json_data = super().post_request('/api/system/conf/', data)
-
-    def set_scene_camera_freq_50(self):
-        data = {'sys_sc_fps': 50}
-        json_data = super().post_request('/api/system/conf/', data)
-
diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py
deleted file mode 100644
index 0e28054..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiData.py
+++ /dev/null
@@ -1,565 +0,0 @@
-#!/usr/bin/env python
-
-from typing import Tuple, TypeVar
-from dataclasses import dataclass
-import threading
-import uuid
-import gzip
-import json
-import time
-import queue
-
-from argaze import DataStructures
-from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
-
-from argaze.utils import MiscFeatures
-
-import numpy
-
-TobiiDataObjectType = TypeVar('TobiiDataObjectType', bound="TobiiDataObjectType")
-# Type definition for type annotation convenience
-
-@dataclass
-class DirSig():
-    """Define dir sig data (dir sig)."""
-
-    dir: int # meaning ?
-    sig: int # meaning ?
-
-@dataclass
-class PresentationTimeStamp():
-    """Define presentation time stamp (pts) data."""
-
-    value: int
-    """Pts value."""
-
-@dataclass
-class VideoTimeStamp():
-    """Define video time stamp (vts) data."""
-
-    value: int
-    """Vts value."""
-
-    offset: int
-    """Primary time stamp value."""
-
-@dataclass
-class EventSynch():
-    """Define event synch (evts) data."""
-
-    value: int # meaning ?
-    """Evts value."""
-
-@dataclass
-class Event():
-    """Define event data (ets type tag)."""
-
-    ets: int # meaning ?
-    type: str
-    tag: str # dict ?
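To make the JSON-to-dataclass mapping concrete, here is a minimal sketch (not part of the original sources) feeding one hypothetical live-data record through the TobiiJsonDataParser defined further below; the field values are made up:

```python
parser = TobiiJsonDataParser()

# One raw gaze position record as streamed by the device
record = {'ts': 1042674545, 's': 0, 'gidx': 39, 'l': 0, 'gp': [0.51, 0.48]}

status = record.pop('s', -1)  # data status
ts = record.pop('ts')         # device timestamp

# First remaining key is 'gidx', so parsing dispatches on the next key ('l')
data_object = parser.parse_data(status, record)
# -> GazePosition(validity=0, index=39, l=0, value=[0.51, 0.48])
```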
- -@dataclass -class Accelerometer(): - """Define accelerometer data (ac).""" - - value: numpy.array - """Accelerometer value""" - -@dataclass -class Gyroscope(): - """Define gyroscope data (gy).""" - - value: numpy.array - """Gyroscope value""" - -@dataclass -class PupilCenter(): - """Define pupil center data (gidx pc eye).""" - - validity: int - index: int - value: tuple((float, float, float)) - eye: str # 'right' or 'left' - -@dataclass -class PupilDiameter(): - """Define pupil diameter data (gidx pd eye).""" - - validity: int - index: int - value: float - eye: str # 'right' or 'left' - -@dataclass -class GazeDirection(): - """Define gaze direction data (gidx gd eye).""" - - validity: int - index: int - value: tuple((float, float, float)) - eye: str # 'right' or 'left' - -@dataclass -class GazePosition(): - """Define gaze position data (gidx l gp).""" - - validity: int - index: int - l: str # ? - value: tuple((float, float)) - -@dataclass -class GazePosition3D(): - """Define gaze position 3D data (gidx gp3).""" - - validity: int - index: int - value: tuple((float, float)) - -@dataclass -class MarkerPosition(): - """Define marker data (marker3d marker2d).""" - - value_3d: tuple((float, float, float)) - value_2d: tuple((float, float)) - -class TobiiJsonDataParser(): - - def parse_dir_sig(self, status, json_data): - - return DirSig(json_data['dir'], json_data['sig']) - - def parse_pts(self, status, json_data): - - return PresentationTimeStamp(json_data['pts']) - - def parse_vts(self, status, json_data): - - return VideoTimeStamp(json_data['vts'], json_data['ts']) - - def parse_event_synch(self, status, json_data): - - return EventSynch(json_data['evts']) - - def parse_event(self, status, json_data): - - return Event(json_data['ets'], json_data['type'], json_data['tag']) - - def parse_accelerometer(self, status, json_data): - - return Accelerometer(json_data['ac']) - - def parse_gyroscope(self, status, json_data): - - return Gyroscope(json_data['gy']) - - def parse_pupil_center(self, status, gaze_index, json_data): - - return PupilCenter(status, gaze_index, json_data['pc'], json_data['eye']) - - def parse_pupil_diameter(self, status, gaze_index, json_data): - - return PupilDiameter(status, gaze_index, json_data['pd'], json_data['eye']) - - def parse_gaze_direction(self, status, gaze_index, json_data): - - return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye']) - - def parse_gaze_position(self, status, gaze_index, json_data): - - return GazePosition(status, gaze_index, json_data['l'], json_data['gp']) - - def parse_gaze_position_3d(self, status, gaze_index, json_data): - - return GazePosition3D(status, gaze_index, json_data['gp3']) - - def parse_marker_position(self, status, json_data): - - return MarkerPosition(json_data['marker3d'], json_data['marker2d']) - - def parse_pupil_or_gaze(self, status, json_data): - - gaze_index = json_data.pop('gidx') - - # parse pupil or gaze data depending second json key - second_key = next(iter(json_data)) - - parse_map = { - 'pc': self.parse_pupil_center, - 'pd': self.parse_pupil_diameter, - 'gd': self.parse_gaze_direction, - 'l': self.parse_gaze_position, - 'gp3': self.parse_gaze_position_3d - } - - return parse_map[second_key](status, gaze_index, json_data) - - def parse_data(self, status, json_data): - - # parse data depending first json key - first_key = next(iter(json_data)) - - parse_map = { - 'dir': self.parse_dir_sig, - 'pts': self.parse_pts, - 'vts': self.parse_vts, - 'evts': self.parse_event_synch, - 'ets': self.parse_event, - 
'ac': self.parse_accelerometer, - 'gy': self.parse_gyroscope, - 'gidx': self.parse_pupil_or_gaze, - 'marker3d': self.parse_marker_position - } - - return parse_map[first_key](status, json_data) - -class TobiiDataSegment(): - """Handle Tobii Glasses Pro 2 segment data file from segment directory. - Load, parse and store each segment data into dedicated TimeStampedBuffers considering VideoTimeStamp offset to ease data/video synchronisation.""" - - def __init__(self, segment_data_path, start_timestamp = 0, end_timestamp = None): - - self.__path = segment_data_path - - self.__vts_offset = 0 - self.__vts_ts = -1 - - self.__json_data_parser = TobiiJsonDataParser() - - self.__ts_data_buffer_dict = { - 'DirSig': DataStructures.TimeStampedBuffer(), - 'PresentationTimeStamp': DataStructures.TimeStampedBuffer(), - 'VideoTimeStamp': DataStructures.TimeStampedBuffer(), - 'EventSynch': DataStructures.TimeStampedBuffer(), - 'Event': DataStructures.TimeStampedBuffer(), - 'Accelerometer': DataStructures.TimeStampedBuffer(), - 'Gyroscope': DataStructures.TimeStampedBuffer(), - 'PupilCenter': DataStructures.TimeStampedBuffer(), - 'PupilDiameter': DataStructures.TimeStampedBuffer(), - 'GazeDirection': DataStructures.TimeStampedBuffer(), - 'GazePosition': DataStructures.TimeStampedBuffer(), - 'GazePosition3D': DataStructures.TimeStampedBuffer(), - 'MarkerPosition': DataStructures.TimeStampedBuffer() - } - - # define a decoder function - def decode(json_data): - - # parse data status - status = json_data.pop('s', -1) - - # convert timestamp - ts = json_data.pop('ts') - - # watch for vts data to offset timestamps - try: - self.__vts_offset = json_data['vts'] - self.__vts_ts = ts - - # store primary ts value to allow further reverse offset operation - json_data['ts'] = ts - - except KeyError: - pass - - # ignore data before first vts entry - if self.__vts_ts == -1: - return True # continue - - ts -= self.__vts_ts - ts += self.__vts_offset - - # ignore timestamps out of the given time range - if ts < start_timestamp: - return True # continue - - if ts >= end_timestamp: - return False # stop - - # convert json data into data object - data_object = self.__json_data_parser.parse_data(status, json_data) - data_object_type = type(data_object).__name__ - - # store data object into dedicated timestamped buffer - self.__ts_data_buffer_dict[data_object_type][ts] = data_object - - return True # continue - - # start loading - with gzip.open(self.__path) as f: - - for item in f: - if not json.loads(item.decode('utf-8'), object_hook=decode): - break - - def __getitem__(self, key): - return self.__ts_data_buffer_dict[key] - - def keys(self) -> list[str]: - """Get all segment data keys.""" - - return list(self.__ts_data_buffer_dict.keys()) - - @property - def path(self) -> str: - """Get segment data path.""" - - return self.__path - -class TobiiDataStream(): - """Handle Tobii Glasses Pro 2 data stream in separate thread.""" - - def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface): - """Initialise network connection and prepare data reception.""" - - # Enable network connection - self.__network = network_interface - self.__data_socket = self.__network.make_socket() - - # Data reception - self.__data_thread = None - self.__json_data_parser = TobiiJsonDataParser() - self.__first_ts = 0 - - # Sync reading data subscription - self.reading_callbacks = [] - self.__subcription_lock = threading.Lock() - - # Keep connection alive - self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ 
str(uuid.uuid4()) +"\", \"op\": \"start\"}" - self.__keep_alive_thread = threading.Thread(target = self.__keep_alive) - self.__keep_alive_thread.daemon = True - self.__keep_alive_thread.start() - - def __del__(self): - """Stop data reception and network connection before destruction.""" - - if self.__data_thread != None: - - threading.Thread.join(self.__data_thread) - self.__data_thread = None - - threading.Thread.join(self.__keep_alive_thread) - - self.__data_socket.close() - - def __keep_alive(self): - """Maintain network connection and clear socket when is not read.""" - - while True: - - # if no thread is reading data socket - if self.__data_thread == None: - - self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg) - - clear_count = 0 - while clear_count < 1000 and self.__data_thread == None: - - # Clear socket each milli second - time.sleep(0.001) - self.__network.grab_data(self.__data_socket) - clear_count += 1 - - else: - - self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg) - time.sleep(1) - - def open(self): - """Start data reception.""" - - if self.__data_thread == None: - - self.__first_ts = 0 - - self.__data_thread = threading.Thread(target = self.__run) - self.__data_thread.daemon = True - - self.__stop_event = threading.Event() - - self.__data_thread.start() - - def close(self): - """Stop data reception definitively.""" - - if self.__data_thread != None: - - self.__stop_event.set() - - threading.Thread.join(self.__data_thread) - self.__data_thread = None - - @property - def running(self) -> bool: - """Is tobii data streaming running?""" - - return self.__data_thread != None - - def __run(self): - """Managed received data for sync and async reading case. - - Sync: send data to callback function. 
- - Async: store data into a locked queue for further reading.""" - - while not self.__stop_event.isSet(): - - # grab data - data = self.__network.grab_data(self.__data_socket) - - # decode data - json_data = json.loads(data.decode('utf-8')) - - # parse json into timestamped data object - data_ts, data_object, data_object_type = self.__parse_json_data(json_data) - - # lock data subcription - self.__subcription_lock.acquire() - - # share incoming data to all subscribers - for callback in self.reading_callbacks: - - callback(data_ts, data_object, data_object_type) - - # unlock data subscription - self.__subcription_lock.release() - - def subscribe(self, reading_callback): - """Pass reading callback function to get incoming (data_ts, data_object, data_object_type) back.""" - - # lock data subcription - self.__subcription_lock.acquire() - - # append callback - self.reading_callbacks.append(reading_callback) - - # unlock data subscription - self.__subcription_lock.release() - - def unsubscribe(self, reading_callback): - """Remove reading callback function to stop data reception.""" - - # lock data subcription - self.__subcription_lock.acquire() - - # remove callback - self.reading_callbacks.remove(reading_callback) - - # unlock data subscription - self.__subcription_lock.release() - - def __parse_json_data(self, json_data): - - # parse data status - status = json_data.pop('s', -1) - - # convert timestamp - data_ts = json_data.pop('ts') - - # convert json data into data object - data_object = self.__json_data_parser.parse_data(status, json_data) - data_object_type = type(data_object).__name__ - - # keep first timestamp to offset all timestamps - if self.__first_ts == 0: - self.__first_ts = data_ts - - data_ts -= self.__first_ts - - return data_ts, data_object, data_object_type - - def __capture_callback(self, data_ts, data_object, data_object_type): - - if data_object_type == self.__data_stream_selector: - - if len(self.__data_ts_buffer.keys()) < self.__data_ts_buffer_size: - - # update first timestamp if next timestamps are negative - if data_ts < 0: - self.__first_ts += data_ts - data_ts = 0 - - self.__data_ts_buffer[data_ts] = data_object - - def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500) -> int: - """Start data stream capture. 
-
-        * **Yields: every 100 ms**
-            - buffer size
-        """
-
-        # Prepare for data acquisition
-        self.__data_ts_buffer = data_ts_buffer
-        self.__data_stream_selector = data_object_type
-        self.__data_ts_buffer_size = sample_number
-
-        # Subscribe to Tobii data stream
-        self.subscribe(self.__capture_callback)
-
-        # Start data stream if needed
-        close_after = False
-        if not self.running:
-            self.open()
-            close_after = True
-
-        # Share data acquisition progress
-        buffer_size = 0
-        while buffer_size < sample_number:
-
-            time.sleep(0.1)
-
-            buffer_size = len(self.__data_ts_buffer.keys())
-
-            yield buffer_size
-
-        # Stop data stream if needed
-        if close_after:
-            self.close()
-
-        # Unsubscribe from Tobii data stream
-        self.unsubscribe(self.__capture_callback)
-
-    def __buffer_callback(self, data_ts, data_object, data_object_type):
-
-        # Lock data queue access
-        self.__queue_lock.acquire()
-
-        # Put data into the queue
-        self.__data_queue.put((data_ts, data_object, data_object_type))
-
-        # Unlock data queue access
-        self.__queue_lock.release()
-
-    def read(self) -> Tuple[int, TobiiDataObjectType, str]:
-        """Iterate over incoming data buffer asynchronously."""
-
-        # Setup data buffering
-        self.__data_queue = queue.Queue()
-        self.__queue_lock = threading.Lock()
-
-        # Subscribe to Tobii data stream
-        self.subscribe(self.__buffer_callback)
-
-        return self.__iter__()
-
-    def __iter__(self):
-
-        return self
-
-    def __next__(self):
-
-        # Wait for data
-        while self.__data_queue.empty():
-
-            time.sleep(0.0001)
-            continue
-
-        # Lock data queue access
-        self.__queue_lock.acquire()
-
-        # Get data from the queue
-        data_ts, data_object, data_object_type = self.__data_queue.get()
-
-        # Unlock data queue access
-        self.__queue_lock.release()
-
-        return data_ts, data_object, data_object_type
diff --git a/src/argaze/TobiiGlassesPro2/TobiiEntities.py b/src/argaze/TobiiGlassesPro2/TobiiEntities.py
deleted file mode 100644
index 0ae6dec..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiEntities.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#!/usr/bin/env python
-
-from typing import TypeVar
-import datetime
-import json
-import os
-
-from argaze.TobiiGlassesPro2 import TobiiData, TobiiVideo
-
-import av
-import cv2 as cv
-
-TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'
-
-TOBII_PROJECTS_DIRNAME = "projects"
-TOBII_PROJECT_FILENAME = "project.json"
-
-TOBII_PARTICIPANTS_DIRNAME = "participants"
-TOBII_PARTICIPANT_FILENAME = "participant.json"
-
-TOBII_RECORDINGS_DIRNAME = "recordings"
-TOBII_RECORD_FILENAME = "recording.json"
-
-TOBII_SEGMENTS_DIRNAME = "segments"
-TOBII_SEGMENT_INFO_FILENAME = "segment.json"
-TOBII_SEGMENT_VIDEO_FILENAME = "fullstream.mp4"
-TOBII_SEGMENT_DATA_FILENAME = "livedata.json.gz"
-
-DatetimeType = TypeVar('datetime', bound="datetime")
-# Type definition for type annotation convenience
-
-class TobiiSegment:
-    """Handle Tobii Glasses Pro 2 segment info."""
-
-    def __init__(self, segment_path, start_timestamp:int = 0, end_timestamp:int = None):
-        """Load segment info from segment directory.
-        Optionally select a time range in microseconds."""
-
-        self.__id = os.path.basename(segment_path)
-        self.__path = segment_path
-
-        with open(os.path.join(self.__path, TOBII_SEGMENT_INFO_FILENAME)) as f:
-            try:
-                item = json.load(f)
-            except:
-                raise RuntimeError(f'Failed to load JSON from {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}')
-
-        self.__start_timestamp = start_timestamp
-        self.__end_timestamp = min(end_timestamp, int(item["seg_length"] * 1e6)) if end_timestamp != None else int(item["seg_length"] * 1e6)
-
-        if self.__start_timestamp >= self.__end_timestamp:
-            raise ValueError('start time is greater than or equal to end time.')
-
-        self.__calibrated = bool(item["seg_calibrated"])
-
-        self.__start_date = datetime.datetime.strptime(item["seg_t_start"], TOBII_DATETIME_FORMAT)
-        self.__stop_date = datetime.datetime.strptime(item["seg_t_stop"], TOBII_DATETIME_FORMAT)
-
-    @property
-    def path(self) -> str:
-        """Get segment path."""
-
-        return self.__path
-
-    @property
-    def id(self) -> str:
-        """Get segment id."""
-        return self.__id
-
-    @property
-    def start_timestamp(self) -> int:
-        """Get the timestamp where the segment loading starts."""
-
-        return self.__start_timestamp
-
-    @property
-    def end_timestamp(self) -> int:
-        """Get the timestamp where the segment loading ends."""
-
-        return self.__end_timestamp
-
-    @property
-    def start_date(self) -> DatetimeType:
-        """Get the date when the segment has started."""
-
-        return self.__start_date
-
-    @property
-    def stop_date(self) -> DatetimeType:
-        """Get the date when the segment has stopped."""
-
-        return self.__stop_date
-
-    @property
-    def calibrated(self) -> bool:
-        """Has the segment been calibrated?"""
-
-        return self.__calibrated
-
-    def load_data(self) -> "TobiiData.TobiiDataSegment":
-        """Load recorded data stream."""
-
-        return TobiiData.TobiiDataSegment(os.path.join(self.__path, TOBII_SEGMENT_DATA_FILENAME), self.__start_timestamp, self.__end_timestamp)
-
-    def load_video(self) -> "TobiiVideo.TobiiVideoSegment":
-        """Load recorded video stream."""
-
-        return TobiiVideo.TobiiVideoSegment(os.path.join(self.__path, TOBII_SEGMENT_VIDEO_FILENAME), self.__start_timestamp, self.__end_timestamp)
-
-class TobiiRecording:
-    """Handle Tobii Glasses Pro 2 recording info and segments."""
-
-    def __init__(self, recording_path):
-        """Load recording info from recording directory."""
-
-        self.__id = os.path.basename(recording_path)
-        self.__path = recording_path
-
-        with open(os.path.join(self.__path, TOBII_RECORD_FILENAME)) as f:
-            try:
-                item = json.load(f)
-            except:
-                raise RuntimeError(f'Failed to load JSON from {self.__path}/{TOBII_RECORD_FILENAME}')
-
-        self.__name = item["rec_info"]["Name"]
-        self.__creation_date = datetime.datetime.strptime(item["rec_created"], TOBII_DATETIME_FORMAT)
-
-        self.__length = int(item["rec_length"])
-        self.__et_samples = int(item["rec_et_samples"])
-        self.__et_samples_valid = int(item["rec_et_valid_samples"])
-        self.__participant_id = item["rec_participant"]
-
-    @property
-    def path(self) -> str:
-        """Get recording path."""
-
-        return self.__path
-
-    @property
-    def id(self) -> str:
-        """Get recording id."""
-        return self.__id
-
-    @property
-    def name(self) -> str:
-        """Get recording name."""
-
-        return self.__name
-
-    @property
-    def creation_date(self) -> DatetimeType:
-        """Get the date when the recording was made."""
-
-        return self.__creation_date
-
-    @property
-    def length(self):
-        """Get recording duration in seconds."""
-
-        return self.__length
-
-    @property
-    def eyetracker_samples(self) -> int:
-        """Get number of recorded eye tracking samples."""
-
-        return self.__et_samples
-
-    @property
-    def eyetracker_samples_valid(self) -> int:
-        """Get number of recorded valid eye tracking samples."""
-
-        return self.__et_samples_valid
-
-    @property
-    def project(self) -> "TobiiProject":
-        """Get the project to which it belongs."""
-
-        project_path = os.path.dirname(os.path.dirname(os.path.abspath(self.__path)))
-
-        return TobiiProject(project_path)
-
-    @property
-    def participant(self) -> "TobiiParticipant":
-        """Get the participant to which it belongs."""
-
-        project_path = os.path.dirname(os.path.dirname(os.path.abspath(self.__path)))
-
-        return TobiiParticipant(project_path + '/participants/' + self.__participant_id)
-
-    def segments(self) -> list["TobiiSegment"]:
-        """Get all recorded segments."""
-
-        all_segments = []
-        segments_path = os.path.join(self.__path, TOBII_SEGMENTS_DIRNAME)
-
-        for item in os.listdir(segments_path):
-            segment_path = os.path.join(segments_path, item)
-            if os.path.isdir(segment_path):
-                all_segments.append(TobiiSegment(segment_path))
-
-        return all_segments
-
-class TobiiParticipant:
-    """Handle Tobii Glasses Pro 2 participant data."""
-
-    def __init__(self, participant_path):
-        """Load participant data from path."""
-
-        self.__id = os.path.basename(participant_path)
-        self.__path = participant_path
-
-        with open(os.path.join(self.__path, TOBII_PARTICIPANT_FILENAME)) as f:
-            try:
-                item = json.load(f)
-            except:
-                raise RuntimeError(f'Failed to load JSON from {self.__path}/{TOBII_PARTICIPANT_FILENAME}')
-
-        self.__name = item["pa_info"]["Name"]
-
-    @property
-    def path(self) -> str:
-        """Get participant path."""
-
-        return self.__path
-
-    @property
-    def id(self) -> str:
-        """Get participant id."""
-        return self.__id
-
-    @property
-    def name(self) -> str:
-        """Get participant name."""
-
-        return self.__name
-
-class TobiiProject:
-    """Handle Tobii Glasses Pro 2 project data."""
-
-    def __init__(self, project_path):
-        """Load project data from projects directory and project id."""
-
-        self.__id = os.path.basename(project_path)
-        self.__path = project_path
-
-        with open(os.path.join(self.__path, TOBII_PROJECT_FILENAME)) as f:
-            try:
-                item = json.load(f)
-            except:
-                raise RuntimeError(f'Failed to load JSON from {self.__path}/{TOBII_PROJECT_FILENAME}')
-
-        self.__creation_date = datetime.datetime.strptime(item["pr_created"], TOBII_DATETIME_FORMAT)
-
-        try:
-            self.__name = item["pr_info"]["Name"]
-        except:
-            self.__name = None
-
-    @property
-    def path(self) -> str:
-        """Get project path."""
-
-        return self.__path
-
-    @property
-    def id(self) -> str:
-        """Get project id."""
-        return self.__id
-
-    @property
-    def name(self) -> str:
-        """Get project name."""
-
-        return self.__name
-
-    @property
-    def creation_date(self) -> DatetimeType:
-        """Get the date when the project has been created."""
-
-        return self.__creation_date
-
-    def participants(self) -> list["TobiiParticipant"]:
-        """Get all participants."""
-
-        all_participants = []
-        participants_path = os.path.join(self.__path, TOBII_PARTICIPANTS_DIRNAME)
-
-        for item in os.listdir(participants_path):
-            participant_path = os.path.join(participants_path, item)
-            if os.path.isdir(participant_path):
-                all_participants.append(TobiiParticipant(participant_path))
-
-        return all_participants
-
-    def recordings(self) -> list["TobiiRecording"]:
-        """Get all recordings."""
-
-        all_recordings = []
-        recordings_path = os.path.join(self.__path, TOBII_RECORDINGS_DIRNAME)
-
-        for item in os.listdir(recordings_path):
-            recording_path = os.path.join(recordings_path, item)
-            if os.path.isdir(recording_path):
-                all_recordings.append(TobiiRecording(recording_path))
-
-        return all_recordings
-
-class TobiiDrive:
-    """Handle Tobii Glasses Pro 2 drive data."""
-
-    def __init__(self, drive_path):
-        """Load drive data from drive directory path."""
-
-        self.__path = drive_path
-
-    @property
-    def path(self) -> str:
-        """Get drive path."""
-
-        return self.__path
-
-    def projects(self) -> list["TobiiProject"]:
-        """Get all projects."""
-
-        all_projects = []
-        projects_path = os.path.join(self.__path, TOBII_PROJECTS_DIRNAME)
-
-        for item in os.listdir(projects_path):
-            project_path = os.path.join(projects_path, item)
-            if os.path.isdir(project_path):
-                all_projects.append(TobiiProject(project_path))
-
-        return all_projects
-
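Here is a minimal sketch (not part of the original sources) of how these entities chain together when exploring a Tobii SD card; the mount point is a hypothetical placeholder:

```python
from argaze.TobiiGlassesPro2 import TobiiEntities

drive = TobiiEntities.TobiiDrive('/media/user/TOBII_SDCARD')

# Walk from drive to projects, recordings and segments
for project in drive.projects():
    for recording in project.recordings():
        print(project.name, recording.name, recording.length)

        for segment in recording.segments():
            segment_data = segment.load_data()    # timestamped data buffers
            segment_video = segment.load_video()  # scene camera video
```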
diff --git a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py
deleted file mode 100644
index 35bb035..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py
+++ /dev/null
@@ -1,253 +0,0 @@
-#!/usr/bin/env python
-
-from typing import Tuple
-import json
-import time
-import math
-
-from argaze.TobiiGlassesPro2 import TobiiData
-
-import numpy
-from scipy.optimize import curve_fit
-import cv2 as cv
-
-
-EARTH_GRAVITY = -9.81
-"""Earth gravity force (m/s2)."""
-
-EARTH_GRAVITY_VECTOR = [0, EARTH_GRAVITY, 0]
-"""Earth gravity force vector."""
-
-CAMERA_TO_IMU_TRANSLATION_VECTOR = [8, -1, -5]
-"""Translation vector from camera referential to imu referential (cm)."""
-
-CAMERA_TO_IMU_ROTATION_VECTOR = [18, 0, 180]
-"""Rotation vector from camera referential to imu referential (euler, degree)."""
-
-class TobiiInertialMeasureUnit():
-    """Ease Tobii [Inertial Measurement Unit](https://connect.tobii.com/s/article/How-are-the-MEMS-data-reported-for-Tobii-Pro-Glasses-2?language=en_US) data handling."""
-
-    def __init__(self):
-        """Define IMU calibration data."""
-
-        self.__gyroscope_offset = numpy.zeros(3)
-        self.__accelerometer_coefficients = numpy.array([[1., 0.], [1., 0.], [1., 0.]])
-
-        self.__plumb = numpy.array(EARTH_GRAVITY_VECTOR)
-
-        self.reset_rotation()
-        self.reset_translation()
-
-    def load_calibration_file(self, calibration_filepath):
-        """Load IMU calibration from a .json file."""
-
-        with open(calibration_filepath) as calibration_file:
-
-            # Deserialize .json
-            # TODO find a better way
-            calibration_data = json.load(calibration_file)
-
-            # Load calibration data
-            self.__gyroscope_offset = numpy.array(calibration_data['gyroscope_offset'])
-            self.__accelerometer_coefficients = numpy.array(calibration_data['accelerometer_coefficients'])
-
-    def save_calibration_file(self, calibration_filepath):
-        """Save IMU calibration into .json file."""
-
-        calibration_data = {
-            'gyroscope_offset': list(self.__gyroscope_offset),
-            'accelerometer_coefficients': [list(self.__accelerometer_coefficients[0]), list(self.__accelerometer_coefficients[1]), list(self.__accelerometer_coefficients[2])]
-        }
-
-        with open(calibration_filepath, 'w', encoding='utf-8') as calibration_file:
-
-            json.dump(calibration_data, calibration_file, ensure_ascii=False, indent=4)
-    def calibrate_gyroscope_offset(self, gyroscope_ts_buffer) -> numpy.array:
-        """Calibrate gyroscope offset from a timestamped gyroscope buffer.
-
-        **Returns:** numpy.array
-        """
-
-        # Consider gyroscope values without timestamps
-        gyroscope_values = []
-        for ts, data_object in gyroscope_ts_buffer.items():
-            gyroscope_values.append(data_object.value)
-
-        # Calculate average value for each axis
-        gx_offset = numpy.mean(numpy.array(gyroscope_values)[:, 0])
-        gy_offset = numpy.mean(numpy.array(gyroscope_values)[:, 1])
-        gz_offset = numpy.mean(numpy.array(gyroscope_values)[:, 2])
-
-        # Store result
-        self.__gyroscope_offset = numpy.array([gx_offset, gy_offset, gz_offset])
-
-        return self.__gyroscope_offset
-
-    @property
-    def gyroscope_offset(self) -> numpy.array:
-        """Get gyroscope offset."""
-
-        return self.__gyroscope_offset
-
-    def apply_gyroscope_offset(self, gyroscope_data_object: TobiiData.Gyroscope) -> "TobiiData.Gyroscope":
-        """Remove gyroscope offset from given gyroscope data."""
-
-        return TobiiData.Gyroscope(gyroscope_data_object.value - self.__gyroscope_offset)
-
-    def reset_rotation(self):
-        """Reset rotation value before starting the integration process."""
-
-        self.__last_gyroscope_ts = None
-
-        self.__rotation = numpy.zeros(3)
-
-    def update_rotation(self, gyroscope_data_ts, gyroscope_data_object):
-        """Integrate timestamped gyroscope values to update rotation."""
-
-        # Convert deg/s into deg/ms
-        current_gyroscope = gyroscope_data_object.value * 1e-3
-
-        # Init gyroscope integration
-        if self.__last_gyroscope_ts == None:
-
-            self.__last_gyroscope_ts = gyroscope_data_ts
-            self.__last_gyroscope = current_gyroscope
-
-        # Calculate elapsed time in ms
-        delta_time = (gyroscope_data_ts - self.__last_gyroscope_ts) / 1e3
-
-        # Integrate gyroscope
-        self.__rotation = self.__rotation + (self.__last_gyroscope * delta_time)
-
-        # Store current as last
-        self.__last_gyroscope_ts = gyroscope_data_ts
-        self.__last_gyroscope = current_gyroscope
-
-    @property
-    def rotation(self) -> numpy.array:
-        """Return current rotation value (euler angles in degree)."""
-
-        return self.__rotation
-
-    def _accelerometer_linear_fit(self, x, a, b):
-        """Linear function for accelerometer axis correction."""
-        return a * x + b
-
-    def calibrate_accelerometer_axis_coefficients(self, axis, upward_ts_buffer, downward_ts_buffer, perpendicular_ts_buffer):
-        """Calibrate one accelerometer axis using three data sets (upward/+1g, downward/-1g, perpendicular/0g) for a linear fit."""
-
-        # Consider accelerometer axis values without timestamps
-        accelerometer_values = []
-        expected_values = []
-
-        for (upward_ts, upward_data_object), (downward_ts, downward_data_object), (perpendicular_ts, perpendicular_data_object) in zip(upward_ts_buffer.items(), downward_ts_buffer.items(), perpendicular_ts_buffer.items()):
-
-            accelerometer_values.append(upward_data_object.value[axis])
-            expected_values.append(+EARTH_GRAVITY)
-
-            accelerometer_values.append(downward_data_object.value[axis])
-            expected_values.append(-EARTH_GRAVITY)
-
-            accelerometer_values.append(perpendicular_data_object.value[axis])
-            expected_values.append(0.0)
-
-        # Find optimal coefficients according to a linear fit between accelerometer values and expected values
-        optimal_coefficients, _ = curve_fit(self._accelerometer_linear_fit, accelerometer_values, expected_values, maxfev = 10000)
-
-        # Store results for the given axis
-        self.__accelerometer_coefficients[axis] = numpy.array(optimal_coefficients)
-
-    @property
-    def accelerometer_coefficients(self) -> numpy.array:
-        """Return accelerometer coefficients."""
-
-        return self.__accelerometer_coefficients
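A minimal calibration sketch (not part of the original sources): it assumes `data_stream` is an opened TobiiData.TobiiDataStream and that the glasses are held still while gyroscope samples are captured; the output file name is a placeholder matching the example **imu.json** shipped in utils:

```python
from argaze import DataStructures
from argaze.TobiiGlassesPro2 import TobiiInertialMeasureUnit

imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()

# Capture 500 Gyroscope samples while the glasses stay still
gyroscope_ts_buffer = DataStructures.TimeStampedBuffer()
for buffer_size in data_stream.capture(gyroscope_ts_buffer, 'Gyroscope', sample_number=500):
    pass

# Average the samples into an offset, then store it for later sessions
imu.calibrate_gyroscope_offset(gyroscope_ts_buffer)
imu.save_calibration_file('imu.json')
```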
-    def apply_accelerometer_coefficients(self, accelerometer_data_object: TobiiData.Accelerometer) -> "TobiiData.Accelerometer":
-        """Apply accelerometer coefficients to given accelerometer data."""
-
-        x = self._accelerometer_linear_fit(accelerometer_data_object.value[0], *self.__accelerometer_coefficients[0])
-        y = self._accelerometer_linear_fit(accelerometer_data_object.value[1], *self.__accelerometer_coefficients[1])
-        z = self._accelerometer_linear_fit(accelerometer_data_object.value[2], *self.__accelerometer_coefficients[2])
-
-        return TobiiData.Accelerometer(numpy.array([x, y, z]))
-
-    def reset_translation(self, translation_speed = numpy.zeros(3)):
-        """Reset translation value before starting the integration process."""
-
-        self.__last_accelerometer_ts = None
-
-        self.__translation_speed = translation_speed
-        self.__translation = numpy.zeros(3)
-
-    def update_translation(self, accelerometer_data_ts, accelerometer_data_object):
-        """Integrate timestamped accelerometer values to update translation."""
-
-        print('> update_translation: accelerometer_data_ts=', accelerometer_data_ts)
-
-        # Convert m/s2 into cm/ms2
-        current_accelerometer = accelerometer_data_object.value * 1e-4
-
-        print('\tcurrent_accelerometer(cm/ms2)=', current_accelerometer)
-        print('\tcurrent_accelerometer norm=', numpy.linalg.norm(current_accelerometer))
-
-        # Init accelerometer integration
-        if self.__last_accelerometer_ts == None:
-
-            self.__last_accelerometer_ts = accelerometer_data_ts
-            self.__last_accelerometer = current_accelerometer
-            self.__last_translation_speed = numpy.zeros(3)
-
-        # Calculate elapsed time in ms
-        delta_time = (accelerometer_data_ts - self.__last_accelerometer_ts) / 1e3
-
-        print('\tdelta_time=', delta_time)
-
-        # Integrate accelerometer
-        self.__translation_speed = self.__translation_speed + (self.__last_accelerometer * delta_time)
-        self.__translation = self.__translation + (self.__last_translation_speed * delta_time)
-
-        print('\tself.__translation_speed(cm/ms)=', self.__translation_speed)
-        print('\tself.__translation(cm)=', self.__translation)
-
-        # Store current as last
-        self.__last_accelerometer = current_accelerometer
-        self.__last_accelerometer_ts = accelerometer_data_ts
-        self.__last_translation_speed = self.__translation_speed
-
-        print('< update_translation')
-
-        #else:
-
-        #    print('no valid head plumb')
-
-    @property
-    def translation(self) -> numpy.array:
-        """Return current translation vector."""
-
-        return self.__translation
-
-    @property
-    def translation_speed(self) -> numpy.array:
-        """Return current translation speed vector."""
-
-        return self.__translation_speed
-
-    def rotate_plumb(self, rvec):
-        """Rotate imu plumb to remove gravity effect in accelerometer data."""
-
-        C, _ = cv.Rodrigues(rvec)
-        self.__plumb = C.dot(EARTH_GRAVITY_VECTOR)
-
-        # Check plumb length
-        assert(math.isclose(numpy.linalg.norm(self.__plumb), math.fabs(EARTH_GRAVITY), abs_tol=1e-3))
-
-    @property
-    def plumb(self) -> numpy.array:
-        """Return plumb vector."""
-
-        return self.__plumb
-
-    def apply_plumb(self, accelerometer_data_object: TobiiData.Accelerometer) -> "TobiiData.Accelerometer":
-        """Remove gravity along plumb vector from given accelerometer data."""
-
-        return TobiiData.Accelerometer(accelerometer_data_object.value - self.__plumb)
diff --git a/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py b/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py
deleted file mode 100644
index c65b121..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py
+++ /dev/null
@@ -1,226 +0,0 @@
-from typing import TypeVar, Any
-import logging
-import sys -import socket -import threading -import json -import time - -# python2 backwards compatibility for errors -if sys.version_info[0] < 3: - class ConnectionError(BaseException): - pass - -try: - import netifaces - TOBII_DISCOVERY_ALLOWED = True -except: - TOBII_DISCOVERY_ALLOWED = False - -try: - from urllib.parse import urlparse, urlencode - from urllib.request import urlopen, Request - from urllib.error import URLError, HTTPError - -except ImportError: - from urlparse import urlparse - from urllib import urlencode - from urllib2 import urlopen, Request, HTTPError, URLError - -socket.IPPROTO_IPV6 = 41 - -SocketType = TypeVar('socket', bound="socket") -# Type definition for type annotation convenience - -class TobiiNetworkInterface(): - """Handle network connection to Tobii glasses Pro 2 device. - It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py).""" - - def __init__(self, address = None): - - self.udpport = 49152 - self.address = address - self.iface_name = None - - if self.address is None: - - data, address = self.__discover_device() - - if address is None: - raise ConnectionError("No device found using discovery process") - else: - try: - self.address = data["ipv4"] - except: - self.address = address - - if "%" in self.address: - if sys.platform == "win32": - self.address,self.iface_name = self.address.split("%") - else: - self.iface_name = self.address.split("%")[1] - - if ':' in self.address: - self.base_url = 'http://[%s]' % self.address - else: - self.base_url = 'http://' + self.address - - self.__peer = (self.address, self.udpport) - - def make_socket(self) -> SocketType: - """Create a socket to enable network communication.""" - - iptype = socket.AF_INET - - if ':' in self.__peer[0]: - iptype = socket.AF_INET6 - - res = socket.getaddrinfo(self.__peer[0], self.__peer[1], socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE) - family, socktype, proto, canonname, sockaddr = res[0] - new_socket = socket.socket(family, socktype, proto) - - new_socket.settimeout(5.0) - - try: - if iptype == socket.AF_INET6: - new_socket.setsockopt(socket.SOL_SOCKET, 25, 1) - - except socket.error as e: - if e.errno == 1: - logging.warning("Binding to a network interface is permitted only for root users.") - - return new_socket - - def __discover_device(self): - - if TOBII_DISCOVERY_ALLOWED == False: - logging.error("Device discovery is not available due to a missing dependency (netifaces)") - exit(1) - - MULTICAST_ADDR = 'ff02::1' - PORT = 13006 - - for i in netifaces.interfaces(): - - if netifaces.AF_INET6 in netifaces.ifaddresses(i).keys(): - - if "%" in netifaces.ifaddresses(i)[netifaces.AF_INET6][0]['addr']: - - if_name = netifaces.ifaddresses(i)[netifaces.AF_INET6][0]['addr'].split("%")[1] - if_idx = socket.getaddrinfo(MULTICAST_ADDR + "%" + if_name, PORT, socket.AF_INET6, socket.SOCK_DGRAM)[0][4][3] - - s6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - s6.settimeout(30.0) - s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, if_idx) - s6.bind(('::', PORT)) - - PORT_OUT = PORT if sys.platform == 'win32' or sys.platform == 'darwin' else PORT + 1 - - try: - - # Sending discover request - discover_json = '{"type":"discover"}' - s6.sendto(discover_json.encode('utf-8'), (MULTICAST_ADDR, PORT_OUT)) - - # Waiting for a reponse from the device ... 
-                    data, address = s6.recvfrom(1024)
-                    jdata = json.loads(data.decode('utf-8'))
-
-                    addr = address[0]
-
-                    if sys.version_info.major == 3 and sys.version_info.minor >= 8:
-                        addr = address[0] + '%' + if_name
-
-                    return (jdata, addr)
-
-                except:
-
-                    # No device found on interface
-                    pass
-
-        return (None, None)
-
-    def get_request(self, api_action) -> str:
-        """Send a GET request and get data back."""
-
-        url = self.base_url + api_action
-        res = urlopen(url).read()
-
-        try:
-            data = json.loads(res.decode('utf-8'))
-        except json.JSONDecodeError:
-            data = None
-
-        return data
-
-    def post_request(self, api_action, data=None, wait_for_response=True) -> str:
-        """Send a POST request and get the result back."""
-
-        url = self.base_url + api_action
-        req = Request(url)
-        req.add_header('Content-Type', 'application/json')
-        data = json.dumps(data)
-
-        logging.debug("Sending JSON: " + str(data))
-
-        if wait_for_response is False:
-            threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
-            return None
-
-        response = urlopen(req, data.encode('utf-8'))
-        res = response.read()
-
-        logging.debug("Response: " + str(res))
-
-        try:
-            res = json.loads(res.decode('utf-8'))
-        except:
-            pass
-
-        return res
-
-    def send_keep_alive_msg(self, socket, msg):
-        """Send a message to keep the socket open."""
-
-        res = socket.sendto(msg.encode('utf-8'), self.__peer)
-
-    def grab_data(self, socket) -> bytes:
-        """Read incoming socket data."""
-
-        try:
-            data, address = socket.recvfrom(1024)
-            return data
-
-        except TimeoutError:
-
-            logging.error("A timeout occurred while receiving data")
-
-    def wait_for_status(self, api_action, key, values, timeout = None) -> Any:
-        """Wait until a status matches given values."""
-
-        url = self.base_url + api_action
-        running = True
-
-        while running:
-
-            req = Request(url)
-            req.add_header('Content-Type', 'application/json')
-
-            try:
-
-                response = urlopen(req, None, timeout = timeout)
-
-            except URLError as e:
-
-                logging.error(e.reason)
-                return -1
-
-            data = response.read()
-            json_data = json.loads(data.decode('utf-8'))
-
-            if json_data[key] in values:
-                running = False
-
-            time.sleep(1)
-
-        return json_data[key]
diff --git a/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py b/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py
deleted file mode 100644
index 1b2a275..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env python
-
-"""The following Tobii specifications come from [this article](https://www.biorxiv.org/content/10.1101/299925v1)."""
-
-ACCURACY = 1.42
-"""Gaze position accuracy in degrees."""
-
-PRECISION = 0.34
-"""Gaze position precision in degrees."""
-
-CAMERA_HFOV = 82
-"""Camera horizontal field of view in degrees."""
-
-VISUAL_HFOV = 160
-"""Visual horizontal field of view in degrees."""
\ No newline at end of file
diff --git a/src/argaze/TobiiGlassesPro2/TobiiVideo.py b/src/argaze/TobiiGlassesPro2/TobiiVideo.py
deleted file mode 100644
index 1292de8..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiVideo.py
+++ /dev/null
@@ -1,283 +0,0 @@
-#!/usr/bin/env python
-
-from typing import TypeVar, Tuple
-from dataclasses import dataclass, field
-import threading
-import uuid
-import time
-import copy
-
-from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
-
-import cv2 as cv
-import av
-import numpy
-
-TobiiVideoFrameType = TypeVar('TobiiVideoFrame', bound="TobiiVideoFrame")
-# Type definition for type annotation convenience
-
-AvStreamType = TypeVar('av.stream.Stream', bound="av.stream.Stream")
bound="av.stream.Stream") -# Type definition for type annotation convenience - -@dataclass -class TobiiVideoFrame(): - """Define tobii video frame""" - - matrix: list - """Video frame matrix.""" - - width: int = field(init=False) - """Inferred video frame width.""" - - height: int = field(init=False) - """Inferred video frame height.""" - - def __post_init__(self): - """fill dimension attributes.""" - - self.height, self.width = self.matrix.shape[:2] - - def copy(self) -> TobiiVideoFrameType: - """Copy tobii video frame.""" - - return TobiiVideoFrame(self.matrix.copy()) - -class TobiiVideoSegment(): - """Handle Tobii Glasses Pro 2 segment video file.""" - - def __init__(self, segment_video_path, start_timestamp:int = 0, end_timestamp:int = None): - """Load segment video from segment directory""" - - self.__path = segment_video_path - - self.__container = av.open(self.__path) - self.__stream = self.__container.streams.video[0] - - self.__width = int(cv.VideoCapture(self.__path).get(cv.CAP_PROP_FRAME_WIDTH)) - self.__height = int(cv.VideoCapture(self.__path).get(cv.CAP_PROP_FRAME_HEIGHT)) - - self.__start_timestamp = start_timestamp - self.__end_timestamp = end_timestamp - - # position at the given start time - self.__container.seek(self.__start_timestamp) - - @property - def path(self) -> str: - """Get video segment path.""" - - return self.__path - - @property - def duration(self) -> int: - """Duration in microsecond.""" - - if self.__end_timestamp == None: - return int((self.__stream.duration * self.__stream.time_base) * 1e6) - self.__start_timestamp - else: - return self.__end_timestamp - self.__start_timestamp - - @property - def width(self) -> int: - """Video width dimension.""" - - return self.__width - - @property - def height(self) -> int: - """Video height dimension.""" - - return self.__height - - @property - def stream(self) -> AvStreamType: - """Video stream.""" - - return self.__stream - - def get_frame(self, i) -> Tuple[int, "TobiiVideoFrame"]: - """Access to a frame.""" - - if i < 0: - ValueError('Frame index must be a positive integer.') - - counter = 0 - frame = None - video_ts = 0 - - # position at the given start time - self.__container.seek(self.__start_timestamp) - - # start decoding - self.__container.decode(self.__stream) - - while counter <= i: - - frame = self.__container.decode(self.__stream).__next__() - video_ts = int(frame.time * 1e6) - counter += 1 - - # return micro second timestamp and frame data - return video_ts, TobiiVideoFrame(frame.to_ndarray(format='bgr24')) - - def frames(self) -> Tuple[int, "TobiiVideoFrame"]: - """Access to frame iterator.""" - - return self.__iter__() - - def __iter__(self): - - # start decoding - self.__container.decode(self.__stream) - - return self - - def __next__(self): - - frame = self.__container.decode(self.__stream).__next__() - - video_ts = int(frame.time * 1e6) - - # Ignore before start timestamp - if video_ts < self.__start_timestamp: - return self.__next__() - - # Ignore frames after end timestamp - if self.__end_timestamp != None: - - if video_ts >= self.__end_timestamp: - raise StopIteration - - # return micro second timestamp and frame data - return video_ts, TobiiVideoFrame(frame.to_ndarray(format='bgr24')) - -class TobiiVideoStream(threading.Thread): - """Capture Tobii Glasses Pro 2 video camera stream.""" - - def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface): - """Initialise video stream reception.""" - - threading.Thread.__init__(self) - threading.Thread.daemon = True - - 
self.__network = network_interface - self.__video_socket = self.__network.make_socket() - - self.__stop_event = threading.Event() - self.__read_lock = threading.Lock() - - self.__frame_tuple = None - - # prepare keep alive message - self.__keep_alive_msg = "{\"type\": \"live.video.unicast\",\"key\": \""+ str(uuid.uuid4()) +"_video\", \"op\": \"start\"}" - self.__keep_alive_thread = threading.Timer(0, self.__keep_alive) - self.__keep_alive_thread.daemon = True - - def __del__(self): - """Stop data reception before destruction.""" - - if self.is_alive(): - - self.close() - - def __keep_alive(self): - """Maintain connection.""" - - while not self.__stop_event.isSet(): - - self.__network.send_keep_alive_msg(self.__video_socket, self.__keep_alive_msg) - - time.sleep(1) - - def open(self): - """Start data reception.""" - - self.__keep_alive_thread.start() - threading.Thread.start(self) - - def close(self): - """Stop data reception definitively.""" - - self.__stop_event.set() - - threading.Thread.join(self.__keep_alive_thread) - threading.Thread.join(self) - - self.__video_socket.close() - - def run(self): - """Store frame for further reading.""" - - container = av.open(f'rtsp://{self.__network.get_address()}:8554/live/scene', options={'rtsp_transport': 'tcp'}) - stream = container.streams.video[0] - - for frame in container.decode(stream): - - # quit if the video acquisition thread have been stopped - if self.__stop_event.isSet(): - break - - # lock frame access - self.__read_lock.acquire() - - # store frame time, matrix into a tuple - self.__frame_tuple = (frame.time, frame.to_ndarray(format='bgr24')) - - # unlock frame access - self.__read_lock.release() - - def read(self) -> Tuple[int, "TobiiVideoFrame"]: - """Read incoming video frames.""" - - # if the video acquisition thread have been stopped or isn't started - if self.__stop_event.isSet() or self.__frame_tuple == None: - return -1, TobiiVideoFrame(numpy.zeros((1, 1, 3), numpy.uint8)) - - # lock frame access - self.__read_lock.acquire() - - # copy frame tuple - frame_tuple = copy.deepcopy(self.__frame_tuple) - - # unlock frame access - self.__read_lock.release() - - return int(frame_tuple[0] * 1e6), TobiiVideoFrame(frame_tuple[1]) - -class TobiiVideoOutput(): - """Export a video file at the same format than a given referent stream.""" - # TODO : Make a generic video managment to handle video from any device (not only Tobii) - - def __init__(self, output_video_path: str, referent_stream: av.stream.Stream): - """Create a video file""" - - self.__path = output_video_path - self.__container = av.open(self.__path, 'w') - self.__stream = self.__container.add_stream(\ - referent_stream.codec_context.name, \ - width=referent_stream.codec_context.width, \ - height=referent_stream.codec_context.height, \ - rate=referent_stream.codec_context.framerate, \ - gop_size=referent_stream.codec_context.gop_size, \ - pix_fmt=referent_stream.codec_context.pix_fmt, \ - bit_rate=referent_stream.codec_context.bit_rate) - - @property - def path(self) -> str: - """Get video file path.""" - - return self.__path - - def write(self, frame): - """Write a frame into the output video file""" - - formated_frame = av.VideoFrame.from_ndarray(frame, format='bgr24') - formated_frame.reformat(format=self.__stream.codec_context.pix_fmt, interpolation=None) - self.__container.mux(self.__stream.encode(formated_frame)) - - def close(self): - """End the writing of the video file""" - - self.__container.mux(self.__stream.encode()) - self.__container.close() - diff --git 
a/src/argaze/TobiiGlassesPro2/__init__.py b/src/argaze/TobiiGlassesPro2/__init__.py
deleted file mode 100644
index 50fb742..0000000
--- a/src/argaze/TobiiGlassesPro2/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-"""
-.. include:: README.md
-"""
-__docformat__ = "restructuredtext"
-__all__ = ['TobiiEntities', 'TobiiController', 'TobiiNetworkInterface', 'TobiiData', 'TobiiVideo', 'TobiiInertialMeasureUnit', 'TobiiSpecifications',]
\ No newline at end of file
diff --git a/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf b/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf
deleted file mode 100644
index dfdbe0a..0000000
Binary files a/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf and /dev/null differ
diff --git a/src/argaze/TobiiGlassesPro2/utils/imu.json b/src/argaze/TobiiGlassesPro2/utils/imu.json
deleted file mode 100644
index b701b00..0000000
--- a/src/argaze/TobiiGlassesPro2/utils/imu.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-    "gyroscope_offset": [
-        1.00506,
-        -3.338012,
-        1.9096039999999999
-    ],
-    "accelerometer_coefficients": [
-        [
-            0.9918761478460875,
-            -0.29679248712760953
-        ],
-        [
-            0.9749789492717152,
-            -0.15941685808576067
-        ],
-        [
-            0.9520423758351338,
-            0.23147323632416672
-        ]
-    ]
-}
\ No newline at end of file
diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py
index 1162ba7..ef75dd2 100644
--- a/src/argaze/__init__.py
+++ b/src/argaze/__init__.py
@@ -2,4 +2,4 @@
 .. include:: ../../README.md
 """
 __docformat__ = "restructuredtext"
-__all__ = ['ArFeatures','GazeFeatures','GazeAnalysis','ArUcoMarkers','AreaOfInterest','DataStructures','TobiiGlassesPro2','utils']
\ No newline at end of file
+__all__ = ['ArFeatures','GazeFeatures','GazeAnalysis','ArUcoMarkers','AreaOfInterest','DataStructures','utils']
\ No newline at end of file
diff --git a/src/argaze/utils/README.md b/src/argaze/utils/README.md
index 16c85c0..ea293f6 100644
--- a/src/argaze/utils/README.md
+++ b/src/argaze/utils/README.md
@@ -20,77 +20,10 @@ Export a 7 columns and 5 rows calibration board made of 5cm squares with 3cm mar
 python ./src/argaze/utils/aruco_calibration_board_export.py 7 5 5 3 -o _export -d DICT_APRILTAG_16h5 -r 300
 ```
 
-# Tobii calibration
+# TODO: Camera calibration
 
-Calibrate Tobii Glasses Pro 2 camera (-t IP_ADDRESS) using a 7 columns and 5 rows calibration board made of 5cm squares with 3cm markers from DICT_APRILTAG_16h5 dictionary. Then, export its optical parameters into a tobii_camera.json file:
+Calibrate a network camera (-t IP_ADDRESS) using a 7 columns and 5 rows calibration board made of 5cm squares with 3cm markers from DICT_APRILTAG_16h5 dictionary. Then, export its optical parameters into a camera.json file:
 
 ```
-python ./src/argaze/utils/tobii_camera_calibrate.py 7 5 5 3 -t IP_ADDRESS -d DICT_APRILTAG_16h5 -o _export/tobii_camera.json
-```
-
-Calibrate Tobii Glasses Pro 2 inertial measure unit (-t IP_ADDRESS), then export calibration parameters into an imu.json file:
-
-```
-python ./src/argaze/utils/tobii_imu_calibrate.py -t IP_ADDRESS -o _export/imu.json
-```
-
-# Tobii session
-
-Display Tobii Glasses Pro 2 camera video stream (-t IP_ADDRESS) with a live gaze pointer; the core of this loop is sketched below.
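-
-The heart of this live display can be sketched with the package API (a minimal sketch based on the `TobiiController` calls shown elsewhere in this patch; IP_ADDRESS is a placeholder, and error handling plus the gaze-pointer overlay are omitted):
-
-```
-from argaze.TobiiGlassesPro2 import TobiiController
-
-import cv2 as cv
-
-# Create tobii controller (auto discovery runs when no ip is provided)
-tobii_controller = TobiiController.TobiiController("IP_ADDRESS")
-
-# Enable the scene camera stream and start streaming
-tobii_video_stream = tobii_controller.enable_video_stream()
-tobii_controller.start_streaming()
-
-while tobii_video_stream.is_alive():
-
-    # read() returns a microsecond timestamp and a TobiiVideoFrame
-    video_ts, video_frame = tobii_video_stream.read()
-
-    cv.imshow('Tobii camera', video_frame.matrix)
-
-    # Close window using 'Esc' key
-    if cv.waitKey(1) == 27:
-        break
-
-tobii_controller.stop_streaming()
-cv.destroyAllWindows()
-```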
-Load a calibration file (-i) to also display inertial sensor data:
-
-```
-python ./src/argaze/utils/tobii_stream_display.py -t IP_ADDRESS -i _export/imu.json
-```
-
-Record a Tobii Glasses Pro 2 'myProject' session for a 'myUser' participant on the Tobii interface's SD card (-t IP_ADDRESS):
-
-```
-python ./src/argaze/utils/tobii_segment_record.py -t IP_ADDRESS -p myProject -u myUser
-```
-
-# Tobii drive
-
-Explore Tobii Glasses Pro 2 interface's SD card (-d DRIVE_PATH, -p PROJECT_PATH, -r RECORDING_PATH, -s SEGMENT_PATH):
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -d DRIVE_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -p PROJECT_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -r RECORDING_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -s SEGMENT_PATH
-```
-
-# Tobii post-processing
-
-Replay a time range selection (-r IN OUT) of a Tobii Glasses Pro 2 session (-s SEGMENT_PATH), synchronizing video and data together:
-
-```
-python ./src/argaze/utils/tobii_segment_display.py -s SEGMENT_PATH -r IN OUT
-```
-
-Export Tobii segment fixations and saccades (-s SEGMENT_PATH) from a time range selection (-r IN OUT) as fixations.csv and saccades.csv files saved into the segment folder:
-
-```
-python ./src/argaze/utils/tobii_segment_gaze_movements_export.py -s SEGMENT_PATH -r IN OUT
-```
-
-# Tobii with ArUco
-
-Detect ArUco markers (-md MARKER_DICT -ms MARKER_SIZE) in the Tobii camera video stream (-t IP_ADDRESS). Load an AOI scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relative to its detected ArUco markers, then project the scene into the camera frame:
-
-```
-python ./src/argaze/utils/tobii_stream_aruco_aoi_display.py -t IP_ADDRESS -c _export/tobii_camera.json -md MARKER_DICT -ms MARKER_SIZE -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}'
-```
-
-Detect ArUco markers (-md MARKER_DICT -ms MARKER_SIZE) in a Tobii camera video segment (-s SEGMENT_PATH) within a time range selection (-r IN OUT). Load an AOI scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relative to its detected ArUco markers, then project the scene into the camera frame. Export AOI video and data as aruco_aoi.csv and aruco_aoi.mp4 files:
-```
-python ./src/argaze/utils/tobii_segment_aruco_aoi_export.py -s SEGMENT_PATH -c _export/tobii_camera.json -md MARKER_DICT -ms MARKER_SIZE -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}' -r IN OUT
-```
+python ./src/argaze/utils/camera_calibrate.py 7 5 5 3 -t IP_ADDRESS -d DICT_APRILTAG_16h5 -o _export/camera.json
+```
\ No newline at end of file
diff --git a/src/argaze/utils/tobii_camera_calibrate.py b/src/argaze/utils/tobii_camera_calibrate.py
deleted file mode 100644
index 24cbe5c..0000000
--- a/src/argaze/utils/tobii_camera_calibrate.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import time
-
-from argaze.TobiiGlassesPro2 import TobiiController, TobiiVideo
-from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoBoard, ArUcoDetector, ArUcoCamera
-
-import cv2 as cv
-
-def main():
-    """
-    Captures board pictures and finally outputs camera calibration data into a .json file.
-
-    - Export and print a calibration board using the aruco_calibration_board_export.py utility.
-    - Place the calibration board so that it is entirely visible on screen, and move the camera through many configurations (orientation and distance): the script will automatically take pictures. Do this step with good lighting and a clear background.
-    - Once enough pictures have been captured (~20), press the Esc key, then wait for the camera calibration processing.
-    - Finally, check the RMS parameter: it should be between 0 and 1 if the calibration succeeded (lower is better).
-
-    ### Reference:
-    - [Camera calibration using ArUco marker tutorial](https://automaticaddison.com/how-to-perform-camera-calibration-using-opencv/)
-    """
-
-    # manage arguments
-    parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
-    parser.add_argument('columns', metavar='COLS_NUMBER', type=int, default=7, help='number of columns')
-    parser.add_argument('rows', metavar='ROWS_NUMBER', type=int, default=5, help='number of rows')
-    parser.add_argument('square_size', metavar='SQUARE_SIZE', type=float, default=5, help='square size (cm)')
-    parser.add_argument('marker_size', metavar='MARKER_SIZE', type=float, default=3, help='marker size (cm)')
-    parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
-    parser.add_argument('-o', '--output', metavar='OUT', type=str, default='camera.json', help='destination filepath')
-    parser.add_argument('-d', '--dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL, DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)')
-    args = parser.parse_args()
-
-    # Create tobii controller (with auto discovery network process if no ip argument is provided)
-    print("Looking for a Tobii Glasses Pro 2 device ...")
-
-    try:
-
-        tobii_controller = TobiiController.TobiiController(args.tobii_ip)
-        print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
-    except ConnectionError as e:
-
-        print(e)
-        exit()
-
-    # Setup camera at 25 fps to work on Full HD video stream
-    tobii_controller.set_scene_camera_freq_25()
-
-    # Get video stream dimension
-    video_width = tobii_controller.get_configuration()['sys_sc_width']
-    video_height = tobii_controller.get_configuration()['sys_sc_height']
-
-    # Print current configuration
-    print(f'Tobii Glasses Pro 2 configuration:')
-    for key, value in tobii_controller.get_configuration().items():
-        print(f'\t{key}: {value}')
-
-    # Enable tobii video stream
-    tobii_video_stream = tobii_controller.enable_video_stream()
-
-    # Create aruco camera
-    aruco_camera = ArUcoCamera.ArUcoCamera(dimensions=(video_width, video_height))
-
-    # Create aruco board
-    aruco_board = ArUcoBoard.ArUcoBoard(args.columns, args.rows, args.square_size, args.marker_size, args.dictionary)
-
-    # Create aruco detector
-    aruco_detector = ArUcoDetector.ArUcoDetector(dictionary=args.dictionary, marker_size=args.marker_size)
-
-    # Start tobii glasses streaming
-    tobii_controller.start_streaming()
-
-    print("Camera calibration starts")
-    print("Waiting for calibration board...")
-
-    expected_markers_number = aruco_board.markers_number
-    expected_corners_number = aruco_board.corners_number
-
-    # Capture loop
-    try:
-
-        while tobii_video_stream.is_alive():
-
-            # capture frame with a fully displayed board
-            video_ts, video_frame = tobii_video_stream.read()
-
-            # detect all markers in the board
-            aruco_detector.detect_board(video_frame.matrix, aruco_board, expected_markers_number)
-
-            # draw only markers
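-            # (markers are outlined on the live image so the operator can check
-            # that the whole board is visible before a capture is stored)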
-            aruco_detector.draw_detected_markers(video_frame.matrix)
-
-            # draw current calibration data count
-            cv.putText(video_frame.matrix, f'Capture: {aruco_camera.calibration_data_count}', (50, 50), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv.LINE_AA)
-            cv.imshow('Tobii Camera Calibration', video_frame.matrix)
-
-            # if all board corners are detected
-            if aruco_detector.board_corners_number == expected_corners_number:
-
-                # draw board corners to notify a capture is done
-                aruco_detector.draw_board(video_frame.matrix)
-
-                # append data
-                aruco_camera.store_calibration_data(aruco_detector.board_corners, aruco_detector.board_corners_identifier)
-
-                cv.imshow('Tobii Camera Calibration', video_frame.matrix)
-
-            # close window using 'Esc' key
-            if cv.waitKey(1) == 27:
-                break
-
-    # exit on 'ctrl+C' interruption
-    except KeyboardInterrupt:
-        pass
-
-    # stop frame display
-    cv.destroyAllWindows()
-
-    # Stop tobii glasses streaming
-    tobii_controller.stop_streaming()
-
-    print('\nCalibrating camera...')
-    aruco_camera.calibrate(aruco_board)
-
-    print('\nCalibration succeeded!')
-    print(f'\nRMS:\n{aruco_camera.rms}')
-    print(f'\nDimensions:\n{video_width}x{video_height}')
-    print(f'\nCamera matrix:\n{aruco_camera.K}')
-    print(f'\nDistortion coefficients:\n{aruco_camera.D}')
-
-    aruco_camera.to_json(args.output)
-
-    print(f'\nCalibration data exported into {args.output} file')
-
-if __name__ == '__main__':
-
-    main()
diff --git a/src/argaze/utils/tobii_imu_calibrate.py b/src/argaze/utils/tobii_imu_calibrate.py
deleted file mode 100644
index c9e4813..0000000
--- a/src/argaze/utils/tobii_imu_calibrate.py
+++ /dev/null
@@ -1,214 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import time
-
-from argaze import DataStructures
-from argaze.TobiiGlassesPro2 import TobiiController, TobiiInertialMeasureUnit
-from argaze.utils import MiscFeatures
-
-import numpy
-import matplotlib.pyplot as mpyplot
-import matplotlib.patches as mpatches
-
-def main():
-    """
-    Calibrate Tobii gyroscope and accelerometer sensors and finally output IMU calibration data into a .json file.
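-
-    The gyroscope is calibrated from a capture taken while the glasses stay still (offset removal) and the accelerometer from three captures with each axis held upward, downward, then perpendicular, as implemented in the menu loop below.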
- - ### Reference: - - [Inertial Measure Unit calibration tutorial](https://makersportal.com/blog/calibration-of-an-inertial-measurement-unit-imu-with-raspberry-pi-part-ii) - """ - - # manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip') - parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath') - parser.add_argument('-n', '--sample_number', metavar='BUFFER_SIZE', type=int, default=500, help='number of samples to store into calibration buffer') - parser.add_argument('-o', '--output', metavar='OUT', type=str, default='imu.json', help='destination filepath') - args = parser.parse_args() - - # Create tobii controller (with auto discovery network process if no ip argument is provided) - print("Looking for a Tobii Glasses Pro 2 device ...") - - try: - - tobii_controller = TobiiController.TobiiController(args.tobii_ip) - print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') - - except ConnectionError as e: - - print(e) - exit() - - # Create tobii imu handler - tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit() - - # Load optional imu calibration file - if args.imu_calibration != None: - - tobii_imu.load_calibration_file(args.imu_calibration) - - # Enable tobii data stream - tobii_data_stream = tobii_controller.enable_data_stream() - - # Menu loop - try: - - while True: - - print('-' * 52) - menu_input = input('Tobii Inertial Measure Unit sensor calibration menu:\n\t\'a\' for accelerometer calibration.\n\t\'A\' for accelerometer visualisation.\n\t\'g\' for gyroscope calibration.\n\t\'G\' for gyroscope visualisation.\n\t\'p\' print current calibration.\n\t\'s\' save calibration.\n\t\'q\' quit calibration without saving.\n>') - - match menu_input: - - case 'a': - - axis = ['X', 'Y', 'Z'] - directions = ['upward', 'downward', 'perpendicular'] - - for i, axis in enumerate(axis): - - print(f'\nACCELEROMETER {axis} AXIS CALIBRATION') - - axis_buffers = {} - - for j, direction in enumerate(directions): - - input(f'\nKeep Tobii Glasses accelerometer {axis} axis {direction} then press \'Enter\' to start data acquisition.\n') - - # Initialise progress bar - MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) - - # Capture accelerometer data stream - data_ts_buffer = DataStructures.TimeStampedBuffer() - for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number): - - # Update progress Bar - MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) - - axis_buffers[direction] = data_ts_buffer - - tobii_imu.calibrate_accelerometer_axis_coefficients(i, axis_buffers['upward'], axis_buffers['downward'], axis_buffers['perpendicular']) - - accelerometer_coefficients = tobii_imu.accelerometer_coefficients - - print(f'\n\nAccelerometer optimal linear fit coefficients over {progress} values for each axis:') - print('\tX coefficients: ', accelerometer_coefficients[0]) - print('\tY coefficients: ', accelerometer_coefficients[1]) - print('\tZ coefficients: ', accelerometer_coefficients[2]) - - case 'A': - - print('\nCAPTURE AND PLOT ACCELEROMETER STREAM') - - # Initialise progress bar - MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length 
= 100) - - # Capture accelerometer data stream - data_ts_buffer = DataStructures.TimeStampedBuffer() - for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number): - - # Update progress Bar - MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) - - # Edit figure - figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels - data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max - figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144) - - # Plot data - subplot = figure.add_subplot(111) - subplot.set_title('Accelerometer', loc='left') - patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Display figure - mpyplot.show() - figure.clear() - - case 'g': - - print('\nGYROSCOPE CALIBRATION') - input('Keep Tobii Glasses steady then press \'Enter\' to start data acquisition.\n') - - # Initialise progress bar - MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) - - # Capture gyroscope data stream - data_ts_buffer = DataStructures.TimeStampedBuffer() - for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number): - - # Update progress Bar - MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) - - gyroscope_offset = tobii_imu.calibrate_gyroscope_offset(data_ts_buffer) - - print(f'\n\nGyroscope average over {progress} values for each axis:') - print('\tX offset: ', gyroscope_offset[0]) - print('\tY offset: ', gyroscope_offset[1]) - print('\tZ offset: ', gyroscope_offset[2]) - - case 'G': - - print('\nCAPTURE AND PLOT GYROSCOPE STREAM') - - # Initialise progress bar - MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) - - # Capture accelerometer data stream - data_ts_buffer = DataStructures.TimeStampedBuffer() - for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number): - - # Update progress Bar - MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100) - - # Edit figure - figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels - data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max - figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144) - - # Plot data - subplot = figure.add_subplot(111) - subplot.set_title('Gyroscope', loc='left') - patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Display figure - mpyplot.show() - figure.clear() - - case 'p': - - gyroscope_offset = tobii_imu.gyroscope_offset - - print(f'\nGyroscope offset for each axis:') - print('\tX offset: ', gyroscope_offset[0]) - print('\tY offset: ', gyroscope_offset[1]) - print('\tZ offset: ', gyroscope_offset[2]) - - accelerometer_coefficients = tobii_imu.accelerometer_coefficients - - print(f'\nAccelerometer optimal linear fit coefficients for each axis:') - print('\tX coefficients: ', accelerometer_coefficients[0]) - print('\tY coefficients: ', 
accelerometer_coefficients[1]) - print('\tZ coefficients: ', accelerometer_coefficients[2]) - - case 's': - - tobii_imu.save_calibration_file(args.output) - print(f'\nCalibration data exported into {args.output} file') - - break - - case 'q': - - break - - # exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - -if __name__ == '__main__': - - main() diff --git a/src/argaze/utils/tobii_sdcard_explore.py b/src/argaze/utils/tobii_sdcard_explore.py deleted file mode 100644 index ed297be..0000000 --- a/src/argaze/utils/tobii_sdcard_explore.py +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env python - -import argparse - -from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiData, TobiiVideo - -import matplotlib.pyplot as mpyplot -import matplotlib.patches as mpatches - -def main(): - """ - Explore Tobii Glasses Pro 2 interface's SD Card - """ - - # manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-d', '--drive_path', metavar='DRIVE_PATH', type=str, default=None, help='drive path') - parser.add_argument('-p', '--project_path', metavar='PROJECT_PATH', type=str, default=None, help='project path') - parser.add_argument('-r', '--recording_path', metavar='RECORDING_PATH', type=str, default=None, help='recording path') - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path') - args = parser.parse_args() - - if args.drive_path != None: - - # Load all projects from a tobii drive - tobii_drive = TobiiEntities.TobiiDrive(args.drive_path) - - for project in tobii_drive.projects(): - print(f'Project id: {project.id}, name: {project.name}') - - elif args.project_path != None: - - # Load one tobii project - tobii_project = TobiiEntities.TobiiProject(args.project_path) - - for participant in tobii_project.participants(): - print(f'Participant id: {participant.id}, name: {participant.name}') - - for recording in tobii_project.recordings(): - print(f'Recording id: {recording.id}, name: {recording.name}') - print(f'\tProject: {recording.project.name}') - print(f'\tParticipant: {recording.participant.name}') - - elif args.recording_path != None: - - # Load a tobii segment - tobii_recording = TobiiEntities.TobiiRecording(args.recording_path) - - for segment in tobii_recording.segments(): - print(f'Segment id: {segment.id}') - - elif args.segment_path != None: - - # Load a tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path) - - tobii_segment_video = tobii_segment.load_video() - print(f'Video properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - tobii_segment_data = tobii_segment.load_data() - - print(f'Loaded data count:') - for name in tobii_segment_data.keys(): - print(f'\t{name}: {len(tobii_segment_data[name])} data') - - # Edit figure - figure_width = min(tobii_segment_video.duration/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels - data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max - figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144) - - # Plot data - subplot = figure.add_subplot(111) - subplot.set_title('VideoTimeStamps', loc='left') - patches = tobii_segment_data['VideoTimeStamp'].plot(names=['offset','value'], colors=['#276FB6','#9427B6'], samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Display figure - mpyplot.show() - figure.clear() - -if __name__ == '__main__': - 
- main() \ No newline at end of file diff --git a/src/argaze/utils/tobii_segment_arscene_edit.py b/src/argaze/utils/tobii_segment_arscene_edit.py deleted file mode 100644 index b8a5745..0000000 --- a/src/argaze/utils/tobii_segment_arscene_edit.py +++ /dev/null @@ -1,381 +0,0 @@ -#!/usr/bin/env python - -import argparse -import os -import json -import time - -from argaze import * -from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications -from argaze.ArUcoMarkers import * -from argaze.AreaOfInterest import * -from argaze.utils import MiscFeatures - -import numpy -import cv2 as cv - -def main(): - """ - Open video file with ArUco marker scene inside - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path') - parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - parser.add_argument('-p', '--project_path', metavar='ARGAZE_PROJECT', type=str, default=None, help='json argaze project filepath') - parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)') - parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction) - args = parser.parse_args() - - if args.segment_path != None: - - # Manage destination path - destination_path = '.' - if args.output != None: - - if not os.path.exists(os.path.dirname(args.output)): - - os.makedirs(os.path.dirname(args.output)) - print(f'{os.path.dirname(args.output)} folder created') - - destination_path = args.output - - else: - - destination_path = args.segment_path - - # Export into a dedicated time range folder - if args.time_range[1] != None: - timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]' - else: - timerange_path = f'[all]' - - destination_path = f'{destination_path}/{timerange_path}' - - if not os.path.exists(destination_path): - - os.makedirs(destination_path) - print(f'{destination_path} folder created') - - #vs_data_filepath = f'{destination_path}/visual_scan.csv' - - # Load a tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'Video properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - # Load ar scene - ar_scene = ArScene.ArScene.from_json(args.project_path) - - print(ar_scene) - - # Display first frame - video_ts, video_frame = tobii_segment_video.get_frame(0) - cv.imshow(f'Segment {tobii_segment.id} ArUco marker editor', video_frame.matrix) - - # Init mouse interaction variables - pointer = (0, 0) - left_click = (0, 0) - right_click = (0, 0) - right_button = False - edit_trans = False # translate - edit_coord = 0 # x - - # On mouse left left_click : update pointer position - def on_mouse_event(event, x, y, flags, param): - - nonlocal pointer - nonlocal left_click - nonlocal right_click - nonlocal right_button - - # Update pointer - pointer = (x, y) - - # Update left_click - if event == cv.EVENT_LBUTTONUP: - - left_click = pointer - - # Udpate right_button 
- elif event == cv.EVENT_RBUTTONDOWN: - - right_button = True - - elif event == cv.EVENT_RBUTTONUP: - - right_button = False - - # Udpate right_click - if right_button: - - right_click = pointer - - cv.setMouseCallback(f'Segment {tobii_segment.id} ArUco marker editor', on_mouse_event) - - # Frame selector loop - frame_index = 0 - last_frame_index = -1 - last_frame = video_frame.copy() - force_update = False - - selected_marker_id = -1 - - try: - - while True: - - # Select a frame on change - if frame_index != last_frame_index or force_update: - - video_ts, video_frame = tobii_segment_video.get_frame(frame_index) - video_ts_ms = video_ts / 1e3 - - last_frame_index = frame_index - last_frame = video_frame.copy() - - # Hide frame left and right borders before detection to ignore markers outside focus area - cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width/6), int(video_frame.height)), (0, 0, 0), -1) - cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - 1/6)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1) - - try: - - # Estimate scene pose from ArUco markers into frame. - tvec, rmat, _ = ar_scene.estimate_pose(video_frame.matrix) - - # Catch exceptions raised by estimate_pose method - except ArScene.PoseEstimationFailed as e: - - cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - else: - - video_frame = last_frame.copy() - - # Edit fake gaze position from pointer - gaze_position = GazeFeatures.GazePosition(pointer, precision=2) - - # Copy video frame to edit visualisation on it with out disrupting aruco detection - visu_frame = video_frame.copy() - - try: - - # Project AOI scene into frame according estimated pose - aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV) - - # Catch exceptions raised by project method - except ArScene.SceneProjectionFailed as e: - - cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Draw detected markers - ar_scene.aruco_detector.draw_detected_markers(visu_frame.matrix) - - # Draw scene projection - aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255)) - - # Project 3D scene on each video frame and the visualisation frame - if len(ar_scene.aruco_detector.detected_markers) > 0: - - # Write detected marker ids - cv.putText(visu_frame.matrix, f'Detected markers: {list(ar_scene.aruco_detector.detected_markers.keys())}', (20, visu_frame.height - 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Update selected marker id by left_clicking on marker - for (marker_id, marker) in ar_scene.aruco_detector.detected_markers.items(): - - marker_aoi = marker.corners.reshape(4, 2).view(AOIFeatures.AreaOfInterest) - - if marker_aoi.contains_point(left_click): - - selected_marker_id = marker_id - - # If a marker is selected - try: - - # Retreive marker index - selected_marker = ar_scene.aruco_detector.detected_markers[selected_marker_id] - - marker_x, marker_y = selected_marker.center - ''' - if right_button: - - pointer_delta_x, pointer_delta_y = (right_click[0] - marker_x) / (visu_frame.width/3), (marker_y - right_click[1]) / (visu_frame.width/3) - - if edit_trans: - - # Edit scene rotation - if edit_coord == 0: - aoi3D_scene_edit['rotation'] = numpy.array([pointer_delta_y, 
aoi3D_scene_edit['rotation'][1], aoi3D_scene_edit['rotation'][2]])
-
-                        elif edit_coord == 1:
-                            aoi3D_scene_edit['rotation'] = numpy.array([aoi3D_scene_edit['rotation'][0], pointer_delta_x, aoi3D_scene_edit['rotation'][2]])
-
-                        elif edit_coord == 2:
-                            aoi3D_scene_edit['rotation'] = numpy.array([aoi3D_scene_edit['rotation'][0], aoi3D_scene_edit['rotation'][1], -1*pointer_delta_y])
-
-                    else:
-
-                        # Edit scene translation
-                        if edit_coord == 0:
-                            aoi3D_scene_edit['translation'] = numpy.array([pointer_delta_x, aoi3D_scene_edit['translation'][1], aoi3D_scene_edit['translation'][2]])
-
-                        elif edit_coord == 1:
-                            aoi3D_scene_edit['translation'] = numpy.array([aoi3D_scene_edit['translation'][0], pointer_delta_y, aoi3D_scene_edit['translation'][2]])
-
-                        elif edit_coord == 2:
-                            aoi3D_scene_edit['translation'] = numpy.array([aoi3D_scene_edit['translation'][0], aoi3D_scene_edit['translation'][1], 2*pointer_delta_y])
-                    '''
-                    # Apply transformation
-                    aoi_scene_edited = ar_scene.aoi_scene#.transform(aoi3D_scene_edit['translation'], aoi3D_scene_edit['rotation'])
-
-                    cv.rectangle(visu_frame.matrix, (0, 130), (460, 450), (127, 127, 127), -1)
-                    '''
-                    # Write rotation matrix
-                    R, _ = cv.Rodrigues(aoi3D_scene_edit['rotation'])
-                    cv.putText(visu_frame.matrix, f'Rotation matrix:', (20, 160), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-                    cv.putText(visu_frame.matrix, f'{R[0][0]:.3f} {R[0][1]:.3f} {R[0][2]:.3f}', (40, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
-                    cv.putText(visu_frame.matrix, f'{R[1][0]:.3f} {R[1][1]:.3f} {R[1][2]:.3f}', (40, 240), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv.LINE_AA)
-                    cv.putText(visu_frame.matrix, f'{R[2][0]:.3f} {R[2][1]:.3f} {R[2][2]:.3f}', (40, 280), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
-
-                    # Write translation vector
-                    T = aoi3D_scene_edit['translation']
-                    cv.putText(visu_frame.matrix, f'Translation vector:', (20, 320), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-                    cv.putText(visu_frame.matrix, f'{T[0]:.3f}', (40, 360), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
-                    cv.putText(visu_frame.matrix, f'{T[1]:.3f}', (40, 400), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv.LINE_AA)
-                    cv.putText(visu_frame.matrix, f'{T[2]:.3f}', (40, 440), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
-                    '''
-                    # DON'T APPLY CAMERA DISTORTION: it would project points which are far from the frame into it
-                    # This hack isn't realistic but, as the gaze will mainly focus on centered AOI where the distortion is low, it is acceptable.
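-                    # (A full model would also pass the distortion coefficients D to the
-                    # projection; using the camera matrix K alone keeps the hack simple.)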
- scene_projection_edited = aoi_scene_edited.project(selected_marker.translation, selected_marker.rotation, ar_scene.aruco_camera.K) - - # Draw aoi scene - scene_projection_edited.draw_raycast(visu_frame.matrix, gaze_position) - - # Write warning related to marker pose processing - except UserWarning as e: - - cv.putText(visu_frame.matrix, f'Marker {selected_marker_id}: {e}', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA) - - except KeyError: - - # Write error - if selected_marker_id >= 0: - cv.putText(visu_frame.matrix, f'Marker {selected_marker_id} not found', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA) - - # Draw focus area - cv.rectangle(visu_frame.matrix, (int(visu_frame.width/6), 0), (int(visu_frame.width*(1-1/6)), int(visu_frame.height)), (255, 150, 150), 1) - - # Draw center - cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1) - cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1) - - # Draw pointer - gaze_position.draw(visu_frame.matrix) - - # Write segment timing - cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1) - cv.putText(visu_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Write selected marker id - if selected_marker_id >= 0: - - cv.rectangle(visu_frame.matrix, (0, 50), (550, 90), (127, 127, 127), -1) - - # Select color - if edit_coord == 0: - color_axis = (0, 0, 255) - - elif edit_coord == 1: - color_axis = (0, 255, 0) - - elif edit_coord == 2: - color_axis = (255, 0, 0) - - if edit_trans: - cv.putText(visu_frame.matrix, f'Rotate marker {selected_marker_id} around axis {edit_coord + 1}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, color_axis, 1, cv.LINE_AA) - else: - cv.putText(visu_frame.matrix, f'Translate marker {selected_marker_id} along axis {edit_coord + 1}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, color_axis, 1, cv.LINE_AA) - - # Write documentation - else: - cv.rectangle(visu_frame.matrix, (0, 50), (650, 250), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, f'> Left click on marker: select scene', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - cv.putText(visu_frame.matrix, f'> T: translate, R: rotate', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - cv.putText(visu_frame.matrix, f'> Shift + 0/1/2: select axis', (20, 160), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - cv.putText(visu_frame.matrix, f'> Right click and drag: edit axis', (20, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - cv.putText(visu_frame.matrix, f'> Ctrl + S: save scene', (20, 240), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Reset left_click - left_click = (0, 0) - - if args.window: - - key_pressed = cv.waitKey(1) - - #if key_pressed != -1: - # print(key_pressed) - - # Select previous frame with left arrow - if key_pressed == 2: - frame_index -= 1 - - # Select next frame with right arrow - if key_pressed == 3: - frame_index += 1 - - # Clip frame index - if frame_index < 0: - frame_index = 0 - - # Edit rotation with r key - if key_pressed == 114: - edit_trans = True - - # Edit translation with t key - if key_pressed == 116: - edit_trans = False - - # Select coordinate to edit with Shift + 0, 1 or 2 - if key_pressed == 49 or key_pressed == 50 
or key_pressed == 51: - edit_coord = key_pressed - 49 - - # Save selected marker edition using 'Ctrl + s' - if key_pressed == 19: - - if selected_marker_id >= 0 and aoi3D_scene_edit != None: - - aoi_scene_filepath = args.marker_id_scene[f'{selected_marker_id}'] - aoi_scene_edited.save(aoi_scene_filepath) - - print(f'Saving scene related to marker #{selected_marker_id} into {aoi_scene_filepath}') - - # Close window using 'Esc' key - if key_pressed == 27: - break - - # Reload detecter configuration on 'c' key - if key_pressed == 99: - load_configuration_file() - force_update = True - - # Display video - cv.imshow(f'Segment {tobii_segment.id} ArUco marker editor', visu_frame.matrix) - - # Wait 1 second - time.sleep(1) - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - # Stop frame display - cv.destroyAllWindows() - -if __name__ == '__main__': - - main() \ No newline at end of file diff --git a/src/argaze/utils/tobii_segment_arscene_export.py b/src/argaze/utils/tobii_segment_arscene_export.py deleted file mode 100644 index b2cc0e0..0000000 --- a/src/argaze/utils/tobii_segment_arscene_export.py +++ /dev/null @@ -1,306 +0,0 @@ -#!/usr/bin/env python - -import argparse -import os, json -import math -import threading - -from argaze import * -from argaze.TobiiGlassesPro2 import * -from argaze.ArUcoMarkers import * -from argaze.AreaOfInterest import * -from argaze.utils import MiscFeatures - -import cv2 as cv -import numpy - -def main(): - """ - Detect ArUcoScene into Tobii Glasses Pro 2 camera video record. - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path') - parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - parser.add_argument('-p', '--env_path', metavar='ENVIRONMENT_PATH', type=str, default=None, help='json argaze environment filepath') - parser.add_argument('-b', '--borders', metavar='BORDERS', type=float, default=16.666, help='define left and right borders mask (%) to not detect aruco out of these borders') - parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)') - parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs') - args = parser.parse_args() - - if args.segment_path != None: - - # Manage destination path - destination_path = '.' 
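-        # Default to the current directory; the branches below switch it to
-        # --output when given, or to the segment folder otherwise.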
- if args.output != None: - - if not os.path.exists(os.path.dirname(args.output)): - - os.makedirs(os.path.dirname(args.output)) - print(f'{os.path.dirname(args.output)} folder created') - - destination_path = args.output - - else: - - destination_path = args.segment_path - - # Export into a dedicated time range folder - if args.time_range[1] != None: - timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]' - else: - timerange_path = f'[all]' - - destination_path = f'{destination_path}/{timerange_path}' - - if not os.path.exists(destination_path): - - os.makedirs(destination_path) - print(f'{destination_path} folder created') - - aoi_json_filepath = f'{destination_path}/aoi.json' - aoi_csv_filepath = f'{destination_path}/aoi.csv' - aoi_mp4_filepath = f'{destination_path}/aoi.mp4' - - # Load a tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - # Load a tobii segment data - tobii_segment_data = tobii_segment.load_data() - - print(f'\nLoaded data count:') - for name in tobii_segment_data.keys(): - print(f'\t{name}: {len(tobii_segment_data[name])} data') - - # Access to video timestamp data buffer - tobii_ts_vts = tobii_segment_data['VideoTimeStamp'] - - # Access to timestamped gaze position data buffer - tobii_ts_gaze_positions = tobii_segment_data['GazePosition'] - - # Format tobii gaze position in pixel - ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # Initialise progress bar - MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazePositions projection:', suffix = 'Complete', length = 100) - - for ts, tobii_gaze_position in tobii_ts_gaze_positions.items(): - - # Update Progress Bar - progress = ts - int(args.time_range[0] * 1e6) - MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'GazePositions projection:', suffix = 'Complete', length = 100) - - # Test gaze position validity - if tobii_gaze_position.validity == 0: - - gaze_position_px = (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height)) - ts_gaze_positions[ts] = GazeFeatures.GazePosition(gaze_position_px) - - print('\n') - - if args.debug: - - # Prepare video exportation at the same format than segment video - output_video = TobiiVideo.TobiiVideoOutput(aoi_mp4_filepath, tobii_segment_video.stream) - - # Load ArEnvironment - ar_env = ArFeatures.ArEnvironment.from_json(args.env_path) - - if args.debug: - print(ar_env) - - # Work with first scene only - _, ar_scene = next(iter(ar_env.items())) - - # Create timestamped buffer to store AOIs and primary time stamp offset - ts_offset_aois = DataStructures.TimeStampedBuffer() - - # Video and data replay loop - try: - - # Initialise progress bar - MiscFeatures.printProgressBar(0, tobii_segment_video.duration/1e3, prefix = 'ArUco detection & AOI projection:', suffix = 'Complete', length = 100) - - # Iterate on video frames - for video_ts, video_frame in tobii_segment_video.frames(): - - # This video frame is the reference until the next frame - # Here next frame is at + 40ms (25 fps) - # TODO: Get video fps to adapt - next_video_ts = video_ts + 40000 - - # 
Copy video frame to edit visualisation on it without disrupting aruco detection - visu_frame = video_frame.copy() - - # Prepare to store projected AOI - projected_aois = {} - - # Process video and data frame - try: - - # Get nearest video timestamp - _, nearest_vts = tobii_ts_vts.get_last_before(video_ts) - - projected_aois['offset'] = nearest_vts.offset - - # Hide frame left and right borders before detection to ignore markers outside focus area - cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width*args.borders/100), int(video_frame.height)), (0, 0, 0), -1) - cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - args.borders/100)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1) - - # Detect aruco markers into frame - ar_env.aruco_detector.detect_markers(video_frame.matrix) - - # Estimate markers poses - ar_env.aruco_detector.estimate_markers_pose() - - # Estimate scene pose from ArUco markers into frame. - tvec, rmat, _ = ar_scene.estimate_pose(ar_env.aruco_detector.detected_markers) - - # Project AOI scene into frame according estimated pose - aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV) - - # Store all projected aoi - for aoi_name in aoi_scene_projection.keys(): - - projected_aois[aoi_name] = numpy.rint(aoi_scene_projection[aoi_name]).astype(int) - - if args.debug: - - # Draw detected markers - ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix) - - # Draw AOI - aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255)) - - # Catch exceptions raised by estimate_pose and project methods - except (ArFeatures.PoseEstimationFailed, ArFeatures.SceneProjectionFailed) as e: - - if str(e) == 'Unconsistent marker poses': - - projected_aois['error'] = str(e) + ': ' + str(e.unconsistencies) - - else: - - projected_aois['error'] = str(e) - - if args.debug: - - # Draw detected markers - ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix) - - cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Raised when timestamped buffer is empty - except KeyError as e: - - e = 'VideoTimeStamp missing' - - projected_aois['offset'] = 0 - projected_aois['error'] = e - - if args.debug: - - cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA) - - # Store projected AOI - ts_offset_aois[video_ts] = projected_aois - - if args.debug: - # Draw gaze positions until next frame - try: - - # Get next gaze position - ts_start, start_gaze_position = ts_gaze_positions.first - ts_next, next_gaze_position = ts_gaze_positions.first - - # Check next gaze position is not after next frame time - while ts_next < next_video_ts: - - ts_start, start_gaze_position = ts_gaze_positions.pop_first() - ts_next, next_gaze_position = ts_gaze_positions.first - - # Draw start gaze - start_gaze_position.draw(visu_frame.matrix) - - if start_gaze_position.valid and next_gaze_position.valid: - - # Draw movement from start to next - cv.line(visu_frame.matrix, start_gaze_position, next_gaze_position, (0, 255, 255), 1) - - if start_gaze_position.valid: - - # Write last start gaze position - cv.putText(visu_frame.matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Write last start gaze 
position timing - cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (31, 31, 31), -1) - cv.putText(visu_frame.matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Empty gaze position - except IndexError: - pass - - # Draw focus area - cv.rectangle(visu_frame.matrix, (int(video_frame.width*args.borders/100.), 0), (int(visu_frame.width*(1-args.borders/100)), int(visu_frame.height)), (255, 150, 150), 1) - - # Draw center - cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1) - cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1) - - # Write segment timing - cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1) - cv.putText(visu_frame.matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - if args.debug: - - # Close window using 'Esc' key - if cv.waitKey(1) == 27: - break - - # Display visualisation - cv.imshow(f'Segment {tobii_segment.id} ArUco AOI', visu_frame.matrix) - - # Write video - output_video.write(visu_frame.matrix) - - # Update Progress Bar - progress = video_ts*1e-3 - int(args.time_range[0] * 1e3) - MiscFeatures.printProgressBar(progress, tobii_segment_video.duration*1e-3, prefix = 'ArUco detection & AOI projection:', suffix = 'Complete', length = 100) - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - if args.debug: - - # Stop frame display - cv.destroyAllWindows() - - # End output video file - output_video.close() - - # Print aruco detection metrics - print('\n\nAruco marker detection metrics') - try_count, detected_counts = ar_env.aruco_detector.detection_metrics - - for marker_id, detected_count in detected_counts.items(): - print(f'\tMarkers {marker_id} has been detected in {detected_count} / {try_count} frames ({round(100 * detected_count / try_count, 2)} %)') - - # Export aruco aoi data - ts_offset_aois.to_json(aoi_json_filepath) - ts_offset_aois.as_dataframe().to_csv(aoi_csv_filepath) - print(f'Aruco AOI data saved into {aoi_json_filepath} and {aoi_csv_filepath}') - - # Notify when the aruco aoi video has been exported - print(f'Aruco AOI video saved into {aoi_mp4_filepath}') - -if __name__ == '__main__': - - main() \ No newline at end of file diff --git a/src/argaze/utils/tobii_segment_data_plot_export.py b/src/argaze/utils/tobii_segment_data_plot_export.py deleted file mode 100644 index 69f88e3..0000000 --- a/src/argaze/utils/tobii_segment_data_plot_export.py +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env python - -import argparse -import os -import json - -from argaze import DataStructures -from argaze import GazeFeatures -from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo -from argaze.utils import MiscFeatures - -import pandas -import matplotlib.pyplot as mpyplot -import matplotlib.patches as mpatches - -def main(): - """ - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path') - parser.add_argument('-r', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - parser.add_argument('-o', '--output', metavar='OUT', type=str, 
default=None, help='destination folder path (segment folder by default)') - args = parser.parse_args() - - if args.segment_path != None: - - # Manage destination path - destination_path = '.' - if args.output != None: - - if not os.path.exists(os.path.dirname(args.output)): - - os.makedirs(os.path.dirname(args.output)) - print(f'{os.path.dirname(args.output)} folder created') - - destination_path = args.output - - else: - - destination_path = args.segment_path - - # Export into a dedicated time range folder - if args.time_range[1] != None: - timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]' - else: - timerange_path = f'[all]' - - destination_path = f'{destination_path}/{timerange_path}' - - if not os.path.exists(destination_path): - - os.makedirs(destination_path) - print(f'{destination_path} folder created') - - data_plots_filepath = f'{destination_path}/data_plot.svg' - - # Load a tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'Video properties:\n\tduration: {tobii_segment_video.duration / 1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - # Load a tobii segment data - tobii_segment_data = tobii_segment.load_data() - - print(f'Loaded data count:') - for name in tobii_segment_data.keys(): - print(f'\t{name}: {len(tobii_segment_data[name])} data') - - # Edit figure - figure_width = min( 4 * tobii_segment_video.duration / 1e6, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels - data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max - figure = mpyplot.figure(figsize=(figure_width, 35), dpi=144) - - # Plot pupil diameter data - subplot = figure.add_subplot(711) - subplot.set_title('Pupil diameter', loc='left') - subplot.set_ylim(0, 10) - patches = tobii_segment_data['PupilDiameter'].plot(names=['value'], colors=['#FFD800'], samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Annotate events - df_ts_events = tobii_segment_data['Event'].as_dataframe() - - if len(df_ts_events) > 0: - - for ts, event_type, event_tag in zip(df_ts_events.index, df_ts_events.type, df_ts_events.tag): - subplot.annotate(f'{event_type}\n{event_tag}', xy=(ts, 7), horizontalalignment="left", verticalalignment="top") - subplot.vlines(ts, 0, 6, color="tab:red", linewidth=1) - - # Plot pupil center data - subplot = figure.add_subplot(712) - subplot.set_title('Pupil center', loc='left') - subplot.set_ylim(-40, -20) - patches = tobii_segment_data['PupilCenter'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Plot gaze position data - subplot = figure.add_subplot(713) - subplot.set_title('Gaze position', loc='left') - subplot.set_ylim(0., 1.) 
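-        # Gaze positions are normalised coordinates, hence the fixed [0, 1] range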
- patches = tobii_segment_data['GazePosition'].plot(names=['x','y'], colors=['#276FB6','#9427B6'], split={'value':['x','y']}, samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Plot gaze direction data - subplot = figure.add_subplot(714) - subplot.set_title('Gaze direction', loc='left') - patches = tobii_segment_data['GazeDirection'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Plot gaze position 3D data - subplot = figure.add_subplot(715) - subplot.set_title('Gaze position 3D', loc='left') - patches = tobii_segment_data['GazePosition3D'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Plot accelerometer data - subplot = figure.add_subplot(716) - subplot.set_title('Accelerometer', loc='left') - patches = tobii_segment_data['Accelerometer'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Plot gyroscope data - subplot = figure.add_subplot(717) - subplot.set_title('Gyroscope', loc='left') - patches = tobii_segment_data['Gyroscope'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample) - subplot.legend(handles=patches, loc='upper left') - - # Export figure - mpyplot.tight_layout() - mpyplot.savefig(data_plots_filepath) - mpyplot.close('all') - - print(f'\nData plots saved into {data_plots_filepath}') - -if __name__ == '__main__': - - main() diff --git a/src/argaze/utils/tobii_segment_display.py b/src/argaze/utils/tobii_segment_display.py deleted file mode 100644 index 2e0dab5..0000000 --- a/src/argaze/utils/tobii_segment_display.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python - -import argparse - -from argaze import GazeFeatures -from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiData, TobiiSpecifications -from argaze.utils import MiscFeatures - -import numpy - -import cv2 as cv - -def main(): - """ - Display Tobii segment video and data - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path') - parser.add_argument('-r', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - args = parser.parse_args() - - if args.segment_path != None: - - # Load a tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'Video properties:\n\tduration: {tobii_segment_video.duration / 1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - # Load a tobii segment data - tobii_segment_data = tobii_segment.load_data() - - print(f'Loaded data count:') - for name in tobii_segment_data.keys(): - print(f'\t{name}: {len(tobii_segment_data[name])} data') - - # Access to timestamped gaze position data buffer - tobii_ts_gaze_positions = tobii_segment_data['GazePosition'] - - # Access to timestamped gaze position 3d data buffer -
tobii_ts_gaze_positions_3d = tobii_segment_data['GazePosition3D'] - - # Access to timestamped head rotations data buffer - tobii_ts_head_rotations = tobii_segment_data['Gyroscope'] - - # Access to timestamped events data buffer - tobii_ts_events = tobii_segment_data['Event'] - - # Video and data replay loop - try: - - # Initialise progress bar - MiscFeatures.printProgressBar(0, tobii_segment_video.duration / 1e3, prefix = 'Video progression:', suffix = 'Complete', length = 100) - - # Iterate on video frames - for video_ts, video_frame in tobii_segment_video.frames(): - - video_ts_ms = video_ts / 1e3 - - try: - - # Get nearest head rotation before video timestamp and remove all head rotations before - _, nearest_head_rotation = tobii_ts_head_rotations.pop_last_before(video_ts) - - # Calculate head movement considering only head yaw and pitch - head_movement = numpy.array(nearest_head_rotation.value) - head_movement_px = head_movement.astype(int) - head_movement_norm = numpy.linalg.norm(head_movement[0:2]) - - # Draw movement vector - cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_movement_px[1], int(video_frame.height/2) - head_movement_px[0]), (150, 150, 150), 3) - - # Wait for head rotation - except KeyError: - pass - - try: - - # Get nearest gaze position before video timestamp and remove all gaze positions before - _, nearest_gaze_position = tobii_ts_gaze_positions.pop_last_before(video_ts) - - # Ignore frame when gaze position is not valid - if nearest_gaze_position.validity == 0: - - gaze_position_pixel = GazeFeatures.GazePosition( (int(nearest_gaze_position.value[0] * video_frame.width), int(nearest_gaze_position.value[1] * video_frame.height)) ) - - # Get nearest gaze position 3D before video timestamp and remove all gaze positions before - _, nearest_gaze_position_3d = tobii_ts_gaze_positions_3d.pop_last_before(video_ts) - - # Ignore frame when gaze position 3D is not valid - if nearest_gaze_position_3d.validity == 0: - - gaze_precision_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.PRECISION)) * nearest_gaze_position_3d.value[2] - tobii_camera_hfov_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV / 2)) * nearest_gaze_position_3d.value[2] - - gaze_position_pixel.precision = round(video_frame.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm)) - - # Draw gaze - gaze_position_pixel.draw(video_frame.matrix) - - # Wait for gaze position - except KeyError: - pass - - try: - - # Get nearest event before video timestamp and remove all gaze positions before - nearest_event_ts, nearest_event = tobii_ts_events.pop_last_before(video_ts) - - #print(nearest_event_ts / 1e3, nearest_event) - - # Write events - cv.rectangle(video_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1) - cv.putText(video_frame.matrix, str(nearest_event), (20, 140), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Wait for events - except KeyError: - pass - - # Draw center - cv.line(video_frame.matrix, (int(video_frame.width/2) - 50, int(video_frame.height/2)), (int(video_frame.width/2) + 50, int(video_frame.height/2)), (255, 150, 150), 1) - cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2) - 50), (int(video_frame.width/2), int(video_frame.height/2) + 50), (255, 150, 150), 1) - - # Write segment timing - cv.rectangle(video_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1) - cv.putText(video_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 
1, (255, 255, 255), 1, cv.LINE_AA) - - # Close window using 'Esc' key - if cv.waitKey(1) == 27: - break - - cv.imshow(f'Segment {tobii_segment.id} video', video_frame.matrix) - - # Update Progress Bar - progress = video_ts_ms - int(args.time_range[0] * 1e3) - MiscFeatures.printProgressBar(progress, tobii_segment_video.duration / 1e3, prefix = 'Video progression:', suffix = 'Complete', length = 100) - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - # Stop frame display - cv.destroyAllWindows() - -if __name__ == '__main__': - - main() \ No newline at end of file diff --git a/src/argaze/utils/tobii_segment_gaze_metrics_export.py b/src/argaze/utils/tobii_segment_gaze_metrics_export.py deleted file mode 100644 index 1e530e0..0000000 --- a/src/argaze/utils/tobii_segment_gaze_metrics_export.py +++ /dev/null @@ -1,242 +0,0 @@ -#!/usr/bin/env python - -import argparse -import os -import math - -from argaze import DataStructures, GazeFeatures -from argaze.AreaOfInterest import AOIFeatures -from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier -from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications -from argaze.utils import MiscFeatures - -import cv2 as cv -import numpy -import pandas - -def main(): - """ - Analyse fixations and saccades - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='path to a tobii segment folder', required=True) - parser.add_argument('-a', '--aoi', metavar='AOI_NAME', type=str, default=None, help='aoi name where to project gaze', required=True) - parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - parser.add_argument('-p', '--period', metavar=('PERIOD_TIME'), type=float, default=10, help='period of time (in second)') - parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)') - args = parser.parse_args() - - # Manage destination path - destination_path = '.' 
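# N.B. The destination folder handling below recurs in all of these
# utilities; as a minimal sketch (assuming Python >= 3.2), the existence
# check plus creation amounts to:
#
#   os.makedirs(destination_path, exist_ok=True)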
- if args.output != None: - - if not os.path.exists(os.path.dirname(args.output)): - - os.makedirs(os.path.dirname(args.output)) - print(f'{os.path.dirname(args.output)} folder created') - - destination_path = args.output - - else: - - destination_path = args.segment_path - - # Export into a dedicated time range folder - if args.time_range[1] != None: - timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]' - else: - timerange_path = f'[all]' - - destination_path = f'{destination_path}/{timerange_path}/{args.aoi}' - - if not os.path.exists(destination_path): - - os.makedirs(destination_path) - print(f'{destination_path} folder created') - - positions_json_filepath = f'{destination_path}/gaze_positions.json' - - fixations_json_filepath = f'{destination_path}/gaze_fixations.json' - saccades_json_filepath = f'{destination_path}/gaze_saccades.json' - gaze_status_json_filepath = f'{destination_path}/gaze_status.json' - - gaze_metrics_period_filepath = f'{destination_path}/gaze_metrics_{int(args.period)}s.csv' - gaze_metrics_whole_filepath = f'{destination_path}/gaze_metrics.csv' - - # Load gaze positions - ts_gaze_positions = GazeFeatures.TimeStampedGazePositions.from_json(positions_json_filepath) - - # Load gaze movements - ts_fixations = GazeFeatures.TimeStampedGazeMovements.from_json(fixations_json_filepath) - ts_saccades = GazeFeatures.TimeStampedGazeMovements.from_json(saccades_json_filepath) - ts_status = GazeFeatures.TimeStampedGazeStatus.from_json(gaze_status_json_filepath) - - print(f'\nLoaded gaze movements count:') - print(f'\tFixations: {len(ts_fixations)}') - print(f'\tSaccades: {len(ts_saccades)}') - - # Load tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) - - # Get participant name - participant_name = TobiiEntities.TobiiParticipant(f'{args.segment_path}/../../').name - - print(f'\nParticipant: {participant_name}') - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration * 1e-6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - # Prepare gaze metrics - ts_metrics = DataStructures.TimeStampedBuffer() - - positions_exist = len(ts_gaze_positions) > 0 - fixations_exist = len(ts_fixations) > 0 - saccades_exist = len(ts_saccades) > 0 - status_exist = len(ts_status) > 0 - - if positions_exist: - - # Create pandas dataframe - positions_dataframe = ts_gaze_positions.as_dataframe() - - # Reset time range offset - positions_dataframe.index = positions_dataframe.index - positions_dataframe.index[0] - - if fixations_exist: - - # Create pandas dataframe - fixations_dataframe = ts_fixations.as_dataframe() - - # Reset time range offset - fixations_dataframe.index = fixations_dataframe.index - fixations_dataframe.index[0] - - # Add 'end' column - fixations_dataframe['end'] = fixations_dataframe.index + fixations_dataframe.duration - - if saccades_exist: - - # Create pandas dataframe - saccades_dataframe = ts_saccades.as_dataframe() - - # Reset time range offset - saccades_dataframe.index = saccades_dataframe.index - saccades_dataframe.index[0] - - # Add 'end' column - saccades_dataframe['end'] = saccades_dataframe.index + saccades_dataframe.duration - - # Define a function to export metrics for a period of time - def metrics_for_period(period_start_ts, period_end_ts): - - period_duration = period_end_ts - 
period_start_ts - period_metrics = {} - - #print(f'\n*** Analysing period n°{i} [{period_start_ts * 1e-6:.3f}s, {period_end_ts * 1e-6:.3f}s]') - - # Store period duration - period_metrics['duration (ms)'] = period_duration * 1e-3 - - # Default positions analysis - period_metrics['positions_number'] = 0 - period_metrics['positions_valid_ratio (%)'] = None - - # Analyse positions - if positions_exist: - - # Select period - positions_period_dataframe = positions_dataframe[(positions_dataframe.index >= period_start_ts) & (positions_dataframe.index < period_end_ts)] - - if not positions_period_dataframe.empty: - - #print('\n* Positions:\n', positions_period_dataframe) - - period_metrics['positions_number'] = positions_period_dataframe.shape[0] - period_metrics['positions_valid_ratio (%)'] = positions_period_dataframe.precision.count() / positions_period_dataframe.shape[0] * 100 - - # Default fixation analysis - fixations_duration_sum = 0.0 - period_metrics['fixations_number'] = 0 - period_metrics['fixations_duration_mean (ms)'] = None - period_metrics['fixations_duration_sum (ms)'] = None - period_metrics['fixations_duration_ratio (%)'] = None - period_metrics['fixations_deviation_mean (px)'] = None - - # Analyse fixations - if fixations_exist: - - # Select period - fixations_period_dataframe = fixations_dataframe[(fixations_dataframe.index >= period_start_ts) & (fixations_dataframe.end < period_end_ts)] - - if not fixations_period_dataframe.empty: - - #print('\n* Fixations:\n', fixations_period_dataframe) - - fixations_duration_sum = fixations_period_dataframe.duration.sum() - period_metrics['fixations_number'] = fixations_period_dataframe.shape[0] - period_metrics['fixations_duration_mean (ms)'] = fixations_period_dataframe.duration.mean() * 1e-3 - period_metrics['fixations_duration_sum (ms)'] = fixations_duration_sum * 1e-3 - period_metrics['fixations_duration_ratio (%)'] = fixations_duration_sum / period_duration * 100 - period_metrics['fixations_deviation_mean (px)'] = fixations_period_dataframe.deviation_max.mean() - - # Default saccades analysis - saccades_duration_sum = 0.0 - period_metrics['saccades_number'] = 0 - period_metrics['saccades_duration_mean (ms)'] = None - period_metrics['saccades_duration_sum (ms)'] = None - period_metrics['saccades_duration_ratio (%)'] = None - period_metrics['saccades_distance_mean (px)'] = None - - # Analyse saccades - if saccades_exist: - - # Select period - saccades_period_dataframe = saccades_dataframe[(saccades_dataframe.index >= period_start_ts) & (saccades_dataframe.end < period_end_ts)] - - if not saccades_period_dataframe.empty: - - #print('\n* Saccades:\n', saccades_period_dataframe) - - saccades_duration_sum = saccades_period_dataframe.duration.sum() - period_metrics['saccades_number'] = saccades_period_dataframe.shape[0] - period_metrics['saccades_duration_mean (ms)'] = saccades_period_dataframe.duration.mean() * 1e-3 - period_metrics['saccades_duration_sum (ms)'] = saccades_duration_sum * 1e-3 - period_metrics['saccades_duration_ratio (%)'] = saccades_duration_sum / period_duration * 100 - period_metrics['saccades_distance_mean (px)'] = saccades_period_dataframe.distance.mean() - - # Analyse exploit/explore - if saccades_duration_sum != 0.0: - - period_metrics['exploit_explore_ratio'] = fixations_duration_sum / saccades_duration_sum - - else: - - period_metrics['exploit_explore_ratio'] = None - - # Append period metrics - ts_metrics[int(period_start_ts * 1e-3)] = period_metrics - - # Metrics for each period - for i in range(0,
int(tobii_segment_video.duration/(args.period * 1e6))): - - period_start_ts = i*(args.period * 1e6) - period_end_ts = (i+1)*(args.period * 1e6) - - metrics_for_period(period_start_ts, period_end_ts) - - metrics_dataframe = ts_metrics.as_dataframe() #pandas.DataFrame(metrics, index=[participant_name]) - metrics_dataframe.to_csv(gaze_metrics_period_filepath, index=True) - print(f'\nGaze metrics per period of time saved into {gaze_metrics_period_filepath}') - - # Metrics for the whole session - ts_metrics = DataStructures.TimeStampedBuffer() - metrics_for_period(0, tobii_segment_video.duration) - - metrics_dataframe = ts_metrics.as_dataframe() #pandas.DataFrame(metrics, index=[participant_name]) - metrics_dataframe.to_csv(gaze_metrics_whole_filepath, index=True) - print(f'\nGaze metrics for whole segment saved into {gaze_metrics_whole_filepath}\n') - -if __name__ == '__main__': - - main() diff --git a/src/argaze/utils/tobii_segment_gaze_movements_export.py b/src/argaze/utils/tobii_segment_gaze_movements_export.py deleted file mode 100644 index 85fe74b..0000000 --- a/src/argaze/utils/tobii_segment_gaze_movements_export.py +++ /dev/null @@ -1,570 +0,0 @@ -#!/usr/bin/env python - -import argparse -import os -import math - -from argaze import DataStructures, GazeFeatures -from argaze.AreaOfInterest import AOIFeatures -from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier -from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications -from argaze.utils import MiscFeatures - -import cv2 as cv -import numpy -import pandas - -def main(): - """ - Project gaze positions into an AOI and identify particular gaze movements like fixations and saccades - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='path to a tobii segment folder', required=True) - parser.add_argument('-a', '--aoi', metavar='AOI_NAME', type=str, default=None, help='aoi name where to project gaze', required=True) - parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - parser.add_argument('-dev', '--deviation_max_threshold', metavar='DISPERSION_THRESHOLD', type=int, default=None, help='maximal distance for fixation identification in pixel') - parser.add_argument('-dmin', '--duration_min_threshold', metavar='DURATION_MIN_THRESHOLD', type=int, default=200, help='minimal duration for fixation identification in millisecond') - parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)') - parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs') - args = parser.parse_args() - - # Manage destination path - destination_path = '.' 
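# All these utilities convert the CLI time range from seconds into the
# microsecond timestamps expected by TobiiEntities.TobiiSegment; a minimal
# sketch of that conversion (hypothetical helper, for illustration only):
def _time_range_us(time_range):
    """(start_s, end_s or None) -> (start_us, end_us or None)."""
    start_s, end_s = time_range
    return int(start_s * 1e6), int(end_s * 1e6) if end_s != None else None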
- if args.output != None: - - if not os.path.exists(os.path.dirname(args.output)): - - os.makedirs(os.path.dirname(args.output)) - print(f'{os.path.dirname(args.output)} folder created') - - destination_path = args.output - - else: - - destination_path = args.segment_path - - # Export into a dedicated time range folder - if args.time_range[1] != None: - timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]' - else: - timerange_path = f'[all]' - - destination_path = f'{destination_path}/{timerange_path}/{args.aoi}' - - if not os.path.exists(destination_path): - - os.makedirs(destination_path) - print(f'{destination_path} folder created') - - aoi_filepath = f'{destination_path}/../aoi.json' - - positions_json_filepath = f'{destination_path}/gaze_positions.json' - - fixations_json_filepath = f'{destination_path}/gaze_fixations.json' - saccades_json_filepath = f'{destination_path}/gaze_saccades.json' - movements_json_filepath = f'{destination_path}/gaze_movements.json' - gaze_status_json_filepath = f'{destination_path}/gaze_status.json' - - gaze_status_video_filepath = f'{destination_path}/gaze_status.mp4' - gaze_status_image_filepath = f'{destination_path}/gaze_status.png' - - # Load aoi scene projection - ts_aois_projections = DataStructures.TimeStampedBuffer.from_json(aoi_filepath) - - print(f'\nAOI frames: ', len(ts_aois_projections)) - aoi_names = ts_aois_projections.as_dataframe().drop(['offset','error'], axis=1).columns - for aoi_name in aoi_names: - print(f'\t{aoi_name}') - - # Load tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) - - # Get participant name - participant_name = TobiiEntities.TobiiParticipant(f'{args.segment_path}/../../').name - - print(f'\nParticipant: {participant_name}') - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - # Check that gaze positions have already been exported to not process them again - if os.path.exists(positions_json_filepath): - - # Load gaze positions - ts_gaze_positions = GazeFeatures.TimeStampedGazePositions.from_json(positions_json_filepath) - - print(f'\nLoaded gaze positions count:') - print(f'\tPositions: {len(ts_gaze_positions)}') - - invalid_gaze_position_count = 0 - inner_precisions_px = [] - - for ts, gaze_position in ts_gaze_positions.items(): - - if not gaze_position.valid: - - invalid_gaze_position_count += 1 - - else: - - inner_precisions_px.append(gaze_position.precision) - - print(f'\tInvalid positions: {invalid_gaze_position_count}/{len(ts_gaze_positions)} ({100*invalid_gaze_position_count/len(ts_gaze_positions):.2f} %)') - - inner_precision_px_mean = round(numpy.mean(inner_precisions_px)) - print(f'\tMean of projected precisions: {inner_precision_px_mean} px') - - # Project gaze positions into the selected AOI - else: - - # Load a tobii segment data - tobii_segment_data = tobii_segment.load_data() - - print(f'\nLoaded data count:') - for name in tobii_segment_data.keys(): - print(f'\t{name}: {len(tobii_segment_data[name])} data') - - # Access to timestamped gaze position data buffer - tobii_ts_gaze_positions = tobii_segment_data['GazePosition'] - - # Access to timestamped gaze 3D positions data buffer - tobii_ts_gaze_positions_3d = tobii_segment_data['GazePosition3D'] - - # 
Format tobii gaze position and precision in pixel and project it in aoi scene - ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # Gaze projection metrics - ts_projection_metrics = DataStructures.TimeStampedBuffer() - invalid_gaze_position_count = 0 - inner_precisions_px = [] - - # Starting with no AOI projection - ts_current_aoi = 0 - current_aoi = AOIFeatures.AreaOfInterest() - - # Initialise progress bar - MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazePositions projection:', suffix = 'Complete', length = 100) - - for ts, tobii_gaze_position in tobii_ts_gaze_positions.items(): - - # Update Progress Bar - progress = ts - int(args.time_range[0] * 1e6) - MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'GazePositions projection:', suffix = 'Complete', length = 100) - - # Edit default aoi error - current_aoi_error = 'No available AOI projection' - - try: - - # Get the last aoi projection until the current gaze position timestamp - ts_current_aois, current_aois = ts_aois_projections.pop_last_until(ts) - - assert(ts_current_aois <= ts) - - # Catch aoi error to not update current aoi - if 'error' in current_aois.keys(): - - # Remove extra error info after ':' - current_aoi_error = current_aois.pop('error').split(':')[0] - - # Or update current aoi - elif args.aoi in current_aois.keys(): - - ts_current_aoi = ts_current_aois - current_aoi = AOIFeatures.AreaOfInterest(current_aois.pop(args.aoi)) - - current_aoi_error = '' - - # No aoi projection at the beginning - except KeyError as e: - pass - - # Wait for available aoi - if current_aoi.empty: - - ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(current_aoi_error) - invalid_gaze_position_count += 1 - continue - - # QUESTION: What todo if the current aoi is too old ? - # if the aoi didn't move it is not a problem... 
- # For the moment, we discard AOI projections older than the minimal fixation duration threshold and provide an age metric to assess the problem - ts_difference = ts - ts_current_aoi - - # If the AOI projection has not been updated within the duration threshold - if ts_difference >= args.duration_min_threshold*1e3: - - current_aoi = AOIFeatures.AreaOfInterest() - ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition('AOI projection is too old') - invalid_gaze_position_count += 1 - continue - - ts_projection_metrics[ts] = {'frame': ts_current_aois, 'age': ts_difference} - - # Test gaze position validity - if tobii_gaze_position.validity == 0: - - gaze_position_px = (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height)) - - # Get gaze position 3D at same gaze position timestamp - tobii_gaze_position_3d = tobii_ts_gaze_positions_3d.pop(ts) - - # Test gaze position 3d validity - if tobii_gaze_position_3d.validity == 0: - - gaze_precision_mm = numpy.sin(numpy.deg2rad(TobiiSpecifications.PRECISION)) * tobii_gaze_position_3d.value[2] - tobii_camera_hfov_mm = numpy.sin(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV)) * tobii_gaze_position_3d.value[2] - - gaze_precision_px = round(tobii_segment_video.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm)) - - # Edit gaze position - gaze_position = GazeFeatures.GazePosition(gaze_position_px) - - # Project gaze position into selected aois - if current_aoi.contains_point(gaze_position.value): - - inner_x, inner_y = current_aoi.inner_axis(gaze_position.value) - inner_precision_px = gaze_precision_px * tobii_segment_video.width * tobii_segment_video.height / current_aoi.area - - # Store inner precision for metrics - inner_precisions_px.append(inner_precision_px) - - # Store inner gaze position for further movement processing - # TEMP: 1920x1080 are Screen_Plan dimensions - ts_gaze_positions[ts] = GazeFeatures.GazePosition((round(inner_x*1920), round((1.0 - inner_y)*1080)), precision=inner_precision_px) - - else: - - ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'GazePosition not inside {args.aoi}') - invalid_gaze_position_count += 1 - - else: - - ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'Invalid Tobii GazePosition3D') - invalid_gaze_position_count += 1 - - else: - - ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'Invalid Tobii GazePosition') - invalid_gaze_position_count += 1 - - print(f'\nGazePositions projection metrics:') - - print(f'\tInvalid positions: {invalid_gaze_position_count}/{len(tobii_ts_gaze_positions)} ({100*invalid_gaze_position_count/len(tobii_ts_gaze_positions):.2f} %)') - - if len(ts_projection_metrics): - - projection_metrics_dataframe = ts_projection_metrics.as_dataframe() - print(f'\tAOI age mean: {projection_metrics_dataframe.age.mean() * 1e-3:.3f} ms') - print(f'\tAOI age max: {projection_metrics_dataframe.age.max() * 1e-3:.3f} ms') - - inner_precision_px_mean = round(numpy.mean(inner_precisions_px)) - print(f'\tMean of projected precisions: {inner_precision_px_mean} px') - - else: - - print(f'\tNo AOI projected') - - ts_gaze_positions.to_json(positions_json_filepath) - print(f'\nProjected gaze positions saved into {positions_json_filepath}') - - print(f'\nGazeMovement identifier setup:') - - if args.deviation_max_threshold == None: - - selected_deviation_max_threshold = inner_precision_px_mean - print(f'\tDispersion threshold: {selected_deviation_max_threshold} px (equal to mean of projected precisions)') - - else: - - selected_deviation_max_threshold = args.deviation_max_threshold -
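# A minimal sketch of the dispersion-based principle applied by
# DispersionBasedGazeMovementIdentifier below: gaze positions staying within
# 'deviation max' pixels of their centroid for at least 'duration min'
# microseconds form a fixation (simplified batch version for illustration;
# the actual identifier processes the timestamped stream incrementally):
def _looks_like_fixation(ts_points, deviation_max_px, duration_min_us):
    """ts_points: time-sorted list of (timestamp_us, (x, y)) tuples."""
    points = numpy.array([point for _, point in ts_points])
    deviation = numpy.linalg.norm(points - points.mean(axis=0), axis=1).max()
    duration = ts_points[-1][0] - ts_points[0][0]
    return deviation <= deviation_max_px and duration >= duration_min_us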
print(f'\tDispersion threshold: {selected_deviation_max_threshold} px') - - print(f'\tDuration threshold: {args.duration_min_threshold} ms') - - movement_identifier = DispersionBasedGazeMovementIdentifier.GazeMovementIdentifier(selected_deviation_max_threshold, args.duration_min_threshold*1e3) - - # Start movement identification - ts_fixations = GazeFeatures.TimeStampedGazeMovements() - ts_saccades = GazeFeatures.TimeStampedGazeMovements() - ts_status = GazeFeatures.TimeStampedGazeStatus() - - # Initialise progress bar - MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazeMovements identification:', suffix = 'Complete', length = 100) - - for ts, gaze_position in ts_gaze_positions.items(): - - gaze_movement = movement_identifier.identify(ts, gaze_position) - - if isinstance(gaze_movement, DispersionBasedGazeMovementIdentifier.Fixation): - - start_ts, start_position = gaze_movement.positions.first - - ts_fixations[start_ts] = gaze_movement - - for ts, position in gaze_movement.positions.items(): - - ts_status[ts] = GazeFeatures.GazeStatus.from_position(position, 'Fixation', len(ts_fixations)) - - elif isinstance(gaze_movement, DispersionBasedGazeMovementIdentifier.Saccade): - - start_ts, start_position = gaze_movement.positions.first - - ts_saccades[start_ts] = gaze_movement - - for ts, position in gaze_movement.positions.items(): - - ts_status[ts] = GazeFeatures.GazeStatus.from_position(position, 'Saccade', len(ts_saccades)) - - # Update Progress Bar - progress = ts - int(args.time_range[0] * 1e6) - MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze movements identification:', suffix = 'Complete', length = 100) - - print(f'\nGazeMovements identification metrics:') - print(f'\t{len(ts_fixations)} fixations found') - print(f'\t{len(ts_saccades)} saccades found') - - ts_fixations.to_json(fixations_json_filepath) - print(f'\nGaze fixations saved into {fixations_json_filepath}') - - ts_saccades.to_json(saccades_json_filepath) - print(f'Gaze saccades saved into {saccades_json_filepath}') - - ts_status.to_json(gaze_status_json_filepath) - print(f'Gaze status saved into {gaze_status_json_filepath}') - - # DEBUG - ts_status.as_dataframe().to_csv(f'{destination_path}/gaze_status.csv') - - # Edit data visualisation - if args.debug: - - # Prepare video export in the same format as the segment video - output_video = TobiiVideo.TobiiVideoOutput(gaze_status_video_filepath, tobii_segment_video.stream) - - # Reload aoi scene projection - ts_aois_projections = DataStructures.TimeStampedBuffer.from_json(aoi_filepath) - - # Prepare gaze status image - gaze_status_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8) - - # Video loop - try: - - # Initialise progress bar - MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGaze status video processing:', suffix = 'Complete', length = 100) - - fixations_exist = len(ts_fixations) > 0 - saccades_exist = len(ts_saccades) > 0 - status_exist = len(ts_status) > 0 - - if fixations_exist: - current_fixation_ts, current_fixation = ts_fixations.pop_first() - current_fixation_time_counter = 0 - - if saccades_exist: - current_saccade_ts, current_saccade = ts_saccades.pop_first() - - # Iterate on video frames - for video_ts, video_frame in tobii_segment_video.frames(): - - # This video frame is the reference until the next frame - # Here next frame is at + 40ms (25 fps) - # TODO: Get video fps to adapt - next_video_ts = video_ts + 40000 - - visu_matrix = numpy.zeros((1080, 1920, 3),
numpy.uint8) - - try: - - # Get current aoi projection at video frame time - ts_current_aois, current_aois = ts_aois_projections.pop_first() - - assert(ts_current_aois == video_ts) - - # Catch aoi error to not update current aoi - if 'error' in current_aois.keys(): - - # Display error (remove extra info after ':') - current_aoi_error = current_aois.pop('error').split(':')[0] - - # Select color error - if current_aoi_error == 'VideoTimeStamp missing': - color_error = (0, 0, 255) - else: - color_error = (0, 255, 255) - - cv.rectangle(visu_matrix, (0, 100), (550, 150), (127, 127, 127), -1) - cv.putText(visu_matrix, current_aoi_error, (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA) - - # Or update current aoi - elif args.aoi in current_aois.keys(): - - ts_current_aoi = ts_current_aois - current_aoi = AOIFeatures.AreaOfInterest(current_aois.pop(args.aoi)) - - # Apply perspective transform algorithm - destination = numpy.float32([[0, 1080],[1920, 1080],[1920, 0],[0, 0]]) - aoi_matrix = cv.getPerspectiveTransform(current_aoi.astype(numpy.float32), destination) - visu_matrix = cv.warpPerspective(video_frame.matrix, aoi_matrix, (1920, 1080)) - - # Wait for aois projection - except KeyError: - pass - - if fixations_exist: - - # Check next fixation - if video_ts >= current_fixation_ts + current_fixation.duration and len(ts_fixations) > 0: - - current_fixation_ts, current_fixation = ts_fixations.pop_first() - current_fixation_time_counter = 0 - - # While current time belongs to the current fixation - if video_ts >= current_fixation_ts and video_ts < current_fixation_ts + current_fixation.duration: - - current_fixation_time_counter += 1 - - # Draw current fixation - cv.circle(visu_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 255, 0), current_fixation_time_counter) - cv.circle(gaze_status_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 155, 0)) - - if saccades_exist: - - # Check next saccade - if video_ts >= current_saccade_ts + current_saccade.duration and len(ts_saccades) > 0: - - current_saccade_ts, current_saccade = ts_saccades.pop_first() - - # While current time belongs to the current saccade - if video_ts >= current_saccade_ts and video_ts < current_saccade_ts + current_saccade.duration: - pass - - # Draw gaze status until next frame - try: - - # Get next gaze status - ts_start, start_gaze_status = ts_status.first - ts_next, next_gaze_status = ts_status.first - - # Check next gaze status is not after next frame time - while ts_next < next_video_ts: - - ts_start, start_gaze_status = ts_status.pop_first() - ts_next, next_gaze_status = ts_status.first - - # Draw movement type - if start_gaze_status.valid and next_gaze_status.valid \ - and start_gaze_status.movement_index == next_gaze_status.movement_index \ - and start_gaze_status.movement_type == next_gaze_status.movement_type: - - if next_gaze_status.movement_type == 'Fixation': - movement_color = (0, 255, 0) - elif next_gaze_status.movement_type == 'Saccade': - movement_color = (0, 0, 255) - else: - movement_color = (255, 0, 0) - - cv.line(visu_matrix, start_gaze_status, next_gaze_status, movement_color, 3) - cv.line(gaze_status_matrix, start_gaze_status, next_gaze_status, movement_color, 3) - - # Empty gaze position - except IndexError: - pass - - # Draw gaze positions until next frame - try: - - # Get next gaze position - ts_start, start_gaze_position = ts_gaze_positions.first - 
ts_next, next_gaze_position = ts_gaze_positions.first - - # Gaze position count - gaze_position_count = 0 - - # Check next gaze position is not after next frame time - while ts_next < next_video_ts: - - ts_start, start_gaze_position = ts_gaze_positions.pop_first() - ts_next, next_gaze_position = ts_gaze_positions.first - - if not start_gaze_position.valid: - - # Select color error - if start_gaze_position.message == 'VideoTimeStamp missing': - color_error = (0, 0, 255) - else: - color_error = (0, 255, 255) - - # Write unvalid error message - cv.putText(visu_matrix, f'{ts_start*1e-3:.3f} ms: {start_gaze_position.message}', (20, 1060 - (gaze_position_count)*50), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA) - - # Draw start gaze - start_gaze_position.draw(visu_matrix, draw_precision=False) - start_gaze_position.draw(gaze_status_matrix, draw_precision=False) - - if start_gaze_position.valid and next_gaze_position.valid: - - # Draw movement from start to next - cv.line(visu_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1) - cv.line(gaze_status_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1) - - gaze_position_count += 1 - - if start_gaze_position.valid: - - # Write last start gaze position - cv.putText(visu_matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Write last start gaze position timing - cv.rectangle(visu_matrix, (0, 50), (550, 100), (31, 31, 31), -1) - cv.putText(visu_matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Empty gaze position - except IndexError: - pass - - # Write segment timing - cv.rectangle(visu_matrix, (0, 0), (550, 50), (63, 63, 63), -1) - cv.putText(visu_matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Write movement identification parameters - cv.rectangle(visu_matrix, (0, 150), (550, 310), (63, 63, 63), -1) - cv.putText(visu_matrix, f'Deviation max: {selected_deviation_max_threshold} px', (20, 210), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - cv.putText(visu_matrix, f'Duration min: {args.duration_min_threshold} ms', (20, 270), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Draw dispersion threshold circle - cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), 2, (0, 255, 255), -1) - cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), selected_deviation_max_threshold, (255, 150, 150), 1) - - # Close window using 'Esc' key - if cv.waitKey(1) == 27: - break - - # Display video - cv.imshow(f'Segment {tobii_segment.id} movements', visu_matrix) - - # Write video - output_video.write(visu_matrix) - - # Update Progress Bar - progress = video_ts - int(args.time_range[0] * 1e6) - MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze status video processing:', suffix = 'Complete', length = 100) - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - # Saving gaze status image - cv.imwrite(gaze_status_image_filepath, gaze_status_matrix) - - # End output video file - output_video.close() - print(f'\nGaze status video saved into {gaze_status_video_filepath}\n') - -if __name__ == '__main__': - - main() \ No newline at end of file diff --git a/src/argaze/utils/tobii_segment_record.py b/src/argaze/utils/tobii_segment_record.py deleted file mode 100644 index 25066c4..0000000 --- 
a/src/argaze/utils/tobii_segment_record.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python - -import argparse -import threading -import time -import random - -from argaze.TobiiGlassesPro2 import TobiiController -from argaze.utils import MiscFeatures - -def main(): - """ - Record a Tobii Glasses Pro 2 session on Tobii interface's SD Card - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default='192.168.1.10', help='tobii glasses ip') - parser.add_argument('-p', '--project_name', metavar='PROJECT_NAME', type=str, default=TobiiController.DEFAULT_PROJECT_NAME, help='project name') - parser.add_argument('-u', '--participant_name', metavar='PARTICIPANT_NAME', type=str, default=TobiiController.DEFAULT_PARTICIPANT_NAME, help='participant name') - args = parser.parse_args() - - # Create tobii controller (with auto discovery network process if no ip argument is provided) - print("Looking for a Tobii Glasses Pro 2 device ...") - - try: - - tobii_controller = TobiiController.TobiiController(ip_address = args.tobii_ip, project_name = args.project_name, participant_name = args.participant_name) - print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') - - except ConnectionError as e: - - print(e) - exit() - - # Setup camera at 25 fps to work on Full HD video stream - tobii_controller.set_scene_camera_freq_25() - - # Print current configuration - print(f'Tobii Glasses Pro 2 configuration:') - for key, value in tobii_controller.get_configuration().items(): - print(f'\t{key}: {value}') - - # Calibrate tobii glasses - tobii_controller.calibrate() - - # Create recording - recording_id = tobii_controller.create_recording(args.participant_name) - - # Start recording - tobii_controller.start_recording(recording_id) - print(f'Recording {recording_id} started') - - # Define loop - last_battery_level = 0 - time_count = 0 - - exit = MiscFeatures.ExitSignalHandler() - print('Waiting for Ctrl+C to quit...\n') - - while not exit.status(): - - # Print storage info every minute - if time_count % 60 == 0: - - print(tobii_controller.get_storage_info()) - - # Print battery level each time it changes - # and send it as an experimental variable - battery_level = tobii_controller.get_battery_level() - if battery_level != last_battery_level: - - print(tobii_controller.get_battery_info()) - - tobii_controller.send_variable('battery', battery_level) - - last_battery_level = battery_level - - # Send a random event every 3 to 10 seconds - if time_count % random.randint(3, 10) == 0: - - print('Send random event') - - tobii_controller.send_event('random') - - # Sleep 1 second - time.sleep(1) - time_count += 1 - - # Stop recording - tobii_controller.stop_recording(recording_id) - print(f'Recording {recording_id} stopped') - -if __name__ == '__main__': - - main() \ No newline at end of file diff --git a/src/argaze/utils/tobii_stream_arscene_display.py b/src/argaze/utils/tobii_stream_arscene_display.py deleted file mode 100644 index e7a3bfb..0000000 --- a/src/argaze/utils/tobii_stream_arscene_display.py +++ /dev/null @@ -1,154 +0,0 @@ -#!/usr/bin/env python - -import argparse -import os, json - -from argaze import * - -from argaze.TobiiGlassesPro2 import * -from argaze.ArUcoMarkers import * -from argaze.AreaOfInterest import * -from argaze.utils import MiscFeatures - -import cv2 as cv -import numpy - -def main(): - """ - Detect ArUcoScene into Tobii Glasses Pro 2 camera video stream.
- """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip') - parser.add_argument('-p', '--env_path', metavar='ENVIRONMENT_PATH', type=str, default=None, help='json argaze environment filepath') - parser.add_argument('-b', '--borders', metavar='BORDERS', type=float, default=16.666, help='define left and right borders mask (%) to not detect aruco out of these borders') - parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs') - args = parser.parse_args() - - # Create tobii controller (with auto discovery network process if no ip argument is provided) - print("Looking for a Tobii Glasses Pro 2 device ...") - - try: - - tobii_controller = TobiiController.TobiiController(args.tobii_ip) - print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') - - except ConnectionError as e: - - print(e) - exit() - - # Setup camera at 25 fps to work on Full HD video stream - tobii_controller.set_scene_camera_freq_25() - - # Print current confirugration - print(f'Tobii Glasses Pro 2 configuration:') - for key, value in tobii_controller.get_configuration().items(): - print(f'\t{key}: {value}') - - # Enable tobii data stream - tobii_data_stream = tobii_controller.enable_data_stream() - - # Enable tobii video stream - tobii_video_stream = tobii_controller.enable_video_stream() - - # Load ArEnvironment - ar_env = ArFeatures.ArEnvironment.from_json(args.env_path) - - if args.debug: - print(ar_env) - - # Work with first scene only - _, ar_scene = next(iter(ar_env.items())) - - # Start streaming - tobii_controller.start_streaming() - - # Live video stream capture loop - try: - - # Assess loop performance - loop_chrono = MiscFeatures.TimeProbe() - fps = 0 - - while tobii_video_stream.is_alive(): - - # Read video stream - video_ts, video_frame = tobii_video_stream.read() - - # Copy video frame to edit visualisation on it without disrupting aruco detection - visu_frame = video_frame.copy() - - # Hide frame left and right borders before detection to ignore markers outside focus area - cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width*args.borders/100), int(video_frame.height)), (0, 0, 0), -1) - cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - args.borders/100)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1) - - # Process video and data frame - try: - - # Detect aruco markers into frame - ar_env.aruco_detector.detect_markers(video_frame.matrix) - - # Estimate markers poses - ar_env.aruco_detector.estimate_markers_pose() - - # Estimate scene pose from ArUco markers into frame. 
- tvec, rmat, _ = ar_scene.estimate_pose(ar_env.aruco_detector.detected_markers) - - # Project AOI scene into frame according estimated pose - aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV) - - # Draw scene axis - ar_scene.draw_axis(visu_frame.matrix) - - # Draw scene places - ar_scene.draw_places(visu_frame.matrix) - - # Draw AOI - aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255)) - - # Draw detected markers - ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix) - - # Catch exceptions raised by estimate_pose and project methods - except (ArFeatures.PoseEstimationFailed, ArFeatures.SceneProjectionFailed) as e: - - # Draw detected markers - ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix) - - cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Assess loop performance - lap_time, lap_counter, elapsed_time = loop_chrono.lap() - - # Update fps each 10 loops - if lap_counter >= 10: - - fps = 1e3 * lap_counter / elapsed_time - loop_chrono.restart() - - # Write stream timing - cv.rectangle(visu_frame.matrix, (0, 0), (700, 50), (63, 63, 63), -1) - cv.putText(visu_frame.matrix, f'Video stream time: {int(video_ts*1e-3)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - cv.putText(visu_frame.matrix, f'Fps: {int(fps)}', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - cv.imshow(f'Stream ArUco AOI', visu_frame.matrix) - - # Close window using 'Esc' key - if cv.waitKey(1) == 27: - break - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - # Stop frame display - cv.destroyAllWindows() - - # Stop streaming - tobii_controller.stop_streaming() - -if __name__ == '__main__': - - main() \ No newline at end of file diff --git a/src/argaze/utils/tobii_stream_display.py b/src/argaze/utils/tobii_stream_display.py deleted file mode 100644 index b979e03..0000000 --- a/src/argaze/utils/tobii_stream_display.py +++ /dev/null @@ -1,218 +0,0 @@ -#!/usr/bin/env python - -import argparse - -from argaze import DataStructures, GazeFeatures -from argaze.TobiiGlassesPro2 import * -from argaze.utils import MiscFeatures - -import cv2 as cv -import numpy - -def main(): - """ - Capture video camera and gaze data streams and synchronise them. 
- """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip') - parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath') - - args = parser.parse_args() - - # Create tobii controller (with auto discovery network process if no ip argument is provided) - print("Looking for a Tobii Glasses Pro 2 device ...") - - try: - - tobii_controller = TobiiController.TobiiController(args.tobii_ip) - print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.') - - except ConnectionError as e: - - print(e) - exit() - - # Setup camera at 25 fps to work on Full HD video stream - tobii_controller.set_scene_camera_freq_25() - - # Print current confirugration - print(f'Tobii Glasses Pro 2 configuration:') - for key, value in tobii_controller.get_configuration().items(): - print(f'\t{key}: {value}') - - # Enable tobii data stream - tobii_data_stream = tobii_controller.enable_data_stream() - - # Enable tobii video stream - tobii_video_stream = tobii_controller.enable_video_stream() - - # Create tobii imu handler - tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit() - - # Load optional imu calibration file - if args.imu_calibration != None: - - tobii_imu.load_calibration_file(args.imu_calibration) - - # Init head rotation speed - head_rotation_speed = numpy.zeros(3).astype(int) - - # Init gaze position and precision - gaze_position_px = (0, 0) - gaze_precision_px = 0 - - # Init data timestamped in millisecond - data_ts_ms = 0 - - # Assess temporal performance - loop_chrono = MiscFeatures.TimeProbe() - gyroscope_chrono = MiscFeatures.TimeProbe() - gaze_chrono = MiscFeatures.TimeProbe() - - loop_ps = 0 - gyroscope_ps = 0 - gaze_ps = 0 - - def data_stream_callback(data_ts, data_object, data_object_type): - - nonlocal head_rotation_speed - nonlocal gaze_position_px - nonlocal gaze_precision_px - nonlocal data_ts_ms - nonlocal gyroscope_chrono - nonlocal gaze_chrono - - data_ts_ms = data_ts / 1e3 - - match data_object_type: - - case 'Gyroscope': - - # Assess gyroscope stream performance - gyroscope_chrono.lap() - - # Apply imu calibration - head_rotation_speed = tobii_imu.apply_gyroscope_offset(data_object).value.astype(int) * 5 - - case 'GazePosition': - - # Assess gaze position stream performance - gaze_chrono.lap() - - # Ignore frame when gaze position is not valid - if data_object.validity == 0: - - gaze_position_px = (int(data_object.value[0] * video_frame.width), int(data_object.value[1] * video_frame.height)) - - case 'GazePosition3D': - - # Ignore frame when gaze position 3D is not valid - if data_object.validity == 0: - - gaze_precision_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.PRECISION)) * data_object.value[2] - tobii_camera_hfov_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV / 2)) * data_object.value[2] - - gaze_precision_px = round(video_frame.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm)) - - # Subscribe to tobii data stream - tobii_data_stream.subscribe(data_stream_callback) - - # Start streaming - tobii_controller.start_streaming() - - # Live video stream capture loop - try: - - while tobii_video_stream.is_alive(): - - # Read video stream - video_ts, video_frame = tobii_video_stream.read() - video_ts_ms = video_ts / 1e3 - - # Assess loop performance - lap_time, lap_counter, elapsed_time = loop_chrono.lap() - - # 
Update fps each 10 loops - if lap_counter >= 10: - - loop_ps = 1e3 * lap_counter / elapsed_time - loop_chrono.restart() - - # Assess gyroscope streaming performance - elapsed_time, lap_counter = gyroscope_chrono.end() - gyroscope_ps = 1e3 * lap_counter / elapsed_time - gyroscope_chrono.restart() - - # Assess gaze streaming performance - elapsed_time, lap_counter = gaze_chrono.end() - gaze_ps = 1e3 * lap_counter / elapsed_time - gaze_chrono.restart() - - # Draw head rotation speed considering only yaw and pitch values - cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_rotation_speed[1], int(video_frame.height/2) - head_rotation_speed[0]), (150, 150, 150), 3) - - # Draw gaze - gaze_position = GazeFeatures.GazePosition(gaze_position_px, precision=gaze_precision_px) - gaze_position.draw(video_frame.matrix) - - # Draw center - cv.line(video_frame.matrix, (int(video_frame.width/2) - 50, int(video_frame.height/2)), (int(video_frame.width/2) + 50, int(video_frame.height/2)), (255, 150, 150), 1) - cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2) - 50), (int(video_frame.width/2), int(video_frame.height/2) + 50), (255, 150, 150), 1) - - # Write stream timing - cv.rectangle(video_frame.matrix, (0, 0), (1100, 50), (63, 63, 63), -1) - cv.putText(video_frame.matrix, f'Data stream time: {int(data_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - cv.putText(video_frame.matrix, f'Video delay: {int(data_ts_ms - video_ts_ms)} ms', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - cv.putText(video_frame.matrix, f'Fps: {int(loop_ps)}', (950, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - cv.rectangle(video_frame.matrix, (0, 50), (580, 100), (127, 127, 127), -1) - cv.putText(video_frame.matrix, f'Gyroscope fps: {int(gyroscope_ps)}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - cv.putText(video_frame.matrix, f'Gaze fps: {int(gaze_ps)}', (350, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - cv.imshow(f'Video and data stream', video_frame.matrix) - - key_pressed = cv.waitKey(1) - - #if key_pressed != -1: - # print(key_pressed) - - # Set Auto scene camera preset with 'a' key - if key_pressed == 97: - tobii_controller.set_scene_camera_auto_preset() - print('Tobii Glasses Pro 2 scene camera in Auto mode') - - # Set GazeExposure scene camera preset with 'z' key - if key_pressed == 122: - tobii_controller.set_scene_camera_gaze_preset() - print('Tobii Glasses Pro 2 scene camera in GazeExposure mode') - - # Set Indoor eye camera preset with 'i' key - if key_pressed == 105: - tobii_controller.set_eye_camera_indoor_preset() - print('Tobii Glasses Pro 2 eye camera in Indoor mode') - - # Set Outdoor eye camera preset with 'o' key - if key_pressed == 111: - tobii_controller.set_eye_camera_outdoor_preset() - print('Tobii Glasses Pro 2 eye camera in Outdoor mode') - - - # Close window using 'Esc' key - if key_pressed == 27: - break - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - # Stop frame display - cv.destroyAllWindows() - - # Stop streaming - tobii_controller.stop_streaming() - -if __name__ == '__main__': - - main() \ No newline at end of file -- cgit v1.1