From 4a51d3d60947ca673944b70ae0af6782e93ffac9 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Fri, 26 Jan 2024 10:43:48 +0100 Subject: Moving many features into PipelineStepObject. Improving printing of PipelineStepObject. --- setup.py | 2 +- src/argaze/ArFeatures.py | 244 +++++++-------------- src/argaze/ArUcoMarkers/ArUcoCamera.py | 11 - src/argaze/ArUcoMarkers/ArUcoDetector.py | 10 - src/argaze/ArUcoMarkers/ArUcoScene.py | 11 - src/argaze/AreaOfInterest/AOIFeatures.py | 15 +- src/argaze/DataFeatures.py | 129 ++++++++++- src/argaze/GazeAnalysis/Basic.py | 1 + src/argaze/GazeFeatures.py | 13 -- .../utils/demo_data/demo_aruco_markers_setup.json | 12 +- .../utils/demo_data/demo_gaze_analysis_setup.json | 8 +- src/argaze/utils/demo_data/demo_layer_logger.py | 38 ++++ src/argaze/utils/demo_data/frame_logger.py | 2 +- src/argaze/utils/demo_data/main_layer_logger.py | 38 ---- src/argaze/utils/demo_gaze_analysis_run.py | 25 ++- 15 files changed, 269 insertions(+), 290 deletions(-) create mode 100644 src/argaze/utils/demo_data/demo_layer_logger.py delete mode 100644 src/argaze/utils/demo_data/main_layer_logger.py diff --git a/setup.py b/setup.py index 706f414..45ea442 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ setup( packages=find_packages(where='src'), python_requires='>=3.11', - install_requires=['opencv-python>=4.7.0', 'opencv-contrib-python>=4.7.0', 'numpy', 'pandas', 'matplotlib', 'shapely', 'lempel_ziv_complexity', 'scipy', 'scikit-learn'], + install_requires=['opencv-python>=4.7.0', 'opencv-contrib-python>=4.7.0', 'numpy', 'pandas', 'colorama', 'matplotlib', 'shapely', 'lempel_ziv_complexity', 'scipy', 'scikit-learn'], project_urls={ 'Bug Reports': 'https://git.recherche.enac.fr/projects/argaze/issues', diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 8969f60..1c2de8e 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -101,11 +101,10 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Inherits from DataFeatures.SharedObject class to be shared by multiple threads. """ - def __init__(self, name: str = None, aoi_scene: AOIFeatures.AOIScene = None, aoi_matcher: GazeFeatures.AOIMatcher = None, aoi_scan_path: GazeFeatures.AOIScanPath = None, aoi_scan_path_analyzers: dict = None, draw_parameters: dict = None, **kwargs): + def __init__(self, aoi_scene: AOIFeatures.AOIScene = None, aoi_matcher: GazeFeatures.AOIMatcher = None, aoi_scan_path: GazeFeatures.AOIScanPath = None, aoi_scan_path_analyzers: dict = None, draw_parameters: dict = None, **kwargs): """ Initialize ArLayer Parameters: - name: name of the layer aoi_scene: AOI scene description aoi_matcher: AOI matcher object aoi_scan_path: AOI scan path object @@ -118,13 +117,12 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): DataFeatures.PipelineStepObject.__init__(self, **kwargs) # Init private attributes - self.__name = name self.__aoi_scene = aoi_scene self.__aoi_matcher = aoi_matcher self.__aoi_scan_path = aoi_scan_path self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers self.__draw_parameters = draw_parameters - self.__parent = None # it will be setup by parent later + self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() self.__looked_aoi_name = None self.__aoi_scan_path_analyzed = False @@ -138,10 +136,31 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__aoi_scene = AOI3DScene.AOI3DScene(self.__aoi_scene) - @property - def name(self) -> str: - """Get layer's name.""" - return self.__name + # Edit expected AOI list by removing AOI with name equals to layer name + expected_aoi = list(self.__aoi_scene.keys()) + + if self.name in expected_aoi: + + expected_aoi.remove(self.name) + + self.__aoi_scan_path.expected_aoi = expected_aoi + + # Edit pipeline step objects parent + if self.__aoi_scene is not None: + + self.__aoi_scene.parent = self + + if self.__aoi_matcher is not None: + + self.__aoi_matcher.parent = self + + if self.__aoi_scan_path is not None: + + self.__aoi_scan_path.parent = self + + for name, analyzer in self.__aoi_scan_path_analyzers.items(): + + analyzer.parent = self @property def aoi_scene(self) -> AOIFeatures.AOIScene: @@ -174,16 +193,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return self.__draw_parameters @property - def parent(self) -> object: - """Get layer's parent object.""" - return self.__parent - - @parent.setter - def parent(self, parent: object): - """Set layer's parent object.""" - self.__parent = parent - - @property def looked_aoi_name(self) -> str: """Get aoi matcher looked aoi name.""" return self.__looked_aoi_name @@ -211,13 +220,12 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """Export ArLayer attributes as dictionary.""" return { - "name": self.__name, + **DataFeatures.PipelineStepObject.as_dict(), "aoi_scene": self.__aoi_scene, "aoi_matcher": self.__aoi_matcher, "aoi_scan_path": self.__aoi_scan_path, "aoi_scan_path_analyzers": self.__aoi_scan_path_analyzers, - "draw_parameters": self.__draw_parameters, - **DataFeatures.PipelineStepObject.as_dict(self) + "draw_parameters": self.__draw_parameters } @classmethod @@ -234,15 +242,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): sys.path.append(working_directory) - # Load name - try: - - new_layer_name = layer_data.pop('name') - - except KeyError: - - new_layer_name = None - # Load aoi scene try: @@ -281,12 +280,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Add AOI 2D Scene by default new_aoi_scene = AOI2DScene.AOI2DScene() - # Edit expected AOI list by removing AOI with name equals to layer name - expected_aoi = list(new_aoi_scene.keys()) - - if new_layer_name in expected_aoi: - expected_aoi.remove(new_layer_name) - # Load aoi matcher try: @@ -309,13 +302,11 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): try: new_aoi_scan_path_data = layer_data.pop('aoi_scan_path') - new_aoi_scan_path_data['expected_aoi'] = expected_aoi new_aoi_scan_path = GazeFeatures.AOIScanPath(**new_aoi_scan_path_data) except KeyError: new_aoi_scan_path_data = {} - new_aoi_scan_path_data['expected_aoi'] = expected_aoi new_aoi_scan_path = None # Load AOI scan path analyzers @@ -384,7 +375,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Create layer return ArLayer( \ - new_layer_name, \ new_aoi_scene, \ new_aoi_matcher, \ new_aoi_scan_path, \ @@ -409,14 +399,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return ArLayer.from_dict(layer_data, working_directory) - def __str__(self) -> str: - """ - Returns: - String representation - """ - - return str(self.aoi_scene.keys()) - @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): """ @@ -532,11 +514,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Inherits from DataFeatures.SharedObject class to be shared by multiple threads """ - def __init__(self, name: str = None, size: tuple[int] = (1, 1), gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = None, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, filter_in_progress_identification: bool = True, scan_path: GazeFeatures.ScanPath = None, scan_path_analyzers: dict = None, background: numpy.array = numpy.array([]), heatmap: AOIFeatures.Heatmap = None, layers: dict = None, image_parameters: dict = DEFAULT_ARFRAME_IMAGE_PARAMETERS, **kwargs): + def __init__(self, size: tuple[int] = (1, 1), gaze_position_calibrator: GazeFeatures.GazePositionCalibrator = None, gaze_movement_identifier: GazeFeatures.GazeMovementIdentifier = None, filter_in_progress_identification: bool = True, scan_path: GazeFeatures.ScanPath = None, scan_path_analyzers: dict = None, background: numpy.array = numpy.array([]), heatmap: AOIFeatures.Heatmap = None, layers: dict = None, image_parameters: dict = DEFAULT_ARFRAME_IMAGE_PARAMETERS, **kwargs): """ Initialize ArFrame Parameters: - name: name of the frame size: defines the dimension of the rectangular area where gaze positions are projected gaze_position_calibrator: gaze position calibration algoritm gaze_movement_identifier: gaze movement identification algorithm @@ -554,7 +535,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): DataFeatures.PipelineStepObject.__init__(self, **kwargs) # Init private attributes - self.__name = name self.__size = size self.__gaze_position_calibrator = gaze_position_calibrator self.__gaze_movement_identifier = gaze_movement_identifier @@ -565,20 +545,35 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__heatmap = heatmap self.__layers = layers self.__image_parameters = image_parameters - self.__parent = None # it will be setup by parent later + self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() self.__scan_path_analyzed = False - # Setup layers parent attribute + # Edit pipeline step objects parent + if self.__gaze_position_calibrator is not None: + + self.__gaze_position_calibrator.parent = self + + if self.__gaze_movement_identifier is not None: + + self.__gaze_movement_identifier.parent = self + + if self.__scan_path is not None: + + self.__scan_path.parent = self + + for name, analyzer in self.__scan_path_analyzers.items(): + + analyzer.parent = self + + if self.__heatmap is not None: + + self.__heatmap.parent = self + for name, layer in self.__layers.items(): layer.parent = self - - @property - def name(self) -> str: - """Get frame's name.""" - return self.__name @property def size(self) -> tuple[int]: @@ -596,9 +591,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return self.__gaze_movement_identifier @property - def filter_in_progress_indentification(self) -> bool: + def filter_in_progress_identification(self) -> bool: """Is frame filtering in progress identification?""" - return self.__filter_in_progress_indentification + return self.__filter_in_progress_identification @property def scan_path(self) -> GazeFeatures.ScanPath: @@ -634,16 +629,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): def image_parameters(self) -> dict: """Get frame's image parameters dictionary.""" return self.__image_parameters - - @property - def parent(self) -> object: - """Get frame's parent object.""" - return self.__parent - - @parent.setter - def parent(self, parent: object): - """Set frame's parent object.""" - self.__parent = parent @property def gaze_position(self) -> object: @@ -679,8 +664,8 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Returns: frame_data: dictionary with frame attributes values. """ - return { - "name": self.__name, + d = { + **DataFeatures.PipelineStepObject.as_dict(), "size": self.__size, "gaze_position_calibrator": self.__gaze_position_calibrator, "gaze_movement_identifier": self.__gaze_movement_identifier, @@ -690,10 +675,13 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): "background": self.__background, "heatmap": self.__heatmap, "layers": self.__layers, - "image_parameters": self.__image_parameters, - **DataFeatures.PipelineStepObject.as_dict(self) + "image_parameters": self.__image_parameters } + print('ArFrame.as_dict', DataFeatures.PipelineStepObject.as_dict()) + + return d + @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: """Load ArFrame attributes from dictionary. @@ -708,15 +696,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): sys.path.append(working_directory) - # Load name - try: - - new_frame_name = frame_data.pop('name') - - except KeyError: - - new_frame_name = None - # Load size try: @@ -900,7 +879,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Create frame return ArFrame( \ - new_frame_name, \ new_frame_size, \ new_gaze_position_calibrator, \ new_gaze_movement_identifier, \ @@ -930,14 +908,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return ArFrame.from_dict(frame_data, working_directory) - def __str__(self) -> str: - """ - Returns: - String representation - """ - - return str(self.size) - @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]: """ @@ -1117,11 +1087,10 @@ class ArScene(DataFeatures.PipelineStepObject): Define abstract Augmented Reality scene with ArLayers and ArFrames inside. """ - def __init__(self, name: str = None, layers: dict = None, frames: dict = None, angle_tolerance: float = 0., distance_tolerance: float = 0., **kwargs): + def __init__(self, layers: dict = None, frames: dict = None, angle_tolerance: float = 0., distance_tolerance: float = 0., **kwargs): """ Initialize ArScene Parameters: - name: name of the scene layers: dictionary of ArLayers to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. frames: dictionary to ArFrames to project once the pose is estimated: see [project][argaze.ArFeatures.ArScene.project] function below. angle_tolerance: Optional angle error tolerance to validate marker pose in degree used into [estimate_pose][argaze.ArFeatures.ArScene.estimate_pose] function. @@ -1132,27 +1101,19 @@ class ArScene(DataFeatures.PipelineStepObject): super().__init__(**kwargs) # Init private attributes - self.__name = name self.__layers = layers self.__frames = frames self.__angle_tolerance = angle_tolerance self.__distance_tolerance = distance_tolerance - self.__parent = None # it will be setup by parent later - # Setup layer parent attribute + # Edit pipeline step objects parent for name, layer in self.__layers.items(): layer.parent = self - # Setup frame parent attribute for name, frame in self.__frames.items(): frame.parent = self - - @property - def name(self) -> str: - """Get scene's name.""" - return self.__name @property def layers(self) -> dict: @@ -1183,27 +1144,16 @@ class ArScene(DataFeatures.PipelineStepObject): def distance_tolerance(self, value: float): """Set scene's distance tolerance.""" self.__distance_tolerance = value - - @property - def parent(self) -> object: - """Get frame's parent object.""" - return self.__parent - - @parent.setter - def parent(self, parent: object): - """Set frame's parent object.""" - self.__parent = parent def as_dict(self) -> dict: """Export ArScene attributes as dictionary.""" return { - "name": self.__name, + **DataFeatures.PipelineStepObject.as_dict(), "layers": self.__layers, "frames": self.__frames, "angle_tolerance": self.__angle_tolerance, - "distance_tolerance": self.__distance_tolerance, - **DataFeatures.PipelineStepObject.as_dict(self) + "distance_tolerance": self.__distance_tolerance } @classmethod @@ -1216,15 +1166,6 @@ class ArScene(DataFeatures.PipelineStepObject): working_directory: folder path where to load files when a dictionary value is a relative filepath. """ - # Load name - try: - - new_scene_name = scene_data.pop('name') - - except KeyError: - - new_scene_name = None - # Load layers new_layers = {} @@ -1324,27 +1265,16 @@ class ArScene(DataFeatures.PipelineStepObject): pass - return ArScene(new_scene_name, new_layers, new_frames, **scene_data) - - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = f'parent:\n{self.__parent.name}\n' + # Load temporary pipeline step object from scene_data then export it as dict + temp_pipeline_step_object_data = DataFeatures.PipelineStepObject.from_dict(scene_data, working_directory).as_dict() - if len(self.__layers): - output += f'ArLayers:\n' - for name, layer in self.__layers.items(): - output += f'{name}:\n{layer}\n' - - if len(self.__frames): - output += f'ArFrames:\n' - for name, frame in self.__frames.items(): - output += f'{name}:\n{frame}\n' - - return output + # Create scene + return ArScene( \ + new_layers, \ + new_frames, \ + **scene_data, \ + **temp_pipeline_step_object_data \ + ) def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array]: """Define abstract estimate scene pose method. @@ -1429,7 +1359,7 @@ class ArCamera(ArFrame): self.__visual_hfov = visual_hfov self.__visual_vfov = visual_vfov - # Setup scenes parent attribute + # Edit pipeline step objects parent for name, scene in self.__scenes.items(): scene.parent = self @@ -1513,25 +1443,12 @@ class ArCamera(ArFrame): """Export ArCamera attributes as dictionary.""" return { + **ArFrame.as_dict(), "scenes": self.__scenes, "visual_hfov": self.__visual_hfov, - "visual_vfov": self.__visual_vfov, - **ArFrame.as_dict(self) + "visual_vfov": self.__visual_vfov } - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = f'Name:\n{self.name}\n' - - for name, scene in self.__scenes.items(): - output += f'\"{name}\" {type(scene)}:\n{scene}\n' - - return output - @DataFeatures.PipelineStepMethod def watch(self, timestamp: int|float, image: numpy.array): """Detect AR features from image and project scenes into camera frame. @@ -1617,10 +1534,3 @@ class ArCamera(ArFrame): except KeyError: pass - - def to_json(self, json_filepath): - """Save camera to .json file.""" - - with open(json_filepath, 'w', encoding='utf-8') as file: - - json.dump(self, file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 12afbd9..f1ad6f2 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -142,17 +142,6 @@ class ArUcoCamera(ArFeatures.ArCamera): return ArUcoCamera.from_dict(aruco_camera_data, working_directory) - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = super().__str__() - output += f'ArUcoDetector:\n{self.__aruco_detector}\n' - - return output - @DataFeatures.PipelineStepMethod def watch(self, timestamp: int|float, image: numpy.array): """Detect environment aruco markers from image and project scenes into camera frame. diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py index 9585b5e..0dd07c8 100644 --- a/src/argaze/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py @@ -286,16 +286,6 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): return ArUcoDetector.from_dict(aruco_detector_data, working_directory) - def __str__(self) -> str: - """String display""" - - output = f'\n\tDictionary: {self.__dictionary}\n' - output += f'\tMarker size: {self.__marker_size} cm\n\n' - output += f'\tOptic parameters:\n{self.__optic_parameters}\n' - output += f'\tDetection Parameters:\n{self.__parameters}' - - return output - @DataFeatures.PipelineStepMethod def detect_markers(self, timestamp: int|float, image: numpy.array) -> float: """Detect all ArUco markers into an image. diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py index 997ad40..b8817b3 100644 --- a/src/argaze/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -93,17 +93,6 @@ class ArUcoScene(ArFeatures.ArScene): aruco_markers_group = new_aruco_markers_group, \ **temp_scene_data \ ) - - def __str__(self) -> str: - """ - Returns: - String representation - """ - - output = output = super().__str__() - output += f'ArUcoMarkersGroup:\n{self.__aruco_markers_group}\n' - - return output def estimate_pose(self, detected_markers) -> Tuple[numpy.array, numpy.array, dict]: """Estimate scene pose from detected ArUco markers. diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index e452f05..ff33a29 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -268,7 +268,7 @@ class AreaOfInterest(numpy.ndarray): AOISceneType = TypeVar('AOIScene', bound="AOIScene") # Type definition for type annotation convenience -class AOIScene(): +class AOIScene(DataFeatures.PipelineStepObject): """Define AOI scene as a dictionary of AOI.""" def __init__(self, dimension: int, areas: dict = None): @@ -276,6 +276,8 @@ class AOIScene(): assert(dimension > 0) + super().__init__() + self.__dimension = dimension self.__areas = {} @@ -395,17 +397,6 @@ class AOIScene(): return str(self.__areas) - def __str__(self) -> str: - """String display""" - - output = '' - - for name, area in self.__areas.items(): - - output += f'\n\t{name}:\n{area}' - - return output - def __add__(self, add_vector) -> AOISceneType: """Add vector to scene.""" diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 0dcd8c4..3200190 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -23,6 +23,7 @@ import pandas import numpy import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches +from colorama import Style, Fore TimeStampType = TypeVar('TimeStamp', int, float) """Type definition for timestamp as integer or float values.""" @@ -393,7 +394,7 @@ class PipelineStepObject(): Define class to assess pipeline step methods execution time and observe them. """ - def __init__(self, observers: dict = None): + def __init__(self, name: str = None, observers: dict = None): """Initialize PipelineStepObject Parameters: @@ -401,9 +402,28 @@ class PipelineStepObject(): """ # Init private attribute + self.__name = name self.__observers = observers if observers is not None else {} self.__execution_times = {} + # parent attribute will be setup later by parent it self + self.__parent = None + + @property + def name(self) -> str: + """Get layer's name.""" + return self.__name + + @property + def parent(self) -> object: + """Get layer's parent object.""" + return self.__parent + + @parent.setter + def parent(self, parent: object): + """Set layer's parent object.""" + self.__parent = parent + @property def observers(self) -> dict: """Get pipeline step object observers dictionary.""" @@ -420,7 +440,9 @@ class PipelineStepObject(): Returns: object_data: dictionary with pipeline step object attributes values. """ + return { + "name": self.__name, "observers": self.__observers } @@ -432,6 +454,15 @@ class PipelineStepObject(): object_data: dictionary with pipeline step object attributes values. working_directory: folder path where to load files when a dictionary value is a relative filepath. """ + + # Load name + try: + + new_name = object_data.pop('name') + + except KeyError: + + new_name = None # Load observers new_observers = {} @@ -459,27 +490,113 @@ class PipelineStepObject(): pass - # Create pipeline step - return PipelineStepObject(new_observers) + # Create pipeline step object + return PipelineStepObject(\ + new_name, \ + new_observers \ + ) @classmethod def from_json(self, json_filepath: str) -> object: """ - Define abstract method to load PipelineStepObject from .json file. + Define abstract method to load pipeline step object from .json file. Parameters: json_filepath: path to json file """ raise NotImplementedError('from_json() method not implemented') + def to_json(self, json_filepath: str = None): + """Save pipeline step object into .json file.""" + + # Remember file path to ease rewriting + if json_filepath is not None: + + self.__json_filepath = json_filepath + + # Open file + with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: + + json.dump({DataFeatures.module_path(self):DataFeatures.JsonEncoder().default(self)}, object_file, ensure_ascii=False, indent=4) + + # QUESTION: maybe we need two saving mode? + #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) + def __str__(self) -> str: """ - Define abstract method to have a string representation of PipelineStepObject. + String representation of pipeline step object. Returns: String representation """ - raise NotImplementedError('__str__() method not implemented') + + tabs = self.tabulation + output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n' + + if self.__name is not None: + output += f'{tabs}\t{Style.BRIGHT}name{Style.RESET_ALL}: {self.__name}\n' + + if self.__parent is not None: + output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {self.__parent.name}\n' + + if len(self.__observers): + output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n' + for name, observer in self.__observers.items(): + output += f'{tabs}\t - {Fore.RED}{name}{Style.RESET_ALL}: {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n' + + for name, value in self.attributes: + + output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: ' + + if type(value) == dict: + + output += '\n' + + for k, v in value.items(): + + output += f'{tabs}\t - {Fore.RED}{k}{Style.RESET_ALL}: {v}\n' + + elif type(value) == numpy.ndarray: + + output += f'numpy.array{value.shape}\n' + + elif type(value) == pandas.DataFrame: + + output += f'pandas.DataFrame{value.shape}\n' + + else: + + output += f'{value}' + + if output[-1] != '\n': + + output += '\n' + + return output + + @property + def tabulation(self) -> str: + """Edit tabulation string according parents number.""" + + tabs = '' + parent = self.__parent + + while (parent is not None): + + tabs += '\t' + parent = parent.parent + + return tabs + + @property + def attributes(self) -> list: + """Iterate over pipeline step attributes values.""" + + for name, item in self.__class__.__dict__.items(): + + if isinstance(item, property): + + yield name, getattr(self, name) def PipelineStepAttribute(method): diff --git a/src/argaze/GazeAnalysis/Basic.py b/src/argaze/GazeAnalysis/Basic.py index 55c0737..54135d4 100644 --- a/src/argaze/GazeAnalysis/Basic.py +++ b/src/argaze/GazeAnalysis/Basic.py @@ -70,6 +70,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__path_duration = 0 self.__steps_number = 0 self.__step_fixation_durations_average = 0 + self.__aoi_fixation_distribution = {} @DataFeatures.PipelineStepMethod def analyze(self, timestamp: int|float, aoi_scan_path: GazeFeatures.ScanPathType): diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index af5a940..815a496 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -244,19 +244,6 @@ class GazePositionCalibrator(): return GazePositionCalibrator.from_dict(json.load(calibration_file)) - def to_json(self, json_filepath: str = None): - """Save calibrator into .json file.""" - - # Remember file path to ease rewriting - if json_filepath is not None: - - self.__json_filepath = json_filepath - - # Open file - with open(self.__json_filepath, 'w', encoding='utf-8') as calibration_file: - - json.dump({DataFeatures.module_path(self):DataFeatures.JsonEncoder().default(self)}, calibration_file, ensure_ascii=False, indent=4) - def store(self, timestamp: int|float, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition): """Store observed and expected gaze positions. diff --git a/src/argaze/utils/demo_data/demo_aruco_markers_setup.json b/src/argaze/utils/demo_data/demo_aruco_markers_setup.json index 2b54955..52de261 100644 --- a/src/argaze/utils/demo_data/demo_aruco_markers_setup.json +++ b/src/argaze/utils/demo_data/demo_aruco_markers_setup.json @@ -1,5 +1,5 @@ { - "name": "ArUcoCamera Demo", + "name": "demo_camera", "size": [1280, 720], "aruco_detector": { "dictionary": "DICT_APRILTAG_16h5", @@ -9,12 +9,12 @@ } }, "layers": { - "main_layer": {} + "demo_layer": {} }, "image_parameters": { "background_weight": 1, "draw_layers": { - "main_layer": { + "demo_layer": { "draw_aoi_scene": { "draw_aoi": { "color": [255, 255, 255], @@ -48,7 +48,7 @@ "ArScene Demo" : { "aruco_markers_group": "aruco_markers_group.json", "layers": { - "main_layer" : { + "demo_layer" : { "aoi_scene": "aoi_3d_scene.obj" } }, @@ -66,7 +66,7 @@ "duration_max": 10000 }, "layers": { - "main_layer": { + "demo_layer": { "aoi_scene": "aoi_2d_scene.json", "aoi_matcher": { "FocusPointInside": {} @@ -86,7 +86,7 @@ } }, "draw_layers": { - "main_layer": { + "demo_layer": { "draw_aoi_scene": { "draw_aoi": { "color": [255, 255, 255], diff --git a/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json b/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json index 3fffc9f..a155693 100644 --- a/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json +++ b/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json @@ -1,5 +1,5 @@ { - "name": "ArFrame Demo", + "name": "demo_frame", "size": [1920, 1149], "background": "frame_background.jpg", "gaze_movement_identifier": { @@ -26,7 +26,7 @@ "size": [320, 240] }, "layers": { - "main_layer": { + "demo_layer": { "aoi_scene": "aoi_2d_scene.json", "aoi_matcher": { "DeviationCircleCoverage": { @@ -47,7 +47,7 @@ }, "Entropy":{} }, - "observers": "main_layer_logger.py" + "observers": "demo_layer_logger.py" } }, "image_parameters": { @@ -64,7 +64,7 @@ } }, "draw_layers": { - "main_layer": { + "demo_layer": { "draw_aoi_scene": { "draw_aoi": { "color": [255, 255, 255], diff --git a/src/argaze/utils/demo_data/demo_layer_logger.py b/src/argaze/utils/demo_data/demo_layer_logger.py new file mode 100644 index 0000000..eba7c74 --- /dev/null +++ b/src/argaze/utils/demo_data/demo_layer_logger.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +""" """ + +__author__ = "Théo de la Hogue" +__credits__ = [] +__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" +__license__ = "BSD" + +from argaze import ArFeatures, GazeFeatures +from argaze.utils import UtilsFeatures + +from argaze import ArFeatures, GazeFeatures, DataFeatures +from argaze.utils import UtilsFeatures + +class AOIScanPathAnalysisLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter): + + def on_look(self, timestamp, layer): + """Log aoi scan path metrics""" + + #print(timestamp, "AOIScanPathAnalysisLogger.on_look:", type(self), type(layer)) + + if layer.aoi_scan_path_analyzed: + + log = ( + timestamp, + layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].path_duration, + layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].steps_number, + layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.KCoefficient'].K, + layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.LempelZivComplexity'].lempel_ziv_complexity + ) + + self.write(log) + +# Export loggers instances to register them as pipeline step object observers +__observers__ = { + "AOI Scan path analysis logger": AOIScanPathAnalysisLogger(path="_export/logs/aoi_scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, LZC") + } \ No newline at end of file diff --git a/src/argaze/utils/demo_data/frame_logger.py b/src/argaze/utils/demo_data/frame_logger.py index 18fc151..256be7f 100644 --- a/src/argaze/utils/demo_data/frame_logger.py +++ b/src/argaze/utils/demo_data/frame_logger.py @@ -24,7 +24,7 @@ class FixationLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter timestamp, frame.gaze_movement.focus, frame.gaze_movement.duration, - frame.layers['main_layer'].looked_aoi_name + frame.layers['demo_layer'].looked_aoi_name ) self.write(log) diff --git a/src/argaze/utils/demo_data/main_layer_logger.py b/src/argaze/utils/demo_data/main_layer_logger.py deleted file mode 100644 index eba7c74..0000000 --- a/src/argaze/utils/demo_data/main_layer_logger.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env python - -""" """ - -__author__ = "Théo de la Hogue" -__credits__ = [] -__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" -__license__ = "BSD" - -from argaze import ArFeatures, GazeFeatures -from argaze.utils import UtilsFeatures - -from argaze import ArFeatures, GazeFeatures, DataFeatures -from argaze.utils import UtilsFeatures - -class AOIScanPathAnalysisLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter): - - def on_look(self, timestamp, layer): - """Log aoi scan path metrics""" - - #print(timestamp, "AOIScanPathAnalysisLogger.on_look:", type(self), type(layer)) - - if layer.aoi_scan_path_analyzed: - - log = ( - timestamp, - layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].path_duration, - layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].steps_number, - layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.KCoefficient'].K, - layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.LempelZivComplexity'].lempel_ziv_complexity - ) - - self.write(log) - -# Export loggers instances to register them as pipeline step object observers -__observers__ = { - "AOI Scan path analysis logger": AOIScanPathAnalysisLogger(path="_export/logs/aoi_scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, LZC") - } \ No newline at end of file diff --git a/src/argaze/utils/demo_gaze_analysis_run.py b/src/argaze/utils/demo_gaze_analysis_run.py index acc05c4..5f58349 100644 --- a/src/argaze/utils/demo_gaze_analysis_run.py +++ b/src/argaze/utils/demo_gaze_analysis_run.py @@ -23,6 +23,7 @@ current_directory = os.path.dirname(os.path.abspath(__file__)) # Manage arguments parser = argparse.ArgumentParser(description=__doc__.split('-')[0]) parser.add_argument('configuration', metavar='CONFIGURATION', type=str, help='configuration filepath') +parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') args = parser.parse_args() def main(): @@ -30,6 +31,10 @@ def main(): # Load ArFrame ar_frame = ArFeatures.ArFrame.from_json(args.configuration) + if args.verbose: + + print(ar_frame) + # Create a window to display ArCamera cv2.namedWindow(ar_frame.name, cv2.WINDOW_AUTOSIZE) @@ -74,18 +79,18 @@ def main(): # Write last 5 steps of aoi scan path path = '' - for step in ar_frame.layers["main_layer"].aoi_scan_path[-5:]: + for step in ar_frame.layers["demo_layer"].aoi_scan_path[-5:]: path += f'> {step.aoi} ' - path += f'> {ar_frame.layers["main_layer"].aoi_scan_path.current_aoi}' + path += f'> {ar_frame.layers["demo_layer"].aoi_scan_path.current_aoi}' cv2.putText(frame_image, path, (20, ar_frame.size[1]-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) # Display Transition matrix analysis if loaded try: - transition_matrix_analyzer = ar_frame.layers["main_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.TransitionMatrix"] + transition_matrix_analyzer = ar_frame.layers["demo_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.TransitionMatrix"] cv2.putText(frame_image, f'Transition matrix density: {transition_matrix_analyzer.transition_matrix_density:.2f}', (20, ar_frame.size[1]-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) @@ -97,8 +102,8 @@ def main(): if from_aoi != to_aoi and probability > 0.0: - from_center = ar_frame.layers["main_layer"].aoi_scene[from_aoi].center.astype(int) - to_center = ar_frame.layers["main_layer"].aoi_scene[to_aoi].center.astype(int) + from_center = ar_frame.layers["demo_layer"].aoi_scene[from_aoi].center.astype(int) + to_center = ar_frame.layers["demo_layer"].aoi_scene[to_aoi].center.astype(int) start_line = (0.5 * from_center + 0.5 * to_center).astype(int) color = [int(probability*200) + 55, int(probability*200) + 55, int(probability*200) + 55] @@ -112,7 +117,7 @@ def main(): # Display aoi scan path basic metrics analysis if loaded try: - basic_analyzer = ar_frame.layers["main_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.Basic"] + basic_analyzer = ar_frame.layers["demo_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.Basic"] # Write basic analysis cv2.putText(frame_image, f'Step number: {basic_analyzer.steps_number}', (20, ar_frame.size[1]-440), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) @@ -141,7 +146,7 @@ def main(): # Display aoi scan path K-modified coefficient analysis if loaded try: - aoi_kc_analyzer = ar_frame.layers["main_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.KCoefficient"] + aoi_kc_analyzer = ar_frame.layers["demo_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.KCoefficient"] # Write aoi Kc analysis if aoi_kc_analyzer.K < 0.: @@ -158,7 +163,7 @@ def main(): # Display Lempel-Ziv complexity analysis if loaded try: - lzc_analyzer = ar_frame.layers["main_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.LempelZivComplexity"] + lzc_analyzer = ar_frame.layers["demo_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.LempelZivComplexity"] cv2.putText(frame_image, f'Lempel-Ziv complexity: {lzc_analyzer.lempel_ziv_complexity}', (20, ar_frame.size[1]-200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) @@ -168,7 +173,7 @@ def main(): # Display N-Gram analysis if loaded try: - ngram_analyzer = ar_frame.layers["main_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.NGram"] + ngram_analyzer = ar_frame.layers["demo_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.NGram"] # Display only 3-gram analysis start = ar_frame.size[1] - ((len(ngram_analyzer.ngrams_count[3]) + 1) * 40) @@ -188,7 +193,7 @@ def main(): # Display Entropy analysis if loaded try: - entropy_analyzer = ar_frame.layers["main_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.Entropy"] + entropy_analyzer = ar_frame.layers["demo_layer"].aoi_scan_path_analyzers["argaze.GazeAnalysis.Entropy"] cv2.putText(frame_image, f'Stationary entropy: {entropy_analyzer.stationary_entropy:.3f},', (20, ar_frame.size[1]-280), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) cv2.putText(frame_image, f'Transition entropy: {entropy_analyzer.transition_entropy:.3f},', (20, ar_frame.size[1]-240), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) -- cgit v1.1