aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2024-01-25 20:55:53 +0100
committerThéo de la Hogue2024-01-25 20:55:53 +0100
commit5cb362ba53ec9543e9aab9b6de25d3fa27b175c1 (patch)
treeb800ea9e9ac5a1edd9d8778429346ff0d0f59625
parentc4762c2e7f6da17c1f405ed54dde990161329c00 (diff)
downloadargaze-5cb362ba53ec9543e9aab9b6de25d3fa27b175c1.zip
argaze-5cb362ba53ec9543e9aab9b6de25d3fa27b175c1.tar.gz
argaze-5cb362ba53ec9543e9aab9b6de25d3fa27b175c1.tar.bz2
argaze-5cb362ba53ec9543e9aab9b6de25d3fa27b175c1.tar.xz
ArFeatures classes are not dataclass anymore.
-rw-r--r--src/argaze/ArFeatures.py874
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoCamera.py75
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoDetector.py64
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoScene.py62
-rw-r--r--src/argaze/DataFeatures.py103
-rw-r--r--src/argaze/utils/demo_aruco_markers_run.py8
6 files changed, 716 insertions, 470 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 8c9b3c8..93a21ed 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -8,8 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"
from typing import TypeVar, Tuple, Any, Iterator, Union
-from types import ModuleType
-from dataclasses import dataclass, field
import json
import os
import sys
@@ -95,61 +93,144 @@ DEFAULT_ARLAYER_DRAW_PARAMETERS = {
}
}
-@dataclass
class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed.
!!! note
- Inherits from DataFeatures.SharedObject class to be shared by multiple threads
-
- Parameters:
- name: name of the layer
- aoi_scene: AOI scene description
- aoi_matcher: AOI matcher object
- aoi_scan_path: AOI scan path object
- aoi_scan_path_analyzers: dictionary of AOI scan path analyzers
- draw_parameters: default parameters passed to draw method
- logging_module: path to logging module file in working directory
+ Inherits from DataFeatures.SharedObject class to be shared by multiple threads.
"""
- name: str
- aoi_scene: AOIFeatures.AOIScene = field(default_factory=AOIFeatures.AOIScene)
- aoi_matcher: GazeFeatures.AOIMatcher = field(default_factory=GazeFeatures.AOIMatcher)
- aoi_scan_path: GazeFeatures.AOIScanPath = field(default_factory=GazeFeatures.AOIScanPath)
- aoi_scan_path_analyzers: dict = field(default_factory=dict)
- draw_parameters: dict = field(default_factory=DEFAULT_ARLAYER_DRAW_PARAMETERS)
- logging_module: ModuleType = field(default=None)
+ def __init__(self, name: str = None, aoi_scene: AOIFeatures.AOIScene = None, aoi_matcher: GazeFeatures.AOIMatcher = None, aoi_scan_path: GazeFeatures.AOIScanPath = None, aoi_scan_path_analyzers: dict = None, draw_parameters: dict = None):
+ """ Initialize ArLayer
- def __post_init__(self):
+ Parameters:
+ name: name of the layer
+ aoi_scene: AOI scene description
+ aoi_matcher: AOI matcher object
+ aoi_scan_path: AOI scan path object
+ aoi_scan_path_analyzers: dictionary of AOI scan path analyzers
+ draw_parameters: default parameters passed to draw method
+ """
- # Init sharedObject
+ # Init parent classes
super().__init__()
- # Define parent attribute: it will be setup by parent later
- self.__parent = None
-
- # Init current gaze movement
+ # Init private attributes
+ self.__name = name
+ self.__aoi_scene = aoi_scene
+ self.__aoi_matcher = aoi_matcher
+ self.__aoi_scan_path = aoi_scan_path
+ self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers
+ self.__draw_parameters = draw_parameters
+ self.__parent = None # it will be setup by parent later
self.__gaze_movement = GazeFeatures.UnvalidGazeMovement()
-
- # Init current looked aoi name
self.__looked_aoi_name = None
-
- # Init aoi scan path analyzed state
self.__aoi_scan_path_analyzed = False
+
+ '''
+ # Register loggers from logging module as pipeline step observers
+ if self.logging_module is not None:
+
+ self.__observers = importlib.import_module(self.logging_module).__loggers__
+
+ # DEBUG
+ print(f'Observers registered for {self.__name} layer:', self.__observers)
+ '''
# Cast aoi scene to its effective dimension
- if self.aoi_scene.dimension == 2:
+ if self.__aoi_scene.dimension == 2:
+
+ self.__aoi_scene = AOI2DScene.AOI2DScene(self.__aoi_scene)
+
+ elif self.__aoi_scene.dimension == 3:
+
+ self.__aoi_scene = AOI3DScene.AOI3DScene(self.__aoi_scene)
+
+ @property
+ def name(self) -> str:
+ """Get layer's name."""
+ return self.__name
+
+ @property
+ def aoi_scene(self) -> AOIFeatures.AOIScene:
+ """Get layer's aoi scene object."""
+ return self.__aoi_scene
+
+ @aoi_scene.setter
+ def aoi_scene(self, aoi_scene: AOIFeatures.AOIScene):
+ """Set layer's aoi scene object."""
+ self.__aoi_scene = aoi_scene
+
+ @property
+ def aoi_matcher(self) -> GazeFeatures.AOIMatcher:
+ """Get layer's aoi matcher object."""
+ return self.__aoi_matcher
+
+ @property
+ def aoi_scan_path(self) -> GazeFeatures.AOIScanPath:
+ """Get layer's aoi scan path object."""
+ return self.__aoi_scan_path
+
+ @property
+ def aoi_scan_path_analyzers(self) -> dict:
+ """Get layer's aoi scan analyzers dictionary."""
+ return self.__aoi_scan_path_analyzers
+
+ @property
+ def draw_parameters(self):
+ """Get layer's draw parameters dictionary."""
+ return self.__draw_parameters
- self.aoi_scene = AOI2DScene.AOI2DScene(self.aoi_scene)
+ @property
+ def parent(self) -> object:
+ """Get layer's parent object."""
+ return self.__parent
- elif self.aoi_scene.dimension == 3:
+ @parent.setter
+ def parent(self, parent: object):
+ """Set layer's parent object."""
+ self.__parent = parent
- self.aoi_scene = AOI3DScene.AOI3DScene(self.aoi_scene)
+ @property
+ def looked_aoi_name(self) -> str:
+ """Get aoi matcher looked aoi name."""
+ return self.__looked_aoi_name
+
+ @property
+ def aoi_scan_path_analyzed(self) -> bool:
+ """Are aoi scan path analysis ready?"""
+
+ return self.__aoi_scan_path_analyzed
+
+ @property
+ def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]:
+ """Get aoi scan path analysis.
+
+ Returns
+ iterator: analyzer module path, analysis dictionary
+ """
+ assert(self.__aoi_scan_path_analyzed)
+
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items():
+
+ yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
+
+ def as_dict(self) -> dict:
+ """Export ArLayer attributes as dictionary."""
+
+ return {
+ "name": self.__name,
+ "aoi_scene": self.__aoi_scene,
+ "aoi_matcher": self.__aoi_matcher,
+ "aoi_scan_path": self.__aoi_scan_path,
+ "aoi_scan_path_analyzers": self.__aoi_scan_path_analyzers,
+ "draw_parameters": self.__draw_parameters
+ }
@classmethod
def from_dict(self, layer_data: dict, working_directory: str = None) -> ArLayerType:
- """Load attributes from dictionary.
+ """Load ArLayer attributes from dictionary.
Parameters:
layer_data: dictionary with attributes to load
@@ -305,7 +386,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
except KeyError:
new_layer_draw_parameters = DEFAULT_ARLAYER_DRAW_PARAMETERS
-
+ '''
# Load logging module
try:
@@ -314,26 +395,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# str: relative path to file
if type(new_logging_module_value) == str:
- logging_module_name = new_logging_module_value.split('.')[0]
+ new_logging_module = new_logging_module_value.split('.')[0]
- # Import logging module
- self.logging_module = importlib.import_module(logging_module_name)
-
- # Register loggers as pipeline step observers
- self.observers = self.logging_module.__loggers__
-
except KeyError:
- pass
-
+ new_logging_module = None
+ '''
# Create layer
- return ArLayer(new_layer_name, \
- new_aoi_scene, \
- new_aoi_matcher, \
- new_aoi_scan_path, \
- new_aoi_scan_path_analyzers, \
- new_layer_draw_parameters \
- )
+ return ArLayer(new_layer_name, new_aoi_scene, new_aoi_matcher, new_aoi_scan_path, new_aoi_scan_path_analyzers, new_layer_draw_parameters)
@classmethod
def from_json(self, json_filepath: str) -> ArLayerType:
@@ -351,43 +420,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return ArLayer.from_dict(layer_data, working_directory)
- @property
- def parent(self):
- """Get parent instance"""
-
- return self.__parent
-
- @parent.setter
- def parent(self, parent):
- """Get parent instance"""
-
- self.__parent = parent
-
- @property
- def looked_aoi_name(self) -> str:
- """The name of looked aoi."""
-
- return self.__looked_aoi_name
-
- @property
- def aoi_scan_path_analyzed(self) -> bool:
- """Are aoi scan path analysis ready?"""
-
- return self.__aoi_scan_path_analyzed
-
- def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]:
- """Get aoi scan path analysis.
-
- Returns
- iterator: analyzer module path, analysis dictionary
- """
-
- assert(self.__aoi_scan_path_analyzed)
-
- for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
-
- yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
-
@DataFeatures.PipelineStepMethod
def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()):
"""
@@ -402,7 +434,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
# Use layer locker feature
- with self.locker:
+ with self._lock:
# Update current gaze movement
self.__gaze_movement = gaze_movement
@@ -413,11 +445,11 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Reset aoi scan path analyzed state
self.__aoi_scan_path_analyzed = False
- if self.aoi_matcher is not None:
+ if self.__aoi_matcher is not None:
# Update looked aoi thanks to aoi matcher
# Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally
- self.__looked_aoi_name, _ = self.aoi_matcher.match(timestamp, self.aoi_scene, gaze_movement)
+ self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement)
# Valid and finished gaze movement has been identified
if gaze_movement.valid and gaze_movement.finished:
@@ -425,17 +457,17 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if GazeFeatures.is_fixation(gaze_movement):
# Append fixation to aoi scan path
- if self.aoi_scan_path is not None and self.__looked_aoi_name is not None:
+ if self.__aoi_scan_path is not None and self.__looked_aoi_name is not None:
- aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name)
+ aoi_scan_step = self.__aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name)
# Is there a new step?
- if aoi_scan_step is not None and len(self.aoi_scan_path) > 1:
+ if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1:
# Analyze aoi scan path
- for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items():
- aoi_scan_path_analyzer.analyze(timestamp, self.aoi_scan_path)
+ aoi_scan_path_analyzer.analyze(timestamp, self.__aoi_scan_path)
# Update aoi scan path analyzed state
self.__aoi_scan_path_analyzed = True
@@ -443,9 +475,9 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
elif GazeFeatures.is_saccade(gaze_movement):
# Append saccade to aoi scan path
- if self.aoi_scan_path is not None:
+ if self.__aoi_scan_path is not None:
- self.aoi_scan_path.append_saccade(timestamp, gaze_movement)
+ self.__aoi_scan_path.append_saccade(timestamp, gaze_movement)
def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None):
"""
@@ -459,20 +491,20 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Use draw_parameters attribute if no parameters
if draw_aoi_scene is None and draw_aoi_matching is None:
- return self.draw(image, **self.draw_parameters)
+ return self.draw(image, **self.__draw_parameters)
# Use layer locker feature
- with self.locker:
+ with self._lock:
# Draw aoi if required
if draw_aoi_scene is not None:
- self.aoi_scene.draw(image, **draw_aoi_scene)
+ self.__aoi_scene.draw(image, **draw_aoi_scene)
# Draw aoi matching if required
- if draw_aoi_matching is not None and self.aoi_matcher is not None:
+ if draw_aoi_matching is not None and self.__aoi_matcher is not None:
- self.aoi_matcher.draw(image, self.aoi_scene, **draw_aoi_matching)
+ self.__aoi_matcher.draw(image, self.__aoi_scene, **draw_aoi_matching)
# Define default ArFrame image parameters
DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
@@ -495,67 +527,186 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
}
}
-@dataclass
class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed.
!!! note
Inherits from DataFeatures.SharedObject class to be shared by multiple threads
-
- Parameters:
- name: name of the frame
- size: defines the dimension of the rectangular area where gaze positions are projected
- gaze_position_calibrator: gaze position calibration algoritm
- gaze_movement_identifier: gaze movement identification algorithm
- filter_in_progress_identification: ignore in progress gaze movement identification
- scan_path: scan path object
- scan_path_analyzers: dictionary of scan path analyzers
- heatmap: heatmap object
- background: picture to draw behind
- layers: dictionary of AOI layers
- image_parameters: default parameters passed to image method
- logging_module: path to logging module file in working directory
"""
- name: str
- size: tuple[int] = field(default=(1, 1))
- gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = field(default_factory=GazeFeatures.GazePositionCalibrator)
- gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = field(default_factory=GazeFeatures.GazeMovementIdentifier)
- filter_in_progress_identification: bool = field(default=True)
- scan_path: GazeFeatures.ScanPath = field(default_factory=GazeFeatures.ScanPath)
- scan_path_analyzers: dict = field(default_factory=dict)
- heatmap: AOIFeatures.Heatmap = field(default_factory=AOIFeatures.Heatmap)
- background: numpy.array = field(default_factory=lambda : numpy.array([]))
- layers: dict = field(default_factory=dict)
- image_parameters: dict = field(default_factory=DEFAULT_ARFRAME_IMAGE_PARAMETERS)
- logging_module: ModuleType = field(default=None)
-
- def __post_init__(self):
-
- # Init sharedObject
+ def __init__(self, name: str = None, size: tuple[int] = (1, 1), gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = None, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, filter_in_progress_identification: bool = True, scan_path: GazeFeatures.ScanPath = None, scan_path_analyzers: dict = None, background: numpy.array = numpy.array([]), heatmap: AOIFeatures.Heatmap = None, layers: dict = None, image_parameters: dict = DEFAULT_ARFRAME_IMAGE_PARAMETERS):
+ """ Initialize ArFrame
+
+ Parameters:
+ name: name of the frame
+ size: defines the dimension of the rectangular area where gaze positions are projected
+ gaze_position_calibrator: gaze position calibration algoritm
+ gaze_movement_identifier: gaze movement identification algorithm
+ filter_in_progress_identification: ignore in progress gaze movement identification
+ scan_path: scan path object
+ scan_path_analyzers: dictionary of scan path analyzers
+ background: picture to draw behind
+ heatmap: heatmap object
+ layers: dictionary of AOI layers
+ image_parameters: default parameters passed to image method
+ """
+
+ # DEBUG
+ print(f'ArFrame.__init__ {name} {layers}')
+
+ # Init parent classes
super().__init__()
- # Define parent attribute: it will be setup by parent later
- self.__parent = None
+ # Init private attributes
+ self.__name = name
+ self.__size = size
+ self.__gaze_position_calibrator = gaze_position_calibrator
+ self.__gaze_movement_identifier = gaze_movement_identifier
+ self.__filter_in_progress_identification = filter_in_progress_identification
+ self.__scan_path = scan_path
+ self.__scan_path_analyzers = scan_path_analyzers
+ self.__background = background
+ self.__heatmap = heatmap
+ self.__layers = layers
+ self.__image_parameters = image_parameters
+ self.__parent = None # it will be setup by parent later
+ self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition()
+ self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ self.__scan_path_analyzed = False
# Setup layers parent attribute
- for name, layer in self.layers.items():
+ for name, layer in self.__layers.items():
layer.parent = self
+ '''
+ # Import logging module __loggers__ variable as pipeline step observers
+ if self.logging_module is not None:
- # Init current gaze position
- self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition()
+ self.__observers = importlib.import_module(self.logging_module).__loggers__
- # Init current gaze movement
- self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ # DEBUG
+ print(f'Observers registered for {self.__name} frame:', self.__observers)
+ '''
- # Init scan path analyzed state
- self.__scan_path_analyzed = False
+ @property
+ def name(self) -> str:
+ """Get frame's name."""
+ return self.__name
+
+ @property
+ def size(self) -> tuple[int]:
+ """Get frame's size."""
+ return self.__size
+
+ @property
+ def gaze_position_calibrator(self) -> GazeFeatures.GazePositionCalibrator:
+ """Get frame's gaze position calibrator object."""
+ return self.__gaze_position_calibrator
+
+ @property
+ def gaze_movement_identifier(self) -> GazeFeatures.GazeMovementIdentifier:
+ """Get frame's gaze movement identifier object."""
+ return self.__gaze_movement_identifier
+
+ @property
+ def filter_in_progress_indentification(self) -> bool:
+ """Is frame filtering in progress identification?"""
+ return self.__filter_in_progress_indentification
+
+ @property
+ def scan_path(self) -> GazeFeatures.ScanPath:
+ """Get frame's scan path object."""
+ return self.__scan_path
+
+ @property
+ def scan_path_analyzers(self) -> dict:
+ """Get frame's scan path analyzers dictionary."""
+ return self.__scan_path_analyzers
+
+ @property
+ def background(self) -> numpy.array:
+ """Get frame's background matrix."""
+ return self.__background
+
+ @background.setter
+ def background(self, image: numpy.array):
+ """Set frame's background matrix."""
+ self.__background = image
+
+ @property
+ def heatmap(self) -> AOIFeatures.Heatmap:
+ """Get frame's heatmap object."""
+ return self.__heatmap
+
+ @property
+ def layers(self) -> dict:
+ """Get frame's layers dictionary."""
+ return self.__layers
+
+ @property
+ def image_parameters(self) -> dict:
+ """Get frame's image parameters dictionary."""
+ return self.__image_parameters
+
+ @property
+ def parent(self) -> object:
+ """Get frame's parent object."""
+ return self.__parent
+
+ @parent.setter
+ def parent(self, parent: object):
+ """Set frame's parent object."""
+ self.__parent = parent
+
+ @property
+ def gaze_position(self) -> object:
+ """Get current calibrated gaze position"""
+ return self.__calibrated_gaze_position
+
+ @property
+ def gaze_movement(self) -> object:
+ """Get current identified gaze movement"""
+ return self.__identified_gaze_movement
+
+ @property
+ def scan_path_analyzed(self) -> bool:
+ """Are scan path analysis ready?"""
+ return self.__scan_path_analyzed
+
+ @property
+ def scan_path_analysis(self) -> Iterator[Union[str, dict]]:
+ """Get scan path analysis.
+
+ Returns
+ iterator: analyzer module path, analysis dictionary
+ """
+ assert(self.__scan_path_analyzed)
+
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items():
+
+ yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
+
+ def as_dict(self) -> dict:
+ """Export ArFrame attributes as dictionary."""
+
+ return {
+ "name": self.__name,
+ "size": self.__size,
+ "gaze_position_calibrator": self.__gaze_position_calibrator,
+ "gaze_movement_identifier": self.__gaze_movement_identifier,
+ "filter_in_progress_identification": self.__filter_in_progress_identification,
+ "scan_path": self.__scan_path,
+ "scan_path_analyzers": self.__scan_path_analyzers,
+ "background": self.__background,
+ "heatmap": self.__heatmap,
+ "layers": self.__layers,
+ "image_parameters": self.__image_parameters
+ }
@classmethod
def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType:
- """Load attributes from dictionary.
+ """Load ArFrame attributes from dictionary.
Parameters:
frame_data: dictionary with attributes to load
@@ -697,6 +848,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
pass
+ # Load background image
+ try:
+
+ new_frame_background_value = frame_data.pop('background')
+ new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value))
+ new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC)
+
+ except KeyError:
+
+ new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8)
+
# Load heatmap
try:
@@ -714,17 +876,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
new_heatmap_data = {}
new_heatmap = None
- # Load background image
- try:
-
- new_frame_background_value = frame_data.pop('background')
- new_frame_background = cv2.imread(os.path.join(working_directory, new_frame_background_value))
- new_frame_background = cv2.resize(new_frame_background, dsize=new_frame_size, interpolation=cv2.INTER_CUBIC)
-
- except KeyError:
-
- new_frame_background = numpy.full((new_frame_size[1], new_frame_size[0], 3), 127).astype(numpy.uint8)
-
# Load layers
new_layers = {}
@@ -762,31 +913,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# str: relative path to file
if type(new_logging_module_value) == str:
- logging_module_name = new_logging_module_value.split('.')[0]
+ new_logging_module = new_logging_module_value.split('.')[0]
- # Import logging module
- self.logging_module = importlib.import_module(logging_module_name)
-
- # Register loggers as pipeline step observers
- self.observers = self.logging_module.__loggers__
-
except KeyError:
- pass
+ new_logging_module = None
+
+ # DEBUG
+ print('Create frame', new_frame_name)
# Create frame
- return ArFrame(new_frame_name, \
- new_frame_size, \
- new_gaze_position_calibrator, \
- new_gaze_movement_identifier, \
- filter_in_progress_identification, \
- new_scan_path, \
- new_scan_path_analyzers, \
- new_heatmap, \
- new_frame_background, \
- new_layers, \
- new_frame_image_parameters \
- )
+ return ArFrame(new_frame_name, new_frame_size, new_gaze_position_calibrator, new_gaze_movement_identifier, filter_in_progress_identification, new_scan_path, new_scan_path_analyzers, new_frame_background, new_heatmap, new_layers, new_frame_image_parameters)
@classmethod
def from_json(self, json_filepath: str) -> ArFrameType:
@@ -804,49 +941,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return ArFrame.from_dict(frame_data, working_directory)
- @property
- def parent(self) -> object:
- """Get parent instance"""
-
- return self.__parent
-
- @parent.setter
- def parent(self, parent: object):
- """Set parent instance"""
-
- self.__parent = parent
-
- @property
- def gaze_position(self) -> object:
- """Get current calibrated gaze position"""
-
- return self.__calibrated_gaze_position
-
- @property
- def gaze_movement(self) -> object:
- """Get current identified gaze movement"""
-
- return self.__identified_gaze_movement
-
- @property
- def scan_path_analyzed(self) -> bool:
- """Are scan path analysis ready?"""
-
- return self.__scan_path_analyzed
-
- def scan_path_analysis(self) -> Iterator[Union[str, dict]]:
- """Get scan path analysis.
-
- Returns
- iterator: analyzer module path, analysis dictionary
- """
-
- assert(self.__scan_path_analyzed)
-
- for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
-
- yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
-
@DataFeatures.PipelineStepMethod
def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]:
"""
@@ -861,7 +955,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
# Use frame locker feature
- with self.locker:
+ with self._lock:
# No gaze movement identified by default
self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
@@ -870,9 +964,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__scan_path_analyzed = False
# Apply gaze position calibration
- if self.gaze_position_calibrator is not None:
+ if self.__gaze_position_calibrator is not None:
- self.__calibrated_gaze_position = self.gaze_position_calibrator.apply(gaze_position)
+ self.__calibrated_gaze_position = self.__gaze_position_calibrator.apply(gaze_position)
# Or update gaze position at least
else:
@@ -880,10 +974,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__calibrated_gaze_position = gaze_position
# Identify gaze movement
- if self.gaze_movement_identifier is not None:
+ if self.__gaze_movement_identifier is not None:
# Identify finished gaze movement
- self.__identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position)
+ self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position)
# Valid and finished gaze movement has been identified
if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished:
@@ -891,45 +985,45 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
if GazeFeatures.is_fixation(self.__identified_gaze_movement):
# Append fixation to scan path
- if self.scan_path is not None:
+ if self.__scan_path is not None:
- self.scan_path.append_fixation(timestamp, self.__identified_gaze_movement)
+ self.__scan_path.append_fixation(timestamp, self.__identified_gaze_movement)
elif GazeFeatures.is_saccade(self.__identified_gaze_movement):
# Append saccade to scan path
- if self.scan_path is not None:
+ if self.__scan_path is not None:
- scan_step = self.scan_path.append_saccade(timestamp, self.__identified_gaze_movement)
+ scan_step = self.__scan_path.append_saccade(timestamp, self.__identified_gaze_movement)
# Is there a new step?
- if scan_step and len(self.scan_path) > 1:
+ if scan_step and len(self.__scan_path) > 1:
# Analyze aoi scan path
- for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items():
+ for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items():
- scan_path_analyzer.analyze(timestamp, self.scan_path)
+ scan_path_analyzer.analyze(timestamp, self.__scan_path)
# Update scan path analyzed state
self.__scan_path_analyzed = True
# No valid finished gaze movement: optionnaly stop in progress identification filtering
- elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification:
+ elif self.__gaze_movement_identifier is not None and not self.__filter_in_progress_identification:
- self.__identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement
+ self.__identified_gaze_movement = self.__gaze_movement_identifier.current_gaze_movement
# Update heatmap
- if self.heatmap is not None:
+ if self.__heatmap is not None:
# Scale gaze position value
- scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]])
+ scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]])
# Update heatmap image
- self.heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale)
+ self.__heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale)
# Look layers with valid identified gaze movement
# Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally
- for layer_name, layer in self.layers.items():
+ for layer_name, layer in self.__layers.items():
layer.look(timestamp, self.__identified_gaze_movement)
@@ -949,56 +1043,56 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
# Use frame locker feature
- with self.locker:
+ with self._lock:
# Draw background only
- if background_weight is not None and (heatmap_weight is None or self.heatmap is None):
+ if background_weight is not None and (heatmap_weight is None or self.__heatmap is None):
- image = self.background.copy()
+ image = self.__background.copy()
# Draw mix background and heatmap if required
- elif background_weight is not None and heatmap_weight is not None and self.heatmap:
+ elif background_weight is not None and heatmap_weight is not None and self.__heatmap:
- background_image = self.background.copy()
- heatmap_image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR)
+ background_image = self.__background.copy()
+ heatmap_image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR)
image = cv2.addWeighted(heatmap_image, heatmap_weight, background_image, background_weight, 0)
# Draw heatmap only
- elif background_weight is None and heatmap_weight is not None and self.heatmap:
+ elif background_weight is None and heatmap_weight is not None and self.__heatmap:
- image = cv2.resize(self.heatmap.image, dsize=self.size, interpolation=cv2.INTER_LINEAR)
+ image = cv2.resize(self.__heatmap.image, dsize=self.__size, interpolation=cv2.INTER_LINEAR)
# Draw black image
else:
- image = numpy.full((self.size[1], self.size[0], 3), 0).astype(numpy.uint8)
+ image = numpy.full((self.__size[1], self.__size[0], 3), 0).astype(numpy.uint8)
# Draw gaze position calibrator
if draw_gaze_position_calibrator is not None:
- self.gaze_position_calibrator.draw(image, size=self.size, **draw_gaze_position_calibrator)
+ self.__gaze_position_calibrator.draw(image, size=self.__size, **draw_gaze_position_calibrator)
# Draw scan path if required
- if draw_scan_path is not None and self.scan_path is not None:
+ if draw_scan_path is not None and self.__scan_path is not None:
- self.scan_path.draw(image, **draw_scan_path)
+ self.__scan_path.draw(image, **draw_scan_path)
# Draw current fixation if required
- if draw_fixations is not None and self.gaze_movement_identifier is not None:
+ if draw_fixations is not None and self.__gaze_movement_identifier is not None:
- self.gaze_movement_identifier.current_fixation.draw(image, **draw_fixations)
+ self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations)
# Draw current saccade if required
- if draw_saccades is not None and self.gaze_movement_identifier is not None:
+ if draw_saccades is not None and self.__gaze_movement_identifier is not None:
- self.gaze_movement_identifier.current_saccade.draw(image, **draw_saccades)
+ self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades)
# Draw layers if required
if draw_layers is not None:
for layer_name, draw_layer in draw_layers.items():
- self.layers[layer_name].draw(image, **draw_layer)
+ self.__layers[layer_name].draw(image, **draw_layer)
# Draw current gaze position if required
if draw_gaze_positions is not None:
@@ -1019,77 +1113,105 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return self.__image(**kwargs)
- return self.__image(**self.image_parameters)
+ return self.__image(**self.__image_parameters)
-@dataclass
-class ArScene():
+class ArScene(DataFeatures.PipelineStepObject):
"""
Define abstract Augmented Reality scene with ArLayers and ArFrames inside.
-
- Parameters:
- name: name of the scene
- layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
- frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
- angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
- distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
"""
- name: str
- layers: dict = field(default_factory=dict)
- frames: dict = field(default_factory=dict)
- angle_tolerance: float = field(default=0.)
- distance_tolerance: float = field(default=0.)
+
+ def __init__(self, name: str = None, layers: dict = None, frames: dict = None, angle_tolerance: float = 0., distance_tolerance: float = 0.):
+ """ Initialize ArScene
- def __post_init__(self):
+ Parameters:
+ name: name of the scene
+ layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
+ frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below.
+ angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
+ distance_tolerance: Optional distance error tolerance to validate marker pose in centimeter used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function.
+ """
- # Define parent attribute: it will be setup by parent object later
- self.__parent = None
+ # Init parent classes
+ super().__init__()
+
+ # Init private attributes
+ self.__name = name
+ self.__layers = layers
+ self.__frames = frames
+ self.__angle_tolerance = angle_tolerance
+ self.__distance_tolerance = distance_tolerance
+ self.__parent = None # it will be setup by parent later
# Setup layer parent attribute
- for name, layer in self.layers.items():
+ for name, layer in self.__layers.items():
layer.parent = self
# Setup frame parent attribute
- for name, frame in self.frames.items():
+ for name, frame in self.__frames.items():
frame.parent = self
- def __str__(self) -> str:
- """
- Returns:
- String representation
- """
-
- output = f'parent:\n{self.parent.name}\n'
-
- if len(self.layers):
- output += f'ArLayers:\n'
- for name, layer in self.layers.items():
- output += f'{name}:\n{layer}\n'
-
- if len(self.frames):
- output += f'ArFrames:\n'
- for name, frame in self.frames.items():
- output += f'{name}:\n{frame}\n'
-
- return output
-
@property
- def parent(self):
- """Get parent instance"""
-
+ def name(self) -> str:
+ """Get scene's name."""
+ return self.__name
+
+ @property
+ def layers(self) -> dict:
+ """Get scene's layers dictionary."""
+ return self.__layers
+
+ @property
+ def frames(self) -> dict:
+ """Get scene's frames dictionary."""
+ return self.__frames
+
+ @property
+ def angle_tolerance(self) -> float:
+ """Get scene's angle tolerance."""
+ return self.__angle_tolerance
+
+ @angle_tolerance.setter
+ def angle_tolerance(self, value: float):
+ """Set scene's angle tolerance."""
+ self.__angle_tolerance = value
+
+ @property
+ def distance_tolerance(self) -> float:
+ """Get scene's distance tolerance."""
+ return self.__distance_tolerance
+
+ @distance_tolerance.setter
+ def distance_tolerance(self, value: float):
+ """Set scene's distance tolerance."""
+ self.__distance_tolerance = value
+
+ @property
+ def parent(self) -> object:
+ """Get frame's parent object."""
return self.__parent
@parent.setter
- def parent(self, parent):
- """Get parent instance"""
-
+ def parent(self, parent: object):
+ """Set frame's parent object."""
self.__parent = parent
+ def as_dict(self) -> dict:
+ """Export ArScene attributes as dictionary."""
+
+ return {
+ "name": self.__name,
+ "layers": self.__layers,
+ "frames": self.__frames,
+ "angle_tolerance": self.__angle_tolerance,
+ "distance_tolerance": self.__distance_tolerance
+ }
+
@classmethod
def from_dict(self, scene_data: dict, working_directory: str = None) -> ArSceneType:
"""
- Load ArScene from dictionary.
+ Load ArScene attributes from dictionary.
Parameters:
scene_data: dictionary
@@ -1205,7 +1327,27 @@ class ArScene():
pass
return ArScene(new_scene_name, new_layers, new_frames, **scene_data)
-
+
+ def __str__(self) -> str:
+ """
+ Returns:
+ String representation
+ """
+
+ output = f'parent:\n{self.__parent.name}\n'
+
+ if len(self.__layers):
+ output += f'ArLayers:\n'
+ for name, layer in self.__layers.items():
+ output += f'{name}:\n{layer}\n'
+
+ if len(self.__frames):
+ output += f'ArFrames:\n'
+ for name, frame in self.__frames.items():
+ output += f'{name}:\n{frame}\n'
+
+ return output
+
def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array]:
"""Define abstract estimate scene pose method.
@@ -1232,7 +1374,7 @@ class ArScene():
iterator: name of projected layer and AOI2DScene projection
"""
- for name, layer in self.layers.items():
+ for name, layer in self.__layers.items():
# Clip AOI out of the visual horizontal field of view (optional)
# TODO: use HFOV and VFOV and don't use vision_cone method
@@ -1255,7 +1397,7 @@ class ArScene():
aoi_scene_copy = layer.aoi_scene.copy()
# Project layer aoi scene
- yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)
+ yield name, aoi_scene_copy.project(tvec, rvec, self.__parent.aruco_detector.optic_parameters.K)
def draw(self, image: numpy.array, **kwargs: dict):
"""
@@ -1267,28 +1409,33 @@ class ArScene():
raise NotImplementedError('draw() method not implemented')
-@dataclass
class ArCamera(ArFrame):
"""
Define abstract Augmented Reality camera as ArFrame with ArScenes inside.
-
- Parameters:
- scenes: all scenes to project into camera frame
- visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV).
- visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV).
"""
- scenes: dict = field(default_factory=dict)
- visual_hfov: float = field(default=0.)
- visual_vfov: float = field(default=0.)
+ def __init__(self, scenes: dict = None, visual_hfov: float = 0., visual_vfov: float = 0., **kwargs):
+ """ Initialize ArCamera
+
+ Parameters:
+ scenes: all scenes to project into camera frame
+ visual_hfov: Optional angle in degree to clip scenes projection according visual horizontal field of view (HFOV).
+ visual_vfov: Optional angle in degree to clip scenes projection according visual vertical field of view (VFOV).
+ """
+
+ # DEBUG
+ print('ArCamera.__init__ kwargs', kwargs)
- def __post_init__(self):
+ # Init parent class
+ super().__init__(**kwargs)
- # Init ArFrame
- super().__post_init__()
+ # Init private attributes
+ self.__scenes = scenes
+ self.__visual_hfov = visual_hfov
+ self.__visual_vfov = visual_vfov
# Setup scenes parent attribute
- for name, scene in self.scenes.items():
+ for name, scene in self.__scenes.items():
scene.parent = self
@@ -1301,7 +1448,7 @@ class ArCamera(ArFrame):
expected_aoi_list = []
exclude_aoi_list = []
- for scene_name, scene in self.scenes.items():
+ for scene_name, scene in self.__scenes.items():
# Append scene layer aoi to corresponding expected camera layer aoi
try:
@@ -1329,55 +1476,66 @@ class ArCamera(ArFrame):
layer.aoi_scan_path.expected_aoi = expected_aoi_list
layer.aoi_matcher.exclude = exclude_aoi_list
-
- def __str__(self) -> str:
- """
- Returns:
- String representation
- """
-
- output = f'Name:\n{self.name}\n'
- for name, scene in self.scenes.items():
- output += f'\"{name}\" {type(scene)}:\n{scene}\n'
-
- return output
-
- @classmethod
- def from_dict(self, camera_data: dict, working_directory: str = None) -> ArCameraType:
- """
- Load ArCamera from dictionary.
-
- Parameters:
- camera_data: dictionary
- working_directory: folder path where to load files when a dictionary value is a relative filepath.
- """
-
- raise NotImplementedError('from_dict() method not implemented')
-
- @classmethod
- def from_json(self, json_filepath: str) -> ArCameraType:
- """
- Load ArCamera from .json file.
-
- Parameters:
- json_filepath: path to json file
- """
-
- raise NotImplementedError('from_json() method not implemented')
+ @property
+ def scenes(self) -> dict:
+ """Get camera's scenes dictionary."""
+ return self.__scenes
@property
+ def visual_hfov(self) -> float:
+ """Get camera's visual horizontal field of view."""
+ return self.__visual_hfov
+
+ @visual_hfov.setter
+ def visual_hfov(self, value: float):
+ """Set camera's visual horizontal field of view."""
+ self.__visual_hfov = value
+
+ @property
+ def visual_vfov(self) -> float:
+ """Get camera's visual vertical field of view."""
+ return self.__visual_vfov
+
+ @visual_vfov.setter
+ def visual_vfov(self, value: float):
+ """Set camera's visual vertical field of view."""
+ self.__visual_vfov = value
+
+ @property
def scene_frames(self) -> Iterator[ArFrame]:
"""Iterate over all scenes frames"""
# For each scene
- for scene_name, scene in self.scenes.items():
+ for scene_name, scene in self.__scenes.items():
# For each scene frame
for name, scene_frame in scene.frames.items():
yield scene_frame
+ def as_dict(self) -> dict:
+ """Export ArCamera attributes as dictionary."""
+
+ return {
+ "scenes": self.__scenes,
+ "visual_hfov": self.__visual_hfov,
+ "visual_vfov": self.__visual_vfov
+ }
+
+ def __str__(self) -> str:
+ """
+ Returns:
+ String representation
+ """
+
+ output = f'Name:\n{self.__name}\n'
+
+ for name, scene in self.__scenes.items():
+ output += f'\"{name}\" {type(scene)}:\n{scene}\n'
+
+ return output
+
@DataFeatures.PipelineStepMethod
def watch(self, timestamp: int|float, image: numpy.array):
"""Detect AR features from image and project scenes into camera frame.
@@ -1405,7 +1563,7 @@ class ArCamera(ArFrame):
super().look(timestamp, gaze_position)
# Use camera frame locker feature
- with self.locker:
+ with self._lock:
# Project gaze position into each scene frames if possible
for scene_frame in self.scene_frames:
@@ -1441,13 +1599,13 @@ class ArCamera(ArFrame):
"""
# Use camera frame locker feature
- with self.locker:
+ with self._lock:
# Project camera frame background into each scene frame if possible
for frame in self.scene_frames:
# Is there an AOI inside camera frame layers projection which its name equals to a scene frame name?
- for camera_layer_name, camera_layer in self.layers.items():
+ for camera_layer_name, camera_layer in self.__layers.items():
try:
@@ -1457,7 +1615,7 @@ class ArCamera(ArFrame):
width, height = frame.size
destination = numpy.float32([[0, 0], [width, 0], [width, height], [0, height]])
mapping = cv2.getPerspectiveTransform(aoi_2d.astype(numpy.float32), destination)
- frame.background = cv2.warpPerspective(self.background, mapping, (width, height))
+ frame.background = cv2.warpPerspective(self.__background, mapping, (width, height))
# Ignore missing frame projection
except KeyError:
diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py
index 775ab40..ca58c20 100644
--- a/src/argaze/ArUcoMarkers/ArUcoCamera.py
+++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py
@@ -8,7 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"
from typing import TypeVar, Tuple
-from dataclasses import dataclass, field
import json
import os
import time
@@ -33,26 +32,32 @@ DEFAULT_ARUCOCAMERA_IMAGE_PARAMETERS = {
}
}
-@dataclass
class ArUcoCamera(ArFeatures.ArCamera):
"""
Define an ArCamera based on ArUco marker detection.
-
- Parameters:
- aruco_detector: ArUco marker detector
"""
- aruco_detector: ArUcoDetector.ArUcoDetector = field(default_factory=ArUcoDetector.ArUcoDetector)
+ def __init__(self, aruco_detector: ArUcoDetector.ArUcoDetector, **kwargs):
+ """ Initialize ArUcoCamera
+
+ Parameters:
+ aruco_detector: ArUco marker detector
+ """
- def __post_init__(self):
+ # DEBUG
+ print('ArUcoCamera.__init__ kwargs', kwargs)
- super().__post_init__()
+ # Init parent class
+ super().__init__(**kwargs)
+
+ # Init private attribute
+ self.__aruco_detector = aruco_detector
# Check optic parameters
- if self.aruco_detector.optic_parameters is not None:
+ if self.__aruco_detector.optic_parameters is not None:
# Optic parameters dimensions should be equal to camera frame size
- if self.aruco_detector.optic_parameters.dimensions != self.size:
+ if self.__aruco_detector.optic_parameters.dimensions != self.size:
raise ArFeatures.LoadingFailed('ArUcoCamera: aruco_detector.optic_parameters.dimensions have to be equal to size.')
@@ -61,18 +66,12 @@ class ArUcoCamera(ArFeatures.ArCamera):
# Create default optic parameters adapted to frame size
# Note: The choice of 1000 for default focal length should be discussed...
- self.aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=self.size, K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=self.size[0], height=self.size[1]))
+ self.__aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=self.size, K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=self.size[0], height=self.size[1]))
- def __str__(self) -> str:
- """
- Returns:
- String representation
- """
-
- output = super().__str__()
- output += f'ArUcoDetector:\n{self.aruco_detector}\n'
-
- return output
+ @property
+ def aruco_detector(self) -> ArUcoDetector.ArUcoDetector:
+ """Get ArUco detector object."""
+ return self.__aruco_detector
@classmethod
def from_dict(self, aruco_camera_data: dict, working_directory: str = None) -> ArUcoCameraType:
@@ -120,11 +119,14 @@ class ArUcoCamera(ArFeatures.ArCamera):
for layer_name, layer_data in aruco_camera_data['layers'].items():
aruco_camera_data['image_parameters']['draw_layers'][layer_name] = ArFeatures.DEFAULT_ARLAYER_DRAW_PARAMETERS
- # Get values of temporary ar frame created from aruco_camera_data
- temp_ar_frame_values = DataFeatures.as_dict(ArFeatures.ArFrame.from_dict(aruco_camera_data, working_directory))
+ # Load temporary camera from aruco_camera_data then export it as dict
+ temp_camera_data = ArFeatures.ArCamera.from_dict(aruco_camera_data, working_directory).as_dict()
+
+ # DEBUG
+ print('ArUcoCamera.from_dict: temp_camera_data=', temp_camera_data)
# Create new aruco camera using temporary ar frame values
- return ArUcoCamera(aruco_detector=new_aruco_detector, scenes=new_scenes, **temp_ar_frame_values)
+ return ArUcoCamera(aruco_detector = new_aruco_detector, scenes = new_scenes, **temp_camera_data)
@classmethod
def from_json(self, json_filepath: str) -> ArUcoCameraType:
@@ -142,6 +144,17 @@ class ArUcoCamera(ArFeatures.ArCamera):
return ArUcoCamera.from_dict(aruco_camera_data, working_directory)
+ def __str__(self) -> str:
+ """
+ Returns:
+ String representation
+ """
+
+ output = super().__str__()
+ output += f'ArUcoDetector:\n{self.__aruco_detector}\n'
+
+ return output
+
@DataFeatures.PipelineStepMethod
def watch(self, timestamp: int|float, image: numpy.array):
"""Detect environment aruco markers from image and project scenes into camera frame.
@@ -151,10 +164,10 @@ class ArUcoCamera(ArFeatures.ArCamera):
"""
# Use camera frame locker feature
- with self.locker:
+ with self._lock:
# Detect aruco markers
- self.aruco_detector.detect_markers(timestamp, image)
+ self.__aruco_detector.detect_markers(timestamp, image)
# Fill camera frame background with image
self.background = image
@@ -173,7 +186,7 @@ class ArUcoCamera(ArFeatures.ArCamera):
try:
# Build AOI scene directly from detected ArUco marker corners
- self.layers[??].aoi_2d_scene |= scene.build_aruco_aoi_scene(self.aruco_detector.detected_markers)
+ self.layers[??].aoi_2d_scene |= scene.build_aruco_aoi_scene(self.__aruco_detector.detected_markers)
except ArFeatures.PoseEstimationFailed:
@@ -181,7 +194,7 @@ class ArUcoCamera(ArFeatures.ArCamera):
'''
# Estimate scene pose from detected scene markers
- tvec, rmat, _ = scene.estimate_pose(self.aruco_detector.detected_markers)
+ tvec, rmat, _ = scene.estimate_pose(self.__aruco_detector.detected_markers)
# Project scene into camera frame according estimated pose
for layer_name, layer_projection in scene.project(tvec, rmat, self.visual_hfov, self.visual_vfov):
@@ -216,12 +229,12 @@ class ArUcoCamera(ArFeatures.ArCamera):
image = super().image(**kwargs)
# Use frame locker feature
- with self.locker:
+ with self._lock:
# Draw optic parameters grid if required
if draw_optic_parameters_grid is not None:
- self.aruco_detector.optic_parameters.draw(image, **draw_optic_parameters_grid)
+ self.__aruco_detector.optic_parameters.draw(image, **draw_optic_parameters_grid)
# Draw scenes if required
if draw_scenes is not None:
@@ -233,7 +246,7 @@ class ArUcoCamera(ArFeatures.ArCamera):
# Draw detected markers if required
if draw_detected_markers is not None:
- self.aruco_detector.draw_detected_markers(image, draw_detected_markers)
+ self.__aruco_detector.draw_detected_markers(image, draw_detected_markers)
return image
diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py
index c562467..63f4851 100644
--- a/src/argaze/ArUcoMarkers/ArUcoDetector.py
+++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py
@@ -8,7 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"
from typing import TypeVar, Tuple
-from dataclasses import dataclass, field
import json
import os
from collections import Counter
@@ -131,7 +130,6 @@ class DetectorParameters():
def internal(self):
return self.__parameters
-@dataclass
class ArUcoDetector(DataFeatures.PipelineStepObject):
"""ArUco markers detector.
@@ -142,12 +140,24 @@ class ArUcoDetector(DataFeatures.PipelineStepObject):
parameters: ArUco detector parameters.
"""
- dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary = field(default_factory=ArUcoMarkersDictionary.ArUcoMarkersDictionary)
- marker_size: float = field(default=0.)
- optic_parameters: ArUcoOpticCalibrator.OpticParameters = field(default_factory=ArUcoOpticCalibrator.OpticParameters)
- parameters: DetectorParameters = field(default_factory=DetectorParameters)
+ def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary = None, marker_size: float = 0., optic_parameters: ArUcoOpticCalibrator.OpticParameters = None, parameters: DetectorParameters = None):
+ """ Initialize ArUcoDetector.
- def __post_init__(self):
+ Parameters:
+ dictionary: ArUco markers dictionary to detect.
+ marker_size: Size of ArUco markers to detect in centimeter.
+ optic_parameters: Optic parameters to use for ArUco detection into image.
+ parameters: ArUco detector parameters.
+ """
+
+ # Init parent class
+ super().__init__()
+
+ # Init private attributes
+ self.__dictionary = dictionary
+ self.__marker_size = marker_size
+ self.__optic_parameters = optic_parameters
+ self.__parameters = parameters
# Init detected markers data
self.__detected_markers = {}
@@ -162,9 +172,29 @@ class ArUcoDetector(DataFeatures.PipelineStepObject):
self.__detection_count = 0
self.__detected_ids = []
+ @property
+ def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary:
+ """Get aruco detector's dictionary object."""
+ return self.__dictionary
+
+ @property
+ def marker_size(self) -> float:
+ """Get aruco detector's marker size."""
+ return self.__marker_size
+
+ @property
+ def optic_parameters(self) -> ArUcoOpticCalibrator.OpticParameters:
+ """Get aruco detector's opetic parameters object."""
+ return self.__optic_parameters
+
+ @property
+ def parameters(self) -> DetectorParameters:
+ """Get aruco detector's parameters object."""
+ return self.__parameters
+
@classmethod
def from_dict(self, aruco_detector_data: dict, working_directory: str = None) -> ArUcoDetectorType:
- """Load attributes from dictionary.
+ """Load ArUcoDetector attributes from dictionary.
Parameters:
aruco_detector_data: dictionary with attributes to load
@@ -249,10 +279,10 @@ class ArUcoDetector(DataFeatures.PipelineStepObject):
def __str__(self) -> str:
"""String display"""
- output = f'\n\tDictionary: {self.dictionary}\n'
- output += f'\tMarker size: {self.marker_size} cm\n\n'
- output += f'\tOptic parameters:\n{self.optic_parameters}\n'
- output += f'\tDetection Parameters:\n{self.parameters}'
+ output = f'\n\tDictionary: {self.__dictionary}\n'
+ output += f'\tMarker size: {self.__marker_size} cm\n\n'
+ output += f'\tOptic parameters:\n{self.__optic_parameters}\n'
+ output += f'\tDetection Parameters:\n{self.__parameters}'
return output
@@ -277,7 +307,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject):
detection_start = time.perf_counter()
# Detect markers into gray picture
- detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY), self.dictionary.markers, parameters = self.parameters.internal)
+ detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY), self.__dictionary.markers, parameters = self.__parameters.internal)
# Assess marker detection time in ms
detection_time = (time.perf_counter() - detection_start) * 1e3
@@ -293,7 +323,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject):
for i, marker_id in enumerate(detected_markers_ids):
- marker = ArUcoMarker.ArUcoMarker(self.dictionary, marker_id, self.marker_size)
+ marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id, self.__marker_size)
marker.corners = detected_markers_corners[i][0]
# No pose estimation: call estimate_markers_pose to get one
@@ -333,7 +363,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject):
# Estimate pose of selected markers
if len(selected_markers_corners) > 0:
- markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners, self.marker_size, numpy.array(self.optic_parameters.K), numpy.array(self.optic_parameters.D))
+ markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners, self.__marker_size, numpy.array(self.__optic_parameters.K), numpy.array(self.__optic_parameters.D))
for i, marker_id in enumerate(selected_markers_ids):
@@ -368,7 +398,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject):
for marker_id, marker in self.__detected_markers.items():
- marker.draw(image, self.optic_parameters.K, self.optic_parameters.D, **draw_marker)
+ marker.draw(image, self.__optic_parameters.K, self.__optic_parameters.D, **draw_marker)
def detect_board(self, image: numpy.array, board, expected_markers_number):
"""Detect ArUco markers board in image setting up the number of detected markers needed to agree detection.
@@ -379,7 +409,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject):
# detect markers from gray picture
gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
- detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.dictionary.markers, parameters = self.parameters.internal)
+ detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, parameters = self.__parameters.internal)
# if all board markers are detected
if len(detected_markers_corners) == expected_markers_number:
diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py
index b60b59d..34c3157 100644
--- a/src/argaze/ArUcoMarkers/ArUcoScene.py
+++ b/src/argaze/ArUcoMarkers/ArUcoScene.py
@@ -8,7 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"
from typing import TypeVar, Tuple
-from dataclasses import dataclass, field
import json
import os
@@ -22,32 +21,31 @@ import numpy
ArUcoSceneType = TypeVar('ArUcoScene', bound="ArUcoScene")
# Type definition for type annotation convenience
-@dataclass
class ArUcoScene(ArFeatures.ArScene):
"""
Define an ArScene based on an ArUcoMarkersGroup description.
-
- Parameters:
-
- aruco_markers_group: ArUco markers 3D scene description used to estimate scene pose from detected markers: see [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function below.
-
"""
- aruco_markers_group: ArUcoMarkersGroup.ArUcoMarkersGroup = field(default_factory=ArUcoMarkersGroup.ArUcoMarkersGroup)
+
+ def __init__(self, aruco_markers_group: ArUcoMarkersGroup.ArUcoMarkersGroup, **kwargs):
+ """ Initialize ArUcoScene
- def __post_init__(self):
+ Parameters:
+ aruco_markers_group: ArUco markers 3D scene description used to estimate scene pose from detected markers: see [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function below.
+ """
- super().__post_init__()
+ # DEBUG
+ print(f'ArUcoScene.__init__', kwargs)
- def __str__(self) -> str:
- """
- Returns:
- String representation
- """
+ # Init parent classes
+ super().__init__(**kwargs)
- output = output = super().__str__()
- output += f'ArUcoMarkersGroup:\n{self.aruco_markers_group}\n'
+ # Init private attribute
+ self.__aruco_markers_group = aruco_markers_group
- return output
+ @property
+ def aruco_markers_group(self) -> ArUcoMarkersGroup.ArUcoMarkersGroup:
+ """Get ArUco scene markers group object."""
+ return self.__aruco_markers_group
@classmethod
def from_dict(self, aruco_scene_data: dict, working_directory: str = None) -> ArUcoSceneType:
@@ -90,12 +88,26 @@ class ArUcoScene(ArFeatures.ArScene):
new_aruco_markers_group = None
- # Get values of temporary ar scene created from aruco_scene_data
- temp_ar_scene_values = DataFeatures.as_dict(ArFeatures.ArScene.from_dict(aruco_scene_data, working_directory))
+ # Load temporary scene from aruco_scene_data then export it as dict
+ temp_scene_data = ArFeatures.ArScene.from_dict(aruco_scene_data, working_directory).as_dict()
+
+ # DEBUG
+ print('ArUcoScene.from_dict: temp_scene_data=', temp_scene_data)
# Create new aruco scene using temporary ar scene values
- return ArUcoScene(aruco_markers_group=new_aruco_markers_group, **temp_ar_scene_values)
-
+ return ArUcoScene(aruco_markers_group=new_aruco_markers_group, **temp_scene_data)
+
+ def __str__(self) -> str:
+ """
+ Returns:
+ String representation
+ """
+
+ output = output = super().__str__()
+ output += f'ArUcoMarkersGroup:\n{self.__aruco_markers_group}\n'
+
+ return output
+
def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, dict]:
"""Estimate scene pose from detected ArUco markers.
@@ -110,7 +122,7 @@ class ArUcoScene(ArFeatures.ArScene):
raise ArFeatures.PoseEstimationFailed('No marker detected')
- scene_markers, _ = self.aruco_markers_group.filter_markers(detected_markers)
+ scene_markers, _ = self.__aruco_markers_group.filter_markers(detected_markers)
# Pose estimation fails when no marker belongs to the scene
if len(scene_markers) == 0:
@@ -123,7 +135,7 @@ class ArUcoScene(ArFeatures.ArScene):
raise ArFeatures.PoseEstimationFailed('Only one marker belongs to the scene')
# Estimate pose from a markers corners
- success, tvec, rmat = self.aruco_markers_group.estimate_pose_from_markers_corners(scene_markers, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D)
+ success, tvec, rmat = self.__aruco_markers_group.estimate_pose_from_markers_corners(scene_markers, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D)
if not success:
@@ -143,4 +155,4 @@ class ArUcoScene(ArFeatures.ArScene):
# Draw group if required
if draw_aruco_markers_group is not None:
- self.aruco_markers_group.draw(image, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D, **draw_aruco_markers_group)
+ self.__aruco_markers_group.draw(image, self.parent.aruco_detector.optic_parameters.K, self.parent.aruco_detector.optic_parameters.D, **draw_aruco_markers_group)
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index 793f498..9b673cc 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -8,7 +8,6 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"
from typing import TypeVar, Tuple, Any
-from dataclasses import dataclass, field
import importlib
from inspect import getmembers, getmodule
import collections
@@ -33,23 +32,6 @@ DataType = TypeVar('Data')
TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer")
# Type definition for type annotation convenience
-def as_dict(dataclass_object) -> dict:
- """
- Get dataclass object fields's values as a dictionary.
-
- Returns:
- values: dictionary of dataclass fields's values
- """
-
- # Get data class fields names
- fields_names = []
- for member_name, member_value in getmembers(dataclass_object):
- if member_name == '__dataclass_fields__':
- fields_names = member_value.keys()
-
- # Copy fields values
- return {name: vars(dataclass_object)[name] for name in fields_names}
-
def module_path(obj) -> str:
"""
Get object module path.
@@ -369,23 +351,13 @@ class SharedObject():
self._exceptions = {}
@property
- def locker(self) -> threading.Lock:
+ def lock(self) -> threading.Lock:
+ """Get shared object lock object."""
return self._lock
- def acquire(self):
- self._lock.acquire()
-
- def release(self):
- self._lock.release()
-
- @property
- def locked(self) -> bool:
- return self._lock.locked()
-
@property
def timestamp(self) -> int|float:
- """Get timestamp"""
-
+ """Get shared object timestamp."""
self._lock.acquire()
timestamp = self._timestamp
self._lock.release()
@@ -394,15 +366,13 @@ class SharedObject():
@timestamp.setter
def timestamp(self, timestamp: int|float):
- """Set timestamp"""
-
+ """Set shared object timestamp."""
self._lock.acquire()
self._timestamp = timestamp
self._lock.release()
def untimestamp(self):
- """Reset timestamp"""
-
+ """Reset shared object timestamp."""
self._lock.acquire()
self._timestamp = math.nan
self._lock.release()
@@ -410,7 +380,6 @@ class SharedObject():
@property
def timestamped(self) -> bool:
"""Is the object timestamped?"""
-
self._lock.acquire()
timestamped = not math.isnan(self._timestamp)
self._lock.release()
@@ -422,10 +391,65 @@ class PipelineStepObject():
Parameters:
execution_times: dictionary with each PipelineStepMethod execution time in ms.
+ observers: dictionary ...
"""
execution_times: dict = {}
- observers: dict = {}
+ __observers: dict = {}
+
+ @property
+ def observers(self) -> dict:
+
+ return self.__observers
+
+ def as_dict(self) -> dict:
+ """
+ Define abstract method to export PipelineStepObject attributes as dictionary.
+
+ Returns:
+ object_data: dictionary of PipelineStepObject.
+ """
+ raise NotImplementedError('serialize() method not implemented')
+
+ @classmethod
+ def from_dict(self, object_data: dict, working_directory: str = None) -> object:
+ """
+ Define abstract method to import PipelineStepObject attributes from dictionary.
+
+ Returns:
+ object_data: dictionary of PipelineStepObject
+ working_directory: folder path where to load files when a dictionary value is a relative filepath.
+ """
+ raise NotImplementedError('serialize() method not implemented')
+
+ @classmethod
+ def from_json(self, json_filepath: str) -> object:
+ """
+ Define abstract method to load PipelineStepObject from .json file.
+
+ Parameters:
+ json_filepath: path to json file
+ """
+ raise NotImplementedError('from_json() method not implemented')
+
+ def __str__(self) -> str:
+ """
+ Define abstract method to have a string representation of PipelineStepObject.
+
+ Returns:
+ String representation
+ """
+ raise NotImplementedError('__str__() method not implemented')
+
+def PipelineStepAttribute(method):
+
+ # Mark method as
+ method._tags = tags
+
+ return method
+
+# DEBUG
+from argaze import ArFeatures
def PipelineStepMethod(method):
"""Define a decorator use into PipelineStepObject class to declare pipeline method.
@@ -437,6 +461,11 @@ def PipelineStepMethod(method):
def wrapper(self, timestamp, *args, **kw):
"""Wrap pipeline step method to measure execution time."""
+ # DEBUG
+ if type(self) == ArFeatures.ArFrame:
+
+ print(timestamp, self.name, method.__name__, len(self.observers))
+
# Initialize execution time assessment
start = time.perf_counter()
diff --git a/src/argaze/utils/demo_aruco_markers_run.py b/src/argaze/utils/demo_aruco_markers_run.py
index c8b140c..a5b02f0 100644
--- a/src/argaze/utils/demo_aruco_markers_run.py
+++ b/src/argaze/utils/demo_aruco_markers_run.py
@@ -71,6 +71,7 @@ def main():
except Exception as e:
+ print(e)
gaze_analysis_time = 0
# Attach mouse callback to window
@@ -117,6 +118,7 @@ def main():
# Detect and project AR features
aruco_camera.watch(capture_time, video_image)
+ # Detection suceeded
exception = None
# Write errors
@@ -140,8 +142,10 @@ def main():
cv2.putText(aruco_camera_image, f'{gaze_positions_frequency} gaze positions/s | Gaze analysis {gaze_analysis_time:.2f}ms', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
# Handle exceptions
- cv2.rectangle(aruco_camera_image, (0, 100), (aruco_camera.size[0], 80), (127, 127, 127), -1)
- cv2.putText(aruco_camera_image, f'error: {exception}', (20, 140), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ if exception is not None:
+
+ cv2.rectangle(aruco_camera_image, (0, 100), (aruco_camera.size[0], 80), (127, 127, 127), -1)
+ cv2.putText(aruco_camera_image, f'error: {exception}', (20, 140), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
# Write hint
cv2.putText(aruco_camera_image, 'Mouve mouse pointer over gray rectangle area', (20, aruco_camera.size[1]-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)