From 4aad44683cbe3c3d20f6f793665d21f9ca5bf551 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Fri, 19 Jan 2024 12:04:25 +0100 Subject: Defining a new PipelineStep decorator to assess time execution and catch exceptions. Improving logging scope. --- .../advanced_topics/scripting.md | 86 +++--- src/argaze/ArFeatures.py | 331 ++++++++------------- src/argaze/AreaOfInterest/AOIFeatures.py | 1 + src/argaze/DataFeatures.py | 72 ++++- src/argaze/DataLog/FileWriter.py | 8 +- src/argaze/GazeAnalysis/Basic.py | 7 +- src/argaze/GazeAnalysis/DeviationCircleCoverage.py | 3 +- .../DispersionThresholdIdentification.py | 3 +- src/argaze/GazeAnalysis/Entropy.py | 3 +- src/argaze/GazeAnalysis/ExploreExploitRatio.py | 3 +- src/argaze/GazeAnalysis/FocusPointInside.py | 3 +- src/argaze/GazeAnalysis/KCoefficient.py | 4 +- src/argaze/GazeAnalysis/LempelZivComplexity.py | 3 +- src/argaze/GazeAnalysis/NGram.py | 3 +- src/argaze/GazeAnalysis/NearestNeighborIndex.py | 3 +- src/argaze/GazeAnalysis/TransitionMatrix.py | 3 +- .../VelocityThresholdIdentification.py | 3 +- src/argaze/GazeFeatures.py | 3 + src/argaze/PupillAnalysis/WorkloadIndex.py | 1 + src/argaze/PupillFeatures.py | 1 + .../utils/demo_data/demo_gaze_analysis_setup.json | 19 +- src/argaze/utils/demo_gaze_analysis_run.py | 2 +- 22 files changed, 273 insertions(+), 292 deletions(-) diff --git a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md index 9c20c7a..837c8ff 100644 --- a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md +++ b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md @@ -63,74 +63,68 @@ for name, ar_layer in ar_frame.layers.items(): ... ``` -## Pipeline execution outputs +## Pipeline execution updates -[ArFrame.look](../../../argaze.md/#argaze.ArFeatures.ArFrame.look) method returns many data about pipeline execution. +Calling [ArFrame.look](../../../argaze.md/#argaze.ArFeatures.ArFrame.look) method leads to update many data into the pipeline. ```python # Assuming that timestamped gaze positions are available ... # Look ArFrame at a timestamped gaze position - gaze_position, gaze_movement, scan_path_analysis, execution_times, exception, layers = ar_frame.look(timestamp, gaze_position).values() + execution_time, exception = ar_frame.look(timestamp, gaze_position).values() + + # Do something with calibrated gaze position + ... ar_frame.gaze_position # Check if a gaze movement has been identified - if gaze_movement.valid and gaze_movement.finished: + if ar_frame.gaze_movement.valid and ar_frame.gaze_movement.finished: # Do something with identified fixation - if GazeFeatures.is_fixation(gaze_movement): + if GazeFeatures.is_fixation(ar_frame.gaze_movement): ... # Do something with identified saccade - elif GazeFeatures.is_saccade(gaze_movement): + elif GazeFeatures.is_saccade(ar_frame.gaze_movement): ... - # Do something with scan path analysis - for module, analysis in scan_path_analysis.items(): - for data, value in analysis.items(): - ... + # Check if new scan path analysis are available + if ar_frame.new_analysis_available: - # Do something with ArFrame look execution times - ... + # Access to each scan path analyzer + for analyzer_name, analyzer in ar_frame.scan_path_analyzers.items(): - # Do something with ArFrame look exception - if exception: - ... + # Do something with analysis results + ... analyzer.analysis - # Do something with each ArLayer look data - for layer_name, layer_look_data in layers.items(): - - gaze_movement, looked_aoi_name, looked_aoi, aoi_scan_path_analysis, layer_execution_times, layer_exception = layer_look_data.values() + # Iterate over each ArFrame layers + for name, ar_layer in ar_frame.layers.items(): + + # Check if new aoi scan path analysis are available + if ar_layer.new_analysis_available: - # Do something with gaze movement - ... + # Access to each aoi scan path analyzer + for analyzer_name, analyzer in ar_layer.aoi_scan_path_analyzers.items(): - # Do something with looked AOI name - ... + # Do something with analysis results + ... analyzer.analysis +``` - # Do something with looked AOI shape - ... +Let's understand the meaning of each data. - # Do something with ArLayer AOI scan path analysis - for module, analysis in aoi_scan_path_analysis.items(): - for data, value in analysis.items(): - ... +### *execution_times* - # Do something with ArLayer look execution times - ... +A dictionary with each pipeline step execution time. - # Do something with ArLayer look exception - if exception: - ... -``` +### *exception* -Let's understand the meaning of each returned data. +A [python Exception](https://docs.python.org/3/tutorial/errors.html#exceptions) object raised during pipeline execution. -### *gaze_position* +### *ar_frame.gaze_position* This is the calibrated [GazePosition](../../../argaze.md/#argaze.GazeFeatures.GazePosition) returned by [GazePositionCalibrator](../../../argaze.md/#argaze.GazeFeatures.GazePositionCalibrator) if one is instanciated else, it is the given [GazePosition](../../../argaze.md/#argaze.GazeFeatures.GazePosition). -### *gaze_movement* +### *ar_frame.gaze_movement* A [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement) once it have been identified by [ArFrame.gaze_movement_identifier](../../../argaze.md/#argaze.ArFeatures.ArFrame) object from incoming consecutive timestamped gaze positions. If no gaze movement have been identified, it returns an [UnvalidGazeMovement](../../../argaze.md/#argaze.GazeFeatures.UnvalidGazeMovement). @@ -139,21 +133,13 @@ In that case, the returned gaze movement *finished* flag is false. Then, the returned gaze movement type can be tested thanks to [GazeFeatures.is_fixation](../../../argaze.md/#argaze.GazeFeatures.is_fixation) and [GazeFeatures.is_saccade](../../../argaze.md/#argaze.GazeFeatures.is_saccade) functions. -### *scan_path_analysis* - -A dictionary with all last scan path analysis if new scan step have been added to the [ArFrame.scan_path](../../../argaze.md/#argaze.ArFeatures.ArFrame) object. - -### *layers_analysis* - -A dictionary with all layers AOI scan path analysis if new AOI scan step have been added to an [ArLayer.aoi_scan_path](../../../argaze.md/#argaze.ArFeatures.ArLayer) object. +### *ar_frame.new_analysis_available* and *ar_layer.new_analysis_available* -### *execution_times* +This flag allows to now when new scan path and aoi scan path analysis are available. -A dictionary with each pipeline step execution time. +### *analyzer.analysis* -### *exception* - -A [python Exception](https://docs.python.org/3/tutorial/errors.html#exceptions) object raised during pipeline execution. +A dict containing all data produced by an analyzer. ## Setup ArFrame image parameters diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 281dec8..30044a7 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -130,6 +130,9 @@ class ArLayer(DataFeatures.SharedObject): # Init current gaze movement self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() + # Init new analysis available state + self.__new_analysis_available = False + # Cast aoi scene to its effective dimension if self.aoi_scene.dimension == 2: @@ -293,7 +296,10 @@ class ArLayer(DataFeatures.SharedObject): for logger_name, logger_data in new_loggers_value.items(): - new_loggers[logger_name] = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) + logger = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) + logger.name = logger_name + + new_loggers[logger_name] = logger except KeyError: @@ -346,6 +352,13 @@ class ArLayer(DataFeatures.SharedObject): self.__parent = parent + @property + def new_analysis_available(self) -> bool: + """Is there new aoi scan path analysis to check?""" + + return self.__new_analysis_available + + @DataFeatures.PipelineStep def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> dict: """ Project timestamped gaze movement into layer. @@ -355,123 +368,65 @@ class ArLayer(DataFeatures.SharedObject): Parameters: gaze_movement: gaze movement to project - - Returns: - look_data: data dictionary - - !!! note "look data dictionary" - - **gaze_movement**: incoming gaze movement - - **looked_aoi_name**: most likely looked aoi name - - **looked_aoi**: most likely looked aoi shape - - **aoi_scan_path_analysis**: aoi scan path analysis at each new scan step if aoi_scan_path is instanciated - - **exception**: error catched during gaze movement processing """ # Lock layer exploitation self.acquire() - # Store look execution start date - look_start = time.perf_counter() + # Gather look data + look_data = locals() # Update current gaze movement self.__gaze_movement = gaze_movement - # Init looked aoi - looked_aoi_name, looked_aoi = (None, None) - - # Init aoi scan path analysis report - aoi_scan_path_analysis = {} - - # Assess pipeline execution times - execution_times = { - 'aoi_matcher': None, - 'aoi_scan_path_analyzers': {} - } - - # Catch any error - exception = None - - try: - - if self.aoi_matcher is not None: - - # Store aoi matching start date - matching_start = time.perf_counter() - - # Update looked aoi thanks to aoi matcher - # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally - looked_aoi_name, looked_aoi = self.aoi_matcher.match(self.aoi_scene, gaze_movement) + # No new analysis available by default + self.__new_analysis_available = False - # Assess aoi matching time in ms - execution_times['aoi_matcher'] = (time.perf_counter() - matching_start) * 1e3 + # Init looked aoi name + looked_aoi_name = None - # Valid and finished gaze movement has been identified - if gaze_movement.valid and gaze_movement.finished: + if self.aoi_matcher is not None: - if GazeFeatures.is_fixation(gaze_movement): + # Update looked aoi thanks to aoi matcher + # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally + looked_aoi_name, _ , match_time, match_exception = self.aoi_matcher.match(self.aoi_scene, gaze_movement) - # Append fixation to aoi scan path - if self.aoi_scan_path is not None and looked_aoi_name is not None: + # Valid and finished gaze movement has been identified + if gaze_movement.valid and gaze_movement.finished: - aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, looked_aoi_name) + if GazeFeatures.is_fixation(gaze_movement): - # Is there a new step? - if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: + # Append fixation to aoi scan path + if self.aoi_scan_path is not None and looked_aoi_name is not None: - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): + aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, looked_aoi_name) - # Store aoi scan path analysis start date - aoi_scan_path_analysis_start = time.perf_counter() + # Is there a new step? + if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: - # Analyze aoi scan path - aoi_scan_path_analyzer.analyze(self.aoi_scan_path) + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - # Assess aoi scan path analysis time in ms - execution_times['aoi_scan_path_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_path_analysis_start) * 1e3 + # Analyze aoi scan path + analyze_time, analyze_exception = aoi_scan_path_analyzer.analyze(self.aoi_scan_path) - # Store analysis - aoi_scan_path_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis + # Update new analysis available state + self.__new_analysis_available = True - elif GazeFeatures.is_saccade(gaze_movement): + elif GazeFeatures.is_saccade(gaze_movement): - # Append saccade to aoi scan path - if self.aoi_scan_path is not None: + # Append saccade to aoi scan path + if self.aoi_scan_path is not None: - self.aoi_scan_path.append_saccade(timestamp, gaze_movement) - - except Exception as e: - - print('Warning: the following error occurs in ArLayer.look method:', e) - - looked_aoi_name = None - looked_aoi = None - aoi_scan_path_analysis = {} - exception = e - - # Assess total execution time in ms - execution_times['total'] = (time.perf_counter() - look_start) * 1e3 - - # Edit look data dictionary - look_data = DataFeatures.DataDictionary({ - "gaze_movement": gaze_movement, - "looked_aoi_name": looked_aoi_name, - "looked_aoi": looked_aoi, - "aoi_scan_path_analysis": DataFeatures.DataDictionary(aoi_scan_path_analysis), - "execution_times": DataFeatures.DataDictionary(execution_times), - "exception": exception - }) + self.aoi_scan_path.append_saccade(timestamp, gaze_movement) # Log look data for logger_name, logger in self.loggers.items(): - logger.emit(timestamp, DataFeatures.DataDictionary(look_data)) + logger.emit(look_data) # Unlock layer exploitation self.release() - # Return look data dictionary - return look_data - def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ Draw into image. @@ -573,7 +528,13 @@ class ArFrame(DataFeatures.SharedObject): layer.parent = self # Init current gaze position - self.__gaze_position = GazeFeatures.UnvalidGazePosition() + self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() + + # Init current gaze movement + self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + + # Init new analysis available state + self.__new_analysis_available = False @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: @@ -771,7 +732,10 @@ class ArFrame(DataFeatures.SharedObject): for logger_name, logger_data in new_loggers_value.items(): - new_loggers[logger_name] = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) + logger = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) + logger.name = logger_name + + new_loggers[logger_name] = logger except KeyError: @@ -829,7 +793,26 @@ class ArFrame(DataFeatures.SharedObject): self.__parent = parent - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> DataFeatures.DataDictionary: + @property + def gaze_position(self) -> object: + """Get current calibrated gaze position""" + + return self.__calibrated_gaze_position + + @property + def gaze_movement(self) -> object: + """Get current identified gaze movement""" + + return self.__identified_gaze_movement + + @property + def new_analysis_available(self) -> bool: + """Is there new scan path analysis to check?""" + + return self.__new_analysis_available + + @DataFeatures.PipelineStep + def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): """ Project gaze position into frame. @@ -839,162 +822,92 @@ class ArFrame(DataFeatures.SharedObject): Parameters: timestamp: any number used to know when the given gaze position occurs gaze_position: gaze position to project - - Returns: - look_data: data dictionary - - !!! note "look data dictionary" - - **gaze_position**: calibrated gaze position if gaze_position_calibrator is instanciated else, given gaze position. - - **gaze_movement**: identified gaze movement from incoming consecutive timestamped gaze positions if gaze_movement_identifier is instanciated. Current gaze movement if filter_in_progress_identification is False. - - **scan_path_analysis**: scan path analysis at each new scan step if scan_path is instanciated. - - **execution_times**: all pipeline steps execution times. - - **exception**: error catched during gaze position processing. - - **layers**: data dictionary with each layer's look data. """ # Lock frame exploitation self.acquire() - # Store look execution start date - look_start = time.perf_counter() - - # No gaze movement identified by default - identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() - - # Init scan path analysis report - scan_step_analysis = {} - - # Assess pipeline execution times - execution_times = { - 'gaze_movement_identifier': None, - 'scan_path_analyzers': DataFeatures.DataDictionary({}), - 'heatmap': None - } - - # Catch any error - exception = None - - # Init layers look data report - layers_look_data = {} - - try: - - # Apply gaze position calibration - if self.gaze_position_calibrator is not None: + # Store look arguments + look_data = locals() - self.__gaze_position = self.gaze_position_calibrator.apply(gaze_position) + # No new analysis by default + self.__new_analysis_available = False - # Or update gaze position at least - else: - - self.__gaze_position = gaze_position - - # Identify gaze movement - if self.gaze_movement_identifier is not None: - - # Store movement identification start date - identification_start = time.perf_counter() - - # Identify finished gaze movement - identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position) - - # Assess movement identification time in ms - execution_times['gaze_movement_identifier'] = (time.perf_counter() - identification_start) * 1e3 - - # Valid and finished gaze movement has been identified - if identified_gaze_movement.valid and identified_gaze_movement.finished: - - if GazeFeatures.is_fixation(identified_gaze_movement): - - # Append fixation to scan path - if self.scan_path is not None: + # No gaze movement identified by default + self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() - self.scan_path.append_fixation(timestamp, identified_gaze_movement) + # Apply gaze position calibration + if self.gaze_position_calibrator is not None: - elif GazeFeatures.is_saccade(identified_gaze_movement): + self.__calibrated_gaze_position = self.gaze_position_calibrator.apply(gaze_position) - # Append saccade to scan path - if self.scan_path is not None: - - scan_step = self.scan_path.append_saccade(timestamp, identified_gaze_movement) + # Or update gaze position at least + else: - # Is there a new step? - if scan_step and len(self.scan_path) > 1: + self.__calibrated_gaze_position = gaze_position - for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): + # Identify gaze movement + if self.gaze_movement_identifier is not None: - # Store scan step analysis start date - scan_step_analysis_start = time.perf_counter() + # Identify finished gaze movement + self.__identified_gaze_movement, identify_time, identify_exception = self.gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) - # Analyze aoi scan path - scan_path_analyzer.analyze(self.scan_path) + # Valid and finished gaze movement has been identified + if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: - # Assess scan path analysis time in ms - execution_times['scan_path_analyzers'][scan_path_analyzer_module_path] = (time.perf_counter() - scan_step_analysis_start) * 1e3 + if GazeFeatures.is_fixation(self.__identified_gaze_movement): - # Store analysis - scan_step_analysis[scan_path_analyzer_module_path] = scan_path_analyzer.analysis + # Append fixation to scan path + if self.scan_path is not None: - # No valid finished gaze movement: optionnaly stop in progress identification filtering - elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: + self.scan_path.append_fixation(timestamp, self.__identified_gaze_movement) - identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement + elif GazeFeatures.is_saccade(self.__identified_gaze_movement): - # Update heatmap - if self.heatmap is not None: + # Append saccade to scan path + if self.scan_path is not None: + + scan_step = self.scan_path.append_saccade(timestamp, self.__identified_gaze_movement) - # Store heatmap start date - heatmap_start = time.perf_counter() + # Is there a new step? + if scan_step and len(self.scan_path) > 1: - # Scale gaze position value - scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) + for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): - # Update heatmap image - self.heatmap.update(self.__gaze_position.value * scale) + # Analyze aoi scan path + analyze_time, analyze_exception = scan_path_analyzer.analyze(self.scan_path) - # Assess heatmap time in ms - execution_times['heatmap'] = (time.perf_counter() - heatmap_start) * 1e3 + # Update new analysis available state + self.__new_analysis_available = True - # Look layers with valid identified gaze movement - # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally - for layer_name, layer in self.layers.items(): + # No valid finished gaze movement: optionnaly stop in progress identification filtering + elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: - layers_look_data[layer_name] = layer.look(timestamp, identified_gaze_movement) + self.__identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement - except Exception as e: + # Update heatmap + if self.heatmap is not None: - print('Warning: the following error occurs in ArFrame.look method:', e) + # Scale gaze position value + scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) - self.__gaze_position = GazeFeatures.UnvalidGazePosition() - identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() - scan_step_analysis = {} - exception = e - layers_look_data = {} + # Update heatmap image + update_time, update_exception = self.heatmap.update(self.__calibrated_gaze_position.value * scale) - # Assess total execution time in ms - execution_times['total'] = (time.perf_counter() - look_start) * 1e3 + # Look layers with valid identified gaze movement + # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally + for layer_name, layer in self.layers.items(): - # Edit look data dictionary - look_data = DataFeatures.DataDictionary({ - "gaze_position": self.__gaze_position, - "gaze_movement": identified_gaze_movement, - "scan_path_analysis": DataFeatures.DataDictionary(scan_step_analysis), - "execution_times": DataFeatures.DataDictionary(execution_times), - "exception": exception, - "layers": DataFeatures.DataDictionary(layers_look_data) - }) + look_time, look_exception = layer.look(timestamp, self.__identified_gaze_movement) # Log look data for logger_name, logger in self.loggers.items(): - logger.emit(timestamp, look_data) + logger.emit(look_data) # Unlock frame exploitation self.release() - # Return look data - return look_data - def __image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. @@ -1065,7 +978,7 @@ class ArFrame(DataFeatures.SharedObject): # Draw current gaze position if required if draw_gaze_positions is not None: - self.__gaze_position.draw(image, **draw_gaze_positions) + self.__calibrated_gaze_position.draw(image, **draw_gaze_positions) # Unlock frame exploitation self.release() diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index 9f9f4ad..e20717c 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -599,6 +599,7 @@ class Heatmap(): self.__point_spread_buffer = [] self.__point_spread_buffer_size = self.buffer + @DataFeatures.PipelineStep def update(self, point: tuple): """Update heatmap image.""" diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 3e372fb..12e7bff 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -10,13 +10,14 @@ __license__ = "BSD" from typing import TypeVar, Tuple, Any from dataclasses import dataclass, field import importlib -from inspect import getmembers +from inspect import getmembers, getmodule import collections import json import ast import bisect import threading import math +import time import pandas import numpy @@ -428,6 +429,51 @@ class DataDictionary(dict): __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ +def PipelineStep(method): + """Define a decorator to define a method as a pipeline step.""" + + def handler(*args, **kw) -> Tuple[Any, float, Exception]: + """Handle pipeline step + + Returns: + method_returns: what the handled method returns + execution_time: measure of method time execution in millisecond. + exception: any error catched during method execution. + """ + + # Initialize execution time assessment + start = time.perf_counter() + + try: + + result = method(*args, **kw) + exception = None + + except Exception as e: + + result = None + exception = e + + # Measure execution time + execution_time = (time.perf_counter() - start) * 1e3 + + # Edit result tuple + if type(result) is tuple: + + result = result + (execution_time, exception) + + elif result is not None: + + result = result, execution_time, exception + + else: + + result = execution_time, exception + + return result + + return handler + # Import libraries that can be used in selector or formatter codes from argaze import GazeFeatures @@ -438,6 +484,9 @@ TimeStampedDataLoggerType = TypeVar('TimeStampedDataLogger', bound="TimeStampedD class TimeStampedDataLogger(): """Abstract class to define what should provide a timestamped data logger.""" + name: str = field(default='') + """Name of logger.""" + selector: str = field(default='True') """Code evaluated to handle log under a condition. Default 'True' string means that all incoming data will be accepted.""" @@ -449,8 +498,7 @@ class TimeStampedDataLogger(): """Load timestamped data logger from dictionary. Parameters: - logger_module_path: class name to load - logger_parameters: attributes to load + logger_data: dict to load """ logger_module_path, logger_parameters = logger_data.popitem() @@ -462,16 +510,20 @@ class TimeStampedDataLogger(): logger_module = importlib.import_module(logger_module_path) return logger_module.TimeStampedDataLogger(**logger_parameters) - def emit(self, timestamp: TimeStampType, data: DataDictionary) -> Any: - """Apply selector code to decide if data have to be logged, then apply formatter code before to call specific logger handle method.""" + def emit(self, context: dict) -> Any: + """Apply selector code to decide if context have to be logged, then apply formatter code before to call specific logger handle method.""" + + try: + + if eval(self.selector, globals(), context): - data['timestamp'] = timestamp + self.handle(eval(self.formatter, globals(), context)) - if eval(self.selector, globals(), data): + except Exception as e: - self.handle(eval(self.formatter, globals(), data)) + print(f'Warning: the following error occurs in TimeStampedDataLogger.emit method ({self.name}):', e) - def handle(self, formatted_log: any): - """Handle formatted log emission to destination.""" + def handle(self, log: any): + """Handle log emission to destination.""" raise NotImplementedError('handle() method not implemented') diff --git a/src/argaze/DataLog/FileWriter.py b/src/argaze/DataLog/FileWriter.py index 9b9fc5f..388ec44 100644 --- a/src/argaze/DataLog/FileWriter.py +++ b/src/argaze/DataLog/FileWriter.py @@ -48,13 +48,13 @@ class TimeStampedDataLogger(DataFeatures.TimeStampedDataLogger): self._file.close() - def handle(self, formatted_log: any): + def handle(self, log: any): """Write log as a new line into file. List or tuple are converted into strings separated by separator char.""" # Format list or tuple element into quoted strings - if not isinstance(formatted_log, str): + if not isinstance(log, str): - formatted_log = self.separator.join(f'\"{d}\"' for d in formatted_log) + log = self.separator.join(f'\"{d}\"' for d in log) # Write into file - print(formatted_log, file=self._file, flush=True) \ No newline at end of file + print(log, file=self._file, flush=True) \ No newline at end of file diff --git a/src/argaze/GazeAnalysis/Basic.py b/src/argaze/GazeAnalysis/Basic.py index a11991f..4631483 100644 --- a/src/argaze/GazeAnalysis/Basic.py +++ b/src/argaze/GazeAnalysis/Basic.py @@ -10,7 +10,7 @@ __license__ = "BSD" from dataclasses import dataclass -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures import numpy @@ -26,6 +26,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__steps_number = 0 self.__step_fixation_durations_average = 0 + @DataFeatures.PipelineStep def analyze(self, scan_path: GazeFeatures.ScanPathType): self.__path_duration = scan_path.duration @@ -34,6 +35,9 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): sum_fixation_durations = 0 + # DEBUG + a = 1 / 0 + for scan_step in scan_path: sum_fixation_durations += scan_step.fixation_duration @@ -70,6 +74,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__steps_number = 0 self.__step_fixation_durations_average = 0 + @DataFeatures.PipelineStep def analyze(self, aoi_scan_path: GazeFeatures.ScanPathType): self.__path_duration = aoi_scan_path.duration diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py index fde9543..c86ebed 100644 --- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py +++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py @@ -12,7 +12,7 @@ from typing import TypeVar, Tuple from dataclasses import dataclass, field import math -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures from argaze.AreaOfInterest import AOIFeatures import numpy @@ -37,6 +37,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__matched_gaze_movement = None self.__matched_region = None + @DataFeatures.PipelineStep def match(self, aoi_scene, gaze_movement) -> Tuple[str, AOIFeatures.AreaOfInterest]: """Returns AOI with the maximal fixation's deviation circle coverage if above coverage threshold.""" diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 36dfabf..f3ee608 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -12,7 +12,7 @@ from typing import TypeVar, Tuple from dataclasses import dataclass, field import math -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures import numpy import cv2 @@ -142,6 +142,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() + @DataFeatures.PipelineStep def identify(self, ts, gaze_position, terminate=False) -> GazeMovementType: # Ignore non valid gaze position diff --git a/src/argaze/GazeAnalysis/Entropy.py b/src/argaze/GazeAnalysis/Entropy.py index 631a8ea..a391092 100644 --- a/src/argaze/GazeAnalysis/Entropy.py +++ b/src/argaze/GazeAnalysis/Entropy.py @@ -11,7 +11,7 @@ __license__ = "BSD" from typing import Tuple from dataclasses import dataclass, field -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures from argaze.GazeAnalysis import TransitionMatrix import pandas @@ -37,6 +37,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__stationary_entropy = -1 self.__transition_entropy = -1 + @DataFeatures.PipelineStep def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/ExploreExploitRatio.py b/src/argaze/GazeAnalysis/ExploreExploitRatio.py index b66076b..d4c0b6c 100644 --- a/src/argaze/GazeAnalysis/ExploreExploitRatio.py +++ b/src/argaze/GazeAnalysis/ExploreExploitRatio.py @@ -10,7 +10,7 @@ __license__ = "BSD" from dataclasses import dataclass, field -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures import numpy @@ -33,6 +33,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__explore_exploit_ratio = 0. + @DataFeatures.PipelineStep def analyze(self, scan_path: GazeFeatures.ScanPathType): assert(len(scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/FocusPointInside.py b/src/argaze/GazeAnalysis/FocusPointInside.py index e7da766..62ce054 100644 --- a/src/argaze/GazeAnalysis/FocusPointInside.py +++ b/src/argaze/GazeAnalysis/FocusPointInside.py @@ -12,7 +12,7 @@ from typing import TypeVar, Tuple from dataclasses import dataclass, field import math -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures from argaze.AreaOfInterest import AOIFeatures import numpy @@ -30,6 +30,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__looked_aoi_data = (None, None) self.__matched_gaze_movement = None + @DataFeatures.PipelineStep def match(self, aoi_scene, gaze_movement) -> Tuple[str, AOIFeatures.AreaOfInterest]: """Returns AOI containing fixation focus point.""" diff --git a/src/argaze/GazeAnalysis/KCoefficient.py b/src/argaze/GazeAnalysis/KCoefficient.py index c8fa398..c6dfa15 100644 --- a/src/argaze/GazeAnalysis/KCoefficient.py +++ b/src/argaze/GazeAnalysis/KCoefficient.py @@ -10,7 +10,7 @@ __license__ = "BSD" from dataclasses import dataclass -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures import numpy @@ -30,6 +30,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__K = 0 + @DataFeatures.PipelineStep def analyze(self, scan_path: GazeFeatures.ScanPathType): assert(len(scan_path) > 1) @@ -86,6 +87,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__K = 0 + @DataFeatures.PipelineStep def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType) -> float: assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/LempelZivComplexity.py b/src/argaze/GazeAnalysis/LempelZivComplexity.py index 552ad75..67e5001 100644 --- a/src/argaze/GazeAnalysis/LempelZivComplexity.py +++ b/src/argaze/GazeAnalysis/LempelZivComplexity.py @@ -10,7 +10,7 @@ __license__ = "BSD" from dataclasses import dataclass -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures from lempel_ziv_complexity import lempel_ziv_complexity @@ -31,6 +31,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__lempel_ziv_complexity = 0 + @DataFeatures.PipelineStep def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/NGram.py b/src/argaze/GazeAnalysis/NGram.py index 903c0c0..b1e5ab3 100644 --- a/src/argaze/GazeAnalysis/NGram.py +++ b/src/argaze/GazeAnalysis/NGram.py @@ -11,7 +11,7 @@ __license__ = "BSD" from typing import TypeVar, Tuple, Any from dataclasses import dataclass, field -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures @dataclass class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): @@ -35,6 +35,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__ngrams_count = {} + @DataFeatures.PipelineStep def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/NearestNeighborIndex.py b/src/argaze/GazeAnalysis/NearestNeighborIndex.py index 1ca336f..1dc692e 100644 --- a/src/argaze/GazeAnalysis/NearestNeighborIndex.py +++ b/src/argaze/GazeAnalysis/NearestNeighborIndex.py @@ -11,7 +11,7 @@ __license__ = "BSD" from typing import TypeVar, Tuple, Any from dataclasses import dataclass, field -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures import numpy from scipy.spatial.distance import cdist @@ -35,6 +35,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__nearest_neighbor_index = 0 + @DataFeatures.PipelineStep def analyze(self, scan_path: GazeFeatures.ScanPathType): assert(len(scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/TransitionMatrix.py b/src/argaze/GazeAnalysis/TransitionMatrix.py index 5e3bdf5..313c945 100644 --- a/src/argaze/GazeAnalysis/TransitionMatrix.py +++ b/src/argaze/GazeAnalysis/TransitionMatrix.py @@ -11,7 +11,7 @@ __license__ = "BSD" from typing import Tuple from dataclasses import dataclass -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures import pandas import numpy @@ -33,6 +33,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__transition_matrix_probabilities = pandas.DataFrame() self.__transition_matrix_density = 0. + @DataFeatures.PipelineStep def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index e9f770a..1e486e1 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -11,7 +11,7 @@ from typing import TypeVar, Tuple from dataclasses import dataclass, field import math -from argaze import GazeFeatures +from argaze import GazeFeatures, DataFeatures import numpy import cv2 @@ -142,6 +142,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() + @DataFeatures.PipelineStep def identify(self, ts, gaze_position, terminate=False) -> GazeMovementType: # Ignore non valid gaze position diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index a57a127..c9269e0 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -523,6 +523,7 @@ class TimeStampedGazeStatus(DataFeatures.TimeStampedBuffer): class GazeMovementIdentifier(): """Abstract class to define what should provide a gaze movement identifier.""" + @DataFeatures.PipelineStep def identify(self, timestamp: int|float, gaze_position: GazePosition, terminate:bool=False) -> Tuple[GazeMovementType, GazeMovementType]: """Identify gaze movement from successive timestamped gaze positions. Each identified gaze movement should share its first/last gaze position with previous/next gaze movement. @@ -835,6 +836,7 @@ class ScanPathAnalyzer(): return DataFeatures.DataDictionary(analysis) + @DataFeatures.PipelineStep def analyze(self, scan_path: ScanPathType): """Analyze scan path.""" @@ -1172,6 +1174,7 @@ class AOIScanPathAnalyzer(): return DataFeatures.DataDictionary(analysis) + @DataFeatures.PipelineStep def analyze(self, aoi_scan_path: AOIScanPathType): """Analyze aoi scan path.""" diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupillAnalysis/WorkloadIndex.py index 4a20091..1429eaf 100644 --- a/src/argaze/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze/PupillAnalysis/WorkloadIndex.py @@ -33,6 +33,7 @@ class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): self.__variations_number = 0 self.__last_ts = 0 + @DataFeatures.PipelineStep def analyze(self, ts, pupill_diameter) -> float: """Analyze workload index from successive timestamped pupill diameters.""" diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py index d751af6..5eb70ce 100644 --- a/src/argaze/PupillFeatures.py +++ b/src/argaze/PupillFeatures.py @@ -82,6 +82,7 @@ TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") class PupillDiameterAnalyzer(): """Abstract class to define what should provide a pupill diameter analyser.""" + @DataFeatures.PipelineStep def analyze(self, ts, pupill_diameter) -> float: """Analyze pupill diameter from successive timestamped pupill diameters.""" diff --git a/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json b/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json index 71006f2..2763a27 100644 --- a/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json +++ b/src/argaze/utils/demo_data/demo_gaze_analysis_setup.json @@ -52,8 +52,8 @@ "FileWriter" : { "path": "_export/logs/aoi_scan_path_metrics.csv", "header": "Timestamp (ms), Duration (ms), Step, K, LZC", - "selector": "len(aoi_scan_path_analysis) > 0", - "formatter": "timestamp, aoi_scan_path_analysis['argaze.GazeAnalysis.Basic'].path_duration, aoi_scan_path_analysis['argaze.GazeAnalysis.Basic'].steps_number, aoi_scan_path_analysis['argaze.GazeAnalysis.KCoefficient'].K, aoi_scan_path_analysis['argaze.GazeAnalysis.LempelZivComplexity'].lempel_ziv_complexity" + "selector": "self.new_analysis_available", + "formatter": "timestamp, self.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].path_duration, self.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].steps_number, self.aoi_scan_path_analyzers['argaze.GazeAnalysis.KCoefficient'].K, self.aoi_scan_path_analyzers['argaze.GazeAnalysis.LempelZivComplexity'].lempel_ziv_complexity" } } } @@ -64,16 +64,23 @@ "FileWriter" : { "path": "_export/logs/Fixations.csv", "header": "Timestamp (ms), Focus (px), Duration (ms), AOI", - "selector": "GazeFeatures.is_fixation(gaze_movement) and gaze_movement.finished", - "formatter": "timestamp, gaze_movement.focus, gaze_movement.duration, layers.main_layer.looked_aoi_name" + "selector": "GazeFeatures.is_fixation(self.gaze_movement) and self.gaze_movement.finished", + "formatter": "timestamp, self.gaze_movement.focus, self.gaze_movement.duration, self.layers['main_layer'].aoi_matcher.looked_aoi_name" + } + }, + "messages": { + "FileWriter" : { + "path": "_export/logs/Messages.csv", + "selector": "GazeFeatures.is_fixation(self.gaze_movement) and not self.gaze_movement.finished", + "formatter": "f'FixationInProgress Start={self.gaze_movement.positions.first[0]} Duration={self.gaze_movement.duration} AOI={self.layers[\"main_layer\"].aoi_matcher.looked_aoi_name} Probabilities={self.layers[\"main_layer\"].aoi_matcher.looked_probabilities}'" } }, "scan_path_metrics": { "FileWriter" : { "path": "_export/logs/scan_path_metrics.csv", "header": "Timestamp (ms), Duration (ms), Step, K, NNI, XXR", - "selector": "len(scan_path_analysis) > 0", - "formatter": "timestamp, scan_path_analysis['argaze.GazeAnalysis.Basic'].path_duration, scan_path_analysis['argaze.GazeAnalysis.Basic'].steps_number, scan_path_analysis['argaze.GazeAnalysis.KCoefficient'].K, scan_path_analysis['argaze.GazeAnalysis.NearestNeighborIndex'].nearest_neighbor_index, scan_path_analysis['argaze.GazeAnalysis.ExploreExploitRatio'].explore_exploit_ratio" + "selector": "self.new_analysis_available", + "formatter": "timestamp, self.scan_path_analyzers['argaze.GazeAnalysis.Basic'].path_duration, self.scan_path_analyzers['argaze.GazeAnalysis.Basic'].steps_number, self.scan_path_analyzers['argaze.GazeAnalysis.KCoefficient'].K, self.scan_path_analyzers['argaze.GazeAnalysis.NearestNeighborIndex'].nearest_neighbor_index, self.scan_path_analyzers['argaze.GazeAnalysis.ExploreExploitRatio'].explore_exploit_ratio" } } }, diff --git a/src/argaze/utils/demo_gaze_analysis_run.py b/src/argaze/utils/demo_gaze_analysis_run.py index 5f46596..210e188 100644 --- a/src/argaze/utils/demo_gaze_analysis_run.py +++ b/src/argaze/utils/demo_gaze_analysis_run.py @@ -50,7 +50,7 @@ def main(): timestamp = int((time.time() - start_time) * 1e3) # Project gaze position into frame - ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y))) + execution_time, exception = ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y))) # Attach mouse callback to window cv2.setMouseCallback(ar_frame.name, on_mouse_event) -- cgit v1.1