aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThéo de la Hogue2023-03-28 11:53:14 +0200
committerThéo de la Hogue2023-03-28 11:53:14 +0200
commit6b8e7ae63a63a2854613b98db2fdeb079026748e (patch)
tree799fbd0caec0038482c1da81fa130645da64caaf /src
parent78da945437bbb49a0fc84940c448e175f813230b (diff)
downloadargaze-6b8e7ae63a63a2854613b98db2fdeb079026748e.zip
argaze-6b8e7ae63a63a2854613b98db2fdeb079026748e.tar.gz
argaze-6b8e7ae63a63a2854613b98db2fdeb079026748e.tar.bz2
argaze-6b8e7ae63a63a2854613b98db2fdeb079026748e.tar.xz
Moving Tobii support into a dedicated repository.
Diffstat (limited to 'src')
-rw-r--r--src/argaze/ArFeatures.py2
-rw-r--r--src/argaze/ArUcoMarkers/README.md6
-rw-r--r--src/argaze/TobiiGlassesPro2/README.md89
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiController.py392
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiData.py565
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiEntities.py334
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py253
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py226
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiSpecifications.py15
-rw-r--r--src/argaze/TobiiGlassesPro2/TobiiVideo.py283
-rw-r--r--src/argaze/TobiiGlassesPro2/__init__.py5
-rw-r--r--src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdfbin1965 -> 0 bytes
-rw-r--r--src/argaze/TobiiGlassesPro2/utils/imu.json21
-rw-r--r--src/argaze/__init__.py2
-rw-r--r--src/argaze/utils/README.md75
-rw-r--r--src/argaze/utils/tobii_camera_calibrate.py140
-rw-r--r--src/argaze/utils/tobii_imu_calibrate.py214
-rw-r--r--src/argaze/utils/tobii_sdcard_explore.py83
-rw-r--r--src/argaze/utils/tobii_segment_arscene_edit.py381
-rw-r--r--src/argaze/utils/tobii_segment_arscene_export.py306
-rw-r--r--src/argaze/utils/tobii_segment_data_plot_export.py141
-rw-r--r--src/argaze/utils/tobii_segment_display.py150
-rw-r--r--src/argaze/utils/tobii_segment_gaze_metrics_export.py242
-rw-r--r--src/argaze/utils/tobii_segment_gaze_movements_export.py570
-rw-r--r--src/argaze/utils/tobii_segment_record.py96
-rw-r--r--src/argaze/utils/tobii_stream_arscene_display.py154
-rw-r--r--src/argaze/utils/tobii_stream_display.py218
27 files changed, 8 insertions, 4955 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index d8a9f31..21a7d6a 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -102,7 +102,7 @@ class ArEnvironment():
return output
class PoseEstimationFailed(Exception):
- """Exception raised by ArScene project method when the pose can't be estimated due to unconsistencies."""
+ """Exception raised by ArScene estimate_pose method when the pose can't be estimated due to unconsistencies."""
def __init__(self, message, unconsistencies=None):
diff --git a/src/argaze/ArUcoMarkers/README.md b/src/argaze/ArUcoMarkers/README.md
index bdc8f9e..931ee4b 100644
--- a/src/argaze/ArUcoMarkers/README.md
+++ b/src/argaze/ArUcoMarkers/README.md
@@ -7,8 +7,6 @@ Here is more [about ArUco markers dictionaries](https://docs.opencv.org/3.4/d9/d
## Utils
-Print **A3_board_35cmx25cm_markers_4X4_3cm.pdf** onto A3 paper sheet to get board at expected dimensions.
+Print **A3_DICT_ARUCO_ORIGINAL_3cm_35cmx25cm.pdf** onto A3 paper sheet to get board at expected dimensions.
-Print **A4_markers_4x4_3cm.pdf** onto A4 paper sheet to get markers at expected dimensions.
-
-Load **detecter_configuration.json** file with argaze utils **tobii_segment_aruco_aoi_export.py** script with -p option. This is an example file to illustrate how to setup [ArUco markers detection parameters](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html). \ No newline at end of file
+Print **A4_DICT_ARUCO_ORIGINAL_3cm_0-9.pdf** onto A4 paper sheet to get markers at expected dimensions.
diff --git a/src/argaze/TobiiGlassesPro2/README.md b/src/argaze/TobiiGlassesPro2/README.md
deleted file mode 100644
index f13490a..0000000
--- a/src/argaze/TobiiGlassesPro2/README.md
+++ /dev/null
@@ -1,89 +0,0 @@
-Class interface to handle Tobbi Glasses Pro 2 device.
-This work is greatly inspired by the David de Tommaso and Agnieszka Wykowska [TobiiGlassesPySuite](https://arxiv.org/pdf/1912.09142.pdf).
-
-.. note::
- Read [Tobii Glasses Pro 2 device user manual](https://www.tobiipro.com/siteassets/tobii-pro/user-manuals/tobii-pro-glasses-2-user-manual.pdf).
-
-## Utils
-
-* Print **A4_calibration_target.pdf** onto A4 paper sheet to get calibration target at expected dimension.
-
-* Load **imu.json** file with argaze utils **tobii_imu_calibrate.py** script with -i option. This is an example file to illustrate how to load Inertial Measure Unit (IMU) calibration parameters.
-
-## Local network configuration
-
-If the tobii Glasses aren't connected to a router, here is how to configure a local DHCP server to enable IPv4 device connection.
-
-### Linux (Ubuntu)
-
-* Setup static eth0 interface
-
-**/etc/network/interfaces**
-
-```
-auto eth0
-iface eth0 inet static
- address 192.168.1.1
- netmask 255.255.255.0
- network 192.168.1.0
- gateway 192.168.1.254
-```
-
-* Install DHCP server:
-
-```
-sudo apt-get install isc-dhcp
-```
-
-* Setup DHCP server:
-
-**/etc/default/isc-dhcp-server**
-
-```
-# On what interfaces should the DHCP server (dhcpd) serve DHCP requests?
-INTERFACESv4="eth0"
-INTERFACESv6=""
-```
-
-**/etc/dhcp/dhcpd.conf**
-
-```
-# NECESSARY TO BE A DHCP SERVER
-authoritative;
-
-# DHCP CONFIGURATION INFORMATION
-default-lease-time 43200;
-max-lease-time 86400;
-server-name "dhcpserver.robotron.lan";
-
-# DNS SERVERS DHCP WILL PUSH TO CLIENTS
-option domain-name-servers 192.168.1.1;
-
-# SEARCH DOMAINS DHCP WILL PUSH TO CLIENTS
-option domain-name "robotron.lan";
-
-# DHCP STATIC IP ASSIGNMENTS FILE
-include "/etc/dhcp/master.conf";
-
-# SUBNET FOR IP ADDRESSES MANUALLY/STATICALLY ASSIGNED ONLY
-subnet 192.168.1.0 netmask 255.255.255.0 {
- option broadcast-address 192.168.1.255;
- option subnet-mask 255.255.255.0;
- option routers 192.168.1.254;
-}
-```
-
-**/etc/dhcp/master.conf**
-
-```
-# Static IP assignments
-## SUBNET - 192.168.1.0/24
-host tobiiglasses { hardware ethernet 74:fe:48:34:7c:92; fixed-address 192.168.1.10; }
-```
-Replace 74:fe:48:34:7c:92 by the correct MAC address.
-
-* Monitor DHCP server activity:
-
-```
-journalctl | grep -Ei 'dhcp'
-```
diff --git a/src/argaze/TobiiGlassesPro2/TobiiController.py b/src/argaze/TobiiGlassesPro2/TobiiController.py
deleted file mode 100644
index 39f4e46..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiController.py
+++ /dev/null
@@ -1,392 +0,0 @@
-#!/usr/bin/env python
-
-import datetime
-import uuid
-
-from argaze.TobiiGlassesPro2 import TobiiNetworkInterface, TobiiData, TobiiVideo
-
-TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'
-TOBII_DATETIME_FORMAT_HUMREAD = '%d/%m/%Y %H:%M:%S'
-
-DEFAULT_PROJECT_NAME = 'DefaultProject'
-DEFAULT_PARTICIPANT_NAME = 'DefaultParticipant'
-DEFAULT_RECORD_NAME = 'DefaultRecord'
-
-class TobiiController(TobiiNetworkInterface.TobiiNetworkInterface):
- """Handle Tobii glasses Pro 2 device using network interface.
- It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py)."""
-
- project_name = None
- """Project name."""
-
- project_id = None
- """Project identifier."""
-
- participant_name = None
- """Participant name."""
-
- participant_id = None
- """Participant identifier."""
-
- calibration_id = None
- """Calibration identifier."""
-
- def __init__(self, ip_address = None, project_name = DEFAULT_PROJECT_NAME, participant_name = DEFAULT_PARTICIPANT_NAME):
- """Create a project, a participant and start calibration."""
-
- super().__init__(ip_address)
-
- # bind to project or create one if it doesn't exist
- self.project_name = project_name
- self.project_id = self.set_project(self.project_name)
-
- # bind to participant or create one if it doesn't exist
- self.participant_name = participant_name
- self.participant_id = self.set_participant(self.project_id, self.participant_name)
-
- self.__recording_index = 0
-
- self.__data_stream = None
- self.__video_stream = None
-
- self.__record_event_thread = None
-
- super().wait_for_status('/api/system/status', 'sys_status', ['ok']) == 'ok'
-
- def __get_current_datetime(self, timeformat=TOBII_DATETIME_FORMAT):
- return datetime.datetime.now().replace(microsecond=0).strftime(timeformat)
-
- # STREAMING FEATURES
-
- def enable_data_stream(self) -> "TobiiData.TobiiDataStream":
- """Enable Tobii Glasses Pro 2 data streaming."""
-
- if self.__data_stream == None:
- self.__data_stream = TobiiData.TobiiDataStream(self)
-
- return self.__data_stream
-
- def enable_video_stream(self) -> "TobiiVideo.TobiiVideoStream":
- """Enable Tobii Glasses Pro 2 video camera streaming."""
-
- if self.__video_stream == None:
- self.__video_stream = TobiiVideo.TobiiVideoStream(self)
-
- return self.__video_stream
-
- def start_streaming(self):
- """Start data and/or video streaming."""
-
- if self.__data_stream != None:
- self.__data_stream.open()
-
- if self.__video_stream != None:
- self.__video_stream.open()
-
- def stop_streaming(self):
- """Stop data and/or video streaming."""
-
- if self.__data_stream != None:
- self.__data_stream.close()
-
- if self.__video_stream != None:
- self.__video_stream.close()
-
- # PROJECT FEATURES
-
- def set_project(self, project_name = DEFAULT_PROJECT_NAME) -> str:
- """Bind to a project or create one if it doesn't exist.
-
- * **Returns:**
- - project id
- """
-
- project_id = self.get_project_id(project_name)
-
- if project_id is None:
-
- data = {
- 'pr_info' : {
- 'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, project_name)),
- 'Name': project_name
- },
- 'pr_created': self.__get_current_datetime()
- }
-
- json_data = super().post_request('/api/projects', data)
-
- return json_data['pr_id']
-
- else:
-
- return project_id
-
- def get_project_id(self, project_name) -> str:
- """Get project id."""
-
- project_id = None
- projects = super().get_request('/api/projects')
-
- for project in projects:
-
- try:
- if project['pr_info']['Name'] == project_name:
- project_id = project['pr_id']
- except:
- pass
-
- return project_id
-
- def get_projects(self) -> str:
- """Get all projects id."""
-
- return super().get_request('/api/projects')
-
- # PARTICIPANT FEATURES
-
- def set_participant(self, project_id, participant_name = DEFAULT_PARTICIPANT_NAME, participant_notes = '') -> str:
- """Bind to a participant or create one if it doesn't exist.
-
- * **Returns:**
- - participant id
- """
-
- participant_id = self.get_participant_id(participant_name)
-
- if participant_id is None:
-
- data = {
- 'pa_project': project_id,
- 'pa_info': {
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.participant_name)),
- 'Name': self.participant_name,
- 'Notes': participant_notes
- },
- 'pa_created': self.__get_current_datetime()
- }
-
- json_data = super().post_request('/api/participants', data)
-
- return json_data['pa_id']
-
- else:
-
- return participant_id
-
- def get_participant_id(self, participant_name) -> str:
- """Get participant id."""
-
- participant_id = None
- participants = super().get_request('/api/participants')
-
- for participant in participants:
-
- try:
- if participant['pa_info']['Name'] == participant_name:
- participant_id = participant['pa_id']
-
- except:
- pass
-
- return participant_id
-
- def get_participants(self) -> str:
- """Get all participants id."""
-
- return super().get_request('/api/participants')
-
- # CALIBRATION
-
- def calibrate(self):
- """Start Tobii glasses calibration for current project and participant."""
-
- input('Position Tobbi glasses calibration target then press \'Enter\' to start calibration.')
-
- data = {
- 'ca_project': self.project_id,
- 'ca_type': 'default',
- 'ca_participant': self.participant_id,
- 'ca_created': self.__get_current_datetime()
- }
-
- json_data = super().post_request('/api/calibrations', data)
-
- self.calibration_id = json_data['ca_id']
-
- super().post_request('/api/calibrations/' + self.calibration_id + '/start')
-
- status = super().wait_for_status('/api/calibrations/' + self.calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
-
- if status == 'uncalibrated' or status == 'stale' or status == 'failed':
- raise Error(f'Tobii calibration {self.calibration_id} {status}')
-
- # RECORDING FEATURES
-
- def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
- return super().wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
-
- def create_recording(self, participant_name, recording_notes = '') -> str:
- """Create a new recording.
-
- * **Returns:**
- - recording id
- """
-
- participant_id = self.get_participant_id(participant_name)
-
- if participant_id is None:
- raise NameError(f'{participant_name} participant doesn\'t exist')
-
- self.__recording_index += 1
- recording_name = f'Recording_{self.__recording_index}'
-
- data = {
- 'rec_participant': participant_id,
- 'rec_info': {
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, participant_name)),
- 'Name': recording_name,
- 'Notes': recording_notes
- },
- 'rec_created': self.__get_current_datetime()
- }
-
- json_data = super().post_request('/api/recordings', data)
-
- return json_data['rec_id']
-
- def start_recording(self, recording_id) -> bool:
- """Start recording on the Tobii interface's SD Card."""
-
- super().post_request('/api/recordings/' + recording_id + '/start')
- return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
-
- def stop_recording(self, recording_id) -> bool:
- """Stop recording on the Tobii interface's SD Card."""
-
- super().post_request('/api/recordings/' + recording_id + '/stop')
- return self.__wait_for_recording_status(recording_id, ['done']) == "done"
-
- def pause_recording(self, recording_id) -> bool:
- """Pause recording on the Tobii interface's SD Card."""
-
- super().post_request('/api/recordings/' + recording_id + '/pause')
- return self.__wait_for_recording_status(recording_id, ['paused']) == "paused"
-
- def __get_recording_status(self):
- return self.get_status()['sys_recording']
-
- def get_current_recording_id(self) -> str:
- """Get current recording id."""
-
- return self.__get_recording_status()['rec_id']
-
- @property
- def recording(self) -> bool:
- """Is it recording?"""
-
- rec_status = self.__get_recording_status()
-
- if rec_status != {}:
- if rec_status['rec_state'] == "recording":
- return True
-
- return False
-
- def get_recordings(self) -> str:
- """Get all recordings id."""
-
- return super().get_request('/api/recordings')
-
- # EVENTS AND EXPERIMENTAL VARIABLES
-
- def __post_recording_data(self, event_type: str, event_tag = ''):
- data = {'type': event_type, 'tag': event_tag}
- super().post_request('/api/events', data, wait_for_response=False)
-
- def send_event(self, event_type: str, event_value = None):
- self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
-
- def send_variable(self, variable_name: str, variable_value = None):
- self.__post_recording_data(str(variable_name), str(variable_value))
-
- # MISC
-
- def eject_sd(self):
- super().get_request('/api/eject')
-
- def get_battery_info(self):
- return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
-
- def get_battery_level(self):
- return self.get_battery_status()['level']
-
- def get_battery_remaining_time(self):
- return self.get_battery_status()['remaining_time']
-
- def get_battery_status(self):
- return self.get_status()['sys_battery']
-
- def get_et_freq(self):
- return self.get_configuration()['sys_et_freq']
-
- def get_et_frequencies(self):
- return self.get_status()['sys_et']['frequencies']
-
- def identify(self):
- super().get_request('/api/identify')
-
- def get_address(self):
- return self.address
-
- def get_configuration(self):
- return super().get_request('/api/system/conf')
-
- def get_status(self):
- return super().get_request('/api/system/status')
-
- def get_storage_info(self):
- return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) )
-
- def get_storage_remaining_time(self):
- return self.get_storage_status()['remaining_time']
-
- def get_storage_status(self):
- return self.get_status()['sys_storage']
-
- def get_scene_camera_freq(self):
- return self.get_configuration()['sys_sc_fps']
-
- def set_et_freq_50(self):
- data = {'sys_et_freq': 50}
- json_data = super().post_request('/api/system/conf', data)
-
- def set_et_freq_100(self):
- # May not be available. Check get_et_frequencies() first.
- data = {'sys_et_freq': 100}
- json_data = super().post_request('/api/system/conf', data)
-
- def set_eye_camera_indoor_preset(self) -> str:
- data = {'sys_ec_preset': 'Indoor'}
- return super().post_request('/api/system/conf', data)
-
- def set_eye_camera_outdoor_preset(self) -> str:
- data = {'sys_ec_preset': 'ClearWeather'}
- return super().post_request('/api/system/conf', data)
-
- def set_scene_camera_auto_preset(self):
- data = {'sys_sc_preset': 'Auto'}
- json_data = super().post_request('/api/system/conf', data)
-
- def set_scene_camera_gaze_preset(self):
- data = {'sys_sc_preset': 'GazeBasedExposure'}
- json_data = super().post_request('/api/system/conf', data)
-
- def set_scene_camera_freq_25(self):
- data = {'sys_sc_fps': 25}
- json_data = super().post_request('/api/system/conf/', data)
-
- def set_scene_camera_freq_50(self):
- data = {'sys_sc_fps': 50}
- json_data = super().post_request('/api/system/conf/', data)
-
diff --git a/src/argaze/TobiiGlassesPro2/TobiiData.py b/src/argaze/TobiiGlassesPro2/TobiiData.py
deleted file mode 100644
index 0e28054..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiData.py
+++ /dev/null
@@ -1,565 +0,0 @@
-#!/usr/bin/env python
-
-from typing import Tuple, TypeVar
-from dataclasses import dataclass
-import threading
-import uuid
-import gzip
-import json
-import time
-import queue
-
-from argaze import DataStructures
-from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
-
-from argaze.utils import MiscFeatures
-
-import numpy
-
-TobiiDataObjectType = TypeVar('TobiiDataObjectType', bound="TobiiDataObjectType")
-# Type definition for type annotation convenience
-
-@dataclass
-class DirSig():
- """Define dir sig data (dir sig)."""
-
- dir: int # meaning ?
- sig: int # meaning ?
-
-@dataclass
-class PresentationTimeStamp():
- """Define presentation time stamp (pts) data."""
-
- value: int
- """Pts value."""
-
-@dataclass
-class VideoTimeStamp():
- """Define video time stamp (vts) data."""
-
- value: int
- """Vts value."""
-
- offset: int
- """Primary time stamp value."""
-
-@dataclass
-class EventSynch():
- """Define event synch (evts) data."""
-
- value: int # meaning ?
- """Evts value."""
-
-@dataclass
-class Event():
- """Define event data (ets type tag)."""
-
- ets: int # meaning ?
- type: str
- tag: str # dict ?
-
-@dataclass
-class Accelerometer():
- """Define accelerometer data (ac)."""
-
- value: numpy.array
- """Accelerometer value"""
-
-@dataclass
-class Gyroscope():
- """Define gyroscope data (gy)."""
-
- value: numpy.array
- """Gyroscope value"""
-
-@dataclass
-class PupilCenter():
- """Define pupil center data (gidx pc eye)."""
-
- validity: int
- index: int
- value: tuple((float, float, float))
- eye: str # 'right' or 'left'
-
-@dataclass
-class PupilDiameter():
- """Define pupil diameter data (gidx pd eye)."""
-
- validity: int
- index: int
- value: float
- eye: str # 'right' or 'left'
-
-@dataclass
-class GazeDirection():
- """Define gaze direction data (gidx gd eye)."""
-
- validity: int
- index: int
- value: tuple((float, float, float))
- eye: str # 'right' or 'left'
-
-@dataclass
-class GazePosition():
- """Define gaze position data (gidx l gp)."""
-
- validity: int
- index: int
- l: str # ?
- value: tuple((float, float))
-
-@dataclass
-class GazePosition3D():
- """Define gaze position 3D data (gidx gp3)."""
-
- validity: int
- index: int
- value: tuple((float, float))
-
-@dataclass
-class MarkerPosition():
- """Define marker data (marker3d marker2d)."""
-
- value_3d: tuple((float, float, float))
- value_2d: tuple((float, float))
-
-class TobiiJsonDataParser():
-
- def parse_dir_sig(self, status, json_data):
-
- return DirSig(json_data['dir'], json_data['sig'])
-
- def parse_pts(self, status, json_data):
-
- return PresentationTimeStamp(json_data['pts'])
-
- def parse_vts(self, status, json_data):
-
- return VideoTimeStamp(json_data['vts'], json_data['ts'])
-
- def parse_event_synch(self, status, json_data):
-
- return EventSynch(json_data['evts'])
-
- def parse_event(self, status, json_data):
-
- return Event(json_data['ets'], json_data['type'], json_data['tag'])
-
- def parse_accelerometer(self, status, json_data):
-
- return Accelerometer(json_data['ac'])
-
- def parse_gyroscope(self, status, json_data):
-
- return Gyroscope(json_data['gy'])
-
- def parse_pupil_center(self, status, gaze_index, json_data):
-
- return PupilCenter(status, gaze_index, json_data['pc'], json_data['eye'])
-
- def parse_pupil_diameter(self, status, gaze_index, json_data):
-
- return PupilDiameter(status, gaze_index, json_data['pd'], json_data['eye'])
-
- def parse_gaze_direction(self, status, gaze_index, json_data):
-
- return GazeDirection(status, gaze_index, json_data['gd'], json_data['eye'])
-
- def parse_gaze_position(self, status, gaze_index, json_data):
-
- return GazePosition(status, gaze_index, json_data['l'], json_data['gp'])
-
- def parse_gaze_position_3d(self, status, gaze_index, json_data):
-
- return GazePosition3D(status, gaze_index, json_data['gp3'])
-
- def parse_marker_position(self, status, json_data):
-
- return MarkerPosition(json_data['marker3d'], json_data['marker2d'])
-
- def parse_pupil_or_gaze(self, status, json_data):
-
- gaze_index = json_data.pop('gidx')
-
- # parse pupil or gaze data depending second json key
- second_key = next(iter(json_data))
-
- parse_map = {
- 'pc': self.parse_pupil_center,
- 'pd': self.parse_pupil_diameter,
- 'gd': self.parse_gaze_direction,
- 'l': self.parse_gaze_position,
- 'gp3': self.parse_gaze_position_3d
- }
-
- return parse_map[second_key](status, gaze_index, json_data)
-
- def parse_data(self, status, json_data):
-
- # parse data depending first json key
- first_key = next(iter(json_data))
-
- parse_map = {
- 'dir': self.parse_dir_sig,
- 'pts': self.parse_pts,
- 'vts': self.parse_vts,
- 'evts': self.parse_event_synch,
- 'ets': self.parse_event,
- 'ac': self.parse_accelerometer,
- 'gy': self.parse_gyroscope,
- 'gidx': self.parse_pupil_or_gaze,
- 'marker3d': self.parse_marker_position
- }
-
- return parse_map[first_key](status, json_data)
-
-class TobiiDataSegment():
- """Handle Tobii Glasses Pro 2 segment data file from segment directory.
- Load, parse and store each segment data into dedicated TimeStampedBuffers considering VideoTimeStamp offset to ease data/video synchronisation."""
-
- def __init__(self, segment_data_path, start_timestamp = 0, end_timestamp = None):
-
- self.__path = segment_data_path
-
- self.__vts_offset = 0
- self.__vts_ts = -1
-
- self.__json_data_parser = TobiiJsonDataParser()
-
- self.__ts_data_buffer_dict = {
- 'DirSig': DataStructures.TimeStampedBuffer(),
- 'PresentationTimeStamp': DataStructures.TimeStampedBuffer(),
- 'VideoTimeStamp': DataStructures.TimeStampedBuffer(),
- 'EventSynch': DataStructures.TimeStampedBuffer(),
- 'Event': DataStructures.TimeStampedBuffer(),
- 'Accelerometer': DataStructures.TimeStampedBuffer(),
- 'Gyroscope': DataStructures.TimeStampedBuffer(),
- 'PupilCenter': DataStructures.TimeStampedBuffer(),
- 'PupilDiameter': DataStructures.TimeStampedBuffer(),
- 'GazeDirection': DataStructures.TimeStampedBuffer(),
- 'GazePosition': DataStructures.TimeStampedBuffer(),
- 'GazePosition3D': DataStructures.TimeStampedBuffer(),
- 'MarkerPosition': DataStructures.TimeStampedBuffer()
- }
-
- # define a decoder function
- def decode(json_data):
-
- # parse data status
- status = json_data.pop('s', -1)
-
- # convert timestamp
- ts = json_data.pop('ts')
-
- # watch for vts data to offset timestamps
- try:
- self.__vts_offset = json_data['vts']
- self.__vts_ts = ts
-
- # store primary ts value to allow further reverse offset operation
- json_data['ts'] = ts
-
- except KeyError:
- pass
-
- # ignore data before first vts entry
- if self.__vts_ts == -1:
- return True # continue
-
- ts -= self.__vts_ts
- ts += self.__vts_offset
-
- # ignore timestamps out of the given time range
- if ts < start_timestamp:
- return True # continue
-
- if ts >= end_timestamp:
- return False # stop
-
- # convert json data into data object
- data_object = self.__json_data_parser.parse_data(status, json_data)
- data_object_type = type(data_object).__name__
-
- # store data object into dedicated timestamped buffer
- self.__ts_data_buffer_dict[data_object_type][ts] = data_object
-
- return True # continue
-
- # start loading
- with gzip.open(self.__path) as f:
-
- for item in f:
- if not json.loads(item.decode('utf-8'), object_hook=decode):
- break
-
- def __getitem__(self, key):
- return self.__ts_data_buffer_dict[key]
-
- def keys(self) -> list[str]:
- """Get all segment data keys."""
-
- return list(self.__ts_data_buffer_dict.keys())
-
- @property
- def path(self) -> str:
- """Get segment data path."""
-
- return self.__path
-
-class TobiiDataStream():
- """Handle Tobii Glasses Pro 2 data stream in separate thread."""
-
- def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
- """Initialise network connection and prepare data reception."""
-
- # Enable network connection
- self.__network = network_interface
- self.__data_socket = self.__network.make_socket()
-
- # Data reception
- self.__data_thread = None
- self.__json_data_parser = TobiiJsonDataParser()
- self.__first_ts = 0
-
- # Sync reading data subscription
- self.reading_callbacks = []
- self.__subcription_lock = threading.Lock()
-
- # Keep connection alive
- self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
- self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
- self.__keep_alive_thread.daemon = True
- self.__keep_alive_thread.start()
-
- def __del__(self):
- """Stop data reception and network connection before destruction."""
-
- if self.__data_thread != None:
-
- threading.Thread.join(self.__data_thread)
- self.__data_thread = None
-
- threading.Thread.join(self.__keep_alive_thread)
-
- self.__data_socket.close()
-
- def __keep_alive(self):
- """Maintain network connection and clear socket when is not read."""
-
- while True:
-
- # if no thread is reading data socket
- if self.__data_thread == None:
-
- self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg)
-
- clear_count = 0
- while clear_count < 1000 and self.__data_thread == None:
-
- # Clear socket each milli second
- time.sleep(0.001)
- self.__network.grab_data(self.__data_socket)
- clear_count += 1
-
- else:
-
- self.__network.send_keep_alive_msg(self.__data_socket, self.__keep_alive_msg)
- time.sleep(1)
-
- def open(self):
- """Start data reception."""
-
- if self.__data_thread == None:
-
- self.__first_ts = 0
-
- self.__data_thread = threading.Thread(target = self.__run)
- self.__data_thread.daemon = True
-
- self.__stop_event = threading.Event()
-
- self.__data_thread.start()
-
- def close(self):
- """Stop data reception definitively."""
-
- if self.__data_thread != None:
-
- self.__stop_event.set()
-
- threading.Thread.join(self.__data_thread)
- self.__data_thread = None
-
- @property
- def running(self) -> bool:
- """Is tobii data streaming running?"""
-
- return self.__data_thread != None
-
- def __run(self):
- """Managed received data for sync and async reading case.
- - Sync: send data to callback function.
- - Async: store data into a locked queue for further reading."""
-
- while not self.__stop_event.isSet():
-
- # grab data
- data = self.__network.grab_data(self.__data_socket)
-
- # decode data
- json_data = json.loads(data.decode('utf-8'))
-
- # parse json into timestamped data object
- data_ts, data_object, data_object_type = self.__parse_json_data(json_data)
-
- # lock data subcription
- self.__subcription_lock.acquire()
-
- # share incoming data to all subscribers
- for callback in self.reading_callbacks:
-
- callback(data_ts, data_object, data_object_type)
-
- # unlock data subscription
- self.__subcription_lock.release()
-
- def subscribe(self, reading_callback):
- """Pass reading callback function to get incoming (data_ts, data_object, data_object_type) back."""
-
- # lock data subcription
- self.__subcription_lock.acquire()
-
- # append callback
- self.reading_callbacks.append(reading_callback)
-
- # unlock data subscription
- self.__subcription_lock.release()
-
- def unsubscribe(self, reading_callback):
- """Remove reading callback function to stop data reception."""
-
- # lock data subcription
- self.__subcription_lock.acquire()
-
- # remove callback
- self.reading_callbacks.remove(reading_callback)
-
- # unlock data subscription
- self.__subcription_lock.release()
-
- def __parse_json_data(self, json_data):
-
- # parse data status
- status = json_data.pop('s', -1)
-
- # convert timestamp
- data_ts = json_data.pop('ts')
-
- # convert json data into data object
- data_object = self.__json_data_parser.parse_data(status, json_data)
- data_object_type = type(data_object).__name__
-
- # keep first timestamp to offset all timestamps
- if self.__first_ts == 0:
- self.__first_ts = data_ts
-
- data_ts -= self.__first_ts
-
- return data_ts, data_object, data_object_type
-
- def __capture_callback(self, data_ts, data_object, data_object_type):
-
- if data_object_type == self.__data_stream_selector:
-
- if len(self.__data_ts_buffer.keys()) < self.__data_ts_buffer_size:
-
- # update first timestamp if next timestamps are negative
- if data_ts < 0:
- self.__first_ts += data_ts
- data_ts = 0
-
- self.__data_ts_buffer[data_ts] = data_object
-
- def capture(self, data_ts_buffer, data_object_type = '', sample_number = 500) -> int:
- """Start data stream capture.
-
- * **Returns: each 100 ms**
- - buffer size
- """
-
- # Prepare for data acquisition
- self.__data_ts_buffer = data_ts_buffer
- self.__data_stream_selector = data_object_type
- self.__data_ts_buffer_size = sample_number
-
- # Subscribe to tobii data stream
- self.subscribe(self.__capture_callback)
-
- # Start data stream if needed
- close_after = False
- if not self.running:
- self.open()
- close_after = True
-
- # Share data acquisition progress
- buffer_size = 0
- while buffer_size < sample_number:
-
- time.sleep(0.1)
-
- buffer_size = len(self.__data_ts_buffer.keys())
-
- yield buffer_size
-
- # Stop data sream if needed
- if close_after:
- self.close()
-
- # Unsubscribe to tobii data stream
- self.unsubscribe(self.__capture_callback)
-
- def __buffer_callback(self, data_ts, data_object, data_object_type):
-
- # Lock data queue access
- self.__queue_lock.acquire()
-
- # Put data into the queue
- self.__data_queue.put((data_ts, data_object, data_object_type))
-
- # Unlock data queue access
- self.__queue_lock.release()
-
- def read(self) -> Tuple[int, TobiiDataObjectType, str]:
- """Iterate over incoming data buffer asynchronously."""
-
- # Setup data buffering
- self.__data_queue = queue.Queue()
- self.__queue_lock = threading.Lock()
-
- # Subscribe to tobii data stream
- self.subscribe(self.__buffer_callback)
-
- return self.__iter__()
-
- def __iter__(self):
-
- return self
-
- def __next__(self):
-
- # Wait for data
- while self.__data_queue.empty():
-
- time.sleep(0.0001)
- continue
-
- # Lock data queue access
- self.__queue_lock.acquire()
-
- # Get data from the queue
- data_ts, data_object, data_object_type = self.__data_queue.get()
-
- # Unlock data queue access
- self.__queue_lock.release()
-
- return data_ts, data_object, data_object_type
diff --git a/src/argaze/TobiiGlassesPro2/TobiiEntities.py b/src/argaze/TobiiGlassesPro2/TobiiEntities.py
deleted file mode 100644
index 0ae6dec..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiEntities.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#!/usr/bin/env python
-
-from typing import TypeVar
-import datetime
-import json
-import os
-
-from argaze.TobiiGlassesPro2 import TobiiData, TobiiVideo
-
-import av
-import cv2 as cv
-
-TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'
-
-TOBII_PROJECTS_DIRNAME = "projects"
-TOBII_PROJECT_FILENAME = "project.json"
-
-TOBII_PARTICIPANTS_DIRNAME = "participants"
-TOBII_PARTICIPANT_FILENAME = "participant.json"
-
-TOBII_RECORDINGS_DIRNAME = "recordings"
-TOBII_RECORD_FILENAME = "recording.json"
-
-TOBII_SEGMENTS_DIRNAME = "segments"
-TOBII_SEGMENT_INFO_FILENAME = "segment.json"
-TOBII_SEGMENT_VIDEO_FILENAME = "fullstream.mp4"
-TOBII_SEGMENT_DATA_FILENAME = "livedata.json.gz"
-
-DatetimeType = TypeVar('datetime', bound="datetime")
-# Type definition for type annotation convenience
-
-class TobiiSegment:
- """Handle Tobii Glasses Pro 2 segment info."""
-
- def __init__(self, segment_path, start_timestamp:int = 0, end_timestamp:int = None):
- """Load segment info from segment directory.
- Optionnaly select a time range in microsecond."""
-
- self.__id = os.path.basename(segment_path)
- self.__path = segment_path
-
- with open(os.path.join(self.__path, TOBII_SEGMENT_INFO_FILENAME)) as f:
- try:
- item = json.load(f)
- except:
- raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}')
-
- self.__start_timestamp = start_timestamp
- self.__end_timestamp = min(end_timestamp, int(item["seg_length"] * 1e6)) if end_timestamp != None else int(item["seg_length"] * 1e6)
-
- if self.__start_timestamp >= self.__end_timestamp:
- raise ValueError('start time is equal or greater than end time.')
-
- self.__calibrated = bool(item["seg_calibrated"])
-
- self.__start_date = datetime.datetime.strptime(item["seg_t_start"], TOBII_DATETIME_FORMAT)
- self.__stop_date = datetime.datetime.strptime(item["seg_t_stop"], TOBII_DATETIME_FORMAT)
-
- @property
- def path(self) -> str:
- """Get segment path."""
-
- return self.__path
-
- @property
- def id(self) -> str:
- """Get segment id."""
- return self.__id
-
- @property
- def start_timestamp(self) -> int:
- """Get the timestamp where the segment loading starts."""
-
- return self.__start_timestamp
-
- @property
- def end_timestamp(self) -> int:
- """Get the timestamp where the segment loading ends."""
-
- return self.__end_timestamp
-
- @property
- def start_date(self) -> DatetimeType:
- """Get the date when the segment has started."""
-
- return self.__start_date
-
- @property
- def stop_date(self) -> DatetimeType:
- """Get the date when the segment has stopped."""
-
- return self.__stop_date
-
- @property
- def calibrated(self) -> bool:
- """Is the segment has been calibrated?"""
-
- return self.__calibrated
-
- def load_data(self) -> "TobiiData.TobiiDataSegment":
- """Load recorded data stream."""
-
- return TobiiData.TobiiDataSegment(os.path.join(self.__path, TOBII_SEGMENT_DATA_FILENAME), self.__start_timestamp, self.__end_timestamp)
-
- def load_video(self) -> "TobiiVideo.TobiiVideoSegment":
- """Load recorded video stream."""
-
- return TobiiVideo.TobiiVideoSegment(os.path.join(self.__path, TOBII_SEGMENT_VIDEO_FILENAME), self.__start_timestamp, self.__end_timestamp)
-
-class TobiiRecording:
- """Handle Tobii Glasses Pro 2 recording info and segments."""
-
- def __init__(self, recording_path):
- """Load recording info from recording directory."""
-
- self.__id = os.path.basename(recording_path)
- self.__path = recording_path
-
- with open(os.path.join(self.__path, TOBII_RECORD_FILENAME)) as f:
- try:
- item = json.load(f)
- except:
- raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_RECORD_FILENAME}')
-
- self.__name = item["rec_info"]["Name"]
- self.__creation_date = datetime.datetime.strptime(item["rec_created"], TOBII_DATETIME_FORMAT)
-
- self.__length = int(item["rec_length"])
- self.__et_samples = int(item["rec_et_samples"])
- self.__et_samples_valid = int(item["rec_et_valid_samples"])
- self.__participant_id = item["rec_participant"]
-
- @property
- def path(self) -> str:
- """Get recording path."""
-
- return self.__path
-
- @property
- def id(self) -> str:
- """Get recording id."""
- return self.__id
-
- @property
- def name(self) -> str:
- """Get recording name."""
-
- return self.__name
-
- @property
- def creation_date(self) -> DatetimeType:
- """Get date when the recording has been done."""
-
- return self.__creation_date
-
- @property
- def length(self):
- """Get record duration in second."""
-
- return self.__length
-
- @property
- def eyetracker_samples(self) -> int:
- """Get numbers of recorded eye detecter samples."""
-
- return self.__et_samples
-
- @property
- def eyetracker_samples_valid(self) -> int:
- """Get numbers of recorded eye detecter valid samples."""
-
- return self.__et_samples_valid
-
- @property
- def project(self) -> "TobiiProject":
- """Get project to which it belongs."""
-
- project_path = os.path.dirname(os.path.dirname(os.path.abspath(self.__path)))
-
- return TobiiProject(project_path)
-
- @property
- def participant(self) -> "TobiiParticipant":
- """Get participant to which it belongs."""
-
- project_path = os.path.dirname(os.path.dirname(os.path.abspath(self.__path)))
-
- return TobiiParticipant(project_path + '/participants/' + self.__participant_id)
-
- def segments(self) -> list["TobiiSegment"]:
- """Get all recorded segments."""
-
- all_segments = []
- segments_path = os.path.join(self.__path, TOBII_SEGMENTS_DIRNAME)
-
- for item in os.listdir(segments_path):
- segment_path = os.path.join(segments_path, item)
- if os.path.isdir(segment_path):
- all_segments.append(TobiiSegment(segment_path))
-
- return all_segments
-
-class TobiiParticipant:
- """Handle Tobii Glasses Pro 2 participant data."""
-
- def __init__(self, participant_path):
- """Load participant data from path"""
-
- self.__id = os.path.basename(participant_path)
- self.__path = participant_path
-
- with open(os.path.join(self.__path, TOBII_PARTICIPANT_FILENAME)) as f:
- try:
- item = json.load(f)
- except:
- raise RuntimeError(f'JSON fails to load {source_dir}/{TOBII_PARTICIPANT_FILENAME}')
-
- self.__name = item["pa_info"]["Name"]
-
- @property
- def path(self) -> str:
- """Get participant path."""
-
- return self.__path
-
- @property
- def id(self) -> str:
- """Get participant id."""
- return self.__id
-
- @property
- def name(self) -> str:
- """Get participant name."""
-
- return self.__name
-
-class TobiiProject:
- """Handle Tobii Glasses Pro 2 project data."""
-
- def __init__(self, project_path):
- """Load project data from projects directory and project id."""
-
- self.__id = os.path.basename(project_path)
- self.__path = project_path
-
- with open(os.path.join(self.__path, TOBII_PROJECT_FILENAME)) as f:
- try:
- item = json.load(f)
- except:
- raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_PROJECT_FILENAME}')
-
- self.__creation_date = datetime.datetime.strptime(item["pr_created"], TOBII_DATETIME_FORMAT)
-
- try:
- self.__name = item["pr_info"]["Name"]
- except:
- self.__name = None
-
- @property
- def path(self) -> str:
- """Get project path."""
-
- return self.__path
-
- @property
- def id(self) -> str:
- """Get project id."""
- return self.__id
-
- @property
- def name(self) -> str:
- """Get project name."""
-
- return self.__name
-
- @property
- def creation_date(self) -> DatetimeType:
- """Get date when the project has been created."""
-
- return self.__creation_date
-
- def participants(self) -> list["TobiiParticipant"]:
- """Get all participants."""
-
- all_participants = []
- participants_path = os.path.join(self.__path, TOBII_PARTICIPANTS_DIRNAME)
-
- for item in os.listdir(participants_path):
- participant_path = os.path.join(participants_path, item)
- if os.path.isdir(participant_path):
- all_participants.append(TobiiParticipant(participant_path))
-
- return all_participants
-
- def recordings(self) -> list["TobiiRecording"]:
- """Get all recordings."""
-
- all_recordings = []
- recordings_path = os.path.join(self.__path, TOBII_RECORDINGS_DIRNAME)
-
- for item in os.listdir(recordings_path):
- recording_path = os.path.join(recordings_path, item)
- if os.path.isdir(recording_path):
- all_recordings.append(TobiiRecording(recording_path))
-
- return all_recordings
-
-class TobiiDrive:
- """Handle Tobii Glasses Pro 2 drive data."""
-
- def __init__(self, drive_path):
- """Load drive data from drive directory path."""
-
- self.__path = drive_path
-
- @property
- def path(self) -> str:
- """Get drive path."""
-
- return self.__path
-
- def projects(self) -> list["TobiiProject"]:
- """Get all projects."""
-
- all_projects = []
- projects_path = os.path.join(self.__path, TOBII_PROJECTS_DIRNAME)
-
- for item in os.listdir(projects_path):
- project_path = os.path.join(projects_path, item)
- if os.path.isdir(project_path):
- all_projects.append(TobiiProject(project_path))
-
- return all_projects
-
diff --git a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py b/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py
deleted file mode 100644
index 35bb035..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiInertialMeasureUnit.py
+++ /dev/null
@@ -1,253 +0,0 @@
-#!/usr/bin/env python
-
-from typing import Tuple
-import json
-import time
-import math
-
-from argaze.TobiiGlassesPro2 import TobiiData
-
-import numpy
-from scipy.optimize import curve_fit
-import cv2 as cv
-
-
-EARTH_GRAVITY = -9.81
-"""Earth gravity force (m/s2)."""
-
-EARTH_GRAVITY_VECTOR = [0, EARTH_GRAVITY, 0]
-"""Earth gravity force vector."""
-
-CAMERA_TO_IMU_TRANSLATION_VECTOR = [8, -1, -5]
-"""Translation vector from camera referential to imu referential (cm)."""
-
-CAMERA_TO_IMU_ROTATION_VECTOR = [18, 0, 180]
-"""Rotation vector from camera referential to imu referential (euler, degree)."""
-
-class TobiiInertialMeasureUnit():
- """Ease Tobbi [Inertial Measure Unit](https://connect.tobii.com/s/article/How-are-the-MEMS-data-reported-for-Tobii-Pro-Glasses-2?language=en_US) data handling"""
-
- def __init__(self):
- """Define IMU calibration data."""
-
- self.__gyroscope_offset = numpy.zeros(3)
- self.__accelerometer_coefficients = numpy.array([[1., 0.], [1., 0.], [1., 0.]])
-
- self.__plumb = numpy.array(EARTH_GRAVITY_VECTOR)
-
- self.reset_rotation()
- self.reset_translation()
-
- def load_calibration_file(self, calibration_filepath):
- """Load IMU calibration from a .json file."""
-
- with open(calibration_filepath) as calibration_file:
-
- # Deserialize .json
- # TODO find a better way
- calibration_data = json.load(calibration_file)
-
- # Load calibration data
- self.__gyroscope_offset = numpy.array(calibration_data['gyroscope_offset'])
- self.__accelerometer_coefficients = numpy.array(calibration_data['accelerometer_coefficients'])
-
- def save_calibration_file(self, calibration_filepath):
- """Save IMU calibration into .json file."""
-
- calibration_data = {
- 'gyroscope_offset': list(self.__gyroscope_offset),
- 'accelerometer_coefficients': [list(self.__accelerometer_coefficients[0]), list(self.__accelerometer_coefficients[1]), list(self.__accelerometer_coefficients[2])]
- }
-
- with open(calibration_filepath, 'w', encoding='utf-8') as calibration_file:
-
- json.dump(calibration_data, calibration_file, ensure_ascii=False, indent=4)
-
- def calibrate_gyroscope_offset(self, gyroscope_ts_buffer) -> numpy.array:
- """Calibrate gyroscope offset from a timestamped gyroscope buffer.
- **Returns:** numpy.array"""
-
- # Consider gyroscope values without timestamps
- gyroscope_values = []
- for ts, data_object in gyroscope_ts_buffer.items():
- gyroscope_values.append(data_object.value)
-
- # Calculate average value for each axis
- gx_offset = numpy.mean(numpy.array(gyroscope_values)[:, 0])
- gy_offset = numpy.mean(numpy.array(gyroscope_values)[:, 1])
- gz_offset = numpy.mean(numpy.array(gyroscope_values)[:, 2])
-
- # Store result
- self.__gyroscope_offset = numpy.array([gx_offset, gy_offset, gz_offset])
-
- return self.__gyroscope_offset
-
- @property
- def gyroscope_offset(self) -> numpy.array:
- """Get gyroscope offset."""
-
- return self.__gyroscope_offset
-
- def apply_gyroscope_offset(self, gyroscope_data_object: TobiiData.Gyroscope) -> "TobiiData.Gyroscope":
- """Remove gyroscope offset to given gyroscope data."""
-
- return TobiiData.Gyroscope(gyroscope_data_object.value - self.__gyroscope_offset)
-
- def reset_rotation(self):
- """Reset rotation value before to start integration process."""
-
- self.__last_gyroscope_ts = None
-
- self.__rotation = numpy.zeros(3)
-
- def update_rotation(self, gyroscope_data_ts, gyroscope_data_object):
- """Integrate timestamped gyroscope values to update rotation."""
-
- # Convert deg/s into deg/ms
- current_gyroscope = gyroscope_data_object.value * 1e-3
-
- # Init gyroscope integration
- if self.__last_gyroscope_ts == None:
-
- self.__last_gyroscope_ts = gyroscope_data_ts
- self.__last_gyroscope = current_gyroscope
-
- # Calculate elapsed time in ms
- delta_time = (gyroscope_data_ts - self.__last_gyroscope_ts) / 1e3
-
- # Integrate gyroscope
- self.__rotation = self.__rotation + (self.__last_gyroscope * delta_time)
-
- # Store current as last
- self.__last_gyroscope_ts = gyroscope_data_ts
- self.__last_gyroscope = current_gyroscope
-
- @property
- def rotation(self) -> numpy.array:
- """Return current rotation value (euler angles in degree)."""
-
- return self.__rotation
-
- def _accelerometer_linear_fit(self, x, a, b):
- """Linear function for accelerometer axis correction."""
- return a * x + b
-
- def calibrate_accelerometer_axis_coefficients(self, axis, upward_ts_buffer, downward_ts_buffer, perpendicular_ts_buffer):
- """Calibrate one accelerometer axis using three data set (upward/+1g, downward/-1g, perpendicular/0g) for linear fit."""
-
- # Consider accelerometer axis values without timestamps
- accelerometer_values = []
- expected_values = []
-
- for (upward_ts, upward_data_object), (downward_ts, downward_data_object), (perpendicular_ts, perpendicular_data_object) in zip(upward_ts_buffer.items(), downward_ts_buffer.items(), perpendicular_ts_buffer.items()):
-
- accelerometer_values.append(upward_data_object.value[axis])
- expected_values.append(+EARTH_GRAVITY)
-
- accelerometer_values.append(downward_data_object.value[axis])
- expected_values.append(-EARTH_GRAVITY)
-
- accelerometer_values.append(perpendicular_data_object.value[axis])
- expected_values.append(0.0)
-
- # Find optimal coefficients according linear fit between accelerometer values and expected values
- optimal_coefficients, _ = curve_fit(self._accelerometer_linear_fit, accelerometer_values, expected_values, maxfev = 10000)
-
- # Store results for the given axis
- self.__accelerometer_coefficients[axis] = numpy.array(optimal_coefficients)
-
- @property
- def accelerometer_coefficients(self) -> numpy.array:
- """Return accelerometer coefficients."""
-
- return self.__accelerometer_coefficients
-
- def apply_accelerometer_coefficients(self, accelerometer_data_object: TobiiData.Accelerometer) -> "TobiiData.Accelerometer":
- """Add accelerometer offset to given accelerometer data."""
-
- x = self._accelerometer_linear_fit(accelerometer_data_object.value[0], *self.__accelerometer_coefficients[0])
- y = self._accelerometer_linear_fit(accelerometer_data_object.value[1], *self.__accelerometer_coefficients[1])
- z = self._accelerometer_linear_fit(accelerometer_data_object.value[2], *self.__accelerometer_coefficients[2])
-
- return TobiiData.Accelerometer(numpy.array([x, y , z]))
-
- def reset_translation(self, translation_speed = numpy.zeros(3)):
- """Reset translation value before to start integration process."""
-
- self.__last_accelerometer_ts = None
-
- self.__translation_speed = translation_speed
- self.__translation = numpy.zeros(3)
-
- def update_translation(self, accelerometer_data_ts, accelerometer_data_object):
- """Integrate timestamped accelerometer values to update translation."""
-
- print('> update_translation: accelerometer_data_ts=', accelerometer_data_ts)
-
- # Convert m/s2 into cm/ms2
- current_accelerometer = accelerometer_data_object.value * 1e-4
-
- print('\tcurrent_accelerometer(cm/ms2)=', current_accelerometer)
- print('\tcurrent_accelerometer norm=', numpy.linalg.norm(current_accelerometer))
-
- # Init accelerometer integration
- if self.__last_accelerometer_ts == None:
-
- self.__last_accelerometer_ts = accelerometer_data_ts
- self.__last_accelerometer = current_accelerometer
- self.__last_translation_speed = numpy.zeros(3)
-
- # Calculate elapsed time in ms
- delta_time = (accelerometer_data_ts - self.__last_accelerometer_ts) / 1e3
-
- print('\tdelta_time=', delta_time)
-
- # Integrate accelerometer
- self.__translation_speed = self.__translation_speed + (self.__last_accelerometer * delta_time)
- self.__translation = self.__translation + (self.__last_translation_speed * delta_time)
-
- print('\tself.__translation_speed(cm/ms)=', self.__translation_speed)
- print('\tself.__translation(cm)=', self.__translation)
-
- # Store current as last
- self.__last_accelerometer = current_accelerometer
- self.__last_accelerometer_ts = accelerometer_data_ts
- self.__last_translation_speed = self.__translation_speed
-
- print('< update_translation')
-
- #else:
-
- # print('no valid head plumb')
-
- @property
- def translation(self) -> numpy.array:
- """Return current translation vector."""
-
- return self.__translation
-
- @property
- def translation_speed(self) -> numpy.array:
- """Return current translation speed vector."""
-
- return self.__translation_speed
-
- def rotate_plumb(self, rvec):
- """Rotate imu plumb to remove gravity effect in accelerometer data."""
-
- C, _ = cv.Rodrigues(rvec)
- self.__plumb = C.dot(EARTH_GRAVITY_VECTOR)
-
- # Check plumb length
- assert(math.isclose(numpy.linalg.norm(self.__plumb), math.fabs(EARTH_GRAVITY), abs_tol=1e-3))
-
- @property
- def plumb(self) -> numpy.array:
- """Return plumb vector."""
-
- return self.__plumb
-
- def apply_plumb(self, accelerometer_data_object: TobiiData.Accelerometer) -> "TobiiData.Accelerometer":
- """Remove gravity along plumb vector to given accelerometer data."""
-
- return TobiiData.Accelerometer(accelerometer_data_object.value - self.__plumb)
diff --git a/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py b/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py
deleted file mode 100644
index c65b121..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py
+++ /dev/null
@@ -1,226 +0,0 @@
-from typing import TypeVar, Any
-import logging
-import sys
-import socket
-import threading
-import json
-import time
-
-# python2 backwards compatibility for errors
-if sys.version_info[0] < 3:
- class ConnectionError(BaseException):
- pass
-
-try:
- import netifaces
- TOBII_DISCOVERY_ALLOWED = True
-except:
- TOBII_DISCOVERY_ALLOWED = False
-
-try:
- from urllib.parse import urlparse, urlencode
- from urllib.request import urlopen, Request
- from urllib.error import URLError, HTTPError
-
-except ImportError:
- from urlparse import urlparse
- from urllib import urlencode
- from urllib2 import urlopen, Request, HTTPError, URLError
-
-socket.IPPROTO_IPV6 = 41
-
-SocketType = TypeVar('socket', bound="socket")
-# Type definition for type annotation convenience
-
-class TobiiNetworkInterface():
- """Handle network connection to Tobii glasses Pro 2 device.
- It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py)."""
-
- def __init__(self, address = None):
-
- self.udpport = 49152
- self.address = address
- self.iface_name = None
-
- if self.address is None:
-
- data, address = self.__discover_device()
-
- if address is None:
- raise ConnectionError("No device found using discovery process")
- else:
- try:
- self.address = data["ipv4"]
- except:
- self.address = address
-
- if "%" in self.address:
- if sys.platform == "win32":
- self.address,self.iface_name = self.address.split("%")
- else:
- self.iface_name = self.address.split("%")[1]
-
- if ':' in self.address:
- self.base_url = 'http://[%s]' % self.address
- else:
- self.base_url = 'http://' + self.address
-
- self.__peer = (self.address, self.udpport)
-
- def make_socket(self) -> SocketType:
- """Create a socket to enable network communication."""
-
- iptype = socket.AF_INET
-
- if ':' in self.__peer[0]:
- iptype = socket.AF_INET6
-
- res = socket.getaddrinfo(self.__peer[0], self.__peer[1], socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)
- family, socktype, proto, canonname, sockaddr = res[0]
- new_socket = socket.socket(family, socktype, proto)
-
- new_socket.settimeout(5.0)
-
- try:
- if iptype == socket.AF_INET6:
- new_socket.setsockopt(socket.SOL_SOCKET, 25, 1)
-
- except socket.error as e:
- if e.errno == 1:
- logging.warning("Binding to a network interface is permitted only for root users.")
-
- return new_socket
-
- def __discover_device(self):
-
- if TOBII_DISCOVERY_ALLOWED == False:
- logging.error("Device discovery is not available due to a missing dependency (netifaces)")
- exit(1)
-
- MULTICAST_ADDR = 'ff02::1'
- PORT = 13006
-
- for i in netifaces.interfaces():
-
- if netifaces.AF_INET6 in netifaces.ifaddresses(i).keys():
-
- if "%" in netifaces.ifaddresses(i)[netifaces.AF_INET6][0]['addr']:
-
- if_name = netifaces.ifaddresses(i)[netifaces.AF_INET6][0]['addr'].split("%")[1]
- if_idx = socket.getaddrinfo(MULTICAST_ADDR + "%" + if_name, PORT, socket.AF_INET6, socket.SOCK_DGRAM)[0][4][3]
-
- s6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
- s6.settimeout(30.0)
- s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, if_idx)
- s6.bind(('::', PORT))
-
- PORT_OUT = PORT if sys.platform == 'win32' or sys.platform == 'darwin' else PORT + 1
-
- try:
-
- # Sending discover request
- discover_json = '{"type":"discover"}'
- s6.sendto(discover_json.encode('utf-8'), (MULTICAST_ADDR, PORT_OUT))
-
- # Waiting for a reponse from the device ...
- data, address = s6.recvfrom(1024)
- jdata = json.loads(data.decode('utf-8'))
-
- addr = address[0]
-
- if sys.version_info.major == 3 and sys.version_info.minor >= 8:
- addr = address[0] + '%' + if_name
-
- return (jdata, addr)
-
- except:
-
- # No device found on interface
- pass
-
- return (None, None)
-
- def get_request(self, api_action) -> str:
- """Send a GET request and get data back."""
-
- url = self.base_url + api_action
- res = urlopen(url).read()
-
- try:
- data = json.loads(res.decode('utf-8'))
- except json.JSONDecodeError:
- data = None
-
- return data
-
- def post_request(self, api_action, data=None, wait_for_response=True) -> str:
- """Send a POST request and get result back."""
-
- url = self.base_url + api_action
- req = Request(url)
- req.add_header('Content-Type', 'application/json')
- data = json.dumps(data)
-
- logging.debug("Sending JSON: " + str(data))
-
- if wait_for_response is False:
- threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
- return None
-
- response = urlopen(req, data.encode('utf-8'))
- res = response.read()
-
- logging.debug("Response: " + str(res))
-
- try:
- res = json.loads(res.decode('utf-8'))
- except:
- pass
-
- return res
-
- def send_keep_alive_msg(self, socket, msg):
- """Send a message to keep socket opened."""
-
- res = socket.sendto(msg.encode('utf-8'), self.__peer)
-
- def grab_data(self, socket) -> bytes:
- """Read incoming socket data."""
-
- try:
- data, address = socket.recvfrom(1024)
- return data
-
- except TimeoutError:
-
- logging.error("A timeout occurred while receiving data")
-
- def wait_for_status(self, api_action, key, values, timeout = None) -> Any:
- """Wait until a status matches given values."""
-
- url = self.base_url + api_action
- running = True
-
- while running:
-
- req = Request(url)
- req.add_header('Content-Type', 'application/json')
-
- try:
-
- response = urlopen(req, None, timeout = timeout)
-
- except URLError as e:
-
- logging.error(e.reason)
- return -1
-
- data = response.read()
- json_data = json.loads(data.decode('utf-8'))
-
- if json_data[key] in values:
- running = False
-
- time.sleep(1)
-
- return json_data[key]
diff --git a/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py b/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py
deleted file mode 100644
index 1b2a275..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiSpecifications.py
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env python
-
-"""Here is the article where from following [Tobii specifications](https://www.biorxiv.org/content/10.1101/299925v1) come."""
-
-ACCURACY = 1.42
-"""Gaze position accuracy in degree."""
-
-PRECISION = 0.34 # degree
-"""Gaze position precision in degree."""
-
-CAMERA_HFOV = 82
-"""Camera horizontal field of view in degree."""
-
-VISUAL_HFOV = 160
-"""Visual horizontal field of view in degree.""" \ No newline at end of file
diff --git a/src/argaze/TobiiGlassesPro2/TobiiVideo.py b/src/argaze/TobiiGlassesPro2/TobiiVideo.py
deleted file mode 100644
index 1292de8..0000000
--- a/src/argaze/TobiiGlassesPro2/TobiiVideo.py
+++ /dev/null
@@ -1,283 +0,0 @@
-#!/usr/bin/env python
-
-from typing import TypeVar, Tuple
-from dataclasses import dataclass, field
-import threading
-import uuid
-import time
-import copy
-
-from argaze.TobiiGlassesPro2 import TobiiNetworkInterface
-
-import cv2 as cv
-import av
-import numpy
-
-TobiiVideoFrameType = TypeVar('TobiiVideoFrame', bound="TobiiVideoFrame")
-# Type definition for type annotation convenience
-
-AvStreamType = TypeVar('av.stream.Stream', bound="av.stream.Stream")
-# Type definition for type annotation convenience
-
-@dataclass
-class TobiiVideoFrame():
- """Define tobii video frame"""
-
- matrix: list
- """Video frame matrix."""
-
- width: int = field(init=False)
- """Inferred video frame width."""
-
- height: int = field(init=False)
- """Inferred video frame height."""
-
- def __post_init__(self):
- """fill dimension attributes."""
-
- self.height, self.width = self.matrix.shape[:2]
-
- def copy(self) -> TobiiVideoFrameType:
- """Copy tobii video frame."""
-
- return TobiiVideoFrame(self.matrix.copy())
-
-class TobiiVideoSegment():
- """Handle Tobii Glasses Pro 2 segment video file."""
-
- def __init__(self, segment_video_path, start_timestamp:int = 0, end_timestamp:int = None):
- """Load segment video from segment directory"""
-
- self.__path = segment_video_path
-
- self.__container = av.open(self.__path)
- self.__stream = self.__container.streams.video[0]
-
- self.__width = int(cv.VideoCapture(self.__path).get(cv.CAP_PROP_FRAME_WIDTH))
- self.__height = int(cv.VideoCapture(self.__path).get(cv.CAP_PROP_FRAME_HEIGHT))
-
- self.__start_timestamp = start_timestamp
- self.__end_timestamp = end_timestamp
-
- # position at the given start time
- self.__container.seek(self.__start_timestamp)
-
- @property
- def path(self) -> str:
- """Get video segment path."""
-
- return self.__path
-
- @property
- def duration(self) -> int:
- """Duration in microsecond."""
-
- if self.__end_timestamp == None:
- return int((self.__stream.duration * self.__stream.time_base) * 1e6) - self.__start_timestamp
- else:
- return self.__end_timestamp - self.__start_timestamp
-
- @property
- def width(self) -> int:
- """Video width dimension."""
-
- return self.__width
-
- @property
- def height(self) -> int:
- """Video height dimension."""
-
- return self.__height
-
- @property
- def stream(self) -> AvStreamType:
- """Video stream."""
-
- return self.__stream
-
- def get_frame(self, i) -> Tuple[int, "TobiiVideoFrame"]:
- """Access to a frame."""
-
- if i < 0:
- ValueError('Frame index must be a positive integer.')
-
- counter = 0
- frame = None
- video_ts = 0
-
- # position at the given start time
- self.__container.seek(self.__start_timestamp)
-
- # start decoding
- self.__container.decode(self.__stream)
-
- while counter <= i:
-
- frame = self.__container.decode(self.__stream).__next__()
- video_ts = int(frame.time * 1e6)
- counter += 1
-
- # return micro second timestamp and frame data
- return video_ts, TobiiVideoFrame(frame.to_ndarray(format='bgr24'))
-
- def frames(self) -> Tuple[int, "TobiiVideoFrame"]:
- """Access to frame iterator."""
-
- return self.__iter__()
-
- def __iter__(self):
-
- # start decoding
- self.__container.decode(self.__stream)
-
- return self
-
- def __next__(self):
-
- frame = self.__container.decode(self.__stream).__next__()
-
- video_ts = int(frame.time * 1e6)
-
- # Ignore before start timestamp
- if video_ts < self.__start_timestamp:
- return self.__next__()
-
- # Ignore frames after end timestamp
- if self.__end_timestamp != None:
-
- if video_ts >= self.__end_timestamp:
- raise StopIteration
-
- # return micro second timestamp and frame data
- return video_ts, TobiiVideoFrame(frame.to_ndarray(format='bgr24'))
-
-class TobiiVideoStream(threading.Thread):
- """Capture Tobii Glasses Pro 2 video camera stream."""
-
- def __init__(self, network_interface: TobiiNetworkInterface.TobiiNetworkInterface):
- """Initialise video stream reception."""
-
- threading.Thread.__init__(self)
- threading.Thread.daemon = True
-
- self.__network = network_interface
- self.__video_socket = self.__network.make_socket()
-
- self.__stop_event = threading.Event()
- self.__read_lock = threading.Lock()
-
- self.__frame_tuple = None
-
- # prepare keep alive message
- self.__keep_alive_msg = "{\"type\": \"live.video.unicast\",\"key\": \""+ str(uuid.uuid4()) +"_video\", \"op\": \"start\"}"
- self.__keep_alive_thread = threading.Timer(0, self.__keep_alive)
- self.__keep_alive_thread.daemon = True
-
- def __del__(self):
- """Stop data reception before destruction."""
-
- if self.is_alive():
-
- self.close()
-
- def __keep_alive(self):
- """Maintain connection."""
-
- while not self.__stop_event.isSet():
-
- self.__network.send_keep_alive_msg(self.__video_socket, self.__keep_alive_msg)
-
- time.sleep(1)
-
- def open(self):
- """Start data reception."""
-
- self.__keep_alive_thread.start()
- threading.Thread.start(self)
-
- def close(self):
- """Stop data reception definitively."""
-
- self.__stop_event.set()
-
- threading.Thread.join(self.__keep_alive_thread)
- threading.Thread.join(self)
-
- self.__video_socket.close()
-
- def run(self):
- """Store frame for further reading."""
-
- container = av.open(f'rtsp://{self.__network.get_address()}:8554/live/scene', options={'rtsp_transport': 'tcp'})
- stream = container.streams.video[0]
-
- for frame in container.decode(stream):
-
- # quit if the video acquisition thread have been stopped
- if self.__stop_event.isSet():
- break
-
- # lock frame access
- self.__read_lock.acquire()
-
- # store frame time, matrix into a tuple
- self.__frame_tuple = (frame.time, frame.to_ndarray(format='bgr24'))
-
- # unlock frame access
- self.__read_lock.release()
-
- def read(self) -> Tuple[int, "TobiiVideoFrame"]:
- """Read incoming video frames."""
-
- # if the video acquisition thread have been stopped or isn't started
- if self.__stop_event.isSet() or self.__frame_tuple == None:
- return -1, TobiiVideoFrame(numpy.zeros((1, 1, 3), numpy.uint8))
-
- # lock frame access
- self.__read_lock.acquire()
-
- # copy frame tuple
- frame_tuple = copy.deepcopy(self.__frame_tuple)
-
- # unlock frame access
- self.__read_lock.release()
-
- return int(frame_tuple[0] * 1e6), TobiiVideoFrame(frame_tuple[1])
-
-class TobiiVideoOutput():
- """Export a video file at the same format than a given referent stream."""
- # TODO : Make a generic video managment to handle video from any device (not only Tobii)
-
- def __init__(self, output_video_path: str, referent_stream: av.stream.Stream):
- """Create a video file"""
-
- self.__path = output_video_path
- self.__container = av.open(self.__path, 'w')
- self.__stream = self.__container.add_stream(\
- referent_stream.codec_context.name, \
- width=referent_stream.codec_context.width, \
- height=referent_stream.codec_context.height, \
- rate=referent_stream.codec_context.framerate, \
- gop_size=referent_stream.codec_context.gop_size, \
- pix_fmt=referent_stream.codec_context.pix_fmt, \
- bit_rate=referent_stream.codec_context.bit_rate)
-
- @property
- def path(self) -> str:
- """Get video file path."""
-
- return self.__path
-
- def write(self, frame):
- """Write a frame into the output video file"""
-
- formated_frame = av.VideoFrame.from_ndarray(frame, format='bgr24')
- formated_frame.reformat(format=self.__stream.codec_context.pix_fmt, interpolation=None)
- self.__container.mux(self.__stream.encode(formated_frame))
-
- def close(self):
- """End the writing of the video file"""
-
- self.__container.mux(self.__stream.encode())
- self.__container.close()
-
diff --git a/src/argaze/TobiiGlassesPro2/__init__.py b/src/argaze/TobiiGlassesPro2/__init__.py
deleted file mode 100644
index 50fb742..0000000
--- a/src/argaze/TobiiGlassesPro2/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-"""
-.. include:: README.md
-"""
-__docformat__ = "restructuredtext"
-__all__ = ['TobiiEntities', 'TobiiController', 'TobiiNetworkInterface', 'TobiiData', 'TobiiVideo', 'TobiiInertialMeasureUnit', 'TobiiSpecifications',] \ No newline at end of file
diff --git a/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf b/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf
deleted file mode 100644
index dfdbe0a..0000000
--- a/src/argaze/TobiiGlassesPro2/utils/A4_calibration_target.pdf
+++ /dev/null
Binary files differ
diff --git a/src/argaze/TobiiGlassesPro2/utils/imu.json b/src/argaze/TobiiGlassesPro2/utils/imu.json
deleted file mode 100644
index b701b00..0000000
--- a/src/argaze/TobiiGlassesPro2/utils/imu.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "gyroscope_offset": [
- 1.00506,
- -3.338012,
- 1.9096039999999999
- ],
- "accelerometer_coefficients": [
- [
- 0.9918761478460875,
- -0.29679248712760953
- ],
- [
- 0.9749789492717152,
- -0.15941685808576067
- ],
- [
- 0.9520423758351338,
- 0.23147323632416672
- ]
- ]
-} \ No newline at end of file
diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py
index 1162ba7..ef75dd2 100644
--- a/src/argaze/__init__.py
+++ b/src/argaze/__init__.py
@@ -2,4 +2,4 @@
.. include:: ../../README.md
"""
__docformat__ = "restructuredtext"
-__all__ = ['ArFeatures','GazeFeatures','GazeAnalysis','ArUcoMarkers','AreaOfInterest','DataStructures','TobiiGlassesPro2','utils'] \ No newline at end of file
+__all__ = ['ArFeatures','GazeFeatures','GazeAnalysis','ArUcoMarkers','AreaOfInterest','DataStructures','utils'] \ No newline at end of file
diff --git a/src/argaze/utils/README.md b/src/argaze/utils/README.md
index 16c85c0..ea293f6 100644
--- a/src/argaze/utils/README.md
+++ b/src/argaze/utils/README.md
@@ -20,77 +20,10 @@ Export a 7 columns and 5 rows calibration board made of 5cm squares with 3cm mar
python ./src/argaze/utils/aruco_calibration_board_export.py 7 5 5 3 -o _export -d DICT_APRILTAG_16h5 -r 300
```
-# Tobii calibration
+# TODO: Camera calibration
-Calibrate Tobii Glasses Pro 2 camera (-t IP_ADDRESS) using a 7 columns and 5 rows calibration board made of 5cm squares with 3cm markers from DICT_APRILTAG_16h5 dictionary. Then, export its optical parameters into an tobii_camera.json file:
+Calibrate a network camera (-t IP_ADDRESS) using a 7 columns and 5 rows calibration board made of 5cm squares with 3cm markers from DICT_APRILTAG_16h5 dictionary. Then, export its optical parameters into an camera.json file:
```
-python ./src/argaze/utils/tobii_camera_calibrate.py 7 5 5 3 -t IP_ADDRESS -d DICT_APRILTAG_16h5 -o _export/tobii_camera.json
-```
-
-Calibrate Tobii Glasses Pro 2 inertial measure unit (-t IP_ADDRESS) then, export calibration parameters into an imu.json file:
-
-```
-python ./src/argaze/utils/tobii_imu_calibrate.py -t IP_ADDRESS -o _export/imu.json
-```
-
-# Tobii session
-
-Display Tobii Glasses Pro 2 camera video stream (-t IP_ADDRESS) with a live gaze pointer. Loading calibration file to display inertial sensors data:
-
-```
-python ./src/argaze/utils/tobii_stream_display.py -t IP_ADDRESS -i _export/imu.json
-```
-
-Record a Tobii Glasses Pro 2 'myProject' session for a 'myUser' participant on Tobii interface's SD card (-t IP_ADDRESS):
-
-```
-python ./src/argaze/utils/tobii_segment_record.py -t IP_ADDRESS -p myProject -u myUser
-```
-
-# Tobii drive
-
-Explore Tobii Glasses Pro 2 interface's SD Card (-d DRIVE_PATH, -p PROJECT_PATH, -r RECORDING_PATH, -s SEGMENT_PATH):
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -d DRIVE_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -p PROJECT_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -r RECORDING_PATH
-```
-
-```
-python ./src/argaze/utils/tobii_sdcard_explore.py -s SEGMENT_PATH
-```
-
-# Tobii post-processing
-
-Replay a time range selection (-r IN OUT) Tobii Glasses Pro 2 session (-s SEGMENT_PATH) synchronizing video and some data together:
-
-```
-python ./src/argaze/utils/tobii_segment_display.py -s SEGMENT_PATH -r IN OUT
-```
-
-Export Tobii segment fixations and saccades (-s SEGMENT_PATH) from a time range selection (-r IN OUT) as fixations.csv and saccades.csv files saved into the segment folder:
-
-```
-python ./src/argaze/utils/tobii_segment_gaze_movements_export.py -s SEGMENT_PATH -r IN OUT
-```
-
-# Tobii with ArUco
-
-Detect ArUco markers (-md MARKER_DICT -ms MARKER_SIZE) into Tobii camera video stream (-t IP_ADDRESS). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame:
-
-```
-python ./src/argaze/utils/tobii_stream_aruco_aoi_display.py -t IP_ADDRESS -c _export/tobii_camera.json -md MARKER_DICT -ms MARKER_SIZE -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}'
-```
-
-Detect ArUco markers (-md MARKER_DICT -ms MARKER_SIZE) into a Tobii camera video segment (-s SEGMENT_PATH) into a time range selection (-r IN OUT). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame. Export aoi video and data as a aruco_aoi.csv, aruco_aoi.mp4 files:
-```
-python ./src/argaze/utils/tobii_segment_aruco_aoi_export.py -s SEGMENT_PATH -c _export/tobii_camera.json -md MARKER_DICT -ms MARKER_SIZE -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}' -r IN OUT
-```
+python ./src/argaze/utils/camera_calibrate.py 7 5 5 3 -t IP_ADDRESS -d DICT_APRILTAG_16h5 -o _export/camera.json
+``` \ No newline at end of file
diff --git a/src/argaze/utils/tobii_camera_calibrate.py b/src/argaze/utils/tobii_camera_calibrate.py
deleted file mode 100644
index 24cbe5c..0000000
--- a/src/argaze/utils/tobii_camera_calibrate.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import time
-
-from argaze.TobiiGlassesPro2 import TobiiController, TobiiVideo
-from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoBoard, ArUcoDetector, ArUcoCamera
-
-import cv2 as cv
-
-def main():
- """
- Captures board pictures and finally outputs camera calibration data into a .json file.
-
- - Export and print a calibration board using
- - Place the calibration board in order to view it entirely on screen and move the camera in many configurations (orientation and distance) : the script will automatically take pictures. Do this step with a good lighting and a clear background.
- - Once enough pictures have been captured (~20), press Esc key then, wait for the camera calibration processing.
- - Finally, check rms parameter: it should be between 0. and 1. if the calibration succeeded (lower is better).
-
- ### Reference:
- - [Camera calibration using ArUco marker tutorial](https://automaticaddison.com/how-to-perform-camera-calibration-using-opencv/)
- """
-
- # manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('columns', metavar='COLS_NUMBER', type=int, default=7, help='number of columns')
- parser.add_argument('rows', metavar='ROWS_NUMBER', type=int, default=5, help='number of rows')
- parser.add_argument('square_size', metavar='SQUARE_SIZE', type=float, default=5, help='square size (cm)')
- parser.add_argument('marker_size', metavar='MARKER_SIZE', type=float, default=3, help='marker size (cm)')
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default='camera.json', help='destination filepath')
- parser.add_argument('-d', '--dictionary', metavar='DICT', type=str, default='DICT_ARUCO_ORIGINAL', help='aruco marker dictionnary (DICT_4X4_50, DICT_4X4_100, DICT_4X4_250, DICT_4X4_1000, DICT_5X5_50, DICT_5X5_100, DICT_5X5_250, DICT_5X5_1000, DICT_6X6_50, DICT_6X6_100, DICT_6X6_250, DICT_6X6_1000, DICT_7X7_50, DICT_7X7_100, DICT_7X7_250, DICT_7X7_1000, DICT_ARUCO_ORIGINAL, DICT_APRILTAG_16h5, DICT_APRILTAG_25h9, DICT_APRILTAG_36h10, DICT_APRILTAG_36h11)')
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Get video stream dimension
- video_width = tobii_controller.get_configuration()['sys_sc_width']
- video_height = tobii_controller.get_configuration()['sys_sc_height']
-
- # Print current confirguration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Enable tobii video stream
- tobii_video_stream = tobii_controller.enable_video_stream()
-
- # Create aruco camera
- aruco_camera = ArUcoCamera.ArUcoCamera(dimensions=(video_width, video_height))
-
- # Create aruco board
- aruco_board = ArUcoBoard.ArUcoBoard(args.columns, args.rows, args.square_size, args.marker_size, args.dictionary)
-
- # Create aruco detecter
- aruco_detector = ArUcoDetector.ArUcoDetector(dictionary=args.dictionary, marker_size=args.marker_size)
-
- # Start tobii glasses streaming
- tobii_controller.start_streaming()
-
- print("Camera calibration starts")
- print("Waiting for calibration board...")
-
- expected_markers_number = aruco_board.markers_number
- expected_corners_number = aruco_board.corners_number
-
- # Capture loop
- try:
-
- while tobii_video_stream.is_alive():
-
- # capture frame with a full displayed board
- video_ts, video_frame = tobii_video_stream.read()
-
- # detect all markers in the board
- aruco_detector.detect_board(video_frame.matrix, aruco_board, expected_markers_number)
-
- # draw only markers
- aruco_detector.draw_detected_markers(video_frame.matrix)
-
- # draw current calibration data count
- cv.putText(video_frame.matrix, f'Capture: {aruco_camera.calibration_data_count}', (50, 50), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv.LINE_AA)
- cv.imshow('Tobii Camera Calibration', video_frame.matrix)
-
- # if all board corners are detected
- if aruco_detector.board_corners_number == expected_corners_number:
-
- # draw board corners to notify a capture is done
- aruco_detector.draw_board(video_frame.matrix)
-
- # append data
- aruco_camera.store_calibration_data(aruco_detector.board_corners, aruco_detector.board_corners_identifier)
-
- cv.imshow('Tobii Camera Calibration', video_frame.matrix)
-
- # close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- # exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # stop frame display
- cv.destroyAllWindows()
-
- # Stop tobii glasses streaming
- tobii_controller.stop_streaming()
-
- print('\nCalibrating camera...')
- aruco_camera.calibrate(aruco_board)
-
- print('\nCalibration succeeded!')
- print(f'\nRMS:\n{aruco_camera.rms}')
- print(f'\nDimensions:\n{video_width}x{video_height}')
- print(f'\nCamera matrix:\n{aruco_camera.K}')
- print(f'\nDistortion coefficients:\n{aruco_camera.D}')
-
- aruco_camera.to_json(args.output)
-
- print(f'\nCalibration data exported into {args.output} file')
-
-if __name__ == '__main__':
-
- main()
diff --git a/src/argaze/utils/tobii_imu_calibrate.py b/src/argaze/utils/tobii_imu_calibrate.py
deleted file mode 100644
index c9e4813..0000000
--- a/src/argaze/utils/tobii_imu_calibrate.py
+++ /dev/null
@@ -1,214 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import time
-
-from argaze import DataStructures
-from argaze.TobiiGlassesPro2 import TobiiController, TobiiInertialMeasureUnit
-from argaze.utils import MiscFeatures
-
-import numpy
-import matplotlib.pyplot as mpyplot
-import matplotlib.patches as mpatches
-
-def main():
- """
- Calibrate Tobbi gyroscope and accelerometer sensors and finally outputs camera calibration data into a .json file.
-
- ### Reference:
- - [Inertial Measure Unit calibration tutorial](https://makersportal.com/blog/calibration-of-an-inertial-measurement-unit-imu-with-raspberry-pi-part-ii)
- """
-
- # manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
- parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath')
- parser.add_argument('-n', '--sample_number', metavar='BUFFER_SIZE', type=int, default=500, help='number of samples to store into calibration buffer')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default='imu.json', help='destination filepath')
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Create tobii imu handler
- tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()
-
- # Load optional imu calibration file
- if args.imu_calibration != None:
-
- tobii_imu.load_calibration_file(args.imu_calibration)
-
- # Enable tobii data stream
- tobii_data_stream = tobii_controller.enable_data_stream()
-
- # Menu loop
- try:
-
- while True:
-
- print('-' * 52)
- menu_input = input('Tobii Inertial Measure Unit sensor calibration menu:\n\t\'a\' for accelerometer calibration.\n\t\'A\' for accelerometer visualisation.\n\t\'g\' for gyroscope calibration.\n\t\'G\' for gyroscope visualisation.\n\t\'p\' print current calibration.\n\t\'s\' save calibration.\n\t\'q\' quit calibration without saving.\n>')
-
- match menu_input:
-
- case 'a':
-
- axis = ['X', 'Y', 'Z']
- directions = ['upward', 'downward', 'perpendicular']
-
- for i, axis in enumerate(axis):
-
- print(f'\nACCELEROMETER {axis} AXIS CALIBRATION')
-
- axis_buffers = {}
-
- for j, direction in enumerate(directions):
-
- input(f'\nKeep Tobii Glasses accelerometer {axis} axis {direction} then press \'Enter\' to start data acquisition.\n')
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Capture accelerometer data stream
- data_ts_buffer = DataStructures.TimeStampedBuffer()
- for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number):
-
- # Update progress Bar
- MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- axis_buffers[direction] = data_ts_buffer
-
- tobii_imu.calibrate_accelerometer_axis_coefficients(i, axis_buffers['upward'], axis_buffers['downward'], axis_buffers['perpendicular'])
-
- accelerometer_coefficients = tobii_imu.accelerometer_coefficients
-
- print(f'\n\nAccelerometer optimal linear fit coefficients over {progress} values for each axis:')
- print('\tX coefficients: ', accelerometer_coefficients[0])
- print('\tY coefficients: ', accelerometer_coefficients[1])
- print('\tZ coefficients: ', accelerometer_coefficients[2])
-
- case 'A':
-
- print('\nCAPTURE AND PLOT ACCELEROMETER STREAM')
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Capture accelerometer data stream
- data_ts_buffer = DataStructures.TimeStampedBuffer()
- for progress in tobii_data_stream.capture(data_ts_buffer, 'Accelerometer', args.sample_number):
-
- # Update progress Bar
- MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Edit figure
- figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
- data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max
- figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144)
-
- # Plot data
- subplot = figure.add_subplot(111)
- subplot.set_title('Accelerometer', loc='left')
- patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Display figure
- mpyplot.show()
- figure.clear()
-
- case 'g':
-
- print('\nGYROSCOPE CALIBRATION')
- input('Keep Tobii Glasses steady then press \'Enter\' to start data acquisition.\n')
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Capture gyroscope data stream
- data_ts_buffer = DataStructures.TimeStampedBuffer()
- for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number):
-
- # Update progress Bar
- MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- gyroscope_offset = tobii_imu.calibrate_gyroscope_offset(data_ts_buffer)
-
- print(f'\n\nGyroscope average over {progress} values for each axis:')
- print('\tX offset: ', gyroscope_offset[0])
- print('\tY offset: ', gyroscope_offset[1])
- print('\tZ offset: ', gyroscope_offset[2])
-
- case 'G':
-
- print('\nCAPTURE AND PLOT GYROSCOPE STREAM')
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Capture accelerometer data stream
- data_ts_buffer = DataStructures.TimeStampedBuffer()
- for progress in tobii_data_stream.capture(data_ts_buffer, 'Gyroscope', args.sample_number):
-
- # Update progress Bar
- MiscFeatures.printProgressBar(progress, args.sample_number, prefix = 'Data acquisition:', suffix = 'Complete', length = 100)
-
- # Edit figure
- figure_width = min(args.sample_number/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
- data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max
- figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144)
-
- # Plot data
- subplot = figure.add_subplot(111)
- subplot.set_title('Gyroscope', loc='left')
- patches = data_ts_buffer.plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Display figure
- mpyplot.show()
- figure.clear()
-
- case 'p':
-
- gyroscope_offset = tobii_imu.gyroscope_offset
-
- print(f'\nGyroscope offset for each axis:')
- print('\tX offset: ', gyroscope_offset[0])
- print('\tY offset: ', gyroscope_offset[1])
- print('\tZ offset: ', gyroscope_offset[2])
-
- accelerometer_coefficients = tobii_imu.accelerometer_coefficients
-
- print(f'\nAccelerometer optimal linear fit coefficients for each axis:')
- print('\tX coefficients: ', accelerometer_coefficients[0])
- print('\tY coefficients: ', accelerometer_coefficients[1])
- print('\tZ coefficients: ', accelerometer_coefficients[2])
-
- case 's':
-
- tobii_imu.save_calibration_file(args.output)
- print(f'\nCalibration data exported into {args.output} file')
-
- break
-
- case 'q':
-
- break
-
- # exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
-if __name__ == '__main__':
-
- main()
diff --git a/src/argaze/utils/tobii_sdcard_explore.py b/src/argaze/utils/tobii_sdcard_explore.py
deleted file mode 100644
index ed297be..0000000
--- a/src/argaze/utils/tobii_sdcard_explore.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiData, TobiiVideo
-
-import matplotlib.pyplot as mpyplot
-import matplotlib.patches as mpatches
-
-def main():
- """
- Explore Tobii Glasses Pro 2 interface's SD Card
- """
-
- # manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-d', '--drive_path', metavar='DRIVE_PATH', type=str, default=None, help='drive path')
- parser.add_argument('-p', '--project_path', metavar='PROJECT_PATH', type=str, default=None, help='project path')
- parser.add_argument('-r', '--recording_path', metavar='RECORDING_PATH', type=str, default=None, help='recording path')
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- args = parser.parse_args()
-
- if args.drive_path != None:
-
- # Load all projects from a tobii drive
- tobii_drive = TobiiEntities.TobiiDrive(args.drive_path)
-
- for project in tobii_drive.projects():
- print(f'Project id: {project.id}, name: {project.name}')
-
- elif args.project_path != None:
-
- # Load one tobii project
- tobii_project = TobiiEntities.TobiiProject(args.project_path)
-
- for participant in tobii_project.participants():
- print(f'Participant id: {participant.id}, name: {participant.name}')
-
- for recording in tobii_project.recordings():
- print(f'Recording id: {recording.id}, name: {recording.name}')
- print(f'\tProject: {recording.project.name}')
- print(f'\tParticipant: {recording.participant.name}')
-
- elif args.recording_path != None:
-
- # Load a tobii segment
- tobii_recording = TobiiEntities.TobiiRecording(args.recording_path)
-
- for segment in tobii_recording.segments():
- print(f'Segment id: {segment.id}')
-
- elif args.segment_path != None:
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path)
-
- tobii_segment_video = tobii_segment.load_video()
- print(f'Video properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'Loaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Edit figure
- figure_width = min(tobii_segment_video.duration/10, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
- data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max
- figure = mpyplot.figure(figsize=(figure_width, 5), dpi=144)
-
- # Plot data
- subplot = figure.add_subplot(111)
- subplot.set_title('VideoTimeStamps', loc='left')
- patches = tobii_segment_data['VideoTimeStamp'].plot(names=['offset','value'], colors=['#276FB6','#9427B6'], samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Display figure
- mpyplot.show()
- figure.clear()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_arscene_edit.py b/src/argaze/utils/tobii_segment_arscene_edit.py
deleted file mode 100644
index b8a5745..0000000
--- a/src/argaze/utils/tobii_segment_arscene_edit.py
+++ /dev/null
@@ -1,381 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import json
-import time
-
-from argaze import *
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications
-from argaze.ArUcoMarkers import *
-from argaze.AreaOfInterest import *
-from argaze.utils import MiscFeatures
-
-import numpy
-import cv2 as cv
-
-def main():
- """
- Open video file with ArUco marker scene inside
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-p', '--project_path', metavar='ARGAZE_PROJECT', type=str, default=None, help='json argaze project filepath')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction)
- args = parser.parse_args()
-
- if args.segment_path != None:
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- #vs_data_filepath = f'{destination_path}/visual_scan.csv'
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'Video properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Load ar scene
- ar_scene = ArScene.ArScene.from_json(args.project_path)
-
- print(ar_scene)
-
- # Display first frame
- video_ts, video_frame = tobii_segment_video.get_frame(0)
- cv.imshow(f'Segment {tobii_segment.id} ArUco marker editor', video_frame.matrix)
-
- # Init mouse interaction variables
- pointer = (0, 0)
- left_click = (0, 0)
- right_click = (0, 0)
- right_button = False
- edit_trans = False # translate
- edit_coord = 0 # x
-
- # On mouse left left_click : update pointer position
- def on_mouse_event(event, x, y, flags, param):
-
- nonlocal pointer
- nonlocal left_click
- nonlocal right_click
- nonlocal right_button
-
- # Update pointer
- pointer = (x, y)
-
- # Update left_click
- if event == cv.EVENT_LBUTTONUP:
-
- left_click = pointer
-
- # Udpate right_button
- elif event == cv.EVENT_RBUTTONDOWN:
-
- right_button = True
-
- elif event == cv.EVENT_RBUTTONUP:
-
- right_button = False
-
- # Udpate right_click
- if right_button:
-
- right_click = pointer
-
- cv.setMouseCallback(f'Segment {tobii_segment.id} ArUco marker editor', on_mouse_event)
-
- # Frame selector loop
- frame_index = 0
- last_frame_index = -1
- last_frame = video_frame.copy()
- force_update = False
-
- selected_marker_id = -1
-
- try:
-
- while True:
-
- # Select a frame on change
- if frame_index != last_frame_index or force_update:
-
- video_ts, video_frame = tobii_segment_video.get_frame(frame_index)
- video_ts_ms = video_ts / 1e3
-
- last_frame_index = frame_index
- last_frame = video_frame.copy()
-
- # Hide frame left and right borders before detection to ignore markers outside focus area
- cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width/6), int(video_frame.height)), (0, 0, 0), -1)
- cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - 1/6)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1)
-
- try:
-
- # Estimate scene pose from ArUco markers into frame.
- tvec, rmat, _ = ar_scene.estimate_pose(video_frame.matrix)
-
- # Catch exceptions raised by estimate_pose method
- except ArScene.PoseEstimationFailed as e:
-
- cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- else:
-
- video_frame = last_frame.copy()
-
- # Edit fake gaze position from pointer
- gaze_position = GazeFeatures.GazePosition(pointer, precision=2)
-
- # Copy video frame to edit visualisation on it with out disrupting aruco detection
- visu_frame = video_frame.copy()
-
- try:
-
- # Project AOI scene into frame according estimated pose
- aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV)
-
- # Catch exceptions raised by project method
- except ArScene.SceneProjectionFailed as e:
-
- cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Draw detected markers
- ar_scene.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- # Draw scene projection
- aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255))
-
- # Project 3D scene on each video frame and the visualisation frame
- if len(ar_scene.aruco_detector.detected_markers) > 0:
-
- # Write detected marker ids
- cv.putText(visu_frame.matrix, f'Detected markers: {list(ar_scene.aruco_detector.detected_markers.keys())}', (20, visu_frame.height - 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Update selected marker id by left_clicking on marker
- for (marker_id, marker) in ar_scene.aruco_detector.detected_markers.items():
-
- marker_aoi = marker.corners.reshape(4, 2).view(AOIFeatures.AreaOfInterest)
-
- if marker_aoi.contains_point(left_click):
-
- selected_marker_id = marker_id
-
- # If a marker is selected
- try:
-
- # Retreive marker index
- selected_marker = ar_scene.aruco_detector.detected_markers[selected_marker_id]
-
- marker_x, marker_y = selected_marker.center
- '''
- if right_button:
-
- pointer_delta_x, pointer_delta_y = (right_click[0] - marker_x) / (visu_frame.width/3), (marker_y - right_click[1]) / (visu_frame.width/3)
-
- if edit_trans:
-
- # Edit scene rotation
- if edit_coord == 0:
- aoi3D_scene_edit['rotation'] = numpy.array([pointer_delta_y, aoi3D_scene_edit['rotation'][1], aoi3D_scene_edit['rotation'][2]])
-
- elif edit_coord == 1:
- aoi3D_scene_edit['rotation'] = numpy.array([aoi3D_scene_edit['rotation'][0], pointer_delta_x, aoi3D_scene_edit['rotation'][2]])
-
- elif edit_coord == 2:
- aoi3D_scene_edit['rotation'] = numpy.array([aoi3D_scene_edit['rotation'][0], aoi3D_scene_edit['rotation'][1], -1*pointer_delta_y])
-
- else:
-
- # Edit scene translation
- if edit_coord == 0:
- aoi3D_scene_edit['translation'] = numpy.array([pointer_delta_x, aoi3D_scene_edit['translation'][1], aoi3D_scene_edit['translation'][2]])
-
- elif edit_coord == 1:
- aoi3D_scene_edit['translation'] = numpy.array([aoi3D_scene_edit['translation'][0], pointer_delta_y, aoi3D_scene_edit['translation'][2]])
-
- elif edit_coord == 2:
- aoi3D_scene_edit['translation'] = numpy.array([aoi3D_scene_edit['translation'][0], aoi3D_scene_edit['translation'][1], 2*pointer_delta_y])
- '''
- # Apply transformation
- aoi_scene_edited = ar_scene.aoi_scene#.transform(aoi3D_scene_edit['translation'], aoi3D_scene_edit['rotation'])
-
- cv.rectangle(visu_frame.matrix, (0, 130), (460, 450), (127, 127, 127), -1)
- '''
- # Write rotation matrix
- R, _ = cv.Rodrigues(aoi3D_scene_edit['rotation'])
- cv.putText(visu_frame.matrix, f'Rotation matrix:', (20, 160), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{R[0][0]:.3f} {R[0][1]:.3f} {R[0][2]:.3f}', (40, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{R[1][0]:.3f} {R[1][1]:.3f} {R[1][2]:.3f}', (40, 240), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{R[2][0]:.3f} {R[2][1]:.3f} {R[2][2]:.3f}', (40, 280), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
-
- # Write translation vector
- T = aoi3D_scene_edit['translation']
- cv.putText(visu_frame.matrix, f'Translation vector:', (20, 320), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{T[0]:.3f}', (40, 360), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{T[1]:.3f}', (40, 400), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{T[2]:.3f}', (40, 440), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
- '''
- # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it
- # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable.
- scene_projection_edited = aoi_scene_edited.project(selected_marker.translation, selected_marker.rotation, ar_scene.aruco_camera.K)
-
- # Draw aoi scene
- scene_projection_edited.draw_raycast(visu_frame.matrix, gaze_position)
-
- # Write warning related to marker pose processing
- except UserWarning as e:
-
- cv.putText(visu_frame.matrix, f'Marker {selected_marker_id}: {e}', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
-
- except KeyError:
-
- # Write error
- if selected_marker_id >= 0:
- cv.putText(visu_frame.matrix, f'Marker {selected_marker_id} not found', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
-
- # Draw focus area
- cv.rectangle(visu_frame.matrix, (int(visu_frame.width/6), 0), (int(visu_frame.width*(1-1/6)), int(visu_frame.height)), (255, 150, 150), 1)
-
- # Draw center
- cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1)
- cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Draw pointer
- gaze_position.draw(visu_frame.matrix)
-
- # Write segment timing
- cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(visu_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Write selected marker id
- if selected_marker_id >= 0:
-
- cv.rectangle(visu_frame.matrix, (0, 50), (550, 90), (127, 127, 127), -1)
-
- # Select color
- if edit_coord == 0:
- color_axis = (0, 0, 255)
-
- elif edit_coord == 1:
- color_axis = (0, 255, 0)
-
- elif edit_coord == 2:
- color_axis = (255, 0, 0)
-
- if edit_trans:
- cv.putText(visu_frame.matrix, f'Rotate marker {selected_marker_id} around axis {edit_coord + 1}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, color_axis, 1, cv.LINE_AA)
- else:
- cv.putText(visu_frame.matrix, f'Translate marker {selected_marker_id} along axis {edit_coord + 1}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, color_axis, 1, cv.LINE_AA)
-
- # Write documentation
- else:
- cv.rectangle(visu_frame.matrix, (0, 50), (650, 250), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, f'> Left click on marker: select scene', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'> T: translate, R: rotate', (20, 120), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'> Shift + 0/1/2: select axis', (20, 160), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'> Right click and drag: edit axis', (20, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'> Ctrl + S: save scene', (20, 240), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Reset left_click
- left_click = (0, 0)
-
- if args.window:
-
- key_pressed = cv.waitKey(1)
-
- #if key_pressed != -1:
- # print(key_pressed)
-
- # Select previous frame with left arrow
- if key_pressed == 2:
- frame_index -= 1
-
- # Select next frame with right arrow
- if key_pressed == 3:
- frame_index += 1
-
- # Clip frame index
- if frame_index < 0:
- frame_index = 0
-
- # Edit rotation with r key
- if key_pressed == 114:
- edit_trans = True
-
- # Edit translation with t key
- if key_pressed == 116:
- edit_trans = False
-
- # Select coordinate to edit with Shift + 0, 1 or 2
- if key_pressed == 49 or key_pressed == 50 or key_pressed == 51:
- edit_coord = key_pressed - 49
-
- # Save selected marker edition using 'Ctrl + s'
- if key_pressed == 19:
-
- if selected_marker_id >= 0 and aoi3D_scene_edit != None:
-
- aoi_scene_filepath = args.marker_id_scene[f'{selected_marker_id}']
- aoi_scene_edited.save(aoi_scene_filepath)
-
- print(f'Saving scene related to marker #{selected_marker_id} into {aoi_scene_filepath}')
-
- # Close window using 'Esc' key
- if key_pressed == 27:
- break
-
- # Reload detecter configuration on 'c' key
- if key_pressed == 99:
- load_configuration_file()
- force_update = True
-
- # Display video
- cv.imshow(f'Segment {tobii_segment.id} ArUco marker editor', visu_frame.matrix)
-
- # Wait 1 second
- time.sleep(1)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_arscene_export.py b/src/argaze/utils/tobii_segment_arscene_export.py
deleted file mode 100644
index b2cc0e0..0000000
--- a/src/argaze/utils/tobii_segment_arscene_export.py
+++ /dev/null
@@ -1,306 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os, json
-import math
-import threading
-
-from argaze import *
-from argaze.TobiiGlassesPro2 import *
-from argaze.ArUcoMarkers import *
-from argaze.AreaOfInterest import *
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-
-def main():
- """
- Detect ArUcoScene into Tobii Glasses Pro 2 camera video record.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-p', '--env_path', metavar='ENVIRONMENT_PATH', type=str, default=None, help='json argaze environment filepath')
- parser.add_argument('-b', '--borders', metavar='BORDERS', type=float, default=16.666, help='define left and right borders mask (%) to not detect aruco out of these borders')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs')
- args = parser.parse_args()
-
- if args.segment_path != None:
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- aoi_json_filepath = f'{destination_path}/aoi.json'
- aoi_csv_filepath = f'{destination_path}/aoi.csv'
- aoi_mp4_filepath = f'{destination_path}/aoi.mp4'
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Load a tobii segment data
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'\nLoaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Access to video timestamp data buffer
- tobii_ts_vts = tobii_segment_data['VideoTimeStamp']
-
- # Access to timestamped gaze position data buffer
- tobii_ts_gaze_positions = tobii_segment_data['GazePosition']
-
- # Format tobii gaze position in pixel
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazePositions projection:', suffix = 'Complete', length = 100)
-
- for ts, tobii_gaze_position in tobii_ts_gaze_positions.items():
-
- # Update Progress Bar
- progress = ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'GazePositions projection:', suffix = 'Complete', length = 100)
-
- # Test gaze position validity
- if tobii_gaze_position.validity == 0:
-
- gaze_position_px = (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height))
- ts_gaze_positions[ts] = GazeFeatures.GazePosition(gaze_position_px)
-
- print('\n')
-
- if args.debug:
-
- # Prepare video exportation at the same format than segment video
- output_video = TobiiVideo.TobiiVideoOutput(aoi_mp4_filepath, tobii_segment_video.stream)
-
- # Load ArEnvironment
- ar_env = ArFeatures.ArEnvironment.from_json(args.env_path)
-
- if args.debug:
- print(ar_env)
-
- # Work with first scene only
- _, ar_scene = next(iter(ar_env.items()))
-
- # Create timestamped buffer to store AOIs and primary time stamp offset
- ts_offset_aois = DataStructures.TimeStampedBuffer()
-
- # Video and data replay loop
- try:
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration/1e3, prefix = 'ArUco detection & AOI projection:', suffix = 'Complete', length = 100)
-
- # Iterate on video frames
- for video_ts, video_frame in tobii_segment_video.frames():
-
- # This video frame is the reference until the next frame
- # Here next frame is at + 40ms (25 fps)
- # TODO: Get video fps to adapt
- next_video_ts = video_ts + 40000
-
- # Copy video frame to edit visualisation on it without disrupting aruco detection
- visu_frame = video_frame.copy()
-
- # Prepare to store projected AOI
- projected_aois = {}
-
- # Process video and data frame
- try:
-
- # Get nearest video timestamp
- _, nearest_vts = tobii_ts_vts.get_last_before(video_ts)
-
- projected_aois['offset'] = nearest_vts.offset
-
- # Hide frame left and right borders before detection to ignore markers outside focus area
- cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width*args.borders/100), int(video_frame.height)), (0, 0, 0), -1)
- cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - args.borders/100)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1)
-
- # Detect aruco markers into frame
- ar_env.aruco_detector.detect_markers(video_frame.matrix)
-
- # Estimate markers poses
- ar_env.aruco_detector.estimate_markers_pose()
-
- # Estimate scene pose from ArUco markers into frame.
- tvec, rmat, _ = ar_scene.estimate_pose(ar_env.aruco_detector.detected_markers)
-
- # Project AOI scene into frame according estimated pose
- aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV)
-
- # Store all projected aoi
- for aoi_name in aoi_scene_projection.keys():
-
- projected_aois[aoi_name] = numpy.rint(aoi_scene_projection[aoi_name]).astype(int)
-
- if args.debug:
-
- # Draw detected markers
- ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- # Draw AOI
- aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255))
-
- # Catch exceptions raised by estimate_pose and project methods
- except (ArFeatures.PoseEstimationFailed, ArFeatures.SceneProjectionFailed) as e:
-
- if str(e) == 'Unconsistent marker poses':
-
- projected_aois['error'] = str(e) + ': ' + str(e.unconsistencies)
-
- else:
-
- projected_aois['error'] = str(e)
-
- if args.debug:
-
- # Draw detected markers
- ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Raised when timestamped buffer is empty
- except KeyError as e:
-
- e = 'VideoTimeStamp missing'
-
- projected_aois['offset'] = 0
- projected_aois['error'] = e
-
- if args.debug:
-
- cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
-
- # Store projected AOI
- ts_offset_aois[video_ts] = projected_aois
-
- if args.debug:
- # Draw gaze positions until next frame
- try:
-
- # Get next gaze position
- ts_start, start_gaze_position = ts_gaze_positions.first
- ts_next, next_gaze_position = ts_gaze_positions.first
-
- # Check next gaze position is not after next frame time
- while ts_next < next_video_ts:
-
- ts_start, start_gaze_position = ts_gaze_positions.pop_first()
- ts_next, next_gaze_position = ts_gaze_positions.first
-
- # Draw start gaze
- start_gaze_position.draw(visu_frame.matrix)
-
- if start_gaze_position.valid and next_gaze_position.valid:
-
- # Draw movement from start to next
- cv.line(visu_frame.matrix, start_gaze_position, next_gaze_position, (0, 255, 255), 1)
-
- if start_gaze_position.valid:
-
- # Write last start gaze position
- cv.putText(visu_frame.matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Write last start gaze position timing
- cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (31, 31, 31), -1)
- cv.putText(visu_frame.matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Empty gaze position
- except IndexError:
- pass
-
- # Draw focus area
- cv.rectangle(visu_frame.matrix, (int(video_frame.width*args.borders/100.), 0), (int(visu_frame.width*(1-args.borders/100)), int(visu_frame.height)), (255, 150, 150), 1)
-
- # Draw center
- cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1)
- cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Write segment timing
- cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(visu_frame.matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- if args.debug:
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- # Display visualisation
- cv.imshow(f'Segment {tobii_segment.id} ArUco AOI', visu_frame.matrix)
-
- # Write video
- output_video.write(visu_frame.matrix)
-
- # Update Progress Bar
- progress = video_ts*1e-3 - int(args.time_range[0] * 1e3)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration*1e-3, prefix = 'ArUco detection & AOI projection:', suffix = 'Complete', length = 100)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- if args.debug:
-
- # Stop frame display
- cv.destroyAllWindows()
-
- # End output video file
- output_video.close()
-
- # Print aruco detection metrics
- print('\n\nAruco marker detection metrics')
- try_count, detected_counts = ar_env.aruco_detector.detection_metrics
-
- for marker_id, detected_count in detected_counts.items():
- print(f'\tMarkers {marker_id} has been detected in {detected_count} / {try_count} frames ({round(100 * detected_count / try_count, 2)} %)')
-
- # Export aruco aoi data
- ts_offset_aois.to_json(aoi_json_filepath)
- ts_offset_aois.as_dataframe().to_csv(aoi_csv_filepath)
- print(f'Aruco AOI data saved into {aoi_json_filepath} and {aoi_csv_filepath}')
-
- # Notify when the aruco aoi video has been exported
- print(f'Aruco AOI video saved into {aoi_mp4_filepath}')
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_data_plot_export.py b/src/argaze/utils/tobii_segment_data_plot_export.py
deleted file mode 100644
index 69f88e3..0000000
--- a/src/argaze/utils/tobii_segment_data_plot_export.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import json
-
-from argaze import DataStructures
-from argaze import GazeFeatures
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo
-from argaze.utils import MiscFeatures
-
-import pandas
-import matplotlib.pyplot as mpyplot
-import matplotlib.patches as mpatches
-
-def main():
- """
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- parser.add_argument('-r', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- args = parser.parse_args()
-
- if args.segment_path != None:
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- data_plots_filepath = f'{destination_path}/data_plot.svg'
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'Video properties:\n\tduration: {tobii_segment_video.duration / 1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Load a tobii segment data
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'Loaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Edit figure
- figure_width = min( 4 * tobii_segment_video.duration / 1e6, 56) # maximal width to display: 56 inches at 144 dpi < 2^16 pixels
- data_sample = 8064 # 56 inches * 144 dpi = 8064 data can be displayed at max
- figure = mpyplot.figure(figsize=(figure_width, 35), dpi=144)
-
- # Plot pupil diameter data
- subplot = figure.add_subplot(711)
- subplot.set_title('Pupil diameter', loc='left')
- subplot.set_ylim(0, 10)
- patches = tobii_segment_data['PupilDiameter'].plot(names=['value'], colors=['#FFD800'], samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Annotate events
- df_ts_events = tobii_segment_data['Event'].as_dataframe()
-
- if len(df_ts_events) > 0:
-
- for ts, event_type, event_tag in zip(df_ts_events.index, df_ts_events.type, df_ts_events.tag):
- subplot.annotate(f'{event_type}\n{event_tag}', xy=(ts, 7), horizontalalignment="left", verticalalignment="top")
- subplot.vlines(ts, 0, 6, color="tab:red", linewidth=1)
-
- # Plot pupil center data
- subplot = figure.add_subplot(712)
- subplot.set_title('Pupil center', loc='left')
- subplot.set_ylim(-40, -20)
- patches = tobii_segment_data['PupilCenter'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot gaze position data
- subplot = figure.add_subplot(713)
- subplot.set_title('Gaze position', loc='left')
- subplot.set_ylim(0., 1.)
- patches = tobii_segment_data['GazePosition'].plot(names=['x','y'], colors=['#276FB6','#9427B6'], split={'value':['x','y']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot gaze direction data
- subplot = figure.add_subplot(714)
- subplot.set_title('Gaze direction', loc='left')
- patches = tobii_segment_data['GazeDirection'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot gaze direction data
- subplot = figure.add_subplot(715)
- subplot.set_title('Gaze position 3D', loc='left')
- patches = tobii_segment_data['GazePosition3D'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot accelerometer data
- subplot = figure.add_subplot(716)
- subplot.set_title('Accelerometer', loc='left')
- patches = tobii_segment_data['Accelerometer'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Plot accelerometer data
- subplot = figure.add_subplot(717)
- subplot.set_title('Gyroscope', loc='left')
- patches = tobii_segment_data['Gyroscope'].plot(names=['x','y','z'], colors=['#276FB6','#9427B6','#888888'], split={'value':['x','y','z']}, samples=data_sample)
- subplot.legend(handles=patches, loc='upper left')
-
- # Export figure
- mpyplot.tight_layout()
- mpyplot.savefig(data_plots_filepath)
- mpyplot.close('all')
-
- print(f'\nData plots saved into {data_plots_filepath}')
-
-if __name__ == '__main__':
-
- main()
diff --git a/src/argaze/utils/tobii_segment_display.py b/src/argaze/utils/tobii_segment_display.py
deleted file mode 100644
index 2e0dab5..0000000
--- a/src/argaze/utils/tobii_segment_display.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-
-from argaze import GazeFeatures
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiData, TobiiSpecifications
-from argaze.utils import MiscFeatures
-
-import numpy
-
-import cv2 as cv
-
-def main():
- """
- Display Tobii segment video and data
- """
-
- # manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path')
- parser.add_argument('-r', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- args = parser.parse_args()
-
- if args.segment_path != None:
-
- # Load a tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'Video properties:\n\tduration: {tobii_segment_video.duration / 1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Load a tobii segment data
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'Loaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Access to timestamped gaze position data buffer
- tobii_ts_gaze_positions = tobii_segment_data['GazePosition']
-
- # Access to timestamped gaze position 3d data buffer
- tobii_ts_gaze_positions_3d = tobii_segment_data['GazePosition3D']
-
- # Access to timestamped head rotations data buffer
- tobii_ts_head_rotations = tobii_segment_data['Gyroscope']
-
- # Access to timestamped events data buffer
- tobii_ts_events = tobii_segment_data['Event']
-
- # Video and data replay loop
- try:
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration / 1e3, prefix = 'Video progression:', suffix = 'Complete', length = 100)
-
- # Iterate on video frames
- for video_ts, video_frame in tobii_segment_video.frames():
-
- video_ts_ms = video_ts / 1e3
-
- try:
-
- # Get nearest head rotation before video timestamp and remove all head rotations before
- _, nearest_head_rotation = tobii_ts_head_rotations.pop_last_before(video_ts)
-
- # Calculate head movement considering only head yaw and pitch
- head_movement = numpy.array(nearest_head_rotation.value)
- head_movement_px = head_movement.astype(int)
- head_movement_norm = numpy.linalg.norm(head_movement[0:2])
-
- # Draw movement vector
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_movement_px[1], int(video_frame.height/2) - head_movement_px[0]), (150, 150, 150), 3)
-
- # Wait for head rotation
- except KeyError:
- pass
-
- try:
-
- # Get nearest gaze position before video timestamp and remove all gaze positions before
- _, nearest_gaze_position = tobii_ts_gaze_positions.pop_last_before(video_ts)
-
- # Ignore frame when gaze position is not valid
- if nearest_gaze_position.validity == 0:
-
- gaze_position_pixel = GazeFeatures.GazePosition( (int(nearest_gaze_position.value[0] * video_frame.width), int(nearest_gaze_position.value[1] * video_frame.height)) )
-
- # Get nearest gaze position 3D before video timestamp and remove all gaze positions before
- _, nearest_gaze_position_3d = tobii_ts_gaze_positions_3d.pop_last_before(video_ts)
-
- # Ignore frame when gaze position 3D is not valid
- if nearest_gaze_position_3d.validity == 0:
-
- gaze_precision_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.PRECISION)) * nearest_gaze_position_3d.value[2]
- tobii_camera_hfov_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV / 2)) * nearest_gaze_position_3d.value[2]
-
- gaze_position_pixel.precision = round(video_frame.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm))
-
- # Draw gaze
- gaze_position_pixel.draw(video_frame.matrix)
-
- # Wait for gaze position
- except KeyError:
- pass
-
- try:
-
- # Get nearest event before video timestamp and remove all gaze positions before
- nearest_event_ts, nearest_event = tobii_ts_events.pop_last_before(video_ts)
-
- #print(nearest_event_ts / 1e3, nearest_event)
-
- # Write events
- cv.rectangle(video_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(video_frame.matrix, str(nearest_event), (20, 140), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Wait for events
- except KeyError:
- pass
-
- # Draw center
- cv.line(video_frame.matrix, (int(video_frame.width/2) - 50, int(video_frame.height/2)), (int(video_frame.width/2) + 50, int(video_frame.height/2)), (255, 150, 150), 1)
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2) - 50), (int(video_frame.width/2), int(video_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Write segment timing
- cv.rectangle(video_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(video_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- cv.imshow(f'Segment {tobii_segment.id} video', video_frame.matrix)
-
- # Update Progress Bar
- progress = video_ts_ms - int(args.time_range[0] * 1e3)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration / 1e3, prefix = 'Video progression:', suffix = 'Complete', length = 100)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_gaze_metrics_export.py b/src/argaze/utils/tobii_segment_gaze_metrics_export.py
deleted file mode 100644
index 1e530e0..0000000
--- a/src/argaze/utils/tobii_segment_gaze_metrics_export.py
+++ /dev/null
@@ -1,242 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import math
-
-from argaze import DataStructures, GazeFeatures
-from argaze.AreaOfInterest import AOIFeatures
-from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-import pandas
-
-def main():
- """
- Analyse fixations and saccades
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='path to a tobii segment folder', required=True)
- parser.add_argument('-a', '--aoi', metavar='AOI_NAME', type=str, default=None, help='aoi name where to project gaze', required=True)
- parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-p', '--period', metavar=('PERIOD_TIME'), type=float, default=10, help='period of time (in second)')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- args = parser.parse_args()
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}/{args.aoi}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- positions_json_filepath = f'{destination_path}/gaze_positions.json'
-
- fixations_json_filepath = f'{destination_path}/gaze_fixations.json'
- saccades_json_filepath = f'{destination_path}/gaze_saccades.json'
- gaze_status_json_filepath = f'{destination_path}/gaze_status.json'
-
- gaze_metrics_period_filepath = f'{destination_path}/gaze_metrics_{int(args.period)}s.csv'
- gaze_metrics_whole_filepath = f'{destination_path}/gaze_metrics.csv'
-
- # Load gaze positions
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions.from_json(positions_json_filepath)
-
- # Load gaze movements
- ts_fixations = GazeFeatures.TimeStampedGazeMovements.from_json(fixations_json_filepath)
- ts_saccades = GazeFeatures.TimeStampedGazeMovements.from_json(saccades_json_filepath)
- ts_status = GazeFeatures.TimeStampedGazeStatus.from_json(gaze_status_json_filepath)
-
- print(f'\nLoaded gaze movements count:')
- print(f'\tFixations: {len(ts_fixations)}')
- print(f'\tSaccades: {len(ts_saccades)}')
-
- # Load tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Get participant name
- participant_name = TobiiEntities.TobiiParticipant(f'{args.segment_path}/../../').name
-
- print(f'\nParticipant: {participant_name}')
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration * 1e-6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Prepare gaze metrics
- ts_metrics = DataStructures.TimeStampedBuffer()
-
- positions_exist = len(ts_gaze_positions) > 0
- fixations_exist = len(ts_fixations) > 0
- saccades_exist = len(ts_saccades) > 0
- status_exist = len(ts_status) > 0
-
- if positions_exist:
-
- # Create pandas dataframe
- positions_dataframe = ts_gaze_positions.as_dataframe()
-
- # Reset time range offset
- positions_dataframe.index = positions_dataframe.index - positions_dataframe.index[0]
-
- if fixations_exist:
-
- # Create pandas dataframe
- fixations_dataframe = ts_fixations.as_dataframe()
-
- # Reset time range offset
- fixations_dataframe.index = fixations_dataframe.index - fixations_dataframe.index[0]
-
- # Add 'end' column
- fixations_dataframe['end'] = fixations_dataframe.index + fixations_dataframe.duration
-
- if saccades_exist:
-
- # Create pandas dataframe
- saccades_dataframe = ts_saccades.as_dataframe()
-
- # Reset time range offset
- saccades_dataframe.index = saccades_dataframe.index - saccades_dataframe.index[0]
-
- # Add 'end' column
- saccades_dataframe['end'] = saccades_dataframe.index + saccades_dataframe.duration
-
- # Define a function to export metrics for a period of time
- def metrics_for_period(period_start_ts, period_end_ts):
-
- period_duration = period_end_ts - period_start_ts
- period_metrics = {}
-
- #print(f'\n*** Anaysing period n°{i} [{period_start_ts * 1e-6:.3f}s, {period_end_ts * 1e-6:.3f}s]')
-
- # Store period duration
- period_metrics['duration (ms)'] = period_duration * 1e-3
-
- # Default positions analysis
- period_metrics['positions_number'] = 0
- period_metrics['positions_valid_ratio (%)'] = None
-
- # Analyse fixations
- if positions_exist:
-
- # Select period
- positions_period_dataframe = positions_dataframe[(positions_dataframe.index >= period_start_ts) & (positions_dataframe.index < period_end_ts)]
-
- if not positions_period_dataframe.empty:
-
- #print('\n* Positions:\n', positions_period_dataframe)
-
- period_metrics['positions_number'] = positions_period_dataframe.shape[0]
- period_metrics['positions_valid_ratio (%)'] = positions_period_dataframe.precision.count() / positions_period_dataframe.shape[0] * 100
-
- # Default fixation analysis
- fixations_duration_sum = 0.0
- period_metrics['fixations_number'] = 0
- period_metrics['fixations_duration_mean (ms)'] = None
- period_metrics['fixations_duration_sum (ms)'] = None
- period_metrics['fixations_duration_ratio (%)'] = None
- period_metrics['fixations_deviation_mean (px)'] = None
-
- # Analyse fixations
- if fixations_exist:
-
- # Select period
- fixations_period_dataframe = fixations_dataframe[(fixations_dataframe.index >= period_start_ts) & (fixations_dataframe.end < period_end_ts)]
-
- if not fixations_period_dataframe.empty:
-
- #print('\n* Fixations:\n', fixations_period_dataframe)
-
- fixations_duration_sum = fixations_period_dataframe.duration.sum()
- period_metrics['fixations_number'] = fixations_period_dataframe.shape[0]
- period_metrics['fixations_duration_mean (ms)'] = fixations_period_dataframe.duration.mean() * 1e-3
- period_metrics['fixations_duration_sum (ms)'] = fixations_duration_sum * 1e-3
- period_metrics['fixations_duration_ratio (%)'] = fixations_duration_sum / period_duration * 100
- period_metrics['fixations_deviation_mean (px)'] = fixations_period_dataframe.deviation_max.mean()
-
- # Default saccades analysis
- saccades_duration_sum = 0.0
- period_metrics['saccades_number'] = 0
- period_metrics['saccades_duration_mean (ms)'] = None
- period_metrics['saccades_duration_sum (ms)'] = None
- period_metrics['saccades_duration_ratio (%)'] = None
- period_metrics['saccades_distance_mean (px)'] = None
-
- # Analyse saccades
- if saccades_exist:
-
- # Select period
- saccades_period_dataframe = saccades_dataframe[(saccades_dataframe.index >= period_start_ts) & (saccades_dataframe.end < period_end_ts)]
-
- if not saccades_period_dataframe.empty:
-
- #print('\n* Saccades:\n', saccades_period_dataframe)
-
- saccades_duration_sum = saccades_period_dataframe.duration.sum()
- period_metrics['saccades_number'] = saccades_period_dataframe.shape[0]
- period_metrics['saccades_duration_mean (ms)'] = saccades_period_dataframe.duration.mean() * 1e-3
- period_metrics['saccades_duration_sum (ms)'] = saccades_duration_sum * 1e-3
- period_metrics['saccades_duration_ratio (%)'] = saccades_duration_sum / period_duration * 100
- period_metrics['saccades_distance_mean (px)'] = saccades_period_dataframe.distance.mean()
-
- # Analyse exploit/explore
- if saccades_duration_sum != 0.0:
-
- period_metrics['exploit_explore_ratio'] = fixations_duration_sum / saccades_duration_sum
-
- else:
-
- period_metrics['exploit_explore_ratio'] = None
-
- # Append period metrics
- ts_metrics[int(period_start_ts * 1e-3)] = period_metrics
-
- # Metrics for each period
- for i in range(0, int(tobii_segment_video.duration/(args.period * 1e6))):
-
- period_start_ts = i*(args.period * 1e6)
- period_end_ts = (i+1)*(args.period * 1e6)
-
- metrics_for_period(period_start_ts, period_end_ts)
-
- metrics_dataframe = ts_metrics.as_dataframe() #pandas.DataFrame(metrics, index=[participant_name])
- metrics_dataframe.to_csv(gaze_metrics_period_filepath, index=True)
- print(f'\nGaze metrics per period of time saved into {gaze_metrics_period_filepath}')
-
- # Metrics for the whole session
- ts_metrics = DataStructures.TimeStampedBuffer()
- metrics_for_period(0, tobii_segment_video.duration)
-
- metrics_dataframe = ts_metrics.as_dataframe() #pandas.DataFrame(metrics, index=[participant_name])
- metrics_dataframe.to_csv(gaze_metrics_whole_filepath, index=True)
- print(f'\nGaze metrics for whole segment saved into {gaze_metrics_whole_filepath}\n')
-
-if __name__ == '__main__':
-
- main()
diff --git a/src/argaze/utils/tobii_segment_gaze_movements_export.py b/src/argaze/utils/tobii_segment_gaze_movements_export.py
deleted file mode 100644
index 85fe74b..0000000
--- a/src/argaze/utils/tobii_segment_gaze_movements_export.py
+++ /dev/null
@@ -1,570 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os
-import math
-
-from argaze import DataStructures, GazeFeatures
-from argaze.AreaOfInterest import AOIFeatures
-from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier
-from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-import pandas
-
-def main():
- """
- Project gaze positions into an AOI and identify particular gaze movements like fixations and saccades
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='path to a tobii segment folder', required=True)
- parser.add_argument('-a', '--aoi', metavar='AOI_NAME', type=str, default=None, help='aoi name where to project gaze', required=True)
- parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
- parser.add_argument('-dev', '--deviation_max_threshold', metavar='DISPERSION_THRESHOLD', type=int, default=None, help='maximal distance for fixation identification in pixel')
- parser.add_argument('-dmin', '--duration_min_threshold', metavar='DURATION_MIN_THRESHOLD', type=int, default=200, help='minimal duration for fixation identification in millisecond')
- parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
- parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs')
- args = parser.parse_args()
-
- # Manage destination path
- destination_path = '.'
- if args.output != None:
-
- if not os.path.exists(os.path.dirname(args.output)):
-
- os.makedirs(os.path.dirname(args.output))
- print(f'{os.path.dirname(args.output)} folder created')
-
- destination_path = args.output
-
- else:
-
- destination_path = args.segment_path
-
- # Export into a dedicated time range folder
- if args.time_range[1] != None:
- timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]'
- else:
- timerange_path = f'[all]'
-
- destination_path = f'{destination_path}/{timerange_path}/{args.aoi}'
-
- if not os.path.exists(destination_path):
-
- os.makedirs(destination_path)
- print(f'{destination_path} folder created')
-
- aoi_filepath = f'{destination_path}/../aoi.json'
-
- positions_json_filepath = f'{destination_path}/gaze_positions.json'
-
- fixations_json_filepath = f'{destination_path}/gaze_fixations.json'
- saccades_json_filepath = f'{destination_path}/gaze_saccades.json'
- movements_json_filepath = f'{destination_path}/gaze_movements.json'
- gaze_status_json_filepath = f'{destination_path}/gaze_status.json'
-
- gaze_status_video_filepath = f'{destination_path}/gaze_status.mp4'
- gaze_status_image_filepath = f'{destination_path}/gaze_status.png'
-
- # Load aoi scene projection
- ts_aois_projections = DataStructures.TimeStampedBuffer.from_json(aoi_filepath)
-
- print(f'\nAOI frames: ', len(ts_aois_projections))
- aoi_names = ts_aois_projections.as_dataframe().drop(['offset','error'], axis=1).columns
- for aoi_name in aoi_names:
- print(f'\t{aoi_name}')
-
- # Load tobii segment
- tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
-
- # Get participant name
- participant_name = TobiiEntities.TobiiParticipant(f'{args.segment_path}/../../').name
-
- print(f'\nParticipant: {participant_name}')
-
- # Load a tobii segment video
- tobii_segment_video = tobii_segment.load_video()
- print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px')
-
- # Check that gaze positions have already been exported to not process them again
- if os.path.exists(positions_json_filepath):
-
- # Load gaze positions
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions.from_json(positions_json_filepath)
-
- print(f'\nLoaded gaze positions count:')
- print(f'\tPositions: {len(ts_gaze_positions)}')
-
- invalid_gaze_position_count = 0
- inner_precisions_px = []
-
- for ts, gaze_position in ts_gaze_positions.items():
-
- if not gaze_position.valid:
-
- invalid_gaze_position_count += 1
-
- else:
-
- inner_precisions_px.append(gaze_position.precision)
-
- print(f'\tInvalid positions: {invalid_gaze_position_count}/{len(ts_gaze_positions)} ({100*invalid_gaze_position_count/len(ts_gaze_positions):.2f} %)')
-
- inner_precision_px_mean = round(numpy.mean(inner_precisions_px))
- print(f'\tMean of projected precisions: {inner_precision_px_mean} px')
-
- # Project gaze positions into the selected AOI
- else:
-
- # Load a tobii segment data
- tobii_segment_data = tobii_segment.load_data()
-
- print(f'\nLoaded data count:')
- for name in tobii_segment_data.keys():
- print(f'\t{name}: {len(tobii_segment_data[name])} data')
-
- # Access to timestamped gaze position data buffer
- tobii_ts_gaze_positions = tobii_segment_data['GazePosition']
-
- # Access to timestamped gaze 3D positions data buffer
- tobii_ts_gaze_positions_3d = tobii_segment_data['GazePosition3D']
-
- # Format tobii gaze position and precision in pixel and project it in aoi scene
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # Gaze projection metrics
- ts_projection_metrics = DataStructures.TimeStampedBuffer()
- invalid_gaze_position_count = 0
- inner_precisions_px = []
-
- # Starting with no AOI projection
- ts_current_aoi = 0
- current_aoi = AOIFeatures.AreaOfInterest()
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazePositions projection:', suffix = 'Complete', length = 100)
-
- for ts, tobii_gaze_position in tobii_ts_gaze_positions.items():
-
- # Update Progress Bar
- progress = ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'GazePositions projection:', suffix = 'Complete', length = 100)
-
- # Edit default aoi error
- current_aoi_error = 'No available AOI projection'
-
- try:
-
- # Get the last aoi projection until the current gaze position timestamp
- ts_current_aois, current_aois = ts_aois_projections.pop_last_until(ts)
-
- assert(ts_current_aois <= ts)
-
- # Catch aoi error to not update current aoi
- if 'error' in current_aois.keys():
-
- # Remove extra error info after ':'
- current_aoi_error = current_aois.pop('error').split(':')[0]
-
- # Or update current aoi
- elif args.aoi in current_aois.keys():
-
- ts_current_aoi = ts_current_aois
- current_aoi = AOIFeatures.AreaOfInterest(current_aois.pop(args.aoi))
-
- current_aoi_error = ''
-
- # No aoi projection at the beginning
- except KeyError as e:
- pass
-
- # Wait for available aoi
- if current_aoi.empty:
-
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(current_aoi_error)
- invalid_gaze_position_count += 1
- continue
-
- # QUESTION: What todo if the current aoi is too old ?
- # if the aoi didn't move it is not a problem...
- # For the moment, we avoid 1s old aoi and we provide a metric to assess the problem
- ts_difference = ts - ts_current_aoi
-
- # If aoi is not updated after the
- if ts_difference >= args.duration_min_threshold*1e3:
-
- current_aoi = AOIFeatures.AreaOfInterest()
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition('AOI projection is too old (> 1s)')
- invalid_gaze_position_count += 1
- continue
-
- ts_projection_metrics[ts] = {'frame': ts_current_aois, 'age': ts_difference}
-
- # Test gaze position validity
- if tobii_gaze_position.validity == 0:
-
- gaze_position_px = (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height))
-
- # Get gaze position 3D at same gaze position timestamp
- tobii_gaze_position_3d = tobii_ts_gaze_positions_3d.pop(ts)
-
- # Test gaze position 3d validity
- if tobii_gaze_position_3d.validity == 0:
-
- gaze_precision_mm = numpy.sin(numpy.deg2rad(TobiiSpecifications.PRECISION)) * tobii_gaze_position_3d.value[2]
- tobii_camera_hfov_mm = numpy.sin(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV)) * tobii_gaze_position_3d.value[2]
-
- gaze_precision_px = round(tobii_segment_video.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm))
-
- # Edit gaze position
- gaze_position = GazeFeatures.GazePosition(gaze_position_px)
-
- # Project gaze position into selected aois
- if current_aoi.contains_point(gaze_position.value):
-
- inner_x, inner_y = current_aoi.inner_axis(gaze_position.value)
- inner_precision_px = gaze_precision_px * tobii_segment_video.width * tobii_segment_video.height / current_aoi.area
-
- # Store inner precision for metrics
- inner_precisions_px.append(inner_precision_px)
-
- # Store inner gaze position for further movement processing
- # TEMP: 1920x1080 are Screen_Plan dimensions
- ts_gaze_positions[ts] = GazeFeatures.GazePosition((round(inner_x*1920), round((1.0 - inner_y)*1080)), precision=inner_precision_px)
-
- else:
-
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'GazePosition not inside {args.aoi}')
- invalid_gaze_position_count += 1
-
- else:
-
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'Invalid Tobii GazePosition3D')
- invalid_gaze_position_count += 1
-
- else:
-
- ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition(f'Invalid Tobii GazePosition')
- invalid_gaze_position_count += 1
-
- print(f'\nGazePositions projection metrics:')
-
- print(f'\tInvalid positions: {invalid_gaze_position_count}/{len(tobii_ts_gaze_positions)} ({100*invalid_gaze_position_count/len(tobii_ts_gaze_positions):.2f} %)')
-
- if len(ts_projection_metrics):
-
- projection_metrics_dataframe = ts_projection_metrics.as_dataframe()
- print(f'\tAOI age mean: {projection_metrics_dataframe.age.mean() * 1e-3:.3f} ms')
- print(f'\tAOI age max: {projection_metrics_dataframe.age.max() * 1e-3:.3f} ms')
-
- inner_precision_px_mean = round(numpy.mean(inner_precisions_px))
- print(f'\tMean of projected precisions: {inner_precision_px_mean} px')
-
- else:
-
- print(print(f'\t no AOI projected'))
-
- ts_gaze_positions.to_json(positions_json_filepath)
- print(f'\nProjected gaze positions saved into {positions_json_filepath}')
-
- print(f'\nGazeMovement identifier setup:')
-
- if args.deviation_max_threshold == None:
-
- selected_deviation_max_threshold = inner_precision_px_mean
- print(f'\tDispersion threshold: {selected_deviation_max_threshold} px (equal to mean of projected precisions)')
-
- else:
-
- selected_deviation_max_threshold = args.deviation_max_threshold
- print(f'\tDispersion threshold: {selected_deviation_max_threshold} px')
-
- print(f'\tDuration threshold: {args.duration_min_threshold} ms')
-
- movement_identifier = DispersionBasedGazeMovementIdentifier.GazeMovementIdentifier(selected_deviation_max_threshold, args.duration_min_threshold*1e3)
-
- # Start movement identification
- ts_fixations = GazeFeatures.TimeStampedGazeMovements()
- ts_saccades = GazeFeatures.TimeStampedGazeMovements()
- ts_status = GazeFeatures.TimeStampedGazeStatus()
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazeMovements identification:', suffix = 'Complete', length = 100)
-
- for ts, gaze_position in ts_gaze_positions.items():
-
- gaze_movement = movement_identifier.identify(ts, gaze_position)
-
- if isinstance(gaze_movement, DispersionBasedGazeMovementIdentifier.Fixation):
-
- start_ts, start_position = gaze_movement.positions.first
-
- ts_fixations[start_ts] = gaze_movement
-
- for ts, position in gaze_movement.positions.items():
-
- ts_status[ts] = GazeFeatures.GazeStatus.from_position(position, 'Fixation', len(ts_fixations))
-
- elif isinstance(gaze_movement, DispersionBasedGazeMovementIdentifier.Saccade):
-
- start_ts, start_position = gaze_movement.positions.first
-
- ts_saccades[start_ts] = gaze_movement
-
- for ts, position in gaze_movement.positions.items():
-
- ts_status[ts] = GazeFeatures.GazeStatus.from_position(position, 'Saccade', len(ts_saccades))
-
- # Update Progress Bar
- progress = ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze movements identification:', suffix = 'Complete', length = 100)
-
- print(f'\nGazeMovements identification metrics:')
- print(f'\t{len(ts_fixations)} fixations found')
- print(f'\t{len(ts_saccades)} saccades found')
-
- ts_fixations.to_json(fixations_json_filepath)
- print(f'\nGaze fixations saved into {fixations_json_filepath}')
-
- ts_saccades.to_json(saccades_json_filepath)
- print(f'Gaze saccades saved into {saccades_json_filepath}')
-
- ts_status.to_json(gaze_status_json_filepath)
- print(f'Gaze status saved into {gaze_status_json_filepath}')
-
- # DEBUG
- ts_status.as_dataframe().to_csv(f'{destination_path}/gaze_status.csv')
-
- # Edit data visualisation
- if args.debug:
-
- # Prepare video exportation at the same format than segment video
- output_video = TobiiVideo.TobiiVideoOutput(gaze_status_video_filepath, tobii_segment_video.stream)
-
- # Reload aoi scene projection
- ts_aois_projections = DataStructures.TimeStampedBuffer.from_json(aoi_filepath)
-
- # Prepare gaze satus image
- gaze_status_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8)
-
- # Video loop
- try:
-
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGaze status video processing:', suffix = 'Complete', length = 100)
-
- fixations_exist = len(ts_fixations) > 0
- saccades_exist = len(ts_saccades) > 0
- status_exist = len(ts_status) > 0
-
- if fixations_exist:
- current_fixation_ts, current_fixation = ts_fixations.pop_first()
- current_fixation_time_counter = 0
-
- if saccades_exist:
- current_saccade_ts, current_saccade = ts_saccades.pop_first()
-
- # Iterate on video frames
- for video_ts, video_frame in tobii_segment_video.frames():
-
- # This video frame is the reference until the next frame
- # Here next frame is at + 40ms (25 fps)
- # TODO: Get video fps to adapt
- next_video_ts = video_ts + 40000
-
- visu_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8)
-
- try:
-
- # Get current aoi projection at video frame time
- ts_current_aois, current_aois = ts_aois_projections.pop_first()
-
- assert(ts_current_aois == video_ts)
-
- # Catch aoi error to not update current aoi
- if 'error' in current_aois.keys():
-
- # Display error (remove extra info after ':')
- current_aoi_error = current_aois.pop('error').split(':')[0]
-
- # Select color error
- if current_aoi_error == 'VideoTimeStamp missing':
- color_error = (0, 0, 255)
- else:
- color_error = (0, 255, 255)
-
- cv.rectangle(visu_matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_matrix, current_aoi_error, (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA)
-
- # Or update current aoi
- elif args.aoi in current_aois.keys():
-
- ts_current_aoi = ts_current_aois
- current_aoi = AOIFeatures.AreaOfInterest(current_aois.pop(args.aoi))
-
- # Apply perspective transform algorithm
- destination = numpy.float32([[0, 1080],[1920, 1080],[1920, 0],[0, 0]])
- aoi_matrix = cv.getPerspectiveTransform(current_aoi.astype(numpy.float32), destination)
- visu_matrix = cv.warpPerspective(video_frame.matrix, aoi_matrix, (1920, 1080))
-
- # Wait for aois projection
- except KeyError:
- pass
-
- if fixations_exist:
-
- # Check next fixation
- if video_ts >= current_fixation_ts + current_fixation.duration and len(ts_fixations) > 0:
-
- current_fixation_ts, current_fixation = ts_fixations.pop_first()
- current_fixation_time_counter = 0
-
- # While current time belongs to the current fixation
- if video_ts >= current_fixation_ts and video_ts < current_fixation_ts + current_fixation.duration:
-
- current_fixation_time_counter += 1
-
- # Draw current fixation
- cv.circle(visu_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 255, 0), current_fixation_time_counter)
- cv.circle(gaze_status_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 155, 0))
-
- if saccades_exist:
-
- # Check next saccade
- if video_ts >= current_saccade_ts + current_saccade.duration and len(ts_saccades) > 0:
-
- current_saccade_ts, current_saccade = ts_saccades.pop_first()
-
- # While current time belongs to the current saccade
- if video_ts >= current_saccade_ts and video_ts < current_saccade_ts + current_saccade.duration:
- pass
-
- # Draw gaze status until next frame
- try:
-
- # Get next gaze status
- ts_start, start_gaze_status = ts_status.first
- ts_next, next_gaze_status = ts_status.first
-
- # Check next gaze status is not after next frame time
- while ts_next < next_video_ts:
-
- ts_start, start_gaze_status = ts_status.pop_first()
- ts_next, next_gaze_status = ts_status.first
-
- # Draw movement type
- if start_gaze_status.valid and next_gaze_status.valid \
- and start_gaze_status.movement_index == next_gaze_status.movement_index \
- and start_gaze_status.movement_type == next_gaze_status.movement_type:
-
- if next_gaze_status.movement_type == 'Fixation':
- movement_color = (0, 255, 0)
- elif next_gaze_status.movement_type == 'Saccade':
- movement_color = (0, 0, 255)
- else:
- movement_color = (255, 0, 0)
-
- cv.line(visu_matrix, start_gaze_status, next_gaze_status, movement_color, 3)
- cv.line(gaze_status_matrix, start_gaze_status, next_gaze_status, movement_color, 3)
-
- # Empty gaze position
- except IndexError:
- pass
-
- # Draw gaze positions until next frame
- try:
-
- # Get next gaze position
- ts_start, start_gaze_position = ts_gaze_positions.first
- ts_next, next_gaze_position = ts_gaze_positions.first
-
- # Gaze position count
- gaze_position_count = 0
-
- # Check next gaze position is not after next frame time
- while ts_next < next_video_ts:
-
- ts_start, start_gaze_position = ts_gaze_positions.pop_first()
- ts_next, next_gaze_position = ts_gaze_positions.first
-
- if not start_gaze_position.valid:
-
- # Select color error
- if start_gaze_position.message == 'VideoTimeStamp missing':
- color_error = (0, 0, 255)
- else:
- color_error = (0, 255, 255)
-
- # Write unvalid error message
- cv.putText(visu_matrix, f'{ts_start*1e-3:.3f} ms: {start_gaze_position.message}', (20, 1060 - (gaze_position_count)*50), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA)
-
- # Draw start gaze
- start_gaze_position.draw(visu_matrix, draw_precision=False)
- start_gaze_position.draw(gaze_status_matrix, draw_precision=False)
-
- if start_gaze_position.valid and next_gaze_position.valid:
-
- # Draw movement from start to next
- cv.line(visu_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1)
- cv.line(gaze_status_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1)
-
- gaze_position_count += 1
-
- if start_gaze_position.valid:
-
- # Write last start gaze position
- cv.putText(visu_matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Write last start gaze position timing
- cv.rectangle(visu_matrix, (0, 50), (550, 100), (31, 31, 31), -1)
- cv.putText(visu_matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Empty gaze position
- except IndexError:
- pass
-
- # Write segment timing
- cv.rectangle(visu_matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(visu_matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Write movement identification parameters
- cv.rectangle(visu_matrix, (0, 150), (550, 310), (63, 63, 63), -1)
- cv.putText(visu_matrix, f'Deviation max: {selected_deviation_max_threshold} px', (20, 210), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_matrix, f'Duration min: {args.duration_min_threshold} ms', (20, 270), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Draw dispersion threshold circle
- cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), 2, (0, 255, 255), -1)
- cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), selected_deviation_max_threshold, (255, 150, 150), 1)
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- # Display video
- cv.imshow(f'Segment {tobii_segment.id} movements', visu_matrix)
-
- # Write video
- output_video.write(visu_matrix)
-
- # Update Progress Bar
- progress = video_ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze status video processing:', suffix = 'Complete', length = 100)
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Saving gaze status image
- cv.imwrite(gaze_status_image_filepath, gaze_status_matrix)
-
- # End output video file
- output_video.close()
- print(f'\nGaze status video saved into {gaze_status_video_filepath}\n')
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_segment_record.py b/src/argaze/utils/tobii_segment_record.py
deleted file mode 100644
index 25066c4..0000000
--- a/src/argaze/utils/tobii_segment_record.py
+++ /dev/null
@@ -1,96 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import threading
-import time
-import random
-
-from argaze.TobiiGlassesPro2 import TobiiController
-from argaze.utils import MiscFeatures
-
-def main():
- """
- Record a Tobii Glasses Pro 2 session on Tobii interface's SD Card
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default='192.168.1.10', help='tobii glasses ip')
- parser.add_argument('-p', '--project_name', metavar='PROJECT_NAME', type=str, default=TobiiController.DEFAULT_PROJECT_NAME, help='project name')
- parser.add_argument('-u', '--participant_name', metavar='PARTICIPANT_NAME', type=str, default=TobiiController.DEFAULT_PARTICIPANT_NAME, help='participant name')
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(ip_address = args.tobii_ip, project_name = args.project_name, participant_name = args.participant_name)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Print current confirugration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Calibrate tobii glasses
- tobii_controller.calibrate()
-
- # Create recording
- recording_id = tobii_controller.create_recording(args.participant_name)
-
- # Start recording
- tobii_controller.start_recording(recording_id)
- print(f'Recording {recording_id} started')
-
- # Define loop
- last_battery_level = 0
- time_count = 0
-
- exit = MiscFeatures.ExitSignalHandler()
- print('Waiting for Ctrl+C to quit...\n')
-
- while not exit.status():
-
- # Print storage info each minutes
- if time_count % 60 == 0:
-
- print(tobii_controller.get_storage_info())
-
- # print battery level each time it changes
- # send it as experimental variable
- battery_level = tobii_controller.get_battery_level()
- if battery_level != last_battery_level:
-
- print(tobii_controller.get_battery_info())
-
- tobii_controller.send_variable('battery', battery_level)
-
- last_battery_level = battery_level
-
- # send random event each 3 - 10 seconds
- if time_count % random.randint(3, 10) == 0:
-
- print('Send random event')
-
- tobii_controller.send_event('random')
-
- # Sleep 1 second
- time.sleep(1)
- time_count += 1
-
- # Stop recording
- tobii_controller.stop_recording(recording_id)
- print(f'Recording {recording_id} stopped')
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_stream_arscene_display.py b/src/argaze/utils/tobii_stream_arscene_display.py
deleted file mode 100644
index e7a3bfb..0000000
--- a/src/argaze/utils/tobii_stream_arscene_display.py
+++ /dev/null
@@ -1,154 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-import os, json
-
-from argaze import *
-from argaze.TobiiGlassesPro2 import *
-from argaze.ArUcoMarkers import *
-from argaze.AreaOfInterest import *
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-
-def main():
- """
- Detect ArUcoScene into Tobii Glasses Pro 2 camera video stream.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
- parser.add_argument('-p', '--env_path', metavar='ENVIRONMENT_PATH', type=str, default=None, help='json argaze environment filepath')
- parser.add_argument('-b', '--borders', metavar='BORDERS', type=float, default=16.666, help='define left and right borders mask (%) to not detect aruco out of these borders')
- parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs')
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Print current confirugration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Enable tobii data stream
- tobii_data_stream = tobii_controller.enable_data_stream()
-
- # Enable tobii video stream
- tobii_video_stream = tobii_controller.enable_video_stream()
-
- # Load ArEnvironment
- ar_env = ArFeatures.ArEnvironment.from_json(args.env_path)
-
- if args.debug:
- print(ar_env)
-
- # Work with first scene only
- _, ar_scene = next(iter(ar_env.items()))
-
- # Start streaming
- tobii_controller.start_streaming()
-
- # Live video stream capture loop
- try:
-
- # Assess loop performance
- loop_chrono = MiscFeatures.TimeProbe()
- fps = 0
-
- while tobii_video_stream.is_alive():
-
- # Read video stream
- video_ts, video_frame = tobii_video_stream.read()
-
- # Copy video frame to edit visualisation on it without disrupting aruco detection
- visu_frame = video_frame.copy()
-
- # Hide frame left and right borders before detection to ignore markers outside focus area
- cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width*args.borders/100), int(video_frame.height)), (0, 0, 0), -1)
- cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - args.borders/100)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1)
-
- # Process video and data frame
- try:
-
- # Detect aruco markers into frame
- ar_env.aruco_detector.detect_markers(video_frame.matrix)
-
- # Estimate markers poses
- ar_env.aruco_detector.estimate_markers_pose()
-
- # Estimate scene pose from ArUco markers into frame.
- tvec, rmat, _ = ar_scene.estimate_pose(ar_env.aruco_detector.detected_markers)
-
- # Project AOI scene into frame according estimated pose
- aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV)
-
- # Draw scene axis
- ar_scene.draw_axis(visu_frame.matrix)
-
- # Draw scene places
- ar_scene.draw_places(visu_frame.matrix)
-
- # Draw AOI
- aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255))
-
- # Draw detected markers
- ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- # Catch exceptions raised by estimate_pose and project methods
- except (ArFeatures.PoseEstimationFailed, ArFeatures.SceneProjectionFailed) as e:
-
- # Draw detected markers
- ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix)
-
- cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1)
- cv.putText(visu_frame.matrix, str(e), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Assess loop performance
- lap_time, lap_counter, elapsed_time = loop_chrono.lap()
-
- # Update fps each 10 loops
- if lap_counter >= 10:
-
- fps = 1e3 * lap_counter / elapsed_time
- loop_chrono.restart()
-
- # Write stream timing
- cv.rectangle(visu_frame.matrix, (0, 0), (700, 50), (63, 63, 63), -1)
- cv.putText(visu_frame.matrix, f'Video stream time: {int(video_ts*1e-3)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'Fps: {int(fps)}', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- cv.imshow(f'Stream ArUco AOI', visu_frame.matrix)
-
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
- # Stop streaming
- tobii_controller.stop_streaming()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file
diff --git a/src/argaze/utils/tobii_stream_display.py b/src/argaze/utils/tobii_stream_display.py
deleted file mode 100644
index b979e03..0000000
--- a/src/argaze/utils/tobii_stream_display.py
+++ /dev/null
@@ -1,218 +0,0 @@
-#!/usr/bin/env python
-
-import argparse
-
-from argaze import DataStructures, GazeFeatures
-from argaze.TobiiGlassesPro2 import *
-from argaze.utils import MiscFeatures
-
-import cv2 as cv
-import numpy
-
-def main():
- """
- Capture video camera and gaze data streams and synchronise them.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
- parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath')
-
- args = parser.parse_args()
-
- # Create tobii controller (with auto discovery network process if no ip argument is provided)
- print("Looking for a Tobii Glasses Pro 2 device ...")
-
- try:
-
- tobii_controller = TobiiController.TobiiController(args.tobii_ip)
- print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
-
- except ConnectionError as e:
-
- print(e)
- exit()
-
- # Setup camera at 25 fps to work on Full HD video stream
- tobii_controller.set_scene_camera_freq_25()
-
- # Print current confirugration
- print(f'Tobii Glasses Pro 2 configuration:')
- for key, value in tobii_controller.get_configuration().items():
- print(f'\t{key}: {value}')
-
- # Enable tobii data stream
- tobii_data_stream = tobii_controller.enable_data_stream()
-
- # Enable tobii video stream
- tobii_video_stream = tobii_controller.enable_video_stream()
-
- # Create tobii imu handler
- tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()
-
- # Load optional imu calibration file
- if args.imu_calibration != None:
-
- tobii_imu.load_calibration_file(args.imu_calibration)
-
- # Init head rotation speed
- head_rotation_speed = numpy.zeros(3).astype(int)
-
- # Init gaze position and precision
- gaze_position_px = (0, 0)
- gaze_precision_px = 0
-
- # Init data timestamped in millisecond
- data_ts_ms = 0
-
- # Assess temporal performance
- loop_chrono = MiscFeatures.TimeProbe()
- gyroscope_chrono = MiscFeatures.TimeProbe()
- gaze_chrono = MiscFeatures.TimeProbe()
-
- loop_ps = 0
- gyroscope_ps = 0
- gaze_ps = 0
-
- def data_stream_callback(data_ts, data_object, data_object_type):
-
- nonlocal head_rotation_speed
- nonlocal gaze_position_px
- nonlocal gaze_precision_px
- nonlocal data_ts_ms
- nonlocal gyroscope_chrono
- nonlocal gaze_chrono
-
- data_ts_ms = data_ts / 1e3
-
- match data_object_type:
-
- case 'Gyroscope':
-
- # Assess gyroscope stream performance
- gyroscope_chrono.lap()
-
- # Apply imu calibration
- head_rotation_speed = tobii_imu.apply_gyroscope_offset(data_object).value.astype(int) * 5
-
- case 'GazePosition':
-
- # Assess gaze position stream performance
- gaze_chrono.lap()
-
- # Ignore frame when gaze position is not valid
- if data_object.validity == 0:
-
- gaze_position_px = (int(data_object.value[0] * video_frame.width), int(data_object.value[1] * video_frame.height))
-
- case 'GazePosition3D':
-
- # Ignore frame when gaze position 3D is not valid
- if data_object.validity == 0:
-
- gaze_precision_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.PRECISION)) * data_object.value[2]
- tobii_camera_hfov_mm = numpy.tan(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV / 2)) * data_object.value[2]
-
- gaze_precision_px = round(video_frame.width * float(gaze_precision_mm) / float(tobii_camera_hfov_mm))
-
- # Subscribe to tobii data stream
- tobii_data_stream.subscribe(data_stream_callback)
-
- # Start streaming
- tobii_controller.start_streaming()
-
- # Live video stream capture loop
- try:
-
- while tobii_video_stream.is_alive():
-
- # Read video stream
- video_ts, video_frame = tobii_video_stream.read()
- video_ts_ms = video_ts / 1e3
-
- # Assess loop performance
- lap_time, lap_counter, elapsed_time = loop_chrono.lap()
-
- # Update fps each 10 loops
- if lap_counter >= 10:
-
- loop_ps = 1e3 * lap_counter / elapsed_time
- loop_chrono.restart()
-
- # Assess gyroscope streaming performance
- elapsed_time, lap_counter = gyroscope_chrono.end()
- gyroscope_ps = 1e3 * lap_counter / elapsed_time
- gyroscope_chrono.restart()
-
- # Assess gaze streaming performance
- elapsed_time, lap_counter = gaze_chrono.end()
- gaze_ps = 1e3 * lap_counter / elapsed_time
- gaze_chrono.restart()
-
- # Draw head rotation speed considering only yaw and pitch values
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2)), (int(video_frame.width/2) + head_rotation_speed[1], int(video_frame.height/2) - head_rotation_speed[0]), (150, 150, 150), 3)
-
- # Draw gaze
- gaze_position = GazeFeatures.GazePosition(gaze_position_px, precision=gaze_precision_px)
- gaze_position.draw(video_frame.matrix)
-
- # Draw center
- cv.line(video_frame.matrix, (int(video_frame.width/2) - 50, int(video_frame.height/2)), (int(video_frame.width/2) + 50, int(video_frame.height/2)), (255, 150, 150), 1)
- cv.line(video_frame.matrix, (int(video_frame.width/2), int(video_frame.height/2) - 50), (int(video_frame.width/2), int(video_frame.height/2) + 50), (255, 150, 150), 1)
-
- # Write stream timing
- cv.rectangle(video_frame.matrix, (0, 0), (1100, 50), (63, 63, 63), -1)
- cv.putText(video_frame.matrix, f'Data stream time: {int(data_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(video_frame.matrix, f'Video delay: {int(data_ts_ms - video_ts_ms)} ms', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(video_frame.matrix, f'Fps: {int(loop_ps)}', (950, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- cv.rectangle(video_frame.matrix, (0, 50), (580, 100), (127, 127, 127), -1)
- cv.putText(video_frame.matrix, f'Gyroscope fps: {int(gyroscope_ps)}', (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- cv.putText(video_frame.matrix, f'Gaze fps: {int(gaze_ps)}', (350, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- cv.imshow(f'Video and data stream', video_frame.matrix)
-
- key_pressed = cv.waitKey(1)
-
- #if key_pressed != -1:
- # print(key_pressed)
-
- # Set Auto scene camera preset with 'a' key
- if key_pressed == 97:
- tobii_controller.set_scene_camera_auto_preset()
- print('Tobii Glasses Pro 2 scene camera in Auto mode')
-
- # Set GazeExposure scene camera preset with 'z' key
- if key_pressed == 122:
- tobii_controller.set_scene_camera_gaze_preset()
- print('Tobii Glasses Pro 2 scene camera in GazeExposure mode')
-
- # Set Indoor eye camera preset with 'i' key
- if key_pressed == 105:
- tobii_controller.set_eye_camera_indoor_preset()
- print('Tobii Glasses Pro 2 eye camera in Indoor mode')
-
- # Set Outdoor eye camera preset with 'o' key
- if key_pressed == 111:
- tobii_controller.set_eye_camera_outdoor_preset()
- print('Tobii Glasses Pro 2 eye camera in Outdoor mode')
-
-
- # Close window using 'Esc' key
- if key_pressed == 27:
- break
-
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
-
- # Stop frame display
- cv.destroyAllWindows()
-
- # Stop streaming
- tobii_controller.stop_streaming()
-
-if __name__ == '__main__':
-
- main() \ No newline at end of file