From 3fc1abf3ed699c71faf7dec66c53a3a020ec8e16 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Mon, 1 Jul 2024 17:19:07 +0200 Subject: Improving use cases section. --- docs/use_cases/pilot_gaze_monitoring/observers.md | 99 +++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 docs/use_cases/pilot_gaze_monitoring/observers.md (limited to 'docs/use_cases/pilot_gaze_monitoring/observers.md') diff --git a/docs/use_cases/pilot_gaze_monitoring/observers.md b/docs/use_cases/pilot_gaze_monitoring/observers.md new file mode 100644 index 0000000..3b3f07d --- /dev/null +++ b/docs/use_cases/pilot_gaze_monitoring/observers.md @@ -0,0 +1,99 @@ +Fixation events sending +======================= + +The **observers.py** file ... + +```python +import logging + +from argaze import DataFeatures, GazeFeatures + +from ivy.std_api import * +from ivy.ivy import IvyIllegalStateError + + +class IvyBus(DataFeatures.PipelineStepObject): + """Handle Ivy bus.""" + + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + self.__bus = None + + @property + def bus(self) -> str: + return self.__bus + + @bus.setter + def bus(self, bus: str): + self.__bus = bus + + @DataFeatures.PipelineStepEnter + def __enter__(self, parent = None): + + # Enable Ivy bus + IvyInit(self.name) + IvyStart(self.__bus) + + return self + + @DataFeatures.PipelineStepExit + def __exit__(self, exception_type, exception_value, exception_traceback): + + # Stop Ivy bus + IvyStop() + + +class ArUcoCameraLogger(DataFeatures.PipelineStepObject): + """Log ArUcoCamera activity.""" + + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + self._last_markers_number = None + + def on_watch(self, timestamp, aruco_camera, exception): + """Report ArUco markers detection info on Ivy bus.""" + + # Wait for number of detected marker changes + if aruco_camera.aruco_detector.detected_markers_number() != self._last_markers_number: + + self._last_markers_number = aruco_camera.aruco_detector.detected_markers_number() + + output = f'ArUcoDetection MarkersNumber={self._last_markers_number}' + + # Send Ivy message + IvySendMsg(output) + + logging.debug('%i %s', timestamp, output) + + def on_look(self, timestamp, aruco_camera, exception): + """Report fixation and metrics on Ivy bus.""" + + # Select 'Main' layer + main_layer = aruco_camera.layers['Main'] + + if GazeFeatures.is_fixation(aruco_camera.last_gaze_movement()): + + fixation = aruco_camera.last_gaze_movement() + + # Output in progress fixation data + if not fixation.is_finished(): + + output = f'FixationInProgress Start={fixation[0].timestamp} Duration={fixation.duration} AOI={main_layer.last_looked_aoi_name()} Probabilities={main_layer.aoi_matcher.looked_probabilities()}' + + # Send Ivy message + IvySendMsg(output) + + logging.debug('%i %s %s %s', timestamp, aruco_camera.last_gaze_position().value, aruco_camera.name, output) + + # Output finished fixation data + else: + + output = f'FixationEnd Start={fixation[0].timestamp} Duration={fixation.duration} AOI={main_layer.aoi_matcher.looked_aoi_name()} Probabilities={main_layer.aoi_matcher.looked_probabilities()}' + + # Send Ivy message + IvySendMsg(output) + + logging.debug('%i %s %s %s', timestamp, aruco_camera.last_gaze_position().value, aruco_camera.name, output) +``` -- cgit v1.1