From cd601be0b9366a9bd1554523319e57801440ed64 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 14:18:50 +0100 Subject: More work on time management. --- src/argaze/ArFeatures.py | 26 +++++--- src/argaze/DataFeatures.py | 46 ++----------- src/argaze/GazeAnalysis/DeviationCircleCoverage.py | 2 +- .../DispersionThresholdIdentification.py | 10 +-- .../VelocityThresholdIdentification.py | 4 +- src/argaze/GazeFeatures.py | 26 +++++--- src/argaze/PupillAnalysis/WorkloadIndex.py | 44 +++++++----- src/argaze/PupillFeatures.py | 78 +++++++--------------- 8 files changed, 97 insertions(+), 139 deletions(-) (limited to 'src/argaze') diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index b3ecad6..8005d48 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -415,7 +415,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement) # Valid and finished gaze movement has been identified - if gaze_movement.valid and gaze_movement.finished: + if gaze_movement and gaze_movement.finished: if GazeFeatures.is_fixation(gaze_movement): @@ -908,22 +908,22 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Identify gaze movement if self.__gaze_movement_identifier is not None: - + # Identify finished gaze movement self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified - if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: - + if self.__identified_gaze_movement and self.__identified_gaze_movement.finished: + if GazeFeatures.is_fixation(self.__identified_gaze_movement): - + # Append fixation to scan path if self.__scan_path is not None: - + self.__scan_path.append_fixation(self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): - + # Append saccade to scan path if self.__scan_path is not None: @@ -931,10 +931,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Is there a new step? if scan_step and len(self.__scan_path) > 1: - + # Analyze aoi scan path for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items(): - + scan_path_analyzer.analyze(timestamp, self.__scan_path) # Update scan path analyzed state @@ -1013,12 +1013,16 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Draw current fixation if required if draw_fixations is not None and self.__gaze_movement_identifier is not None: - self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) + if self.__gaze_movement_identifier.current_fixation: + + self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) # Draw current saccade if required if draw_saccades is not None and self.__gaze_movement_identifier is not None: - self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) + if self.__gaze_movement_identifier.current_saccade: + + self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) # Draw layers if required if draw_layers is not None: diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index ff9baec..8df991b 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -149,20 +149,15 @@ class TimestampedObject(): @timestamp.setter def timestamp(self, timestamp: int|float): """Set object timestamp.""" - - assert(type(timestamp) == int or type(timestamp) == float) - self._timestamp = timestamp def untimestamp(self): """Reset object timestamp.""" - self.timestamp = math.nan + self._timestamp = math.nan def is_timestamped(self) -> bool: """Is the object timestamped?""" - timestamped = not math.isnan(self.timestamp) - - return timestamped + return not math.isnan(self._timestamp) class TimestampedObjectsList(list): """Handle timestamped object into a list. @@ -188,7 +183,7 @@ class TimestampedObjectsList(list): def append(self, ts_object: TimestampedObjectType|dict): """Append timestamped object.""" - + # Convert dict into GazePosition if type(ts_object) == dict: @@ -200,7 +195,7 @@ class TimestampedObjectsList(list): if not issubclass(ts_object.__class__, self.__object_type): raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') - + assert(ts_object.is_timestamped()) super().append(ts_object) @@ -421,42 +416,15 @@ class TimestampedObjectsList(list): return legend_patches class SharedObject(TimestampedObject): - """Abstract class to enable multiple threads sharing and timestamp management.""" + """Abstract class to enable multiple threads sharing for timestamped object.""" - def __init__(self): + def __init__(self, timestamp: int|float = math.nan): - super().__init__() + TimestampedObject.__init__(self, timestamp) self._lock = threading.Lock() self._execution_times = {} self._exceptions = {} - @property - def lock(self) -> threading.Lock: - """Get shared object lock object.""" - return self._lock - - @property - def timestamp(self) -> int|float: - """Get shared object timestamp.""" - with self._lock: - return super().timestamp - - @timestamp.setter - def timestamp(self, timestamp: int|float): - """Set shared object timestamp.""" - with self._lock: - super().timestamp = timestamp - - def untimestamp(self): - """Reset shared object timestamp.""" - with self._lock: - self.timestamp = math.nan - - def is_timestamped(self) -> bool: - """Is the object timestamped?""" - with self._lock: - return super().is_timestamped() - class PipelineStepObject(): """ Define class to assess pipeline step methods execution time and observe them. diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py index 62b5e9a..3849d59 100644 --- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py +++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py @@ -107,7 +107,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__reset() - elif not gaze_movement.valid: + elif not gaze_movement: self.__reset() diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 13529e7..2b89cf6 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -100,8 +100,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - start_position = self.positions[0] - last_position = self.positions[-1] + start_position = self[0] + last_position = self[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -142,12 +142,12 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @DataFeatures.PipelineStepMethod def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: - + # Ignore empty gaze position if not gaze_position: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() - + # Check if too much time elapsed since last valid gaze position if self.__valid_positions: @@ -171,7 +171,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Store gaze positions until a minimal duration self.__valid_positions.append(gaze_position) - + # Once the minimal duration is reached if self.__valid_positions.duration >= self.__duration_min_threshold: diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index c1d448a..a95905f 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -99,8 +99,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - start_position = self.positions[0] - last_position = self.positions[-1] + start_position = self[0] + last_position = self[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index eac9e5c..6a02142 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -34,13 +34,13 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): message: a string to describe why the the position is what it is. """ - def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, **kwargs): + def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): return tuple.__new__(cls, position) - def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, **kwargs): + def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): - DataFeatures.TimestampedObject.__init__(self, **kwargs) + DataFeatures.TimestampedObject.__init__(self, timestamp) self.__precision = precision self.__message = message @@ -178,7 +178,7 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt # Type definition for type annotation convenience class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): - """Handle timestamped gaze positions into a list""" + """Handle timestamped gaze positions into a list.""" def __init__(self, gaze_positions: list = []): @@ -188,6 +188,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): """Get all timestamped position values as list of tuple.""" return [tuple(ts_position) for ts_position in self] + ''' Is it still needed as there is a TimestampedObjectsList.from_json method? @classmethod def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType: """Create a TimeStampedGazePositionsType from .json file.""" @@ -197,6 +198,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): json_positions = json.load(ts_positions_file) return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions}) + ''' @classmethod def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> TimeStampedGazePositionsType: @@ -367,15 +369,15 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): message: a string to describe why the movement is what it is. """ - def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan): return TimeStampedGazePositions.__new__(cls, positions) - def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan): """Initialize GazeMovement""" TimeStampedGazePositions.__init__(self, positions) - DataFeatures.TimestampedObject.__init__(self, **kwargs) + DataFeatures.TimestampedObject.__init__(self, timestamp) self.__finished = finished self.__message = message @@ -385,9 +387,13 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): """Get first position timestamp.""" return self[0].timestamp + def is_timestamped(self) -> bool: + """If first position exist, the movement is timestamped.""" + return bool(self) + @timestamp.setter def timestamp(self, timestamp: int|float): - """Block gaze movment timestamp setting.""" + """Block gaze movement timestamp setting.""" raise('GazeMovement timestamp is first positon timestamp.') @property @@ -774,7 +780,7 @@ class ScanPath(list): self.__duration -= oldest_step.duration - def append_saccade(self, ts, saccade) -> ScanStepType: + def append_saccade(self, saccade) -> ScanStepType: """Append new saccade to scan path and return last new scan step if one have been created.""" # Ignore saccade if no fixation came before @@ -802,7 +808,7 @@ class ScanPath(list): # Clear last fixation self.__last_fixation = None - def append_fixation(self, ts, fixation): + def append_fixation(self, fixation): """Append new fixation to scan path. !!! warning Consecutives fixations are ignored keeping the last fixation""" diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupillAnalysis/WorkloadIndex.py index 1f3c586..f97dce3 100644 --- a/src/argaze/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze/PupillAnalysis/WorkloadIndex.py @@ -15,51 +15,61 @@ from argaze import PupillFeatures import numpy -@dataclass class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): - """Periodic average of pupill diameter variations to pupill diameter reference value.""" + """Periodic average of pupill diameter variations to pupill diameter reference value. - reference: PupillFeatures.PupillDiameter - """ """ + Parameters: + reference: base line value. + period: identification period length. + """ + def __init__(self, reference: PupillFeatures.PupillDiameter, period: int|float = 1): - period: int | float = field(default=1) - """Identification period length.""" + assert(not math.isnan(self.__reference)) - def __post_init__(self): - - assert(self.reference.valid) + self.__reference = reference + self.__period = period self.__variations_sum = 0. self.__variations_number = 0 self.__last_ts = 0 + @property + def reference(self) -> PupillFeatures.PupillDiameter: + """Get workload index reference.""" + return self.__reference + + @property + def period(self) -> int|float: + """Get workload index period.""" + return self.__period + @DataFeatures.PipelineStepMethod - def analyze(self, ts: int|float, pupill_diameter) -> float: + def analyze(self, pupill_diameter: PupillFeatures.PupillDiameter) -> float: """Analyze workload index from successive timestamped pupill diameters.""" # Ignore non valid pupill diameter - if not pupill_diameter.valid: + if not math.isnan(pupill_diameter): return None - if ts - self.__last_ts >= self.period: + if pupill_diameter.timestamp - self.__last_ts >= self.__period: - if self.__variations_number > 0 and self.reference.value > 0.: + if self.__variations_number > 0 and self.__reference.value > 0.: - workload_index = (self.__variations_sum / self.__variations_number) / self.reference.value + workload_index = (self.__variations_sum / self.__variations_number) / self.__reference.value else: workload_index = 0. - self.__variations_sum = pupill_diameter.value - self.reference.value + self.__variations_sum = pupill_diameter.value - self.__reference.value self.__variations_number = 1 - self.__last_ts = ts + self.__last_ts = pupill_diameter.timestamp return workload_index else: - self.__variations_sum += pupill_diameter.value - self.reference.value + self.__variations_sum += pupill_diameter.value - self.__reference.value self.__variations_number += 1 \ No newline at end of file diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py index d8f9331..492e7ca 100644 --- a/src/argaze/PupillFeatures.py +++ b/src/argaze/PupillFeatures.py @@ -10,71 +10,41 @@ __license__ = "BSD" from typing import TypeVar from dataclasses import dataclass, field import json +import math from argaze import DataFeatures -@dataclass(frozen=True) -class PupillDiameter(): - """Define pupill diameter as ...""" - - value: float = field(default=0.) - """Pupill diameter value.""" - - @property - def valid(self) -> bool: - """Is the value not 0""" - - return self.value != 0. - - def __repr__(self): - """String representation""" +PupillDiameterType = TypeVar('PupillDiameter', bound="PupillDiameter") +# Type definition for type annotation convenience - return json.dumps(self, ensure_ascii = False, default=vars) +class PupillDiameter(float, DataFeatures.TimestampedObject): + """Define pupill diameter as a single float value. -class UnvalidPupillDiameter(PupillDiameter): - """Unvalid pupill diameter.""" + Parameters: + value: pupill diameter value. + """ + def __new__(cls, value: float = math.nan, **kwargs): - def __init__(self, message=None): + return float.__new__(cls, value) - self.message = message + def __init__(self, value: float = math.nan, **kwargs): - super().__init__(0.) + super().__init__(**kwargs) + @property + def value(self): + """Get pupill diameter value.""" + return float(self) + TimeStampedPupillDiametersType = TypeVar('TimeStampedPupillDiameters', bound="TimeStampedPupillDiameters") # Type definition for type annotation convenience -class TimeStampedPupillDiameters(DataFeatures.TimeStampedBuffer): - """Define timestamped buffer to store pupill diameters.""" - - def __setitem__(self, key, value: PupillDiameter|dict): - """Force PupillDiameter storage.""" - - # Convert dict into PupillDiameter - if type(value) == dict: - - assert(set(['value']).issubset(value.keys())) - - if 'message' in value.keys(): - - value = UnvalidPupillDiameter(value['message']) - - else: - - value = PupillDiameter(value['value']) - - assert(type(value) == PupillDiameter or type(value) == UnvalidPupillDiameter) - - super().__setitem__(key, value) - - @classmethod - def from_json(self, json_filepath: str) -> TimeStampedPupillDiametersType: - """Create a TimeStampedPupillDiametersType from .json file.""" - - with open(json_filepath, encoding='utf-8') as ts_buffer_file: +class TimeStampedPupillDiameters(DataFeatures.TimestampedObjectsList): + """Handle timestamped pupill diamters into a list.""" - json_buffer = json.load(ts_buffer_file) + def __init__(self, pupill_diameters: list = []): - return TimeStampedPupillDiameters({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) + DataFeatures.TimestampedObjectsList.__init__(self, PupillDiameter, pupill_diameters) TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") # Type definition for type annotation convenience @@ -83,7 +53,7 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a pupill diameter analyser.""" @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, pupill_diameter, float) -> float: + def analyze(self, pupill_diameter: PupillDiameterType) -> any: """Analyze pupill diameter from successive timestamped pupill diameters.""" raise NotImplementedError('analyze() method not implemented') @@ -96,9 +66,9 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): ts_analyzis = DataFeatures.TimeStampedBuffer() # Iterate on pupill diameters - for ts, pupill_diameter in ts_pupill_diameters.items(): + for pupill_diameter in ts_pupill_diameters: - analysis = self.analyze(ts, pupill_diameter) + analysis = self.analyze(pupill_diameter) if analysis is not None: -- cgit v1.1