aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2023-03-28 11:53:14 +0200
committerThéo de la Hogue2023-03-28 11:53:14 +0200
commit6b8e7ae63a63a2854613b98db2fdeb079026748e (patch)
tree799fbd0caec0038482c1da81fa130645da64caaf
parent78da945437bbb49a0fc84940c448e175f813230b (diff)
downloadargaze-6b8e7ae63a63a2854613b98db2fdeb079026748e.zip
argaze-6b8e7ae63a63a2854613b98db2fdeb079026748e.tar.gz
argaze-6b8e7ae63a63a2854613b98db2fdeb079026748e.tar.bz2
argaze-6b8e7ae63a63a2854613b98db2fdeb079026748e.tar.xz
Moving Tobii support into a dedicated repository.
-rw-r--r--README.md8
-rw-r--r--setup.py4
-rw-r--r--src/argaze/ArFeatures.py2
-rw-r--r--src/argaze/ArUcoMarkers/README.md6
-rw-r--r--src/argaze/TobiiGlassesPro2/README.md89
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiController.py392
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiData.py565
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiEntities.py334
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py253
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py226
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiSpecifications.py15
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiVideo.py283
-rw-r--r--src/argaze/TobiiGlassesPro2/__init__.py5
-rw-r--r--src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdfbin1965 -> 0 bytes
-rw-r--r--src/argaze/TobiiGlassesPro2/utils/imu.json21
-rw-r--r--src/argaze/__init__.py2
-rw-r--r--src/argaze/utils/README.md75
-rw-r--r--src/argaze/utils/tobii_camera_calibrate.py140
-rw-r--r--src/argaze/utils/tobii_imu_calibrate.py214
-rw-r--r--src/argaze/utils/tobii_sdcard_explore.py83
-rw-r--r--src/argaze/utils/tobii_segment_arscene_edit.py381
-rw-r--r--src/argaze/utils/tobii_segment_arscene_export.py306
-rw-r--r--src/argaze/utils/tobii_segment_data_plot_export.py141
-rw-r--r--src/argaze/utils/tobii_segment_display.py150
-rw-r--r--src/argaze/utils/tobii_segment_gaze_metrics_export.py242
-rw-r--r--src/argaze/utils/tobii_segment_gaze_movements_export.py570
-rw-r--r--src/argaze/utils/tobii_segment_record.py96
-rw-r--r--src/argaze/utils/tobii_stream_arscene_display.py154
-rw-r--r--src/argaze/utils/tobii_stream_display.py218
29 files changed, 15 insertions, 4960 deletions
diff --git a/README.md b/README.md
index 90fe5c3..65fab39 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,8 @@
-**ArGaze** is a python toolkit to deal with gaze tracking in Augmented Reality (AR) environment.
+**ArGaze** is a python toolkit to deal with gaze tracking in **Augmented Reality (AR) environment**.
-The ArGaze toolkit provides solutions to build 3D modeled AR environment based on ArUco markers and so ease experimentation design with wearable eye tracker. Gaze fixations and saccades can be realtime projected on to Areas Of Interest (AOI) onto AR environment for live interactions.
+The ArGaze toolkit provides solutions to build 3D modeled AR environment defining **Areas Of Interest (AOI)** mapped on **ArUco markers** and so ease experimentation design with wearable eye tracker device.
+
+Further, tracked gaze can be projected onto AR environment for live or post **gaze analysis** thanks to **timestamped data** features.
# Architecture
@@ -11,7 +13,7 @@ ArGaze is divided in submodules dedicated to various specifics features:
* `argaze.GazeAnalysis`: Class interface to work with various gaze analysis algorithms.
* `argaze.ArUcoMarkers`: ArUco markers generator, detector, camera calibration, ...
* `argaze.AreaOfInterest`: Area Of Interest (AOI) scene management for 2D and 3D environment.
-* `argaze.TobiiGlassesPro2`: A gaze tracking device interface.
+* `argaze.DataStructures`: Timestamped data features.
* `argaze.utils`: Collection of command-line high level features scripts based on ArGaze toolkit.
# Installation
diff --git a/setup.py b/setup.py
index 01072f9..74ebcbd 100644
--- a/setup.py
+++ b/setup.py
@@ -35,10 +35,10 @@ setup(
packages=find_packages(where='src'),
python_requires='>=3.11',
- install_requires=['opencv-python>=4.7.0', 'opencv-contrib-python>=4.7.0', 'numpy', 'av', 'rtsp', 'netifaces', 'pandas', 'matplotlib', 'shapely', 'scipy'],
+ install_requires=['opencv-python>=4.7.0', 'opencv-contrib-python>=4.7.0', 'numpy', 'pandas', 'matplotlib', 'shapely'],
project_urls={
'Bug Reports': 'https://git.recherche.enac.fr/projects/argaze/issues',
- 'Source': 'https://git.recherche.enac.fr/projects/arucopy/repository',
+ 'Source': 'https://git.recherche.enac.fr/projects/argaze/repository',
},
) \ No newline at end of file
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index d8a9f31..21a7d6a 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -102,7 +102,7 @@ class ArEnvironment():
return output
class PoseEstimationFailed(Exception):
- """Exception raised by ArScene project method when the pose can't be estimated due to unconsistencies."""
+ """Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies."""
def __init__(self, message, unconsistencies=None):
diff --git a/src/argaze/ArUcoMarkers/README.md b/src/argaze/ArUcoMarkers/README.md
index bdc8f9e..931ee4b 100644
--- a/src/argaze/ArUcoMarkers/README.md
+++ b/src/argaze/ArUcoMarkers/README.md
@@ -7,8 +7,6 @@ Here is more [about ArUco markers dictionaries](https://docs.opencv.org/3.4/d9/d
## Utils
-Print **A3_board_35cmx25cm_markers_4X4_3cm.pdf** onto A3 paper sheet to get board at expected dimensions.
+Print **A3_DICT_ARUCO_ORIGINAL_3cm_35cmx25cm.pdf** onto A3 paper sheet to get board at expected dimensions.
-Print **A4_markers_4x4_3cm.pdf** onto A4 paper sheet to get markers at expected dimensions.
-
-Load **detecter_configuration.json** file with argaze utils **tobii_segment_aruco_aoi_export.py** script with -p option. This is an example file to illustrate how to setup [ArUco markers detection parameters](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html). \ No newline at end of file
+Print **A4_DICT_ARUCO_ORIGINAL_3cm_0-9.pdf** onto A4 paper sheet to get markers at expected dimensions.
diff --git a/src/argaze/TobiiGlassesPro2/README.md b/src/argaze/TobiiGlassesPro2/README.md
deleted file mode 100644
index f13490a..0000000
--- a/src/argaze/TobiiGlassesPro2/README.md
+++ /dev/null
@@ -1,89 +0,0 @@
-Class interface to handle Tobbi Glasses Pro 2 device.
-This work is greatly inspired by the David de Tommaso and Agnieszka Wykowska [TobiiGlassesPySuite](https://arxiv.org/pdf/1912.09142.pdf).
-
-.. note::
- Read [Tobii Glasses Pro 2 device user manual](https://www.tobiipro.com/siteassets/tobii-pro/user-manuals/tobii-pro-glasses-2-user-manual.pdf).
-
-## Utils
-
-* Print **A4_calibration_target.pdf** onto A4 paper sheet to get calibration target at expected dimension.
-
-* Load **imu.json** file with argaze utils **tobii_imu_calibrate.py** script with -i option. This is an example file to illustrate how to load Inertial Measure Unit (IMU) calibration parameters.
-
-## Local network configuration
-
-If the tobii Glasses aren't connected to a router, here is how to configure a local DHCP server to enable IPv4 device connection.
-
-### Linux (Ubuntu)
-
-* Setup static eth0 interface
-
-**/etc/network/interfaces**
-
-```
-auto eth0
-iface eth0 inet static
- address 192.168.1.1
- netmask 255.255.255.0
- network 192.168.1.0
- gateway 192.168.1.254
-```
-
-* Install DHCP server:
-
-```
-sudo apt-get install isc-dhcp
-```
-
-* Setup DHCP server:
-
-**/etc/default/isc-dhcp-server**
-
-```
-# On what interfaces should the DHCP server (dhcpd) serve DHCP requests?
-INTERFACESv4="eth0"
-INTERFACESv6=""
-```
-
-**/etc/dhcp/dhcpd.conf**
-
-```
-# NECESSARY TO BE A DHCP SERVER
-authoritative;
-
-# DHCP CONFIGURATION INFORMATION
-default-lease-time 43200;
-max-lease-time 86400;
-server-name "dhcpserver.robotron.lan";
-
-# DNS SERVERS DHCP WILL PUSH TO CLIENTS
-option domain-name-servers 192.168.1.1;
-
-# SEARCH DOMAINS DHCP WILL PUSH TO CLIENTS
-option domain-name "robotron.lan";
-
-# DHCP STATIC IP ASSIGNMENTS FILE
-include "/etc/dhcp/master.conf";
-
-# SUBNET FOR IP ADDRESSES MANUALLY/STATICALLY ASSIGNED ONLY
-subnet 192.168.1.0 netmask 255.255.255.0 {
- option broadcast-address 192.168.1.255;
- option subnet-mask 255.255.255.0;
- option routers 192.168.1.254;
-}
-```
-
-**/etc/dhcp/master.conf**
-
-```
-# Static IP assignments
-## SUBNET - 192.168.1.0/24
-host tobiiglasses { hardware ethernet 74:fe:48:34:7c:92; fixed-address 192.168.1.10; }
-```
-Replace 74:fe:48:34:7c:92 by the correct MAC address.
-
-* Monitor DHCP server activity:
-
-```
-journalctl | grep -Ei 'dhcp'
-```
diff --git a/src/argaze/TobiiGlassesPro2/TobiiController.py b/src/argaze/TobiiGlassesPro2/TobiiController.py
deleted file mode 100644
index 39f4e46..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiController.py
+++ /dev/null
@@ -1,392 +0,0 @@
-#!/usr/bin/env python
-
-import datetime
-import uuid
-
-from argaze.TobiiGlassesPro2 import TobiiNetworkInterface, TobiiData, TobiiVideo
-
-TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'
-TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S'
-
-DEFAULT_PROJECT_NAME = 'DefaultProject'
-DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant'
-DEFAULT_RECORD_NAME = 'DefaultRecord'
-
-class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface):
- """Handle Tobii glasses Pro 2 device using network interface.
- It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py)."""
-
- project_name = None
- """Project name."""
-
- project_id = None
- """Project identifier."""
-
- participant_name = None
- """Participant name."""
-
- participant_id = None
- """Participant identifier."""
-
- calibration_id = None
- """Calibration identifier."""
-
- def __init__(self, ip_address = None, project_name = DEFAULT_PROJECT_NAME, participant_name = DEFAULT_PARTICIPANT_NAME):
- """Create a project, a participant and start calibration."""
-
- super().__init__(ip_address)
-
- # bind to project or create one if it doesn't exist
- self.project_name = project_name
- self.project_id = self.set_project(self.project_name)
-
- # bind to participant or create one if it doesn't exist
- self.participant_name = participant_name
- self.participant_id = self.set_participant(self.project_id, self.participant_name)
-
- self.__recording_index = 0
-
- self.__data_stream = None
- self.__video_stream = None
-
- self.__record_event_thread = None
-
- super().wait_for_status('/api/system/status', 'sys_status', ['ok']) == 'ok'
-
- def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT):
- return datetime.datetime.now().replace(microsecond=0).strftime(timeformat)
-
- # STREAMING FEATURES
-
- def enable_data_stream(self) -> "TobiiData.TobiiDataStream":
- """Enable Tobii Glasses Pro 2 data streaming."""
-
- if self.__data_stream == None:
- self.__data_stream = TobiiData.TobiiDataStream(self)
-
- return self.__data_stream
-
- def enable_video_stream(self) -> "TobiiVideo.TobiiVideoStream":
- """Enable Tobii Glasses Pro 2 video camera streaming."""
-
- if self.__video_stream == None:
- self.__video_stream = TobiiVideo.TobiiVideoStream(self)
-
- return self.__video_stream
-
- def start_streaming(self):
- """Start data and/or video streaming."""
-
- if self.__data_stream != None:
- self.__data_stream.open()
-
- if self.__video_stream != None:
- self.__video_stream.open()
-
- def stop_streaming(self):
- """Stop data and/or video streaming."""
-
- if self.__data_stream != None:
- self.__data_stream.close()
-
- if self.__video_stream != None:
- self.__video_stream.close()
-
- # PROJECT FEATURES
-
- def set_project(self, project_name = DEFAULT_PROJECT_NAME) -> str:
- """Bind to a project or create one if it doesn't exist.
-
- * **Returns:**
- - project id
- """
-
- project_id = self.get_project_id(project_name)
-
- if project_id is None:
-
- data = {
- 'pr_info' : {
- 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, project_name)),
- 'Name': project_name
- },
- 'pr_created': self.__get_current_datetime()
- }
-
- json_data = super().post_request('/api/projects', data)
-
- return json_data['pr_id']
-
- else:
-
- return project_id
-
- def get_project_id(self, project_name) -> str:
- """Get project id."""
-
- project_id = None
- projects = super().get_request('/api/projects')
-
- for project in projects:
-
- try:
- if project['pr_info']['Name'] == project_name:
- project_id = project['pr_id']
- except:
- pass
-
- return project_id
-
- def get_projects(self) -> str:
- """Get all projects id."""
-
- return super().get_request('/api/projects')
-
- # PARTICIPANT FEATURES
-
- def set_participant(self, project_id, participant_name = DEFAULT_PARTICIPANT_NAME, participant_notes = '') -> str:
- """Bind to a participant or create one if it doesn't exist.
-
- * **Returns:**
- - participant id
- """
-
- participant_id = self.get_participant_id(participant_name)
-
- if participant_id is None:
-
- data = {
- 'pa_project': project_id,
- 'pa_info': {
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.participant_name)),
- 'Name': self.participant_name,
- 'Notes': participant_notes
- },
- 'pa_created': self.__get_current_datetime()
- }
-
- json_data = super().post_request('/api/participants', data)
-
- return json_data['pa_id']
-
- else:
-
- return participant_id
-
- def get_participant_id(self, participant_name) -> str:
- """Get participant id."""
-
- participant_id = None
- participants = super().get_request('/api/participants')
-
- for participant in participants:
-
- try:
- if participant['pa_info']['Name'] == participant_name:
- participant_id = participant['pa_id']
-
- except:
- pass
-
- return participant_id
-
- def get_participants(self) -> str:
- """Get all participants id."""
-
- return super().get_request('/api/participants')
-
- # CALIBRATION
-
- def calibrate(self):
- """Start Tobii glasses calibration for current project and participant."""
-
- input('Position Tobbi glasses calibration target then press \'Enter\' to start calibration.')
-
- data = {
- 'ca_project': self.project_id,
- 'ca_type': 'default',
- 'ca_participant': self.participant_id,
- 'ca_created': self.__get_current_datetime()
- }
-
- json_data = super().post_request('/api/calibrations', data)
-
- self.calibration_id = json_data['ca_id']
-
- super().post_request('/api/calibrations/' + self.calibration_id + '/start')
-
- status = super().wait_for_status('/api/calibrations/' + self.calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
-
- if status == 'uncalibrated' or status == 'stale' or status == 'failed':
- raise Error(f'Tobii calibration {self.calibration_id} {status}')
-
- # RECORDING FEATURES
-
- def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
- return super().wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
-
- def create_recording(self, participant_name, recording_notes = '') -> str:
- """Create a new recording.
-
- * **Returns:**
- - recording id
- """
-
- participant_id = self.get_participant_id(participant_name)
-
- if participant_id is None:
- raise NameError(f'{participant_name} participant doesn\'t exist')
-
- self.__recording_index += 1
- recording_name = f'Recording_{self.__recording_index}'
-
- data = {
- 'rec_participant': participant_id,
- 'rec_info': {
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)),
- 'Name': recording_name,
- 'Notes': recording_notes
- },
- 'rec_created': self.__get_current_datetime()
- }
-
- json_data = super().post_request('/api/recordings', data)
-
- return json_data['rec_id']
-
- def start_recording(self, recording_id) -> bool:
- """Start recording on the Tobii interface's SD Card."""
-
- super().post_request('/api/recordings/' + recording_id + '/start')
- return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
-
- def stop_recording(self, recording_id) -> bool:
- """Stop recording on the Tobii interface's SD Card."""
-
- super().post_request('/api/recordings/' + recording_id + '/stop')
- return self.__wait_for_recording_status(recording_id, ['done']) == "done"
-
- def pause_recording(self, recording_id) -> bool:
- """Pause recording on the Tobii interface's SD Card."""
-
- super().post_request('/api/recordings/' + recording_id + '/pause')
- return self.__wait_for_recording_status(recording_id, ['paused']) == "paused"
-
- def __get_recording_status(self):
- return self.get_status()['sys_recording']
-
- def get_current_recording_id(self) -> str:
- """Get current recording id."""
-
- return self.__get_recording_status()['rec_id']
-
- @property
- def recording(self) -> bool:
- """Is it recording?"""
-
- rec_status = self.__get_recording_status()
-
- if rec_status != {}:
- if rec_status['rec_state'] == "recording":
- return True
-
- return False
-
- def get_recordings(self) -> str:
- """Get all recordings id."""
-
- return super().get_request('/api/recordings')
-
- # EVENTS AND EXPERIMENTAL VARIABLES
-
- def __post_recording_data(self, event_type: str, event_tag = ''):
- data = {'type': event_type, 'tag': event_tag}
- super().post_request('/api/events', data, wait_for_response=False)
-
- def send_event(self, event_type: str, event_value = None):
- self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
-
- def send_variable(self, variable_name: str, variable_value = None):
- self.__post_recording_data(str(variable_name), str(variable_value))
-
- # MISC
-
- def eject_sd(self):
- super().get_request('/api/eject')
-
- def get_battery_info(self):
- return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
-
- def get_battery_level(self):
- return self.get_battery_status()['level']
-
- def get_battery_remaining_time(self):
- return self.get_battery_status()['remaining_time']
-
- def get_battery_status(self):
- return self.get_status()['sys_battery']
-
- def get_et_freq(self):
- return self.get_configuration()['sys_et_freq']
-
- def get_et_frequencies(self):
- return self.get_status()['sys_et']['frequencies']
-
- def identify(self):
- super().get_request('/api/identify')
-
- def get_address(self):
- return self.address
-
- def get_configuration(self):
- return super().get_request('/api/system/conf')
-
- def get_status(self):
- return super().get_request('/api/system/status')
-
- def get_storage_info(self):
- return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) )
-
- def get_storage_remaining_time(self):
- return self.get_storage_status()['remaining_time']
-
- def get_storage_status(self):
- return self.get_status()['sys_storage']
-
- def get_scene_camera_freq(self):
- return self.get_configuration()['sys_sc_fps']
-
- def set_et_freq_50(self):
- data = {'sys_et_freq': 50}
- json_data = super().post_request('/api/system/conf', data)
-
- def set_et_freq_100(self):
- # May not be available. Check get_et_frequencies() first.
- data = {'sys_et_freq': 100}
- json_data = super().post_request('/api/system/conf', data)
-
- def set_eye_camera_indoor_preset(self) -> str:
- data = {'sys_ec_preset': 'Indoor'}
- return super().post_request('/api/system/conf', data)
-
- def set_eye_camera_outdoor_preset(self) -> str:
- data = {'sys_ec_preset': 'ClearWeather'}
- return super().post_request('/api/system/conf', data)
-
- def set_scene_camera_auto_preset(self):
- data = {'sys_sc_preset': 'Auto'}
- json_data = super().post_request('/api/system/conf', data)
-
- def set_scene_camera_gaze_preset(self):
- data = {'sys_sc_preset': 'GazeBasedExposure'}
- json_data = super().post_request('/api/system/conf', data)
-
- def set_scene_camera_freq_25(self):
- data = {'sys_sc_fps': 25}
- json_data = super().post_request('/api/system/conf/', data)
-
- def set_scene_camera_freq_50(self):
- data = {'sys_sc_fps': 50}
- json_data = super().post_request('/api/system/conf/', data)
-
diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py
deleted file mode 100644
index 0e28054..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiData.py
+++ /dev/null
@@ -1,565 +0,0 @@
-#!/usr/bin/env python
-
-from typing import Tuple, TypeVar
-from dataclasses import dataclass
-import threading
-import uuid
-import gzip
-import json
-import time
-import queue
-
-from argaze import DataStructures
-from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
-
-from argaze.utils import MiscFeatures
-
-import numpy
-
-TobiiDataObjectType = TypeVar('TobiiDataObjectType', bound="TobiiDataObjectType")
-# Type definition for type annotation convenience
-
-@dataclass
-class DirSig():
- """Define dir sig data (dir sig)."""
-
- dir: int # meaning ?
- sig: int # meaning ?
-
-@dataclass
-class PresentationTimeStamp():
- """Define presentation time stamp (pts) data."""
-
- value: int
- """Pts value."""
-
-@dataclass
-class VideoTimeStamp():
- """Define video time stamp (vts) data."""
-
- value: int
- """Vts value."""
-
- offset: int
- """Primary time stamp value."""
-
-@dataclass
-class EventSynch():
- """Define event synch (evts) data."""
-
- value: int # meaning ?
- """Evts value."""
-
-@dataclass
-class Event():
- """Define event data (ets type tag)."""
-
- ets: int # meaning ?
- type: str
- tag: str # dict ?
-
-@dataclass
-class Accelerometer():
- """Define accelerometer data (ac)."""
-
- value: numpy.array
- """Accelerometer value"""
-
-@dataclass
-class Gyroscope():
- """Define gyroscope data (gy)."""
-
- value: numpy.array
- """Gyroscope value"""
-
-@dataclass
-class PupilCenter():
- """Define pupil center data (gidx pc eye)."""
-
- validity: int
- index: int
- value: tuple((float, float, float))
- eye: str # 'right' or 'left'
-
-@dataclass
-class PupilDiameter():
- """Define pupil diameter data (gidx pd eye)."""
-
- validity: int
- index: int
- value: float
- eye: str # 'right' or 'left'
-
-@dataclass
-class GazeDirection():
- """Define gaze direction data (gidx gd eye)."""
-
- validity: int
- index: int
- value: tuple((float, float, float))
- eye: str # 'right' or 'left'
-
-@dataclass
-class GazePosition():
- """Define gaze position data (gidx l gp)."""
-
- validity: int
- index: int
- l: str # ?
- value: tuple((float, float))
-
-@dataclass
-class GazePosition3D():
- """Define gaze position 3D data (gidx gp3)."""
-
- validity: int
- index: int
- value: tuple((float, float))
-
-@dataclass
-class MarkerPosition():
- """Define marker data (marker3d marker2d)."""
-
- value_3d: tuple((float, float, float))
- value_2d: tuple((float, float))
-
-class TobiiJsonDataParser():
-
- def parse_dir_sig(self, status, json_data):
-
- return DirSig(json_data['dir'], json_data['sig'])
-
- def parse_pts(self, status, json_data):
-
- return PresentationTimeStamp(json_data['pts'])
-
- def parse_vts(self, status, json_data):
-
- return VideoTimeStamp(json_data['vts'], json_data['ts'])
-
- def parse_event_synch(self, status, json_data):
-
- return EventSynch(json_data['evts'])
-
- def parse_event(self, status, json_data):
-
- return Event(json_data['ets'], json_data['type'], json_data['tag'])
-
- def parse_accelerometer(self, status, json_data):
-
- return Accelerometer(json_data['ac'])
-
- def parse_gyroscope(self, status, json_data):
-
- return Gyroscope(json_data['gy'])
-
- def parse_pupil_center(self, status, gaze_index, json_data):
-
- return PupilCenter(status, gaze_index, json_data['pc'], json_data['eye'])
-
- def parse_pupil_diameter(self, status, gaze_index, json_data):
-
- return PupilDiameter(status, gaze_index, json_data['pd'], json_data['eye'])
-
- def parse_gaze_direction(self, status, gaze_index, json_data):
-
- return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye'])
-
- def parse_gaze_position(self, status, gaze_index, json_data):
-
- return GazePosition(status, gaze_index, json_data['l'], json_data['gp'])
-
- def parse_gaze_position_3d(self, status, gaze_index, json_data):
-
- return GazePosition3D(status, gaze_index, json_data['gp3'])
-
- def parse_marker_position(self, status, json_data):
-
- return MarkerPosition(json_data['marker3d'], json_data['marker2d'])
-
- def parse_pupil_or_gaze(self, status, json_data):
-
- gaze_index = json_data.pop('gidx')
-
- # parse pupil or gaze data depending second json key
- second_key = next(iter(json_data))
-
- parse_map = {
- 'pc': self.parse_pupil_center,
- 'pd': self.parse_pupil_diameter,
- 'gd': self.parse_gaze_direction,
- 'l': self.parse_gaze_position,
- 'gp3': self.parse_gaze_position_3d
- }
-
- return parse_map[second_key](status, gaze_index, json_data)
-
- def parse_data(self, status, json_data):
-
- # parse data depending first json key
- first_key = next(iter(json_data))
-
- parse_map = {
- 'dir': self.parse_dir_sig,
- 'pts': self.parse_pts,
- 'vts': self.parse_vts,
- 'evts': self.parse_event_synch,
- 'ets': self.parse_event,
- 'ac': self.parse_accelerometer,
- 'gy': self.parse_gyroscope,
- 'gidx': self.parse_pupil_or_gaze,
- 'marker3d': self.parse_marker_position
- }
-
- return parse_map[first_key](status, json_data)
-
-class TobiiDataSegment():
- """Handle Tobii Glasses Pro 2 segment data file from segment directory.
- Load, parse and store each segment data into dedicated TimeStampedBuffers considering VideoTimeStamp offset to ease data/video synchronisation."""
-
- def __init__(self, segment_data_path, start_timestamp = 0, end_timestamp = None):
-
- self.__path = segment_data_path
-
- self.__vts_offset = 0
- self.__vts_ts = -1
-
- self.__json_data_parser = TobiiJsonDataParser()
-
- self.__ts_data_buffer_dict = {
- 'DirSig': DataStructures.TimeStampedBuffer(),
- 'PresentationTimeStamp': DataStructures.TimeStampedBuffer(),
- 'VideoTimeStamp': DataStructures.TimeStampedBuffer(),
- 'EventSynch': DataStructures.TimeStampedBuffer(),
- 'Event': DataStructures.TimeStampedBuffer(),
- 'Accelerometer': DataStructures.TimeStampedBuffer(),
- 'Gyroscope': DataStructures.TimeStampedBuffer(),
- 'PupilCenter': DataStructures.TimeStampedBuffer(),
- 'PupilDiameter': DataStructures.TimeStampedBuffer(),
- 'GazeDirection': DataStructures.TimeStampedBuffer(),
- 'GazePosition': DataStructures.TimeStampedBuffer(),
- 'GazePosition3D': DataStructures.TimeStampedBuffer(),
- 'MarkerPosition': DataStructures.TimeStampedBuffer()
- }
-
- # define a decoder function
- def decode(json_data):
-
- # parse data status
- status = json_data.pop('s', -1)
-
- # convert timestamp
- ts = json_data.pop('ts')
-
- # watch for vts data to offset timestamps
- try:
- self.__vts_offset = json_data['vts']
- self.__vts_ts = ts
-
- # store primary ts value to allow further reverse offset operation
- json_data['ts'] = ts
-
- except KeyError:
- pass
-
- # ignore data before first vts entry
- if self.__vts_ts == -1:
- return True # continue
-
- ts -= self.__vts_ts
- ts += self.__vts_offset
-
- # ignore timestamps out of the given time range
- if ts < start_timestamp:
- return True # continue
-
- if ts >= end_timestamp:
- return False # stop
-
- # convert json data into data object
- data_object = self.__json_data_parser.parse_data(status, json_data)
- data_object_type = type(data_object).__name__
-
- # store data object into dedicated timestamped buffer
- self.__ts_data_buffer_dict[data_object_type][ts] = data_object
-
- return True # continue
-
- # start loading
- with gzip.open(self.__path) as f:
-
- for item in f:
- if not json.loads(item.decode('utf-8'), object_hook=decode):
- break
-
- def __getitem__(self, key):
- return self.__ts_data_buffer_dict[key]
-
- def keys(self) -> list[str]:
- """Get all segment data keys."""
-
- return list(self.__ts_data_buffer_dict.keys())
-
- @property
- def path(self) -> str:
- """Get segment data path."""
-
- return self.__path
-
-class TobiiDataStream():
- """Handle Tobii Glasses Pro 2 data stream in separate thread."""
-
- def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
- """Initialise network connection and prepare data reception."""
-
- # Enable network connection
- self.__network = network_interface
- self.__data_socket = self.__network.make_socket()
-
- # Data reception
- self.__data_thread = None
- self.__json_data_parser = TobiiJsonDataParser()
- self.__first_ts = 0
-
- # Sync reading data subscription
- self.reading_callbacks = []
- self.__subcription_lock = threading.Lock()
-
- # Keep connection alive
- self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
- self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
- self.__keep_alive_thread.daemon = True
- self.__keep_alive_thread.start()
-
- def __del__(self):
- """Stop data reception and network connection before destruction."""
-
- if self.__data_thread != None:
-
- threading.Thread.join(self.__data_thread)
- self.__data_thread = None
-
- threading.Thread.join(self.__keep_alive_thread)
-
- self.__data_socket.close()
-
- def __keep_alive(self):
- """Maintain network connection and clear socket when is not read."""
-
- while True:
-
- # if no thread is reading data socket
- if self.__data_thread == None:
-
- self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg)
-
- clear_count = 0
- while clear_count < 1000 and self.__data_thread == None:
-
- # Clear socket each milli second
- time.sleep(0.001)
- self.__network.grab_data(self.__data_socket)
- clear_count += 1
-
- else:
-
- self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg)
- time.sleep(1)
-
- def open(self):
- """Start data reception."""
-
- if self.__data_thread == None:
-
- self.__first_ts = 0
-
- self.__data_thread = threading.Thread(target = self.__run)
- self.__data_thread.daemon = True
-
- self.__stop_event = threading.Event()
-
- self.__data_thread.start()
-
- def close(self):
- """Stop data reception definitively."""
-
- if self.__data_thread != None:
-
- self.__stop_event.set()
-
- threading.Thread.join(self.__data_thread)
- self.__data_thread = None
-
- @property
- def running(self) -> bool:
- """Is tobii data streaming running?"""
-
- return self.__data_thread != None
-
- def __run(self):
- """Managed received data for sync and async reading case.
- - Sync: send data to callback function.
- - Async: store data into a locked queue for further reading."""
-
- while not self.__stop_event.isSet():
-
- # grab data
- data = self.__network.grab_data(self.__data_socket)
-
- # decode data
- json_data = json.loads(data.decode('utf-8'))
-
- # parse json into timestamped data object
- data_ts, data_object, data_object_type = self.__parse_json_data(json_data)
-
- # lock data subcription
- self.__subcription_lock.acquire()
-
- # share incoming data to all subscribers
- for callback in self.reading_callbacks:
-
- callback(data_ts, data_object, data_object_type)
-
- # unlock data subscription
- self.__subcription_lock.release()
-
- def subscribe(self, reading_callback):
- """Pass reading callback function to get incoming (data_ts, data_object, data_object_type) back."""
-
- # lock data subcription
- self.__subcription_lock.acquire()
-
- # append callback
- self.reading_callbacks.append(reading_callback)
-
- # unlock data subscription
- self.__subcription_lock.release()
-
- def unsubscribe(self, reading_callback):
- """Remove reading callback function to stop data reception."""
-
- # lock data subcription
- self.__subcription_lock.acquire()
-
- # remove callback
- self.reading_callbacks.remove(reading_callback)
-
- # unlock data subscription
- self.__subcription_lock.release()
-
- def __parse_json_data(self, json_data):
-
- # parse data status
- status = json_data.pop('s', -1)
-
- # convert timestamp
- data_ts = json_data.pop('ts')
-
- # convert json data into data object
- data_object = self.__json_data_parser.parse_data(status, json_data)
- data_object_type = type(data_object).__name__
-
- # keep first timestamp to offset all timestamps
- if self.__first_ts == 0:
- self.__first_ts = data_ts
-
- data_ts -= self.__first_ts
-
- return data_ts, data_object, data_object_type
-
- def __capture_callback(self, data_ts, data_object, data_object_type):
-
- if data_object_type == self.__data_stream_selector:
-
- if len(self.__data_ts_buffer.keys()) < self.__data_ts_buffer_size:
-
- # update first timestamp if next timestamps are negative
- if data_ts < 0:
- self.__first_ts += data_ts
- data_ts = 0
-
- self.__data_ts_buffer[data_ts] = data_object
-
- def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500) -> int:
- """Start data stream capture.
-
- * **Returns: each 100 ms**
- - buffer size
- """
-
- # Prepare for data acquisition
- self.__data_ts_buffer = data_ts_buffer
- self.__data_stream_selector = data_object_type
- self.__data_ts_buffer_size = sample_number
-
- # Subscribe to tobii data stream
- self.subscribe(self.__capture_callback)
-
- # Start data stream if needed
- close_after = False
- if not self.running:
- self.open()
- close_after = True
-
- # Share data acquisition progress
- buffer_size = 0
- while buffer_size < sample_number:
-
- time.sleep(0.1)
-
- buffer_size = len(self.__data_ts_buffer.keys())
-
- yield buffer_size
-
- # Stop data sream if needed
- if close_after:
- self.close()
-
- # Unsubscribe to tobii data stream
- self.unsubscribe(self.__capture_callback)
-
- def __buffer_callback(self, data_ts, data_object, data_object_type):
-
- # Lock data queue access
- self.__queue_lock.acquire()
-
- # Put data into the queue
- self.__data_queue.put((data_ts, data_object, data_object_type))
-
- # Unlock data queue access
- self.__queue_lock.release()
-
- def read(self) -> Tuple[int, TobiiDataObjectType, str]:
- """Iterate over incoming data buffer asynchronously."""
-
- # Setup data buffering
- self.__data_queue = queue.Queue()
- self.__queue_lock = threading.Lock()
-
- # Subscribe to tobii data stream
- self.subscribe(self.__buffer_callback)
-
- return self.__iter__()
-
- def __iter__(self):
-
- return self
-
- def __next__(self):
-
- # Wait for data
- while self.__data_queue.empty():
-
- time.sleep(0.0001)
- continue
-
- # Lock data queue access
- self.__queue_lock.acquire()
-
- # Get data from the queue
- data_ts, data_object, data_object_type = self.__data_queue.get()
-
- # Unlock data queue access
- self.__queue_lock.release()
-
- return data_ts, data_object, data_object_type
diff --git a/src/argaze/TobiiGlassesPro2/TobiiEntities.py b/src/argaze/TobiiGlassesPro2/TobiiEntities.py
deleted file mode 100644
index 0ae6dec..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiEntities.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#!/usr/bin/env python
-
-from typing import TypeVar
-import datetime
-import json
-import os
-
-from argaze.TobiiGlassesPro2 import TobiiData, TobiiVideo
-
-import av
-import cv2 as cv
-
-TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'
-
-TOBII_PROJECTS_DIRNAME = "projects"
-TOBII_PROJECT_FILENAME = "project.json"
-
-TOBII_PARTICIPANTS_DIRNAME = "participants"
-TOBII_PARTICIPANT_FILENAME = "participant.json"
-
-TOBII_RECORDINGS_DIRNAME = "recordings"
-TOBII_RECORD_FILENAME = "recording.json"
-
-TOBII_SEGMENTS_DIRNAME = "segments"
-TOBII_SEGMENT_INFO_FILENAME = "segment.json"
-TOBII_SEGMENT_VIDEO_FILENAME = "fullstream.mp4"
-TOBII_SEGMENT_DATA_FILENAME = "livedata.json.gz"
-
-DatetimeType = TypeVar('datetime', bound="datetime")
-# Type definition for type annotation convenience
-
-class TobiiSegment:
- """Handle Tobii Glasses Pro 2 segment info."""
-
- def __init__(self, segment_path, start_timestamp:int = 0, end_timestamp:int = None):
- """Load segment info from segment directory.
- Optionnaly select a time range in microsecond."""
-
- self.__id = os.path.basename(segment_path)
- self.__path = segment_path
-
- with open(os.path.join(self.__path, TOBII_SEGMENT_INFO_FILENAME)) as f:
- try:
- item = json.load(f)
- except:
- raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}')
-
- self.__start_timestamp = start_timestamp
- self.__end_timestamp = min(end_timestamp, int(item["seg_length"] * 1e6)) if end_timestamp != None else int(item["seg_length"] * 1e6)
-
- if self.__start_timestamp >= self.__end_timestamp:
- raise ValueError('start time is equal or greater than end time.')
-
- self.__calibrated = bool(item["seg_calibrated"])
-
- self.__start_date = datetime.datetime.strptime(item["seg_t_start"], TOBII_DATETIME_FORMAT)
- self.__stop_date = datetime.datetime.strptime(item["seg_t_stop"], TOBII_DATETIME_FORMAT)
-
- @property
- def path(self) -> str:
- """Get segment path."""
-
- return self.__path
-
- @property
- def id(self) -> str:
- """Get segment id."""
- return self.__id
-
- @property
- def start_timestamp(self) -> int:
- """Get the timestamp where the segment loading starts."""
-
- return self.__start_timestamp
-
- @property
- def end_timestamp(self) -> int:
- """Get the timestamp where the segment loading ends."""
-
- return self.__end_timestamp
-
- @property
- def start_date(self) -> DatetimeType:
- """Get the date when the segment has started."""
-
- return self.__start_date
-
- @property
- def stop_date(self) -> DatetimeType:
- """Get the date when the segment has stopped."""
-
- return self.__stop_date
-
- @property
- def calibrated(self) -> bool:
- """Is the segment has been calibrated?"""
-
- return self.__calibrated
-
- def load_data(self) -> "TobiiData.TobiiDataSegment":
- """Load recorded data stream."""
-
- return TobiiData.TobiiDataSegment(os.path.join(self.__path, TOBII_SEGMENT_DATA_FILENAME), self.__start_timestamp, self.__end_timestamp)
-
- def load_video(self) -> "TobiiVideo.TobiiVideoSegment":
- """Load recorded video stream."""
-
- return TobiiVideo.TobiiVideoSegment(os.path.join(self.__path, TOBII_SEGMENT_VIDEO_FILENAME), self.__start_timestamp, self.__end_timestamp)
-
-class TobiiRecording:
- """Handle Tobii Glasses Pro 2 recording info and segments."""
-
- def __init__(self, recording_path):
- """Load recording info from recording directory."""
-
- self.__id = os.path.basename(recording_path)
- self.__path = recording_path
-
- with open(os.path.join(self.__path, TOBII_RECORD_FILENAME)) as f:
- try:
- item = json.load(f)
- except:
- raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_RECORD_FILENAME}')
-
- self.__name = item["rec_info"]["Name"]
- self.__creation_date = datetime.datetime.strptime(item["rec_created"], TOBII_DATETIME_FORMAT)
-
- self.__length = int(item["rec_length"])
- self.__et_samples = int(item["rec_et_samples"])
- self.__et_samples_valid = int(item["rec_et_valid_samples"])
- self.__participant_id = item["rec_participant"]
-
- @property
- def path(self) -> str:
- """Get recording path."""
-
- return self.__path
-
- @property
- def id(self) -> str:
- """Get recording id."""
- return self.__id
-
- @property
- def name(self) -> str:
- """Get recording name."""
-
- return self.__name
-
- @property
- def creation_date(self) -> DatetimeType:
- """Get date when the recording has been done."""
-
- return self.__creation_date
-
- @property
- def length(self):
- """Get record duration in second."""
-
- return self.__length
-
- @property
- def eyetracker_samples(self) -> int:
- """Get numbers of recorded eye detecter samples."""
-
- return self.__et_samples
-
- @property
- def eyetracker_samples_valid(self) -> int:
- """Get numbers of recorded eye detecter valid samples."""
-
- return self.__et_samples_valid
-
- @property
- def project(self) -> "TobiiProject":
- """Get project to which it belongs."""
-
- project_path = os.path.dirname(os.path.dirname(os.path.abspath(self.__path)))
-
- return TobiiProject(project_path)
-
- @property
- def participant(self) -> "TobiiParticipant":
- """Get participant to which it belongs."""
-
- project_path = os.path.dirname(os.path.dirname(os.path.abspath(self.__path)))
-
- return TobiiParticipant(project_path + '/participants/' + self.__participant_id)
-
- def segments(self) -> list["TobiiSegment"]:
- """Get all recorded segments."""
-
- all_segments = []
- segments_path = os.path.join(self.__path, TOBII_SEGMENTS_DIRNAME)
-
- for item in os.listdir(segments_path):
- segment_path = os.path.join(segments_path, item)
- if os.path.isdir(segment_path):
- all_segments.append(TobiiSegment(segment_path))
-
- return all_segments
-
-class TobiiParticipant:
- """Handle Tobii Glasses Pro 2 participant data."""
-
- def __init__(self, participant_path):
- """Load participant data from path"""
-
- self.__id = os.path.basename(participant_path)
- self.__path = participant_path
-
- with open(os.path.join(self.__path, TOBII_PARTICIPANT_FILENAME)) as f:
- try:
- item = json.load(f)
- except:
- raise RuntimeError(f'JSON fails to load {source_dir}/{TOBII_PARTICIPANT_FILENAME}')
-
- self.__name = item["pa_info"]["Name"]
-
- @property
- def path(self) -> str:
- """Get participant path."""
-
- return self.__path
-
- @property
- def id(self) -> str:
- """Get participant id."""
- return self.__id
-
- @property
- def name(self) -> str:
- """Get participant name."""
-
- return self.__name
-
-class TobiiProject:
- """Handle Tobii Glasses Pro 2 project data."""
-
- def __init__(self, project_path):
- """Load project data from projects directory and project id."""
-
- self.__id = os.path.basename(project_path)
- self.__path = project_path
-
- with open(os.path.join(self.__path, TOBII_PROJECT_FILENAME)) as f:
- try:
- item = json.load(f)
- except:
- raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_PROJECT_FILENAME}')
-
- self.__creation_date = datetime.datetime.strptime(item["pr_created"], TOBII_DATETIME_FORMAT)
-
- try:
- self.__name = item["pr_info"]["Name"]
- except:
- self.__name = None
-
- @property
- def path(self) -> str:
- """Get project path."""
-
- return self.__path
-
- @property
- def id(self) -> str:
- """Get project id."""
- return self.__id
-
- @property
- def name(self) -> str:
- """Get project name."""
-
- return self.__name
-
- @property
- def creation_date(self) -> DatetimeType:
- """Get date when the project has been created."""
-
- return self.__creation_date
-
- def participants(self) -> list["TobiiParticipant"]:
- """Get all participants."""
-
- all_participants = []
- participants_path = os.path.join(self.__path, TOBII_PARTICIPANTS_DIRNAME)
-
- for item in os.listdir(participants_path):
- participant_path = os.path.join(participants_path, item)
- if os.path.isdir(participant_path):
- all_participants.append(TobiiParticipant(participant_path))
-
- return all_participants
-
- def recordings(self) -> list["TobiiRecording"]:
- """Get all recordings."""
-
- all_recordings = []
- recordings_path = os.path.join(self.__path, TOBII_RECORDINGS_DIRNAME)
-
- for item in os.listdir(recordings_path):
- recording_path = os.path.join(recordings_path, item)
- if os.path.isdir(recording_path):
- all_recordings.append(TobiiRecording(recording_path))
-
- return all_recordings
-
-class TobiiDrive:
- """Handle Tobii Glasses Pro 2 drive data."""
-
- def __init__(self, drive_path):
- """Load drive data from drive directory path."""
-
- self.__path = drive_path
-
- @property
- def path(self) -> str:
- """Get drive path."""
-
- return self.__path
-
- def projects(self) -> list["TobiiProject"]:
- """Get all projects."""
-
- all_projects = []
- projects_path = os.path.join(self.__path, TOBII_PROJECTS_DIRNAME)
-
- for item in os.listdir(projects_path):
- project_path = os.path.join(projects_path, item)
- if os.path.isdir(project_path):
- all_projects.append(TobiiProject(project_path))
-
- return all_projects
-
diff --git a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py
deleted file mode 100644
index 35bb035..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py
+++ /dev/null
@@ -1,253 +0,0 @@
-#!/usr/bin/env python
-
-from typing import Tuple
-import json
-import time
-import math
-
-from argaze.TobiiGlassesPro2 import TobiiData
-
-import numpy
-from scipy.optimize import curve_fit
-import cv2 as cv
-
-
-EARTH_GRAVITY = -9.81
-"""Earth gravity force (m/s2)."""
-
-EARTH_GRAVITY_VECTOR = [0, EARTH_GRAVITY, 0]
-"""Earth gravity force vector."""
-
-CAMERA_TO_IMU_TRANSLATION_VECTOR = [8, -1, -5]
-"""Translation vector from camera referential to imu referential (cm)."""
-
-CAMERA_TO_IMU_ROTATION_VECTOR = [18, 0, 180]
-"""Rotation vector from camera referential to imu referential (euler, degree)."""
-
-class TobiiInertialMeasureUnit():
- """Ease Tobbi [Inertial Measure Unit](https://connect.tobii.com/s/article/How-are-the-MEMS-data-reported-for-Tobii-Pro-Glasses-2?language=en_US) data handling"""
-
- def __init__(self):
- """Define IMU calibration data."""
-
- self.__gyroscope_offset = numpy.zeros(3)
- self.__accelerometer_coefficients = numpy.array([[1., 0.], [1., 0.], [1., 0.]])
-
- self.__plumb = numpy.array(EARTH_GRAVITY_VECTOR)
-
- self.reset_rotation()
- self.reset_translation()
-
- def load_calibration_file(self, calibration_filepath):
- """Load IMU calibration from a .json file."""
-
- with open(calibration_filepath) as calibration_file:
-
- # Deserialize .json
- # TODO find a better way
- calibration_data = json.load(calibration_file)
-
- # Load calibration data
- self.__gyroscope_offset = numpy.array(calibration_data['gyroscope_offset'])
- self.__accelerometer_coefficients = numpy.array(calibration_data['accelerometer_coefficients'])
-
- def save_calibration_file(self, calibration_filepath):
- """Save IMU calibration into .json file."""
-
- calibration_data = {
- 'gyroscope_offset': list(self.__gyroscope_offset),
- 'accelerometer_coefficients': [list(self.__accelerometer_coefficients[0]), list(self.__accelerometer_coefficients[1]), list(self.__accelerometer_coefficients[2])]
- }
-
- with open(calibration_filepath, 'w', encoding='utf-8') as calibration_file:
-
- json.dump(calibration_data, calibration_file, ensure_ascii=False, indent=4)
-
- def calibrate_gyroscope_offset(self, gyroscope_ts_buffer) -> numpy.array:
- """Calibrate gyroscope offset from a timestamped gyroscope buffer.
- **Returns:** numpy.array"""
-
- # Consider gyroscope values without timestamps
- gyroscope_values = []
- for ts, data_object in gyroscope_ts_buffer.items():
- gyroscope_values.append(data_object.value)
-
- # Calculate average value for each axis
- gx_offset = numpy.mean(numpy.array(gyroscope_values)[:, 0])
- gy_offset = numpy.mean(numpy.array(gyroscope_values)[:, 1])
- gz_offset = numpy.mean(numpy.array(gyroscope_values)[:, 2])
-
- # Store result
- self.__gyroscope_offset = numpy.array([gx_offset, gy_offset, gz_offset])
-
- return self.__gyroscope_offset
-
- @property
- def gyroscope_offset(self) -> numpy.array:
- """Get gyroscope offset."""
-
- return self.__gyroscope_offset
-
- def apply_gyroscope_offset(self, gyroscope_data_object: TobiiData.Gyroscope) -> "TobiiData.Gyroscope":
- """Remove gyroscope offset to given gyroscope data."""
-
- return TobiiData.Gyroscope(gyroscope_data_object.value - self.__gyroscope_offset)
-
- def reset_rotation(self):
- """Reset rotation value before to start integration process."""
-
- self.__last_gyroscope_ts = None
-
- self.__rotation = numpy.zeros(3)
-
- def update_rotation(self, gyroscope_data_ts, gyroscope_data_object):
- """Integrate timestamped gyroscope values to update rotation."""
-
- # Convert deg/s into deg/ms
- current_gyroscope = gyroscope_data_object.value * 1e-3
-
- # Init gyroscope integration
- if self.__last_gyroscope_ts == None:
-
- self.__last_gyroscope_ts = gyroscope_data_ts
- self.__last_gyroscope = current_gyroscope
-
- # Calculate elapsed time in ms
- delta_time = (gyroscope_data_ts - self.__last_gyroscope_ts) / 1e3
-
- # Integrate gyroscope
- self.__rotation = self.__rotation + (self.__last_gyroscope * delta_time)
-
- # Store current as last
- self.__last_gyroscope_ts = gyroscope_data_ts
- self.__last_gyroscope = current_gyroscope
-
- @property
- def rotation(self) -> numpy.array:
- """Return current rotation value (euler angles in degree)."""
-
- return self.__rotation
-
- def _accelerometer_linear_fit(self, x, a, b):
- """Linear function for accelerometer axis correction."""
- return a * x + b
-
- def calibrate_accelerometer_axis_coefficients(self, axis, upward_ts_buffer, downward_ts_buffer, perpendicular_ts_buffer):
- """Calibrate one accelerometer axis using three data set (upward/+1g, downward/-1g, perpendicular/0g) for linear fit."""
-
- # Consider accelerometer axis values without timestamps
- accelerometer_values = []
- expected_values = []
-
- for (upward_ts, upward_data_object), (downward_ts, downward_data_object), (perpendicular_ts, perpendicular_data_object) in zip(upward_ts_buffer.items(), downward_ts_buffer.items(), perpendicular_ts_buffer.items()):
-
- accelerometer_values.append(upward_data_object.value[axis])
- expected_values.append(+EARTH_GRAVITY)
-
- accelerometer_values.append(downward_data_object.value[axis])
- expected_values.append(-EARTH_GRAVITY)
-
- accelerometer_values.append(perpendicular_data_object.value[axis])
- expected_values.append(0.0)
-
- # Find optimal coefficients according linear fit between accelerometer values and expected values
- optimal_coefficients, _ = curve_fit(self._accelerometer_linear_fit, accelerometer_values, expected_values, maxfev = 10000)
-
- # Store results for the given axis
- self.__accelerometer_coefficients[axis] = numpy.array(optimal_coefficients)
-
- @property
- def accelerometer_coefficients(self) -> numpy.array:
- """Return accelerometer coefficients."""
-
- return self.__accelerometer_coefficients
-
- def apply_accelerometer_coefficients(self, accelerometer_data_object: TobiiData.Accelerometer) -> "TobiiData.Accelerometer":
- """Add accelerometer offset to given accelerometer data."""
-
- x = self._accelerometer_linear_fit(accelerometer_data_object.value[0], *self.__accelerometer_coefficients[0])
- y = self._accelerometer_linear_fit(accelerometer_data_object.value[1], *self.__accelerometer_coefficients[1])
- z = self._accelerometer_linear_fit(accelerometer_data_object.value[2], *self.__accelerometer_coefficients[2])
-
- return TobiiData.Accelerometer(numpy.array([x, y , z]))
-
- def reset_translation(self, translation_speed = numpy.zeros(3)):
- """Reset translation value before to start integration process."""
-
- self.__last_accelerometer_ts = None
-
- self.__translation_speed = translation_speed
- self.__translation = numpy.zeros(3)
-
- def update_translation(self, accelerometer_data_ts, accelerometer_data_object):
- """Integrate timestamped accelerometer values to update translation."""
-
- print('> update_translation: accelerometer_data_ts=', accelerometer_data_ts)
-
- # Convert m/s2 into cm/ms2
- current_accelerometer = accelerometer_data_object.value * 1e-4
-
- print('\tcurrent_accelerometer(cm/ms2)=', current_accelerometer)
- print('\tcurrent_accelerometer norm=', numpy.linalg.norm(current_accelerometer))
-
- # Init accelerometer integration
- if self.__last_accelerometer_ts == None:
-
- self.__last_accelerometer_ts = accelerometer_data_ts
- self.__last_accelerometer = current_accelerometer
- self.__last_translation_speed = numpy.zeros(3)
-
- # Calculate elapsed time in ms
- delta_time = (accelerometer_data_ts - self.__last_accelerometer_ts) / 1e3
-
- print('\tdelta_time=', delta_time)
-
- # Integrate accelerometer
- self.__translation_speed = self.__translation_speed + (self.__last_accelerometer * delta_time)
- self.__translation = self.__translation + (self.__last_translation_speed * delta_time)
-
- print('\tself.__translation_speed(cm/ms)=', self.__translation_speed)
- print('\tself.__translation(cm)=', self.__translation)
-
- # Store current as last
- self.__last_accelerometer = current_accelerometer
- self.__last_accelerometer_ts = accelerometer_data_ts
- self.__last_translation_speed = self.__translation_speed
-
- print('< update_translation')
-
- #else:
-
- # print('no valid head plumb')
-
- @property
- def translation(self) -> numpy.array:
- """Return current translation vector."""
-
- return self.__translation
-
- @property
- def translation_speed(self) -> numpy.array:
- """Return current translation speed vector."""
-
- return self.__translation_speed
-
- def rotate_plumb(self, rvec):
- """Rotate imu plumb to remove gravity effect in accelerometer data."""
-
- C, _ = cv.Rodrigues(rvec)
- self.__plumb = C.dot(EARTH_GRAVITY_VECTOR)
-
- # Check plumb length
- assert(math.isclose(numpy.linalg.norm(self.__plumb), math.fabs(EARTH_GRAVITY), abs_tol=1e-3))
-
- @property
- def plumb(self) -> numpy.array:
- """Return plumb vector."""
-
- return self.__plumb
-
- def apply_plumb(self, accelerometer_data_object: TobiiData.Accelerometer) -> "TobiiData.Accelerometer":
- """Remove gravity along plumb vector to given accelerometer data."""
-
- return TobiiData.Accelerometer(accelerometer_data_object.value - self.__plumb)
diff --git a/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py b/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py
deleted file mode 100644
index c65b121..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py
+++ /dev/null
@@ -1,226 +0,0 @@
-from typing import TypeVar, Any
-import logging
-import sys
-import socket
-import threading
-import json
-import time
-
-# python2 backwards compatibility for errors
-if sys.version_info[0] < 3:
- class ConnectionError(BaseException):
- pass
-
-try:
- import netifaces
- TOBII_DISCOVERY_ALLOWED = True
-except:
- TOBII_DISCOVERY_ALLOWED = False
-
-try:
- from urllib.parse import urlparse, urlencode
- from urllib.request import urlopen, Request
- from urllib.error import URLError, HTTPError
-
-except ImportError:
- from urlparse import urlparse
- from urllib import urlencode
- from urllib2 import urlopen, Request, HTTPError, URLError
-
-socket.IPPROTO_IPV6 = 41
-
-SocketType = TypeVar('socket', bound="socket")
-# Type definition for type annotation convenience
-
-class TobiiNetworkInterface():
- """Handle network connection to Tobii glasses Pro 2 device.
- It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py)."""
-
- def __init__(self, address = None):
-
- self.udpport = 49152
- self.address = address
- self.iface_name = None
-
- if self.address is None:
-
- data, address = self.__discover_device()
-
- if address is None:
- raise ConnectionError("No device found using discovery process")
- else:
- try:
- self.address = data["ipv4"]
- except:
- self.address = address
-
- if "%" in self.address:
- if sys.platform == "win32":
- self.address,self.iface_name = self.address.split("%")
- else:
- self.iface_name = self.address.split("%")[1]
-
- if ':' in self.address:
- self.base_url = 'http://[%s]' % self.address
- else:
- self.base_url = 'http://' + self.address
-
- self.__peer = (self.address, self.udpport)
-
- def make_socket(self) -> SocketType:
- """Create a socket to enable network communication."""
-
- iptype = socket.AF_INET
-
- if ':' in self.__peer[0]:
- iptype = socket.AF_INET6
-
- res = socket.getaddrinfo(self.__peer[0], self.__peer[1], socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)
- family, socktype, proto, canonname, sockaddr = res[0]
- new_socket = socket.socket(family, socktype, proto)
-
- new_socket.settimeout(5.0)
-
- try:
- if iptype == socket.AF_INET6:
- new_socket.setsockopt(socket.SOL_SOCKET, 25, 1)
-
- except socket.error as e:
- if e.errno == 1:
- logging.warning("Binding to a network interface is permitted only for root users.")
-
- return new_socket
-
- def __discover_device(self):
-
- if TOBII_DISCOVERY_ALLOWED == False:
- logging.error("Device discovery is not available due to a missing dependency (netifaces)")
- exit(1)
-
- MULTICAST_ADDR = 'ff02::1'
- PORT = 13006
-
- for i in netifaces.interfaces():
-
- if netifaces.AF_INET6 in netifaces.ifaddresses(i).keys():
-
- if "%" in netifaces.ifaddresses(i)[netifaces.AF_INET6][0]['addr']:
-
- if_name = netifaces.ifaddresses(i)[netifaces.AF_INET6][0]['addr'].split("%")[1]
- if_idx = socket.getaddrinfo(MULTICAST_ADDR + "%" + if_name, PORT, socket.AF_INET6, socket.SOCK_DGRAM)[0][4][3]
-
- s6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
- s6.settimeout(30.0)
- s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, if_idx)
- s6.bind(('::', PORT))
-
- PORT_OUT = PORT if sys.platform == 'win32' or sys.platform == 'darwin' else PORT + 1
-
- try:
-
- # Sending discover request
- discover_json = '{"type":"discover"}'
- s6.sendto(discover_json.encode('utf-8'), (MULTICAST_ADDR, PORT_OUT))
-
- # Waiting for a reponse from the device ...
- data, address = s6.recvfrom(1024)
- jdata = json.loads(data.decode('utf-8'))
-
- addr = address[0]
-
- if sys.version_info.major == 3 and sys.version_info.minor >= 8:
- addr = address[0] + '%' + if_name
-
- return (jdata, addr)
-
- except:
-
- # No device found on interface
- pass
-
- return (None, None)
-
- def get_request(self, api_action) -> str:
- """Send a GET request and get data back."""
-
- url = self.base_url + api_action
- res = urlopen(url).read()
-
- try:
- data = json.loads(res.decode('utf-8'))
- except json.JSONDecodeError:
- data = None
-
- return data
-
- def post_request(self, api_action, data=None, wait_for_response=True) -> str:
- """Send a POST request and get result back."""
-
- url = self.base_url + api_action
- req = Request(url)
- req.add_header('Content-Type', 'application/json')
- data = json.dumps(data)
-
- logging.debug("Sending JSON: " + str(data))
-
- if wait_for_response is False:
- threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
- return None
-
- response = urlopen(req, data.encode('utf-8'))
- res = response.read()
-
- logging.debug("Response: " + str(res))
-
- try:
- res = json.loads(res.decode('utf-8'))
- except:
- pass
-
- return res
-
- def send_keep_alive_msg(self, socket, msg):
- """Send a message to keep socket opened."""
-
- res = socket.sendto(msg.encode('utf-8'), self.__peer)
-
- def grab_data(self, socket) -> bytes:
- """Read incoming socket data."""
-
- try:
- data, address = socket.recvfrom(1024)
- return data
-
- except TimeoutError:
-
- logging.error("A timeout occurred while receiving data")
-
- def wait_for_status(self, api_action, key, values, timeout = None) -> Any:
- """Wait until a status matches given values."""
-
- url = self.base_url + api_action
- running = True
-
- while running:
-
- req = Request(url)
- req.add_header('Content-Type', 'application/json')
-
- try:
-
- response = urlopen(req, None, timeout = timeout)
-
- except URLError as e:
-
- logging.error(e.reason)
- return -1
-
- data = response.read()
- json_data = json.loads(data.decode('utf-8'))
-
- if json_data[key] in values:
- running = False
-
- time.sleep(1)
-
- return json_data[key]
diff --git a/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py b/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py
deleted file mode 100644
index 1b2a275..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env python
-
-"""Here is the article where from following [Tobii specifications](https://www.biorxiv.org/content/10.1101/299925v1) come."""
-
-ACCURACY = 1.42
-"""Gaze position accuracy in degree."""
-
-PRECISION = 0.34 # degree
-"""Gaze position precision in degree."""
-
-CAMERA_HFOV = 82
-"""Camera horizontal field of view in degree."""
-
-VISUAL_HFOV = 160
-"""Visual horizontal field of view in degree.""" \ No newline at end of file
diff --git a/src/argaze/TobiiGlassesPro2/TobiiVideo.py b/src/argaze/TobiiGlassesPro2/TobiiVideo.py
deleted file mode 100644
index 1292de8..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiVideo.py
+++ /dev/null
@@ -1,283 +0,0 @@
-#!/usr/bin/env python
-
-from typing import TypeVar, Tuple
-from dataclasses import dataclass, field
-import threading
-import uuid
-import time
-import copy
-
-from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
-
-import cv2 as cv
-import av
-import numpy
-
-TobiiVideoFrameType = TypeVar('TobiiVideoFrame', bound="TobiiVideoFrame")
-# Type definition for type annotation convenience
-
-AvStreamType = TypeVar('av.stream.Stream', bound="av.stream.Stream")
-# Type definition for type annotation convenience
-
-@dataclass
-class TobiiVideoFrame():
- """Define tobii video frame"""
-
- matrix: list
- """Video frame matrix."""
-
- width: int = field(init=False)
- """Inferred video frame width."""
-
- height: int = field(init=False)
- """Inferred video frame height."""
-
- def __post_init__(self):
- """fill dimension attributes."""
-
- self.height, self.width = self.matrix.shape[:2]
-
- def copy(self) -> TobiiVideoFrameType:
- """Copy tobii video frame."""
-
- return TobiiVideoFrame(self.matrix.copy())
-
-class TobiiVideoSegment():
- """Handle Tobii Glasses Pro 2 segment video file."""
-
- def __init__(self, segment_video_path, start_timestamp:int = 0, end_timestamp:int = None):
- """Load segment video from segment directory"""
-
- self.__path = segment_video_path
-
- self.__container = av.open(self.__path)
- self.__stream = self.__container.streams.video[0]
-
- self.__width = int(cv.VideoCapture(self.__path).get(cv.CAP_PROP_FRAME_WIDTH))
- self.__height = int(cv.VideoCapture(self.__path).get(cv.CAP_PROP_FRAME_HEIGHT))
-
- self.__start_timestamp = start_timestamp
- self.__end_timestamp = end_timestamp
-
- # position at the given start time
- self.__container.seek(self.__start_timestamp)
-
- @property
- def path(self) -> str:
- """Get video segment path."""
-
- return self.__path
-
- @property
- def duration(self) -> int:
- """Duration in microsecond."""
-
- if self.__end_timestamp == None:
- return int((self.__stream.duration * self.__stream.time_base) * 1e6) - self.__start_timestamp
- else:
- return self.__end_timestamp - self.__start_timestamp
-
- @property
- def width(self) -> int:
- """Video width dimension."""
-
- return self.__width
-
- @property
- def height(self) -> int:
- """Video height dimension."""
-
- return self.__height
-
- @property
- def stream(self) -> AvStreamType:
- """Video stream."""
-
- return self.__stream
-
- def get_frame(self, i) -> Tuple[int, "TobiiVideoFrame"]:
- """Access to a frame."""
-
- if i < 0:
- ValueError('Frame index must be a positive integer.')
-
- counter = 0
- frame = None
- video_ts = 0
-
- # position at the given start time
- self.__container.seek(self.__start_timestamp)
-
- # start decoding
- self.__container.decode(self.__stream)
-
- while counter <= i:
-
- frame = self.__container.decode(self.__stream).__next__()
- video_ts = int(frame.time * 1e6)
- counter += 1
-
- # return micro second timestamp and frame data
- return video_ts, TobiiVideoFrame(frame.to_ndarray(format='bgr24'))
-
- def frames(self) -> Tuple[int, "TobiiVideoFrame"]:
- """Access to frame iterator."""
-
- return self.__iter__()
-
- def __iter__(self):
-
- # start decoding
- self.__container.decode(self.__stream)
-
- return self
-
- def __next__(self):
-
- frame = self.__container.decode(self.__stream).__next__()
-
- video_ts = int(frame.time * 1e6)
-
- # Ignore before start timestamp
- if video_ts < self.__start_timestamp:
- return self.__next__()
-
- # Ignore frames after end timestamp
- if self.__end_timestamp != None:
-
- if video_ts >= self.__end_timestamp:
- raise StopIteration
-
- # return micro second timestamp and frame data
- return video_ts, TobiiVideoFrame(frame.to_ndarray(format='bgr24'))
-
-class TobiiVideoStream(threading.Thread):
- """Capture Tobii Glasses Pro 2 video camera stream."""
-
- def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
- """Initialise video stream reception."""
-
- threading.Thread.__init__(self)
- threading.Thread.daemon = True
-
- self.__network = network_interface
- self.__video_socket = self.__network.make_socket()
-
- self.__stop_event = threading.Event()
- self.__read_lock = threading.Lock()
-
- self.__frame_tuple = None
-
- # prepare keep alive message
- self.__keep_alive_msg = "{\"type\": \"live.video.unicast\",\"key\": \""+ str(uuid.uuid4()) +"_video\", \"op\": \"start\"}"
- self.__keep_alive_thread = threading.Timer(0, self.__keep_alive)
- self.__keep_alive_thread.daemon = True
-
- def __del__(self):
- """Stop data reception before destruction."""
-
- if self.is_alive():
-
- self.close()
-
- def __keep_alive(self):
- """Maintain connection."""
-
- while not self.__stop_event.isSet():
-
- self.__network.send_keep_alive_msg(self.__video_socket, self.__keep_alive_msg)
-
- time.sleep(1)
-
- def open(self):
- """Start data reception."""
-
- self.__keep_alive_thread.start()
- threading.Thread.start(self)
-
- def close(self):
- """Stop data reception definitively."""
-
- self.__stop_event.set()
-
- threading.Thread.join(self.__keep_alive_thread)
- threading.Thread.join(self)
-
- self.__video_socket.close()
-
- def run(self):
- """Store frame for further reading."""
-
- container = av.open(f'rtsp://{self.__network.get_address()}:8554/live/scene', options={'rtsp_transport': 'tcp'})
- stream = container.streams.video[0]
-
- for frame in container.decode(stream):
-
- # quit if the video acquisition thread have been stopped
- if self.__stop_event.isSet():
- break
-
- # lock frame access
- self.__read_lock.acquire()
-
- # store frame time, matrix into a tuple
- self.__frame_tuple = (frame.time, frame.to_ndarray(format='bgr24'))
-
- # unlock frame access
- self.__read_lock.release()
-
- def read(self) -> Tuple[int, "TobiiVideoFrame"]:
- """Read incoming video frames."""
-
- # if the video acquisition thread have been stopped or isn't started
- if self.__stop_event.isSet() or self.__frame_tuple == None:
- return -1, TobiiVideoFrame(numpy.zeros((1, 1, 3), numpy.uint8))
-
- # lock frame access
- self.__read_lock.acquire()
-
- # copy frame tuple
- frame_tuple = copy.deepcopy(self.__frame_tuple)
-
- # unlock frame access
- self.__read_lock.release()
-
- return int(frame_tuple[0] * 1e6), TobiiVideoFrame(frame_tuple[1])
-
-class TobiiVideoOutput():
- """Export a video file at the same format than a given referent stream."""
- # TODO : Make a generic video managment to handle video from any device (not only Tobii)
-
- def __init__(self, output_video_path: str, referent_stream: av.stream.Stream):
- """Create a video file"""
-
- self.__path = output_video_path
- self.__container = av.open(self.__path, 'w')
- self.__stream = self.__container.add_stream(\
- referent_stream.codec_context.name, \
- width=referent_stream.codec_context.width, \
- height=referent_stream.codec_context.height, \
- rate=referent_stream.codec_context.framerate, \
- gop_size=referent_stream.codec_context.gop_size, \
- pix_fmt=referent_stream.codec_context.pix_fmt, \
- bit_rate=referent_stream.codec_context.bit_rate)
-
- @property
- def path(self) -> str:
- """Get video file path."""
-
- return self.__path
-
- def write(self, frame):
- """Write a frame into the output video file"""
-
- formated_frame = av.VideoFrame.from_ndarray(frame, format='bgr24')
- formated_frame.reformat(format=self.__stream.codec_context.pix_fmt, interpolation=None)
- self.__container.mux(self.__stream.encode(formated_frame))
-
- def close(self):
- """End the writing of the video file"""
-
- self.__container.mux(self.__stream.encode())
- self.__container.close()
-
diff --git a/src/argaze/TobiiGlassesPro2/__init__.py b/src/argaze/TobiiGlassesPro2/__init__.py
deleted file mode 100644
index 50fb742..0000000
--- a/src/argaze/TobiiGlassesPro2/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-"""
-.. include:: README.md
-"""
-__docformat__ = "restructuredtext"
-__all__ = ['TobiiEntities', 'TobiiController', 'TobiiNetworkInterface', 'TobiiData', 'TobiiVideo', 'TobiiInertialMeasureUnit', 'TobiiSpecifications',] \ No newline at end of file
diff --git a/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf b/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf
deleted file mode 100644
index dfdbe0a..0000000
--- a/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf
+++ /dev/null
Binary files differ
diff --git a/src/argaze/TobiiGlassesPro2/utils/imu.json b/src/argaze/TobiiGlassesPro2/utils/imu.json
deleted file mode 100644
index b701b00..0000000
--- a/src/argaze/TobiiGlassesPro2/utils/imu.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "gyroscope_offset": [
- 1.00506,
- -3.338012,
- 1.9096039999999999
- ],
- "accelerometer_coefficients": [
- [
- 0.9918761478460875,
- -0.29679248712760953
- ],
- [
- 0.9749789492717152,
- -0.15941685808576067
- ],
- [
- 0.9520423758351338,
- 0.23147323632416672
- ]
- ]
-} \ No newline at end of file
diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py
index 1162ba7..ef75dd2 100644
--- a/src/argaze/__init__.py
+++ b/src/argaze/__init__.py
@@ -2,4 +2,4 @@
.. include:: ../../README.md
"""
__docformat__ = "restructuredtext"
-__all__ = ['ArFeatures','GazeFeatures','GazeAnalysis','ArUcoMarkers','AreaOfInterest','DataStructures','TobiiGlassesPro2','utils'] \ No newline at end of file
+__all__ = ['ArFeatures','GazeFeatures','GazeAnalysis','ArUcoMarkers','AreaOfInterest','DataStructures','utils'] \ No newline at end of file
diff --git a/src/argaze/utils/README.md b/src/argaze/utils/README.md
index 16c85c0..ea293f6 100644
--- a/src/argaze/utils/README.md
+++ b/src/argaze/utils/README.md
@@ -20,77 +20,10 @@ Export a 7 columns and 5 rows calibration board made of 5cm squares with 3cm mar
python ./src/argaze/utils/aruco_calibration_board_export.py 7 5 5 3 -o _export -d DICT_APRILTAG_16h5 -r 300
```
-# Tobii calibration
+# TODO: Camera calibration
-Calibrate Tobii Glasses Pro 2 camera (-t IP_ADDRESS) using a 7 columns and 5 rows calibration board made of 5cm squares with 3cm markers from DICT_APRILTAG_16h5 dictionary. Then, export its optical parameters into an tobii_camera.json file:
+Calibrate a network camera (-t IP_ADDRESS) using a 7 columns and 5 rows calibration board made of 5cm squares with 3cm markers from DICT_APRILTAG_16h5 dictionary. Then, export its optical parameters into an camera.json file:
```
-python ./src/argaze/utils/tobii_camera_calibrate.py 7 5 5 3 -t IP_ADDRESS -d DICT_APRILTAG_16h5 -o _export/tobii_camera.json
-```
-
-Calibrate Tobii Glasses Pro 2 inertial measure unit (-t IP_ADDRESS) then, export calibration parameters into an imu.json file:
-
-```
-python ./src/argaze/utils/tobii_imu_calibrate.py -t IP_ADDRESS -o _export/imu.json
-```
-
-# Tobii session
-
-Display Tobii Glasses Pro 2 camera video stream (-t IP_ADDRESS) with a live gaze pointer. Loading calibration file to display inertial sensors data:
-
-```
-python ./src/argaze/utils/tobii_stream_display.py -t IP_ADDRESS -i _export/imu.json
-```
-
-Record a Tobii Glasses Pro 2 'myProject' session for a 'myUser' participant on Tobii interface's SD card (-t IP_ADDRESS):
-
-```
-python ./src/argaze/utils/tobii_segment_record.py -t IP_ADDRESS -p myProject -u myUser
-```
-
-# Tobii drive
-
-Explore Tobii Glasses Pro 2 interface's SD Card (-d DRIVE_PATH, -p PROJECT_PATH, -r RECORDING_PATH, -s SEGMENT_PATH):
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -d DRIVE_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -p PROJECT_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -r RECORDING_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -s SEGMENT_PATH
-```
-
-# Tobii post-processing
-
-Replay a time range selection (-r IN OUT) Tobii Glasses Pro 2 session (-s SEGMENT_PATH) synchronizing video and some data together:
-
-```
-python ./src/argaze/utils/tobii_segment_display.py -s SEGMENT_PATH -r IN OUT
-```
-
-Export Tobii segment fixations and saccades (-s SEGMENT_PATH) from a time range selection (-r IN OUT) as fixations.csv and saccades.csv files saved into the segment folder:
-
-```
-python ./src/argaze/utils/tobii_segment_gaze_movements_export.py -s SEGMENT_PATH -r IN OUT
-```
-
-# Tobii with ArUco
-
-Detect ArUco markers (-md MARKER_DICT -ms MARKER_SIZE) into Tobii camera video stream (-t IP_ADDRESS). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame:
-
-```
-python ./src/argaze/utils/tobii_stream_aruco_aoi_display.py -t IP_ADDRESS -c _export/tobii_camera.json -md MARKER_DICT -ms MARKER_SIZE -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}'
-```
-
-Detect ArUco markers (-md MARKER_DICT -ms MARKER_SIZE) into a Tobii camera video segment (-s SEGMENT_PATH) into a time range selection (-r IN OUT). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame. Export aoi video and data as a aruco_aoi.csv, aruco_aoi.mp4 files:
-```
-python ./src/argaze/utils/tobii_segment_aruco_aoi_export.py -s SEGMENT_PATH -c _export/tobii_camera.json -md MARKER_DICT -ms MARKER_SIZE -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}' -r IN OUT
-```
+python ./src/argaze/utils/camera_calibrate.py 7 5 5 3 -t IP_ADDRESS -d DICT_APRILTAG_16h5 -o _export/camera.json
+``` \ No newline at end of file
diff --git a/src/argaze/utils/tobii_camera_calibrate.py b/src/argaze/utils/tobii_camera_calibrate.py
deleted file mode 100644
index 24cbe5c..0000000
--- a/src/argaze/utils/tobii_camera_calibrate.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import time
-
-from argaze.TobiiGlassesPro2 import TobiiController, TobiiVideo
-from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoBoard, ArUcoDetector, ArUcoCamera
-
-import cv2 as cv
-
-def main():
- """
- Captures board pictures and finally outputs camera calibration data into a .json file.
-
- - Export and print a calibration board using
- - Place the calibration board in order to view it entirely on screen and move the camera in many configurations (orientation and distance) : the script will automatically take pictures. Do this step with a good lighting and a clear background.
- - Once enough pictures have been captured (~20), press Esc key then, wait for the camera calibration processing.
- - Finally, check rms parameter: it should be between 0. and 1. if the calibration succeeded (lower is better).
-
- ### Reference:
- - [Camera calibration using ArUco marker tutorial](https://automaticaddison.com/how-to-perform-camera-calibration-using-opencv/)
- """
-
- # manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('columns', metavar='COLS_NUMBER', type=int, default=7, help='number of columns')
- parser.add_argument('rows', metavar='ROWS_NUMBER', type=int, default=5, help='number of rows')
- parser.add_argument('square_size', metavar='SQUARE_SIZE', type=float, default=5, help='square size (cm)')
- parser.add_argument('marker_size', metavar='MARKER_SIZE', type=float, default=3, help='marker size (cm)')
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default='camera.json', help='destination filepath')
- parser.add_argument('-d', '--dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL, DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)')
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Get video stream dimension
- video_width = tobii_controller.get_configuration()['sys_sc_width']
- video_height = tobii_controller.get_configuration()['sys_sc_height']
-
- # Print current confirguration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Enable tobii video stream
- tobii_video_stream = tobii_controller.enable_video_stream()
-
- # Create aruco camera
- aruco_camera = ArUcoCamera.ArUcoCamera(dimensions=(video_width, video_height))
-
- # Create aruco board
- aruco_board = ArUcoBoard.ArUcoBoard(args.columns, args.rows, args.square_size, args.marker_size, args.dictionary)
-
- # Create aruco detecter
- aruco_detector = ArUcoDetector.ArUcoDetector(dictionary=args.dictionary, marker_size=args.marker_size)
-
- # Start tobii glasses streaming
- tobii_controller.start_streaming()
-
- print("Camera calibration starts")
- print("Waiting for calibration board...")
-
- expected_markers_number = aruco_board.markers_number
- expected_corners_number = aruco_board.corners_number
-
- # Capture loop
- try:
-
- while tobii_video_stream.is_alive():
-
- # capture frame with a full displayed board
- video_ts, video_frame = tobii_video_stream.read()
-
- # detect all markers in the board
- aruco_detector.detect_board(video_frame.matrix, aruco_board, expected_markers_number)
-
- # draw only markers
- aruco_detector.draw_detected_markers(video_frame.matrix)
-
- # draw current calibration data count
- cv.putText(video_frame.matrix, f'Capture: {aruco_camera.calibration_data_count}', (50, 50), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv.LINE_AA)
- cv.imshow('Tobii Camera Calibration', video_frame.matrix)
-
- # if all board corners are detected
- if aruco_detector.board_corners_number == expected_corners_number:
-
- # draw board corners to notify a capture is done
- aruco_detector.draw_board(video_frame.matrix)
-
- # append data
- aruco_camera.store_calibration_data(aruco_detector.board_corners, aruco_detector.board_corners_identifier)
-
- cv.imshow('Tobii Camera Calibration', video_frame.matrix)
-
- # close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- # exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # stop frame display
- cv.destroyAllWindows()
-
- # Stop tobii glasses streaming
- tobii_controller.stop_streaming()
-
- print('\nCalibrating camera...')
- aruco_camera.calibrate(aruco_board)
-
- print('\nCalibration succeeded!')
- print(f'\nRMS:\n{aruco_camera.rms}')
- print(f'\nDimensions:\n{video_width}x{video_height}')
- print(f'\nCamera matrix:\n{aruco_camera.K}')
- print(f'\nDistortion coefficients:\n{aruco_camera.D}')
-
- aruco_camera.to_json(args.output)
-
- print(f'\nCalibration data exported into {args.output} file')
-
-if __name__ == '__main__':
-
- main()
diff --git a/src/argaze/utils/tobii_imu_calibrate.py b/src/argaze/utils/tobii_imu_calibrate.py
deleted file mode 100644
index c9e4813..0000000
--- a/src/argaze/utils/tobii_imu_calibrate.py
+++ /dev/null
@@ -1,214 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import time
-
-from argaze import DataStructures
-from argaze.TobiiGlassesPro2 import TobiiController, TobiiInertialMeasureUnit
-from argaze.utils import MiscFeatures
-
-import numpy
-import matplotlib.pyplot as mpyplot
-import matplotlib.patches as mpatches
-
-def main():
- """
- Calibrate Tobbi gyroscope and accelerometer sensors and finally outputs camera calibration data into a .json file.
-
- ### Reference:
- - [Inertial Measure Unit calibration tutorial](https://makersportal.com/blog/calibration-of-an-inertial-measurement-unit-imu-with-raspberry-pi-part-ii)
- """
-
- # manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
- parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath')
- parser.add_argument('-n', '--sample_number', metavar='BUFFER_SIZE', type=int, default=500, help='number of samples to store into calibration buffer')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default='imu.json', help='destination filepath')
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Create tobii imu handler
- tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()
-
- # Load optional imu calibration file
- if args.imu_calibration != None:
-
- tobii_imu.load_calibration_file(args.imu_calibration)
-
- # Enable tobii data stream
- tobii_data_stream = tobii_controller.enable_data_stream()
-
- # Menu loop
- try:
-
- while True:
-
- print('-' * 52)
- menu_input = input('Tobii Inertial Measure Unit sensor calibration menu:\n\t\'a\' for accelerometer calibration.\n\t\'A\' for accelerometer visualisation.\n\t\'g\' for gyroscope calibration.\n\t\'G\' for gyroscope visualisation.\n\t\'p\' print current calibration.\n\t\'s\' save calibration.\n\t\'q\' quit calibration without saving.\n>')
-
- match menu_input:
-
- case 'a':
-
- axis = ['X', 'Y', 'Z']
- directions = ['upward', 'downward', 'perpendicular']
-
- for i, axis in enumerate(axis):
-
- print(f'\nACCELEROMETER {axis} AXIS CALIBRATION')
-
- axis_buffers = {}
-
- for j, direction in enumerate(directions):
-
- input(f'\nKeep Tobii Glasses accelerometer {axis} axis {direction} then press \'Enter\' to start data acquisition.\n')
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Capture accelerometer data stream
- data_ts_buffer = DataStructures.TimeStampedBuffer()
- for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number):
-
- # Update progress Bar
- MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- axis_buffers[direction] = data_ts_buffer
-
- tobii_imu.calibrate_accelerometer_axis_coefficients(i, axis_buffers['upward'], axis_buffers['downward'], axis_buffers['perpendicular'])
-
- accelerometer_coefficients = tobii_imu.accelerometer_coefficients
-
- print(f'\n\nAccelerometer optimal linear fit coefficients over {progress} values for each axis:')
- print('\tX coefficients: ', accelerometer_coefficients[0])
- print('\tY coefficients: ', accelerometer_coefficients[1])
- print('\tZ coefficients: ', accelerometer_coefficients[2])
-
- case 'A':
-
- print('\nCAPTURE AND PLOT ACCELEROMETER STREAM')
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Capture accelerometer data stream
- data_ts_buffer = DataStructures.TimeStampedBuffer()
- for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number):
-
- # Update progress Bar
- MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Edit figure
- figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
- data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max
- figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144)
-
- # Plot data
- subplot = figure.add_subplot(111)
- subplot.set_title('Accelerometer', loc='left')
- patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Display figure
- mpyplot.show()
- figure.clear()
-
- case 'g':
-
- print('\nGYROSCOPE CALIBRATION')
- input('Keep Tobii Glasses steady then press \'Enter\' to start data acquisition.\n')
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Capture gyroscope data stream
- data_ts_buffer = DataStructures.TimeStampedBuffer()
- for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number):
-
- # Update progress Bar
- MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- gyroscope_offset = tobii_imu.calibrate_gyroscope_offset(data_ts_buffer)
-
- print(f'\n\nGyroscope average over {progress} values for each axis:')
- print('\tX offset: ', gyroscope_offset[0])
- print('\tY offset: ', gyroscope_offset[1])
- print('\tZ offset: ', gyroscope_offset[2])
-
- case 'G':
-
- print('\nCAPTURE AND PLOT GYROSCOPE STREAM')
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Capture accelerometer data stream
- data_ts_buffer = DataStructures.TimeStampedBuffer()
- for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number):
-
- # Update progress Bar
- MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Edit figure
- figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
- data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max
- figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144)
-
- # Plot data
- subplot = figure.add_subplot(111)
- subplot.set_title('Gyroscope', loc='left')
- patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Display figure
- mpyplot.show()
- figure.clear()
-
- case 'p':
-
- gyroscope_offset = tobii_imu.gyroscope_offset
-
- print(f'\nGyroscope offset for each axis:')
- print('\tX offset: ', gyroscope_offset[0])
- print('\tY offset: ', gyroscope_offset[1])
- print('\tZ offset: ', gyroscope_offset[2])
-
- accelerometer_coefficients = tobii_imu.accelerometer_coefficients
-
- print(f'\nAccelerometer optimal linear fit coefficients for each axis:')
- print('\tX coefficients: ', accelerometer_coefficients[0])
- print('\tY coefficients: ', accelerometer_coefficients[1])
- print('\tZ coefficients: ', accelerometer_coefficients[2])
-
- case 's':
-
- tobii_imu.save_calibration_file(args.output)
- print(f'\nCalibration data exported into {args.output} file')
-
- break
-
- case 'q':
-
- break
-
- # exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
-if __name__ == '__main__':
-
- main()
diff --git a/src/argaze/utils/tobii_sdcard_explore.py b/src/argaze/utils/tobii_sdcard_explore.py
deleted file mode 100644
index ed297be..0000000
--- a/src/argaze/utils/tobii_sdcard_explore.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiData, TobiiVideo
-
-import matplotlib.pyplot as mpyplot
-import matplotlib.patches as mpatches
-
-def main():
- """
- Explore Tobii Glasses Pro 2 interface's SD Card
- """
-
- # manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-d', '--drive_path', metavar='DRIVE_PATH', type=str, default=None, help='drive path')
- parser.add_argument('-p', '--project_path', metavar='PROJECT_PATH', type=str, default=None, help='project path')
- parser.add_argument('-r', '--recording_path', metavar='RECORDING_PATH', type=str, default=None, help='recording path')
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- args = parser.parse_args()
-
- if args.drive_path != None:
-
- # Load all projects from a tobii drive
- tobii_drive = TobiiEntities.TobiiDrive(args.drive_path)
-
- for project in tobii_drive.projects():
- print(f'Project id: {project.id}, name: {project.name}')
-
- elif args.project_path != None:
-
- # Load one tobii project
- tobii_project = TobiiEntities.TobiiProject(args.project_path)
-
- for participant in tobii_project.participants():
- print(f'Participant id: {participant.id}, name: {participant.name}')
-
- for recording in tobii_project.recordings():
- print(f'Recording id: {recording.id}, name: {recording.name}')
- print(f'\tProject: {recording.project.name}')
- print(f'\tParticipant: {recording.participant.name}')
-
- elif args.recording_path != None:
-
- # Load a tobii segment
- tobii_recording = TobiiEntities.TobiiRecording(args.recording_path)
-
- for segment in tobii_recording.segments():
- print(f'Segment id: {segment.id}')
-
- elif args.segment_path != None:
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path)
-
- tobii_segment_video = tobii_segment.load_video()
- print(f'Video properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'Loaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Edit figure
- figure_width = min(tobii_segment_video.duration/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
- data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max
- figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144)
-
- # Plot data
- subplot = figure.add_subplot(111)
- subplot.set_title('VideoTimeStamps', loc='left')
- patches = tobii_segment_data['VideoTimeStamp'].plot(names=['offset','value'], colors=['#276FB6','#9427B6'], samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Display figure
- mpyplot.show()
- figure.clear()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_arscene_edit.py b/src/argaze/utils/tobii_segment_arscene_edit.py
deleted file mode 100644
index b8a5745..0000000
--- a/src/argaze/utils/tobii_segment_arscene_edit.py
+++ /dev/null
@@ -1,381 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import json
-import time
-
-from argaze import *
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications
-from argaze.ArUcoMarkers import *
-from argaze.AreaOfInterest import *
-from argaze.utils import MiscFeatures
-
-import numpy
-import cv2 as cv
-
-def main():
- """
- Open video file with ArUco marker scene inside
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-p', '--project_path', metavar='ARGAZE_PROJECT', type=str, default=None, help='json argaze project filepath')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction)
- args = parser.parse_args()
-
- if args.segment_path != None:
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- #vs_data_filepath = f'{destination_path}/visual_scan.csv'
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'Video properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Load ar scene
- ar_scene = ArScene.ArScene.from_json(args.project_path)
-
- print(ar_scene)
-
- # Display first frame
- video_ts, video_frame = tobii_segment_video.get_frame(0)
- cv.imshow(f'Segment {tobii_segment.id} ArUco marker editor', video_frame.matrix)
-
- # Init mouse interaction variables
- pointer = (0, 0)
- left_click = (0, 0)
- right_click = (0, 0)
- right_button = False
- edit_trans = False # translate
- edit_coord = 0 # x
-
- # On mouse left left_click : update pointer position
- def on_mouse_event(event, x, y, flags, param):
-
- nonlocal pointer
- nonlocal left_click
- nonlocal right_click
- nonlocal right_button
-
- # Update pointer
- pointer = (x, y)
-
- # Update left_click
- if event == cv.EVENT_LBUTTONUP:
-
- left_click = pointer
-
- # Udpate right_button
- elif event == cv.EVENT_RBUTTONDOWN:
-
- right_button = True
-
- elif event == cv.EVENT_RBUTTONUP:
-
- right_button = False
-
- # Udpate right_click
- if right_button:
-
- right_click = pointer
-
- cv.setMouseCallback(f'Segment {tobii_segment.id} ArUco marker editor', on_mouse_event)
-
- # Frame selector loop
- frame_index = 0
- last_frame_index = -1
- last_frame = video_frame.copy()
- force_update = False
-
- selected_marker_id = -1
-
- try:
-
- while True:
-
- # Select a frame on change
- if frame_index != last_frame_index or force_update:
-
- video_ts, video_frame = tobii_segment_video.get_frame(frame_index)
- video_ts_ms = video_ts / 1e3
-
- last_frame_index = frame_index
- last_frame = video_frame.copy()
-
- # Hide frame left and right borders before detection to ignore markers outside focus area
- cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width/6), int(video_frame.height)), (0, 0, 0), -1)
- cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - 1/6)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1)
-
- try:
-
- # Estimate scene pose from ArUco markers into frame.
- tvec, rmat, _ = ar_scene.estimate_pose(video_frame.matrix)
-
- # Catch exceptions raised by estimate_pose method
- except ArScene.PoseEstimationFailed as e:
-
- cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- else:
-
- video_frame = last_frame.copy()
-
- # Edit fake gaze position from pointer
- gaze_position = GazeFeatures.GazePosition(pointer, precision=2)
-
- # Copy video frame to edit visualisation on it with out disrupting aruco detection
- visu_frame = video_frame.copy()
-
- try:
-
- # Project AOI scene into frame according estimated pose
- aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV)
-
- # Catch exceptions raised by project method
- except ArScene.SceneProjectionFailed as e:
-
- cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Draw detected markers
- ar_scene.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- # Draw scene projection
- aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255))
-
- # Project 3D scene on each video frame and the visualisation frame
- if len(ar_scene.aruco_detector.detected_markers) > 0:
-
- # Write detected marker ids
- cv.putText(visu_frame.matrix, f'Detected markers: {list(ar_scene.aruco_detector.detected_markers.keys())}', (20, visu_frame.height - 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Update selected marker id by left_clicking on marker
- for (marker_id, marker) in ar_scene.aruco_detector.detected_markers.items():
-
- marker_aoi = marker.corners.reshape(4, 2).view(AOIFeatures.AreaOfInterest)
-
- if marker_aoi.contains_point(left_click):
-
- selected_marker_id = marker_id
-
- # If a marker is selected
- try:
-
- # Retreive marker index
- selected_marker = ar_scene.aruco_detector.detected_markers[selected_marker_id]
-
- marker_x, marker_y = selected_marker.center
- '''
- if right_button:
-
- pointer_delta_x, pointer_delta_y = (right_click[0] - marker_x) / (visu_frame.width/3), (marker_y - right_click[1]) / (visu_frame.width/3)
-
- if edit_trans:
-
- # Edit scene rotation
- if edit_coord == 0:
- aoi3D_scene_edit['rotation'] = numpy.array([pointer_delta_y, aoi3D_scene_edit['rotation'][1], aoi3D_scene_edit['rotation'][2]])
-
- elif edit_coord == 1:
- aoi3D_scene_edit['rotation'] = numpy.array([aoi3D_scene_edit['rotation'][0], pointer_delta_x, aoi3D_scene_edit['rotation'][2]])
-
- elif edit_coord == 2:
- aoi3D_scene_edit['rotation'] = numpy.array([aoi3D_scene_edit['rotation'][0], aoi3D_scene_edit['rotation'][1], -1*pointer_delta_y])
-
- else:
-
- # Edit scene translation
- if edit_coord == 0:
- aoi3D_scene_edit['translation'] = numpy.array([pointer_delta_x, aoi3D_scene_edit['translation'][1], aoi3D_scene_edit['translation'][2]])
-
- elif edit_coord == 1:
- aoi3D_scene_edit['translation'] = numpy.array([aoi3D_scene_edit['translation'][0], pointer_delta_y, aoi3D_scene_edit['translation'][2]])
-
- elif edit_coord == 2:
- aoi3D_scene_edit['translation'] = numpy.array([aoi3D_scene_edit['translation'][0], aoi3D_scene_edit['translation'][1], 2*pointer_delta_y])
- '''
- # Apply transformation
- aoi_scene_edited = ar_scene.aoi_scene#.transform(aoi3D_scene_edit['translation'], aoi3D_scene_edit['rotation'])
-
- cv.rectangle(visu_frame.matrix, (0, 130), (460, 450), (127, 127, 127), -1)
- '''
- # Write rotation matrix
- R, _ = cv.Rodrigues(aoi3D_scene_edit['rotation'])
- cv.putText(visu_frame.matrix, f'Rotation matrix:', (20, 160), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{R[0][0]:.3f} {R[0][1]:.3f} {R[0][2]:.3f}', (40, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{R[1][0]:.3f} {R[1][1]:.3f} {R[1][2]:.3f}', (40, 240), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{R[2][0]:.3f} {R[2][1]:.3f} {R[2][2]:.3f}', (40, 280), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
-
- # Write translation vector
- T = aoi3D_scene_edit['translation']
- cv.putText(visu_frame.matrix, f'Translation vector:', (20, 320), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{T[0]:.3f}', (40, 360), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{T[1]:.3f}', (40, 400), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{T[2]:.3f}', (40, 440), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
- '''
- # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it
- # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable.
- scene_projection_edited = aoi_scene_edited.project(selected_marker.translation, selected_marker.rotation, ar_scene.aruco_camera.K)
-
- # Draw aoi scene
- scene_projection_edited.draw_raycast(visu_frame.matrix, gaze_position)
-
- # Write warning related to marker pose processing
- except UserWarning as e:
-
- cv.putText(visu_frame.matrix, f'Marker {selected_marker_id}: {e}', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
-
- except KeyError:
-
- # Write error
- if selected_marker_id >= 0:
- cv.putText(visu_frame.matrix, f'Marker {selected_marker_id} not found', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
-
- # Draw focus area
- cv.rectangle(visu_frame.matrix, (int(visu_frame.width/6), 0), (int(visu_frame.width*(1-1/6)), int(visu_frame.height)), (255, 150, 150), 1)
-
- # Draw center
- cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1)
- cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Draw pointer
- gaze_position.draw(visu_frame.matrix)
-
- # Write segment timing
- cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(visu_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Write selected marker id
- if selected_marker_id >= 0:
-
- cv.rectangle(visu_frame.matrix, (0, 50), (550, 90), (127, 127, 127), -1)
-
- # Select color
- if edit_coord == 0:
- color_axis = (0, 0, 255)
-
- elif edit_coord == 1:
- color_axis = (0, 255, 0)
-
- elif edit_coord == 2:
- color_axis = (255, 0, 0)
-
- if edit_trans:
- cv.putText(visu_frame.matrix, f'Rotate marker {selected_marker_id} around axis {edit_coord + 1}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, color_axis, 1, cv.LINE_AA)
- else:
- cv.putText(visu_frame.matrix, f'Translate marker {selected_marker_id} along axis {edit_coord + 1}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, color_axis, 1, cv.LINE_AA)
-
- # Write documentation
- else:
- cv.rectangle(visu_frame.matrix, (0, 50), (650, 250), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, f'> Left click on marker: select scene', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'> T: translate, R: rotate', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'> Shift + 0/1/2: select axis', (20, 160), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'> Right click and drag: edit axis', (20, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'> Ctrl + S: save scene', (20, 240), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Reset left_click
- left_click = (0, 0)
-
- if args.window:
-
- key_pressed = cv.waitKey(1)
-
- #if key_pressed != -1:
- # print(key_pressed)
-
- # Select previous frame with left arrow
- if key_pressed == 2:
- frame_index -= 1
-
- # Select next frame with right arrow
- if key_pressed == 3:
- frame_index += 1
-
- # Clip frame index
- if frame_index < 0:
- frame_index = 0
-
- # Edit rotation with r key
- if key_pressed == 114:
- edit_trans = True
-
- # Edit translation with t key
- if key_pressed == 116:
- edit_trans = False
-
- # Select coordinate to edit with Shift + 0, 1 or 2
- if key_pressed == 49 or key_pressed == 50 or key_pressed == 51:
- edit_coord = key_pressed - 49
-
- # Save selected marker edition using 'Ctrl + s'
- if key_pressed == 19:
-
- if selected_marker_id >= 0 and aoi3D_scene_edit != None:
-
- aoi_scene_filepath = args.marker_id_scene[f'{selected_marker_id}']
- aoi_scene_edited.save(aoi_scene_filepath)
-
- print(f'Saving scene related to marker #{selected_marker_id} into {aoi_scene_filepath}')
-
- # Close window using 'Esc' key
- if key_pressed == 27:
- break
-
- # Reload detecter configuration on 'c' key
- if key_pressed == 99:
- load_configuration_file()
- force_update = True
-
- # Display video
- cv.imshow(f'Segment {tobii_segment.id} ArUco marker editor', visu_frame.matrix)
-
- # Wait 1 second
- time.sleep(1)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_arscene_export.py b/src/argaze/utils/tobii_segment_arscene_export.py
deleted file mode 100644
index b2cc0e0..0000000
--- a/src/argaze/utils/tobii_segment_arscene_export.py
+++ /dev/null
@@ -1,306 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os, json
-import math
-import threading
-
-from argaze import *
-from argaze.TobiiGlassesPro2 import *
-from argaze.ArUcoMarkers import *
-from argaze.AreaOfInterest import *
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-
-def main():
- """
- Detect ArUcoScene into Tobii Glasses Pro 2 camera video record.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-p', '--env_path', metavar='ENVIRONMENT_PATH', type=str, default=None, help='json argaze environment filepath')
- parser.add_argument('-b', '--borders', metavar='BORDERS', type=float, default=16.666, help='define left and right borders mask (%) to not detect aruco out of these borders')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs')
- args = parser.parse_args()
-
- if args.segment_path != None:
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- aoi_json_filepath = f'{destination_path}/aoi.json'
- aoi_csv_filepath = f'{destination_path}/aoi.csv'
- aoi_mp4_filepath = f'{destination_path}/aoi.mp4'
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Load a tobii segment data
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'\nLoaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Access to video timestamp data buffer
- tobii_ts_vts = tobii_segment_data['VideoTimeStamp']
-
- # Access to timestamped gaze position data buffer
- tobii_ts_gaze_positions = tobii_segment_data['GazePosition']
-
- # Format tobii gaze position in pixel
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazePositions projection:', suffix = 'Complete', length = 100)
-
- for ts, tobii_gaze_position in tobii_ts_gaze_positions.items():
-
- # Update Progress Bar
- progress = ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'GazePositions projection:', suffix = 'Complete', length = 100)
-
- # Test gaze position validity
- if tobii_gaze_position.validity == 0:
-
- gaze_position_px = (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height))
- ts_gaze_positions[ts] = GazeFeatures.GazePosition(gaze_position_px)
-
- print('\n')
-
- if args.debug:
-
- # Prepare video exportation at the same format than segment video
- output_video = TobiiVideo.TobiiVideoOutput(aoi_mp4_filepath, tobii_segment_video.stream)
-
- # Load ArEnvironment
- ar_env = ArFeatures.ArEnvironment.from_json(args.env_path)
-
- if args.debug:
- print(ar_env)
-
- # Work with first scene only
- _, ar_scene = next(iter(ar_env.items()))
-
- # Create timestamped buffer to store AOIs and primary time stamp offset
- ts_offset_aois = DataStructures.TimeStampedBuffer()
-
- # Video and data replay loop
- try:
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration/1e3, prefix = 'ArUco detection & AOI projection:', suffix = 'Complete', length = 100)
-
- # Iterate on video frames
- for video_ts, video_frame in tobii_segment_video.frames():
-
- # This video frame is the reference until the next frame
- # Here next frame is at + 40ms (25 fps)
- # TODO: Get video fps to adapt
- next_video_ts = video_ts + 40000
-
- # Copy video frame to edit visualisation on it without disrupting aruco detection
- visu_frame = video_frame.copy()
-
- # Prepare to store projected AOI
- projected_aois = {}
-
- # Process video and data frame
- try:
-
- # Get nearest video timestamp
- _, nearest_vts = tobii_ts_vts.get_last_before(video_ts)
-
- projected_aois['offset'] = nearest_vts.offset
-
- # Hide frame left and right borders before detection to ignore markers outside focus area
- cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width*args.borders/100), int(video_frame.height)), (0, 0, 0), -1)
- cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - args.borders/100)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1)
-
- # Detect aruco markers into frame
- ar_env.aruco_detector.detect_markers(video_frame.matrix)
-
- # Estimate markers poses
- ar_env.aruco_detector.estimate_markers_pose()
-
- # Estimate scene pose from ArUco markers into frame.
- tvec, rmat, _ = ar_scene.estimate_pose(ar_env.aruco_detector.detected_markers)
-
- # Project AOI scene into frame according estimated pose
- aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV)
-
- # Store all projected aoi
- for aoi_name in aoi_scene_projection.keys():
-
- projected_aois[aoi_name] = numpy.rint(aoi_scene_projection[aoi_name]).astype(int)
-
- if args.debug:
-
- # Draw detected markers
- ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- # Draw AOI
- aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255))
-
- # Catch exceptions raised by estimate_pose and project methods
- except (ArFeatures.PoseEstimationFailed, ArFeatures.SceneProjectionFailed) as e:
-
- if str(e) == 'Unconsistent marker poses':
-
- projected_aois['error'] = str(e) + ': ' + str(e.unconsistencies)
-
- else:
-
- projected_aois['error'] = str(e)
-
- if args.debug:
-
- # Draw detected markers
- ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Raised when timestamped buffer is empty
- except KeyError as e:
-
- e = 'VideoTimeStamp missing'
-
- projected_aois['offset'] = 0
- projected_aois['error'] = e
-
- if args.debug:
-
- cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
-
- # Store projected AOI
- ts_offset_aois[video_ts] = projected_aois
-
- if args.debug:
- # Draw gaze positions until next frame
- try:
-
- # Get next gaze position
- ts_start, start_gaze_position = ts_gaze_positions.first
- ts_next, next_gaze_position = ts_gaze_positions.first
-
- # Check next gaze position is not after next frame time
- while ts_next < next_video_ts:
-
- ts_start, start_gaze_position = ts_gaze_positions.pop_first()
- ts_next, next_gaze_position = ts_gaze_positions.first
-
- # Draw start gaze
- start_gaze_position.draw(visu_frame.matrix)
-
- if start_gaze_position.valid and next_gaze_position.valid:
-
- # Draw movement from start to next
- cv.line(visu_frame.matrix, start_gaze_position, next_gaze_position, (0, 255, 255), 1)
-
- if start_gaze_position.valid:
-
- # Write last start gaze position
- cv.putText(visu_frame.matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Write last start gaze position timing
- cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (31, 31, 31), -1)
- cv.putText(visu_frame.matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Empty gaze position
- except IndexError:
- pass
-
- # Draw focus area
- cv.rectangle(visu_frame.matrix, (int(video_frame.width*args.borders/100.), 0), (int(visu_frame.width*(1-args.borders/100)), int(visu_frame.height)), (255, 150, 150), 1)
-
- # Draw center
- cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1)
- cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Write segment timing
- cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(visu_frame.matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- if args.debug:
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- # Display visualisation
- cv.imshow(f'Segment {tobii_segment.id} ArUco AOI', visu_frame.matrix)
-
- # Write video
- output_video.write(visu_frame.matrix)
-
- # Update Progress Bar
- progress = video_ts*1e-3 - int(args.time_range[0] * 1e3)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration*1e-3, prefix = 'ArUco detection & AOI projection:', suffix = 'Complete', length = 100)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- if args.debug:
-
- # Stop frame display
- cv.destroyAllWindows()
-
- # End output video file
- output_video.close()
-
- # Print aruco detection metrics
- print('\n\nAruco marker detection metrics')
- try_count, detected_counts = ar_env.aruco_detector.detection_metrics
-
- for marker_id, detected_count in detected_counts.items():
- print(f'\tMarkers {marker_id} has been detected in {detected_count} / {try_count} frames ({round(100 * detected_count / try_count, 2)} %)')
-
- # Export aruco aoi data
- ts_offset_aois.to_json(aoi_json_filepath)
- ts_offset_aois.as_dataframe().to_csv(aoi_csv_filepath)
- print(f'Aruco AOI data saved into {aoi_json_filepath} and {aoi_csv_filepath}')
-
- # Notify when the aruco aoi video has been exported
- print(f'Aruco AOI video saved into {aoi_mp4_filepath}')
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_data_plot_export.py b/src/argaze/utils/tobii_segment_data_plot_export.py
deleted file mode 100644
index 69f88e3..0000000
--- a/src/argaze/utils/tobii_segment_data_plot_export.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import json
-
-from argaze import DataStructures
-from argaze import GazeFeatures
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo
-from argaze.utils import MiscFeatures
-
-import pandas
-import matplotlib.pyplot as mpyplot
-import matplotlib.patches as mpatches
-
-def main():
- """
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- parser.add_argument('-r', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- args = parser.parse_args()
-
- if args.segment_path != None:
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- data_plots_filepath = f'{destination_path}/data_plot.svg'
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'Video properties:\n\tduration: {tobii_segment_video.duration / 1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Load a tobii segment data
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'Loaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Edit figure
- figure_width = min( 4 * tobii_segment_video.duration / 1e6, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
- data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max
- figure = mpyplot.figure(figsize=(figure_width, 35), dpi=144)
-
- # Plot pupil diameter data
- subplot = figure.add_subplot(711)
- subplot.set_title('Pupil diameter', loc='left')
- subplot.set_ylim(0, 10)
- patches = tobii_segment_data['PupilDiameter'].plot(names=['value'], colors=['#FFD800'], samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Annotate events
- df_ts_events = tobii_segment_data['Event'].as_dataframe()
-
- if len(df_ts_events) > 0:
-
- for ts, event_type, event_tag in zip(df_ts_events.index, df_ts_events.type, df_ts_events.tag):
- subplot.annotate(f'{event_type}\n{event_tag}', xy=(ts, 7), horizontalalignment="left", verticalalignment="top")
- subplot.vlines(ts, 0, 6, color="tab:red", linewidth=1)
-
- # Plot pupil center data
- subplot = figure.add_subplot(712)
- subplot.set_title('Pupil center', loc='left')
- subplot.set_ylim(-40, -20)
- patches = tobii_segment_data['PupilCenter'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot gaze position data
- subplot = figure.add_subplot(713)
- subplot.set_title('Gaze position', loc='left')
- subplot.set_ylim(0., 1.)
- patches = tobii_segment_data['GazePosition'].plot(names=['x','y'], colors=['#276FB6','#9427B6'], split={'value':['x','y']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot gaze direction data
- subplot = figure.add_subplot(714)
- subplot.set_title('Gaze direction', loc='left')
- patches = tobii_segment_data['GazeDirection'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot gaze direction data
- subplot = figure.add_subplot(715)
- subplot.set_title('Gaze position 3D', loc='left')
- patches = tobii_segment_data['GazePosition3D'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot accelerometer data
- subplot = figure.add_subplot(716)
- subplot.set_title('Accelerometer', loc='left')
- patches = tobii_segment_data['Accelerometer'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot accelerometer data
- subplot = figure.add_subplot(717)
- subplot.set_title('Gyroscope', loc='left')
- patches = tobii_segment_data['Gyroscope'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Export figure
- mpyplot.tight_layout()
- mpyplot.savefig(data_plots_filepath)
- mpyplot.close('all')
-
- print(f'\nData plots saved into {data_plots_filepath}')
-
-if __name__ == '__main__':
-
- main()
diff --git a/src/argaze/utils/tobii_segment_display.py b/src/argaze/utils/tobii_segment_display.py
deleted file mode 100644
index 2e0dab5..0000000
--- a/src/argaze/utils/tobii_segment_display.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-
-from argaze import GazeFeatures
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiData, TobiiSpecifications
-from argaze.utils import MiscFeatures
-
-import numpy
-
-import cv2 as cv
-
-def main():
- """
- Display Tobii segment video and data
- """
-
- # manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- parser.add_argument('-r', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- args = parser.parse_args()
-
- if args.segment_path != None:
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'Video properties:\n\tduration: {tobii_segment_video.duration / 1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Load a tobii segment data
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'Loaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Access to timestamped gaze position data buffer
- tobii_ts_gaze_positions = tobii_segment_data['GazePosition']
-
- # Access to timestamped gaze position 3d data buffer
- tobii_ts_gaze_positions_3d = tobii_segment_data['GazePosition3D']
-
- # Access to timestamped head rotations data buffer
- tobii_ts_head_rotations = tobii_segment_data['Gyroscope']
-
- # Access to timestamped events data buffer
- tobii_ts_events = tobii_segment_data['Event']
-
- # Video and data replay loop
- try:
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration / 1e3, prefix = 'Video progression:', suffix = 'Complete', length = 100)
-
- # Iterate on video frames
- for video_ts, video_frame in tobii_segment_video.frames():
-
- video_ts_ms = video_ts / 1e3
-
- try:
-
- # Get nearest head rotation before video timestamp and remove all head rotations before
- _, nearest_head_rotation = tobii_ts_head_rotations.pop_last_before(video_ts)
-
- # Calculate head movement considering only head yaw and pitch
- head_movement = numpy.array(nearest_head_rotation.value)
- head_movement_px = head_movement.astype(int)
- head_movement_norm = numpy.linalg.norm(head_movement[0:2])
-
- # Draw movement vector
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_movement_px[1], int(video_frame.height/2) - head_movement_px[0]), (150, 150, 150), 3)
-
- # Wait for head rotation
- except KeyError:
- pass
-
- try:
-
- # Get nearest gaze position before video timestamp and remove all gaze positions before
- _, nearest_gaze_position = tobii_ts_gaze_positions.pop_last_before(video_ts)
-
- # Ignore frame when gaze position is not valid
- if nearest_gaze_position.validity == 0:
-
- gaze_position_pixel = GazeFeatures.GazePosition( (int(nearest_gaze_position.value[0] * video_frame.width), int(nearest_gaze_position.value[1] * video_frame.height)) )
-
- # Get nearest gaze position 3D before video timestamp and remove all gaze positions before
- _, nearest_gaze_position_3d = tobii_ts_gaze_positions_3d.pop_last_before(video_ts)
-
- # Ignore frame when gaze position 3D is not valid
- if nearest_gaze_position_3d.validity == 0:
-
- gaze_precision_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.PRECISION)) * nearest_gaze_position_3d.value[2]
- tobii_camera_hfov_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV / 2)) * nearest_gaze_position_3d.value[2]
-
- gaze_position_pixel.precision = round(video_frame.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm))
-
- # Draw gaze
- gaze_position_pixel.draw(video_frame.matrix)
-
- # Wait for gaze position
- except KeyError:
- pass
-
- try:
-
- # Get nearest event before video timestamp and remove all gaze positions before
- nearest_event_ts, nearest_event = tobii_ts_events.pop_last_before(video_ts)
-
- #print(nearest_event_ts / 1e3, nearest_event)
-
- # Write events
- cv.rectangle(video_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(video_frame.matrix, str(nearest_event), (20, 140), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Wait for events
- except KeyError:
- pass
-
- # Draw center
- cv.line(video_frame.matrix, (int(video_frame.width/2) - 50, int(video_frame.height/2)), (int(video_frame.width/2) + 50, int(video_frame.height/2)), (255, 150, 150), 1)
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2) - 50), (int(video_frame.width/2), int(video_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Write segment timing
- cv.rectangle(video_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(video_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- cv.imshow(f'Segment {tobii_segment.id} video', video_frame.matrix)
-
- # Update Progress Bar
- progress = video_ts_ms - int(args.time_range[0] * 1e3)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration / 1e3, prefix = 'Video progression:', suffix = 'Complete', length = 100)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_gaze_metrics_export.py b/src/argaze/utils/tobii_segment_gaze_metrics_export.py
deleted file mode 100644
index 1e530e0..0000000
--- a/src/argaze/utils/tobii_segment_gaze_metrics_export.py
+++ /dev/null
@@ -1,242 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import math
-
-from argaze import DataStructures, GazeFeatures
-from argaze.AreaOfInterest import AOIFeatures
-from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-import pandas
-
-def main():
- """
- Analyse fixations and saccades
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='path to a tobii segment folder', required=True)
- parser.add_argument('-a', '--aoi', metavar='AOI_NAME', type=str, default=None, help='aoi name where to project gaze', required=True)
- parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-p', '--period', metavar=('PERIOD_TIME'), type=float, default=10, help='period of time (in second)')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- args = parser.parse_args()
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}/{args.aoi}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- positions_json_filepath = f'{destination_path}/gaze_positions.json'
-
- fixations_json_filepath = f'{destination_path}/gaze_fixations.json'
- saccades_json_filepath = f'{destination_path}/gaze_saccades.json'
- gaze_status_json_filepath = f'{destination_path}/gaze_status.json'
-
- gaze_metrics_period_filepath = f'{destination_path}/gaze_metrics_{int(args.period)}s.csv'
- gaze_metrics_whole_filepath = f'{destination_path}/gaze_metrics.csv'
-
- # Load gaze positions
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions.from_json(positions_json_filepath)
-
- # Load gaze movements
- ts_fixations = GazeFeatures.TimeStampedGazeMovements.from_json(fixations_json_filepath)
- ts_saccades = GazeFeatures.TimeStampedGazeMovements.from_json(saccades_json_filepath)
- ts_status = GazeFeatures.TimeStampedGazeStatus.from_json(gaze_status_json_filepath)
-
- print(f'\nLoaded gaze movements count:')
- print(f'\tFixations: {len(ts_fixations)}')
- print(f'\tSaccades: {len(ts_saccades)}')
-
- # Load tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Get participant name
- participant_name = TobiiEntities.TobiiParticipant(f'{args.segment_path}/../../').name
-
- print(f'\nParticipant: {participant_name}')
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration * 1e-6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Prepare gaze metrics
- ts_metrics = DataStructures.TimeStampedBuffer()
-
- positions_exist = len(ts_gaze_positions) > 0
- fixations_exist = len(ts_fixations) > 0
- saccades_exist = len(ts_saccades) > 0
- status_exist = len(ts_status) > 0
-
- if positions_exist:
-
- # Create pandas dataframe
- positions_dataframe = ts_gaze_positions.as_dataframe()
-
- # Reset time range offset
- positions_dataframe.index = positions_dataframe.index - positions_dataframe.index[0]
-
- if fixations_exist:
-
- # Create pandas dataframe
- fixations_dataframe = ts_fixations.as_dataframe()
-
- # Reset time range offset
- fixations_dataframe.index = fixations_dataframe.index - fixations_dataframe.index[0]
-
- # Add 'end' column
- fixations_dataframe['end'] = fixations_dataframe.index + fixations_dataframe.duration
-
- if saccades_exist:
-
- # Create pandas dataframe
- saccades_dataframe = ts_saccades.as_dataframe()
-
- # Reset time range offset
- saccades_dataframe.index = saccades_dataframe.index - saccades_dataframe.index[0]
-
- # Add 'end' column
- saccades_dataframe['end'] = saccades_dataframe.index + saccades_dataframe.duration
-
- # Define a function to export metrics for a period of time
- def metrics_for_period(period_start_ts, period_end_ts):
-
- period_duration = period_end_ts - period_start_ts
- period_metrics = {}
-
- #print(f'\n*** Anaysing period n°{i} [{period_start_ts * 1e-6:.3f}s, {period_end_ts * 1e-6:.3f}s]')
-
- # Store period duration
- period_metrics['duration (ms)'] = period_duration * 1e-3
-
- # Default positions analysis
- period_metrics['positions_number'] = 0
- period_metrics['positions_valid_ratio (%)'] = None
-
- # Analyse fixations
- if positions_exist:
-
- # Select period
- positions_period_dataframe = positions_dataframe[(positions_dataframe.index >= period_start_ts) & (positions_dataframe.index < period_end_ts)]
-
- if not positions_period_dataframe.empty:
-
- #print('\n* Positions:\n', positions_period_dataframe)
-
- period_metrics['positions_number'] = positions_period_dataframe.shape[0]
- period_metrics['positions_valid_ratio (%)'] = positions_period_dataframe.precision.count() / positions_period_dataframe.shape[0] * 100
-
- # Default fixation analysis
- fixations_duration_sum = 0.0
- period_metrics['fixations_number'] = 0
- period_metrics['fixations_duration_mean (ms)'] = None
- period_metrics['fixations_duration_sum (ms)'] = None
- period_metrics['fixations_duration_ratio (%)'] = None
- period_metrics['fixations_deviation_mean (px)'] = None
-
- # Analyse fixations
- if fixations_exist:
-
- # Select period
- fixations_period_dataframe = fixations_dataframe[(fixations_dataframe.index >= period_start_ts) & (fixations_dataframe.end < period_end_ts)]
-
- if not fixations_period_dataframe.empty:
-
- #print('\n* Fixations:\n', fixations_period_dataframe)
-
- fixations_duration_sum = fixations_period_dataframe.duration.sum()
- period_metrics['fixations_number'] = fixations_period_dataframe.shape[0]
- period_metrics['fixations_duration_mean (ms)'] = fixations_period_dataframe.duration.mean() * 1e-3
- period_metrics['fixations_duration_sum (ms)'] = fixations_duration_sum * 1e-3
- period_metrics['fixations_duration_ratio (%)'] = fixations_duration_sum / period_duration * 100
- period_metrics['fixations_deviation_mean (px)'] = fixations_period_dataframe.deviation_max.mean()
-
- # Default saccades analysis
- saccades_duration_sum = 0.0
- period_metrics['saccades_number'] = 0
- period_metrics['saccades_duration_mean (ms)'] = None
- period_metrics['saccades_duration_sum (ms)'] = None
- period_metrics['saccades_duration_ratio (%)'] = None
- period_metrics['saccades_distance_mean (px)'] = None
-
- # Analyse saccades
- if saccades_exist:
-
- # Select period
- saccades_period_dataframe = saccades_dataframe[(saccades_dataframe.index >= period_start_ts) & (saccades_dataframe.end < period_end_ts)]
-
- if not saccades_period_dataframe.empty:
-
- #print('\n* Saccades:\n', saccades_period_dataframe)
-
- saccades_duration_sum = saccades_period_dataframe.duration.sum()
- period_metrics['saccades_number'] = saccades_period_dataframe.shape[0]
- period_metrics['saccades_duration_mean (ms)'] = saccades_period_dataframe.duration.mean() * 1e-3
- period_metrics['saccades_duration_sum (ms)'] = saccades_duration_sum * 1e-3
- period_metrics['saccades_duration_ratio (%)'] = saccades_duration_sum / period_duration * 100
- period_metrics['saccades_distance_mean (px)'] = saccades_period_dataframe.distance.mean()
-
- # Analyse exploit/explore
- if saccades_duration_sum != 0.0:
-
- period_metrics['exploit_explore_ratio'] = fixations_duration_sum / saccades_duration_sum
-
- else:
-
- period_metrics['exploit_explore_ratio'] = None
-
- # Append period metrics
- ts_metrics[int(period_start_ts * 1e-3)] = period_metrics
-
- # Metrics for each period
- for i in range(0, int(tobii_segment_video.duration/(args.period * 1e6))):
-
- period_start_ts = i*(args.period * 1e6)
- period_end_ts = (i+1)*(args.period * 1e6)
-
- metrics_for_period(period_start_ts, period_end_ts)
-
- metrics_dataframe = ts_metrics.as_dataframe() #pandas.DataFrame(metrics, index=[participant_name])
- metrics_dataframe.to_csv(gaze_metrics_period_filepath, index=True)
- print(f'\nGaze metrics per period of time saved into {gaze_metrics_period_filepath}')
-
- # Metrics for the whole session
- ts_metrics = DataStructures.TimeStampedBuffer()
- metrics_for_period(0, tobii_segment_video.duration)
-
- metrics_dataframe = ts_metrics.as_dataframe() #pandas.DataFrame(metrics, index=[participant_name])
- metrics_dataframe.to_csv(gaze_metrics_whole_filepath, index=True)
- print(f'\nGaze metrics for whole segment saved into {gaze_metrics_whole_filepath}\n')
-
-if __name__ == '__main__':
-
- main()
diff --git a/src/argaze/utils/tobii_segment_gaze_movements_export.py b/src/argaze/utils/tobii_segment_gaze_movements_export.py
deleted file mode 100644
index 85fe74b..0000000
--- a/src/argaze/utils/tobii_segment_gaze_movements_export.py
+++ /dev/null
@@ -1,570 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import math
-
-from argaze import DataStructures, GazeFeatures
-from argaze.AreaOfInterest import AOIFeatures
-from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-import pandas
-
-def main():
- """
- Project gaze positions into an AOI and identify particular gaze movements like fixations and saccades
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='path to a tobii segment folder', required=True)
- parser.add_argument('-a', '--aoi', metavar='AOI_NAME', type=str, default=None, help='aoi name where to project gaze', required=True)
- parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-dev', '--deviation_max_threshold', metavar='DISPERSION_THRESHOLD', type=int, default=None, help='maximal distance for fixation identification in pixel')
- parser.add_argument('-dmin', '--duration_min_threshold', metavar='DURATION_MIN_THRESHOLD', type=int, default=200, help='minimal duration for fixation identification in millisecond')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs')
- args = parser.parse_args()
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}/{args.aoi}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- aoi_filepath = f'{destination_path}/../aoi.json'
-
- positions_json_filepath = f'{destination_path}/gaze_positions.json'
-
- fixations_json_filepath = f'{destination_path}/gaze_fixations.json'
- saccades_json_filepath = f'{destination_path}/gaze_saccades.json'
- movements_json_filepath = f'{destination_path}/gaze_movements.json'
- gaze_status_json_filepath = f'{destination_path}/gaze_status.json'
-
- gaze_status_video_filepath = f'{destination_path}/gaze_status.mp4'
- gaze_status_image_filepath = f'{destination_path}/gaze_status.png'
-
- # Load aoi scene projection
- ts_aois_projections = DataStructures.TimeStampedBuffer.from_json(aoi_filepath)
-
- print(f'\nAOI frames: ', len(ts_aois_projections))
- aoi_names = ts_aois_projections.as_dataframe().drop(['offset','error'], axis=1).columns
- for aoi_name in aoi_names:
- print(f'\t{aoi_name}')
-
- # Load tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Get participant name
- participant_name = TobiiEntities.TobiiParticipant(f'{args.segment_path}/../../').name
-
- print(f'\nParticipant: {participant_name}')
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Check that gaze positions have already been exported to not process them again
- if os.path.exists(positions_json_filepath):
-
- # Load gaze positions
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions.from_json(positions_json_filepath)
-
- print(f'\nLoaded gaze positions count:')
- print(f'\tPositions: {len(ts_gaze_positions)}')
-
- invalid_gaze_position_count = 0
- inner_precisions_px = []
-
- for ts, gaze_position in ts_gaze_positions.items():
-
- if not gaze_position.valid:
-
- invalid_gaze_position_count += 1
-
- else:
-
- inner_precisions_px.append(gaze_position.precision)
-
- print(f'\tInvalid positions: {invalid_gaze_position_count}/{len(ts_gaze_positions)} ({100*invalid_gaze_position_count/len(ts_gaze_positions):.2f} %)')
-
- inner_precision_px_mean = round(numpy.mean(inner_precisions_px))
- print(f'\tMean of projected precisions: {inner_precision_px_mean} px')
-
- # Project gaze positions into the selected AOI
- else:
-
- # Load a tobii segment data
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'\nLoaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Access to timestamped gaze position data buffer
- tobii_ts_gaze_positions = tobii_segment_data['GazePosition']
-
- # Access to timestamped gaze 3D positions data buffer
- tobii_ts_gaze_positions_3d = tobii_segment_data['GazePosition3D']
-
- # Format tobii gaze position and precision in pixel and project it in aoi scene
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # Gaze projection metrics
- ts_projection_metrics = DataStructures.TimeStampedBuffer()
- invalid_gaze_position_count = 0
- inner_precisions_px = []
-
- # Starting with no AOI projection
- ts_current_aoi = 0
- current_aoi = AOIFeatures.AreaOfInterest()
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazePositions projection:', suffix = 'Complete', length = 100)
-
- for ts, tobii_gaze_position in tobii_ts_gaze_positions.items():
-
- # Update Progress Bar
- progress = ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'GazePositions projection:', suffix = 'Complete', length = 100)
-
- # Edit default aoi error
- current_aoi_error = 'No available AOI projection'
-
- try:
-
- # Get the last aoi projection until the current gaze position timestamp
- ts_current_aois, current_aois = ts_aois_projections.pop_last_until(ts)
-
- assert(ts_current_aois <= ts)
-
- # Catch aoi error to not update current aoi
- if 'error' in current_aois.keys():
-
- # Remove extra error info after ':'
- current_aoi_error = current_aois.pop('error').split(':')[0]
-
- # Or update current aoi
- elif args.aoi in current_aois.keys():
-
- ts_current_aoi = ts_current_aois
- current_aoi = AOIFeatures.AreaOfInterest(current_aois.pop(args.aoi))
-
- current_aoi_error = ''
-
- # No aoi projection at the beginning
- except KeyError as e:
- pass
-
- # Wait for available aoi
- if current_aoi.empty:
-
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(current_aoi_error)
- invalid_gaze_position_count += 1
- continue
-
- # QUESTION: What todo if the current aoi is too old ?
- # if the aoi didn't move it is not a problem...
- # For the moment, we avoid 1s old aoi and we provide a metric to assess the problem
- ts_difference = ts - ts_current_aoi
-
- # If aoi is not updated after the
- if ts_difference >= args.duration_min_threshold*1e3:
-
- current_aoi = AOIFeatures.AreaOfInterest()
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition('AOI projection is too old (> 1s)')
- invalid_gaze_position_count += 1
- continue
-
- ts_projection_metrics[ts] = {'frame': ts_current_aois, 'age': ts_difference}
-
- # Test gaze position validity
- if tobii_gaze_position.validity == 0:
-
- gaze_position_px = (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height))
-
- # Get gaze position 3D at same gaze position timestamp
- tobii_gaze_position_3d = tobii_ts_gaze_positions_3d.pop(ts)
-
- # Test gaze position 3d validity
- if tobii_gaze_position_3d.validity == 0:
-
- gaze_precision_mm = numpy.sin(numpy.deg2rad(TobiiSpecifications.PRECISION)) * tobii_gaze_position_3d.value[2]
- tobii_camera_hfov_mm = numpy.sin(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV)) * tobii_gaze_position_3d.value[2]
-
- gaze_precision_px = round(tobii_segment_video.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm))
-
- # Edit gaze position
- gaze_position = GazeFeatures.GazePosition(gaze_position_px)
-
- # Project gaze position into selected aois
- if current_aoi.contains_point(gaze_position.value):
-
- inner_x, inner_y = current_aoi.inner_axis(gaze_position.value)
- inner_precision_px = gaze_precision_px * tobii_segment_video.width * tobii_segment_video.height / current_aoi.area
-
- # Store inner precision for metrics
- inner_precisions_px.append(inner_precision_px)
-
- # Store inner gaze position for further movement processing
- # TEMP: 1920x1080 are Screen_Plan dimensions
- ts_gaze_positions[ts] = GazeFeatures.GazePosition((round(inner_x*1920), round((1.0 - inner_y)*1080)), precision=inner_precision_px)
-
- else:
-
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'GazePosition not inside {args.aoi}')
- invalid_gaze_position_count += 1
-
- else:
-
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'Invalid Tobii GazePosition3D')
- invalid_gaze_position_count += 1
-
- else:
-
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'Invalid Tobii GazePosition')
- invalid_gaze_position_count += 1
-
- print(f'\nGazePositions projection metrics:')
-
- print(f'\tInvalid positions: {invalid_gaze_position_count}/{len(tobii_ts_gaze_positions)} ({100*invalid_gaze_position_count/len(tobii_ts_gaze_positions):.2f} %)')
-
- if len(ts_projection_metrics):
-
- projection_metrics_dataframe = ts_projection_metrics.as_dataframe()
- print(f'\tAOI age mean: {projection_metrics_dataframe.age.mean() * 1e-3:.3f} ms')
- print(f'\tAOI age max: {projection_metrics_dataframe.age.max() * 1e-3:.3f} ms')
-
- inner_precision_px_mean = round(numpy.mean(inner_precisions_px))
- print(f'\tMean of projected precisions: {inner_precision_px_mean} px')
-
- else:
-
- print(print(f'\t no AOI projected'))
-
- ts_gaze_positions.to_json(positions_json_filepath)
- print(f'\nProjected gaze positions saved into {positions_json_filepath}')
-
- print(f'\nGazeMovement identifier setup:')
-
- if args.deviation_max_threshold == None:
-
- selected_deviation_max_threshold = inner_precision_px_mean
- print(f'\tDispersion threshold: {selected_deviation_max_threshold} px (equal to mean of projected precisions)')
-
- else:
-
- selected_deviation_max_threshold = args.deviation_max_threshold
- print(f'\tDispersion threshold: {selected_deviation_max_threshold} px')
-
- print(f'\tDuration threshold: {args.duration_min_threshold} ms')
-
- movement_identifier = DispersionBasedGazeMovementIdentifier.GazeMovementIdentifier(selected_deviation_max_threshold, args.duration_min_threshold*1e3)
-
- # Start movement identification
- ts_fixations = GazeFeatures.TimeStampedGazeMovements()
- ts_saccades = GazeFeatures.TimeStampedGazeMovements()
- ts_status = GazeFeatures.TimeStampedGazeStatus()
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazeMovements identification:', suffix = 'Complete', length = 100)
-
- for ts, gaze_position in ts_gaze_positions.items():
-
- gaze_movement = movement_identifier.identify(ts, gaze_position)
-
- if isinstance(gaze_movement, DispersionBasedGazeMovementIdentifier.Fixation):
-
- start_ts, start_position = gaze_movement.positions.first
-
- ts_fixations[start_ts] = gaze_movement
-
- for ts, position in gaze_movement.positions.items():
-
- ts_status[ts] = GazeFeatures.GazeStatus.from_position(position, 'Fixation', len(ts_fixations))
-
- elif isinstance(gaze_movement, DispersionBasedGazeMovementIdentifier.Saccade):
-
- start_ts, start_position = gaze_movement.positions.first
-
- ts_saccades[start_ts] = gaze_movement
-
- for ts, position in gaze_movement.positions.items():
-
- ts_status[ts] = GazeFeatures.GazeStatus.from_position(position, 'Saccade', len(ts_saccades))
-
- # Update Progress Bar
- progress = ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze movements identification:', suffix = 'Complete', length = 100)
-
- print(f'\nGazeMovements identification metrics:')
- print(f'\t{len(ts_fixations)} fixations found')
- print(f'\t{len(ts_saccades)} saccades found')
-
- ts_fixations.to_json(fixations_json_filepath)
- print(f'\nGaze fixations saved into {fixations_json_filepath}')
-
- ts_saccades.to_json(saccades_json_filepath)
- print(f'Gaze saccades saved into {saccades_json_filepath}')
-
- ts_status.to_json(gaze_status_json_filepath)
- print(f'Gaze status saved into {gaze_status_json_filepath}')
-
- # DEBUG
- ts_status.as_dataframe().to_csv(f'{destination_path}/gaze_status.csv')
-
- # Edit data visualisation
- if args.debug:
-
- # Prepare video exportation at the same format than segment video
- output_video = TobiiVideo.TobiiVideoOutput(gaze_status_video_filepath, tobii_segment_video.stream)
-
- # Reload aoi scene projection
- ts_aois_projections = DataStructures.TimeStampedBuffer.from_json(aoi_filepath)
-
- # Prepare gaze satus image
- gaze_status_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8)
-
- # Video loop
- try:
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGaze status video processing:', suffix = 'Complete', length = 100)
-
- fixations_exist = len(ts_fixations) > 0
- saccades_exist = len(ts_saccades) > 0
- status_exist = len(ts_status) > 0
-
- if fixations_exist:
- current_fixation_ts, current_fixation = ts_fixations.pop_first()
- current_fixation_time_counter = 0
-
- if saccades_exist:
- current_saccade_ts, current_saccade = ts_saccades.pop_first()
-
- # Iterate on video frames
- for video_ts, video_frame in tobii_segment_video.frames():
-
- # This video frame is the reference until the next frame
- # Here next frame is at + 40ms (25 fps)
- # TODO: Get video fps to adapt
- next_video_ts = video_ts + 40000
-
- visu_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8)
-
- try:
-
- # Get current aoi projection at video frame time
- ts_current_aois, current_aois = ts_aois_projections.pop_first()
-
- assert(ts_current_aois == video_ts)
-
- # Catch aoi error to not update current aoi
- if 'error' in current_aois.keys():
-
- # Display error (remove extra info after ':')
- current_aoi_error = current_aois.pop('error').split(':')[0]
-
- # Select color error
- if current_aoi_error == 'VideoTimeStamp missing':
- color_error = (0, 0, 255)
- else:
- color_error = (0, 255, 255)
-
- cv.rectangle(visu_matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_matrix, current_aoi_error, (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA)
-
- # Or update current aoi
- elif args.aoi in current_aois.keys():
-
- ts_current_aoi = ts_current_aois
- current_aoi = AOIFeatures.AreaOfInterest(current_aois.pop(args.aoi))
-
- # Apply perspective transform algorithm
- destination = numpy.float32([[0, 1080],[1920, 1080],[1920, 0],[0, 0]])
- aoi_matrix = cv.getPerspectiveTransform(current_aoi.astype(numpy.float32), destination)
- visu_matrix = cv.warpPerspective(video_frame.matrix, aoi_matrix, (1920, 1080))
-
- # Wait for aois projection
- except KeyError:
- pass
-
- if fixations_exist:
-
- # Check next fixation
- if video_ts >= current_fixation_ts + current_fixation.duration and len(ts_fixations) > 0:
-
- current_fixation_ts, current_fixation = ts_fixations.pop_first()
- current_fixation_time_counter = 0
-
- # While current time belongs to the current fixation
- if video_ts >= current_fixation_ts and video_ts < current_fixation_ts + current_fixation.duration:
-
- current_fixation_time_counter += 1
-
- # Draw current fixation
- cv.circle(visu_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 255, 0), current_fixation_time_counter)
- cv.circle(gaze_status_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 155, 0))
-
- if saccades_exist:
-
- # Check next saccade
- if video_ts >= current_saccade_ts + current_saccade.duration and len(ts_saccades) > 0:
-
- current_saccade_ts, current_saccade = ts_saccades.pop_first()
-
- # While current time belongs to the current saccade
- if video_ts >= current_saccade_ts and video_ts < current_saccade_ts + current_saccade.duration:
- pass
-
- # Draw gaze status until next frame
- try:
-
- # Get next gaze status
- ts_start, start_gaze_status = ts_status.first
- ts_next, next_gaze_status = ts_status.first
-
- # Check next gaze status is not after next frame time
- while ts_next < next_video_ts:
-
- ts_start, start_gaze_status = ts_status.pop_first()
- ts_next, next_gaze_status = ts_status.first
-
- # Draw movement type
- if start_gaze_status.valid and next_gaze_status.valid \
- and start_gaze_status.movement_index == next_gaze_status.movement_index \
- and start_gaze_status.movement_type == next_gaze_status.movement_type:
-
- if next_gaze_status.movement_type == 'Fixation':
- movement_color = (0, 255, 0)
- elif next_gaze_status.movement_type == 'Saccade':
- movement_color = (0, 0, 255)
- else:
- movement_color = (255, 0, 0)
-
- cv.line(visu_matrix, start_gaze_status, next_gaze_status, movement_color, 3)
- cv.line(gaze_status_matrix, start_gaze_status, next_gaze_status, movement_color, 3)
-
- # Empty gaze position
- except IndexError:
- pass
-
- # Draw gaze positions until next frame
- try:
-
- # Get next gaze position
- ts_start, start_gaze_position = ts_gaze_positions.first
- ts_next, next_gaze_position = ts_gaze_positions.first
-
- # Gaze position count
- gaze_position_count = 0
-
- # Check next gaze position is not after next frame time
- while ts_next < next_video_ts:
-
- ts_start, start_gaze_position = ts_gaze_positions.pop_first()
- ts_next, next_gaze_position = ts_gaze_positions.first
-
- if not start_gaze_position.valid:
-
- # Select color error
- if start_gaze_position.message == 'VideoTimeStamp missing':
- color_error = (0, 0, 255)
- else:
- color_error = (0, 255, 255)
-
- # Write unvalid error message
- cv.putText(visu_matrix, f'{ts_start*1e-3:.3f} ms: {start_gaze_position.message}', (20, 1060 - (gaze_position_count)*50), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA)
-
- # Draw start gaze
- start_gaze_position.draw(visu_matrix, draw_precision=False)
- start_gaze_position.draw(gaze_status_matrix, draw_precision=False)
-
- if start_gaze_position.valid and next_gaze_position.valid:
-
- # Draw movement from start to next
- cv.line(visu_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1)
- cv.line(gaze_status_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1)
-
- gaze_position_count += 1
-
- if start_gaze_position.valid:
-
- # Write last start gaze position
- cv.putText(visu_matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Write last start gaze position timing
- cv.rectangle(visu_matrix, (0, 50), (550, 100), (31, 31, 31), -1)
- cv.putText(visu_matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Empty gaze position
- except IndexError:
- pass
-
- # Write segment timing
- cv.rectangle(visu_matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(visu_matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Write movement identification parameters
- cv.rectangle(visu_matrix, (0, 150), (550, 310), (63, 63, 63), -1)
- cv.putText(visu_matrix, f'Deviation max: {selected_deviation_max_threshold} px', (20, 210), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_matrix, f'Duration min: {args.duration_min_threshold} ms', (20, 270), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Draw dispersion threshold circle
- cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), 2, (0, 255, 255), -1)
- cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), selected_deviation_max_threshold, (255, 150, 150), 1)
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- # Display video
- cv.imshow(f'Segment {tobii_segment.id} movements', visu_matrix)
-
- # Write video
- output_video.write(visu_matrix)
-
- # Update Progress Bar
- progress = video_ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze status video processing:', suffix = 'Complete', length = 100)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Saving gaze status image
- cv.imwrite(gaze_status_image_filepath, gaze_status_matrix)
-
- # End output video file
- output_video.close()
- print(f'\nGaze status video saved into {gaze_status_video_filepath}\n')
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_record.py b/src/argaze/utils/tobii_segment_record.py
deleted file mode 100644
index 25066c4..0000000
--- a/src/argaze/utils/tobii_segment_record.py
+++ /dev/null
@@ -1,96 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import threading
-import time
-import random
-
-from argaze.TobiiGlassesPro2 import TobiiController
-from argaze.utils import MiscFeatures
-
-def main():
- """
- Record a Tobii Glasses Pro 2 session on Tobii interface's SD Card
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default='192.168.1.10', help='tobii glasses ip')
- parser.add_argument('-p', '--project_name', metavar='PROJECT_NAME', type=str, default=TobiiController.DEFAULT_PROJECT_NAME, help='project name')
- parser.add_argument('-u', '--participant_name', metavar='PARTICIPANT_NAME', type=str, default=TobiiController.DEFAULT_PARTICIPANT_NAME, help='participant name')
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(ip_address = args.tobii_ip, project_name = args.project_name, participant_name = args.participant_name)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Print current confirugration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Calibrate tobii glasses
- tobii_controller.calibrate()
-
- # Create recording
- recording_id = tobii_controller.create_recording(args.participant_name)
-
- # Start recording
- tobii_controller.start_recording(recording_id)
- print(f'Recording {recording_id} started')
-
- # Define loop
- last_battery_level = 0
- time_count = 0
-
- exit = MiscFeatures.ExitSignalHandler()
- print('Waiting for Ctrl+C to quit...\n')
-
- while not exit.status():
-
- # Print storage info each minutes
- if time_count % 60 == 0:
-
- print(tobii_controller.get_storage_info())
-
- # print battery level each time it changes
- # send it as experimental variable
- battery_level = tobii_controller.get_battery_level()
- if battery_level != last_battery_level:
-
- print(tobii_controller.get_battery_info())
-
- tobii_controller.send_variable('battery', battery_level)
-
- last_battery_level = battery_level
-
- # send random event each 3 - 10 seconds
- if time_count % random.randint(3, 10) == 0:
-
- print('Send random event')
-
- tobii_controller.send_event('random')
-
- # Sleep 1 second
- time.sleep(1)
- time_count += 1
-
- # Stop recording
- tobii_controller.stop_recording(recording_id)
- print(f'Recording {recording_id} stopped')
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_stream_arscene_display.py b/src/argaze/utils/tobii_stream_arscene_display.py
deleted file mode 100644
index e7a3bfb..0000000
--- a/src/argaze/utils/tobii_stream_arscene_display.py
+++ /dev/null
@@ -1,154 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os, json
-
-from argaze import *
-from argaze.TobiiGlassesPro2 import *
-from argaze.ArUcoMarkers import *
-from argaze.AreaOfInterest import *
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-
-def main():
- """
- Detect ArUcoScene into Tobii Glasses Pro 2 camera video stream.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
- parser.add_argument('-p', '--env_path', metavar='ENVIRONMENT_PATH', type=str, default=None, help='json argaze environment filepath')
- parser.add_argument('-b', '--borders', metavar='BORDERS', type=float, default=16.666, help='define left and right borders mask (%) to not detect aruco out of these borders')
- parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs')
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Print current confirugration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Enable tobii data stream
- tobii_data_stream = tobii_controller.enable_data_stream()
-
- # Enable tobii video stream
- tobii_video_stream = tobii_controller.enable_video_stream()
-
- # Load ArEnvironment
- ar_env = ArFeatures.ArEnvironment.from_json(args.env_path)
-
- if args.debug:
- print(ar_env)
-
- # Work with first scene only
- _, ar_scene = next(iter(ar_env.items()))
-
- # Start streaming
- tobii_controller.start_streaming()
-
- # Live video stream capture loop
- try:
-
- # Assess loop performance
- loop_chrono = MiscFeatures.TimeProbe()
- fps = 0
-
- while tobii_video_stream.is_alive():
-
- # Read video stream
- video_ts, video_frame = tobii_video_stream.read()
-
- # Copy video frame to edit visualisation on it without disrupting aruco detection
- visu_frame = video_frame.copy()
-
- # Hide frame left and right borders before detection to ignore markers outside focus area
- cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width*args.borders/100), int(video_frame.height)), (0, 0, 0), -1)
- cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - args.borders/100)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1)
-
- # Process video and data frame
- try:
-
- # Detect aruco markers into frame
- ar_env.aruco_detector.detect_markers(video_frame.matrix)
-
- # Estimate markers poses
- ar_env.aruco_detector.estimate_markers_pose()
-
- # Estimate scene pose from ArUco markers into frame.
- tvec, rmat, _ = ar_scene.estimate_pose(ar_env.aruco_detector.detected_markers)
-
- # Project AOI scene into frame according estimated pose
- aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV)
-
- # Draw scene axis
- ar_scene.draw_axis(visu_frame.matrix)
-
- # Draw scene places
- ar_scene.draw_places(visu_frame.matrix)
-
- # Draw AOI
- aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255))
-
- # Draw detected markers
- ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- # Catch exceptions raised by estimate_pose and project methods
- except (ArFeatures.PoseEstimationFailed, ArFeatures.SceneProjectionFailed) as e:
-
- # Draw detected markers
- ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Assess loop performance
- lap_time, lap_counter, elapsed_time = loop_chrono.lap()
-
- # Update fps each 10 loops
- if lap_counter >= 10:
-
- fps = 1e3 * lap_counter / elapsed_time
- loop_chrono.restart()
-
- # Write stream timing
- cv.rectangle(visu_frame.matrix, (0, 0), (700, 50), (63, 63, 63), -1)
- cv.putText(visu_frame.matrix, f'Video stream time: {int(video_ts*1e-3)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'Fps: {int(fps)}', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- cv.imshow(f'Stream ArUco AOI', visu_frame.matrix)
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
- # Stop streaming
- tobii_controller.stop_streaming()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_stream_display.py b/src/argaze/utils/tobii_stream_display.py
deleted file mode 100644
index b979e03..0000000
--- a/src/argaze/utils/tobii_stream_display.py
+++ /dev/null
@@ -1,218 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-
-from argaze import DataStructures, GazeFeatures
-from argaze.TobiiGlassesPro2 import *
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-
-def main():
- """
- Capture video camera and gaze data streams and synchronise them.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
- parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath')
-
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Print current confirugration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Enable tobii data stream
- tobii_data_stream = tobii_controller.enable_data_stream()
-
- # Enable tobii video stream
- tobii_video_stream = tobii_controller.enable_video_stream()
-
- # Create tobii imu handler
- tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()
-
- # Load optional imu calibration file
- if args.imu_calibration != None:
-
- tobii_imu.load_calibration_file(args.imu_calibration)
-
- # Init head rotation speed
- head_rotation_speed = numpy.zeros(3).astype(int)
-
- # Init gaze position and precision
- gaze_position_px = (0, 0)
- gaze_precision_px = 0
-
- # Init data timestamped in millisecond
- data_ts_ms = 0
-
- # Assess temporal performance
- loop_chrono = MiscFeatures.TimeProbe()
- gyroscope_chrono = MiscFeatures.TimeProbe()
- gaze_chrono = MiscFeatures.TimeProbe()
-
- loop_ps = 0
- gyroscope_ps = 0
- gaze_ps = 0
-
- def data_stream_callback(data_ts, data_object, data_object_type):
-
- nonlocal head_rotation_speed
- nonlocal gaze_position_px
- nonlocal gaze_precision_px
- nonlocal data_ts_ms
- nonlocal gyroscope_chrono
- nonlocal gaze_chrono
-
- data_ts_ms = data_ts / 1e3
-
- match data_object_type:
-
- case 'Gyroscope':
-
- # Assess gyroscope stream performance
- gyroscope_chrono.lap()
-
- # Apply imu calibration
- head_rotation_speed = tobii_imu.apply_gyroscope_offset(data_object).value.astype(int) * 5
-
- case 'GazePosition':
-
- # Assess gaze position stream performance
- gaze_chrono.lap()
-
- # Ignore frame when gaze position is not valid
- if data_object.validity == 0:
-
- gaze_position_px = (int(data_object.value[0] * video_frame.width), int(data_object.value[1] * video_frame.height))
-
- case 'GazePosition3D':
-
- # Ignore frame when gaze position 3D is not valid
- if data_object.validity == 0:
-
- gaze_precision_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.PRECISION)) * data_object.value[2]
- tobii_camera_hfov_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV / 2)) * data_object.value[2]
-
- gaze_precision_px = round(video_frame.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm))
-
- # Subscribe to tobii data stream
- tobii_data_stream.subscribe(data_stream_callback)
-
- # Start streaming
- tobii_controller.start_streaming()
-
- # Live video stream capture loop
- try:
-
- while tobii_video_stream.is_alive():
-
- # Read video stream
- video_ts, video_frame = tobii_video_stream.read()
- video_ts_ms = video_ts / 1e3
-
- # Assess loop performance
- lap_time, lap_counter, elapsed_time = loop_chrono.lap()
-
- # Update fps each 10 loops
- if lap_counter >= 10:
-
- loop_ps = 1e3 * lap_counter / elapsed_time
- loop_chrono.restart()
-
- # Assess gyroscope streaming performance
- elapsed_time, lap_counter = gyroscope_chrono.end()
- gyroscope_ps = 1e3 * lap_counter / elapsed_time
- gyroscope_chrono.restart()
-
- # Assess gaze streaming performance
- elapsed_time, lap_counter = gaze_chrono.end()
- gaze_ps = 1e3 * lap_counter / elapsed_time
- gaze_chrono.restart()
-
- # Draw head rotation speed considering only yaw and pitch values
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_rotation_speed[1], int(video_frame.height/2) - head_rotation_speed[0]), (150, 150, 150), 3)
-
- # Draw gaze
- gaze_position = GazeFeatures.GazePosition(gaze_position_px, precision=gaze_precision_px)
- gaze_position.draw(video_frame.matrix)
-
- # Draw center
- cv.line(video_frame.matrix, (int(video_frame.width/2) - 50, int(video_frame.height/2)), (int(video_frame.width/2) + 50, int(video_frame.height/2)), (255, 150, 150), 1)
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2) - 50), (int(video_frame.width/2), int(video_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Write stream timing
- cv.rectangle(video_frame.matrix, (0, 0), (1100, 50), (63, 63, 63), -1)
- cv.putText(video_frame.matrix, f'Data stream time: {int(data_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(video_frame.matrix, f'Video delay: {int(data_ts_ms - video_ts_ms)} ms', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(video_frame.matrix, f'Fps: {int(loop_ps)}', (950, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- cv.rectangle(video_frame.matrix, (0, 50), (580, 100), (127, 127, 127), -1)
- cv.putText(video_frame.matrix, f'Gyroscope fps: {int(gyroscope_ps)}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(video_frame.matrix, f'Gaze fps: {int(gaze_ps)}', (350, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- cv.imshow(f'Video and data stream', video_frame.matrix)
-
- key_pressed = cv.waitKey(1)
-
- #if key_pressed != -1:
- # print(key_pressed)
-
- # Set Auto scene camera preset with 'a' key
- if key_pressed == 97:
- tobii_controller.set_scene_camera_auto_preset()
- print('Tobii Glasses Pro 2 scene camera in Auto mode')
-
- # Set GazeExposure scene camera preset with 'z' key
- if key_pressed == 122:
- tobii_controller.set_scene_camera_gaze_preset()
- print('Tobii Glasses Pro 2 scene camera in GazeExposure mode')
-
- # Set Indoor eye camera preset with 'i' key
- if key_pressed == 105:
- tobii_controller.set_eye_camera_indoor_preset()
- print('Tobii Glasses Pro 2 eye camera in Indoor mode')
-
- # Set Outdoor eye camera preset with 'o' key
- if key_pressed == 111:
- tobii_controller.set_eye_camera_outdoor_preset()
- print('Tobii Glasses Pro 2 eye camera in Outdoor mode')
-
-
- # Close window using 'Esc' key
- if key_pressed == 27:
- break
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
- # Stop streaming
- tobii_controller.stop_streaming()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file