aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThéo de la Hogue2023-08-10 14:54:04 +0200
committerThéo de la Hogue2023-08-10 14:54:04 +0200
commit6a47f96e33bcbe8ee3caf58767ae1863422676fd (patch)
treecec51478f6e445aeb22fcdda76fc094609514a76 /src
parent293d1cc9b0fe6d7e871511cd716001f5765d9118 (diff)
downloadargaze-6a47f96e33bcbe8ee3caf58767ae1863422676fd.zip
argaze-6a47f96e33bcbe8ee3caf58767ae1863422676fd.tar.gz
argaze-6a47f96e33bcbe8ee3caf58767ae1863422676fd.tar.bz2
argaze-6a47f96e33bcbe8ee3caf58767ae1863422676fd.tar.xz
Working on a new architecture based on new ArLayer class.
Diffstat (limited to 'src')
-rw-r--r--src/argaze/ArFeatures.py1084
-rw-r--r--src/argaze/AreaOfInterest/AOI2DScene.py12
-rw-r--r--src/argaze/AreaOfInterest/AOIFeatures.py36
-rw-r--r--src/argaze/utils/demo_ar_features_run.py12
-rw-r--r--src/argaze/utils/demo_environment/demo_ar_features_setup.json8
-rw-r--r--src/argaze/utils/demo_environment/demo_gaze_features_setup.json86
-rw-r--r--src/argaze/utils/demo_gaze_features_run.py68
7 files changed, 824 insertions, 482 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 0b53034..0022c80 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -24,13 +24,16 @@ from argaze.GazeAnalysis import *
import numpy
import cv2
-ArEnvironmentType = TypeVar('ArEnvironment', bound="ArEnvironment")
+ArLayerType = TypeVar('ArLayer', bound="ArLayer")
+# Type definition for type annotation convenience
+
+ArFrameType = TypeVar('ArFrame', bound="ArFrame")
# Type definition for type annotation convenience
ArSceneType = TypeVar('ArScene', bound="ArScene")
# Type definition for type annotation convenience
-ArFrameType = TypeVar('ArFrame', bound="ArFrame")
+ArEnvironmentType = TypeVar('ArEnvironment', bound="ArEnvironment")
# Type definition for type annotation convenience
class PoseEstimationFailed(Exception):
@@ -53,9 +56,9 @@ class SceneProjectionFailed(Exception):
super().__init__(message)
-class JSONLoadingFailed(Exception):
+class LoadingFailed(Exception):
"""
- Exception raised when JSON loading fails.
+ Exception raised when attributes loading fails.
"""
def __init__(self, message):
@@ -63,50 +66,497 @@ class JSONLoadingFailed(Exception):
super().__init__(message)
@dataclass
+class ArLayer():
+ """
+ Defines a space where to make matching of gaze movements and AOIs and inside which those matchings need to be analyzed.
+
+ Parameters:
+ name: name of the layer
+ aoi_scene: AOI scene description
+ looked_aoi_covering_threshold:
+ aoi_scan_path: AOI scan path object
+ aoi_scan_path_analyzers: dictionary of AOI scan path analyzers
+ """
+
+ name: str
+ aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene)
+ looked_aoi_covering_threshold: int = field(default=0)
+ aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath)
+ aoi_scan_path_analyzers: dict = field(default_factory=dict)
+
+ def __post_init__(self):
+
+ # Define parent attribute: it will be setup by parent later
+ self.__parent = None
+
+ # Init current gaze movement
+ self.__gaze_movement = GazeFeatures.UnvalidGazeMovement()
+
+ # Init looked aoi data
+ self.__init_looked_aoi_data()
+
+ # Init lock to share looking data with multiples threads
+ self.__look_lock = threading.Lock()
+
+ # Cast aoi scene to its effective dimension
+ if self.aoi_scene.dimension == 2:
+
+ self.aoi_scene = AOI2DScene.AOI2DScene(self.aoi_scene)
+
+ elif self.aoi_scene.dimension == 3:
+
+ self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene)
+
+ @classmethod
+ def from_dict(self, layer_data, working_directory: str = None) -> ArLayerType:
+ """Load attributes from dictionary.
+
+ Parameters:
+ layer_data: dictionary with attributes to load
+ working_directory: folder path where to load files when a dictionary value is a relative filepath.
+ """
+
+ # Load name
+ try:
+
+ new_layer_name = layer_data.pop('name')
+
+ except KeyError:
+
+ new_layer_name = None
+
+ # Load optional aoi filter
+ try:
+
+ aoi_exclude_list = layer_data.pop('aoi_exclude')
+
+ except KeyError:
+
+ aoi_exclude_list = []
+
+ # Load aoi scene
+ try:
+
+ new_aoi_scene_value = layer_data.pop('aoi_scene')
+
+ # str: relative path to file
+ if type(new_aoi_scene_value) == str:
+
+ filepath = os.path.join(working_directory, new_aoi_scene_value)
+ file_format = filepath.split('.')[-1]
+
+ # JSON file format for 2D or 3D dimension
+ if file_format == 'json':
+
+ new_aoi_scene = AOIFeatures.AOIScene.from_json(filepath).copy(exclude=aoi_exclude_list)
+
+ # OBJ file format for 3D dimension only
+ elif file_format == 'obj':
+
+ new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath).copy(exclude=aoi_exclude_list)
+
+ # dict:
+ else:
+
+ new_aoi_scene = AOIFeatures.AOIScene.from_dict(new_aoi_scene_value)
+
+ except KeyError:
+
+ new_aoi_scene = AOIFeatures.AOIScene()
+
+ # Looked aoi validity threshold
+ try:
+
+ looked_aoi_covering_threshold = layer_data.pop('looked_aoi_covering_threshold')
+
+ except KeyError:
+
+ looked_aoi_covering_threshold = 0
+
+ # Edit expected AOI list by removing AOI with name equals to layer name
+ expected_aois = list(new_aoi_scene.keys())
+ expected_aois.remove(new_layer_name)
+
+ # Load AOI scan path
+ try:
+
+ new_aoi_scan_path_data = layer_data.pop('aoi_scan_path')
+ new_aoi_scan_path_data['expected_aois'] = expected_aois
+ new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data)
+
+ except KeyError:
+
+ new_aoi_scan_path_data = {}
+ new_aoi_scan_path_data['expected_aois'] = expected_aois
+ new_aoi_scan_path = None
+
+ # Load AOI scan path analyzers
+ new_aoi_scan_path_analyzers = {}
+
+ try:
+
+ new_aoi_scan_path_analyzers_value = layer_data.pop('aoi_scan_path_analyzers')
+
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items():
+
+ # Prepend argaze.GazeAnalysis path when a single name is provided
+ if len(aoi_scan_path_analyzer_module_path.split('.')) == 1:
+ aoi_scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_module_path}'
+
+ aoi_scan_path_analyzer_module = importlib.import_module(aoi_scan_path_analyzer_module_path)
+
+ # Check aoi scan path analyzer parameters type
+ members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer)
+
+ for member in members:
+
+ if '__annotations__' in member:
+
+ for parameter, parameter_type in member[1].items():
+
+ # Check if parameter is part of argaze.GazeAnalysis module
+ parameter_module_path = parameter_type.__module__.split('.')
+
+ # Check if parameter is part of a package
+ if len(parameter_type.__module__.split('.')) > 1:
+
+ # Try get existing analyzer instance to append as parameter
+ try:
+
+ aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_type.__module__]
+
+ except KeyError:
+
+ raise LoadingFailed(f'{aoi_scan_path_analyzer_module_path} aoi scan path analyzer loading fails because {parameter_type.__module__} aoi scan path analyzer is missing.')
+
+ aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters)
+
+ new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer
+
+ # Force AOI scan path creation
+ if len(new_aoi_scan_path_analyzers) > 0 and new_aoi_scan_path == None:
+
+ new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data)
+
+ except KeyError:
+
+ pass
+
+ # Create layer
+ return ArLayer(new_layer_name, \
+ new_aoi_scene, \
+ looked_aoi_covering_threshold, \
+ new_aoi_scan_path, \
+ new_aoi_scan_path_analyzers \
+ )
+
+ @classmethod
+ def from_json(self, json_filepath: str) -> ArLayerType:
+ """
+ Load attributes from .json file.
+
+ Parameters:
+ json_filepath: path to json file
+ """
+
+ with open(json_filepath) as configuration_file:
+
+ layer_data = json.load(configuration_file)
+ working_directory = os.path.dirname(json_filepath)
+
+ return ArLayer.from_dict(layer_data, working_directory)
+
+ @property
+ def parent(self):
+ """Get parent instance"""
+
+ return self.__parent
+
+ @parent.setter
+ def parent(self, parent):
+ """Get parent instance"""
+
+ self.__parent = parent
+
+ @property
+ def looked_aoi(self) -> str:
+ """Get most likely looked aoi name for current fixation (e.g. the aoi with the highest covering mean value)"""
+
+ return self.__looked_aoi
+
+ @property
+ def looked_aoi_covering_mean(self) -> float:
+ """Get looked aoi covering mean for current fixation.
+ It represents the ratio of fixation deviation circle surface that used to cover the looked aoi."""
+
+ return self.__looked_aoi_covering_mean
+
+ @property
+ def looked_aoi_covering(self) -> dict:
+ """Get all looked aois covering for current fixation."""
+
+ return self.__looked_aoi_covering
+
+ def __init_looked_aoi_data(self):
+ """Init looked aoi data."""
+
+ self.__look_count = 0
+ self.__looked_aoi = None
+ self.__looked_aoi_covering_mean = 0
+ self.__looked_aoi_covering = {}
+
+ def __update_looked_aoi_data(self, fixation) -> str:
+ """Update looked aoi data."""
+
+ self.__look_count += 1
+
+ max_covering = 0.
+ most_likely_looked_aoi = None
+
+ for name, aoi in self.aoi_scene.items():
+
+ _, _, circle_ratio = aoi.circle_intersection(fixation.focus, fixation.deviation_max)
+
+ if name != self.name and circle_ratio > 0:
+
+ # Sum circle ratio to update aoi covering
+ try:
+
+ self.__looked_aoi_covering[name] += circle_ratio
+
+ except KeyError:
+
+ self.__looked_aoi_covering[name] = circle_ratio
+
+ # Update most likely looked aoi
+ if self.__looked_aoi_covering[name] > max_covering:
+
+ most_likely_looked_aoi = name
+ max_covering = self.__looked_aoi_covering[name]
+
+ # Update looked aoi
+ self.__looked_aoi = most_likely_looked_aoi
+
+ # Update looked aoi covering mean
+ self.__looked_aoi_covering_mean = int(100 * max_covering / self.__look_count) / 100
+
+ return self.__looked_aoi
+
+ def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> dict:
+ """
+ Project timestamped gaze movement into layer.
+
+ !!! warning
+ Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute.
+
+ Parameters:
+ gaze_movement: gaze movement to project
+
+ Returns:
+ looked_aoi: most likely looked aoi name
+ aoi_scan_path_analysis: aoi scan path analysis at each new scan step if aoi_scan_path is instanciated
+ exception: error catched during gaze movement processing
+ """
+
+ # Lock layer exploitation
+ self.__look_lock.acquire()
+
+ # Update current gaze movement
+ self.__gaze_movement = gaze_movement
+
+ # Init looked aoi
+ looked_aoi = None
+
+ # Init aoi scan path analysis report
+ aoi_scan_path_analysis = {}
+
+ # Assess pipeline execution times
+ execution_times = {
+ 'aoi_fixation_matcher': None,
+ 'aoi_scan_step_analyzers': {}
+ }
+
+ # Catch any error
+ exception = None
+
+ try:
+
+ # Valid and finished gaze movement has been identified
+ if gaze_movement.valid and gaze_movement.finished:
+
+ if GazeFeatures.is_fixation(gaze_movement):
+
+ # Store aoi matching start date
+ matching_start = time.perf_counter()
+
+ # Does the finished fixation match an aoi?
+ looked_aoi = self.__update_looked_aoi_data(gaze_movement)
+
+ # Assess aoi matching time in ms
+ execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3
+
+ # Append fixation to aoi scan path
+ if self.aoi_scan_path != None and self.looked_aoi != None and self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold:
+
+ aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, self.looked_aoi)
+
+ # Is there a new step?
+ if aoi_scan_step and len(self.aoi_scan_path) > 1:
+
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
+
+ # Store aoi scan path analysis start date
+ aoi_scan_path_analysis_start = time.perf_counter()
+
+ # Analyze aoi scan path
+ aoi_scan_path_analyzer.analyze(self.aoi_scan_path)
+
+ # Assess aoi scan step analysis time in ms
+ execution_times['aoi_scan_step_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_path_analysis_start) * 1e3
+
+ # Store analysis
+ aoi_scan_path_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis
+
+ elif GazeFeatures.is_saccade(gaze_movement):
+
+ # Reset looked aoi
+ self.__init_looked_aoi_data()
+
+ # Append saccade to aoi scan path
+ if self.aoi_scan_path != None:
+
+ self.aoi_scan_path.append_saccade(timestamp, gaze_movement)
+
+ # Valid in progress fixation
+ elif gaze_movement.valid and not gaze_movement.finished:
+
+ if GazeFeatures.is_fixation(gaze_movement):
+
+ # Store aoi matching start date
+ matching_start = time.perf_counter()
+
+ # Does the finished fixation match an aoi?
+ looked_aoi = self.__update_looked_aoi_data(gaze_movement)
+
+ # Assess aoi matching time in ms
+ execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3
+
+ except Exception as e:
+
+ print('Warning: the following error occurs in ArLayer.look method:', e)
+
+ looked_aoi = None
+ aoi_scan_path_analysis = {}
+ exception = e
+
+ # Unlock layer exploitation
+ self.__look_lock.release()
+
+ # Sum all execution times
+ total_execution_time = 0
+
+ if execution_times['aoi_fixation_matcher']:
+
+ total_execution_time += execution_times['aoi_fixation_matcher']
+
+ for _, aoi_scan_path_analysis_time in execution_times['aoi_scan_step_analyzers'].items():
+
+ total_execution_time += aoi_scan_path_analysis_time
+
+ execution_times['total'] = total_execution_time
+
+ # Return look data
+ return looked_aoi, aoi_scan_path_analysis, execution_times, exception
+
+ def draw(self, image:numpy.array, aoi_color=(0, 0, 0)) -> Exception:
+ """
+ Draw layer into image.
+
+ Parameters:
+ image: where to draw
+ """
+
+ # Lock frame exploitation
+ self.__look_lock.acquire()
+
+ # Catch any drawing error
+ exception = None
+
+ try:
+
+ # Draw aoi
+ self.aoi_scene.draw(image, color=aoi_color)
+
+ # Draw current gaze movement
+ if self.__gaze_movement.valid:
+
+ if GazeFeatures.is_fixation(self.__gaze_movement):
+
+ self.__gaze_movement.draw(image, color=(0, 255, 255))
+ self.__gaze_movement.draw_positions(image)
+
+ # Draw looked aoi
+ if self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold:
+
+ self.aoi_scene.draw_circlecast(image, self.__gaze_movement.focus, self.__gaze_movement.deviation_max, matching_aoi = [self.__looked_aoi], base_color=(0, 0, 0), matching_color=(255, 255, 255))
+
+ elif GazeFeatures.is_saccade(self.__gaze_movement):
+
+ self.__gaze_movement.draw(image, color=(0, 255, 255))
+ self.__gaze_movement.draw_positions(image)
+
+ except Exception as e:
+
+ # Store error to return it
+ exception = e
+
+ # Unlock frame exploitation
+ self.__look_lock.release()
+
+ # Return drawing error
+ return exception
+
+@dataclass
class ArFrame():
"""
- Defines rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed.
+ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed.
Parameters:
name: name of the frame
size: defines the dimension of the rectangular area where gaze positions are projected.
- aoi_2d_scene: AOI 2D scene description
background: image to draw behind
gaze_movement_identifier: gaze movement identification algorithm
- current_fixation_matching: enable AOI fixation matching even for in progress fixation
- looked_aoi_covering_threshold:
+ filter_in_progress_fixation: ignore in progress fixation
scan_path: scan path object
- scan_path_analyzers: dictionary of scan path analysis to apply on scan path
- aoi_scan_path: AOI scan path object
- aoi_scan_path_analyzers: dictionary of scan path analysis to apply on AOI scan path
+ scan_path_analyzers: dictionary of scan path analyzers
heatmap: heatmap object
+ aoi_layers: dictionary of AOI layers
"""
name: str
size: tuple[int] = field(default=(1, 1))
- aoi_2d_scene: AOI2DScene.AOI2DScene = field(default_factory=AOI2DScene.AOI2DScene)
background: numpy.array = field(default_factory=numpy.array)
gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier)
- current_fixation_matching: bool = field(default=False)
- looked_aoi_covering_threshold: int = field(default=0)
+ filter_in_progress_fixation: bool = field(default=True)
scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath)
scan_path_analyzers: dict = field(default_factory=dict)
- aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath)
- aoi_scan_path_analyzers: dict = field(default_factory=dict)
heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap)
+ layers: dict = field(default_factory=dict)
def __post_init__(self):
# Define parent attribute: it will be setup by parent later
self.__parent = None
+ # Setup layers parent attribute
+ for name, layer in self.layers.items():
+
+ layer.parent = self
+
# Init current gaze position
self.__gaze_position = GazeFeatures.UnvalidGazePosition()
- # Init looked aoi data
- self.__init_looked_aoi_data()
-
- # Init lock to share looked data wit hmultiples threads
+ # Init lock to share looked data with multiples threads
self.__look_lock = threading.Lock()
@classmethod
@@ -136,26 +586,6 @@ class ArFrame():
new_frame_size = (0, 0)
- # Load aoi 2D scene
- try:
-
- new_aoi_2d_scene_value = frame_data.pop('aoi_2d_scene')
-
- # str: relative path to .json file
- if type(new_aoi_2d_scene_value) == str:
-
- json_filepath = os.path.join(working_directory, new_aoi_2d_scene_value)
- new_aoi_2d_scene = AOI2DScene.AOI2DScene.from_json(obj_filepath)
-
- # dict:
- else:
-
- new_aoi_2d_scene = AOI2DScene.AOI2DScene(new_aoi_2d_scene_value)
-
- except KeyError:
-
- new_aoi_2d_scene = AOI2DScene.AOI2DScene()
-
# Load background image
try:
@@ -188,20 +618,11 @@ class ArFrame():
# Current fixation matching
try:
- current_fixation_matching = frame_data.pop('current_fixation_matching')
-
- except KeyError:
-
- current_fixation_matching = False
-
- # Looked aoi validity threshold
- try:
-
- looked_aoi_covering_threshold = frame_data.pop('looked_aoi_covering_threshold')
+ filter_in_progress_fixation = frame_data.pop('filter_in_progress_fixation')
except KeyError:
- looked_aoi_covering_threshold = 0
+ filter_in_progress_fixation = False
# Load scan path
try:
@@ -248,7 +669,7 @@ class ArFrame():
except KeyError:
- raise JSONLoadingFailed(f'{scan_path_analyzer_module_path} scan path analyzer loading fails because {parameter_type.__module__} scan path analyzer is missing.')
+ raise LoadingFailed(f'{scan_path_analyzer_module_path} scan path analyzer loading fails because {parameter_type.__module__} scan path analyzer is missing.')
scan_path_analyzer = scan_path_analyzer_module.ScanPathAnalyzer(**scan_path_analyzer_parameters)
@@ -263,105 +684,62 @@ class ArFrame():
pass
- # Load AOI scan path
- try:
-
- new_aoi_scan_path_data = frame_data.pop('aoi_scan_path')
- new_aoi_scan_path_data['expected_aois'] = list(new_aoi_2d_scene.keys())
- new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data)
-
- except KeyError:
-
- new_aoi_scan_path_data = {}
- new_aoi_scan_path_data['expected_aois'] = list(new_aoi_2d_scene.keys())
- new_aoi_scan_path = None
-
- # Load AOI scan path analyzers
- new_aoi_scan_path_analyzers = {}
-
+ # Load heatmap
try:
- new_aoi_scan_path_analyzers_value = frame_data.pop('aoi_scan_path_analyzers')
-
- for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer_parameters in new_aoi_scan_path_analyzers_value.items():
-
- # Prepend argaze.GazeAnalysis path when a single name is provided
- if len(aoi_scan_path_analyzer_module_path.split('.')) == 1:
- aoi_scan_path_analyzer_module_path = f'argaze.GazeAnalysis.{aoi_scan_path_analyzer_module_path}'
-
- aoi_scan_path_analyzer_module = importlib.import_module(aoi_scan_path_analyzer_module_path)
-
- # Check aoi scan path analyzer parameters type
- members = getmembers(aoi_scan_path_analyzer_module.AOIScanPathAnalyzer)
+ new_heatmap_data = frame_data.pop('heatmap')
- for member in members:
+ # Default heatmap size equals frame size
+ if 'size' not in new_heatmap_data.keys():
- if '__annotations__' in member:
+ new_heatmap_data['size'] = new_frame_size
- for parameter, parameter_type in member[1].items():
+ new_heatmap = AOIFeatures.Heatmap(**new_heatmap_data)
- # Check if parameter is part of argaze.GazeAnalysis module
- parameter_module_path = parameter_type.__module__.split('.')
+ except KeyError:
- # Check if parameter is part of a package
- if len(parameter_type.__module__.split('.')) > 1:
+ new_heatmap_data = {}
+ new_heatmap = None
- # Try get existing analyzer instance to append as parameter
- try:
+ # Load layers
+ new_layers = {}
- aoi_scan_path_analyzer_parameters[parameter] = new_aoi_scan_path_analyzers[parameter_type.__module__]
+ try:
- except KeyError:
+ for layer_name, layer_data in frame_data.pop('layers').items():
- raise JSONLoadingFailed(f'{aoi_scan_path_analyzer_module_path} aoi scan path analyzer loading fails because {parameter_type.__module__} aoi scan path analyzer is missing.')
+ # Append name
+ layer_data['name'] = layer_name
- aoi_scan_path_analyzer = aoi_scan_path_analyzer_module.AOIScanPathAnalyzer(**aoi_scan_path_analyzer_parameters)
+ # Create layer
+ new_layer = ArLayer.from_dict(layer_data, working_directory)
- new_aoi_scan_path_analyzers[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer
+ # Setup layer
+ if new_layer.aoi_scene.dimension == 3:
- # Force AOI scan path creation
- if len(new_aoi_scan_path_analyzers) > 0 and new_aoi_scan_path == None:
+ new_layer.aoi_scene = new_layer.aoi_scene.orthogonal_projection * new_frame_size
- new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data)
+ # Append new layer
+ new_layers[layer_name] = new_layer
except KeyError:
pass
- # Load heatmap
- try:
-
- new_heatmap_data = frame_data.pop('heatmap')
-
- # Default heatmap size equals frame size
- if 'size' not in new_heatmap_data.keys():
-
- new_heatmap_data['size'] = new_frame_size
-
- new_heatmap = AOIFeatures.Heatmap(**new_heatmap_data)
-
- except KeyError:
-
- new_heatmap_data = {}
- new_heatmap = None
-
# Create frame
return ArFrame(new_frame_name, \
new_frame_size, \
- new_aoi_2d_scene, \
new_frame_background, \
new_gaze_movement_identifier, \
- current_fixation_matching, \
- looked_aoi_covering_threshold, \
+ filter_in_progress_fixation, \
new_scan_path, \
new_scan_path_analyzers, \
- new_aoi_scan_path, \
- new_aoi_scan_path_analyzers, \
- new_heatmap \
+ new_heatmap, \
+ new_layers \
)
@classmethod
- def from_json(self, json_filepath: str) -> ArEnvironmentType:
+ def from_json(self, json_filepath: str) -> ArFrameType:
"""
Load attributes from .json file.
@@ -410,69 +788,7 @@ class ArFrame():
return image
- @property
- def looked_aoi(self) -> str:
- """Get most likely looked aoi name for current fixation (e.g. the aoi with the highest covering mean value)"""
-
- return self.__looked_aoi
-
- @property
- def looked_aoi_covering_mean(self) -> float:
- """Get looked aoi covering mean for current fixation.
- It represents the ratio of fixation deviation circle surface that used to cover the looked aoi."""
-
- return self.__looked_aoi_covering_mean
-
- @property
- def looked_aoi_covering(self) -> dict:
- """Get all looked aois covering for current fixation."""
-
- return self.__looked_aoi_covering
-
- def __init_looked_aoi_data(self):
- """Init looked aoi data."""
-
- self.__look_count = 0
- self.__looked_aoi = None
- self.__looked_aoi_covering_mean = 0
- self.__looked_aoi_covering = {}
-
- def __update_looked_aoi_data(self, fixation):
- """Update looked aoi data."""
-
- self.__look_count += 1
-
- max_covering = 0.
- most_likely_looked_aoi = None
-
- for name, aoi in self.aoi_2d_scene.items():
-
- _, _, circle_ratio = aoi.circle_intersection(fixation.focus, fixation.deviation_max)
-
- if name != self.name and circle_ratio > 0:
-
- # Sum circle ratio to update aoi covering
- try:
-
- self.__looked_aoi_covering[name] += circle_ratio
-
- except KeyError:
-
- self.__looked_aoi_covering[name] = circle_ratio
-
- # Update most likely looked aoi
- if self.__looked_aoi_covering[name] > max_covering:
-
- most_likely_looked_aoi = name
- max_covering = self.__looked_aoi_covering[name]
-
- # Update looked aoi
- self.__looked_aoi = most_likely_looked_aoi
-
- # Update looked aoi covering mean
- self.__looked_aoi_covering_mean = int(100 * max_covering / self.__look_count) / 100
-
- def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition(), identified_gaze_movement: GazeFeatures.GazeMovement = GazeFeatures.UnvalidGazeMovement()) -> Tuple[GazeFeatures.GazeMovement, dict, dict, dict]:
+ def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Tuple[GazeFeatures.GazeMovement, dict, dict, dict]:
"""
Project gaze position into frame.
@@ -482,12 +798,11 @@ class ArFrame():
Parameters:
timestamp:
gaze_position: gaze position to project
- identified_gaze_movement: pass identified gaze movement instead of timestamped gaze position to avoid double identification process.
Returns:
- identified_gaze_movement: identified gaze movement from incoming consecutive timestamped gaze positions if gaze_movement_identifier is instanciated. Current gaze movement if current_fixation_matching is True.
+ identified_gaze_movement: identified gaze movement from incoming consecutive timestamped gaze positions if gaze_movement_identifier is instanciated. Current gaze movement if filter_in_progress_fixation is True.
scan_path_analysis: scan path analysis at each new scan step if scan_path is instanciated
- aoi_scan_path_analysis: new scan step at each new aoi scan step if aoi_scan_path is instanciated
+
exception: error catched during gaze position processing
"""
@@ -498,19 +813,20 @@ class ArFrame():
self.__gaze_position = gaze_position
# No gaze movement identified by default
- temp_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
# Init scan path analysis report
scan_step_analysis = {}
- aoi_scan_step_analysis = {}
+
+ # Init layer analysis report
+ layer_analysis = {}
# Assess pipeline execution times
execution_times = {
'gaze_movement_identifier': None,
- 'aoi_fixation_matcher': None,
'scan_step_analyzers':{},
- 'aoi_scan_step_analyzers': {},
- 'heatmap': None
+ 'heatmap': None,
+ 'layers': {}
}
# Catch any error
@@ -525,66 +841,27 @@ class ArFrame():
identification_start = time.perf_counter()
# Identify finished gaze movement
- temp_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position)
+ identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position)
# Assess movement identification time in ms
execution_times['gaze_movement_identifier'] = (time.perf_counter() - identification_start) * 1e3
- # Use given identified gaze movement
- else:
-
- temp_gaze_movement = identified_gaze_movement
-
# Valid and finished gaze movement has been identified
- if temp_gaze_movement.valid and temp_gaze_movement.finished:
-
- if GazeFeatures.is_fixation(temp_gaze_movement):
-
- # Store aoi matching start date
- matching_start = time.perf_counter()
-
- # Does the finished fixation match an aoi?
- self.__update_looked_aoi_data(temp_gaze_movement)
+ if identified_gaze_movement.valid and identified_gaze_movement.finished:
- # Assess aoi matching time in ms
- execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3
+ if GazeFeatures.is_fixation(identified_gaze_movement):
# Append fixation to scan path
if self.scan_path != None:
- self.scan_path.append_fixation(timestamp, temp_gaze_movement)
-
- # Append fixation to aoi scan path
- if self.aoi_scan_path != None and self.looked_aoi != None and self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold:
-
- aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, temp_gaze_movement, self.looked_aoi)
-
- # Is there a new step?
- if aoi_scan_step and len(self.aoi_scan_path) > 1:
-
- for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
-
- # Store aoi scan step analysis start date
- aoi_scan_step_analysis_start = time.perf_counter()
+ self.scan_path.append_fixation(timestamp, identified_gaze_movement)
- # Analyze aoi scan path
- aoi_scan_path_analyzer.analyze(self.aoi_scan_path)
-
- # Assess aoi scan step analysis time in ms
- execution_times['aoi_scan_step_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_step_analysis_start) * 1e3
-
- # Store analysis
- aoi_scan_step_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis
-
- elif GazeFeatures.is_saccade(temp_gaze_movement):
-
- # Reset looked aoi
- self.__init_looked_aoi_data()
+ elif GazeFeatures.is_saccade(identified_gaze_movement):
# Append saccade to scan path
if self.scan_path != None:
- scan_step = self.scan_path.append_saccade(timestamp, temp_gaze_movement)
+ scan_step = self.scan_path.append_saccade(timestamp, identified_gaze_movement)
# Is there a new step?
if scan_step and len(self.scan_path) > 1:
@@ -603,28 +880,14 @@ class ArFrame():
# Store analysis
scan_step_analysis[scan_path_analyzer_module_path] = scan_path_analyzer.analysis
- # Append saccade to aoi scan path
- if self.aoi_scan_path != None:
-
- self.aoi_scan_path.append_saccade(timestamp, temp_gaze_movement)
-
- # No valid finished gaze movement: optionnaly check current fixation matching
- elif self.gaze_movement_identifier and self.current_fixation_matching:
+ # No valid finished gaze movement: optionnaly stop in progress fixation filtering
+ elif self.gaze_movement_identifier and not self.filter_in_progress_fixation:
current_fixation = self.gaze_movement_identifier.current_fixation
if current_fixation.valid:
- temp_gaze_movement = current_fixation
-
- # Store aoi matching start date
- matching_start = time.perf_counter()
-
- # Does the current fixation match an aoi?
- self.__update_looked_aoi_data(current_fixation)
-
- # Assess aoi matching time in ms
- execution_times['aoi_fixation_matcher'] = (time.perf_counter() - matching_start) * 1e3
+ identified_gaze_movement = current_fixation
# Update heatmap
if self.heatmap:
@@ -640,14 +903,27 @@ class ArFrame():
# Assess heatmap time in ms
execution_times['heatmap'] = (time.perf_counter() - heatmap_start) * 1e3
-
+
+ # Look layers
+ for layer_name, layer in self.layers.items():
+
+ looked_aoi, aoi_scan_path_analysis, layer_execution_times, layer_exception = layer.look(timestamp, identified_gaze_movement)
+
+ layer_analysis[layer_name] = aoi_scan_path_analysis
+
+ execution_times['layers'][layer_name] = layer_execution_times
+
+ if layer_exception:
+
+ raise(layer_exception)
+
except Exception as e:
print('Warning: the following error occurs in ArFrame.look method:', e)
- returned_fixation = GazeFeatures.UnvalidGazeMovement()
+ identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
scan_step_analysis = {}
- aoi_scan_step_analysis = {}
+ layer_analysis = {}
exception = e
# Unlock frame exploitation
@@ -660,26 +936,22 @@ class ArFrame():
total_execution_time += execution_times['gaze_movement_identifier']
- if execution_times['aoi_fixation_matcher']:
-
- total_execution_time += execution_times['aoi_fixation_matcher']
-
for _, scan_step_analysis_time in execution_times['scan_step_analyzers'].items():
total_execution_time += scan_step_analysis_time
- for _, aoi_scan_step_analysis_time in execution_times['aoi_scan_step_analyzers'].items():
-
- total_execution_time += aoi_scan_step_analysis_time
-
if execution_times['heatmap']:
total_execution_time += execution_times['heatmap']
+ for _, layer_execution_times in execution_times['layers'].items():
+
+ total_execution_time += layer_execution_times['total']
+
execution_times['total'] = total_execution_time
# Return look data
- return temp_gaze_movement, scan_step_analysis, aoi_scan_step_analysis, execution_times, exception
+ return identified_gaze_movement, scan_step_analysis, layer_analysis, execution_times, exception
def draw(self, image:numpy.array, aoi_color=(0, 0, 0)) -> Exception:
"""
@@ -697,32 +969,14 @@ class ArFrame():
try:
- # Draw aoi
- self.aoi_2d_scene.draw(image, color=aoi_color)
+ # Draw layers
+ for layer_name, layer in self.layers.items():
+
+ exception = layer.draw(image, aoi_color)
# Draw current gaze position
self.__gaze_position.draw(image, color=(255, 255, 255))
- # Draw current gaze movement
- current_gaze_movement = self.gaze_movement_identifier.current_gaze_movement
-
- if current_gaze_movement.valid:
-
- if GazeFeatures.is_fixation(current_gaze_movement):
-
- current_gaze_movement.draw(image, color=(0, 255, 255))
- current_gaze_movement.draw_positions(image)
-
- # Draw looked aoi
- if self.looked_aoi_covering_mean > self.looked_aoi_covering_threshold:
-
- self.aoi_2d_scene.draw_circlecast(image, current_gaze_movement.focus, current_gaze_movement.deviation_max, matching_aoi = [self.__looked_aoi], base_color=(0, 0, 0), matching_color=(255, 255, 255))
-
- elif GazeFeatures.is_saccade(current_gaze_movement):
-
- current_gaze_movement.draw(image, color=(0, 255, 255))
- current_gaze_movement.draw_positions(image)
-
except Exception as e:
# Store error to return it
@@ -737,7 +991,7 @@ class ArFrame():
@dataclass
class ArScene():
"""
- Define an Augmented Reality scene with ArUco markers and AOI scenes.
+ Define an Augmented Reality scene with ArUcoMarkers, ArLayers and ArFrames inside.
Parameters:
@@ -745,9 +999,9 @@ class ArScene():
aruco_scene: ArUco markers 3D scene description used to estimate scene pose from detected markers: see [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function below.
- aoi_3d_scene: AOI 3D scene description that will be projected onto estimated scene once its pose will be estimated : see [project][argaze.ArFeatures.ArScene.project] function below.
+ layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
- aoi_frames: Optional dictionary to define AOI as ArFrame.
+ frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
aruco_axis: Optional dictionary to define orthogonal axis where each axis is defined by list of 3 markers identifier (first is origin). \
This pose estimation strategy is used by [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function when at least 3 markers are detected.
@@ -760,8 +1014,8 @@ class ArScene():
"""
name: str
aruco_scene: ArUcoScene.ArUcoScene = field(default_factory=ArUcoScene.ArUcoScene)
- aoi_3d_scene: AOI3DScene.AOI3DScene = field(default_factory=AOI3DScene.AOI3DScene)
- aoi_frames: dict = field(default_factory=dict)
+ layers: dict = field(default_factory=dict)
+ frames: dict = field(default_factory=dict)
aruco_axis: dict = field(default_factory=dict)
aruco_aoi: dict = field(default_factory=dict)
angle_tolerance: float = field(default=0.)
@@ -769,29 +1023,59 @@ class ArScene():
def __post_init__(self):
- # Define environment attribute: it will be setup by parent environment later
- self.__environment = None
+ # Define parent attribute: it will be setup by parent object later
+ self.__parent = None
+
+ # Setup layer parent attribute
+ for name, layer in self.layers.items():
- # Preprocess orthogonal projection to speed up further aruco aoi processings
- self.__orthogonal_projection_cache = self.aoi_3d_scene.orthogonal_projection
+ layer.parent = self
- # Setup aoi frame parent attribute
- for aoi_name, frame in self.aoi_frames.items():
+ # Setup frame parent attribute
+ for name, frame in self.frames.items():
frame.parent = self
+ # Preprocess orthogonal projection to speed up further processings
+ self.__orthogonal_projection_cache = {}
+
+ for name, layer in self.layers.items():
+
+ self.__orthogonal_projection_cache[name] = self.aoi_scene.orthogonal_projection
+
def __str__(self) -> str:
"""
Returns:
String representation
"""
- output = f'ArEnvironment:\n{self.environment.name}\n'
+ output = f'parent:\n{self.parent.name}\n'
output += f'ArUcoScene:\n{self.aruco_scene}\n'
- output += f'AOI3DScene:\n{self.aoi_3d_scene}\n'
+
+ if len(self.layers):
+ output += f'ArLayers:\n'
+ for name, layer in self.layers.items():
+ output += f'{name}:\n{layer}\n'
+
+ if len(self.frames):
+ output += f'ArFrames:\n'
+ for name, frame in self.frames.items():
+ output += f'{name}:\n{frame}\n'
return output
+ @property
+ def parent(self):
+ """Get parent instance"""
+
+ return self.__parent
+
+ @parent.setter
+ def parent(self, parent):
+ """Get parent instance"""
+
+ self.__parent = parent
+
@classmethod
def from_dict(self, scene_data, working_directory: str = None) -> ArSceneType:
@@ -825,74 +1109,83 @@ class ArScene():
new_aruco_scene = None
- # Load optional aoi filter
+ # Load layers
+ new_layers = {}
+
try:
- aoi_exclude_list = scene_data.pop('aoi_exclude')
+ for layer_name, layer_data in scene_data.pop('layers').items():
+
+ # Append name
+ layer_data['name'] = layer_name
+
+ # Create layer
+ new_layer = ArLayer.from_dict(layer_data, working_directory)
+
+ # Append new layer
+ new_layers[layer_name] = new_layer
except KeyError:
- aoi_exclude_list = []
+ pass
+
+ # Load frames
+ new_frames = {}
- # Load aoi 3d scene
try:
- # Check aoi_3d_scene value type
- aoi_3d_scene_value = scene_data.pop('aoi_3d_scene')
+ for frame_name, frame_data in scene_data.pop('frames').items():
- # str: relative path to .obj file
- if type(aoi_3d_scene_value) == str:
+ # Append name
+ frame_data['name'] = frame_name
- obj_filepath = os.path.join(working_directory, aoi_3d_scene_value)
- new_aoi_3d_scene = AOI3DScene.AOI3DScene.from_obj(obj_filepath).copy(exclude=aoi_exclude_list)
-
- # dict:
- else:
+ # Create frame
+ new_frame = ArFrame.from_dict(frame_data, working_directory)
- new_aoi_3d_scene = AOI3DScene.AOI3DScene(aoi_3d_scene_value).copy(exclude=aoi_exclude_list)
+ # Look for AOI with same frame name
+ aoi_frame = None
+ for layer_name, layer in new_layers.items():
- except KeyError:
+ try:
- new_aoi_3d_scene = None
+ aoi_frame = layer.aoi_scene[frame_name]
- # Load aoi frames
- new_aoi_frames = {}
+ except KeyError:
- try:
+ # AOI name should be unique
+ break
- for aoi_name, aoi_frame_data in scene_data.pop('aoi_frames').items():
+ if aoi_frame:
- # Create aoi frame
- new_aoi_frame = ArFrame.from_dict(aoi_frame_data, working_directory)
+ # Project and reframe each layers into corresponding frame layers
+ for frame_layer_name, frame_layer in new_frame.layers.items():
- # Setup aoi frame
- new_aoi_frame.name = aoi_name
- new_aoi_frame.aoi_2d_scene = new_aoi_3d_scene.orthogonal_projection.reframe(aoi_name, new_aoi_frame.size)
+ try:
- if new_aoi_frame.aoi_scan_path != None:
+ layer = new_layers[frame_layer_name]
+
+ frame_layer.aoi_scene = layer.aoi_scene.orthogonal_projection.reframe(aoi_frame, new_frame.size)
- new_aoi_frame.aoi_scan_path.expected_aois = list(new_aoi_3d_scene.keys())
+ if frame_layer.aoi_scan_path != None:
- # Append new aoi frame
- new_aoi_frames[aoi_name] = new_aoi_frame
+ # Edit expected AOI list by removing AOI with name equals to frame layer name
+ expected_aois = list(layer.aoi_scene.keys())
+ expected_aois.remove(frame_layer_name)
- except KeyError:
+ frame_layer.aoi_scan_path.expected_aois = expected_aois
- pass
+ except KeyError:
- return ArScene(new_scene_name, new_aruco_scene, new_aoi_3d_scene, new_aoi_frames, **scene_data)
+ continue
- @property
- def environment(self):
- """Get parent environment instance"""
+ # Append new frame
+ new_frames[name] = new_frame
- return self.__environment
+ except KeyError:
- @environment.setter
- def environment(self, environment):
- """Set parent environment instance"""
+ pass
- self.__environment = environment
+ return ArScene(new_scene_name, new_aruco_scene, new_layers, new_frames, **scene_data)
def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, str, dict]:
"""Estimate scene pose from detected ArUco markers.
@@ -957,8 +1250,8 @@ class ArScene():
return tvec, rmat, 'estimate_pose_from_markers', consistent_markers
- def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> AOI2DScene.AOI2DScene:
- """Project AOI scene according estimated pose and optional horizontal field of view clipping angle.
+ def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0.) -> dict:
+ """Project layers according estimated pose and optional horizontal field of view clipping angle.
Parameters:
tvec: translation vector
@@ -966,29 +1259,36 @@ class ArScene():
visual_hfov: horizontal field of view clipping angle
Returns:
- aoi_2d_scene: scene projection
+ layer_projections: dictionary of AOI2DScene projection
"""
- # Clip AOI out of the visual horizontal field of view (optional)
- if visual_hfov > 0:
+ layer_projections = {}
+
+ for name, layer in self.layers.items():
+
+ # Clip AOI out of the visual horizontal field of view (optional)
+ if visual_hfov > 0:
- # Transform scene into camera referential
- aoi_3d_scene_camera_ref = self.aoi_3d_scene.transform(tvec, rvec)
+ # Transform layer aoi scene into camera referential
+ aoi_scene_camera_ref = layer.aoi_scene.transform(tvec, rvec)
- # Get aoi inside vision cone field
- cone_vision_height_cm = 200 # cm
- cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm
+ # Get aoi inside vision cone field
+ cone_vision_height_cm = 200 # cm
+ cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm
- _, aoi_outside = aoi_3d_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm)
+ _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm)
- # Keep only aoi inside vision cone field
- aoi_3d_scene_copy = self.aoi_3d_scene.copy(exclude=aoi_outside.keys())
+ # Keep only aoi inside vision cone field
+ aoi_scene_copy = layer.aoi_scene.copy(exclude=aoi_outside.keys())
- else:
+ else:
+
+ aoi_scene_copy = layer.aoi_scene.copy()
- aoi_3d_scene_copy = self.aoi_3d_scene.copy()
+ # Project layer aoi scene
+ layer_projections[name] = aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)
- return aoi_3d_scene_copy.project(tvec, rvec, self.environment.aruco_detector.optic_parameters.K)
+ return projected_layers
def build_aruco_aoi_scene(self, detected_markers) -> AOI2DScene.AOI2DScene:
"""
@@ -1044,7 +1344,7 @@ class ArScene():
image: where to draw
"""
- self.aruco_scene.draw_axis(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D)
+ self.aruco_scene.draw_axis(image, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D)
def draw_places(self, image: numpy.array):
"""
@@ -1054,7 +1354,7 @@ class ArScene():
image: where to draw
"""
- self.aruco_scene.draw_places(image, self.environment.aruco_detector.optic_parameters.K, self.environment.aruco_detector.optic_parameters.D)
+ self.aruco_scene.draw_places(image, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D)
@dataclass
class ArEnvironment():
@@ -1080,10 +1380,10 @@ class ArEnvironment():
self.camera_frame.parent = self
- # Setup scenes environment attribute
+ # Setup scenes parent attribute
for name, scene in self.scenes.items():
- scene.environment = self
+ scene.parent = self
# Init a lock to share AOI scene projections into camera frame between multiple threads
self.__camera_frame_lock = threading.Lock()
@@ -1155,29 +1455,37 @@ class ArEnvironment():
# Build scenes
new_scenes = {}
- for new_scene_name, scene_data in environment_data.pop('scenes').items():
+ for scene_name, scene_data in environment_data.pop('scenes').items():
+
+ # Append name
+ scene_data['name'] = scene_name
# Create new scene
new_scene = ArScene.from_dict(scene_data, working_directory)
- # Setup new scene
- new_scene.name = new_scene_name
-
# Append new scene
- new_scenes[new_scene_name] = new_scene
+ new_scenes[scene_name] = new_scene
- # Setup expected aoi for camera frame aoi scan path
+ # Setup expected aoi of each camera frame layer aoi scan path with the aoi of corresponding scene layer
if new_camera_frame != None:
- if new_camera_frame.aoi_scan_path != None:
+ for camera_frame_layer_name, camera_frame_layer in new_camera_frame.layers.items():
- # List all environment aoi
all_aoi_list = []
+
for scene_name, scene in new_scenes.items():
- all_aoi_list.extend(list(scene.aoi_3d_scene.keys()))
+ try:
- new_camera_frame.aoi_scan_path.expected_aois = all_aoi_list
+ scene_layer = scene.layers[camera_frame_layer_name]
+
+ all_aoi_list.extend(list(scene_layer.aoi_scene.keys()))
+
+ except KeyError:
+
+ continue
+
+ camera_frame_layer.aoi_scan_path.expected_aois = all_aoi_list
# Create new environment
return ArEnvironment(new_environment_name, new_aruco_detector, new_camera_frame, new_scenes)
@@ -1232,16 +1540,16 @@ class ArEnvironment():
return image
@property
- def aoi_frames(self):
- """Iterate over all environment scenes aoi frames"""
+ def frames(self):
+ """Iterate over all environment scenes frames"""
# For each scene
for scene_name, scene in self.scenes.items():
- # For each aoi frame
- for frame_name, aoi_frame in scene.aoi_frames.items():
+ # For each frame
+ for name, frame in scene.frames.items():
- yield aoi_frame
+ yield frame
def detect_and_project(self, image: numpy.array) -> Tuple[float, dict]:
"""Detect environment aruco markers from image and project scenes into camera frame.
@@ -1325,13 +1633,13 @@ class ArEnvironment():
# Project gaze position into camera frame
yield self.camera_frame, self.camera_frame.look(timestamp, gaze_position)
- # Project gaze position into each aoi frames if possible
- for aoi_frame in self.aoi_frames:
+ # Project gaze position into each frames if possible
+ for frame in self.frames:
- # Is aoi frame projected into camera frame ?
+ # Is there an AOI with the same frame name projected into camera frame ?
try:
- aoi_2d = self.camera_frame.aoi_2d_scene[aoi_frame.name]
+ aoi_2d = self.camera_frame.aoi_2d_scene[frame.name]
# TODO: Add option to use gaze precision circle
if aoi_2d.contains_point(gaze_position.value):
@@ -1341,7 +1649,7 @@ class ArEnvironment():
# QUESTION: How to project gaze precision?
inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y))
- yield aoi_frame, aoi_frame.look(timestamp, inner_gaze_position * aoi_frame.size)
+ yield frame, frame.look(timestamp, inner_gaze_position * frame.size)
# Ignore missing aoi frame projection
except KeyError:
@@ -1352,7 +1660,7 @@ class ArEnvironment():
self.__camera_frame_lock.release()
def map(self):
- """Project camera frame background into aoi frames background.
+ """Project camera frame background into frames background.
.. warning:: detect_and_project method needs to be called first.
"""
@@ -1365,20 +1673,20 @@ class ArEnvironment():
self.__camera_frame_lock.acquire()
# Project image if possible
- for aoi_frame in self.aoi_frames:
+ for frame in self.frames:
- # Is aoi frame projected into camera frame ?
+ # Is there an AOI with the same frame name projected into camera frame ?
try:
- aoi_2d = self.camera_frame.aoi_2d_scene[aoi_frame.name]
+ aoi_2d = self.camera_frame.aoi_2d_scene[frame.name]
# Apply perspective transform algorithm to fill aoi frame background
- width, height = aoi_frame.size
+ width, height = frame.size
destination = numpy.float32([[0, height],[width, height],[width, 0],[0, 0]])
mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination)
- aoi_frame.background = cv2.warpPerspective(self.camera_frame.background, mapping, (width, height))
+ frame.background = cv2.warpPerspective(self.camera_frame.background, mapping, (width, height))
- # Ignore missing aoi frame projection
+ # Ignore missing frame projection
except KeyError:
pass
diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py
index 694e304..b2dba39 100644
--- a/src/argaze/AreaOfInterest/AOI2DScene.py
+++ b/src/argaze/AreaOfInterest/AOI2DScene.py
@@ -112,21 +112,23 @@ class AOI2DScene(AOIFeatures.AOIScene):
# Draw form
aoi.draw(image, color)
- def reframe(self, aoi_name: str, size: tuple) -> AOI2DSceneType:
+ def reframe(self, aoi: AOIFeatures.AreaOfInterest, size: tuple) -> AOI2DSceneType:
"""
- Reframe whole scene to a scene bounded by an AOI.
+ Reframe whole scene to a scene bounded by a 4 vertices 2D AOI.
Parameters:
- aoi: name of AOI used to reframe scene
+ aoi: 4 vertices 2D AOI used to reframe scene
+ size: size of reframed scene
Returns:
reframed AOI 2D scene
"""
- assert(self[aoi_name].points_number == 4)
+ assert(aoi.dimension == 2)
+ assert(aoi.points_number == 4)
# Edit affine transformation (M) allowing to transform source axis (Src) into destination axis (Dst)
- Src = self[aoi_name].clockwise().astype(numpy.float32)
+ Src = aoi.clockwise().astype(numpy.float32)
Src_origin = Src[0]
Src = Src - Src_origin
Dst = numpy.float32([[0, 0], [size[0], 0], [size[0], size[1]], [0, size[1]]])
diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py
index 8c684c0..5a9d0a9 100644
--- a/src/argaze/AreaOfInterest/AOIFeatures.py
+++ b/src/argaze/AreaOfInterest/AOIFeatures.py
@@ -241,6 +241,42 @@ class AOIScene():
for name, area in areas.items():
self[name] = AreaOfInterest(area)
+ @classmethod
+ def from_dict(self, aoi_scene_data, working_directory: str = None) -> AOISceneType:
+ """Load attributes from dictionary.
+
+ Parameters:
+ aoi_scene_data: dictionary with attributes to load
+ working_directory: folder path where to load files when a dictionary value is a relative filepath.
+ """
+
+ # Load areas
+ areas = {}
+
+ for name, area in aoi_scene_data.items():
+ areas[name] = AreaOfInterest(area)
+
+ # Guess dimension from first area dimension
+ dimension = areas.values()[0].dimension
+
+ return AOIScene(dimension = dimension, areas = areas)
+
+ @classmethod
+ def from_json(self, json_filepath: str) -> AOISceneType:
+ """
+ Load attributes from .json file.
+
+ Parameters:
+ json_filepath: path to json file
+ """
+
+ with open(json_filepath) as configuration_file:
+
+ aoi_scene_data = json.load(configuration_file)
+ working_directory = os.path.dirname(json_filepath)
+
+ return AOIScene.from_dict(aoi_scene_data, working_directory)
+
def __getitem__(self, name) -> AreaOfInterest:
"""Get an AOI from the scene."""
diff --git a/src/argaze/utils/demo_ar_features_run.py b/src/argaze/utils/demo_ar_features_run.py
index bd48d0b..0df81c5 100644
--- a/src/argaze/utils/demo_ar_features_run.py
+++ b/src/argaze/utils/demo_ar_features_run.py
@@ -49,7 +49,7 @@ def main():
for frame, look_data in ar_environment.look(timestamp, GazeFeatures.GazePosition((x, y))):
# Unpack look data
- movement, scan_step_analysis, aoi_scan_step_analysis, times, exception = look_data
+ movement, scan_step_analysis, layer_analysis, execution_times, exception = look_data
# Do something with look data
# ...
@@ -94,17 +94,17 @@ def main():
# Display environment
cv2.imshow(ar_environment.name, environment_image)
- # Draw and display each aoi frames
- for aoi_frame in ar_environment.aoi_frames:
+ # Draw and display each frames
+ for frame in ar_environment.frames:
# Create frame image
- aoi_frame_image = aoi_frame.image
+ frame_image = frame.image
# Draw frame info
- aoi_frame.draw(aoi_frame_image)
+ frame.draw(frame_image)
# Display frame
- cv2.imshow(f'{aoi_frame.parent.name}:{aoi_frame.name}', aoi_frame_image)
+ cv2.imshow(f'{frame.parent.name}:{frame.name}', frame_image)
# Stop by pressing 'Esc' key
if cv2.waitKey(10) == 27:
diff --git a/src/argaze/utils/demo_environment/demo_ar_features_setup.json b/src/argaze/utils/demo_environment/demo_ar_features_setup.json
index 3e030f8..b943a83 100644
--- a/src/argaze/utils/demo_environment/demo_ar_features_setup.json
+++ b/src/argaze/utils/demo_environment/demo_ar_features_setup.json
@@ -23,8 +23,12 @@
"scenes": {
"AR Scene Demo" : {
"aruco_scene": "aruco_scene.obj",
- "aoi_3d_scene": "aoi_3d_scene.obj",
- "aoi_frames": {
+ "layers": {
+ "MainLayer" : {
+ "aoi_scene": "aoi_3d_scene.obj"
+ }
+ },
+ "frames": {
"GrayRectangle": {
"size": [640, 480],
"background": "frame_background.jpg",
diff --git a/src/argaze/utils/demo_environment/demo_gaze_features_setup.json b/src/argaze/utils/demo_environment/demo_gaze_features_setup.json
index 49bf257..f9947d6 100644
--- a/src/argaze/utils/demo_environment/demo_gaze_features_setup.json
+++ b/src/argaze/utils/demo_environment/demo_gaze_features_setup.json
@@ -1,49 +1,45 @@
{
- "name": "AR Environment Demo",
- "scenes": {
- "AR Scene Demo" : {
- "aoi_3d_scene": "aoi_3d_scene.obj",
- "aoi_frames": {
- "GrayRectangle": {
- "size": [1920, 1149],
- "background": "frame_background.jpg",
- "gaze_movement_identifier": {
- "DispersionThresholdIdentification": {
- "deviation_max_threshold": 50,
- "duration_min_threshold": 200
- }
- },
- "scan_path": {
- "duration_max": 10000
- },
- "scan_path_analyzers": {
- "Basic": {},
- "KCoefficient": {},
- "NearestNeighborIndex": {
- "size": [1920, 1149]
- },
- "ExploitExploreRatio": {
- "short_fixation_duration_threshold": 0
- }
- },
- "aoi_scan_path": {
- "duration_max": 10000
- },
- "aoi_scan_path_analyzers": {
- "Basic": {},
- "TransitionMatrix": {},
- "KCoefficient": {},
- "LempelZivComplexity": {},
- "NGram": {
- "n_min": 3,
- "n_max": 3
- },
- "Entropy":{}
- },
- "heatmap": {
- "size": [320, 240]
- }
- }
+ "name": "ArFrame Demo",
+ "size": [1920, 1149],
+ "background": "frame_background.jpg",
+ "gaze_movement_identifier": {
+ "DispersionThresholdIdentification": {
+ "deviation_max_threshold": 50,
+ "duration_min_threshold": 200
+ }
+ },
+ "scan_path": {
+ "duration_max": 10000
+ },
+ "scan_path_analyzers": {
+ "Basic": {},
+ "KCoefficient": {},
+ "NearestNeighborIndex": {
+ "size": [1920, 1149]
+ },
+ "ExploitExploreRatio": {
+ "short_fixation_duration_threshold": 0
+ }
+ },
+ "heatmap": {
+ "size": [320, 240]
+ },
+ "layers": {
+ "GrayRectangle": {
+ "aoi_scene": "aoi_3d_scene.obj",
+ "aoi_scan_path": {
+ "duration_max": 10000
+ },
+ "aoi_scan_path_analyzers": {
+ "Basic": {},
+ "TransitionMatrix": {},
+ "KCoefficient": {},
+ "LempelZivComplexity": {},
+ "NGram": {
+ "n_min": 3,
+ "n_max": 3
+ },
+ "Entropy":{}
}
}
}
diff --git a/src/argaze/utils/demo_gaze_features_run.py b/src/argaze/utils/demo_gaze_features_run.py
index 15fc4f4..915ae86 100644
--- a/src/argaze/utils/demo_gaze_features_run.py
+++ b/src/argaze/utils/demo_gaze_features_run.py
@@ -21,23 +21,20 @@ import pandas
def main():
"""
- Load AR environment from .json file to project AOI scene on screen and use mouse pointer to simulate gaze positions.
+ Load ArFrame from .json file and use mouse pointer to simulate gaze positions.
"""
current_directory = os.path.dirname(os.path.abspath(__file__))
# Manage arguments
parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('environment', metavar='ENVIRONMENT', type=str, help='ar environment filepath')
+ parser.add_argument('frame', metavar='FRAME', type=str, help='ar frame filepath')
args = parser.parse_args()
- # Load AR environment
- ar_environment = ArFeatures.ArEnvironment.from_json(args.environment)
+ # Load ArFrame
+ ar_frame = ArFeatures.ArFrame.from_json(args.frame)
- # Select AR frame
- ar_frame = ar_environment.scenes["AR Scene Demo"].aoi_frames["GrayRectangle"]
-
- # Create a window to display AR environment
+ # Create a window to display ArEnvironment
cv2.namedWindow(ar_frame.name, cv2.WINDOW_AUTOSIZE)
# Heatmap buffer display option
@@ -53,7 +50,7 @@ def main():
timestamp = int((time.time() - start_time) * 1e3)
# Project gaze position into frame
- movement, scan_step_analysis, aoi_scan_step_analysis, times, exception = ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y)))
+ movement, scan_step_analysis, layer_analysis, execution_times, exception = ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y)))
# Do something with look data
# ...
@@ -80,45 +77,45 @@ def main():
# Write last 5 steps of aoi scan path
path = ''
- for step in ar_frame.aoi_scan_path[-5:]:
+ for step in ar_frame.layers["GrayRectangle"].aoi_scan_path[-5:]:
path += f'> {step.aoi} '
- path += f'> {ar_frame.aoi_scan_path.current_aoi}'
+ path += f'> {ar_frame.layers["GrayRectangle"].aoi_scan_path.current_aoi}'
cv2.putText(frame_image, path, (20, ar_frame.size[1]-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
# Display Transition matrix analysis if loaded
- try:
+ #try:
- transition_matrix_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.TransitionMatrix"]
+ transition_matrix_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.TransitionMatrix"]
- cv2.putText(frame_image, f'Transition matrix density: {transition_matrix_analyzer.transition_matrix_density:.2f}', (20, ar_frame.size[1]-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
-
- # Iterate over indexes (departures)
- for from_aoi, row in transition_matrix_analyzer.transition_matrix_probabilities.iterrows():
+ cv2.putText(frame_image, f'Transition matrix density: {transition_matrix_analyzer.transition_matrix_density:.2f}', (20, ar_frame.size[1]-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+
+ # Iterate over indexes (departures)
+ for from_aoi, row in transition_matrix_analyzer.transition_matrix_probabilities.iterrows():
- # Iterate over columns (destinations)
- for to_aoi, probability in row.items():
+ # Iterate over columns (destinations)
+ for to_aoi, probability in row.items():
- if from_aoi != to_aoi and probability > 0.0:
+ if from_aoi != to_aoi and probability > 0.0:
- from_center = ar_frame.aoi_2d_scene[from_aoi].center.astype(int)
- to_center = ar_frame.aoi_2d_scene[to_aoi].center.astype(int)
- start_line = (0.5 * from_center + 0.5 * to_center).astype(int)
+ from_center = ar_frame.layers['GrayRectangle'].aoi_scene[from_aoi].center.astype(int)
+ to_center = ar_frame.layers['GrayRectangle'].aoi_scene[to_aoi].center.astype(int)
+ start_line = (0.5 * from_center + 0.5 * to_center).astype(int)
- color = [int(probability*200) + 55, int(probability*200) + 55, int(probability*200) + 55]
+ color = [int(probability*200) + 55, int(probability*200) + 55, int(probability*200) + 55]
- cv2.line(frame_image, start_line, to_center, color, int(probability*10) + 2)
- cv2.line(frame_image, from_center, to_center, [55, 55, 55], 2)
-
- except KeyError:
- pass
+ cv2.line(frame_image, start_line, to_center, color, int(probability*10) + 2)
+ cv2.line(frame_image, from_center, to_center, [55, 55, 55], 2)
+
+ #except KeyError:
+ # pass
# Display aoi scan path basic metrics analysis if loaded
try:
- basic_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.Basic"]
+ basic_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.Basic"]
# Write basic analysis
cv2.putText(frame_image, f'Step number: {basic_analyzer.steps_number}', (20, ar_frame.size[1]-440), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
@@ -147,7 +144,7 @@ def main():
# Display aoi scan path K-modified coefficient analysis if loaded
try:
- aoi_kc_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.KCoefficient"]
+ aoi_kc_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.KCoefficient"]
# Write aoi Kc analysis
if aoi_kc_analyzer.K < 0.:
@@ -164,7 +161,7 @@ def main():
# Display Lempel-Ziv complexity analysis if loaded
try:
- lzc_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.LempelZivComplexity"]
+ lzc_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.LempelZivComplexity"]
cv2.putText(frame_image, f'Lempel-Ziv complexity: {lzc_analyzer.lempel_ziv_complexity}', (20, ar_frame.size[1]-200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
@@ -174,7 +171,7 @@ def main():
# Display N-Gram analysis if loaded
try:
- ngram_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.NGram"]
+ ngram_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.NGram"]
# Display only 3-gram analysis
start = ar_frame.size[1] - ((len(ngram_analyzer.ngrams_count[3]) + 1) * 40)
@@ -194,7 +191,7 @@ def main():
# Display Entropy analysis if loaded
try:
- entropy_analyzer = ar_frame.aoi_scan_path_analyzers["argaze.GazeAnalysis.Entropy"]
+ entropy_analyzer = ar_frame.layers['GrayRectangle'].aoi_scan_path_analyzers["argaze.GazeAnalysis.Entropy"]
cv2.putText(frame_image, f'Stationary entropy: {entropy_analyzer.stationary_entropy:.3f},', (20, ar_frame.size[1]-280), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
cv2.putText(frame_image, f'Transition entropy: {entropy_analyzer.transition_entropy:.3f},', (20, ar_frame.size[1]-240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
@@ -234,8 +231,7 @@ def main():
# Reload environment with 'h' key
if key_pressed == 114:
- ar_environment = ArFeatures.ArEnvironment.from_json(args.environment)
- ar_frame = ar_environment.scenes["AR Scene Demo"].aoi_frames["GrayRectangle"]
+ ar_frame = ArFeatures.ArFrame.from_json(args.frame)
# Enable heatmap buffer with 'b' key
if key_pressed == 98: