diff options
Diffstat (limited to 'docs/use_cases/pilot_gaze_tracking/observers.md')
-rw-r--r-- | docs/use_cases/pilot_gaze_tracking/observers.md | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/docs/use_cases/pilot_gaze_tracking/observers.md b/docs/use_cases/pilot_gaze_tracking/observers.md new file mode 100644 index 0000000..5f5bc78 --- /dev/null +++ b/docs/use_cases/pilot_gaze_tracking/observers.md @@ -0,0 +1,103 @@ +Fixation events sending +======================= + +Observers are attached to pipeline steps to be notified when a method is called. + +## observers.py + +For this use case we need to enable [Ivy bus communication](https://gitlab.com/ivybus/ivy-python/) to log ArUco detection results (on *ArUcoCamera.on_watch* call) and fixation identification with AOI matching (on *ArUcoCamera.on_look* call). + +```python +import logging + +from argaze import DataFeatures, GazeFeatures + +from ivy.std_api import * +from ivy.ivy import IvyIllegalStateError + + +class IvyBus(DataFeatures.PipelineStepObject): + """Handle Ivy bus.""" + + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + self.__bus = None + + @property + def bus(self) -> str: + return self.__bus + + @bus.setter + def bus(self, bus: str): + self.__bus = bus + + @DataFeatures.PipelineStepEnter + def __enter__(self, parent = None): + + # Enable Ivy bus + IvyInit(self.name) + IvyStart(self.__bus) + + return self + + @DataFeatures.PipelineStepExit + def __exit__(self, exception_type, exception_value, exception_traceback): + + # Stop Ivy bus + IvyStop() + + +class ArUcoCameraLogger(DataFeatures.PipelineStepObject): + """Log ArUcoCamera activity.""" + + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + self._last_markers_number = None + + def on_watch(self, timestamp, aruco_camera, exception): + """Report ArUco markers detection info on Ivy bus.""" + + # Wait for number of detected marker changes + if aruco_camera.aruco_detector.detected_markers_number() != self._last_markers_number: + + self._last_markers_number = aruco_camera.aruco_detector.detected_markers_number() + + output = f'ArUcoDetection MarkersNumber={self._last_markers_number}' + + # Send Ivy message + IvySendMsg(output) + + logging.debug('%i %s', timestamp, output) + + def on_look(self, timestamp, aruco_camera, exception): + """Report fixation and metrics on Ivy bus.""" + + # Select 'Main' layer + main_layer = aruco_camera.layers['Main'] + + if GazeFeatures.is_fixation(aruco_camera.last_gaze_movement()): + + fixation = aruco_camera.last_gaze_movement() + + # Output in progress fixation data + if not fixation.is_finished(): + + output = f'FixationInProgress Start={fixation[0].timestamp} Duration={fixation.duration} AOI={main_layer.last_looked_aoi_name()} Probabilities={main_layer.aoi_matcher.looked_probabilities()}' + + # Send Ivy message + IvySendMsg(output) + + logging.debug('%i %s %s %s', timestamp, aruco_camera.last_gaze_position().value, aruco_camera.name, output) + + # Output finished fixation data + else: + + output = f'FixationEnd Start={fixation[0].timestamp} Duration={fixation.duration} AOI={main_layer.aoi_matcher.looked_aoi_name()} Probabilities={main_layer.aoi_matcher.looked_probabilities()}' + + # Send Ivy message + IvySendMsg(output) + + logging.debug('%i %s %s %s', timestamp, aruco_camera.last_gaze_position().value, aruco_camera.name, output) +``` |