diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/argaze/ArFeatures.py | 205 | ||||
-rw-r--r-- | src/argaze/ArUcoMarkers/ArUcoCamera.py | 83 | ||||
-rw-r--r-- | src/argaze/ArUcoMarkers/ArUcoScene.py | 2 | ||||
-rw-r--r-- | src/argaze/DataFeatures.py | 4 | ||||
-rw-r--r-- | src/argaze/__main__.py | 58 | ||||
-rw-r--r-- | src/argaze/utils/UtilsFeatures.py | 58 | ||||
-rw-r--r-- | src/argaze/utils/contexts/TobiiProGlasses2.py | 142 |
7 files changed, 474 insertions, 78 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index da56cce..a3ce1e3 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -19,6 +19,7 @@ __license__ = "GPLv3" import logging import math import os +import ast from typing import Iterator, Union import cv2 @@ -1032,25 +1033,11 @@ class ArScene(DataFeatures.PipelineStepObject): for name, layer in self._layers.items(): - # Clip AOI out of the visual horizontal field of view (optional) - # TODO: use HFOV and VFOV and don't use vision_cone method - if visual_hfov > 0: - - # Transform layer aoi scene into camera referential - aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec) - - # Get aoi inside vision cone field - cone_vision_height_cm = 200 # cm - cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm - - _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) - - # Keep only aoi inside vision cone field - aoi_scene_copy = layer.aoi_scene.copy(exclude=aoi_outside.keys()) - - else: - - aoi_scene_copy = layer.aoi_scene.copy() + # TODO: if greater than 0., use HFOV and VFOV + # to clip AOI out of the visual horizontal field of view + + # Copy aoi scene before projection + aoi_scene_copy = layer.aoi_scene.copy() # Project layer aoi scene # noinspection PyUnresolvedReferences @@ -1072,6 +1059,10 @@ class ArCamera(ArFrame): # Init private attributes self.__visual_hfov = 0. self.__visual_vfov = 0. + self.__projection_cache = None + self.__projection_cache_writer = None + self.__projection_cache_reader = None + self.__projection_cache_data = None # Init protected attributes self._scenes = {} @@ -1133,6 +1124,132 @@ class ArCamera(ArFrame): """Set camera's visual vertical field of view.""" self.__visual_vfov = value + @property + def projection_cache(self) -> str: + """file path to store/read layers projections into/from a cache.""" + return self.__projection_cache + + @projection_cache.setter + def projection_cache(self, projection_cache: str): + + self.__projection_cache = projection_cache + + # The file doesn't exist yet: store projections into the cache + if not os.path.exists(os.path.join( DataFeatures.get_working_directory(), self.__projection_cache) ): + + self.__projection_cache_writer = UtilsFeatures.FileWriter(path=self.__projection_cache) + self.__projection_cache_reader = None + + logging.info('ArCamera %s writes projection into %s', self.name, self.__projection_cache) + + # The file exist: read projection from the cache + else: + + self.__projection_cache_writer = None + self.__projection_cache_reader = UtilsFeatures.FileReader(path=self.__projection_cache) + + logging.info('ArCamera %s reads projection from %s', self.name, self.__projection_cache) + + def _clear_projection(self): + """Clear layers projection.""" + + logging.debug('ArCamera._clear_projection %s', self.name) + + for layer_name, layer in self.layers.items(): + + # Initialize layer if needed + if layer.aoi_scene is None: + + layer.aoi_scene = AOI2DScene.AOI2DScene() + + else: + + layer.aoi_scene.clear() + + def _write_projection_cache(self, timestamp: int|float, exception = None): + """Write layers aoi scene into the projection cache. + + Parameters: + timestamp: cache time + """ + + if self.__projection_cache_writer is not None: + + logging.debug('ArCamera._write_projection_cache %s %f', self.name, timestamp) + + if exception is None: + + projection = {} + + for layer_name, layer in self.layers.items(): + + projection[layer_name] = layer.aoi_scene + + self.__projection_cache_writer.write( (timestamp, projection) ) + + else: + + self.__projection_cache_writer.write( (timestamp, exception) ) + + def _read_projection_cache(self, timestamp: int|float): + """Read layers aoi scene from the projection cache. + + Parameters: + timestamp: cache time. + + Returns: + success: False if there is no projection cache, True otherwise. + """ + + if self.__projection_cache_reader is None: + + return False + + logging.debug('ArCamera._read_projection_cache %s %f', self.name, timestamp) + + # Clear former projection + self._clear_projection() + + try: + + # Read first data if not done yet + if self.__projection_cache_data is None: + + self.__projection_cache_data = self.__projection_cache_reader.read() + + # Continue reading cache until correct timestamped projection + while float(self.__projection_cache_data[0]) < timestamp: + + self.__projection_cache_data = self.__projection_cache_reader.read() + + # No more projection in the cache + except EOFError: + + raise DataFeatures.TimestampedException("Projection cache is empty", timestamp=timestamp) + + # Correct timestamped projection is found + if float(self.__projection_cache_data[0]) == timestamp: + + # When correct timestamped projection is found + projection = {} + + try: + + projection = ast.literal_eval(self.__projection_cache_data[1]) + + for layer_name, aoi_scene in projection.items(): + + self._layers[layer_name].aoi_scene = AOI2DScene.AOI2DScene(aoi_scene) + self._layers[layer_name].timestamp = timestamp + + logging.debug('> reading %s projection from cache', layer_name) + + except SyntaxError as e: + + raise DataFeatures.TimestampedException(self.__projection_cache_data[1], timestamp=timestamp) + + return True + def scene_frames(self) -> Iterator[ArFrame]: """Iterate over all scenes frames""" @@ -1153,6 +1270,28 @@ class ArCamera(ArFrame): "visual_vfov": self.__visual_vfov } + @DataFeatures.PipelineStepEnter + def __enter__(self): + + if self.__projection_cache_writer is not None: + + self.__projection_cache_writer.__enter__() + + if self.__projection_cache_reader is not None: + + self.__projection_cache_reader.__enter__() + + @DataFeatures.PipelineStepExit + def __exit__(self, exception_type, exception_value, exception_traceback): + + if self.__projection_cache_writer is not None: + + self.__projection_cache_writer.__exit__(exception_type, exception_value, exception_traceback) + + if self.__projection_cache_reader is not None: + + self.__projection_cache_reader.__exit__(exception_type, exception_value, exception_traceback) + def _update_expected_and_excluded_aoi(self): """Edit expected aoi of each layer aoi scan path with the aoi of corresponding scene layer. Edit excluded aoi to ignore frame aoi from aoi matching. @@ -1429,8 +1568,7 @@ class ArContext(DataFeatures.PipelineStepObject): # Compare image size with ArCamera frame size if list(image.shape[0:2][::-1]) != self.__pipeline.size: - logging.warning( - '%s._process_camera_image: image size (%i x %i) is different of ArCamera frame size (%i x %i)', + logging.warning('%s._process_camera_image: image size (%i x %i) is different of ArCamera frame size (%i x %i)', DataFeatures.get_class_path(self), width, height, self.__pipeline.size[0], self.__pipeline.size[1]) return @@ -1469,11 +1607,14 @@ class ArContext(DataFeatures.PipelineStepObject): logging.debug('\t> get image (%i x %i)', width, height) + last_position = self.__pipeline.last_gaze_position() + info_stack = 0 if draw_times: if image.is_timestamped(): + info_stack += 1 cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) @@ -1490,6 +1631,11 @@ class ArContext(DataFeatures.PipelineStepObject): info_stack += 1 cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + if last_position is not None: + + info_stack += 1 + cv2.putText(image, f'Position at {last_position.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + if issubclass(type(self.__pipeline), ArFrame): try: @@ -1514,3 +1660,20 @@ class ArContext(DataFeatures.PipelineStepObject): cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) return image + + @DataFeatures.PipelineStepMethod + def pause(self): + """Pause pipeline processing.""" + + raise NotImplementedError('pause() method not implemented') + + def is_paused(self) -> bool: + """Is pipeline processing paused?""" + + raise NotImplementedError('is_paused() method not implemented') + + @DataFeatures.PipelineStepMethod + def resume(self): + """Resume pipeline processing.""" + + raise NotImplementedError('resume() method not implemented') diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 95df135..5b535b5 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -79,12 +79,7 @@ class ArUcoCamera(ArFeatures.ArCamera): # Create default optic parameters adapted to frame size # Note: The choice of 1000 for default focal length should be discussed... - self.__aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=self.size, - K=ArUcoOpticCalibrator.K0( - focal_length=( - 1000., 1000.), - width=self.size[0], - height=self.size[1])) + self.__aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=self.size, K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=self.size[0], height=self.size[1])) # Edit parent if self.__aruco_detector is not None: @@ -134,64 +129,70 @@ class ArUcoCamera(ArFeatures.ArCamera): cv2.rectangle(image, (0, 0), (self.__sides_mask, height), (0, 0, 0), -1) cv2.rectangle(image, (width - self.__sides_mask, 0), (width, height), (0, 0, 0), -1) - # Detect aruco markers - logging.debug('\t> detect markers') - - self.__aruco_detector.detect_markers(image) - # Fill camera frame background with timestamped image self.background = image - # Clear former layers projection into camera frame - for layer_name, layer in self.layers.items(): + # Read projection from the cache if required + if not self._read_projection_cache(image.timestamp): + + # Detect aruco markers + logging.debug('\t> detect markers') + + self.__aruco_detector.detect_markers(image) - # Initialize layer if needed - if layer.aoi_scene is None: + # Clear former layers projection into camera frame + self._clear_projection() - layer.aoi_scene = AOI2DScene.AOI2DScene() + # Project each aoi 3d scene into camera frame + for scene_name, scene in self.scenes.items(): - else: + ''' TODO: Enable aruco_aoi processing + if scene.aruco_aoi: - layer.aoi_scene.clear() + try: - # Project each aoi 3d scene into camera frame - for scene_name, scene in self.scenes.items(): + # Build AOI scene directly from detected ArUco marker corners + self.layers[??].aoi_2d_scene |= scene.build_aruco_aoi_scene(self.__aruco_detector.detected_markers()) - ''' TODO: Enable aruco_aoi processing - if scene.aruco_aoi: + except ArFeatures.PoseEstimationFailed: + + pass + ''' + + # Estimate scene pose from detected scene markers + logging.debug('\t> estimate %s scene pose', scene_name) try: - # Build AOI scene directly from detected ArUco marker corners - self.layers[??].aoi_2d_scene |= scene.build_aruco_aoi_scene(self.__aruco_detector.detected_markers()) + tvec, rmat, _ = scene.estimate_pose(self.__aruco_detector.detected_markers(), timestamp=image.timestamp) - except ArFeatures.PoseEstimationFailed: + # Project scene into camera frame according estimated pose + for layer_name, layer_projection in scene.project(tvec, rmat, self.visual_hfov, self.visual_vfov, timestamp=image.timestamp): - pass - ''' + logging.debug('\t> project %s scene %s layer', scene_name, layer_name) - # Estimate scene pose from detected scene markers - logging.debug('\t> estimate %s scene pose', scene_name) + try: - tvec, rmat, _ = scene.estimate_pose(self.__aruco_detector.detected_markers(), timestamp=self.timestamp) + # Update camera layer aoi + self.layers[layer_name].aoi_scene |= layer_projection - # Project scene into camera frame according estimated pose - for layer_name, layer_projection in scene.project(tvec, rmat, self.visual_hfov, self.visual_vfov, - timestamp=self.timestamp): + # Timestamp camera layer + self.layers[layer_name].timestamp = image.timestamp - logging.debug('\t> project %s scene %s layer', scene_name, layer_name) + except KeyError: - try: + pass - # Update camera layer aoi - self.layers[layer_name].aoi_scene |= layer_projection + # Write projection into the cache if required + self._write_projection_cache(image.timestamp) - # Timestamp camera layer - self.layers[layer_name].timestamp = self.timestamp + except DataFeatures.TimestampedException as e: - except KeyError: + # Write exception into the cache if required + self._write_projection_cache(image.timestamp, e) - pass + # Raise exception + raise e @DataFeatures.PipelineStepImage def image(self, draw_detected_markers: dict = None, draw_scenes: dict = None, diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py index bb7bdbf..8acedb1 100644 --- a/src/argaze/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -83,7 +83,7 @@ class ArUcoScene(ArFeatures.ArScene): raise ArFeatures.PoseEstimationFailed('Only one marker belongs to the scene') - # Estimate pose from a markers corners + # Estimate pose from markers corners success, tvec, rmat = self.__aruco_markers_group.estimate_pose_from_markers_corners(scene_markers, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D) if not success: diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 68afbca..08e0ef1 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -693,10 +693,10 @@ def PipelineStepExit(method): logging.debug('%s.__exit__', get_class_path(self)) - PipelineStepObject.__exit__(self, *args) - method(self, *args) + PipelineStepObject.__exit__(self, *args) + return wrapper diff --git a/src/argaze/__main__.py b/src/argaze/__main__.py index c80657e..9bcbe5a 100644 --- a/src/argaze/__main__.py +++ b/src/argaze/__main__.py @@ -18,7 +18,9 @@ __license__ = "GPLv3" import argparse import logging +import json import contextlib +import os from . import load from .ArFeatures import ArCamera, ArContext @@ -28,33 +30,64 @@ import cv2 # Manage arguments parser = argparse.ArgumentParser(description=__doc__.split('-')[0]) parser.add_argument('context_file', metavar='CONTEXT_FILE', type=str, help='JSON context filepath') -parser.add_argument('-v', '--verbose', action='store_true', default=False, - help='enable verbose mode to print information in console') +parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') +parser.add_argument('-p', '--pipe_path', metavar='PIPE_PATH', type=str, default=None, help='enable pipe communication at given path to execute external commands') args = parser.parse_args() # Manage logging logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG if args.verbose else logging.INFO) +# Manage pipe communication +if args.pipe_path is not None: + + if not os.path.exists(args.pipe_path): + + os.mkfifo(args.pipe_path) + + # Open the fifo in non-blocking mode or it will stalls until someone opens it for writting + pipe_file = os.open(args.pipe_path, os.O_RDONLY | os.O_NONBLOCK) + + logging.info('%s pipe opened', args.pipe_path) + # Load context from JSON file with load(args.context_file) as context: # Loaded object must be a subclass of ArContext if not issubclass(type(context), ArContext): + raise TypeError('Loaded object is not a subclass of ArContext') if args.verbose: + print(context) # Create a window to display context cv2.namedWindow(context.name, cv2.WINDOW_AUTOSIZE) # Waiting for 'ctrl+C' interruption - with contextlib.suppress(KeyboardInterrupt): + with contextlib.suppress(KeyboardInterrupt), os.fdopen(pipe_file) if args.pipe_path is not None else contextlib.nullcontext() as pipe: # Visualization loop while True: + # Read message from pipe if required + if args.pipe_path is not None: + + message = pipe.read().rstrip('\n') + + if message: + + logging.info('%s: %s', args.pipe_path, message) + + try: + + exec(message) + + except Exception as e: + + logging.error('%s', e) + # Display context cv2.imshow(context.name, context.image()) @@ -62,6 +95,7 @@ with load(args.context_file) as context: if issubclass(type(context.pipeline), ArCamera): for scene_frame in context.pipeline.scene_frames(): + cv2.imshow(scene_frame.name, scene_frame.image()) # Key interaction @@ -69,7 +103,25 @@ with load(args.context_file) as context: # Esc: close window if key_pressed == 27: + raise KeyboardInterrupt() + # Space bar: pause/resume pipeline processing + if key_pressed == 32: + + try: + + if context.is_paused(): + + context.resume() + + else: + + context.pause() + + except NotImplementedError: + + pass + # Stop frame display cv2.destroyAllWindows() diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py index c04d20a..3c5be35 100644 --- a/src/argaze/utils/UtilsFeatures.py +++ b/src/argaze/utils/UtilsFeatures.py @@ -19,6 +19,7 @@ __license__ = "GPLv3" import os import pathlib import time +import csv import types import traceback @@ -243,6 +244,63 @@ class FileWriter(DataFeatures.PipelineStepObject): # Write into file print(data, file=self.__file, flush=True) +class FileReader(DataFeatures.PipelineStepObject): + """Read data from a file line by line.""" + + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + # Init private attributes + self.__path = None + self.__separator = ',' + + @property + def path(self) -> str: + """File path where to read data.""" + return self.__path + + @path.setter + def path(self, path: str): + + self.__path = pathlib.Path(path) + + @property + def separator(self) -> str: + """String used to separate elements during string to tuple conversion.""" + return self.__separator + + @separator.setter + def separator(self, separator: str): + + self.__separator = separator + + @DataFeatures.PipelineStepEnter + def __enter__(self): + + # Open file + self.__file = csv.reader(open(self.__path), delimiter= self.__separator) + + @DataFeatures.PipelineStepExit + def __exit__(self, exception_type, exception_value, exception_traceback): + + pass + + def read(self) -> str|tuple: + """Read next data from file. + + !!! note + Quoted strings separated by separator string are converted into tuple elements. + """ + + try: + + return next(self.__file) + + except Exception: + + raise EOFError + class VideoWriter(DataFeatures.PipelineStepObject, DataFeatures.SharedObject): """Open ffmpeg application as sub-process. FFmpeg input PIPE: RAW images in BGR color format diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py index 2f43bc5..92aba0f 100644 --- a/src/argaze/utils/contexts/TobiiProGlasses2.py +++ b/src/argaze/utils/contexts/TobiiProGlasses2.py @@ -631,7 +631,7 @@ class LiveStream(ArFeatures.ArContext): iptype = socket.AF_INET6 res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, - socket.AI_PASSIVE) + socket.AI_PASSIVE) family, socktype, proto, canonname, sockaddr = res[0] new_socket = socket.socket(family, socktype, proto) @@ -947,8 +947,7 @@ class LiveStream(ArFeatures.ArContext): if self.__calibration_id is not None: - status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', - ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) + status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed']) # Forget calibration id if status != 'calibrating': @@ -963,9 +962,8 @@ class LiveStream(ArFeatures.ArContext): # RECORDING FEATURES - def __wait_for_recording_status(self, recording_id, - status_array=['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', - 'stopped', 'done', 'stale', 'failed']): + def __wait_for_recording_status(self, recording_id, status_array=['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']): + return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array) def create_recording(self, participant_name, recording_name='', recording_notes='') -> str: @@ -1160,6 +1158,24 @@ class PostProcessing(ArFeatures.ArContext): self.__data_list = [] + # Initialize synchronisation + self.__sync_event = None + self.__sync_event_unit = None + self.__sync_event_factor = None + self.__sync_data_ts = None + self.__sync_ts = None + self.__last_sync_data_ts = None + self.__last_sync_ts = None + + self.__time_unit_factor = { + "µs": 1e-3, + "ms": 1, + "s": 1e3 + } + + # Initialize inconsistent timestamp monitoring + self.__last_data_ts = None + # Init protected attributes self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS} @@ -1193,6 +1209,27 @@ class PostProcessing(ArFeatures.ArContext): self.__end = end + @property + def sync_event(self) -> str: + """Optional event type dedicated to syncrhonize Tobii timestamps with external time source.""" + return self.__sync_event + + @sync_event.setter + def sync_event(self, sync_event: str): + + self.__sync_event = sync_event + + @property + def sync_event_unit(self) -> str: + """Define sync event unit for conversion purpose ('µs', 'ms' or 's')""" + return self.__sync_event_unit + + @sync_event_unit.setter + def sync_event_unit(self, sync_event_unit: str): + + self.__sync_event_unit = sync_event_unit + self.__sync_event_factor = self.__time_unit_factor.get(sync_event_unit) + @DataFeatures.PipelineStepEnter def __enter__(self): @@ -1222,6 +1259,9 @@ class PostProcessing(ArFeatures.ArContext): # Create stop event self.__stop_event = threading.Event() + # Create pause event + self.__pause_event = threading.Event() + # Open reading thread self.__reading_thread = threading.Thread(target=self.__read) @@ -1244,15 +1284,24 @@ class PostProcessing(ArFeatures.ArContext): for video_ts, video_image, data_list in self: + # Check pause event (and stop event) + while self.__pause_event.is_set() and not self.__stop_event.is_set(): + + logging.debug('> reading is paused at %i', video_ts) + + self._process_camera_image(timestamp=video_ts, image=video_image) + + time.sleep(1) + + # Check stop event if self.__stop_event.is_set(): + break logging.debug('> read image at %i timestamp', video_ts) # Process camera image - self._process_camera_image( - timestamp=video_ts, - image=video_image) + self._process_camera_image(timestamp=video_ts, image=video_image) height, width, _ = video_image.shape @@ -1261,6 +1310,62 @@ class PostProcessing(ArFeatures.ArContext): # Process data for data_ts, data_object, data_object_type in data_list: + # Check sync event first if required + if self.__sync_event is not None: + + if data_object_type == 'Event': + + logging.debug('> reading %s event (%s) at %f ms', data_object.type, data_object.tag, data_ts) + + if data_object.type == self.__sync_event: + + # Store old sync data ts + if self.__last_sync_data_ts is None and self.__sync_data_ts is not None: + + self.__last_sync_data_ts = self.__sync_data_ts + + # Store old sync ts + if self.__last_sync_ts is None and self.__sync_ts is not None: + + self.__last_sync_ts = self.__sync_ts + + # Store sync event timestamp + self.__sync_data_ts = data_ts + self.__sync_ts = float(data_object.tag) * self.__sync_event_factor + + # Monitor delay between data ts and sync ts + if self.__last_sync_data_ts is not None and self.__last_sync_ts is not None: + + diff_data_ts = self.__sync_data_ts - self.__last_sync_data_ts + diff_sync_ts = (self.__sync_ts - self.__last_sync_ts) + + # Correct sync ts + self.__sync_ts += diff_data_ts-diff_sync_ts + + if abs(diff_data_ts-diff_sync_ts) > 0: + + logging.info('Difference between data and sync event timestamps is %i ms', diff_data_ts-diff_sync_ts) + + # Don't process gaze positions if sync is required but sync event not happened yet + if self.__sync_event is not None and self.__sync_ts is None: + + continue + + # Otherwise, synchronize timestamp with sync event + else: + + data_ts = int(self.__sync_ts + data_ts - self.__sync_data_ts) + + # Catch inconstistent timestamps + if self.__last_data_ts is not None: + + if self.__data_ts - self.__last_data_ts <= 0: + + logging.error('! %i gaze position more recent than the previous one', data_ts) + + last_data_ts = data_ts + + # Process gaze positions match data_object_type: case 'GazePosition': @@ -1280,7 +1385,7 @@ class PostProcessing(ArFeatures.ArContext): # Process empty gaze position self._process_gaze_position(timestamp=data_ts) - + def __iter__(self): self.__data_file = gzip.open(os.path.join(self.__segment, TOBII_SEGMENT_DATA_FILENAME)) @@ -1366,3 +1471,20 @@ class PostProcessing(ArFeatures.ArContext): # Return millisecond timestamp, data object and type return ts * 1e-3, data_object, data_object_type + + @DataFeatures.PipelineStepMethod + def pause(self): + """Pause pipeline processing.""" + + self.__pause_event.set() + + def is_paused(self) -> bool: + """Is pipeline processing paused?""" + + return self.__pause_event.is_set() + + @DataFeatures.PipelineStepMethod + def resume(self): + """Resume pipeline processing.""" + + self.__pause_event.clear()
\ No newline at end of file |