From cf36ef98f7a1a99caf083b4942e68d0cbd1c7500 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Sat, 23 Mar 2024 00:56:45 +0100 Subject: Fixing expected and excluded aoi. --- src/argaze/ArFeatures.py | 102 ++++++++++++++++---------- src/argaze/ArUcoMarkers/ArUcoCamera.py | 3 + src/argaze/ArUcoMarkers/ArUcoDetector.py | 2 +- src/argaze/GazeFeatures.py | 119 ++++++++++++++++++------------- 4 files changed, 137 insertions(+), 89 deletions(-) diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 8d0da7d..5e219ff 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -17,6 +17,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" from typing import Iterator, Union +import logging import json import os import sys @@ -174,9 +175,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__aoi_scene = AOI3DScene.AOI3DScene(new_aoi_scene) - # Update expected AOI of AOI scan path - self.__update_expected_aoi() - # Edit parent if self.__aoi_scene is not None: @@ -213,8 +211,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__aoi_scan_path = aoi_scan_path - # Update expected AOI of AOI scan path - self.__update_expected_aoi() + # Update expected AOI + self._update_expected_aoi() # Edit parent if self.__aoi_scan_path is not None: @@ -318,18 +316,27 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): "draw_parameters": self._draw_parameters } - def __update_expected_aoi(self): - """Edit aoi_scan_path's expected aoi list by removing aoi with name equals to layer name.""" + def _update_expected_aoi(self): + """Update expected AOI of AOI scan path considering AOI scene and layer name.""" + + if self.__aoi_scene is None: + + logging.debug('ArLayer._update_expected_aoi %s (parent: %s): missing aoi scene', self.name, self.parent) - if self.__aoi_scene is not None and self.__aoi_scan_path is not None: + return - expected_aoi = list(self.__aoi_scene.keys()) + logging.debug('ArLayer._update_expected_aoi %s (parent: %s)', self.name, self.parent) - if self.name in expected_aoi: + # Get aoi names from aoi scene + expected_aoi = list(self.__aoi_scene.keys()) - expected_aoi.remove(self.name) + # Remove layer name from expected aoi + if self.name in expected_aoi: - self.__aoi_scan_path.expected_aoi = expected_aoi + expected_aoi.remove(self.name) + + # Update expected aoi: this will clear the scan path + self.__aoi_scan_path.expected_aoi = expected_aoi @DataFeatures.PipelineStepMethod def look(self, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()): @@ -346,6 +353,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Use layer lock feature with self._lock: + logging.debug('ArLayer.look %s (parent: %s)', self.name, self.parent.name) + # Update current gaze movement self.__gaze_movement = gaze_movement @@ -361,6 +370,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally self.__looked_aoi_name, _ = self.__aoi_matcher.match(self.__aoi_scene, gaze_movement, timestamp=gaze_movement.timestamp) + logging.debug('\t> looked aoi name: %s', self.__looked_aoi_name) + # Valid and finished gaze movement has been identified if gaze_movement and gaze_movement.is_finished(): @@ -370,11 +381,15 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # TODO: add an option to filter None looked_aoi_name or not if self.__aoi_scan_path is not None: + logging.debug('\t> append fixation') + aoi_scan_step = self.__aoi_scan_path.append_fixation(gaze_movement, self.__looked_aoi_name) # Is there a new step? if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1: + logging.debug('\t> analyse aoi scan path') + # Analyze aoi scan path for aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers: @@ -388,6 +403,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Append saccade to aoi scan path if self.__aoi_scan_path is not None: + logging.debug('\t> append saccade') + self.__aoi_scan_path.append_saccade(gaze_movement) def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): @@ -1125,7 +1142,7 @@ class ArCamera(ArFrame): layer.parent = self # Update expected and excluded aoi - self.__update_expected_and_excluded_aoi() + self._update_expected_and_excluded_aoi() @property def scenes(self) -> dict: @@ -1148,7 +1165,7 @@ class ArCamera(ArFrame): scene.parent = self # Update expected and excluded aoi - self.__update_expected_and_excluded_aoi() + self._update_expected_and_excluded_aoi() @property def visual_hfov(self) -> float: @@ -1191,50 +1208,57 @@ class ArCamera(ArFrame): "visual_vfov": self.__visual_vfov } - def __update_expected_and_excluded_aoi(self): + def _update_expected_and_excluded_aoi(self): """Edit expected aoi of each layer aoi scan path with the aoi of corresponding scene layer. Edit excluded aoi to ignore frame aoi from aoi matching. """ - if self._layers and self._scenes: - for layer_name, layer in self._layers.items(): + if not self._layers or not self._scenes: - expected_aoi_list = [] - excluded_aoi_list = [] + logging.debug('ArCamera._update_expected_and_excluded_aoi %s: missing layers or scenes', self.name) - for scene_name, scene in self._scenes.items(): + return - # Append scene layer aoi to corresponding expected camera layer aoi - try: + logging.debug('ArCamera._update_expected_and_excluded_aoi %s', self.name) - scene_layer = scene.layers[layer_name] + for layer_name, layer in self._layers.items(): - expected_aoi_list.extend(list(scene_layer.aoi_scene.keys())) + expected_aoi_list = [] + excluded_aoi_list = [] - except KeyError: + for scene_name, scene in self._scenes.items(): - continue + # Append scene layer aoi to corresponding expected camera layer aoi + try: - # Remove scene frame from expected camera layer aoi - # Exclude scene frame from camera layer aoi matching - for frame_name, frame in scene.frames.items(): + scene_layer = scene.layers[layer_name] - try: - - expected_aoi_list.remove(frame_name) - excluded_aoi_list.append(frame_name) + expected_aoi_list.extend(list(scene_layer.aoi_scene.keys())) + + except KeyError: + + continue - except ValueError: + # Remove scene frame from expected camera layer aoi + # Exclude scene frame from camera layer aoi matching + for frame_name, frame in scene.frames.items(): - continue + try: + + expected_aoi_list.remove(frame_name) + excluded_aoi_list.append(frame_name) + + except ValueError: + + continue - if layer.aoi_scan_path is not None: + if layer.aoi_scan_path is not None: - layer.aoi_scan_path.expected_aoi = expected_aoi_list + layer.aoi_scan_path.expected_aoi = expected_aoi_list - if layer.aoi_matcher is not None: + if layer.aoi_matcher is not None: - layer.aoi_matcher.exclude = excluded_aoi_list + layer.aoi_matcher.exclude = excluded_aoi_list @DataFeatures.PipelineStepMethod def watch(self, image: numpy.array): diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 0ad73db..df1362a 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -100,6 +100,9 @@ class ArUcoCamera(ArFeatures.ArCamera): scene.parent = self + # Update expected and excluded aoi + self._update_expected_and_excluded_aoi() + @ArFeatures.ArCamera.image_parameters.setter @DataFeatures.PipelineStepAttributeSetter def image_parameters(self, image_parameters: dict): diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py index a0f874b..b84030c 100644 --- a/src/argaze/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py @@ -334,7 +334,7 @@ class Observer(DataFeatures.PipelineStepObserver): def __init__(self, **kwargs): """Initialize marker detection metrics.""" - super().__init__() + DataFeatures.PipelineStepObserver.__init__(self) self.__try_count = 0 self.__success_count = 0 diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 9ea877a..48b303c 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -23,7 +23,6 @@ import importlib from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures -from argaze.utils import UtilsFeatures # DEBUG import numpy import pandas @@ -711,22 +710,27 @@ class ScanStep(): return self.__first_fixation.duration + self.__last_saccade.duration class ScanPath(list): - """List of scan steps. - - Parameters: - duration_max: duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration. - """ + """List of scan steps.""" def __init__(self, duration_max: int|float = 0): super().__init__() - self.duration_max = duration_max - + self.__duration_max = duration_max self.__last_fixation = None self.__duration = 0 @property + def duration_max(self) -> float: + """Duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration.""" + return self.__duration_max + + @duration_max.setter + def duration_max(self, duration_max: float): + + self.__duration_max = duration_max + + @property def duration(self) -> int|float: """Sum of all scan steps duration @@ -739,9 +743,9 @@ class ScanPath(list): def __check_duration(self): """Constrain path duration to maximal duration.""" - if self.duration_max > 0: + if self.__duration_max > 0: - while self.__duration > self.duration_max: + while self.__duration > self.__duration_max: oldest_step = self.pop(0) @@ -952,34 +956,56 @@ class AOIScanStep(): OutsideAOI = 'GazeFeatures.OutsideAOI' class AOIScanPath(list): - """List of aoi scan steps over successive aoi. - - Parameters: - duration_max: duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration. - """ + """List of aoi scan steps over successive aoi.""" def __init__(self, expected_aoi: list[str] = [], duration_max: int|float = 0): super().__init__() - self.duration_max = duration_max - self.expected_aoi = expected_aoi - + self.__expected_aoi = expected_aoi + self.__duration_max = duration_max self.__duration = 0 - def clear(self): - """Clear aoi scan steps list, letter sequence and transition matrix.""" + self.clear() - super().clear() + @property + def expected_aoi(self): + """List of all expected aoi.""" - self.__movements = TimeStampedGazeMovements() - self.__current_aoi = '' - self.__index = ord('A') - self.__aoi_letter = {} - self.__letter_aoi = {} + return self.__expected_aoi - size = len(self.__expected_aoi) - self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi, columns=self.__expected_aoi) + @expected_aoi.setter + def expected_aoi(self, expected_aoi: list[str] = []): + """Edit list of all expected aoi. + + !!! warning + This will clear the AOIScanPath + """ + + # Check expected aoi are not the same than previous ones + if len(expected_aoi) == len(self.__expected_aoi[1:]): + + equal = [a == b for a, b in zip(expected_aoi, self.__expected_aoi[1:])] + + if all(equal): + + return + + # Otherwise, update expected aoi + self.__expected_aoi = [OutsideAOI] + self.__expected_aoi += expected_aoi + + self.clear() + + @property + def duration_max(self) -> float: + """Duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration.""" + return self.__duration_max + + @duration_max.setter + def duration_max(self, duration_max: float): + + self.__duration_max = duration_max @property def duration(self) -> float: @@ -990,9 +1016,9 @@ class AOIScanPath(list): def __check_duration(self): """Constrain path duration to maximal duration.""" - if self.duration_max > 0: + if self.__duration_max > 0: - while self.__duration > self.duration_max: + while self.__duration > self.__duration_max: oldest_step = self.pop(0) @@ -1004,6 +1030,20 @@ class AOIScanPath(list): # Decrement [index: source, columns: destination] value self.__transition_matrix.loc[oldest_step.aoi, self[0].aoi,] -= 1 + def clear(self): + """Clear aoi scan steps list, letter sequence and transition matrix.""" + + super().clear() + + self.__movements = TimeStampedGazeMovements() + self.__current_aoi = '' + self.__index = ord('A') + self.__aoi_letter = {} + self.__letter_aoi = {} + + size = len(self.__expected_aoi) + self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi, columns=self.__expected_aoi) + def __get_aoi_letter(self, aoi): try : @@ -1031,25 +1071,6 @@ class AOIScanPath(list): sequence += step.letter return sequence - - @property - def expected_aoi(self): - """List of all expected aoi.""" - - return self.__expected_aoi - - @expected_aoi.setter - def expected_aoi(self, expected_aoi: list[str] = []): - """Edit list of all expected aoi. - - !!! warning - This will clear the AOIScanPath - """ - - self.__expected_aoi = [OutsideAOI] - self.__expected_aoi += expected_aoi - - self.clear() @property def current_aoi(self): -- cgit v1.1