From b130513e6538f6590be723632c55dbb3cc2de11e Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Mon, 22 Apr 2024 19:56:49 +0200 Subject: Fixing Tobii post processing end of file. Moving stop and pause event inside ArContext. --- src/argaze/ArFeatures.py | 20 +++++- src/argaze/__main__.py | 2 +- src/argaze/utils/contexts/PupilLabs.py | 9 +-- src/argaze/utils/contexts/Random.py | 7 +-- src/argaze/utils/contexts/TobiiProGlasses2.py | 90 ++++++++++++--------------- 5 files changed, 64 insertions(+), 64 deletions(-) diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 58b23a0..7ea7f2b 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -17,6 +17,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import logging +import threading import math import os import ast @@ -1463,6 +1464,8 @@ class ArContext(DataFeatures.PipelineStepObject): self.__process_camera_image_frequency = 0 # Init protected attributes + self._stop_event = threading.Event() + self._pause_event = threading.Event() self._image_parameters = DEFAULT_ARCONTEXT_IMAGE_PARAMETERS @property @@ -1661,19 +1664,30 @@ class ArContext(DataFeatures.PipelineStepObject): return image + def is_running(self) -> bool: + """Is context running?""" + + return not self._stop_event.is_set() + + @DataFeatures.PipelineStepMethod + def stop(self): + """Stop context.""" + + self._stop_event.set() + @DataFeatures.PipelineStepMethod def pause(self): """Pause pipeline processing.""" - raise NotImplementedError('pause() method not implemented') + self._pause_event.set() def is_paused(self) -> bool: """Is pipeline processing paused?""" - raise NotImplementedError('is_paused() method not implemented') + return self._pause_event.is_set() @DataFeatures.PipelineStepMethod def resume(self): """Resume pipeline processing.""" - raise NotImplementedError('resume() method not implemented') + self._pause_event.clear() diff --git a/src/argaze/__main__.py b/src/argaze/__main__.py index 9bcbe5a..e598384 100644 --- a/src/argaze/__main__.py +++ b/src/argaze/__main__.py @@ -69,7 +69,7 @@ with load(args.context_file) as context: with contextlib.suppress(KeyboardInterrupt), os.fdopen(pipe_file) if args.pipe_path is not None else contextlib.nullcontext() as pipe: # Visualization loop - while True: + while context.is_running(): # Read message from pipe if required if args.pipe_path is not None: diff --git a/src/argaze/utils/contexts/PupilLabs.py b/src/argaze/utils/contexts/PupilLabs.py index d814deb..3d265e4 100644 --- a/src/argaze/utils/contexts/PupilLabs.py +++ b/src/argaze/utils/contexts/PupilLabs.py @@ -44,9 +44,6 @@ class LiveStream(ArFeatures.ArContext): logging.info('Pupil-Labs Device connexion starts...') - # Create stop event - self.__stop_event = threading.Event() - # Init timestamp self.__start_time = time.time() @@ -80,7 +77,7 @@ class LiveStream(ArFeatures.ArContext): logging.debug('Stream gaze from Pupil Device') - while not self.__stop_event.is_set(): + while not self._stop_event.is_set(): try: while True: @@ -111,7 +108,7 @@ class LiveStream(ArFeatures.ArContext): logging.debug('Stream video from Pupil Device') - while not self.__stop_event.is_set(): + while not self._stop_event.is_set(): try: while True: @@ -134,7 +131,7 @@ class LiveStream(ArFeatures.ArContext): logging.debug('Pupil-Labs context stops...') # Close data stream - self.__stop_event.set() + self._stop_event.set() # Stop streaming threading.Thread.join(self.__gaze_thread) diff --git a/src/argaze/utils/contexts/Random.py b/src/argaze/utils/contexts/Random.py index 7fc4d91..f8890aa 100644 --- a/src/argaze/utils/contexts/Random.py +++ b/src/argaze/utils/contexts/Random.py @@ -50,9 +50,6 @@ class GazePositionGenerator(ArFeatures.ArContext): logging.info('GazePositionGenerator context starts...') - # Create stop event - self.__stop_event = threading.Event() - # Start gaze position generator thread self.__gaze_thread = threading.Thread(target=self.__generate_gaze_position) self.__gaze_thread.start() @@ -66,7 +63,7 @@ class GazePositionGenerator(ArFeatures.ArContext): self.__x = int(self.range[0] / 2) self.__y = int(self.range[1] / 2) - while not self.__stop_event.is_set(): + while not self._stop_event.is_set(): # Edit millisecond timestamp timestamp = int((time.time() - start_time) * 1e3) @@ -88,6 +85,6 @@ class GazePositionGenerator(ArFeatures.ArContext): logging.info('GazePositionGenerator context ends...') # Stop gaze position generator thread - self.__stop_event.set() + self._stop_event.set() threading.Thread.join(self.__gaze_thread) \ No newline at end of file diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py index 340dbd8..db99e4d 100644 --- a/src/argaze/utils/contexts/TobiiProGlasses2.py +++ b/src/argaze/utils/contexts/TobiiProGlasses2.py @@ -558,9 +558,6 @@ class LiveStream(ArFeatures.ArContext): logging.info('Tobii Pro Glasses 2 participant id: %s', self.__participant_id) - # Create stop event - self.__stop_event = threading.Event() - # Open data stream self.__data_socket = self.__make_socket() self.__data_thread = threading.Thread(target=self.__stream_data) @@ -591,7 +588,7 @@ class LiveStream(ArFeatures.ArContext): logging.debug('%s.__exit__', DataFeatures.get_class_path(self)) # Close data stream - self.__stop_event.set() + self._stop_event.set() # Stop keeping connection alive threading.Thread.join(self.__keep_alive_thread) @@ -657,7 +654,7 @@ class LiveStream(ArFeatures.ArContext): # First timestamp to offset all timestamps first_ts = 0 - while not self.__stop_event.is_set(): + while not self._stop_event.is_set(): try: @@ -727,7 +724,7 @@ class LiveStream(ArFeatures.ArContext): logging.debug('> new image decoded') # Quit if the video acquisition thread have been stopped - if self.__stop_event.is_set(): + if self._stop_event.is_set(): logging.debug('> stop event is set') break @@ -758,7 +755,7 @@ class LiveStream(ArFeatures.ArContext): logging.debug('%s.__video_buffer_read', DataFeatures.get_class_path(self)) - while not self.__stop_event.is_set(): + while not self._stop_event.is_set(): # Can't read image while it is locked while self.__video_buffer_lock.locked(): @@ -803,7 +800,7 @@ class LiveStream(ArFeatures.ArContext): logging.debug('%s.__keep_alive', DataFeatures.get_class_path(self)) - while not self.__stop_event.is_set(): + while not self._stop_event.is_set(): self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport)) @@ -1245,10 +1242,10 @@ class PostProcessing(ArFeatures.ArContext): raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}') # Constrain reading dates - self.__end = min(self.__end, int(info["seg_length"] * 1e3)) if self.__end != None else int( + self.end = min(self.end, int(info["seg_length"] * 1e3)) if not math.isnan(self.end) else int( info["seg_length"] * 1e3) - if self.__start >= self.__end: + if self.start >= self.end: raise ValueError('Start reading timestamp is equal or greater than end reading timestamp.') # TODO: log various info @@ -1256,12 +1253,6 @@ class PostProcessing(ArFeatures.ArContext): start_date = datetime.datetime.strptime(info["seg_t_start"], TOBII_DATETIME_FORMAT) stop_date = datetime.datetime.strptime(info["seg_t_stop"], TOBII_DATETIME_FORMAT) - # Create stop event - self.__stop_event = threading.Event() - - # Create pause event - self.__pause_event = threading.Event() - # Open reading thread self.__reading_thread = threading.Thread(target=self.__read) @@ -1274,7 +1265,7 @@ class PostProcessing(ArFeatures.ArContext): logging.debug('%s.__exit__', DataFeatures.get_class_path(self)) # Close data stream - self.__stop_event.set() + self._stop_event.set() # Stop reading thread threading.Thread.join(self.__reading_thread) @@ -1285,7 +1276,7 @@ class PostProcessing(ArFeatures.ArContext): for video_ts, video_image, data_list in self: # Check pause event (and stop event) - while self.__pause_event.is_set() and not self.__stop_event.is_set(): + while self._pause_event.is_set() and not self._stop_event.is_set(): logging.debug('> reading is paused at %i', video_ts) @@ -1294,7 +1285,7 @@ class PostProcessing(ArFeatures.ArContext): time.sleep(1) # Check stop event - if self.__stop_event.is_set(): + if self._stop_event.is_set(): break @@ -1352,7 +1343,7 @@ class PostProcessing(ArFeatures.ArContext): continue # Otherwise, synchronize timestamp with sync event - else: + elif self.__sync_event is not None and self.__sync_ts is not None: data_ts = int(self.__sync_ts + data_ts - self.__sync_data_ts) @@ -1385,6 +1376,9 @@ class PostProcessing(ArFeatures.ArContext): # Process empty gaze position self._process_gaze_position(timestamp=data_ts) + + # Set stop event ourself + self._stop_event.set() def __iter__(self): @@ -1394,6 +1388,18 @@ class PostProcessing(ArFeatures.ArContext): self.__vts_offset = 0 self.__vts_ts = -1 + # Position at required start time + if not math.isnan(self.start): + + # Video timestamps are seconds + start_ts = int(self.start * 1e-3 / self.__video_file.streams.video[0].time_base) + self.__video_file.seek(start_ts, any_frame=True, stream=self.__video_file.streams.video[0]) + + # Data timestamps are milliseconds + while self.__next_data()[0] < self.start: + pass + + # Pull next image as current image self.__video_ts, self.__video_image = self.__next_video_image() return self @@ -1418,17 +1424,21 @@ class PostProcessing(ArFeatures.ArContext): def __next_video_image(self): - image = next(self.__video_file.decode(self.__video_file.streams.video[0])) - ts = int(image.time * 1e3) + try: + + image = next(self.__video_file.decode(self.__video_file.streams.video[0])) + ts = int(image.time * 1e3) + + except Exception as e: - # Ignore before start timestamp - if ts < self.__start: - return self.__next__() + # DEBUG + print("DECODE ERROR:", e) - # Ignore images after end timestamp - if self.__end != None: + # Ignore image after required end timestamp + if self.end != None: + + if ts >= self.end: - if ts >= self.__end: raise StopIteration # Return millisecond timestamp and image @@ -1457,16 +1467,14 @@ class PostProcessing(ArFeatures.ArContext): # Ignore data before first vts entry if self.__vts_ts == -1: + return self.__next_data() ts -= self.__vts_ts ts += self.__vts_offset - # Ignore timestamps out of the given time range - if ts < self.__start * 1e3: - return self.__next_data() - - if ts >= self.__end * 1e3: + # Stop at end timestamp + if ts >= self.end * 1e3: raise StopIteration # Parse data @@ -1475,19 +1483,3 @@ class PostProcessing(ArFeatures.ArContext): # Return millisecond timestamp, data object and type return ts * 1e-3, data_object, data_object_type - @DataFeatures.PipelineStepMethod - def pause(self): - """Pause pipeline processing.""" - - self.__pause_event.set() - - def is_paused(self) -> bool: - """Is pipeline processing paused?""" - - return self.__pause_event.is_set() - - @DataFeatures.PipelineStepMethod - def resume(self): - """Resume pipeline processing.""" - - self.__pause_event.clear() \ No newline at end of file -- cgit v1.1