diff options
Diffstat (limited to 'src/argaze/utils')
-rw-r--r-- | src/argaze/utils/UtilsFeatures.py | 35 | ||||
-rw-r--r-- | src/argaze/utils/__init__.py | 4 | ||||
-rw-r--r-- | src/argaze/utils/aruco_markers_group_export.py | 179 | ||||
-rw-r--r-- | src/argaze/utils/contexts/OpenCV.py | 8 | ||||
-rw-r--r-- | src/argaze/utils/contexts/TobiiProGlasses2.py | 124 |
5 files changed, 130 insertions, 220 deletions
diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py index f38d041..3f2ceda 100644 --- a/src/argaze/utils/UtilsFeatures.py +++ b/src/argaze/utils/UtilsFeatures.py @@ -77,36 +77,6 @@ def import_from_test_package(module: str) -> types.ModuleType: return TestModule -class ExitSignalHandler(): - """ - Handle exit event - """ - - def __init__(self): - - import signal - import threading - - global __exit_event - global __on_exit_signal - - __exit_event = threading.Event() - - def __on_exit_signal(signo, _frame): - __exit_event.set() - - for sig in ('TERM', 'HUP', 'INT'): - signal.signal(getattr(signal, 'SIG'+sig), __on_exit_signal) - - def status(self) -> bool: - """ - Get exit status. - - Returns: - exit status - """ - return __exit_event.is_set() - class TimeProbe(): """ Assess temporal performance. @@ -121,8 +91,11 @@ class TimeProbe(): Start chronometer. """ + # noinspection PyAttributeOutsideInit self.__last_time = time.perf_counter() + # noinspection PyAttributeOutsideInit self.__lap_counter = 0 + # noinspection PyAttributeOutsideInit self.__elapsed_time = 0 def lap(self) -> tuple[float, int, float]: @@ -137,6 +110,7 @@ class TimeProbe(): lap_time = time.perf_counter() - self.__last_time + # noinspection PyAttributeOutsideInit self.__last_time = time.perf_counter() self.__lap_counter += 1 self.__elapsed_time += lap_time @@ -174,6 +148,7 @@ def PrintCallStack(method): """Wrap method to print call stack before its call. Parameters: + self: args: method arguments. kwargs: extra arguments. """ diff --git a/src/argaze/utils/__init__.py b/src/argaze/utils/__init__.py index a2322bb..2cee626 100644 --- a/src/argaze/utils/__init__.py +++ b/src/argaze/utils/__init__.py @@ -1,4 +1,4 @@ """ -Miscelleaneous utilities. +Miscellaneous utilities. """ -__all__ = ['UtilsFeatures', 'Providers']
\ No newline at end of file +__all__ = ['UtilsFeatures', 'contexts']
\ No newline at end of file diff --git a/src/argaze/utils/aruco_markers_group_export.py b/src/argaze/utils/aruco_markers_group_export.py index dc1f673..46507b8 100644 --- a/src/argaze/utils/aruco_markers_group_export.py +++ b/src/argaze/utils/aruco_markers_group_export.py @@ -19,15 +19,14 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import argparse -import time -import itertools +import contextlib +import cv2 + +from argaze import DataFeatures from argaze.ArUcoMarkers import ArUcoDetector, ArUcoOpticCalibrator, ArUcoMarkersGroup from argaze.utils import UtilsFeatures -import cv2 -import numpy - def main(): """ Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder. @@ -69,7 +68,7 @@ def main(): configuration["optic_parameters"] = args.optic_parameters # Load ArUco detector configuration - aruco_detector = ArUcoDetector.ArUcoDetector.from_dict(configuration, '.') + aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration) if args.verbose: @@ -89,131 +88,131 @@ def main(): # Create a window cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE) - # Enable exit signal handler - exit = UtilsFeatures.ExitSignalHandler() - # Init image selection current_image_index = -1 _, current_image = video_capture.read() next_image_index = int(args.start * video_fps) refresh = False - while not exit.status(): + # Waiting for 'ctrl+C' interruption + with contextlib.suppress(KeyboardInterrupt): - # Select a new image and detect markers once - if next_image_index != current_image_index or refresh: + while True: - video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index) + # Select a new image and detect markers once + if next_image_index != current_image_index or refresh: - success, video_image = video_capture.read() + video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index) - video_height, video_width, _ = video_image.shape + success, video_image = video_capture.read() - # Create default optic parameters adapted to frame size - if aruco_detector.optic_parameters is None: + video_height, video_width, _ = video_image.shape - # Note: The choice of 1000 for default focal length should be discussed... - aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height)) + # Create default optic parameters adapted to frame size + if aruco_detector.optic_parameters is None: - if success: + # Note: The choice of 1000 for default focal length should be discussed... + aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height)) - # Refresh once - refresh = False + if success: - current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1 - current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC) + # Refresh once + refresh = False - try: + current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1 + current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC) - # Detect and project AR features - aruco_detector.detect_markers(video_image) + try: - # Estimate all detected markers pose - aruco_detector.estimate_markers_pose(args.size) + # Detect and project AR features + aruco_detector.detect_markers(video_image) - # Build aruco scene from detected markers - aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers()) + # Estimate all detected markers pose + aruco_detector.estimate_markers_pose(args.size) - # Detection suceeded - exception = None + # Build aruco scene from detected markers + aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers()) - # Write errors - except Exception as e: + # Detection succeeded + exception = None - aruco_markers_group = None + # Write errors + except Exception as e: - exception = e - - # Draw detected markers - aruco_detector.draw_detected_markers(video_image, draw_parameters) + aruco_markers_group = None - # Write detected markers - cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - - # Write timing - cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - - # Write exception - if exception is not None: + exception = e - cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + # Draw detected markers + aruco_detector.draw_detected_markers(video_image, draw_parameters) - # Write documentation - cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - - # Copy image - current_image = video_image.copy() + # Write detected markers + cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - # Keep last image - else: + # Write timing + cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) - video_image = current_image.copy() + # Write exception + if exception is not None: - key_pressed = cv2.waitKey(10) + cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - #if key_pressed != -1: - # print(key_pressed) + # Write documentation + cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - # Select previous image with left arrow - if key_pressed == 2: - next_image_index -= 1 + # Copy image + current_image = video_image.copy() - # Select next image with right arrow - if key_pressed == 3: - next_image_index += 1 + # Keep last image + else: - # Clip image index - if next_image_index < 0: - next_image_index = 0 + video_image = current_image.copy() - # r: reload configuration - if key_pressed == 114: - - aruco_detector = ArUcoDetector.ArUcoDetector.from_dict(configuration) - refresh = True - print('Configuration reloaded') + key_pressed = cv2.waitKey(10) - # Save selected marker edition using 'Ctrl + s' - if key_pressed == 19: + #if key_pressed != -1: + # print(key_pressed) - if aruco_markers_group: + # Select previous image with left arrow + if key_pressed == 2: + next_image_index -= 1 - aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj') - print(f'ArUco markers saved into {args.output}') + # Select next image with right arrow + if key_pressed == 3: + next_image_index += 1 - else: + # Clip image index + if next_image_index < 0: + next_image_index = 0 + + # r: reload configuration + if key_pressed == 114: + + aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration) + refresh = True + print('Configuration reloaded') + + # Save selected marker edition using 'Ctrl + s' + if key_pressed == 19: + + if aruco_markers_group: + + aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj') + print(f'ArUco markers saved into {args.output}') + + else: - print(f'No ArUco markers to export') + print(f'No ArUco markers to export') - # Close window using 'Esc' key - if key_pressed == 27: - break + # Close window using 'Esc' key + if key_pressed == 27: + break - # Display video - cv2.imshow(aruco_detector.name, video_image) + # Display video + cv2.imshow(aruco_detector.name, video_image) # Close movie capture video_capture.release() diff --git a/src/argaze/utils/contexts/OpenCV.py b/src/argaze/utils/contexts/OpenCV.py index 25b3dd7..f89189d 100644 --- a/src/argaze/utils/contexts/OpenCV.py +++ b/src/argaze/utils/contexts/OpenCV.py @@ -16,16 +16,14 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import sys import logging import time -from argaze import ArFeatures, DataFeatures, GazeFeatures -from argaze.utils import UtilsFeatures - -import numpy import cv2 +from argaze import ArFeatures, DataFeatures + + class Window(ArFeatures.ArContext): @DataFeatures.PipelineStepInit diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py index f2bc6a7..f83c1ac 100644 --- a/src/argaze/utils/contexts/TobiiProGlasses2.py +++ b/src/argaze/utils/contexts/TobiiProGlasses2.py @@ -17,17 +17,17 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import sys -import os +import collections +import datetime +import gzip +import json import logging +import math +import os import socket +import sys import threading -import collections -import json import time -import math -import gzip -import datetime import uuid from dataclasses import dataclass @@ -41,8 +41,7 @@ except ImportError: from urllib import urlencode from urllib2 import urlopen, Request, HTTPError, URLError -from argaze import ArFeatures, DataFeatures, GazeFeatures -from argaze.utils import UtilsFeatures +from argaze import ArFeatures, DataFeatures import numpy import cv2 @@ -131,8 +130,8 @@ class Gyroscope(): """Gyroscope value""" @dataclass -class PupillCenter(): - """Define pupill center data (gidx pc eye).""" +class PupilCenter(): + """Define pupil center data (gidx pc eye).""" validity: int index: int @@ -140,8 +139,8 @@ class PupillCenter(): eye: str # 'right' or 'left' @dataclass -class PupillDiameter(): - """Define pupill diameter data (gidx pd eye).""" +class PupilDiameter(): + """Define pupil diameter data (gidx pd eye).""" validity: int index: int @@ -193,13 +192,13 @@ class TobiiJsonDataParser(): 'ets': self.__parse_event, 'ac': self.__parse_accelerometer, 'gy': self.__parse_gyroscope, - 'gidx': self.__parse_pupill_or_gaze, + 'gidx': self.__parse_pupil_or_gaze, 'marker3d': self.__parse_marker_position } - self.__parse_pupill_or_gaze_map = { - 'pc': self.__parse_pupill_center, - 'pd': self.__parse_pupill_diameter, + self.__parse_pupil_or_gaze_map = { + 'pc': self.__parse_pupil_center, + 'pd': self.__parse_pupil_diameter, 'gd': self.__parse_gaze_direction, 'l': self.__parse_gaze_position, 'gp3': self.__parse_gaze_position_3d @@ -237,14 +236,14 @@ class TobiiJsonDataParser(): return data_object, data_object_type - def __parse_pupill_or_gaze(self, status, data): + def __parse_pupil_or_gaze(self, status, data): gaze_index = data.pop('gidx') - # parse pupill or gaze data depending second json key + # parse pupil or gaze data depending second json key second_key = next(iter(data)) - return self.__parse_pupill_or_gaze_map[second_key](status, gaze_index, data) + return self.__parse_pupil_or_gaze_map[second_key](status, gaze_index, data) def __parse_dir_sig(self, status, data): @@ -283,13 +282,13 @@ class TobiiJsonDataParser(): return Gyroscope(data['gy']) - def __parse_pupill_center(self, status, gaze_index, data): + def __parse_pupil_center(self, status, gaze_index, data): - return PupillCenter(status, gaze_index, data['pc'], data['eye']) + return PupilCenter(status, gaze_index, data['pc'], data['eye']) - def __parse_pupill_diameter(self, status, gaze_index, data): + def __parse_pupil_diameter(self, status, gaze_index, data): - return PupillDiameter(status, gaze_index, data['pd'], data['eye']) + return PupilDiameter(status, gaze_index, data['pd'], data['eye']) def __parse_gaze_direction(self, status, gaze_index, data): @@ -356,6 +355,7 @@ class LiveStream(ArFeatures.ArContext): else: + # noinspection PyAttributeOutsideInit self.__base_url = 'http://' + self.__address @property @@ -588,7 +588,7 @@ class LiveStream(ArFeatures.ArContext): @DataFeatures.PipelineStepImage def image(self, draw_something: bool = None, **kwargs: dict) -> numpy.array: - """Get Tobbi visualisation. + """Get Tobii visualisation. Parameters: draw_something: example @@ -886,52 +886,6 @@ class LiveStream(ArFeatures.ArContext): # CALIBRATION - def calibration_start(self, project_name, participant_name): - """Start calibration process for project and participant.""" - - project_id = self.__get_project_id(project_name) - participant_id = self.get_participant_id(participant_name) - - # Init calibration id - self.__calibration_id = None - - # Calibration have to be done for a project and a participant - if project_id is None or participant_id is None: - - raise Exception(f'Setup project and participant before') - - data = { - 'ca_project': project_id, - 'ca_type': 'default', - 'ca_participant': participant_id, - 'ca_created': self.__get_current_datetime() - } - - # Request calibration - json_data = self.__post_request('/api/calibrations', data) - self.__calibration_id = json_data['ca_id'] - - # Start calibration - self.__post_request('/api/calibrations/' + self.__calibration_id + '/start') - - def calibration_status(self) -> str: - """Ask for calibration status: calibrating, calibrated, stale, uncalibrated or failed.""" - - if self.__calibration_id is not None: - - status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) - - # Forget calibration id - if status != 'calibrating': - - self.__calibration_id = None - - return status - - else: - - raise Exception(f'Start calibration before') - def calibrate(self, project_name, participant_name): """Handle whole Tobii glasses calibration process.""" @@ -975,6 +929,7 @@ class LiveStream(ArFeatures.ArContext): # Request calibration json_data = self.__post_request('/api/calibrations', data) + # noinspection PyAttributeOutsideInit self.__calibration_id = json_data['ca_id'] # Start calibration @@ -990,6 +945,7 @@ class LiveStream(ArFeatures.ArContext): # Forget calibration id if status != 'calibrating': + # noinspection PyAttributeOutsideInit self.__calibration_id = None return status @@ -998,24 +954,6 @@ class LiveStream(ArFeatures.ArContext): raise Exception(f'Start calibration before') - def calibrate(self, project_name, participant_name): - """Handle whole Tobii glasses calibration process.""" - - # Start calibration - self.calibration_start(project_name, participant_name) - - # While calibrating... - status = self.calibration_status() - - while status == 'calibrating': - - time.sleep(1) - status = self.calibration_status() - - if status == 'uncalibrated' or status == 'stale' or status == 'failed': - - raise Exception(f'Calibration {status}') - # RECORDING FEATURES def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']): @@ -1086,7 +1024,7 @@ class LiveStream(ArFeatures.ArContext): return False def get_recordings(self) -> str: - """Get all recordings id.""" + """Get all recordings' id.""" return self.__get_request('/api/recordings') @@ -1185,7 +1123,7 @@ class PostProcessing(ArFeatures.ArContext): @DataFeatures.PipelineStepInit def __init__(self, **kwargs): - # Init ArContext classe + # Init ArContext class super().__init__() # Init private attributes @@ -1202,8 +1140,8 @@ class PostProcessing(ArFeatures.ArContext): 'Event': 0, 'Accelerometer': 0, 'Gyroscope': 0, - 'PupillCenter': 0, - 'PupillDiameter': 0, + 'PupilCenter': 0, + 'PupilDiameter': 0, 'GazeDirection': 0, 'GazePosition': 0, 'GazePosition3D': 0, |