aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/argaze/ArFeatures.py116
-rw-r--r--src/argaze/DataFeatures.py50
-rw-r--r--src/argaze/DataLog/FileWriter.py60
-rw-r--r--src/argaze/DataLog/__init__.py4
-rw-r--r--src/argaze/utils/UtilsFeatures.py58
-rw-r--r--src/argaze/utils/demo_aruco_markers_run.py4
-rw-r--r--src/argaze/utils/demo_gaze_analysis_run.py68
7 files changed, 174 insertions, 186 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 4989e65..ff29baa 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -131,6 +131,9 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Init current looked aoi name
self.__looked_aoi_name = None
+ # Init aoi scan path analyzed state
+ self.__aoi_scan_path_analyzed = False
+
# Cast aoi scene to its effective dimension
if self.aoi_scene.dimension == 2:
@@ -337,8 +340,27 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return self.__looked_aoi_name
+ @property
+ def aoi_scan_path_analyzed(self) -> bool:
+ """Are aoi scan path analysis ready?"""
+
+ return self.__aoi_scan_path_analyzed
+
+ def aoi_scan_path_analysis(self) -> Iterator[Union[str, dict]]:
+ """Get aoi scan path analysis.
+
+ Returns
+ iterator: analyzer module path, analysis dictionary
+ """
+
+ assert(self.__aoi_scan_path_analyzed)
+
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
+
+ yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
+
@DataFeatures.PipelineStepMethod
- def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]:
+ def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()):
"""
Project timestamped gaze movement into layer.
@@ -346,10 +368,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute.
Parameters:
+ timestamp: ny number used to know when the given gaze position occurs
gaze_movement: gaze movement to project
-
- Returns:
- iterator: this layer, analyzer type and analysis dictionary.
"""
# Use try block to always release the layer lock in finally block
@@ -364,6 +384,9 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# No looked aoi by default
self.__looked_aoi_name = None
+ # Reset aoi scan path analyzed state
+ self.__aoi_scan_path_analyzed = False
+
if self.aoi_matcher is not None:
# Update looked aoi thanks to aoi matcher
@@ -383,13 +406,13 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Is there a new step?
if aoi_scan_step is not None and len(self.aoi_scan_path) > 1:
+ # Analyze aoi scan path
for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
- # Analyze aoi scan path
aoi_scan_path_analyzer.analyze(self.aoi_scan_path)
- # Output analysis
- yield self, aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
+ # Update aoi scan path analyzed state
+ self.__aoi_scan_path_analyzed = True
elif GazeFeatures.is_saccade(gaze_movement):
@@ -454,11 +477,6 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = {
}
}
-def is_layer(obj):
- """Is an object a layer?"""
-
- return type(obj) == ArLayer
-
@dataclass
class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""
@@ -512,6 +530,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Init current gaze movement
self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ # Init scan path analyzed state
+ self.__scan_path_analyzed = False
+
@classmethod
def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType:
"""Load attributes from dictionary.
@@ -762,6 +783,25 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return self.__identified_gaze_movement
+ @property
+ def scan_path_analyzed(self) -> bool:
+ """Are scan path analysis ready?"""
+
+ return self.__scan_path_analyzed
+
+ def scan_path_analysis(self) -> Iterator[Union[str, dict]]:
+ """Get scan path analysis.
+
+ Returns
+ iterator: analyzer module path, analysis dictionary
+ """
+
+ assert(self.__scan_path_analyzed)
+
+ for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items():
+
+ yield aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer.analysis
+
@DataFeatures.PipelineStepMethod
def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]:
"""
@@ -773,9 +813,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
Parameters:
timestamp: any number used to know when the given gaze position occurs
gaze_position: gaze position to project
-
- Returns:
- iterator: this frame or one of its layers, analyzer type and analysis dictionary.
"""
# Use try block to always release the frame lock in finally block
@@ -787,6 +824,9 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# No gaze movement identified by default
self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ # Reset scan path analyzed state
+ self.__scan_path_analyzed = False
+
# Apply gaze position calibration
if self.gaze_position_calibrator is not None:
@@ -823,13 +863,13 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Is there a new step?
if scan_step and len(self.scan_path) > 1:
+ # Analyze aoi scan path
for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items():
- # Analyze aoi scan path
scan_path_analyzer.analyze(self.scan_path)
- # Output analysis
- yield self, scan_path_analyzer_module_path, scan_path_analyzer.analysis
+ # Update scan path analyzed state
+ self.__scan_path_analyzed = True
# No valid finished gaze movement: optionnaly stop in progress identification filtering
elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification:
@@ -849,10 +889,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally
for layer_name, layer in self.layers.items():
- for layer_output in layer.look(timestamp, self.__identified_gaze_movement):
-
- # Output layer analysis
- yield layer_output
+ layer.look(timestamp, self.__identified_gaze_movement)
finally:
@@ -950,11 +987,6 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
return self.__image(**self.image_parameters)
-def is_frame(obj):
- """Is an object a frame?"""
-
- return type(obj) == ArFrame or is_camera(obj)
-
@dataclass
class ArScene():
"""
@@ -1201,11 +1233,6 @@ class ArScene():
raise NotImplementedError('draw() method not implemented')
-def is_scene(obj):
- """Is an object a scene?"""
-
- return type(obj).__bases__[0] == ArScene
-
@dataclass
class ArCamera(ArFrame):
"""
@@ -1329,25 +1356,19 @@ class ArCamera(ArFrame):
raise NotImplementedError('watch() method not implemented')
@DataFeatures.PipelineStepMethod
- def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition) -> Iterator[Union[object, type, dict]]:
+ def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition):
"""Project timestamped gaze position into each scene frames.
!!! warning
watch method needs to be called first.
Parameters:
- timestamp: gaze position time stamp (unit does'nt matter)
- gaze_position: GazePosition object
-
- Returns:
- iterator: this camera frame or a scene frame or one of their layers, analyzer type and analysis dictionary.
+ timestamp: ny number used to know when the given gaze position occurs
+ gaze_movement: gaze movement to project
"""
# Project gaze position into camera frame
- for camera_frame_output in super().look(timestamp, gaze_position):
-
- # Output camera frame analysis
- yield camera_frame_output
+ super().look(timestamp, gaze_position)
# Use try block to always release the camera frame lock in finally block
try:
@@ -1373,10 +1394,8 @@ class ArCamera(ArFrame):
# QUESTION: How to project gaze precision?
inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y))
- for scene_frame_output in scene_frame.look(timestamp, inner_gaze_position * scene_frame.size):
-
- # output scene frame analysis
- yield scene_frame_output
+ # Project inner gaze position into scene frame
+ scene_frame.look(timestamp, inner_gaze_position * scene_frame.size)
# Ignore missing aoi in camera frame layer projection
except KeyError as e:
@@ -1428,8 +1447,3 @@ class ArCamera(ArFrame):
with open(json_filepath, 'w', encoding='utf-8') as file:
json.dump(self, file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder)
-
-def is_camera(obj):
- """Is an object a camera?"""
-
- return type(obj).__bases__[0] == ArCamera \ No newline at end of file
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index a751fc5..1e6abfe 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -439,53 +439,3 @@ def PipelineStepMethod(method):
return result
return wrapper
-
-# Import libraries that can be used in selector or formatter codes
-from argaze import GazeFeatures
-
-TimeStampedDataLoggerType = TypeVar('TimeStampedDataLogger', bound="TimeStampedDataLogger")
-# Type definition for type annotation convenience
-
-@dataclass
-class TimeStampedDataLogger():
- """Abstract class to define what should provide a timestamped data logger."""
-
- formatter: str = field(default='')
- """Code evaluated to edit the log."""
-
- @classmethod
- def from_dict(self, logger_module_path: str, logger_parameters: dict) -> TimeStampedDataLoggerType:
- """Load timestamped data logger from dictionary.
-
- Parameters:
- logger_module_path: module to load
- logger_parameters: instance parameters
- """
-
- # Prepend argaze.DataLog path when a single name is provided
- if len(logger_module_path.split('.')) == 1:
- logger_module_path = f'argaze.DataLog.{logger_module_path}'
-
- logger_module = importlib.import_module(logger_module_path)
- return logger_module.TimeStampedDataLogger(**logger_parameters)
-
- def emit(self, context: dict) -> Any:
- """Apply selector code to decide if context have to be logged, then apply formatter code before to call specific logger handle method."""
-
- try:
-
- self.handle(eval(self.formatter, globals(), context))
-
- except Exception as e:
-
- print(f'Warning: the following error occurs in TimeStampedDataLogger.emit method ({self.name}):', e)
-
- def setup(self, log: str|tuple):
- """Prepare log emission to destination."""
-
- raise NotImplementedError('setup() method not implemented')
-
- def handle(self, log: str|tuple):
- """Handle log emission to destination."""
-
- raise NotImplementedError('handle() method not implemented')
diff --git a/src/argaze/DataLog/FileWriter.py b/src/argaze/DataLog/FileWriter.py
deleted file mode 100644
index 388ec44..0000000
--- a/src/argaze/DataLog/FileWriter.py
+++ /dev/null
@@ -1,60 +0,0 @@
-#!/usr/bin/env python
-
-"""Module for file logging.
-"""
-
-__author__ = "Théo de la Hogue"
-__credits__ = []
-__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
-__license__ = "BSD"
-
-from typing import TypeVar, Tuple
-from dataclasses import dataclass, field
-import os, pathlib
-
-from argaze import DataFeatures
-
-@dataclass
-class TimeStampedDataLogger(DataFeatures.TimeStampedDataLogger):
- """Implementation of file logger."""
-
- path: str = field(default=None)
- """File path where to write data."""
-
- header: str = field(default=None)
- """String to write first."""
-
- separator: str = field(default=", ")
- """String used to separate list or tuple formatted log."""
-
- def __post_init__(self):
- """Check that folder structure exist and create file."""
-
- self.path = pathlib.Path(self.path)
-
- if not os.path.exists(self.path.parent.absolute()):
- os.makedirs(self.path.parent.absolute())
-
- # Open file
- self._file = open(self.path, 'w', encoding='utf-8', buffering=1)
-
- # Write header if required
- if self.header is not None:
-
- print(self.header, file=self._file, flush=True)
-
- def __del__(self):
- """Close file."""
-
- self._file.close()
-
- def handle(self, log: any):
- """Write log as a new line into file. List or tuple are converted into strings separated by separator char."""
-
- # Format list or tuple element into quoted strings
- if not isinstance(log, str):
-
- log = self.separator.join(f'\"{d}\"' for d in log)
-
- # Write into file
- print(log, file=self._file, flush=True) \ No newline at end of file
diff --git a/src/argaze/DataLog/__init__.py b/src/argaze/DataLog/__init__.py
deleted file mode 100644
index febd648..0000000
--- a/src/argaze/DataLog/__init__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-"""
-Various data logger handler.
-"""
-__all__ = ['FileWriter'] \ No newline at end of file
diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py
index 7971167..0cb6d77 100644
--- a/src/argaze/utils/UtilsFeatures.py
+++ b/src/argaze/utils/UtilsFeatures.py
@@ -146,3 +146,61 @@ class TimeProbe():
self.start()
+def tuple_to_string(t: tuple, separator: str = ", ") -> str:
+ """Convert tuple elements into quoted strings separated by a separator string."""
+
+ return separator.join(f'\"{e}\"' for e in t)
+
+class FileWriter():
+ """Write into a file line by line.
+
+ Parameters:
+ path: File path where to write data.
+ header: String or tuple to write first.
+ separator: String used to separate elements during tuple to string conversion.
+ """
+
+ def __init__(self, path: str, header: str|tuple, separator: str = ", "):
+ """Check that folder structure exist and create file then, write header line."""
+
+ import os
+ import pathlib
+
+ self.path = pathlib.Path(path)
+ self.separator = separator
+
+ if not os.path.exists(self.path.parent.absolute()):
+ os.makedirs(self.path.parent.absolute())
+
+ # Open file
+ self._file = open(self.path, 'w', encoding='utf-8', buffering=1)
+
+ # Write header if required
+ if header is not None:
+
+ # Format list or tuple element into quoted strings
+ if not isinstance(header, str):
+
+ header = tuple_to_string(header, self.separator)
+
+ print(header, file=self._file, flush=True)
+
+ def __del__(self):
+ """Close file."""
+
+ self._file.close()
+
+ def write(self, log: str|tuple):
+ """Write log as a new line into file.
+
+ !!! note
+ Tuple elements are converted into quoted strings separated by separator string.
+ """
+
+ # Format list or tuple element into quoted strings
+ if not isinstance(log, str):
+
+ log = tuple_to_string(log, self.separator)
+
+ # Write into file
+ print(log, file=self._file, flush=True) \ No newline at end of file
diff --git a/src/argaze/utils/demo_aruco_markers_run.py b/src/argaze/utils/demo_aruco_markers_run.py
index 091b1e1..87b4bc3 100644
--- a/src/argaze/utils/demo_aruco_markers_run.py
+++ b/src/argaze/utils/demo_aruco_markers_run.py
@@ -68,9 +68,7 @@ def main():
try:
# Project gaze position into camera
- for _ in aruco_camera.look(timestamp, GazeFeatures.GazePosition((x, y))):
-
- pass
+ aruco_camera.look(timestamp, GazeFeatures.GazePosition((x, y)))
# Assess gaze analysis
gaze_analysis_time = aruco_camera.execution_times['look']
diff --git a/src/argaze/utils/demo_gaze_analysis_run.py b/src/argaze/utils/demo_gaze_analysis_run.py
index d36f1c8..9640d18 100644
--- a/src/argaze/utils/demo_gaze_analysis_run.py
+++ b/src/argaze/utils/demo_gaze_analysis_run.py
@@ -14,7 +14,7 @@ import time
from argaze import ArFeatures, GazeFeatures
from argaze.AreaOfInterest import AOIFeatures
from argaze.GazeAnalysis import *
-from argaze.DataLog import FileWriter
+from argaze.utils import UtilsFeatures
import cv2
import numpy
@@ -27,6 +27,8 @@ def main():
current_directory = os.path.dirname(os.path.abspath(__file__))
+ print(current_directory)
+
# Manage arguments
parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
parser.add_argument('frame', metavar='FRAME', type=str, help='ar frame filepath')
@@ -35,10 +37,13 @@ def main():
# Load ArFrame
ar_frame = ArFeatures.ArFrame.from_json(args.frame)
+ # Bind to a frame layer
+ main_layer = ar_frame.layers['main_layer']
+
# Create FileWriter loggers
- fixation_logger = FileWriter.TimeStampedDataLogger(path="_export/logs/fixations.csv", header="Timestamp (ms), Focus (px), Duration (ms), AOI")
- scan_path_logger = FileWriter.TimeStampedDataLogger(path="_export/logs/scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, NNI, XXR")
- aoi_scan_path_logger = FileWriter.TimeStampedDataLogger(path="_export/logs/aoi_scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, LZC")
+ fixation_logger = UtilsFeatures.FileWriter(path="_export/logs/fixations.csv", header="Timestamp (ms), Focus (px), Duration (ms), AOI")
+ scan_path_logger = UtilsFeatures.FileWriter(path="_export/logs/scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, NNI, XXR")
+ aoi_scan_path_logger = UtilsFeatures.FileWriter(path="_export/logs/aoi_scan_path_metrics.csv", header="Timestamp (ms), Duration (ms), Step, K, LZC")
# Create a window to display ArCamera
cv2.namedWindow(ar_frame.name, cv2.WINDOW_AUTOSIZE)
@@ -55,27 +60,54 @@ def main():
# Edit millisecond timestamp
timestamp = int((time.time() - start_time) * 1e3)
- # Project gaze position into frame and iterate over analysis
- for element, module, analysis in ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y))):
+ try:
- # Ckeck if analysis comes from frame
- if ArFeatures.is_frame(element):
+ # Project gaze position into frame
+ ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y)))
- # Do something with scan path module analysis
- ...
+ # Catch pipeline exception
+ except Exception as e:
- # Ckeck if analysis comes from frame
- elif ArFeatures.is_layer(element):
-
- # Do something with aoi scan path module analysis
- ...
+ print('Gaze projection error:', e)
# Log fixations
if GazeFeatures.is_fixation(ar_frame.gaze_movement) and ar_frame.gaze_movement.finished:
- log = timestamp, ar_frame.gaze_movement.focus, ar_frame.gaze_movement.duration, ar_frame.layers['main_layer'].looked_aoi_name
-
- fixation_logger.handle(log)
+ log = (
+ timestamp,
+ ar_frame.gaze_movement.focus,
+ ar_frame.gaze_movement.duration,
+ ar_frame.layers['main_layer'].looked_aoi_name
+ )
+
+ fixation_logger.write(log)
+
+ # Log scan path metrics
+ if ar_frame.scan_path_analyzed:
+
+ log = (
+ timestamp,
+ ar_frame.scan_path_analyzers['argaze.GazeAnalysis.Basic'].path_duration,
+ ar_frame.scan_path_analyzers['argaze.GazeAnalysis.Basic'].steps_number,
+ ar_frame.scan_path_analyzers['argaze.GazeAnalysis.KCoefficient'].K,
+ ar_frame.scan_path_analyzers['argaze.GazeAnalysis.NearestNeighborIndex'].nearest_neighbor_index,
+ ar_frame.scan_path_analyzers['argaze.GazeAnalysis.ExploreExploitRatio'].explore_exploit_ratio
+ )
+
+ scan_path_logger.write(log)
+
+ # Log aoi scan path metrics
+ if main_layer.aoi_scan_path_analyzed:
+
+ log = (
+ timestamp,
+ main_layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].path_duration,
+ main_layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.Basic'].steps_number,
+ main_layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.KCoefficient'].K,
+ main_layer.aoi_scan_path_analyzers['argaze.GazeAnalysis.LempelZivComplexity'].lempel_ziv_complexity
+ )
+
+ aoi_scan_path_logger.write(log)
# Attach mouse callback to window
cv2.setMouseCallback(ar_frame.name, on_mouse_event)