From 1f6be6f48cf3f083ccf6dd091f9c73e2e2de1c9a Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 19 Jun 2024 18:32:50 +0200 Subject: Rewriting video streaming features to avoid lock. --- src/argaze/utils/contexts/TobiiProGlasses2.py | 88 +++++++++++++++------------ 1 file changed, 48 insertions(+), 40 deletions(-) diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py index 98ac85a..e3c68b0 100644 --- a/src/argaze/utils/contexts/TobiiProGlasses2.py +++ b/src/argaze/utils/contexts/TobiiProGlasses2.py @@ -696,8 +696,8 @@ class LiveStream(ArFeatures.LiveProcessingContext): container = av.open(f'rtsp://{self.__address}:8554/live/scene', options={'rtsp_transport': 'tcp'}) self.__stream = container.streams.video[0] - # Create a video buffer with a lock - self.__video_buffer = collections.OrderedDict() + # Create a single image buffer with a lock + self.__video_buffer = None self.__video_buffer_lock = threading.Lock() # Open video buffer reader @@ -718,70 +718,78 @@ class LiveStream(ArFeatures.LiveProcessingContext): logging.debug('> stop event is set') break - if image is not None: + # Check image validity + if image is None: - if image.time is not None: + # Wait for half frame time + time.sleep(2 / self.__video_fps) + continue + + # Check image time validity + if image.time is None: + + # Wait for half frame time + time.sleep(2 / self.__video_fps) + continue + + # Store first timestamp + if first_ts == 0: + + first_ts = image.time + + # Edit millisecond timestamp + timestamp = int((image.time - first_ts) * 1e3) - # Store first timestamp - if first_ts == 0: - first_ts = image.time + # Ignore image if buffer is locked + if not self.__video_buffer_lock.locked(): - # Edit millisecond timestamp - timestamp = int((image.time - first_ts) * 1e3) + # Decode image + image = image.to_ndarray(format='bgr24') + + # Lock buffer access + with self.__video_buffer_lock: logging.debug('> store image at %i timestamp', timestamp) - # Lock buffer access - self.__video_buffer_lock.acquire() + # Store image with time index + self.__video_buffer= (timestamp, image) - # Decode image and store it at time index - self.__video_buffer[timestamp] = image.to_ndarray(format='bgr24') + else: - # Unlock buffer access - self.__video_buffer_lock.release() + logging.warning('%s.__stream_video: ignore image at %i', DataFeatures.get_class_path(self), timestamp) def __video_buffer_read(self): - """Read incoming buffered video images.""" + """Read incoming video buffer.""" logging.debug('%s.__video_buffer_read', DataFeatures.get_class_path(self)) while not self._stop_event.is_set(): - # Can't read image while it is locked - while self.__video_buffer_lock.locked(): - # Check 10 times per frame - time.sleep(1 / (10 * self.__video_fps)) + # Wait for half frame time + time.sleep(2 / self.__video_fps) # Lock buffer access - self.__video_buffer_lock.acquire() - - # Video buffer not empty - if len(self.__video_buffer) > 0: - - logging.debug('> %i images in buffer', len(self.__video_buffer)) + with self.__video_buffer_lock: - # Get last stored image - try: + # Video buffer not empty + if self.__video_buffer is not None: - timestamp, image = self.__video_buffer.popitem(last=True) + timestamp, image = self.__video_buffer logging.debug('> read image at %i timestamp', timestamp) - if len(self.__video_buffer) > 0: - logging.warning('skipping %i image', len(self.__video_buffer)) + # Process video image + try: - # Clear buffer - self.__video_buffer = collections.OrderedDict() - - # Process camera image - self._process_camera_image(timestamp=timestamp, image=image) + # Process camera image + self._process_camera_image(timestamp=timestamp, image=image) - except Exception as e: + except Exception as e: - logging.warning('%s.__video_buffer_read: %s', DataFeatures.get_class_path(self), e) + logging.warning('%s.__video_buffer_read: %s', DataFeatures.get_class_path(self), e) - # Unlock buffer access - self.__video_buffer_lock.release() + # Clear buffer + self.__video_buffer = None def __keep_alive(self): """Maintain network connection.""" -- cgit v1.1