aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/ArFeatures.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r--src/argaze/ArFeatures.py874
1 files changed, 516 insertions, 358 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 8c9b3c8..93a21ed 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -8,8 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"
from typing import TypeVar, Tuple, Any, Iterator, Union
-from types import ModuleType
-from dataclasses import dataclass, field
import json
import os
import sys
@@ -95,61 +93,144 @@ DEFAULT_ARLAYER_DRAW_PARAMETERS = {
}
}
-@dataclass
class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed.
!!! note
- Inherits from DataFeatures.SharedObject class to be shared by multiple threads
-
- Parameters:
- name: name of the layer
- aoi_scene: AOI scene description
- aoi_matcher: AOI matcher object
- aoi_scan_path: AOI scan path object
- aoi_scan_path_analyzers: dictionary of AOI scan path analyzers
- draw_parameters: default parameters passed to draw method
- logging_module: path to logging module file in working directory
+ Inherits from DataFeatures.SharedObject class to be shared by multiple threads.
"""
- name: str
- aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene)
- aoi_matcher: GazeFeatures.AOIMatcher = field(default_factory=GazeFeatures.AOIMatcher)
- aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath)
- aoi_scan_path_analyzers: dict = field(default_factory=dict)
- draw_parameters: dict = field(default_factory=DEFAULT_ARLAYER_DRAW_PARAMETERS)
- logging_module: ModuleType = field(default=None)
+ def __init__(self, name: str = None, aoi_scene: AOIFeatures.AOIScene = None, aoi_matcher: GazeFeatures.AOIMatcher = None, aoi_scan_path: GazeFeatures.AOIScanPath = None, aoi_scan_path_analyzers: dict = None, draw_parameters: dict = None):
+ """ Initialize ArLayer
- def __post_init__(self):
+ Parameters:
+ name: name of the layer
+ aoi_scene: AOI scene description
+ aoi_matcher: AOI matcher object
+ aoi_scan_path: AOI scan path object
+ aoi_scan_path_analyzers: dictionary of AOI scan path analyzers
+ draw_parameters: default parameters passed to draw method
+ """
- # Init sharedObject
+ # Init parent classes
super().__init__()
- # Define parent attribute: it will be setup by parent later
- self.__parent = None
-
- # Init current gaze movement
+ # Init private attributes
+ self.__name = name
+ self.__aoi_scene = aoi_scene
+ self.__aoi_matcher = aoi_matcher
+ self.__aoi_scan_path = aoi_scan_path
+ self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers
+ self.__draw_parameters = draw_parameters
+ self.__parent = None # it will be setup by parent later
self.__gaze_movement = GazeFeatures.UnvalidGazeMovement()
-
- # Init current looked aoi name
self.__looked_aoi_name = None
-
- # Init aoi scan path analyzed state
self.__aoi_scan_path_analyzed = False
+
+ '''
+ # Register loggers from logging module as pipeline step observers
+ if self.logging_module is not None:
+
+ self.__observers = importlib.import_module(self.logging_module).__loggers__
+
+ # DEBUG
+ print(f'Observers registered for {self.__name} layer:', self.__observers)
+ '''
# Cast aoi scene to its effective dimension
- if self.aoi_scene.dimension == 2:
+ if self.__aoi_scene.dimension == 2:
+
+ self.__aoi_scene = AOI2DScene.AOI2DScene(self.__aoi_scene)
+
+ elif self.__aoi_scene.dimension == 3:
+
+ self.__aoi_scene = AOI3DScene.AOI3DScene(self.__aoi_scene)
+
+ @property
+ def name(self) -> str:
+ """Get layer's name."""
+ return self.__name
+
+ @property
+ def aoi_scene(self) -> AOIFeatures.AOIScene:
+ """Get layer's aoi scene object."""
+ return self.__aoi_scene
+
+ @aoi_scene.setter
+ def aoi_scene(self, aoi_scene: AOIFeatures.AOIScene):
+ """Set layer's aoi scene object."""
+ self.__aoi_scene = aoi_scene
+
+ @property
+ def aoi_matcher(self) -> GazeFeatures.AOIMatcher:
+ """Get layer's aoi matcher object."""
+ return self.__aoi_matcher
+
+ @property
+ def aoi_scan_path(self) -> GazeFeatures.AOIScanPath:
+ """Get layer's aoi scan path object."""
+ return self.__aoi_scan_path
+
+ @property
+ def aoi_scan_path_analyzers(self) -> dict:
+ """Get layer's aoi scan analyzers dictionary."""
+ return self.__aoi_scan_path_analyzers
+
+ @property
+ def draw_parameters(self):
+ """Get layer's draw parameters dictionary."""
+ return self.__draw_parameters
- self.aoi_scene = AOI2DScene.AOI2DScene(self.aoi_scene)
+ @property
+ def parent(self) -> object:
+ """Get layer's parent object."""
+ return self.__parent
- elif self.aoi_scene.dimension == 3:
+ @parent.setter
+ def parent(self, parent: object):
+ """Set layer's parent object."""
+ self.__parent = parent
- self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene)
+ @property
+ def looked_aoi_name(self) -> str:
+ """Get aoi matcher looked aoi name."""
+ return self.__looked_aoi_name
+
+ @property
+ def aoi_scan_path_analyzed(self) -> bool:
+ """Are aoi scan path analysis ready?"""
+
+ return self.__aoi_scan_path_analyzed
+
+ @property
+ def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]:
+ """Get aoi scan path analysis.
+
+ Returns
+ iterator: analyzer module path, analysis dictionary
+ """
+ assert(self.__aoi_scan_path_analyzed)
+
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items():
+
+ yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
+
+ def as_dict(self) -> dict:
+ """Export ArLayer attributes as dictionary."""
+
+ return {
+ "name": self.__name,
+ "aoi_scene": self.__aoi_scene,
+ "aoi_matcher": self.__aoi_matcher,
+ "aoi_scan_path": self.__aoi_scan_path,
+ "aoi_scan_path_analyzers": self.__aoi_scan_path_analyzers,
+ "draw_parameters": self.__draw_parameters
+ }
@classmethod
def from_dict(self, layer_data: dict, working_directory: str = None) -> ArLayerType:
- """Load attributes from dictionary.
+ """Load ArLayer attributes from dictionary.
Parameters:
layer_data: dictionary with attributes to load
@@ -305,7 +386,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
except KeyError:
new_layer_draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS
-
+ '''
# Load logging module
try:
@@ -314,26 +395,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# str: relative path to file
if type(new_logging_module_value) == str:
- logging_module_name = new_logging_module_value.split('.')[0]
+ new_logging_module = new_logging_module_value.split('.')[0]
- # Import logging module
- self.logging_module = importlib.import_module(logging_module_name)
-
- # Register loggers as pipeline step observers
- self.observers = self.logging_module.__loggers__
-
except KeyError:
- pass
-
+ new_logging_module = None
+ '''
# Create layer
- return ArLayer(new_layer_name, \
- new_aoi_scene, \
- new_aoi_matcher, \
- new_aoi_scan_path, \
- new_aoi_scan_path_analyzers, \
- new_layer_draw_parameters \
- )
+ return ArLayer(new_layer_name, new_aoi_scene, new_aoi_matcher, new_aoi_scan_path, new_aoi_scan_path_analyzers, new_layer_draw_parameters)
@classmethod
def from_json(self, json_filepath: str) -> ArLayerType:
@@ -351,43 +420,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return ArLayer.from_dict(layer_data, working_directory)
- @property
- def parent(self):
- """Get parent instance"""
-
- return self.__parent
-
- @parent.setter
- def parent(self, parent):
- """Get parent instance"""
-
- self.__parent = parent
-
- @property
- def looked_aoi_name(self) -> str:
- """The name of looked aoi."""
-
- return self.__looked_aoi_name
-
- @property
- def aoi_scan_path_analyzed(self) -> bool:
- """Are aoi scan path analysis ready?"""
-
- return self.__aoi_scan_path_analyzed
-
- def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]:
- """Get aoi scan path analysis.
-
- Returns
- iterator: analyzer module path, analysis dictionary
- """
-
- assert(self.__aoi_scan_path_analyzed)
-
- for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
-
- yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
-
@DataFeatures.PipelineStepMethod
def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()):
"""
@@ -402,7 +434,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
# Use layer locker feature
- with self.locker:
+ with self._lock:
# Update current gaze movement
self.__gaze_movement = gaze_movement
@@ -413,11 +445,11 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Reset aoi scan path analyzed state
self.__aoi_scan_path_analyzed = False
- if self.aoi_matcher is not None:
+ if self.__aoi_matcher is not None:
# Update looked aoi thanks to aoi matcher
# Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally
- self.__looked_aoi_name, _ = self.aoi_matcher.match(timestamp, self.aoi_scene, gaze_movement)
+ self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement)
# Valid and finished gaze movement has been identified
if gaze_movement.valid and gaze_movement.finished:
@@ -425,17 +457,17 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if GazeFeatures.is_fixation(gaze_movement):
# Append fixation to aoi scan path
- if self.aoi_scan_path is not None and self.__looked_aoi_name is not None:
+ if self.__aoi_scan_path is not None and self.__looked_aoi_name is not None:
- aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name)
+ aoi_scan_step = self.__aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name)
# Is there a new step?
- if aoi_scan_step is not None and len(self.aoi_scan_path) > 1:
+ if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1:
# Analyze aoi scan path
- for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items():
- aoi_scan_path_analyzer.analyze(timestamp, self.aoi_scan_path)
+ aoi_scan_path_analyzer.analyze(timestamp, self.__aoi_scan_path)
# Update aoi scan path analyzed state
self.__aoi_scan_path_analyzed = True
@@ -443,9 +475,9 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
elif GazeFeatures.is_saccade(gaze_movement):
# Append saccade to aoi scan path
- if self.aoi_scan_path is not None:
+ if self.__aoi_scan_path is not None:
- self.aoi_scan_path.append_saccade(timestamp, gaze_movement)
+ self.__aoi_scan_path.append_saccade(timestamp, gaze_movement)
def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None):
"""
@@ -459,20 +491,20 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Use draw_parameters attribute if no parameters
if draw_aoi_scene is None and draw_aoi_matching is None:
- return self.draw(image, **self.draw_parameters)
+ return self.draw(image, **self.__draw_parameters)
# Use layer locker feature
- with self.locker:
+ with self._lock:
# Draw aoi if required
if draw_aoi_scene is not None:
- self.aoi_scene.draw(image, **draw_aoi_scene)
+ self.__aoi_scene.draw(image, **draw_aoi_scene)
# Draw aoi matching if required
- if draw_aoi_matching is not None and self.aoi_matcher is not None:
+ if draw_aoi_matching is not None and self.__aoi_matcher is not None:
- self.aoi_matcher.draw(image, self.aoi_scene, **draw_aoi_matching)
+ self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching)
# Define default ArFrame image parameters
DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
@@ -495,67 +527,186 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
}
}
-@dataclass
class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed.
!!! note
Inherits from DataFeatures.SharedObject class to be shared by multiple threads
-
- Parameters:
- name: name of the frame
- size: defines the dimension of the rectangular area where gaze positions are projected
- gaze_position_calibrator: gaze position calibration algoritm
- gaze_movement_identifier: gaze movement identification algorithm
- filter_in_progress_identification: ignore in progress gaze movement identification
- scan_path: scan path object
- scan_path_analyzers: dictionary of scan path analyzers
- heatmap: heatmap object
- background: picture to draw behind
- layers: dictionary of AOI layers
- image_parameters: default parameters passed to image method
- logging_module: path to logging module file in working directory
"""
- name: str
- size: tuple[int] = field(default=(1, 1))
- gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = field(default_factory=GazeFeatures.GazePositionCalibrator)
- gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier)
- filter_in_progress_identification: bool = field(default=True)
- scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath)
- scan_path_analyzers: dict = field(default_factory=dict)
- heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap)
- background: numpy.array = field(default_factory=lambda : numpy.array([]))
- layers: dict = field(default_factory=dict)
- image_parameters: dict = field(default_factory=DEFAULT_ARFRAME_IMAGE_PARAMETERS)
- logging_module: ModuleType = field(default=None)
-
- def __post_init__(self):
-
- # Init sharedObject
+ def __init__(self, name: str = None, size: tuple[int] = (1, 1), gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = None, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, filter_in_progress_identification: bool = True, scan_path: GazeFeatures.ScanPath = None, scan_path_analyzers: dict = None, background: numpy.array = numpy.array([]), heatmap: AOIFeatures.Heatmap = None, layers: dict = None, image_parameters: dict = DEFAULT_ARFRAME_IMAGE_PARAMETERS):
+ """ Initialize ArFrame
+
+ Parameters:
+ name: name of the frame
+ size: defines the dimension of the rectangular area where gaze positions are projected
+        gaze_position_calibrator: gaze position calibration algorithm
+ gaze_movement_identifier: gaze movement identification algorithm
+ filter_in_progress_identification: ignore in progress gaze movement identification
+ scan_path: scan path object
+ scan_path_analyzers: dictionary of scan path analyzers
+ background: picture to draw behind
+ heatmap: heatmap object
+ layers: dictionary of AOI layers
+ image_parameters: default parameters passed to image method
+ """
+
+ # DEBUG
+ print(f'ArFrame.__init__ {name} {layers}')
+
+ # Init parent classes
super().__init__()
- # Define parent attribute: it will be setup by parent later
- self.__parent = None
+ # Init private attributes
+ self.__name = name
+ self.__size = size
+ self.__gaze_position_calibrator = gaze_position_calibrator
+ self.__gaze_movement_identifier = gaze_movement_identifier
+ self.__filter_in_progress_identification = filter_in_progress_identification
+ self.__scan_path = scan_path
+ self.__scan_path_analyzers = scan_path_analyzers
+ self.__background = background
+ self.__heatmap = heatmap
+ self.__layers = layers
+ self.__image_parameters = image_parameters
+ self.__parent = None # it will be setup by parent later
+ self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition()
+ self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ self.__scan_path_analyzed = False
# Setup layers parent attribute
- for name, layer in self.layers.items():
+ for name, layer in self.__layers.items():
layer.parent = self
+ '''
+ # Import logging module __loggers__ variable as pipeline step observers
+ if self.logging_module is not None:
- # Init current gaze position
- self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition()
+ self.__observers = importlib.import_module(self.logging_module).__loggers__
- # Init current gaze movement
- self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ # DEBUG
+ print(f'Observers registered for {self.__name} frame:', self.__observers)
+ '''
- # Init scan path analyzed state
- self.__scan_path_analyzed = False
+ @property
+ def name(self) -> str:
+ """Get frame's name."""
+ return self.__name
+
+ @property
+ def size(self) -> tuple[int]:
+ """Get frame's size."""
+ return self.__size
+
+ @property
+ def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator:
+ """Get frame's gaze position calibrator object."""
+ return self.__gaze_position_calibrator
+
+ @property
+ def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier:
+ """Get frame's gaze movement identifier object."""
+ return self.__gaze_movement_identifier
+
+    @property
+    def filter_in_progress_identification(self) -> bool:
+        """Is frame filtering in progress identification?"""
+        return self.__filter_in_progress_identification
+
+ @property
+ def scan_path(self) -> GazeFeatures.ScanPath:
+ """Get frame's scan path object."""
+ return self.__scan_path
+
+ @property
+ def scan_path_analyzers(self) -> dict:
+ """Get frame's scan path analyzers dictionary."""
+ return self.__scan_path_analyzers
+
+ @property
+ def background(self) -> numpy.array:
+ """Get frame's background matrix."""
+ return self.__background
+
+ @background.setter
+ def background(self, image: numpy.array):
+ """Set frame's background matrix."""
+ self.__background = image
+
+ @property
+ def heatmap(self) -> AOIFeatures.Heatmap:
+ """Get frame's heatmap object."""
+ return self.__heatmap
+
+ @property
+ def layers(self) -> dict:
+ """Get frame's layers dictionary."""
+ return self.__layers
+
+ @property
+ def image_parameters(self) -> dict:
+ """Get frame's image parameters dictionary."""
+ return self.__image_parameters
+
+ @property
+ def parent(self) -> object:
+ """Get frame's parent object."""
+ return self.__parent
+
+ @parent.setter
+ def parent(self, parent: object):
+ """Set frame's parent object."""
+ self.__parent = parent
+
+ @property
+ def gaze_position(self) -> object:
+ """Get current calibrated gaze position"""
+ return self.__calibrated_gaze_position
+
+ @property
+ def gaze_movement(self) -> object:
+ """Get current identified gaze movement"""
+ return self.__identified_gaze_movement
+
+ @property
+ def scan_path_analyzed(self) -> bool:
+ """Are scan path analysis ready?"""
+ return self.__scan_path_analyzed
+
+ @property
+ def scan_path_analysis(self) -> Iterator[Union[str, dict]]:
+ """Get scan path analysis.
+
+ Returns
+ iterator: analyzer module path, analysis dictionary
+ """
+ assert(self.__scan_path_analyzed)
+
+        for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items():
+
+            yield scan_path_analyzer_module_path, scan_path_analyzer.analysis
+
+ def as_dict(self) -> dict:
+ """Export ArFrame attributes as dictionary."""
+
+ return {
+ "name": self.__name,
+ "size": self.__size,
+ "gaze_position_calibrator": self.__gaze_position_calibrator,
+ "gaze_movement_identifier": self.__gaze_movement_identifier,
+ "filter_in_progress_identification": self.__filter_in_progress_identification,
+ "scan_path": self.__scan_path,
+ "scan_path_analyzers": self.__scan_path_analyzers,
+ "background": self.__background,
+ "heatmap": self.__heatmap,
+ "layers": self.__layers,
+ "image_parameters": self.__image_parameters
+ }
@classmethod
def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType:
- """Load attributes from dictionary.
+ """Load ArFrame attributes from dictionary.
Parameters:
frame_data: dictionary with attributes to load
@@ -697,6 +848,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
pass
+ # Load background image
+ try:
+
+ new_frame_background_value = frame_data.pop('background')
+ new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value))
+ new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC)
+
+ except KeyError:
+
+ new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8)
+
# Load heatmap
try:
@@ -714,17 +876,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
new_heatmap_data = {}
new_heatmap = None
- # Load background image
- try:
-
- new_frame_background_value = frame_data.pop('background')
- new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value))
- new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC)
-
- except KeyError:
-
- new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8)
-
# Load layers
new_layers = {}
@@ -762,31 +913,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# str: relative path to file
if type(new_logging_module_value) == str:
- logging_module_name = new_logging_module_value.split('.')[0]
+ new_logging_module = new_logging_module_value.split('.')[0]
- # Import logging module
- self.logging_module = importlib.import_module(logging_module_name)
-
- # Register loggers as pipeline step observers
- self.observers = self.logging_module.__loggers__
-
except KeyError:
- pass
+ new_logging_module = None
+
+ # DEBUG
+ print('Create frame', new_frame_name)
# Create frame
- return ArFrame(new_frame_name, \
- new_frame_size, \
- new_gaze_position_calibrator, \
- new_gaze_movement_identifier, \
- filter_in_progress_identification, \
- new_scan_path, \
- new_scan_path_analyzers, \
- new_heatmap, \
- new_frame_background, \
- new_layers, \
- new_frame_image_parameters \
- )
+ return ArFrame(new_frame_name, new_frame_size, new_gaze_position_calibrator, new_gaze_movement_identifier, filter_in_progress_identification, new_scan_path, new_scan_path_analyzers, new_frame_background, new_heatmap, new_layers, new_frame_image_parameters)
@classmethod
def from_json(self, json_filepath: str) -> ArFrameType:
@@ -804,49 +941,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return ArFrame.from_dict(frame_data, working_directory)
- @property
- def parent(self) -> object:
- """Get parent instance"""
-
- return self.__parent
-
- @parent.setter
- def parent(self, parent: object):
- """Set parent instance"""
-
- self.__parent = parent
-
- @property
- def gaze_position(self) -> object:
- """Get current calibrated gaze position"""
-
- return self.__calibrated_gaze_position
-
- @property
- def gaze_movement(self) -> object:
- """Get current identified gaze movement"""
-
- return self.__identified_gaze_movement
-
- @property
- def scan_path_analyzed(self) -> bool:
- """Are scan path analysis ready?"""
-
- return self.__scan_path_analyzed
-
- def scan_path_analysis(self) -> Iterator[Union[str, dict]]:
- """Get scan path analysis.
-
- Returns
- iterator: analyzer module path, analysis dictionary
- """
-
- assert(self.__scan_path_analyzed)
-
- for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
-
- yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
-
@DataFeatures.PipelineStepMethod
def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]:
"""
@@ -861,7 +955,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
# Use frame locker feature
- with self.locker:
+ with self._lock:
# No gaze movement identified by default
self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
@@ -870,9 +964,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__scan_path_analyzed = False
# Apply gaze position calibration
- if self.gaze_position_calibrator is not None:
+ if self.__gaze_position_calibrator is not None:
- self.__calibrated_gaze_position = self.gaze_position_calibrator.apply(gaze_position)
+ self.__calibrated_gaze_position = self.__gaze_position_calibrator.apply(gaze_position)
# Or update gaze position at least
else:
@@ -880,10 +974,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__calibrated_gaze_position = gaze_position
# Identify gaze movement
- if self.gaze_movement_identifier is not None:
+ if self.__gaze_movement_identifier is not None:
# Identify finished gaze movement
- self.__identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position)
+ self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position)
# Valid and finished gaze movement has been identified
if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished:
@@ -891,45 +985,45 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if GazeFeatures.is_fixation(self.__identified_gaze_movement):
# Append fixation to scan path
- if self.scan_path is not None:
+ if self.__scan_path is not None:
- self.scan_path.append_fixation(timestamp, self.__identified_gaze_movement)
+ self.__scan_path.append_fixation(timestamp, self.__identified_gaze_movement)
elif GazeFeatures.is_saccade(self.__identified_gaze_movement):
# Append saccade to scan path
- if self.scan_path is not None:
+ if self.__scan_path is not None:
- scan_step = self.scan_path.append_saccade(timestamp, self.__identified_gaze_movement)
+ scan_step = self.__scan_path.append_saccade(timestamp, self.__identified_gaze_movement)
# Is there a new step?
- if scan_step and len(self.scan_path) > 1:
+ if scan_step and len(self.__scan_path) > 1:
# Analyze aoi scan path
- for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items():
+ for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items():
- scan_path_analyzer.analyze(timestamp, self.scan_path)
+ scan_path_analyzer.analyze(timestamp, self.__scan_path)
# Update scan path analyzed state
self.__scan_path_analyzed = True
# No valid finished gaze movement: optionnaly stop in progress identification filtering
- elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification:
+ elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification:
- self.__identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement
+ self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement
# Update heatmap
- if self.heatmap is not None:
+ if self.__heatmap is not None:
# Scale gaze position value
- scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]])
+ scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]])
# Update heatmap image
- self.heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale)
+ self.__heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale)
# Look layers with valid identified gaze movement
# Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally
- for layer_name, layer in self.layers.items():
+ for layer_name, layer in self.__layers.items():
layer.look(timestamp, self.__identified_gaze_movement)
@@ -949,56 +1043,56 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
# Use frame locker feature
- with self.locker:
+ with self._lock:
# Draw background only
- if background_weight is not None and (heatmap_weight is None or self.heatmap is None):
+ if background_weight is not None and (heatmap_weight is None or self.__heatmap is None):
- image = self.background.copy()
+ image = self.__background.copy()
# Draw mix background and heatmap if required
- elif background_weight is not None and heatmap_weight is not None and self.heatmap:
+ elif background_weight is not None and heatmap_weight is not None and self.__heatmap:
- background_image = self.background.copy()
- heatmap_image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR)
+ background_image = self.__background.copy()
+ heatmap_image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR)
image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0)
# Draw heatmap only
- elif background_weight is None and heatmap_weight is not None and self.heatmap:
+ elif background_weight is None and heatmap_weight is not None and self.__heatmap:
- image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR)
+ image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR)
# Draw black image
else:
- image = numpy.full((self.size[1], self.size[0], 3), 0).astype(numpy.uint8)
+ image = numpy.full((self.__size[1], self.__size[0], 3), 0).astype(numpy.uint8)
# Draw gaze position calibrator
if draw_gaze_position_calibrator is not None:
- self.gaze_position_calibrator.draw(image, size=self.size, **draw_gaze_position_calibrator)
+ self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator)
# Draw scan path if required
- if draw_scan_path is not None and self.scan_path is not None:
+ if draw_scan_path is not None and self.__scan_path is not None:
- self.scan_path.draw(image, **draw_scan_path)
+ self.__scan_path.draw(image, **draw_scan_path)
# Draw current fixation if required
- if draw_fixations is not None and self.gaze_movement_identifier is not None:
+ if draw_fixations is not None and self.__gaze_movement_identifier is not None:
- self.gaze_movement_identifier.current_fixation.draw(image, **draw_fixations)
+ self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations)
# Draw current saccade if required
- if draw_saccades is not None and self.gaze_movement_identifier is not None:
+ if draw_saccades is not None and self.__gaze_movement_identifier is not None:
- self.gaze_movement_identifier.current_saccade.draw(image, **draw_saccades)
+ self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades)
# Draw layers if required
if draw_layers is not None:
for layer_name, draw_layer in draw_layers.items():
- self.layers[layer_name].draw(image, **draw_layer)
+ self.__layers[layer_name].draw(image, **draw_layer)
# Draw current gaze position if required
if draw_gaze_positions is not None:
@@ -1019,77 +1113,105 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return self.__image(**kwargs)
- return self.__image(**self.image_parameters)
+ return self.__image(**self.__image_parameters)
-@dataclass
-class ArScene():
+class ArScene(DataFeatures.PipelineStepObject):
"""
Define abstract Augmented Reality scene with ArLayers and ArFrames inside.
-
- Parameters:
- name: name of the scene
- layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
- frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
- angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
- distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
"""
- name: str
- layers: dict = field(default_factory=dict)
- frames: dict = field(default_factory=dict)
- angle_tolerance: float = field(default=0.)
- distance_tolerance: float = field(default=0.)
+
+ def __init__(self, name: str = None, layers: dict = None, frames: dict = None, angle_tolerance: float = 0., distance_tolerance: float = 0.):
+ """ Initialize ArScene
- def __post_init__(self):
+ Parameters:
+ name: name of the scene
+ layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
+ frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
+ angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
+ distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
+ """
- # Define parent attribute: it will be setup by parent object later
- self.__parent = None
+ # Init parent classes
+ super().__init__()
+
+ # Init private attributes
+ self.__name = name
+ self.__layers = layers if layers is not None else {}
+ self.__frames = frames if frames is not None else {}
+ self.__angle_tolerance = angle_tolerance
+ self.__distance_tolerance = distance_tolerance
+ self.__parent = None # it will be setup by parent later
# Setup layer parent attribute
- for name, layer in self.layers.items():
+ for name, layer in self.__layers.items():
layer.parent = self
# Setup frame parent attribute
- for name, frame in self.frames.items():
+ for name, frame in self.__frames.items():
frame.parent = self
- def __str__(self) -> str:
- """
- Returns:
- String representation
- """
-
- output = f'parent:\n{self.parent.name}\n'
-
- if len(self.layers):
- output += f'ArLayers:\n'
- for name, layer in self.layers.items():
- output += f'{name}:\n{layer}\n'
-
- if len(self.frames):
- output += f'ArFrames:\n'
- for name, frame in self.frames.items():
- output += f'{name}:\n{frame}\n'
-
- return output
-
@property
- def parent(self):
- """Get parent instance"""
-
+ def name(self) -> str:
+ """Get scene's name."""
+ return self.__name
+
+ @property
+ def layers(self) -> dict:
+ """Get scene's layers dictionary."""
+ return self.__layers
+
+ @property
+ def frames(self) -> dict:
+ """Get scene's frames dictionary."""
+ return self.__frames
+
+ @property
+ def angle_tolerance(self) -> float:
+ """Get scene's angle tolerance."""
+ return self.__angle_tolerance
+
+ @angle_tolerance.setter
+ def angle_tolerance(self, value: float):
+ """Set scene's angle tolerance."""
+ self.__angle_tolerance = value
+
+ @property
+ def distance_tolerance(self) -> float:
+ """Get scene's distance tolerance."""
+ return self.__distance_tolerance
+
+ @distance_tolerance.setter
+ def distance_tolerance(self, value: float):
+ """Set scene's distance tolerance."""
+ self.__distance_tolerance = value
+
+ @property
+ def parent(self) -> object:
+ """Get frame's parent object."""
return self.__parent
@parent.setter
- def parent(self, parent):
- """Get parent instance"""
-
+ def parent(self, parent: object):
+ """Set frame's parent object."""
self.__parent = parent
+ def as_dict(self) -> dict:
+ """Export ArScene attributes as dictionary."""
+
+ return {
+ "name": self.__name,
+ "layers": self.__layers,
+ "frames": self.__frames,
+ "angle_tolerance": self.__angle_tolerance,
+ "distance_tolerance": self.__distance_tolerance
+ }
+
@classmethod
def from_dict(self, scene_data: dict, working_directory: str = None) -> ArSceneType:
"""
- Load ArScene from dictionary.
+ Load ArScene attributes from dictionary.
Parameters:
scene_data: dictionary
@@ -1205,7 +1327,27 @@ class ArScene():
pass
return ArScene(new_scene_name, new_layers, new_frames, **scene_data)
-
+
+ def __str__(self) -> str:
+ """
+ Returns:
+ String representation
+ """
+
+ output = f'parent:\n{self.__parent.name}\n'
+
+ if len(self.__layers):
+ output += f'ArLayers:\n'
+ for name, layer in self.__layers.items():
+ output += f'{name}:\n{layer}\n'
+
+ if len(self.__frames):
+ output += f'ArFrames:\n'
+ for name, frame in self.__frames.items():
+ output += f'{name}:\n{frame}\n'
+
+ return output
+
def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array]:
"""Define abstract estimate scene pose method.
@@ -1232,7 +1374,7 @@ class ArScene():
iterator: name of projected layer and AOI2DScene projection
"""
- for name, layer in self.layers.items():
+ for name, layer in self.__layers.items():
# Clip AOI out of the visual horizontal field of view (optional)
# TODO: use HFOV and VFOV and don't use vision_cone method
@@ -1255,7 +1397,7 @@ class ArScene():
aoi_scene_copy = layer.aoi_scene.copy()
# Project layer aoi scene
- yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)
+ yield name, aoi_scene_copy.project(tvec, rvec, self.__parent.aruco_detector.optic_parameters.K)
def draw(self, image: numpy.array, **kwargs: dict):
"""
@@ -1267,28 +1409,33 @@ class ArScene():
raise NotImplementedError('draw() method not implemented')
-@dataclass
class ArCamera(ArFrame):
"""
Define abstract Augmented Reality camera as ArFrame with ArScenes inside.
-
- Parameters:
- scenes: all scenes to project into camera frame
- visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV).
- visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV).
"""
- scenes: dict = field(default_factory=dict)
- visual_hfov: float = field(default=0.)
- visual_vfov: float = field(default=0.)
+ def __init__(self, scenes: dict = None, visual_hfov: float = 0., visual_vfov: float = 0., **kwargs):
+ """ Initialize ArCamera
+
+ Parameters:
+ scenes: all scenes to project into camera frame
+ visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV).
+ visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV).
+ """
+
- def __post_init__(self):
+ # Init parent class
+ super().__init__(**kwargs)
- # Init ArFrame
- super().__post_init__()
+ # Init private attributes
+ self.__scenes = scenes if scenes is not None else {}
+ self.__visual_hfov = visual_hfov
+ self.__visual_vfov = visual_vfov
# Setup scenes parent attribute
- for name, scene in self.scenes.items():
+ for name, scene in self.__scenes.items():
scene.parent = self
@@ -1301,7 +1448,7 @@ class ArCamera(ArFrame):
expected_aoi_list = []
exclude_aoi_list = []
- for scene_name, scene in self.scenes.items():
+ for scene_name, scene in self.__scenes.items():
# Append scene layer aoi to corresponding expected camera layer aoi
try:
@@ -1329,55 +1476,66 @@ class ArCamera(ArFrame):
layer.aoi_scan_path.expected_aoi = expected_aoi_list
layer.aoi_matcher.exclude = exclude_aoi_list
-
- def __str__(self) -> str:
- """
- Returns:
- String representation
- """
-
- output = f'Name:\n{self.name}\n'
- for name, scene in self.scenes.items():
- output += f'\"{name}\" {type(scene)}:\n{scene}\n'
-
- return output
-
- @classmethod
- def from_dict(self, camera_data: dict, working_directory: str = None) -> ArCameraType:
- """
- Load ArCamera from dictionary.
-
- Parameters:
- camera_data: dictionary
- working_directory: folder path where to load files when a dictionary value is a relative filepath.
- """
-
- raise NotImplementedError('from_dict() method not implemented')
-
- @classmethod
- def from_json(self, json_filepath: str) -> ArCameraType:
- """
- Load ArCamera from .json file.
-
- Parameters:
- json_filepath: path to json file
- """
-
- raise NotImplementedError('from_json() method not implemented')
+ @property
+ def scenes(self) -> dict:
+ """Get camera's scenes dictionary."""
+ return self.__scenes
@property
+ def visual_hfov(self) -> float:
+ """Get camera's visual horizontal field of view."""
+ return self.__visual_hfov
+
+ @visual_hfov.setter
+ def visual_hfov(self, value: float):
+ """Set camera's visual horizontal field of view."""
+ self.__visual_hfov = value
+
+ @property
+ def visual_vfov(self) -> float:
+ """Get camera's visual vertical field of view."""
+ return self.__visual_vfov
+
+ @visual_vfov.setter
+ def visual_vfov(self, value: float):
+ """Set camera's visual vertical field of view."""
+ self.__visual_vfov = value
+
+ @property
def scene_frames(self) -> Iterator[ArFrame]:
"""Iterate over all scenes frames"""
# For each scene
- for scene_name, scene in self.scenes.items():
+ for scene_name, scene in self.__scenes.items():
# For each scene frame
for name, scene_frame in scene.frames.items():
yield scene_frame
+ def as_dict(self) -> dict:
+ """Export ArCamera attributes as dictionary."""
+
+ return {
+ "scenes": self.__scenes,
+ "visual_hfov": self.__visual_hfov,
+ "visual_vfov": self.__visual_vfov
+ }
+
+ def __str__(self) -> str:
+ """
+ Returns:
+ String representation
+ """
+
+ output = f'Name:\n{self.name}\n'
+
+ for name, scene in self.__scenes.items():
+ output += f'\"{name}\" {type(scene)}:\n{scene}\n'
+
+ return output
+
@DataFeatures.PipelineStepMethod
def watch(self, timestamp: int|float, image: numpy.array):
"""Detect AR features from image and project scenes into camera frame.
@@ -1405,7 +1563,7 @@ class ArCamera(ArFrame):
super().look(timestamp, gaze_position)
# Use camera frame locker feature
- with self.locker:
+ with self._lock:
# Project gaze position into each scene frames if possible
for scene_frame in self.scene_frames:
@@ -1441,13 +1599,13 @@ class ArCamera(ArFrame):
"""
# Use camera frame locker feature
- with self.locker:
+ with self._lock:
# Project camera frame background into each scene frame if possible
for frame in self.scene_frames:
# Is there an AOI inside camera frame layers projection which its name equals to a scene frame name?
- for camera_layer_name, camera_layer in self.layers.items():
+ for camera_layer_name, camera_layer in self.layers.items():
try:
@@ -1457,7 +1615,7 @@ class ArCamera(ArFrame):
width, height = frame.size
destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]])
mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination)
- frame.background = cv2.warpPerspective(self.background, mapping, (width, height))
+ frame.background = cv2.warpPerspective(self.background, mapping, (width, height))
# Ignore missing frame projection
except KeyError: