Fixation events sending
=======================

Observers are attached to pipeline steps to be notified when a method is called.

## observers.py

For this use case we need to enable [Ivy bus communication](https://gitlab.com/ivybus/ivy-python/) to log ArUco detection results (on *ArUcoCamera.on_watch* call) and fixation identification with AOI matching (on *ArUcoCamera.on_look* call).

```python
import logging

from argaze import DataFeatures, GazeFeatures

from ivy.std_api import *
from ivy.ivy import IvyIllegalStateError


class IvyBus(DataFeatures.PipelineStepObject):
    """Pipeline step object that starts the Ivy bus on enter and stops it on exit."""

    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):

        # Stays None until a value is assigned through the `bus` property.
        self.__bus = None

    @property
    def bus(self) -> str:
        """Value passed to IvyStart when this object is entered."""
        return self.__bus

    @bus.setter
    def bus(self, bus: str):
        self.__bus = bus

    @DataFeatures.PipelineStepEnter
    def __enter__(self, parent=None):
        """Initialise Ivy with this object's name, start it on the configured bus and return self."""

        IvyInit(self.name)
        IvyStart(self.__bus)

        return self

    @DataFeatures.PipelineStepExit
    def __exit__(self, exception_type, exception_value, exception_traceback):
        """Stop the Ivy bus; the exception arguments are not inspected."""

        IvyStop()


class ArUcoCameraLogger(DataFeatures.PipelineStepObject):
    """Observer reporting ArUcoCamera activity as Ivy messages and debug log records."""

    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):

        # Markers number sent by the last on_watch report; None before any report.
        self._last_markers_number = None

    def on_watch(self, timestamp, aruco_camera, exception):
        """Send the detected ArUco markers number on Ivy bus whenever it changes.

        Nothing is sent while the number equals the previously reported one.
        The `exception` argument is not used.
        """

        detector = aruco_camera.aruco_detector

        # Same number as the last report: nothing new to publish.
        if detector.detected_markers_number() == self._last_markers_number:
            return

        self._last_markers_number = detector.detected_markers_number()

        message = f'ArUcoDetection MarkersNumber={self._last_markers_number}'

        IvySendMsg(message)
        logging.debug('%i %s', timestamp, message)

    def on_look(self, timestamp, aruco_camera, exception):
        """Send the state of the current fixation on Ivy bus.

        A 'FixationInProgress' message is sent while the last gaze movement is
        an unfinished fixation and a 'FixationEnd' message once it is finished.
        Gaze movements that are not fixations are ignored.
        The `exception` argument is not used.
        """

        # The 'Main' layer is fetched first, even when no message ends up being sent.
        layer = aruco_camera.layers['Main']

        if not GazeFeatures.is_fixation(aruco_camera.last_gaze_movement()):
            return

        fixation = aruco_camera.last_gaze_movement()

        # NOTE(review): the two messages read the AOI name through different
        # accessors (layer vs. AOI matcher) - confirm this is intended.
        if fixation.is_finished():
            message = (
                f'FixationEnd Start={fixation[0].timestamp} Duration={fixation.duration} '
                f'AOI={layer.aoi_matcher.looked_aoi_name()} '
                f'Probabilities={layer.aoi_matcher.looked_probabilities()}'
            )
        else:
            message = (
                f'FixationInProgress Start={fixation[0].timestamp} Duration={fixation.duration} '
                f'AOI={layer.last_looked_aoi_name()} '
                f'Probabilities={layer.aoi_matcher.looked_probabilities()}'
            )

        # Both fixation states are published and logged the same way.
        IvySendMsg(message)
        logging.debug('%i %s %s %s', timestamp, aruco_camera.last_gaze_position().value, aruco_camera.name, message)
```