From cd601be0b9366a9bd1554523319e57801440ed64 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 14:18:50 +0100 Subject: More work on time management. --- .../advanced_topics/scripting.md | 2 +- .../DispersionThresholdIdentification.py | 3 + src/argaze.test/GazeFeatures.py | 43 +++++------- src/argaze.test/PupillFeatures.py | 60 ++++++----------- src/argaze/ArFeatures.py | 26 +++++--- src/argaze/DataFeatures.py | 46 ++----------- src/argaze/GazeAnalysis/DeviationCircleCoverage.py | 2 +- .../DispersionThresholdIdentification.py | 10 +-- .../VelocityThresholdIdentification.py | 4 +- src/argaze/GazeFeatures.py | 26 +++++--- src/argaze/PupillAnalysis/WorkloadIndex.py | 44 +++++++----- src/argaze/PupillFeatures.py | 78 +++++++--------------- 12 files changed, 139 insertions(+), 205 deletions(-) diff --git a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md index 8c21dec..5999cbc 100644 --- a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md +++ b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md @@ -80,7 +80,7 @@ Calling [ArFrame.look](../../../argaze.md/#argaze.ArFeatures.ArFrame.look) metho ... ar_frame.last_gaze_position # Check if a gaze movement has been identified - if ar_frame.last_gaze_movement.valid and ar_frame.last_gaze_movement.finished: + if ar_frame.last_gaze_movement and ar_frame.last_gaze_movement.finished: # Do something with identified fixation if GazeFeatures.is_fixation(ar_frame.last_gaze_movement): diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index 156f6f1..311f31b 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -113,6 +113,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) + # DEBUG + print(gaze_movement_identifier) + # Check result size self.assertEqual(len(ts_fixations), 1) self.assertEqual(len(ts_saccades), 0) diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index c6ccfca..e678093 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -31,6 +31,9 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)): # Edit gaze position random_gaze_position = GazeFeatures.GazePosition((random.random() * frame_dimension[0], random.random() * frame_dimension[1]), precision=5) + # DEBUG + print('random_gaze_position', type(random_gaze_position), random_gaze_position.__class__.__bases__) + # Timestamp gaze position random_gaze_position.timestamp = time.time() @@ -375,13 +378,11 @@ def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)): for i in range(size): - fixation = TestFixation(random_gaze_positions(10, frame_dimension)) - ts, _ = fixation.first - scan_path.append_fixation(ts, fixation) + fixation = TestFixation(random_gaze_positions(10, frame_dimension), timestamp=i) + scan_path.append_fixation(fixation) - saccade = TestSaccade(random_gaze_positions(2, frame_dimension)) - ts, _ = saccade.first - scan_path.append_saccade(ts, saccade) + saccade = TestSaccade(random_gaze_positions(2, frame_dimension), timestamp=i+1) + scan_path.append_saccade(saccade) return scan_path @@ -404,9 +405,7 @@ class TestScanPathClass(unittest.TestCase): # Append a saccade that should be ignored saccade = TestSaccade(random_gaze_positions(2)) - ts = saccade[0].timestamp - - new_step = scan_path.append_saccade(ts, saccade) + new_step = scan_path.append_saccade(saccade) # Check that no scan step have been created yet self.assertEqual(len(scan_path), 0) @@ -415,9 +414,7 @@ class TestScanPathClass(unittest.TestCase): # Append first fixation fixation_A = TestFixation(random_gaze_positions(10)) - ts = fixation_A[0].timestamp - - new_step = scan_path.append_fixation(ts, fixation_A) + new_step = scan_path.append_fixation(fixation_A) # Check that no scan step have been created yet self.assertEqual(len(scan_path), 0) @@ -426,9 +423,7 @@ class TestScanPathClass(unittest.TestCase): # Append consecutive saccade saccade_A = TestSaccade(random_gaze_positions(2)) - ts = saccade_A[0].timestamp - - new_step_A = scan_path.append_saccade(ts, saccade_A) + new_step_A = scan_path.append_saccade(saccade_A) # Check that new scan step have been created self.assertEqual(len(scan_path), 1) @@ -439,9 +434,7 @@ class TestScanPathClass(unittest.TestCase): # Append 2 consecutive fixations then a saccade fixation_B1 = TestFixation(random_gaze_positions(10)) - ts = fixation_B1[0].timestamp - - new_step = scan_path.append_fixation(ts, fixation_B1) + new_step = scan_path.append_fixation(fixation_B1) # Check that no scan step have been created yet self.assertEqual(len(scan_path), 1) @@ -449,9 +442,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(new_step, None) fixation_B2 = TestFixation(random_gaze_positions(10)) - ts = fixation_B2[0].timestamp - - new_step = scan_path.append_fixation(ts, fixation_B2) + new_step = scan_path.append_fixation(fixation_B2) # Check that no scan step have been created yet self.assertEqual(len(scan_path), 1) @@ -459,9 +450,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(new_step, None) saccade_B = TestSaccade(random_gaze_positions(2)) - ts = saccade_B[0].timestamp - - new_step_B = scan_path.append_saccade(ts, saccade_B) + new_step_B = scan_path.append_saccade(saccade_B) # Check that new scan step have been created self.assertEqual(len(scan_path), 2) @@ -521,12 +510,12 @@ def build_aoi_scan_path(expected_aoi, aoi_path): # Append a hidden last step to allow last given step creation aoi_path.append(aoi_path[-2]) - for aoi in aoi_path: + for i, aoi in enumerate(aoi_path): - fixation = TestFixation(random_gaze_positions(10)) + fixation = TestFixation(random_gaze_positions(10), timestamp=i) aoi_scan_path.append_fixation(fixation, aoi) - saccade = TestSaccade(random_gaze_positions(2)) + saccade = TestSaccade(random_gaze_positions(2), timestamp=i+1) aoi_scan_path.append_saccade(saccade) return aoi_scan_path diff --git a/src/argaze.test/PupillFeatures.py b/src/argaze.test/PupillFeatures.py index f0e8e1b..9cf26eb 100644 --- a/src/argaze.test/PupillFeatures.py +++ b/src/argaze.test/PupillFeatures.py @@ -8,6 +8,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" import unittest +import math from argaze import PupillFeatures @@ -43,14 +44,12 @@ class TestPupillDiameterClass(unittest.TestCase): # Check empty PupillDiameter empty_pupill_diameter = PupillFeatures.PupillDiameter() - self.assertEqual(empty_pupill_diameter.value, 0.) - self.assertEqual(empty_pupill_diameter.valid, False) + self.assertEqual(empty_pupill_diameter, math.nan) # Check float PupillDiameter float_pupill_diameter = PupillFeatures.PupillDiameter(1.23) - self.assertEqual(float_pupill_diameter.value, 1.23) - self.assertEqual(float_pupill_diameter.valid, True) + self.assertEqual(float_pupill_diameter, 1.23) def test_properties(self): """Test PupillDiameter properties cannot be modified after creation.""" @@ -60,32 +59,16 @@ class TestPupillDiameterClass(unittest.TestCase): # Check that pupill diameter value setting fails with self.assertRaises(AttributeError): - pupill_diameter.value = 123 + pupill_diameter = 123 - self.assertNotEqual(pupill_diameter.value, 123) - self.assertEqual(pupill_diameter.value, 0.) + self.assertNotEqual(pupill_diameter, 123) + self.assertEqual(pupill_diameter, math.nan) def test___repr__(self): """Test PupillDiameter string representation.""" # Check empty PupillDiameter representation - self.assertEqual(repr(PupillFeatures.PupillDiameter()), "{\"value\": 0.0}") - -class TestUnvalidPupillDiameterClass(unittest.TestCase): - """Test UnvalidPupillDiameter class.""" - - def test_new(self): - """Test UnvalidPupillDiameter creation.""" - - unvalid_pupill_diameter = PupillFeatures.UnvalidPupillDiameter() - - self.assertEqual(unvalid_pupill_diameter.value, 0.) - self.assertEqual(unvalid_pupill_diameter.valid, False) - - def test___repr__(self): - """Test UnvalidPupillDiameter string representation.""" - - self.assertEqual(repr(PupillFeatures.UnvalidPupillDiameter()), "{\"message\": null, \"value\": 0.0}") + self.assertEqual(repr(PupillFeatures.PupillDiameter()), "{\"value\": NaN}") class TestTimeStampedPupillDiametersClass(unittest.TestCase): """Test TimeStampedPupillDiameters class.""" @@ -93,22 +76,23 @@ class TestTimeStampedPupillDiametersClass(unittest.TestCase): def test___setitem__(self): """Test __setitem__ method.""" - ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters() - ts_pupill_diameters[0] = PupillFeatures.PupillDiameter() - ts_pupill_diameters[1] = PupillFeatures.UnvalidPupillDiameter() - ts_pupill_diameters[2] = {"value": 1.23} + ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters([ + PupillFeatures.PupillDiameter(), + PupillFeatures.PupillDiameter(0.63), + {"value": 1.23} + ]) - # Check PupillDiameter is correctly stored and accessible as a PupillDiameter + # Check empty PupillDiameter is correctly stored and accessible as a PupillDiameter self.assertIsInstance(ts_pupill_diameters[0], PupillFeatures.PupillDiameter) - self.assertEqual(ts_pupill_diameters[0].valid, False) + self.assertEqual(ts_pupill_diameters[0], math.nan) - # Check UnvalidPupillDiameter is correctly stored and accessible as a UnvalidPupillDiameter - self.assertIsInstance(ts_pupill_diameters[1], PupillFeatures.UnvalidPupillDiameter) - self.assertEqual(ts_pupill_diameters[1].valid, False) + # Check PupillDiameter is correctly stored and accessible as a PupillDiameter + self.assertIsInstance(ts_pupill_diameters[1], PupillFeatures.PupillDiameter) + self.assertEqual(ts_pupill_diameters[0], 0.63) - # Check dict with "value" and "precision" keys is correctly stored and accessible as a PupillDiameter + # Check dict with "value" key is correctly stored and accessible as a PupillDiameter self.assertIsInstance(ts_pupill_diameters[2], PupillFeatures.PupillDiameter) - self.assertEqual(ts_pupill_diameters[2].valid, True) + self.assertEqual(ts_pupill_diameters[0], 1.23) # Check that bad data type insertion fails with self.assertRaises(AssertionError): @@ -125,11 +109,11 @@ class TestTimeStampedPupillDiametersClass(unittest.TestCase): ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters() - self.assertEqual(repr(PupillFeatures.TimeStampedPupillDiameters()), "{}") + self.assertEqual(repr(PupillFeatures.TimeStampedPupillDiameters()), "[]") - ts_pupill_diameters[0] = PupillFeatures.PupillDiameter() + ts_pupill_diameters.append(PupillFeatures.PupillDiameter()) - self.assertEqual(repr(ts_pupill_diameters), "{\"0\": {\"value\": 0.0}}") + self.assertEqual(repr(ts_pupill_diameters), "[{\"value\": NaN, \"timestamp\": 0}]") ts_pupill_diameters[0] = PupillFeatures.UnvalidPupillDiameter() diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index b3ecad6..8005d48 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -415,7 +415,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement) # Valid and finished gaze movement has been identified - if gaze_movement.valid and gaze_movement.finished: + if gaze_movement and gaze_movement.finished: if GazeFeatures.is_fixation(gaze_movement): @@ -908,22 +908,22 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Identify gaze movement if self.__gaze_movement_identifier is not None: - + # Identify finished gaze movement self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified - if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: - + if self.__identified_gaze_movement and self.__identified_gaze_movement.finished: + if GazeFeatures.is_fixation(self.__identified_gaze_movement): - + # Append fixation to scan path if self.__scan_path is not None: - + self.__scan_path.append_fixation(self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): - + # Append saccade to scan path if self.__scan_path is not None: @@ -931,10 +931,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Is there a new step? if scan_step and len(self.__scan_path) > 1: - + # Analyze aoi scan path for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items(): - + scan_path_analyzer.analyze(timestamp, self.__scan_path) # Update scan path analyzed state @@ -1013,12 +1013,16 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Draw current fixation if required if draw_fixations is not None and self.__gaze_movement_identifier is not None: - self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) + if self.__gaze_movement_identifier.current_fixation: + + self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) # Draw current saccade if required if draw_saccades is not None and self.__gaze_movement_identifier is not None: - self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) + if self.__gaze_movement_identifier.current_saccade: + + self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) # Draw layers if required if draw_layers is not None: diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index ff9baec..8df991b 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -149,20 +149,15 @@ class TimestampedObject(): @timestamp.setter def timestamp(self, timestamp: int|float): """Set object timestamp.""" - - assert(type(timestamp) == int or type(timestamp) == float) - self._timestamp = timestamp def untimestamp(self): """Reset object timestamp.""" - self.timestamp = math.nan + self._timestamp = math.nan def is_timestamped(self) -> bool: """Is the object timestamped?""" - timestamped = not math.isnan(self.timestamp) - - return timestamped + return not math.isnan(self._timestamp) class TimestampedObjectsList(list): """Handle timestamped object into a list. @@ -188,7 +183,7 @@ class TimestampedObjectsList(list): def append(self, ts_object: TimestampedObjectType|dict): """Append timestamped object.""" - + # Convert dict into GazePosition if type(ts_object) == dict: @@ -200,7 +195,7 @@ class TimestampedObjectsList(list): if not issubclass(ts_object.__class__, self.__object_type): raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') - + assert(ts_object.is_timestamped()) super().append(ts_object) @@ -421,42 +416,15 @@ class TimestampedObjectsList(list): return legend_patches class SharedObject(TimestampedObject): - """Abstract class to enable multiple threads sharing and timestamp management.""" + """Abstract class to enable multiple threads sharing for timestamped object.""" - def __init__(self): + def __init__(self, timestamp: int|float = math.nan): - super().__init__() + TimestampedObject.__init__(self, timestamp) self._lock = threading.Lock() self._execution_times = {} self._exceptions = {} - @property - def lock(self) -> threading.Lock: - """Get shared object lock object.""" - return self._lock - - @property - def timestamp(self) -> int|float: - """Get shared object timestamp.""" - with self._lock: - return super().timestamp - - @timestamp.setter - def timestamp(self, timestamp: int|float): - """Set shared object timestamp.""" - with self._lock: - super().timestamp = timestamp - - def untimestamp(self): - """Reset shared object timestamp.""" - with self._lock: - self.timestamp = math.nan - - def is_timestamped(self) -> bool: - """Is the object timestamped?""" - with self._lock: - return super().is_timestamped() - class PipelineStepObject(): """ Define class to assess pipeline step methods execution time and observe them. diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py index 62b5e9a..3849d59 100644 --- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py +++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py @@ -107,7 +107,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__reset() - elif not gaze_movement.valid: + elif not gaze_movement: self.__reset() diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 13529e7..2b89cf6 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -100,8 +100,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - start_position = self.positions[0] - last_position = self.positions[-1] + start_position = self[0] + last_position = self[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -142,12 +142,12 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @DataFeatures.PipelineStepMethod def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: - + # Ignore empty gaze position if not gaze_position: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() - + # Check if too much time elapsed since last valid gaze position if self.__valid_positions: @@ -171,7 +171,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Store gaze positions until a minimal duration self.__valid_positions.append(gaze_position) - + # Once the minimal duration is reached if self.__valid_positions.duration >= self.__duration_min_threshold: diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index c1d448a..a95905f 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -99,8 +99,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - start_position = self.positions[0] - last_position = self.positions[-1] + start_position = self[0] + last_position = self[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index eac9e5c..6a02142 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -34,13 +34,13 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): message: a string to describe why the the position is what it is. """ - def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, **kwargs): + def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): return tuple.__new__(cls, position) - def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, **kwargs): + def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): - DataFeatures.TimestampedObject.__init__(self, **kwargs) + DataFeatures.TimestampedObject.__init__(self, timestamp) self.__precision = precision self.__message = message @@ -178,7 +178,7 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt # Type definition for type annotation convenience class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): - """Handle timestamped gaze positions into a list""" + """Handle timestamped gaze positions into a list.""" def __init__(self, gaze_positions: list = []): @@ -188,6 +188,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): """Get all timestamped position values as list of tuple.""" return [tuple(ts_position) for ts_position in self] + ''' Is it still needed as there is a TimestampedObjectsList.from_json method? @classmethod def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType: """Create a TimeStampedGazePositionsType from .json file.""" @@ -197,6 +198,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): json_positions = json.load(ts_positions_file) return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions}) + ''' @classmethod def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> TimeStampedGazePositionsType: @@ -367,15 +369,15 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): message: a string to describe why the movement is what it is. """ - def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan): return TimeStampedGazePositions.__new__(cls, positions) - def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan): """Initialize GazeMovement""" TimeStampedGazePositions.__init__(self, positions) - DataFeatures.TimestampedObject.__init__(self, **kwargs) + DataFeatures.TimestampedObject.__init__(self, timestamp) self.__finished = finished self.__message = message @@ -385,9 +387,13 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): """Get first position timestamp.""" return self[0].timestamp + def is_timestamped(self) -> bool: + """If first position exist, the movement is timestamped.""" + return bool(self) + @timestamp.setter def timestamp(self, timestamp: int|float): - """Block gaze movment timestamp setting.""" + """Block gaze movement timestamp setting.""" raise('GazeMovement timestamp is first positon timestamp.') @property @@ -774,7 +780,7 @@ class ScanPath(list): self.__duration -= oldest_step.duration - def append_saccade(self, ts, saccade) -> ScanStepType: + def append_saccade(self, saccade) -> ScanStepType: """Append new saccade to scan path and return last new scan step if one have been created.""" # Ignore saccade if no fixation came before @@ -802,7 +808,7 @@ class ScanPath(list): # Clear last fixation self.__last_fixation = None - def append_fixation(self, ts, fixation): + def append_fixation(self, fixation): """Append new fixation to scan path. !!! warning Consecutives fixations are ignored keeping the last fixation""" diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupillAnalysis/WorkloadIndex.py index 1f3c586..f97dce3 100644 --- a/src/argaze/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze/PupillAnalysis/WorkloadIndex.py @@ -15,51 +15,61 @@ from argaze import PupillFeatures import numpy -@dataclass class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): - """Periodic average of pupill diameter variations to pupill diameter reference value.""" + """Periodic average of pupill diameter variations to pupill diameter reference value. - reference: PupillFeatures.PupillDiameter - """ """ + Parameters: + reference: base line value. + period: identification period length. + """ + def __init__(self, reference: PupillFeatures.PupillDiameter, period: int|float = 1): - period: int | float = field(default=1) - """Identification period length.""" + assert(not math.isnan(self.__reference)) - def __post_init__(self): - - assert(self.reference.valid) + self.__reference = reference + self.__period = period self.__variations_sum = 0. self.__variations_number = 0 self.__last_ts = 0 + @property + def reference(self) -> PupillFeatures.PupillDiameter: + """Get workload index reference.""" + return self.__reference + + @property + def period(self) -> int|float: + """Get workload index period.""" + return self.__period + @DataFeatures.PipelineStepMethod - def analyze(self, ts: int|float, pupill_diameter) -> float: + def analyze(self, pupill_diameter: PupillFeatures.PupillDiameter) -> float: """Analyze workload index from successive timestamped pupill diameters.""" # Ignore non valid pupill diameter - if not pupill_diameter.valid: + if not math.isnan(pupill_diameter): return None - if ts - self.__last_ts >= self.period: + if pupill_diameter.timestamp - self.__last_ts >= self.__period: - if self.__variations_number > 0 and self.reference.value > 0.: + if self.__variations_number > 0 and self.__reference.value > 0.: - workload_index = (self.__variations_sum / self.__variations_number) / self.reference.value + workload_index = (self.__variations_sum / self.__variations_number) / self.__reference.value else: workload_index = 0. - self.__variations_sum = pupill_diameter.value - self.reference.value + self.__variations_sum = pupill_diameter.value - self.__reference.value self.__variations_number = 1 - self.__last_ts = ts + self.__last_ts = pupill_diameter.timestamp return workload_index else: - self.__variations_sum += pupill_diameter.value - self.reference.value + self.__variations_sum += pupill_diameter.value - self.__reference.value self.__variations_number += 1 \ No newline at end of file diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py index d8f9331..492e7ca 100644 --- a/src/argaze/PupillFeatures.py +++ b/src/argaze/PupillFeatures.py @@ -10,71 +10,41 @@ __license__ = "BSD" from typing import TypeVar from dataclasses import dataclass, field import json +import math from argaze import DataFeatures -@dataclass(frozen=True) -class PupillDiameter(): - """Define pupill diameter as ...""" - - value: float = field(default=0.) - """Pupill diameter value.""" - - @property - def valid(self) -> bool: - """Is the value not 0""" - - return self.value != 0. - - def __repr__(self): - """String representation""" +PupillDiameterType = TypeVar('PupillDiameter', bound="PupillDiameter") +# Type definition for type annotation convenience - return json.dumps(self, ensure_ascii = False, default=vars) +class PupillDiameter(float, DataFeatures.TimestampedObject): + """Define pupill diameter as a single float value. -class UnvalidPupillDiameter(PupillDiameter): - """Unvalid pupill diameter.""" + Parameters: + value: pupill diameter value. + """ + def __new__(cls, value: float = math.nan, **kwargs): - def __init__(self, message=None): + return float.__new__(cls, value) - self.message = message + def __init__(self, value: float = math.nan, **kwargs): - super().__init__(0.) + super().__init__(**kwargs) + @property + def value(self): + """Get pupill diameter value.""" + return float(self) + TimeStampedPupillDiametersType = TypeVar('TimeStampedPupillDiameters', bound="TimeStampedPupillDiameters") # Type definition for type annotation convenience -class TimeStampedPupillDiameters(DataFeatures.TimeStampedBuffer): - """Define timestamped buffer to store pupill diameters.""" - - def __setitem__(self, key, value: PupillDiameter|dict): - """Force PupillDiameter storage.""" - - # Convert dict into PupillDiameter - if type(value) == dict: - - assert(set(['value']).issubset(value.keys())) - - if 'message' in value.keys(): - - value = UnvalidPupillDiameter(value['message']) - - else: - - value = PupillDiameter(value['value']) - - assert(type(value) == PupillDiameter or type(value) == UnvalidPupillDiameter) - - super().__setitem__(key, value) - - @classmethod - def from_json(self, json_filepath: str) -> TimeStampedPupillDiametersType: - """Create a TimeStampedPupillDiametersType from .json file.""" - - with open(json_filepath, encoding='utf-8') as ts_buffer_file: +class TimeStampedPupillDiameters(DataFeatures.TimestampedObjectsList): + """Handle timestamped pupill diamters into a list.""" - json_buffer = json.load(ts_buffer_file) + def __init__(self, pupill_diameters: list = []): - return TimeStampedPupillDiameters({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) + DataFeatures.TimestampedObjectsList.__init__(self, PupillDiameter, pupill_diameters) TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") # Type definition for type annotation convenience @@ -83,7 +53,7 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a pupill diameter analyser.""" @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, pupill_diameter, float) -> float: + def analyze(self, pupill_diameter: PupillDiameterType) -> any: """Analyze pupill diameter from successive timestamped pupill diameters.""" raise NotImplementedError('analyze() method not implemented') @@ -96,9 +66,9 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): ts_analyzis = DataFeatures.TimeStampedBuffer() # Iterate on pupill diameters - for ts, pupill_diameter in ts_pupill_diameters.items(): + for pupill_diameter in ts_pupill_diameters: - analysis = self.analyze(ts, pupill_diameter) + analysis = self.analyze(pupill_diameter) if analysis is not None: -- cgit v1.1