aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/utils
diff options
context:
space:
mode:
authorThéo de la Hogue2024-04-09 00:18:55 +0200
committerThéo de la Hogue2024-04-09 00:18:55 +0200
commit3edf39d92d14d73f8afd0aaf7f04259cef7455d8 (patch)
treeb6a592a541253dce7d16d032e207d9123cf69648 /src/argaze/utils
parentad25274a8493b63c40cd5d59f59889214fcc8aa2 (diff)
downloadargaze-3edf39d92d14d73f8afd0aaf7f04259cef7455d8.zip
argaze-3edf39d92d14d73f8afd0aaf7f04259cef7455d8.tar.gz
argaze-3edf39d92d14d73f8afd0aaf7f04259cef7455d8.tar.bz2
argaze-3edf39d92d14d73f8afd0aaf7f04259cef7455d8.tar.xz
Using https url.
Diffstat (limited to 'src/argaze/utils')
-rw-r--r--src/argaze/utils/UtilsFeatures.py2
-rw-r--r--src/argaze/utils/aruco_markers_group_export.py283
-rw-r--r--src/argaze/utils/contexts/OpenCV.py2
-rw-r--r--src/argaze/utils/contexts/PupilLabs.py55
-rw-r--r--src/argaze/utils/contexts/TobiiProGlasses2.py187
-rw-r--r--src/argaze/utils/demo/recorders.py2
6 files changed, 263 insertions, 268 deletions
diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py
index 133809b..c04d20a 100644
--- a/src/argaze/utils/UtilsFeatures.py
+++ b/src/argaze/utils/UtilsFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/utils/aruco_markers_group_export.py b/src/argaze/utils/aruco_markers_group_export.py
index 46507b8..569ba6b 100644
--- a/src/argaze/utils/aruco_markers_group_export.py
+++ b/src/argaze/utils/aruco_markers_group_export.py
@@ -10,7 +10,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -27,199 +27,208 @@ from argaze import DataFeatures
from argaze.ArUcoMarkers import ArUcoDetector, ArUcoOpticCalibrator, ArUcoMarkersGroup
from argaze.utils import UtilsFeatures
-def main():
- """
- Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('movie', metavar='MOVIE', type=str, default=None, help='movie path')
- parser.add_argument('dictionary', metavar='DICTIONARY', type=str, default=None, help='expected ArUco markers dictionary')
- parser.add_argument('size', metavar='SIZE', type=float, default=None, help='expected ArUco markers size (in cm)')
-
- parser.add_argument('-p', '--parameters', metavar='PARAMETERS', type=str, default=None, help='ArUco detector parameters file')
- parser.add_argument('-op', '--optic_parameters', metavar='OPTIC_PARAMETERS', type=str, default=None, help='ArUco detector optic parameters file')
-
- parser.add_argument('-s', '--start', metavar='START', type=float, default=0., help='start time in second')
- parser.add_argument('-o', '--output', metavar='OUTPUT', type=str, default='.', help='export folder path')
- parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console')
- args = parser.parse_args()
-
- # Load movie
- video_capture = cv2.VideoCapture(args.movie)
+def main():
+ """
+ Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder.
+ """
- video_fps = video_capture.get(cv2.CAP_PROP_FPS)
- image_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
- image_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
+ # Manage arguments
+ parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
+ parser.add_argument('movie', metavar='MOVIE', type=str, default=None, help='movie path')
+ parser.add_argument('dictionary', metavar='DICTIONARY', type=str, default=None,
+ help='expected ArUco markers dictionary')
+ parser.add_argument('size', metavar='SIZE', type=float, default=None, help='expected ArUco markers size (in cm)')
- # Edit ArUco detector configuration
- configuration = {
- "dictionary": args.dictionary
- }
+ parser.add_argument('-p', '--parameters', metavar='PARAMETERS', type=str, default=None,
+ help='ArUco detector parameters file')
+ parser.add_argument('-op', '--optic_parameters', metavar='OPTIC_PARAMETERS', type=str, default=None,
+ help='ArUco detector optic parameters file')
- if args.parameters:
+ parser.add_argument('-s', '--start', metavar='START', type=float, default=0., help='start time in second')
+ parser.add_argument('-o', '--output', metavar='OUTPUT', type=str, default='.', help='export folder path')
+ parser.add_argument('-v', '--verbose', action='store_true', default=False,
+ help='enable verbose mode to print information in console')
- configuration["parameters"] = args.parameters
+ args = parser.parse_args()
- if args.optic_parameters:
+ # Load movie
+ video_capture = cv2.VideoCapture(args.movie)
- configuration["optic_parameters"] = args.optic_parameters
+ video_fps = video_capture.get(cv2.CAP_PROP_FPS)
+ image_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
+ image_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
- # Load ArUco detector configuration
- aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
+ # Edit ArUco detector configuration
+ configuration = {
+ "dictionary": args.dictionary
+ }
- if args.verbose:
+ if args.parameters:
+ configuration["parameters"] = args.parameters
- print(aruco_detector)
+ if args.optic_parameters:
+ configuration["optic_parameters"] = args.optic_parameters
- # Create empty ArUco scene
- aruco_markers_group = None
+ # Load ArUco detector configuration
+ aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
- # Edit draw parameters
- draw_parameters = {
- "color": [255, 255, 255],
- "draw_axes": {
- "thickness": 4
- }
- }
+ if args.verbose:
+ print(aruco_detector)
- # Create a window
- cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE)
+ # Create empty ArUco scene
+ aruco_markers_group = None
- # Init image selection
- current_image_index = -1
- _, current_image = video_capture.read()
- next_image_index = int(args.start * video_fps)
- refresh = False
+ # Edit draw parameters
+ draw_parameters = {
+ "color": [255, 255, 255],
+ "draw_axes": {
+ "thickness": 4
+ }
+ }
- # Waiting for 'ctrl+C' interruption
- with contextlib.suppress(KeyboardInterrupt):
+ # Create a window
+ cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE)
- while True:
+ # Init image selection
+ current_image_index = -1
+ _, current_image = video_capture.read()
+ next_image_index = int(args.start * video_fps)
+ refresh = False
- # Select a new image and detect markers once
- if next_image_index != current_image_index or refresh:
+ # Waiting for 'ctrl+C' interruption
+ with contextlib.suppress(KeyboardInterrupt):
- video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index)
+ while True:
- success, video_image = video_capture.read()
+ # Select a new image and detect markers once
+ if next_image_index != current_image_index or refresh:
- video_height, video_width, _ = video_image.shape
+ video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index)
- # Create default optic parameters adapted to frame size
- if aruco_detector.optic_parameters is None:
+ success, video_image = video_capture.read()
- # Note: The choice of 1000 for default focal length should be discussed...
- aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height))
+ video_height, video_width, _ = video_image.shape
- if success:
+ # Create default optic parameters adapted to frame size
+ if aruco_detector.optic_parameters is None:
+ # Note: The choice of 1000 for default focal length should be discussed...
+ aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(
+ video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.),
+ width=video_width, height=video_height))
- # Refresh once
- refresh = False
+ if success:
- current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1
- current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC)
+ # Refresh once
+ refresh = False
- try:
+ current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1
+ current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC)
- # Detect and project AR features
- aruco_detector.detect_markers(video_image)
+ try:
- # Estimate all detected markers pose
- aruco_detector.estimate_markers_pose(args.size)
+ # Detect and project AR features
+ aruco_detector.detect_markers(video_image)
- # Build aruco scene from detected markers
- aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers())
+ # Estimate all detected markers pose
+ aruco_detector.estimate_markers_pose(args.size)
- # Detection succeeded
- exception = None
+ # Build aruco scene from detected markers
+ aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary,
+ aruco_detector.detected_markers())
- # Write errors
- except Exception as e:
+ # Detection succeeded
+ exception = None
- aruco_markers_group = None
+ # Write errors
+ except Exception as e:
- exception = e
+ aruco_markers_group = None
- # Draw detected markers
- aruco_detector.draw_detected_markers(video_image, draw_parameters)
+ exception = e
- # Write detected markers
- cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ # Draw detected markers
+ aruco_detector.draw_detected_markers(video_image, draw_parameters)
- # Write timing
- cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ # Write detected markers
+ cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}',
+ (20, video_height - 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
- # Write exception
- if exception is not None:
+ # Write timing
+ cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ # Write exception
+ if exception is not None:
+ cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1,
+ (0, 255, 255), 1, cv2.LINE_AA)
- # Write documentation
- cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ # Write documentation
+ cv2.putText(video_image, f'<- previous image', (video_width - 500, video_height - 160),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'-> next image', (video_width - 500, video_height - 120),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'r: reload config', (video_width - 500, video_height - 80),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width - 500, video_height - 40),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- # Copy image
- current_image = video_image.copy()
+ # Copy image
+ current_image = video_image.copy()
- # Keep last image
- else:
+ # Keep last image
+ else:
- video_image = current_image.copy()
+ video_image = current_image.copy()
- key_pressed = cv2.waitKey(10)
+ key_pressed = cv2.waitKey(10)
- #if key_pressed != -1:
- # print(key_pressed)
+ #if key_pressed != -1:
+ # print(key_pressed)
- # Select previous image with left arrow
- if key_pressed == 2:
- next_image_index -= 1
+ # Select previous image with left arrow
+ if key_pressed == 2:
+ next_image_index -= 1
- # Select next image with right arrow
- if key_pressed == 3:
- next_image_index += 1
+ # Select next image with right arrow
+ if key_pressed == 3:
+ next_image_index += 1
- # Clip image index
- if next_image_index < 0:
- next_image_index = 0
+ # Clip image index
+ if next_image_index < 0:
+ next_image_index = 0
- # r: reload configuration
- if key_pressed == 114:
+ # r: reload configuration
+ if key_pressed == 114:
+ aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
+ refresh = True
+ print('Configuration reloaded')
- aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
- refresh = True
- print('Configuration reloaded')
+ # Save selected marker edition using 'Ctrl + s'
+ if key_pressed == 19:
- # Save selected marker edition using 'Ctrl + s'
- if key_pressed == 19:
+ if aruco_markers_group:
- if aruco_markers_group:
+ aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj')
+ print(f'ArUco markers saved into {args.output}')
- aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj')
- print(f'ArUco markers saved into {args.output}')
+ else:
- else:
+ print(f'No ArUco markers to export')
- print(f'No ArUco markers to export')
+ # Close window using 'Esc' key
+ if key_pressed == 27:
+ break
- # Close window using 'Esc' key
- if key_pressed == 27:
- break
+ # Display video
+ cv2.imshow(aruco_detector.name, video_image)
- # Display video
- cv2.imshow(aruco_detector.name, video_image)
+ # Close movie capture
+ video_capture.release()
- # Close movie capture
- video_capture.release()
+ # Stop image display
+ cv2.destroyAllWindows()
- # Stop image display
- cv2.destroyAllWindows()
if __name__ == '__main__':
-
- main() \ No newline at end of file
+ main()
diff --git a/src/argaze/utils/contexts/OpenCV.py b/src/argaze/utils/contexts/OpenCV.py
index f89189d..20be1a4 100644
--- a/src/argaze/utils/contexts/OpenCV.py
+++ b/src/argaze/utils/contexts/OpenCV.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/utils/contexts/PupilLabs.py b/src/argaze/utils/contexts/PupilLabs.py
index 43fe47e..d814deb 100644
--- a/src/argaze/utils/contexts/PupilLabs.py
+++ b/src/argaze/utils/contexts/PupilLabs.py
@@ -1,5 +1,5 @@
"""Handle network connection to Pupil Labs devices. Tested with Pupil Invisible.
- Based on Pupil Labs' Realtime Python API.
+ Based on Pupil Labs' Realtime Python API.
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Damien Mouratille"
@@ -31,6 +31,7 @@ import cv2
from pupil_labs.realtime_api.simple import discover_one_device
+
class LiveStream(ArFeatures.ArContext):
@DataFeatures.PipelineStepInit
@@ -59,75 +60,71 @@ class LiveStream(ArFeatures.ArContext):
logging.info('Device found. Stream loading.')
# Open gaze stream
- self.__gaze_thread = threading.Thread(target = self.__stream_gaze)
+ self.__gaze_thread = threading.Thread(target=self.__stream_gaze)
logging.debug('> starting gaze thread...')
self.__gaze_thread.start()
-
+
# Open video stream
- self.__video_thread = threading.Thread(target = self.__stream_video)
+ self.__video_thread = threading.Thread(target=self.__stream_video)
logging.debug('> starting video thread...')
self.__video_thread.start()
-
-
return self
-
def __stream_gaze(self):
"""Stream gaze."""
logging.debug('Stream gaze from Pupil Device')
while not self.__stop_event.is_set():
-
+
try:
while True:
gaze = self.__device.receive_gaze_datum()
gaze_timestamp = int((gaze.timestamp_unix_seconds - self.__start_time) * 1e3)
-
+
logging.debug('Gaze received at %i timestamp', gaze_timestamp)
-
+
# When gaze position is valid
if gaze.worn is True:
-
+
self._process_gaze_position(
- timestamp = gaze_timestamp,
- x = int(gaze.x),
- y = int(gaze.y))
+ timestamp=gaze_timestamp,
+ x=int(gaze.x),
+ y=int(gaze.y))
else:
# Process empty gaze position
logging.debug('Not worn at %i timestamp', gaze_timestamp)
- self._process_gaze_position(timestamp = gaze_timestamp)
-
+ self._process_gaze_position(timestamp=gaze_timestamp)
+
except KeyboardInterrupt:
pass
-
-
+
def __stream_video(self):
"""Stream video."""
logging.debug('Stream video from Pupil Device')
while not self.__stop_event.is_set():
-
+
try:
while True:
scene_frame, frame_datetime = self.__device.receive_scene_video_frame()
-
+
scene_timestamp = int((frame_datetime - self.__start_time) * 1e3)
-
+
logging.debug('Video received at %i timestamp', scene_timestamp)
-
+
self._process_camera_image(
- timestamp = scene_timestamp,
- image = scene_frame)
-
+ timestamp=scene_timestamp,
+ image=scene_frame)
+
except KeyboardInterrupt:
pass
@@ -135,10 +132,10 @@ class LiveStream(ArFeatures.ArContext):
def __exit__(self, exception_type, exception_value, exception_traceback):
logging.debug('Pupil-Labs context stops...')
-
+
# Close data stream
self.__stop_event.set()
-
+
# Stop streaming
threading.Thread.join(self.__gaze_thread)
- threading.Thread.join(self.__video_thread) \ No newline at end of file
+ threading.Thread.join(self.__video_thread)
diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py
index 0fba2ff..0c2b8f9 100644
--- a/src/argaze/utils/contexts/TobiiProGlasses2.py
+++ b/src/argaze/utils/contexts/TobiiProGlasses2.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -75,13 +75,15 @@ DEFAULT_TOBII_IMAGE_PARAMETERS = {
"draw_something": False
}
+
# Define extra classes to support Tobii data parsing
@dataclass
class DirSig():
"""Define dir sig data (dir sig)."""
- dir: int # meaning ?
- sig: int # meaning ?
+ dir: int # meaning ?
+ sig: int # meaning ?
+
@dataclass
class PresentationTimeStamp():
@@ -90,6 +92,7 @@ class PresentationTimeStamp():
value: int
"""Pts value."""
+
@dataclass
class VideoTimeStamp():
"""Define video time stamp (vts) data."""
@@ -100,20 +103,23 @@ class VideoTimeStamp():
offset: int
"""Primary time stamp value."""
+
@dataclass
class EventSynch():
"""Define event synch (evts) data."""
- value: int # meaning ?
+ value: int # meaning ?
"""Evts value."""
+
@dataclass
class Event():
"""Define event data (ets type tag)."""
- ets: int # meaning ?
+ ets: int # meaning ?
type: str
- tag: str # dict ?
+ tag: str # dict ?
+
@dataclass
class Accelerometer():
@@ -122,6 +128,7 @@ class Accelerometer():
value: numpy.array
"""Accelerometer value"""
+
@dataclass
class Gyroscope():
"""Define gyroscope data (gy)."""
@@ -129,6 +136,7 @@ class Gyroscope():
value: numpy.array
"""Gyroscope value"""
+
@dataclass
class PupilCenter():
"""Define pupil center data (gidx pc eye)."""
@@ -136,7 +144,8 @@ class PupilCenter():
validity: int
index: int
value: tuple[(float, float, float)]
- eye: str # 'right' or 'left'
+ eye: str # 'right' or 'left'
+
@dataclass
class PupilDiameter():
@@ -145,7 +154,8 @@ class PupilDiameter():
validity: int
index: int
value: float
- eye: str # 'right' or 'left'
+ eye: str # 'right' or 'left'
+
@dataclass
class GazeDirection():
@@ -154,7 +164,8 @@ class GazeDirection():
validity: int
index: int
value: tuple[(float, float, float)]
- eye: str # 'right' or 'left'
+ eye: str # 'right' or 'left'
+
@dataclass
class GazePosition():
@@ -162,9 +173,10 @@ class GazePosition():
validity: int
index: int
- l: str # ?
+ l: str # ?
value: tuple[(float, float)]
+
@dataclass
class GazePosition3D():
"""Define gaze position 3D data (gidx gp3)."""
@@ -173,6 +185,7 @@ class GazePosition3D():
index: int
value: tuple[(float, float)]
+
@dataclass
class MarkerPosition():
"""Define marker data (marker3d marker2d)."""
@@ -180,6 +193,7 @@ class MarkerPosition():
value_3d: tuple[(float, float, float)]
value_2d: tuple[(float, float)]
+
class TobiiJsonDataParser():
def __init__(self):
@@ -319,6 +333,7 @@ class TobiiJsonDataParser():
return MarkerPosition(data['marker3d'], data['marker2d'])
+
class LiveStream(ArFeatures.ArContext):
@DataFeatures.PipelineStepInit
@@ -343,14 +358,14 @@ class LiveStream(ArFeatures.ArContext):
# Init protected attributes
self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS}
-
+
@property
def address(self) -> str:
"""Network address where to find the device."""
return self.__address
@address.setter
- def address(self, address:str):
+ def address(self, address: str):
self.__address = address
@@ -358,7 +373,6 @@ class LiveStream(ArFeatures.ArContext):
if "%" in self.__address:
if sys.platform == "win32":
-
self.__address = self.__address.split("%")[0]
# Define base url
@@ -372,7 +386,7 @@ class LiveStream(ArFeatures.ArContext):
self.__base_url = 'http://' + self.__address
@property
- def configuration(self)-> dict:
+ def configuration(self) -> dict:
"""Patch system configuration dictionary."""
return self.__configuration
@@ -388,15 +402,14 @@ class LiveStream(ArFeatures.ArContext):
return self.__project_name
@project.setter
- def project(self, project:str):
-
+ def project(self, project: str):
+
self.__project_name = project
def __bind_project(self):
"""Bind to a project or create one if it doesn't exist."""
if self.__project_name is None:
-
raise Exception(f'Project binding fails: setup project before.')
self.__project_id = None
@@ -409,7 +422,6 @@ class LiveStream(ArFeatures.ArContext):
try:
if project['pr_info']['Name'] == self.__project_name:
-
self.__project_id = project['pr_id']
logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id)
@@ -420,13 +432,12 @@ class LiveStream(ArFeatures.ArContext):
# The project doesn't exist, create one
if self.__project_id is None:
-
logging.debug('> %s project doesn\'t exist', self.__project_name)
data = {
- 'pr_info' : {
+ 'pr_info': {
'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)),
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)),
'Name': self.__project_name
},
'pr_created': self.__get_current_datetime()
@@ -439,12 +450,12 @@ class LiveStream(ArFeatures.ArContext):
logging.debug('> new %s project created: %s', self.__project_name, self.__project_id)
@property
- def participant(self)-> str:
+ def participant(self) -> str:
"""Participant name"""
return self.__participant_name
@participant.setter
- def participant(self, participant:str):
+ def participant(self, participant: str):
self.__participant_name = participant
@@ -456,13 +467,11 @@ class LiveStream(ArFeatures.ArContext):
"""
if self.__participant_name is None:
-
raise Exception(f'Participant binding fails: setup participant before.')
- if self.__project_id is None :
-
+ if self.__project_id is None:
raise Exception(f'Participant binding fails: bind to a project before')
-
+
self.__participant_id = None
# Check if participant exist
@@ -473,7 +482,6 @@ class LiveStream(ArFeatures.ArContext):
try:
if participant['pa_info']['Name'] == self.__participant_name:
-
self.__participant_id = participant['pa_id']
logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id)
@@ -484,15 +492,14 @@ class LiveStream(ArFeatures.ArContext):
# The participant doesn't exist, create one
if self.__participant_id is None:
-
logging.debug('> %s participant doesn\'t exist', self.__participant_name)
data = {
'pa_project': self.__project_id,
- 'pa_info': {
+ 'pa_info': {
'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)),
'Name': self.__participant_name,
- 'Notes': '' # TODO: set participant notes
+ 'Notes': '' # TODO: set participant notes
},
'pa_created': self.__get_current_datetime()
}
@@ -507,7 +514,7 @@ class LiveStream(ArFeatures.ArContext):
def __enter__(self):
logging.info('Tobii Pro Glasses 2 connexion starts...')
-
+
# Update current configuration with configuration patch
logging.debug('> updating configuration')
@@ -527,7 +534,6 @@ class LiveStream(ArFeatures.ArContext):
logging.info('Tobii Pro Glasses 2 configuration:')
for key, value in configuration.items():
-
logging.info('%s: %s', key, str(value))
# Store video stream info
@@ -546,7 +552,6 @@ class LiveStream(ArFeatures.ArContext):
# Bind to participant if required
if self.__participant_name is not None:
-
logging.debug('> binding participant %s', self.__participant_name)
self.__bind_participant()
@@ -558,21 +563,22 @@ class LiveStream(ArFeatures.ArContext):
# Open data stream
self.__data_socket = self.__make_socket()
- self.__data_thread = threading.Thread(target = self.__stream_data)
+ self.__data_thread = threading.Thread(target=self.__stream_data)
logging.debug('> starting data thread...')
self.__data_thread.start()
# Open video stream
self.__video_socket = self.__make_socket()
- self.__video_thread = threading.Thread(target = self.__stream_video)
+ self.__video_thread = threading.Thread(target=self.__stream_video)
logging.debug('> starting video thread...')
self.__video_thread.start()
# Keep connection alive
- self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
- self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
+ self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \"" + str(
+ uuid.uuid4()) + "\", \"op\": \"start\"}"
+ self.__keep_alive_thread = threading.Thread(target=self.__keep_alive)
logging.debug('> starting keep alive thread...')
self.__keep_alive_thread.start()
@@ -583,7 +589,7 @@ class LiveStream(ArFeatures.ArContext):
def __exit__(self, exception_type, exception_value, exception_traceback):
logging.debug('%s.__exit__', DataFeatures.get_class_path(self))
-
+
# Close data stream
self.__stop_event.set()
@@ -612,7 +618,6 @@ class LiveStream(ArFeatures.ArContext):
image = super().image(**kwargs)
if draw_something:
-
cv2.putText(image, 'SOMETHING', (512, 512), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
return image
@@ -623,10 +628,10 @@ class LiveStream(ArFeatures.ArContext):
iptype = socket.AF_INET
if ':' in self.__address:
-
iptype = socket.AF_INET6
- res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)
+ res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0,
+ socket.AI_PASSIVE)
family, socktype, proto, canonname, sockaddr = res[0]
new_socket = socket.socket(family, socktype, proto)
@@ -635,13 +640,11 @@ class LiveStream(ArFeatures.ArContext):
try:
if iptype == socket.AF_INET6:
-
new_socket.setsockopt(socket.SOL_SOCKET, 25, 1)
except socket.error as e:
if e.errno == 1:
-
logging.error('Binding to a network interface is permitted only for root users.')
return new_socket
@@ -672,7 +675,6 @@ class LiveStream(ArFeatures.ArContext):
# Store first timestamp
if first_ts == 0:
-
first_ts = data_ts
# Edit millisecond timestamp
@@ -689,15 +691,15 @@ class LiveStream(ArFeatures.ArContext):
# Process timestamped gaze position
self._process_gaze_position(
- timestamp = timestamp,
- x = int(data_object.value[0] * self.__video_width),
- y = int(data_object.value[1] * self.__video_height) )
+ timestamp=timestamp,
+ x=int(data_object.value[0] * self.__video_width),
+ y=int(data_object.value[1] * self.__video_height))
else:
# Process empty gaze position
- self._process_gaze_position(timestamp = timestamp)
-
+ self._process_gaze_position(timestamp=timestamp)
+
def __stream_video(self):
"""Stream video from dedicated socket."""
@@ -712,7 +714,7 @@ class LiveStream(ArFeatures.ArContext):
self.__video_buffer_lock = threading.Lock()
# Open video buffer reader
- self.__video_buffer_read_thread = threading.Thread(target = self.__video_buffer_read)
+ self.__video_buffer_read_thread = threading.Thread(target=self.__video_buffer_read)
logging.debug('> starting video buffer reader thread...')
self.__video_buffer_read_thread.start()
@@ -726,7 +728,6 @@ class LiveStream(ArFeatures.ArContext):
# Quit if the video acquisition thread have been stopped
if self.__stop_event.is_set():
-
logging.debug('> stop event is set')
break
@@ -736,7 +737,6 @@ class LiveStream(ArFeatures.ArContext):
# Store first timestamp
if first_ts == 0:
-
first_ts = image.time
# Edit millisecond timestamp
@@ -762,7 +762,6 @@ class LiveStream(ArFeatures.ArContext):
# Can't read image while it is locked
while self.__video_buffer_lock.locked():
-
# Check 10 times per frame
time.sleep(1 / (10 * self.__video_fps))
@@ -782,7 +781,6 @@ class LiveStream(ArFeatures.ArContext):
logging.debug('> read image at %i timestamp', timestamp)
if len(self.__video_buffer) > 0:
-
logging.warning('skipping %i image', len(self.__video_buffer))
# Clear buffer
@@ -790,9 +788,9 @@ class LiveStream(ArFeatures.ArContext):
# Process camera image
self._process_camera_image(
- timestamp = timestamp,
- image = image)
-
+ timestamp=timestamp,
+ image=image)
+
except Exception as e:
logging.warning('%s.__video_buffer_read: %s', DataFeatures.get_class_path(self), e)
@@ -806,7 +804,6 @@ class LiveStream(ArFeatures.ArContext):
logging.debug('%s.__keep_alive', DataFeatures.get_class_path(self))
while not self.__stop_event.is_set():
-
self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
@@ -833,7 +830,7 @@ class LiveStream(ArFeatures.ArContext):
return data
- def __post_request(self, api_action, data = None, wait_for_response = True) -> any:
+ def __post_request(self, api_action, data=None, wait_for_response=True) -> any:
"""Send a POST request and get result back."""
url = self.__base_url + api_action
@@ -845,7 +842,6 @@ class LiveStream(ArFeatures.ArContext):
data = json.dumps(data)
if wait_for_response is False:
-
threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
return None
@@ -863,7 +859,7 @@ class LiveStream(ArFeatures.ArContext):
return res
- def __wait_for_status(self, api_action, key, values, timeout = None) -> any:
+ def __wait_for_status(self, api_action, key, values, timeout=None) -> any:
"""Wait until a status matches given values."""
url = self.__base_url + api_action
@@ -875,8 +871,8 @@ class LiveStream(ArFeatures.ArContext):
req.add_header('Content-Type', 'application/json')
try:
-
- response = urlopen(req, None, timeout = timeout)
+
+ response = urlopen(req, None, timeout=timeout)
except URLError as e:
@@ -910,12 +906,10 @@ class LiveStream(ArFeatures.ArContext):
status = self.calibration_status()
while status == 'calibrating':
-
time.sleep(1)
status = self.calibration_status()
if status == 'uncalibrated' or status == 'stale' or status == 'failed':
-
raise Exception(f'Calibration {status}')
# CALIBRATION
@@ -931,11 +925,10 @@ class LiveStream(ArFeatures.ArContext):
# Calibration have to be done for a project and a participant
if project_id is None or participant_id is None:
-
raise Exception(f'Setup project and participant before')
data = {
- 'ca_project': project_id,
+ 'ca_project': project_id,
'ca_type': 'default',
'ca_participant': participant_id,
'ca_created': self.__get_current_datetime()
@@ -954,11 +947,11 @@ class LiveStream(ArFeatures.ArContext):
if self.__calibration_id is not None:
- status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
+ status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state',
+ ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
# Forget calibration id
if status != 'calibrating':
-
# noinspection PyAttributeOutsideInit
self.__calibration_id = None
@@ -970,10 +963,12 @@ class LiveStream(ArFeatures.ArContext):
# RECORDING FEATURES
- def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
+ def __wait_for_recording_status(self, recording_id,
+ status_array=['init', 'starting', 'recording', 'pausing', 'paused', 'stopping',
+ 'stopped', 'done', 'stale', 'failed']):
return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
- def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str:
+ def create_recording(self, participant_name, recording_name='', recording_notes='') -> str:
"""Create a new recording.
Returns:
@@ -1001,7 +996,7 @@ class LiveStream(ArFeatures.ArContext):
def start_recording(self, recording_id) -> bool:
"""Start recording on the Tobii interface's SD Card."""
-
+
self.__post_request('/api/recordings/' + recording_id + '/start')
return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
@@ -1044,14 +1039,14 @@ class LiveStream(ArFeatures.ArContext):
# EVENTS AND EXPERIMENTAL VARIABLES
- def __post_recording_data(self, event_type: str, event_tag = ''):
+ def __post_recording_data(self, event_type: str, event_tag=''):
data = {'type': event_type, 'tag': event_tag}
self.__post_request('/api/events', data, wait_for_response=False)
- def send_event(self, event_type: str, event_value = None):
+ def send_event(self, event_type: str, event_value=None):
self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
- def send_variable(self, variable_name: str, variable_value = None):
+ def send_variable(self, variable_name: str, variable_value=None):
self.__post_recording_data(str(variable_name), str(variable_value))
# MISC
@@ -1060,7 +1055,8 @@ class LiveStream(ArFeatures.ArContext):
self.__get_request('/api/eject')
def get_battery_info(self):
- return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
+ return ("Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (
+ float(self.get_battery_level()), float(self.get_battery_remaining_time())))
def get_battery_level(self):
return self.get_battery_status()['level']
@@ -1087,7 +1083,7 @@ class LiveStream(ArFeatures.ArContext):
return self.__get_request('/api/system/status')
def get_storage_info(self):
- return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) )
+ return ("Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()))
def get_storage_remaining_time(self):
return self.get_storage_status()['remaining_time']
@@ -1166,7 +1162,7 @@ class PostProcessing(ArFeatures.ArContext):
# Init protected attributes
self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS}
-
+
@property
def segment(self) -> str:
"""Path to segment folder."""
@@ -1202,7 +1198,7 @@ class PostProcessing(ArFeatures.ArContext):
# Read segment info
with open(os.path.join(self.__segment, TOBII_SEGMENT_INFO_FILENAME)) as info_file:
-
+
try:
info = json.load(info_file)
@@ -1212,10 +1208,10 @@ class PostProcessing(ArFeatures.ArContext):
raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}')
# Constrain reading dates
- self.__end = min(self.__end, int(info["seg_length"] * 1e3)) if self.__end != None else int(info["seg_length"] * 1e3)
+ self.__end = min(self.__end, int(info["seg_length"] * 1e3)) if self.__end != None else int(
+ info["seg_length"] * 1e3)
if self.__start >= self.__end:
-
raise ValueError('Start reading timestamp is equal or greater than end reading timestamp.')
# TODO: log various info
@@ -1227,7 +1223,7 @@ class PostProcessing(ArFeatures.ArContext):
self.__stop_event = threading.Event()
# Open reading thread
- self.__reading_thread = threading.Thread(target = self.__read)
+ self.__reading_thread = threading.Thread(target=self.__read)
logging.debug('> starting reading thread...')
self.__reading_thread.start()
@@ -1236,7 +1232,7 @@ class PostProcessing(ArFeatures.ArContext):
def __exit__(self, exception_type, exception_value, exception_traceback):
logging.debug('%s.__exit__', DataFeatures.get_class_path(self))
-
+
# Close data stream
self.__stop_event.set()
@@ -1249,15 +1245,14 @@ class PostProcessing(ArFeatures.ArContext):
for video_ts, video_image, data_list in self:
if self.__stop_event.is_set():
-
break
logging.debug('> read image at %i timestamp', video_ts)
# Process camera image
self._process_camera_image(
- timestamp = video_ts,
- image = video_image)
+ timestamp=video_ts,
+ image=video_image)
height, width, _ = video_image.shape
@@ -1277,14 +1272,14 @@ class PostProcessing(ArFeatures.ArContext):
# Process timestamped gaze position
self._process_gaze_position(
- timestamp = data_ts,
- x = int(data_object.value[0] * width),
- y = int(data_object.value[1] * height) )
+ timestamp=data_ts,
+ x=int(data_object.value[0] * width),
+ y=int(data_object.value[1] * height))
else:
# Process empty gaze position
- self._process_gaze_position(timestamp = data_ts)
+ self._process_gaze_position(timestamp=data_ts)
def __iter__(self):
@@ -1304,7 +1299,6 @@ class PostProcessing(ArFeatures.ArContext):
next_data_ts, next_data_object, next_data_object_type = self.__next_data()
while next_data_ts < next_video_ts:
-
data_list.append((next_data_ts, next_data_object, next_data_object_type))
next_data_ts, next_data_object, next_data_object_type = self.__next_data()
@@ -1321,14 +1315,12 @@ class PostProcessing(ArFeatures.ArContext):
# Ignore before start timestamp
if ts < self.__start:
-
return self.__next__()
# Ignore images after end timestamp
if self.__end != None:
if ts >= self.__end:
-
raise StopIteration
# Return millisecond timestamp and image
@@ -1337,7 +1329,7 @@ class PostProcessing(ArFeatures.ArContext):
def __next_data(self):
data = json.loads(next(self.__data_file).decode('utf-8'))
-
+
# Parse data status
status = data.pop('s', -1)
@@ -1357,7 +1349,6 @@ class PostProcessing(ArFeatures.ArContext):
# Ignore data before first vts entry
if self.__vts_ts == -1:
-
return self.__next_data()
ts -= self.__vts_ts
@@ -1365,15 +1356,13 @@ class PostProcessing(ArFeatures.ArContext):
# Ignore timestamps out of the given time range
if ts < self.__start * 1e3:
-
return self.__next_data()
if ts >= self.__end * 1e3:
-
raise StopIteration
# Parse data
data_object, data_object_type = self.__parser.parse_data(status, data)
# Return millisecond timestamp, data object and type
- return ts * 1e-3, data_object, data_object_type \ No newline at end of file
+ return ts * 1e-3, data_object, data_object_type
diff --git a/src/argaze/utils/demo/recorders.py b/src/argaze/utils/demo/recorders.py
index 0debc12..679e6f7 100644
--- a/src/argaze/utils/demo/recorders.py
+++ b/src/argaze/utils/demo/recorders.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"