aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2024-03-23 00:56:45 +0100
committerThéo de la Hogue2024-03-23 00:56:45 +0100
commitcf36ef98f7a1a99caf083b4942e68d0cbd1c7500 (patch)
tree0ce0fd8b9a0fc56ef6a8f540411f21a8f7cdb7fb
parent20c5e89023ae87fce74389e1403181fc84c81acc (diff)
downloadargaze-cf36ef98f7a1a99caf083b4942e68d0cbd1c7500.zip
argaze-cf36ef98f7a1a99caf083b4942e68d0cbd1c7500.tar.gz
argaze-cf36ef98f7a1a99caf083b4942e68d0cbd1c7500.tar.bz2
argaze-cf36ef98f7a1a99caf083b4942e68d0cbd1c7500.tar.xz
Fixing expected and excluded aoi.
-rw-r--r--src/argaze/ArFeatures.py102
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoCamera.py3
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoDetector.py2
-rw-r--r--src/argaze/GazeFeatures.py119
4 files changed, 137 insertions, 89 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 8d0da7d..5e219ff 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -17,6 +17,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
from typing import Iterator, Union
+import logging
import json
import os
import sys
@@ -174,9 +175,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__aoi_scene = AOI3DScene.AOI3DScene(new_aoi_scene)
- # Update expected AOI of AOI scan path
- self.__update_expected_aoi()
-
# Edit parent
if self.__aoi_scene is not None:
@@ -213,8 +211,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__aoi_scan_path = aoi_scan_path
- # Update expected AOI of AOI scan path
- self.__update_expected_aoi()
+ # Update expected AOI
+ self._update_expected_aoi()
# Edit parent
if self.__aoi_scan_path is not None:
@@ -318,18 +316,27 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"draw_parameters": self._draw_parameters
}
- def __update_expected_aoi(self):
- """Edit aoi_scan_path's expected aoi list by removing aoi with name equals to layer name."""
+ def _update_expected_aoi(self):
+ """Update expected AOI of AOI scan path considering AOI scene and layer name."""
+
+ if self.__aoi_scene is None:
+
+ logging.debug('ArLayer._update_expected_aoi %s (parent: %s): missing aoi scene', self.name, self.parent)
- if self.__aoi_scene is not None and self.__aoi_scan_path is not None:
+ return
- expected_aoi = list(self.__aoi_scene.keys())
+ logging.debug('ArLayer._update_expected_aoi %s (parent: %s)', self.name, self.parent)
- if self.name in expected_aoi:
+ # Get aoi names from aoi scene
+ expected_aoi = list(self.__aoi_scene.keys())
- expected_aoi.remove(self.name)
+ # Remove layer name from expected aoi
+ if self.name in expected_aoi:
- self.__aoi_scan_path.expected_aoi = expected_aoi
+ expected_aoi.remove(self.name)
+
+ # Update expected aoi: this will clear the scan path
+ self.__aoi_scan_path.expected_aoi = expected_aoi
@DataFeatures.PipelineStepMethod
def look(self, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()):
@@ -346,6 +353,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Use layer lock feature
with self._lock:
+ logging.debug('ArLayer.look %s (parent: %s)', self.name, self.parent.name)
+
# Update current gaze movement
self.__gaze_movement = gaze_movement
@@ -361,6 +370,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally
self.__looked_aoi_name, _ = self.__aoi_matcher.match(self.__aoi_scene, gaze_movement, timestamp=gaze_movement.timestamp)
+ logging.debug('\t> looked aoi name: %s', self.__looked_aoi_name)
+
# Valid and finished gaze movement has been identified
if gaze_movement and gaze_movement.is_finished():
@@ -370,11 +381,15 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# TODO: add an option to filter None looked_aoi_name or not
if self.__aoi_scan_path is not None:
+ logging.debug('\t> append fixation')
+
aoi_scan_step = self.__aoi_scan_path.append_fixation(gaze_movement, self.__looked_aoi_name)
# Is there a new step?
if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1:
+ logging.debug('\t> analyse aoi scan path')
+
# Analyze aoi scan path
for aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers:
@@ -388,6 +403,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Append saccade to aoi scan path
if self.__aoi_scan_path is not None:
+ logging.debug('\t> append saccade')
+
self.__aoi_scan_path.append_saccade(gaze_movement)
def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None):
@@ -1125,7 +1142,7 @@ class ArCamera(ArFrame):
layer.parent = self
# Update expected and excluded aoi
- self.__update_expected_and_excluded_aoi()
+ self._update_expected_and_excluded_aoi()
@property
def scenes(self) -> dict:
@@ -1148,7 +1165,7 @@ class ArCamera(ArFrame):
scene.parent = self
# Update expected and excluded aoi
- self.__update_expected_and_excluded_aoi()
+ self._update_expected_and_excluded_aoi()
@property
def visual_hfov(self) -> float:
@@ -1191,50 +1208,57 @@ class ArCamera(ArFrame):
"visual_vfov": self.__visual_vfov
}
- def __update_expected_and_excluded_aoi(self):
+ def _update_expected_and_excluded_aoi(self):
"""Edit expected aoi of each layer aoi scan path with the aoi of corresponding scene layer.
Edit excluded aoi to ignore frame aoi from aoi matching.
"""
- if self._layers and self._scenes:
- for layer_name, layer in self._layers.items():
+ if not self._layers or not self._scenes:
- expected_aoi_list = []
- excluded_aoi_list = []
+ logging.debug('ArCamera._update_expected_and_excluded_aoi %s: missing layers or scenes', self.name)
- for scene_name, scene in self._scenes.items():
+ return
- # Append scene layer aoi to corresponding expected camera layer aoi
- try:
+ logging.debug('ArCamera._update_expected_and_excluded_aoi %s', self.name)
- scene_layer = scene.layers[layer_name]
+ for layer_name, layer in self._layers.items():
- expected_aoi_list.extend(list(scene_layer.aoi_scene.keys()))
+ expected_aoi_list = []
+ excluded_aoi_list = []
- except KeyError:
+ for scene_name, scene in self._scenes.items():
- continue
+ # Append scene layer aoi to corresponding expected camera layer aoi
+ try:
- # Remove scene frame from expected camera layer aoi
- # Exclude scene frame from camera layer aoi matching
- for frame_name, frame in scene.frames.items():
+ scene_layer = scene.layers[layer_name]
- try:
-
- expected_aoi_list.remove(frame_name)
- excluded_aoi_list.append(frame_name)
+ expected_aoi_list.extend(list(scene_layer.aoi_scene.keys()))
+
+ except KeyError:
+
+ continue
- except ValueError:
+ # Remove scene frame from expected camera layer aoi
+ # Exclude scene frame from camera layer aoi matching
+ for frame_name, frame in scene.frames.items():
- continue
+ try:
+
+ expected_aoi_list.remove(frame_name)
+ excluded_aoi_list.append(frame_name)
+
+ except ValueError:
+
+ continue
- if layer.aoi_scan_path is not None:
+ if layer.aoi_scan_path is not None:
- layer.aoi_scan_path.expected_aoi = expected_aoi_list
+ layer.aoi_scan_path.expected_aoi = expected_aoi_list
- if layer.aoi_matcher is not None:
+ if layer.aoi_matcher is not None:
- layer.aoi_matcher.exclude = excluded_aoi_list
+ layer.aoi_matcher.exclude = excluded_aoi_list
@DataFeatures.PipelineStepMethod
def watch(self, image: numpy.array):
diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py
index 0ad73db..df1362a 100644
--- a/src/argaze/ArUcoMarkers/ArUcoCamera.py
+++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py
@@ -100,6 +100,9 @@ class ArUcoCamera(ArFeatures.ArCamera):
scene.parent = self
+ # Update expected and excluded aoi
+ self._update_expected_and_excluded_aoi()
+
@ArFeatures.ArCamera.image_parameters.setter
@DataFeatures.PipelineStepAttributeSetter
def image_parameters(self, image_parameters: dict):
diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py
index a0f874b..b84030c 100644
--- a/src/argaze/ArUcoMarkers/ArUcoDetector.py
+++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py
@@ -334,7 +334,7 @@ class Observer(DataFeatures.PipelineStepObserver):
def __init__(self, **kwargs):
"""Initialize marker detection metrics."""
- super().__init__()
+ DataFeatures.PipelineStepObserver.__init__(self)
self.__try_count = 0
self.__success_count = 0
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index 9ea877a..48b303c 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -23,7 +23,6 @@ import importlib
from argaze import DataFeatures
from argaze.AreaOfInterest import AOIFeatures
-from argaze.utils import UtilsFeatures # DEBUG
import numpy
import pandas
@@ -711,22 +710,27 @@ class ScanStep():
return self.__first_fixation.duration + self.__last_saccade.duration
class ScanPath(list):
- """List of scan steps.
-
- Parameters:
- duration_max: duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration.
- """
+ """List of scan steps."""
def __init__(self, duration_max: int|float = 0):
super().__init__()
- self.duration_max = duration_max
-
+ self.__duration_max = duration_max
self.__last_fixation = None
self.__duration = 0
@property
+ def duration_max(self) -> float:
+ """Duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration."""
+ return self.__duration_max
+
+ @duration_max.setter
+ def duration_max(self, duration_max: float):
+
+ self.__duration_max = duration_max
+
+ @property
def duration(self) -> int|float:
"""Sum of all scan steps duration
@@ -739,9 +743,9 @@ class ScanPath(list):
def __check_duration(self):
"""Constrain path duration to maximal duration."""
- if self.duration_max > 0:
+ if self.__duration_max > 0:
- while self.__duration > self.duration_max:
+ while self.__duration > self.__duration_max:
oldest_step = self.pop(0)
@@ -952,34 +956,56 @@ class AOIScanStep():
OutsideAOI = 'GazeFeatures.OutsideAOI'
class AOIScanPath(list):
- """List of aoi scan steps over successive aoi.
-
- Parameters:
- duration_max: duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration.
- """
+ """List of aoi scan steps over successive aoi."""
def __init__(self, expected_aoi: list[str] = [], duration_max: int|float = 0):
super().__init__()
- self.duration_max = duration_max
- self.expected_aoi = expected_aoi
-
+ self.__expected_aoi = expected_aoi
+ self.__duration_max = duration_max
self.__duration = 0
- def clear(self):
- """Clear aoi scan steps list, letter sequence and transition matrix."""
+ self.clear()
- super().clear()
+ @property
+ def expected_aoi(self):
+ """List of all expected aoi."""
- self.__movements = TimeStampedGazeMovements()
- self.__current_aoi = ''
- self.__index = ord('A')
- self.__aoi_letter = {}
- self.__letter_aoi = {}
+ return self.__expected_aoi
- size = len(self.__expected_aoi)
- self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi, columns=self.__expected_aoi)
+ @expected_aoi.setter
+ def expected_aoi(self, expected_aoi: list[str] = []):
+ """Edit list of all expected aoi.
+
+ !!! warning
+ This will clear the AOIScanPath
+ """
+
+ # Check expected aoi are not the same than previous ones
+ if len(expected_aoi) == len(self.__expected_aoi[1:]):
+
+ equal = [a == b for a, b in zip(expected_aoi, self.__expected_aoi[1:])]
+
+ if all(equal):
+
+ return
+
+ # Otherwise, update expected aoi
+ self.__expected_aoi = [OutsideAOI]
+ self.__expected_aoi += expected_aoi
+
+ self.clear()
+
+ @property
+ def duration_max(self) -> float:
+ """Duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration."""
+ return self.__duration_max
+
+ @duration_max.setter
+ def duration_max(self, duration_max: float):
+
+ self.__duration_max = duration_max
@property
def duration(self) -> float:
@@ -990,9 +1016,9 @@ class AOIScanPath(list):
def __check_duration(self):
"""Constrain path duration to maximal duration."""
- if self.duration_max > 0:
+ if self.__duration_max > 0:
- while self.__duration > self.duration_max:
+ while self.__duration > self.__duration_max:
oldest_step = self.pop(0)
@@ -1004,6 +1030,20 @@ class AOIScanPath(list):
# Decrement [index: source, columns: destination] value
self.__transition_matrix.loc[oldest_step.aoi, self[0].aoi,] -= 1
+ def clear(self):
+ """Clear aoi scan steps list, letter sequence and transition matrix."""
+
+ super().clear()
+
+ self.__movements = TimeStampedGazeMovements()
+ self.__current_aoi = ''
+ self.__index = ord('A')
+ self.__aoi_letter = {}
+ self.__letter_aoi = {}
+
+ size = len(self.__expected_aoi)
+ self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi, columns=self.__expected_aoi)
+
def __get_aoi_letter(self, aoi):
try :
@@ -1031,25 +1071,6 @@ class AOIScanPath(list):
sequence += step.letter
return sequence
-
- @property
- def expected_aoi(self):
- """List of all expected aoi."""
-
- return self.__expected_aoi
-
- @expected_aoi.setter
- def expected_aoi(self, expected_aoi: list[str] = []):
- """Edit list of all expected aoi.
-
- !!! warning
- This will clear the AOIScanPath
- """
-
- self.__expected_aoi = [OutsideAOI]
- self.__expected_aoi += expected_aoi
-
- self.clear()
@property
def current_aoi(self):