diff options
Diffstat (limited to 'docs/use_cases/pilot_gaze_monitoring/observers.md')
-rw-r--r-- | docs/use_cases/pilot_gaze_monitoring/observers.md | 99 |
1 files changed, 0 insertions, 99 deletions
diff --git a/docs/use_cases/pilot_gaze_monitoring/observers.md b/docs/use_cases/pilot_gaze_monitoring/observers.md deleted file mode 100644 index 2e3f394..0000000 --- a/docs/use_cases/pilot_gaze_monitoring/observers.md +++ /dev/null @@ -1,99 +0,0 @@ -Fixation events sending -======================= - -## observers.py - -```python -import logging - -from argaze import DataFeatures, GazeFeatures - -from ivy.std_api import * -from ivy.ivy import IvyIllegalStateError - - -class IvyBus(DataFeatures.PipelineStepObject): - """Handle Ivy bus.""" - - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): - - self.__bus = None - - @property - def bus(self) -> str: - return self.__bus - - @bus.setter - def bus(self, bus: str): - self.__bus = bus - - @DataFeatures.PipelineStepEnter - def __enter__(self, parent = None): - - # Enable Ivy bus - IvyInit(self.name) - IvyStart(self.__bus) - - return self - - @DataFeatures.PipelineStepExit - def __exit__(self, exception_type, exception_value, exception_traceback): - - # Stop Ivy bus - IvyStop() - - -class ArUcoCameraLogger(DataFeatures.PipelineStepObject): - """Log ArUcoCamera activity.""" - - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): - - self._last_markers_number = None - - def on_watch(self, timestamp, aruco_camera, exception): - """Report ArUco markers detection info on Ivy bus.""" - - # Wait for number of detected marker changes - if aruco_camera.aruco_detector.detected_markers_number() != self._last_markers_number: - - self._last_markers_number = aruco_camera.aruco_detector.detected_markers_number() - - output = f'ArUcoDetection MarkersNumber={self._last_markers_number}' - - # Send Ivy message - IvySendMsg(output) - - logging.debug('%i %s', timestamp, output) - - def on_look(self, timestamp, aruco_camera, exception): - """Report fixation and metrics on Ivy bus.""" - - # Select 'Main' layer - main_layer = aruco_camera.layers['Main'] - - if GazeFeatures.is_fixation(aruco_camera.last_gaze_movement()): - - fixation = aruco_camera.last_gaze_movement() - - # Output in progress fixation data - if not fixation.is_finished(): - - output = f'FixationInProgress Start={fixation[0].timestamp} Duration={fixation.duration} AOI={main_layer.last_looked_aoi_name()} Probabilities={main_layer.aoi_matcher.looked_probabilities()}' - - # Send Ivy message - IvySendMsg(output) - - logging.debug('%i %s %s %s', timestamp, aruco_camera.last_gaze_position().value, aruco_camera.name, output) - - # Output finished fixation data - else: - - output = f'FixationEnd Start={fixation[0].timestamp} Duration={fixation.duration} AOI={main_layer.aoi_matcher.looked_aoi_name()} Probabilities={main_layer.aoi_matcher.looked_probabilities()}' - - # Send Ivy message - IvySendMsg(output) - - logging.debug('%i %s %s %s', timestamp, aruco_camera.last_gaze_position().value, aruco_camera.name, output) -``` |