Diffstat (limited to 'src/argaze/utils')
-rw-r--r--  src/argaze/utils/UtilsFeatures.py                |  35
-rw-r--r--  src/argaze/utils/__init__.py                     |   4
-rw-r--r--  src/argaze/utils/aruco_markers_group_export.py   | 179
-rw-r--r--  src/argaze/utils/contexts/OpenCV.py              |   8
-rw-r--r--  src/argaze/utils/contexts/TobiiProGlasses2.py    | 124
5 files changed, 130 insertions, 220 deletions
diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py
index f38d041..3f2ceda 100644
--- a/src/argaze/utils/UtilsFeatures.py
+++ b/src/argaze/utils/UtilsFeatures.py
@@ -77,36 +77,6 @@ def import_from_test_package(module: str) -> types.ModuleType:
return TestModule
-class ExitSignalHandler():
- """
- Handle exit event
- """
-
- def __init__(self):
-
- import signal
- import threading
-
- global __exit_event
- global __on_exit_signal
-
- __exit_event = threading.Event()
-
- def __on_exit_signal(signo, _frame):
- __exit_event.set()
-
- for sig in ('TERM', 'HUP', 'INT'):
- signal.signal(getattr(signal, 'SIG'+sig), __on_exit_signal)
-
- def status(self) -> bool:
- """
- Get exit status.
-
- Returns:
- exit status
- """
- return __exit_event.is_set()
-
class TimeProbe():
"""
Assess temporal performance.
@@ -121,8 +91,11 @@ class TimeProbe():
Start chronometer.
"""
+ # noinspection PyAttributeOutsideInit
self.__last_time = time.perf_counter()
+ # noinspection PyAttributeOutsideInit
self.__lap_counter = 0
+ # noinspection PyAttributeOutsideInit
self.__elapsed_time = 0
def lap(self) -> tuple[float, int, float]:
@@ -137,6 +110,7 @@ class TimeProbe():
lap_time = time.perf_counter() - self.__last_time
+ # noinspection PyAttributeOutsideInit
self.__last_time = time.perf_counter()
self.__lap_counter += 1
self.__elapsed_time += lap_time
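Note: TimeProbe's lap() above is built on time.perf_counter(), a monotonic, high-resolution clock. A minimal standalone sketch of the same lap-timing pattern, with a hypothetical workload standing in for the instrumented pipeline step:

    import time

    last_time = time.perf_counter()
    lap_counter = 0
    elapsed_time = 0.0

    for _ in range(3):
        sum(range(1_000_000))               # hypothetical workload being timed
        now = time.perf_counter()
        lap_time, last_time = now - last_time, now
        lap_counter += 1
        elapsed_time += lap_time
        print(f'lap {lap_counter}: {lap_time * 1e3:.1f} ms, total {elapsed_time * 1e3:.1f} ms')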
@@ -174,6 +148,7 @@ def PrintCallStack(method):
"""Wrap method to print call stack before its call.
Parameters:
+ self:
args: method arguments.
kwargs: extra arguments.
"""
diff --git a/src/argaze/utils/__init__.py b/src/argaze/utils/__init__.py
index a2322bb..2cee626 100644
--- a/src/argaze/utils/__init__.py
+++ b/src/argaze/utils/__init__.py
@@ -1,4 +1,4 @@
"""
-Miscelleaneous utilities.
+Miscellaneous utilities.
"""
-__all__ = ['UtilsFeatures', 'Providers']
\ No newline at end of file
+__all__ = ['UtilsFeatures', 'contexts']
\ No newline at end of file
diff --git a/src/argaze/utils/aruco_markers_group_export.py b/src/argaze/utils/aruco_markers_group_export.py
index dc1f673..46507b8 100644
--- a/src/argaze/utils/aruco_markers_group_export.py
+++ b/src/argaze/utils/aruco_markers_group_export.py
@@ -19,15 +19,14 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
import argparse
-import time
-import itertools
+import contextlib
+import cv2
+
+from argaze import DataFeatures
from argaze.ArUcoMarkers import ArUcoDetector, ArUcoOpticCalibrator, ArUcoMarkersGroup
from argaze.utils import UtilsFeatures
-import cv2
-import numpy
-
def main():
"""
Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder.
@@ -69,7 +68,7 @@ def main():
configuration["optic_parameters"] = args.optic_parameters
# Load ArUco detector configuration
- aruco_detector = ArUcoDetector.ArUcoDetector.from_dict(configuration, '.')
+ aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
if args.verbose:
@@ -89,131 +88,131 @@ def main():
# Create a window
cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE)
- # Enable exit signal handler
- exit = UtilsFeatures.ExitSignalHandler()
-
# Init image selection
current_image_index = -1
_, current_image = video_capture.read()
next_image_index = int(args.start * video_fps)
refresh = False
- while not exit.status():
+ # Waiting for 'ctrl+C' interruption
+ with contextlib.suppress(KeyboardInterrupt):
- # Select a new image and detect markers once
- if next_image_index != current_image_index or refresh:
+ while True:
- video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index)
+ # Select a new image and detect markers once
+ if next_image_index != current_image_index or refresh:
- success, video_image = video_capture.read()
+ video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index)
- video_height, video_width, _ = video_image.shape
+ success, video_image = video_capture.read()
- # Create default optic parameters adapted to frame size
- if aruco_detector.optic_parameters is None:
+ video_height, video_width, _ = video_image.shape
- # Note: The choice of 1000 for default focal length should be discussed...
- aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height))
+ # Create default optic parameters adapted to frame size
+ if aruco_detector.optic_parameters is None:
- if success:
+ # Note: The choice of 1000 for default focal length should be discussed...
+ aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height))
- # Refresh once
- refresh = False
+ if success:
- current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1
- current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC)
+ # Refresh once
+ refresh = False
- try:
+ current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1
+ current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC)
- # Detect and project AR features
- aruco_detector.detect_markers(video_image)
+ try:
- # Estimate all detected markers pose
- aruco_detector.estimate_markers_pose(args.size)
+ # Detect and project AR features
+ aruco_detector.detect_markers(video_image)
- # Build aruco scene from detected markers
- aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers())
+ # Estimate all detected markers pose
+ aruco_detector.estimate_markers_pose(args.size)
- # Detection suceeded
- exception = None
+ # Build aruco scene from detected markers
+ aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers())
- # Write errors
- except Exception as e:
+ # Detection succeeded
+ exception = None
- aruco_markers_group = None
+ # Write errors
+ except Exception as e:
- exception = e
-
- # Draw detected markers
- aruco_detector.draw_detected_markers(video_image, draw_parameters)
+ aruco_markers_group = None
- # Write detected markers
- cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
-
- # Write timing
- cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
-
- # Write exception
- if exception is not None:
+ exception = e
- cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ # Draw detected markers
+ aruco_detector.draw_detected_markers(video_image, draw_parameters)
- # Write documentation
- cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
-
- # Copy image
- current_image = video_image.copy()
+ # Write detected markers
+ cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
- # Keep last image
- else:
+ # Write timing
+ cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
- video_image = current_image.copy()
+ # Write exception
+ if exception is not None:
- key_pressed = cv2.waitKey(10)
+ cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- #if key_pressed != -1:
- # print(key_pressed)
+ # Write documentation
+ cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- # Select previous image with left arrow
- if key_pressed == 2:
- next_image_index -= 1
+ # Copy image
+ current_image = video_image.copy()
- # Select next image with right arrow
- if key_pressed == 3:
- next_image_index += 1
+ # Keep last image
+ else:
- # Clip image index
- if next_image_index < 0:
- next_image_index = 0
+ video_image = current_image.copy()
- # r: reload configuration
- if key_pressed == 114:
-
- aruco_detector = ArUcoDetector.ArUcoDetector.from_dict(configuration)
- refresh = True
- print('Configuration reloaded')
+ key_pressed = cv2.waitKey(10)
- # Save selected marker edition using 'Ctrl + s'
- if key_pressed == 19:
+ #if key_pressed != -1:
+ # print(key_pressed)
- if aruco_markers_group:
+ # Select previous image with left arrow
+ if key_pressed == 2:
+ next_image_index -= 1
- aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj')
- print(f'ArUco markers saved into {args.output}')
+ # Select next image with right arrow
+ if key_pressed == 3:
+ next_image_index += 1
- else:
+ # Clip image index
+ if next_image_index < 0:
+ next_image_index = 0
+
+ # r: reload configuration
+ if key_pressed == 114:
+
+ aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
+ refresh = True
+ print('Configuration reloaded')
+
+ # Save selected marker edition using 'Ctrl + s'
+ if key_pressed == 19:
+
+ if aruco_markers_group:
+
+ aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj')
+ print(f'ArUco markers saved into {args.output}')
+
+ else:
- print(f'No ArUco markers to export')
+ print(f'No ArUco markers to export')
- # Close window using 'Esc' key
- if key_pressed == 27:
- break
+ # Close window using 'Esc' key
+ if key_pressed == 27:
+ break
- # Display video
- cv2.imshow(aruco_detector.name, video_image)
+ # Display video
+ cv2.imshow(aruco_detector.name, video_image)
# Close movie capture
video_capture.release()
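Note: the reworked loop above navigates the movie by setting cv2.CAP_PROP_POS_FRAMES before each read, which is what drives the left/right-arrow image selection. A minimal sketch of that seek-and-read pattern, assuming a readable video at a hypothetical path:

    import cv2

    capture = cv2.VideoCapture('movie.mp4')   # hypothetical input file
    fps = capture.get(cv2.CAP_PROP_FPS)

    # Jump to the frame closest to t = 2.5 s, then decode it
    capture.set(cv2.CAP_PROP_POS_FRAMES, int(2.5 * fps))
    success, image = capture.read()

    if success:
        # After read(), POS_FRAMES points to the next frame, hence the -1 used
        # in the script; POS_MSEC reports the current position in milliseconds
        print(capture.get(cv2.CAP_PROP_POS_FRAMES) - 1, capture.get(cv2.CAP_PROP_POS_MSEC))

    capture.release()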
diff --git a/src/argaze/utils/contexts/OpenCV.py b/src/argaze/utils/contexts/OpenCV.py
index 25b3dd7..f89189d 100644
--- a/src/argaze/utils/contexts/OpenCV.py
+++ b/src/argaze/utils/contexts/OpenCV.py
@@ -16,16 +16,14 @@ __credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
-import sys
import logging
import time
-from argaze import ArFeatures, DataFeatures, GazeFeatures
-from argaze.utils import UtilsFeatures
-
-import numpy
import cv2
+from argaze import ArFeatures, DataFeatures
+
+
class Window(ArFeatures.ArContext):
@DataFeatures.PipelineStepInit
diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py
index f2bc6a7..f83c1ac 100644
--- a/src/argaze/utils/contexts/TobiiProGlasses2.py
+++ b/src/argaze/utils/contexts/TobiiProGlasses2.py
@@ -17,17 +17,17 @@ __credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
-import sys
-import os
+import collections
+import datetime
+import gzip
+import json
import logging
+import math
+import os
import socket
+import sys
import threading
-import collections
-import json
import time
-import math
-import gzip
-import datetime
import uuid
from dataclasses import dataclass
@@ -41,8 +41,7 @@ except ImportError:
from urllib import urlencode
from urllib2 import urlopen, Request, HTTPError, URLError
-from argaze import ArFeatures, DataFeatures, GazeFeatures
-from argaze.utils import UtilsFeatures
+from argaze import ArFeatures, DataFeatures
import numpy
import cv2
@@ -131,8 +130,8 @@ class Gyroscope():
"""Gyroscope value"""
@dataclass
-class PupillCenter():
- """Define pupill center data (gidx pc eye)."""
+class PupilCenter():
+ """Define pupil center data (gidx pc eye)."""
validity: int
index: int
@@ -140,8 +139,8 @@ class PupillCenter():
eye: str # 'right' or 'left'
@dataclass
-class PupillDiameter():
- """Define pupill diameter data (gidx pd eye)."""
+class PupilDiameter():
+ """Define pupil diameter data (gidx pd eye)."""
validity: int
index: int
@@ -193,13 +192,13 @@ class TobiiJsonDataParser():
'ets': self.__parse_event,
'ac': self.__parse_accelerometer,
'gy': self.__parse_gyroscope,
- 'gidx': self.__parse_pupill_or_gaze,
+ 'gidx': self.__parse_pupil_or_gaze,
'marker3d': self.__parse_marker_position
}
- self.__parse_pupill_or_gaze_map = {
- 'pc': self.__parse_pupill_center,
- 'pd': self.__parse_pupill_diameter,
+ self.__parse_pupil_or_gaze_map = {
+ 'pc': self.__parse_pupil_center,
+ 'pd': self.__parse_pupil_diameter,
'gd': self.__parse_gaze_direction,
'l': self.__parse_gaze_position,
'gp3': self.__parse_gaze_position_3d
@@ -237,14 +236,14 @@ class TobiiJsonDataParser():
return data_object, data_object_type
- def __parse_pupill_or_gaze(self, status, data):
+ def __parse_pupil_or_gaze(self, status, data):
gaze_index = data.pop('gidx')
- # parse pupill or gaze data depending second json key
+ # parse pupil or gaze data depending second json key
second_key = next(iter(data))
- return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, data)
+ return self.__parse_pupil_or_gaze_map[second_key](status, gaze_index, data)
def __parse_dir_sig(self, status, data):
@@ -283,13 +282,13 @@ class TobiiJsonDataParser():
return Gyroscope(data['gy'])
- def __parse_pupill_center(self, status, gaze_index, data):
+ def __parse_pupil_center(self, status, gaze_index, data):
- return PupillCenter(status, gaze_index, data['pc'], data['eye'])
+ return PupilCenter(status, gaze_index, data['pc'], data['eye'])
- def __parse_pupill_diameter(self, status, gaze_index, data):
+ def __parse_pupil_diameter(self, status, gaze_index, data):
- return PupillDiameter(status, gaze_index, data['pd'], data['eye'])
+ return PupilDiameter(status, gaze_index, data['pd'], data['eye'])
def __parse_gaze_direction(self, status, gaze_index, data):
@@ -356,6 +355,7 @@ class LiveStream(ArFeatures.ArContext):
else:
+ # noinspection PyAttributeOutsideInit
self.__base_url = 'http://' + self.__address
@property
@@ -588,7 +588,7 @@ class LiveStream(ArFeatures.ArContext):
@DataFeatures.PipelineStepImage
def image(self, draw_something: bool = None, **kwargs: dict) -> numpy.array:
- """Get Tobbi visualisation.
+ """Get Tobii visualisation.
Parameters:
draw_something: example
@@ -886,52 +886,6 @@ class LiveStream(ArFeatures.ArContext):
# CALIBRATION
- def calibration_start(self, project_name, participant_name):
- """Start calibration process for project and participant."""
-
- project_id = self.__get_project_id(project_name)
- participant_id = self.get_participant_id(participant_name)
-
- # Init calibration id
- self.__calibration_id = None
-
- # Calibration have to be done for a project and a participant
- if project_id is None or participant_id is None:
-
- raise Exception(f'Setup project and participant before')
-
- data = {
- 'ca_project': project_id,
- 'ca_type': 'default',
- 'ca_participant': participant_id,
- 'ca_created': self.__get_current_datetime()
- }
-
- # Request calibration
- json_data = self.__post_request('/api/calibrations', data)
- self.__calibration_id = json_data['ca_id']
-
- # Start calibration
- self.__post_request('/api/calibrations/' + self.__calibration_id + '/start')
-
- def calibration_status(self) -> str:
- """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed."""
-
- if self.__calibration_id is not None:
-
- status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
-
- # Forget calibration id
- if status != 'calibrating':
-
- self.__calibration_id = None
-
- return status
-
- else:
-
- raise Exception(f'Start calibration before')
-
def calibrate(self, project_name, participant_name):
"""Handle whole Tobii glasses calibration process."""
@@ -975,6 +929,7 @@ class LiveStream(ArFeatures.ArContext):
# Request calibration
json_data = self.__post_request('/api/calibrations', data)
+ # noinspection PyAttributeOutsideInit
self.__calibration_id = json_data['ca_id']
# Start calibration
@@ -990,6 +945,7 @@ class LiveStream(ArFeatures.ArContext):
# Forget calibration id
if status != 'calibrating':
+ # noinspection PyAttributeOutsideInit
self.__calibration_id = None
return status
@@ -998,24 +954,6 @@ class LiveStream(ArFeatures.ArContext):
raise Exception(f'Start calibration before')
- def calibrate(self, project_name, participant_name):
- """Handle whole Tobii glasses calibration process."""
-
- # Start calibration
- self.calibration_start(project_name, participant_name)
-
- # While calibrating...
- status = self.calibration_status()
-
- while status == 'calibrating':
-
- time.sleep(1)
- status = self.calibration_status()
-
- if status == 'uncalibrated' or status == 'stale' or status == 'failed':
-
- raise Exception(f'Calibration {status}')
-
# RECORDING FEATURES
def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
@@ -1086,7 +1024,7 @@ class LiveStream(ArFeatures.ArContext):
return False
def get_recordings(self) -> str:
- """Get all recordings id."""
+ """Get all recordings' id."""
return self.__get_request('/api/recordings')
@@ -1185,7 +1123,7 @@ class PostProcessing(ArFeatures.ArContext):
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
- # Init ArContext classe
+ # Init ArContext class
super().__init__()
# Init private attributes
@@ -1202,8 +1140,8 @@ class PostProcessing(ArFeatures.ArContext):
'Event': 0,
'Accelerometer': 0,
'Gyroscope': 0,
- 'PupillCenter': 0,
- 'PupillDiameter': 0,
+ 'PupilCenter': 0,
+ 'PupilDiameter': 0,
'GazeDirection': 0,
'GazePosition': 0,
'GazePosition3D': 0,
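Note: the Pupill to Pupil renaming above runs through the parser's dispatch tables, which route each incoming JSON datagram by key: 'gidx' selects the pupil/gaze branch, then the remaining key picks the concrete parser. A minimal standalone sketch of that two-level key dispatch, with a simplified data class standing in for the real ones and the status field omitted:

    import json
    from dataclasses import dataclass

    @dataclass
    class PupilDiameter:
        index: int
        value: float
        eye: str

    def parse_pupil_diameter(gaze_index, data):
        return PupilDiameter(gaze_index, data['pd'], data['eye'])

    # Second-level dispatch: the key left after popping 'gidx' picks the parser
    parse_pupil_or_gaze_map = {'pd': parse_pupil_diameter}

    def parse_pupil_or_gaze(raw):
        data = json.loads(raw)
        gaze_index = data.pop('gidx')
        second_key = next(iter(data))
        return parse_pupil_or_gaze_map[second_key](gaze_index, data)

    print(parse_pupil_or_gaze('{"gidx": 3, "pd": 3.2, "eye": "left"}'))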