From 2c7562e6d64dcd34529f105bfd4d007f2d1addf9 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 23 Aug 2023 11:31:28 +0200 Subject: Externalizing aoi matching algorithm as a new configurable pipeline step. --- src/argaze/ArFeatures.py | 171 ++++++--------------- src/argaze/GazeAnalysis/DeviationCircleCoverage.py | 100 ++++++++++++ src/argaze/GazeFeatures.py | 15 ++ .../demo_environment/demo_ar_features_setup.json | 4 +- .../demo_environment/demo_gaze_features_setup.json | 5 + 5 files changed, 173 insertions(+), 122 deletions(-) create mode 100644 src/argaze/GazeAnalysis/DeviationCircleCoverage.py diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 5fa5cde..4811a0b 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -74,7 +74,7 @@ class ArLayer(): name: name of the layer aoi_color: color to used in draw method aoi_scene: AOI scene description - looked_aoi_covering_threshold: + aoi_matcher: AOI matcher object aoi_scan_path: AOI scan path object aoi_scan_path_analyzers: dictionary of AOI scan path analyzers """ @@ -82,7 +82,7 @@ class ArLayer(): name: str aoi_color: tuple = field(default=(0, 0, 0)) aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene) - looked_aoi_covering_threshold: int = field(default=0) + aoi_matcher: GazeFeatures.AOIMatcher = field(default_factory=GazeFeatures.AOIMatcher) aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath) aoi_scan_path_analyzers: dict = field(default_factory=dict) @@ -94,9 +94,6 @@ class ArLayer(): # Init current gaze movement self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() - # Init looked aoi data - self.__init_looked_aoi_data() - # Init lock to share looking data with multiples threads self.__look_lock = threading.Lock() @@ -176,14 +173,23 @@ class ArLayer(): # Add AOI 2D Scene by default new_aoi_scene = AOI2DScene.AOI2DScene() - # Looked aoi validity threshold + # Load aoi matcher try: - looked_aoi_covering_threshold = layer_data.pop('looked_aoi_covering_threshold') + aoi_matcher_value = layer_data.pop('aoi_matcher') + + aoi_matcher_module_path, aoi_matcher_parameters = aoi_matcher_value.popitem() + + # Prepend argaze.GazeAnalysis path when a single name is provided + if len(aoi_matcher_module_path.split('.')) == 1: + aoi_matcher_module_path = f'argaze.GazeAnalysis.{aoi_matcher_module_path}' + + aoi_matcher_module = importlib.import_module(aoi_matcher_module_path) + new_aoi_matcher = aoi_matcher_module.AOIMatcher(**aoi_matcher_parameters) except KeyError: - looked_aoi_covering_threshold = 0 + new_aoi_matcher = None # Edit expected AOI list by removing AOI with name equals to layer name expected_aois = list(new_aoi_scene.keys()) @@ -260,7 +266,7 @@ class ArLayer(): return ArLayer(new_layer_name, \ new_aoi_color, \ new_aoi_scene, \ - looked_aoi_covering_threshold, \ + new_aoi_matcher, \ new_aoi_scan_path, \ new_aoi_scan_path_analyzers \ ) @@ -293,70 +299,6 @@ class ArLayer(): self.__parent = parent - @property - def looked_aoi(self) -> str: - """Get most likely looked aoi name for current fixation (e.g. the aoi with the highest covering mean value)""" - - return self.__looked_aoi - - @property - def looked_aoi_covering_mean(self) -> float: - """Get looked aoi covering mean for current fixation. - It represents the ratio of fixation deviation circle surface that used to cover the looked aoi.""" - - return self.__looked_aoi_covering_mean - - @property - def looked_aoi_covering(self) -> dict: - """Get all looked aois covering for current fixation.""" - - return self.__looked_aoi_covering - - def __init_looked_aoi_data(self): - """Init looked aoi data.""" - - self.__look_count = 0 - self.__looked_aoi = None - self.__looked_aoi_covering_mean = 0 - self.__looked_aoi_covering = {} - - def __update_looked_aoi_data(self, fixation) -> str: - """Update looked aoi data.""" - - self.__look_count += 1 - - max_covering = 0. - most_likely_looked_aoi = None - - for name, aoi in self.aoi_scene.items(): - - _, _, circle_ratio = aoi.circle_intersection(fixation.focus, fixation.deviation_max) - - if name != self.name and circle_ratio > 0: - - # Sum circle ratio to update aoi covering - try: - - self.__looked_aoi_covering[name] += circle_ratio - - except KeyError: - - self.__looked_aoi_covering[name] = circle_ratio - - # Update most likely looked aoi - if self.__looked_aoi_covering[name] > max_covering: - - most_likely_looked_aoi = name - max_covering = self.__looked_aoi_covering[name] - - # Update looked aoi - self.__looked_aoi = most_likely_looked_aoi - - # Update looked aoi covering mean - self.__looked_aoi_covering_mean = int(100 * max_covering / self.__look_count) / 100 - - return self.__looked_aoi - def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> dict: """ Project timestamped gaze movement into layer. @@ -387,7 +329,7 @@ class ArLayer(): # Assess pipeline execution times execution_times = { - 'aoi_fixation_matcher': None, + 'aoi_matcher': None, 'aoi_scan_step_analyzers': {} } @@ -396,65 +338,54 @@ class ArLayer(): try: - # Valid and finished gaze movement has been identified - if gaze_movement.valid and gaze_movement.finished: + # Check gaze movement validity + if gaze_movement.valid: - if GazeFeatures.is_fixation(gaze_movement): + if self.aoi_matcher: # Store aoi matching start date matching_start = time.perf_counter() - # Does the finished fixation match an aoi? - looked_aoi = self.__update_looked_aoi_data(gaze_movement) + # Update looked aoi thanks to aoi matcher + # Note: don't filter finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally + looked_aoi = self.aoi_matcher.match(self.aoi_scene, gaze_movement, exclude=[self.name]) # Assess aoi matching time in ms - execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3 - - # Append fixation to aoi scan path - if self.aoi_scan_path != None and self.looked_aoi != None and self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold: - - aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, self.looked_aoi) + execution_times['aoi_matcher'] = (time.perf_counter() - matching_start) * 1e3 - # Is there a new step? - if aoi_scan_step and len(self.aoi_scan_path) > 1: + # Finished gaze movement has been identified + if gaze_movement.finished: - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): + if GazeFeatures.is_fixation(gaze_movement): - # Store aoi scan path analysis start date - aoi_scan_path_analysis_start = time.perf_counter() + # Append fixation to aoi scan path + if self.aoi_scan_path != None and looked_aoi != None: - # Analyze aoi scan path - aoi_scan_path_analyzer.analyze(self.aoi_scan_path) + aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, looked_aoi) - # Assess aoi scan step analysis time in ms - execution_times['aoi_scan_step_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_path_analysis_start) * 1e3 + # Is there a new step? + if aoi_scan_step and len(self.aoi_scan_path) > 1: - # Store analysis - aoi_scan_path_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - elif GazeFeatures.is_saccade(gaze_movement): + # Store aoi scan path analysis start date + aoi_scan_path_analysis_start = time.perf_counter() - # Reset looked aoi - self.__init_looked_aoi_data() + # Analyze aoi scan path + aoi_scan_path_analyzer.analyze(self.aoi_scan_path) - # Append saccade to aoi scan path - if self.aoi_scan_path != None: + # Assess aoi scan step analysis time in ms + execution_times['aoi_scan_step_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_path_analysis_start) * 1e3 - self.aoi_scan_path.append_saccade(timestamp, gaze_movement) + # Store analysis + aoi_scan_path_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis - # Valid in progress fixation - elif gaze_movement.valid and not gaze_movement.finished: + elif GazeFeatures.is_saccade(gaze_movement): - if GazeFeatures.is_fixation(gaze_movement): + # Append saccade to aoi scan path + if self.aoi_scan_path != None: - # Store aoi matching start date - matching_start = time.perf_counter() - - # Does the finished fixation match an aoi? - looked_aoi = self.__update_looked_aoi_data(gaze_movement) - - # Assess aoi matching time in ms - execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3 + self.aoi_scan_path.append_saccade(timestamp, gaze_movement) except Exception as e: @@ -470,9 +401,9 @@ class ArLayer(): # Sum all execution times total_execution_time = 0 - if execution_times['aoi_fixation_matcher']: + if execution_times['aoi_matcher']: - total_execution_time += execution_times['aoi_fixation_matcher'] + total_execution_time += execution_times['aoi_matcher'] for _, aoi_scan_path_analysis_time in execution_times['aoi_scan_step_analyzers'].items(): @@ -511,9 +442,9 @@ class ArLayer(): self.__gaze_movement.draw_positions(image) # Draw looked aoi - if self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold: + if self.aoi_matcher.looked_aoi != None: - self.aoi_scene.draw_circlecast(image, self.__gaze_movement.focus, self.__gaze_movement.deviation_max, matching_aoi = [self.__looked_aoi], base_color=(0, 0, 0), matching_color=(255, 255, 255)) + self.aoi_scene.draw_circlecast(image, self.__gaze_movement.focus, self.__gaze_movement.deviation_max, matching_aoi = [self.aoi_matcher.looked_aoi], base_color=(0, 0, 0), matching_color=(255, 255, 255)) elif GazeFeatures.is_saccade(self.__gaze_movement): @@ -637,7 +568,7 @@ class ArFrame(): except KeyError: - filter_in_progress_fixation = False + filter_in_progress_fixation = True # Load scan path try: @@ -729,7 +660,7 @@ class ArFrame(): # Create layer new_layer = ArLayer.from_dict(layer_data, working_directory) - # Setup layer + # Project 3D aoi scene layer to get only 2D aoi scene if new_layer.aoi_scene.dimension == 3: new_layer.aoi_scene = new_layer.aoi_scene.orthogonal_projection * new_frame_size @@ -1674,7 +1605,7 @@ class ArEnvironment(): aoi_2d = camera_layer.aoi_scene[frame.name] - # TODO: Add option to use gaze precision circle + # TODO?: Should we prefer to use camera frame AOIMatcher object? if aoi_2d.contains_point(gaze_position.value): inner_x, inner_y = aoi_2d.clockwise().inner_axis(gaze_position.value) diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py new file mode 100644 index 0000000..8e7acef --- /dev/null +++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python + +"""Matching algorithm based on fixation's deviation circle coverage over AOI +""" + +__author__ = "Théo de la Hogue" +__credits__ = [] +__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" +__license__ = "BSD" + +from typing import TypeVar, Tuple +from dataclasses import dataclass, field +import math + +from argaze import GazeFeatures + +import numpy +import cv2 + +GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement") +# Type definition for type annotation convenience + +@dataclass +class AOIMatcher(GazeFeatures.AOIMatcher): + + coverage_threshold: int|float + """ """ + + def __post_init__(self): + """Init looked aoi data.""" + + self.__look_count = 0 + self.__looked_aoi = None + self.__looked_aoi_coverage_mean = 0 + self.__looked_aoi_coverage = {} + + def match(self, aoi_scene, gaze_movement, exclude=[]) -> str: + """Returns AOI with the maximal fixation's deviation circle coverage if above coverage threshold.""" + + if GazeFeatures.is_fixation(gaze_movement): + + self.__look_count += 1 + + max_coverage = 0. + most_likely_looked_aoi = None + + for name, aoi in aoi_scene.items(): + + _, _, circle_ratio = aoi.circle_intersection(gaze_movement.focus, gaze_movement.deviation_max) + + if name not in exclude and circle_ratio > 0: + + # Sum circle ratio to update aoi coverage + try: + + self.__looked_aoi_coverage[name] += circle_ratio + + except KeyError: + + self.__looked_aoi_coverage[name] = circle_ratio + + # Update most likely looked aoi + if self.__looked_aoi_coverage[name] > max_coverage: + + most_likely_looked_aoi = name + max_coverage = self.__looked_aoi_coverage[name] + + # Update looked aoi + self.__looked_aoi = most_likely_looked_aoi + + # Update looked aoi coverage mean + self.__looked_aoi_coverage_mean = int(100 * max_coverage / self.__look_count) / 100 + + # Return + if self.looked_aoi_coverage_mean > self.coverage_threshold: + + return self.__looked_aoi + + elif GazeFeatures.is_saccade(gaze_movement): + + self.__post_init__() + + @property + def looked_aoi(self) -> str: + """Get most likely looked aoi name for current fixation (e.g. the aoi with the highest coverage mean value)""" + + return self.__looked_aoi + + @property + def looked_aoi_coverage_mean(self) -> float: + """Get looked aoi coverage mean for current fixation. + It represents the ratio of fixation deviation circle surface that used to cover the looked aoi.""" + + return self.__looked_aoi_coverage_mean + + @property + def looked_aoi_coverage(self) -> dict: + """Get all looked aois coverage for current fixation.""" + + return self.__looked_aoi_coverage \ No newline at end of file diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index bb5f991..33fd562 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -15,6 +15,7 @@ import json from inspect import getmembers from argaze import DataStructures +from argaze.AreaOfInterest import AOIFeatures import numpy import pandas @@ -700,6 +701,20 @@ class ScanPathAnalyzer(): raise NotImplementedError('analyze() method not implemented') +class AOIMatcher(): + """Abstract class to define what should provide an AOI matcher algorithm.""" + + def match(self, aoi_scene: AOIFeatures.AOIScene, gaze_movement: GazeMovement, exclude=[]) -> str: + """Which AOI is looked in the scene?""" + + raise NotImplementedError('match() method not implemented') + + @property + def looked_aoi(self) -> str: + """Get most likely looked aoi name.""" + + raise NotImplementedError('looked_aoi getter not implemented') + AOIScanStepType = TypeVar('AOIScanStep', bound="AOIScanStep") # Type definition for type annotation convenience diff --git a/src/argaze/utils/demo_environment/demo_ar_features_setup.json b/src/argaze/utils/demo_environment/demo_ar_features_setup.json index 4e16978..39ace44 100644 --- a/src/argaze/utils/demo_environment/demo_ar_features_setup.json +++ b/src/argaze/utils/demo_environment/demo_ar_features_setup.json @@ -1,5 +1,5 @@ { - "name": "AR Environment Demo", + "name": "ArEnvironment Demo", "aruco_detector": { "dictionary": { "name": "DICT_APRILTAG_16h5" @@ -18,7 +18,7 @@ } }, "scenes": { - "AR Scene Demo" : { + "ArScene Demo" : { "aruco_scene": "aruco_scene.obj", "layers": { "_" : { diff --git a/src/argaze/utils/demo_environment/demo_gaze_features_setup.json b/src/argaze/utils/demo_environment/demo_gaze_features_setup.json index 90ae30f..6e43895 100644 --- a/src/argaze/utils/demo_environment/demo_gaze_features_setup.json +++ b/src/argaze/utils/demo_environment/demo_gaze_features_setup.json @@ -28,6 +28,11 @@ "GrayRectangle": { "aoi_color": [0, 0, 0], "aoi_scene": "aoi_3d_scene.obj", + "aoi_matcher": { + "DeviationCircleCoverage": { + "coverage_threshold": 0.5 + } + }, "aoi_scan_path": { "duration_max": 10000 }, -- cgit v1.1