diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/argaze/ArFeatures.py | 116 | ||||
-rw-r--r-- | src/argaze/DataFeatures.py | 50 | ||||
-rw-r--r-- | src/argaze/DataLog/FileWriter.py | 60 | ||||
-rw-r--r-- | src/argaze/DataLog/__init__.py | 4 | ||||
-rw-r--r-- | src/argaze/utils/UtilsFeatures.py | 58 | ||||
-rw-r--r-- | src/argaze/utils/demo_aruco_markers_run.py | 4 | ||||
-rw-r--r-- | src/argaze/utils/demo_gaze_analysis_run.py | 68 |
7 files changed, 174 insertions, 186 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 4989e65..ff29baa 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -131,6 +131,9 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Init current looked aoi name self.__looked_aoi_name = None + # Init aoi scan path analyzed state + self.__aoi_scan_path_analyzed = False + # Cast aoi scene to its effective dimension if self.aoi_scene.dimension == 2: @@ -337,8 +340,27 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return self.__looked_aoi_name + @property + def aoi_scan_path_analyzed(self) -> bool: + """Are aoi scan path analysis ready?""" + + return self.__aoi_scan_path_analyzed + + def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]: + """Get aoi scan path analysis. + + Returns + iterator: analyzer module path, analysis dictionary + """ + + assert(self.__aoi_scan_path_analyzed) + + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): + + yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis + @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]: + def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): """ Project timestamped gaze movement into layer. @@ -346,10 +368,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute. Parameters: + timestamp: ny number used to know when the given gaze position occurs gaze_movement: gaze movement to project - - Returns: - iterator: this layer, analyzer type and analysis dictionary. """ # Use try block to always release the layer lock in finally block @@ -364,6 +384,9 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # No looked aoi by default self.__looked_aoi_name = None + # Reset aoi scan path analyzed state + self.__aoi_scan_path_analyzed = False + if self.aoi_matcher is not None: # Update looked aoi thanks to aoi matcher @@ -383,13 +406,13 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Is there a new step? if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: + # Analyze aoi scan path for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - # Analyze aoi scan path aoi_scan_path_analyzer.analyze(self.aoi_scan_path) - # Output analysis - yield self, aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis + # Update aoi scan path analyzed state + self.__aoi_scan_path_analyzed = True elif GazeFeatures.is_saccade(gaze_movement): @@ -454,11 +477,6 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = { } } -def is_layer(obj): - """Is an object a layer?""" - - return type(obj) == ArLayer - @dataclass class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """ @@ -512,6 +530,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Init current gaze movement self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + # Init scan path analyzed state + self.__scan_path_analyzed = False + @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: """Load attributes from dictionary. @@ -762,6 +783,25 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return self.__identified_gaze_movement + @property + def scan_path_analyzed(self) -> bool: + """Are scan path analysis ready?""" + + return self.__scan_path_analyzed + + def scan_path_analysis(self) -> Iterator[Union[str, dict]]: + """Get scan path analysis. + + Returns + iterator: analyzer module path, analysis dictionary + """ + + assert(self.__scan_path_analyzed) + + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): + + yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis + @DataFeatures.PipelineStepMethod def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]: """ @@ -773,9 +813,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Parameters: timestamp: any number used to know when the given gaze position occurs gaze_position: gaze position to project - - Returns: - iterator: this frame or one of its layers, analyzer type and analysis dictionary. """ # Use try block to always release the frame lock in finally block @@ -787,6 +824,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # No gaze movement identified by default self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + # Reset scan path analyzed state + self.__scan_path_analyzed = False + # Apply gaze position calibration if self.gaze_position_calibrator is not None: @@ -823,13 +863,13 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Is there a new step? if scan_step and len(self.scan_path) > 1: + # Analyze aoi scan path for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): - # Analyze aoi scan path scan_path_analyzer.analyze(self.scan_path) - # Output analysis - yield self, scan_path_analyzer_module_path, scan_path_analyzer.analysis + # Update scan path analyzed state + self.__scan_path_analyzed = True # No valid finished gaze movement: optionnaly stop in progress identification filtering elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: @@ -849,10 +889,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally for layer_name, layer in self.layers.items(): - for layer_output in layer.look(timestamp, self.__identified_gaze_movement): - - # Output layer analysis - yield layer_output + layer.look(timestamp, self.__identified_gaze_movement) finally: @@ -950,11 +987,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return self.__image(**self.image_parameters) -def is_frame(obj): - """Is an object a frame?""" - - return type(obj) == ArFrame or is_camera(obj) - @dataclass class ArScene(): """ @@ -1201,11 +1233,6 @@ class ArScene(): raise NotImplementedError('draw() method not implemented') -def is_scene(obj): - """Is an object a scene?""" - - return type(obj).__bases__[0] == ArScene - @dataclass class ArCamera(ArFrame): """ @@ -1329,25 +1356,19 @@ class ArCamera(ArFrame): raise NotImplementedError('watch() method not implemented') @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition) -> Iterator[Union[object, type, dict]]: + def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition): """Project timestamped gaze position into each scene frames. !!! warning watch method needs to be called first. Parameters: - timestamp: gaze position time stamp (unit does'nt matter) - gaze_position: GazePosition object - - Returns: - iterator: this camera frame or a scene frame or one of their layers, analyzer type and analysis dictionary. + timestamp: ny number used to know when the given gaze position occurs + gaze_movement: gaze movement to project """ # Project gaze position into camera frame - for camera_frame_output in super().look(timestamp, gaze_position): - - # Output camera frame analysis - yield camera_frame_output + super().look(timestamp, gaze_position) # Use try block to always release the camera frame lock in finally block try: @@ -1373,10 +1394,8 @@ class ArCamera(ArFrame): # QUESTION: How to project gaze precision? inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) - for scene_frame_output in scene_frame.look(timestamp, inner_gaze_position * scene_frame.size): - - # output scene frame analysis - yield scene_frame_output + # Project inner gaze position into scene frame + scene_frame.look(timestamp, inner_gaze_position * scene_frame.size) # Ignore missing aoi in camera frame layer projection except KeyError as e: @@ -1428,8 +1447,3 @@ class ArCamera(ArFrame): with open(json_filepath, 'w', encoding='utf-8') as file: json.dump(self, file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) - -def is_camera(obj): - """Is an object a camera?""" - - return type(obj).__bases__[0] == ArCamera
\ No newline at end of file diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index a751fc5..1e6abfe 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -439,53 +439,3 @@ def PipelineStepMethod(method): return result return wrapper - -# Import libraries that can be used in selector or formatter codes -from argaze import GazeFeatures - -TimeStampedDataLoggerType = TypeVar('TimeStampedDataLogger', bound="TimeStampedDataLogger") -# Type definition for type annotation convenience - -@dataclass -class TimeStampedDataLogger(): - """Abstract class to define what should provide a timestamped data logger.""" - - formatter: str = field(default='') - """Code evaluated to edit the log.""" - - @classmethod - def from_dict(self, logger_module_path: str, logger_parameters: dict) -> TimeStampedDataLoggerType: - """Load timestamped data logger from dictionary. - - Parameters: - logger_module_path: module to load - logger_parameters: instance parameters - """ - - # Prepend argaze.DataLog path when a single name is provided - if len(logger_module_path.split('.')) == 1: - logger_module_path = f'argaze.DataLog.{logger_module_path}' - - logger_module = importlib.import_module(logger_module_path) - return logger_module.TimeStampedDataLogger(**logger_parameters) - - def emit(self, context: dict) -> Any: - """Apply selector code to decide if context have to be logged, then apply formatter code before to call specific logger handle method.""" - - try: - - self.handle(eval(self.formatter, globals(), context)) - - except Exception as e: - - print(f'Warning: the following error occurs in TimeStampedDataLogger.emit method ({self.name}):', e) - - def setup(self, log: str|tuple): - """Prepare log emission to destination.""" - - raise NotImplementedError('setup() method not implemented') - - def handle(self, log: str|tuple): - """Handle log emission to destination.""" - - raise NotImplementedError('handle() method not implemented') diff --git a/src/argaze/DataLog/FileWriter.py b/src/argaze/DataLog/FileWriter.py deleted file mode 100644 index 388ec44..0000000 --- a/src/argaze/DataLog/FileWriter.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python - -"""Module for file logging. -""" - -__author__ = "Théo de la Hogue" -__credits__ = [] -__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" -__license__ = "BSD" - -from typing import TypeVar, Tuple -from dataclasses import dataclass, field -import os, pathlib - -from argaze import DataFeatures - -@dataclass -class TimeStampedDataLogger(DataFeatures.TimeStampedDataLogger): - """Implementation of file logger.""" - - path: str = field(default=None) - """File path where to write data.""" - - header: str = field(default=None) - """String to write first.""" - - separator: str = field(default=", ") - """String used to separate list or tuple formatted log.""" - - def __post_init__(self): - """Check that folder structure exist and create file.""" - - self.path = pathlib.Path(self.path) - - if not os.path.exists(self.path.parent.absolute()): - os.makedirs(self.path.parent.absolute()) - - # Open file - self._file = open(self.path, 'w', encoding='utf-8', buffering=1) - - # Write header if required - if self.header is not None: - - print(self.header, file=self._file, flush=True) - - def __del__(self): - """Close file.""" - - self._file.close() - - def handle(self, log: any): - """Write log as a new line into file. List or tuple are converted into strings separated by separator char.""" - - # Format list or tuple element into quoted strings - if not isinstance(log, str): - - log = self.separator.join(f'\"{d}\"' for d in log) - - # Write into file - print(log, file=self._file, flush=True)
\ No newline at end of file diff --git a/src/argaze/DataLog/__init__.py b/src/argaze/DataLog/__init__.py deleted file mode 100644 index febd648..0000000 --- a/src/argaze/DataLog/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -""" -Various data logger handler. -""" -__all__ = ['FileWriter']
\ No newline at end of file diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py index 7971167..0cb6d77 100644 --- a/src/argaze/utils/UtilsFeatures.py +++ b/src/argaze/utils/UtilsFeatures.py @@ -146,3 +146,61 @@ class TimeProbe(): self.start() +def tuple_to_string(t: tuple, separator: str = ", ") -> str: + """Convert tuple elements into quoted strings separated by a separator string.""" + + return separator.join(f'\"{e}\"' for e in t) + +class FileWriter(): + """Write into a file line by line. + + Parameters: + path: File path where to write data. + header: String or tuple to write first. + separator: String used to separate elements during tuple to string conversion. + """ + + def __init__(self, path: str, header: str|tuple, separator: str = ", "): + """Check that folder structure exist and create file then, write header line.""" + + import os + import pathlib + + self.path = pathlib.Path(path) + self.separator = separator + + if not os.path.exists(self.path.parent.absolute()): + os.makedirs(self.path.parent.absolute()) + + # Open file + self._file = open(self.path, 'w', encoding='utf-8', buffering=1) + + # Write header if required + if header is not None: + + # Format list or tuple element into quoted strings + if not isinstance(header, str): + + header = tuple_to_string(header, self.separator) + + print(header, file=self._file, flush=True) + + def __del__(self): + """Close file.""" + + self._file.close() + + def write(self, log: str|tuple): + """Write log as a new line into file. + + !!! note + Tuple elements are converted into quoted strings separated by separator string. + """ + + # Format list or tuple element into quoted strings + if not isinstance(log, str): + + log = tuple_to_string(log, self.separator) + + # Write into file + print(log, file=self._file, flush=True)
\ No newline at end of file diff --git a/src/argaze/utils/demo_aruco_markers_run.py b/src/argaze/utils/demo_aruco_markers_run.py index 091b1e1..87b4bc3 100644 --- a/src/argaze/utils/demo_aruco_markers_run.py +++ b/src/argaze/utils/demo_aruco_markers_run.py @@ -68,9 +68,7 @@ def main(): try: # Project gaze position into camera - for _ in aruco_camera.look(timestamp, GazeFeatures.GazePosition((x, y))): - - pass + aruco_camera.look(timestamp, GazeFeatures.GazePosition((x, y))) # Assess gaze analysis gaze_analysis_time = aruco_camera.execution_times['look'] diff --git a/src/argaze/utils/demo_gaze_analysis_run.py b/src/argaze/utils/demo_gaze_analysis_run.py index d36f1c8..9640d18 100644 --- a/src/argaze/utils/demo_gaze_analysis_run.py +++ b/src/argaze/utils/demo_gaze_analysis_run.py @@ -14,7 +14,7 @@ import time from argaze import ArFeatures, GazeFeatures from argaze.AreaOfInterest import AOIFeatures from argaze.GazeAnalysis import * -from argaze.DataLog import FileWriter +from argaze.utils import UtilsFeatures import cv2 import numpy @@ -27,6 +27,8 @@ def main(): current_directory = os.path.dirname(os.path.abspath(__file__)) + print(current_directory) + # Manage arguments parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) parser.add_argument('frame', metavar='FRAME', type=str, help='ar frame filepath') @@ -35,10 +37,13 @@ def main(): # Load ArFrame ar_frame = ArFeatures.ArFrame.from_json(args.frame) + # Bind to a frame layer + main_layer = ar_frame.layers['main_layer'] + # Create FileWriter loggers - fixation_logger = FileWriter.TimeStampedDataLogger(path="_export/logs/fixations.csv", header="Timestamp (ms), Focus (px), Duration (ms), AOI") - scan_path_logger = FileWriter.TimeStampedDataLogger(path="_export/logs/scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, NNI, XXR") - aoi_scan_path_logger = FileWriter.TimeStampedDataLogger(path="_export/logs/aoi_scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, LZC") + fixation_logger = UtilsFeatures.FileWriter(path="_export/logs/fixations.csv", header="Timestamp (ms), Focus (px), Duration (ms), AOI") + scan_path_logger = UtilsFeatures.FileWriter(path="_export/logs/scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, NNI, XXR") + aoi_scan_path_logger = UtilsFeatures.FileWriter(path="_export/logs/aoi_scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, LZC") # Create a window to display ArCamera cv2.namedWindow(ar_frame.name, cv2.WINDOW_AUTOSIZE) @@ -55,27 +60,54 @@ def main(): # Edit millisecond timestamp timestamp = int((time.time() - start_time) * 1e3) - # Project gaze position into frame and iterate over analysis - for element, module, analysis in ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y))): + try: - # Ckeck if analysis comes from frame - if ArFeatures.is_frame(element): + # Project gaze position into frame + ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y))) - # Do something with scan path module analysis - ... + # Catch pipeline exception + except Exception as e: - # Ckeck if analysis comes from frame - elif ArFeatures.is_layer(element): - - # Do something with aoi scan path module analysis - ... + print('Gaze projection error:', e) # Log fixations if GazeFeatures.is_fixation(ar_frame.gaze_movement) and ar_frame.gaze_movement.finished: - log = timestamp, ar_frame.gaze_movement.focus, ar_frame.gaze_movement.duration, ar_frame.layers['main_layer'].looked_aoi_name - - fixation_logger.handle(log) + log = ( + timestamp, + ar_frame.gaze_movement.focus, + ar_frame.gaze_movement.duration, + ar_frame.layers['main_layer'].looked_aoi_name + ) + + fixation_logger.write(log) + + # Log scan path metrics + if ar_frame.scan_path_analyzed: + + log = ( + timestamp, + ar_frame.scan_path_analyzers['argaze.GazeAnalysis.Basic'].path_duration, + ar_frame.scan_path_analyzers['argaze.GazeAnalysis.Basic'].steps_number, + ar_frame.scan_path_analyzers['argaze.GazeAnalysis.KCoefficient'].K, + ar_frame.scan_path_analyzers['argaze.GazeAnalysis.NearestNeighborIndex'].nearest_neighbor_index, + ar_frame.scan_path_analyzers['argaze.GazeAnalysis.ExploreExploitRatio'].explore_exploit_ratio + ) + + scan_path_logger.write(log) + + # Log aoi scan path metrics + if main_layer.aoi_scan_path_analyzed: + + log = ( + timestamp, + main_layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].path_duration, + main_layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].steps_number, + main_layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.KCoefficient'].K, + main_layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.LempelZivComplexity'].lempel_ziv_complexity + ) + + aoi_scan_path_logger.write(log) # Attach mouse callback to window cv2.setMouseCallback(ar_frame.name, on_mouse_event) |