From 96007cbe6a42d26c4dece35ad7ecee2ddd8bdade Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 22:30:14 +0100 Subject: Making all GazeFeatures test working again. --- .../advanced_topics/scripting.md | 2 +- src/argaze.test/GazeFeatures.py | 46 ++---- src/argaze/ArFeatures.py | 14 +- src/argaze/DataFeatures.py | 69 ++++---- .../DispersionThresholdIdentification.py | 14 +- .../VelocityThresholdIdentification.py | 16 +- src/argaze/GazeFeatures.py | 174 ++++++++++----------- 7 files changed, 156 insertions(+), 179 deletions(-) diff --git a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md index 7952e9f..8c21dec 100644 --- a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md +++ b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md @@ -125,7 +125,7 @@ This is the last calibrated [GazePosition](../../../argaze.md/#argaze.GazeFeatur ### *ar_frame.last_gaze_movement* -Last [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement) identified by [ArFrame.gaze_movement_identifier](../../../argaze.md/#argaze.ArFeatures.ArFrame) object from incoming consecutive timestamped gaze positions. If no gaze movement have been identified, it returns an [UnvalidGazeMovement](../../../argaze.md/#argaze.GazeFeatures.UnvalidGazeMovement). +Last [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement) identified by [ArFrame.gaze_movement_identifier](../../../argaze.md/#argaze.ArFeatures.ArFrame) object from incoming consecutive timestamped gaze positions. If no gaze movement have been identified, it returns an empty [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement). This could also be the current gaze movement if [ArFrame.filter_in_progress_identification](../../../argaze.md/#argaze.ArFeatures.ArFrame) attribute is false. In that case, the last gaze movement *finished* flag is false. diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index 7d18976..c6ccfca 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -494,7 +494,7 @@ class TestAOIScanStepClass(unittest.TestCase): self.assertEqual(aoi_scan_step.first_fixation, fixation) self.assertEqual(aoi_scan_step.last_saccade, saccade) self.assertGreater(aoi_scan_step.duration, 0) - @unittest.skip("DEBUG") + def test_error(self): """Test AOIScanStep creation error.""" @@ -524,18 +524,16 @@ def build_aoi_scan_path(expected_aoi, aoi_path): for aoi in aoi_path: fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.first - aoi_scan_path.append_fixation(ts, fixation, aoi) + aoi_scan_path.append_fixation(fixation, aoi) saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.first - aoi_scan_path.append_saccade(ts, saccade) + aoi_scan_path.append_saccade(saccade) return aoi_scan_path class TestAOIScanPathClass(unittest.TestCase): """Test AOIScanPath class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test AOIScanPath creation.""" @@ -543,7 +541,7 @@ class TestAOIScanPathClass(unittest.TestCase): aoi_scan_path = GazeFeatures.AOIScanPath() self.assertEqual(len(aoi_scan_path), 0) - @unittest.skip("DEBUG") + def test_append(self): """Test AOIScanPath append methods.""" @@ -551,9 +549,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on A aoi fixation_A = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_A.first - - new_step = aoi_scan_path.append_fixation(ts, fixation_A, 'Foo') + new_step = aoi_scan_path.append_fixation(fixation_A, 'Foo') # Check that no aoi scan step have been created yet self.assertEqual(len(aoi_scan_path), 0) @@ -562,9 +558,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append saccade saccade_A = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade_A.first - - new_step = aoi_scan_path.append_saccade(ts, saccade_A) + new_step = aoi_scan_path.append_saccade(saccade_A) # Check that no aoi scan step have been created yet self.assertEqual(len(aoi_scan_path), 0) @@ -573,9 +567,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on B aoi fixation_B = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_B.first - - new_step_A = aoi_scan_path.append_fixation(ts, fixation_B, 'Bar') + new_step_A = aoi_scan_path.append_fixation(fixation_B, 'Bar') # Check a first aoi scan step have been created once a new fixation is appened self.assertEqual(len(aoi_scan_path), 1) @@ -584,14 +576,11 @@ class TestAOIScanPathClass(unittest.TestCase): self.assertEqual(new_step_A.aoi, 'Foo') self.assertEqual(new_step_A.letter, 'A') - first_ts, _ = fixation_A.first - last_ts, _ = saccade_A.last - - self.assertEqual(new_step_A.duration, last_ts - first_ts) + self.assertEqual(new_step_A.duration, saccade_A[-1].timestamp - fixation_A[0].timestamp) # Check letter affectation self.assertEqual(aoi_scan_path.get_letter_aoi('A'), 'Foo') - @unittest.skip("DEBUG") + def test_append_error(self): """Test AOIScanPath append error.""" @@ -599,9 +588,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on A aoi fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.first - - new_step = aoi_scan_path.append_fixation(ts, fixation, 'Foo') + new_step = aoi_scan_path.append_fixation(fixation, 'Foo') # Check that no aoi scan step have been created yet self.assertEqual(len(aoi_scan_path), 0) @@ -609,18 +596,17 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on B aoi fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.first # Check that aoi scan step creation fail when fixation is appened after another fixation with self.assertRaises(GazeFeatures.AOIScanStepError): - new_step = aoi_scan_path.append_fixation(ts, fixation, 'Bar') + new_step = aoi_scan_path.append_fixation(fixation, 'Bar') # Check that unexpected aoi scan step creation fail with self.assertRaises(GazeFeatures.AOIScanStepError): - new_step = aoi_scan_path.append_fixation(ts, fixation, 'Shu') - @unittest.skip("DEBUG") + new_step = aoi_scan_path.append_fixation(fixation, 'Shu') + def test_letter_index_and_string_reprentation(self): """Test AOIScanPath letter index and string representation feature.""" @@ -649,7 +635,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Check letter sequence representation self.assertEqual(aoi_scan_path.letter_sequence, 'ABCA') - @unittest.skip("DEBUG") + def test_transition_matrix(self): """Test AOIScanPath transition matrix feature.""" @@ -668,7 +654,7 @@ class TestAOIScanPathClass(unittest.TestCase): self.assertEqual(aoi_scan_path.transition_matrix['Shu']['Foo'], 0) self.assertEqual(aoi_scan_path.transition_matrix['Shu']['Bar'], 1) - @unittest.skip("DEBUG") + def test_transition_matrix(self): """Test AOIScanPath fixations count feature.""" diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 47a91e9..b3ecad6 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -123,7 +123,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers self.__draw_parameters = draw_parameters - self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() + self.__gaze_movement = GazeFeatures.GazeMovement() self.__looked_aoi_name = None self.__aoi_scan_path_analyzed = False @@ -423,7 +423,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # TODO: add an option to filter None looked_aoi_name or not if self.__aoi_scan_path is not None: - aoi_scan_step = self.__aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name) + aoi_scan_step = self.__aoi_scan_path.append_fixation(gaze_movement, self.__looked_aoi_name) # Is there a new step? if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1: @@ -441,7 +441,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Append saccade to aoi scan path if self.__aoi_scan_path is not None: - self.__aoi_scan_path.append_saccade(timestamp, gaze_movement) + self.__aoi_scan_path.append_saccade(gaze_movement) def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ @@ -532,7 +532,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__image_parameters = image_parameters self.__calibrated_gaze_position = GazeFeatures.GazePosition() - self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + self.__identified_gaze_movement = GazeFeatures.GazeMovement() self.__scan_path_analyzed = False # Edit pipeline step objects parent @@ -891,7 +891,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): with self._lock: # No gaze movement identified by default - self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + self.__identified_gaze_movement = GazeFeatures.GazeMovement() # Reset scan path analyzed state self.__scan_path_analyzed = False @@ -920,14 +920,14 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Append fixation to scan path if self.__scan_path is not None: - self.__scan_path.append_fixation(timestamp, self.__identified_gaze_movement) + self.__scan_path.append_fixation(self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): # Append saccade to scan path if self.__scan_path is not None: - scan_step = self.__scan_path.append_saccade(timestamp, self.__identified_gaze_movement) + scan_step = self.__scan_path.append_saccade(self.__identified_gaze_movement) # Is there a new step? if scan_step and len(self.__scan_path) > 1: diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 7c53c2a..ce3ce52 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -156,12 +156,25 @@ class TimeStampedObjectsList(list): # Check object type if type(ts_object) != self.__object_type: - raise TypeError(f'object type have to be {self.__object_type} not {type(ts_object)}') + if not issubclass(ts_object.__class__, self.__object_type): + + raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') assert(ts_object.is_timestamped()) super().append(ts_object) + @property + def duration(self): + """Get inferred duration from first and last timestamps.""" + if self: + + return self[-1].timestamp - self[0].timestamp + + else: + + return 0 + def timestamps(self): """Get all timestamps in list.""" return [ts_object.timestamp for ts_object in self] @@ -178,76 +191,66 @@ class TimeStampedObjectsList(list): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) - def pop_last_until(self, ts: TimeStampType) -> TimeStampedObjectType: + def pop_last_until(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Pop all item until a given timestamped value and return the first after.""" # get last item before given timestamp - earliest_ts, earliest_value = self.get_last_until(ts) + earliest_value = self.get_last_until(timestamp) - first_ts, first_value = self.first + while self[0].timestamp < earliest_value.timestamp: - while first_ts < earliest_ts: - self.pop_first() - first_ts, first_value = self.first + self.pop(0) - return first_ts, first_value + return self[0] - def pop_last_before(self, ts: TimeStampType) -> TimeStampedObjectType: + def pop_last_before(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Pop all item before a given timestamped value and return the last one.""" # get last item before given timestamp - earliest_ts, earliest_value = self.get_last_before(ts) + earliest_value = self.get_last_before(timestamp) + + poped_value = self.pop(0) - popep_ts, poped_value = self.pop_first() + while poped_value.timestamp != earliest_value.timestamp: - while popep_ts != earliest_ts: - popep_ts, poped_value = self.pop_first() + poped_value = self.pop(0) - return popep_ts, poped_value + return poped_value - def get_first_from(self, ts) -> TimeStampedObjectType: + def get_first_from(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Retreive first item timestamp from a given timestamp value.""" - ts_list = list(self.keys()) - first_from_index = bisect.bisect_left(ts_list, ts) + first_from_index = bisect.bisect_left(self.timestamps(), timestamp) if first_from_index < len(self): - first_from_ts = ts_list[first_from_index] - - return first_from_ts, self[first_from_ts] + return self[ts_list[first_from_index]] else: - raise KeyError(f'No data stored after {ts} timestamp.') + raise KeyError(f'No data stored after {timestamp} timestamp.') - def get_last_before(self, ts) -> TimeStampedObjectType: + def get_last_before(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Retreive last item timestamp before a given timestamp value.""" - ts_list = list(self.keys()) - last_before_index = bisect.bisect_left(ts_list, ts) - 1 + last_before_index = bisect.bisect_left(self.timestamps(), timestamp) - 1 if last_before_index >= 0: - last_before_ts = ts_list[last_before_index] - - return last_before_ts, self[last_before_ts] + return self[ts_list[last_before_index]] else: raise KeyError(f'No data stored before {ts} timestamp.') - def get_last_until(self, ts) -> TimeStampedObjectType: + def get_last_until(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Retreive last item timestamp until a given timestamp value.""" - ts_list = list(self.keys()) - last_until_index = bisect.bisect_right(ts_list, ts) - 1 + last_until_index = bisect.bisect_right(self.timestamps(), timestamp) - 1 if last_until_index >= 0: - last_until_ts = ts_list[last_until_index] - - return last_until_ts, self[last_until_ts] + return self[ts_list[last_until_index]] else: diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index f8e519f..f928c5a 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -146,7 +146,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Ignore non valid gaze position if not gaze_position: - return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish() + return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Check if too much time elapsed since last valid gaze position if len(self.__valid_positions) > 0: @@ -184,7 +184,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Valid gaze positions deviation small enough if deviation <= self.deviation_max_threshold: - last_saccade = GazeFeatures.UnvalidGazeMovement() + last_saccade = GazeFeatures.GazeMovement() # Is there saccade positions? if len(self.__saccade_positions) > 0: @@ -208,7 +208,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Valid gaze positions deviation too wide else: - last_fixation = GazeFeatures.UnvalidGazeMovement() + last_fixation = GazeFeatures.GazeMovement() # Is there fixation positions? if len(self.__fixation_positions) > 0: @@ -237,7 +237,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__saccade_positions.append(first_position) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_gaze_movement(self) -> GazeMovementType: @@ -254,7 +254,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_fixation(self) -> FixationType: @@ -263,7 +263,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Fixation(self.__fixation_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_saccade(self) -> SaccadeType: @@ -273,4 +273,4 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index d246db4..971ba9b 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -150,7 +150,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Ignore non valid gaze position if not gaze_position.valid: - return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish() + return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Store first valid position if self.__last_ts < 0: @@ -158,7 +158,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__last_ts = ts self.__last_position = gaze_position - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() # Check if too much time elapsed since last gaze position if (ts - self.__last_ts) > self.duration_min_threshold: @@ -187,7 +187,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Velocity is greater than threshold if velocity > self.velocity_max_threshold: - last_fixation = GazeFeatures.UnvalidGazeMovement() + last_fixation = GazeFeatures.GazeMovement() # Does last fixation exist? if len(self.__fixation_positions) > 0: @@ -211,7 +211,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Velocity is less or equals to threshold else: - last_saccade = GazeFeatures.UnvalidGazeMovement() + last_saccade = GazeFeatures.GazeMovement() # Does last saccade exist? if len(self.__saccade_positions) > 0: @@ -233,7 +233,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return last_saccade if not terminate else self.current_fixation.finish() # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_gaze_movement(self) -> GazeMovementType: @@ -250,7 +250,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_fixation(self) -> FixationType: @@ -260,7 +260,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Fixation(self.__fixation_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_saccade(self) -> SaccadeType: @@ -270,4 +270,4 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 54784ac..2f83703 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -180,7 +180,6 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList): """Handle timestamped gaze positions into a list""" - #@UtilsFeatures.PrintCallStack def __init__(self, gaze_positions: list = []): DataFeatures.TimeStampedObjectsList.__init__(self, GazePosition, gaze_positions) @@ -407,17 +406,6 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): return self.__message @property - def duration(self): - """Get inferred duration from first and last timestamps.""" - if self: - - return self[-1].timestamp - self[0].timestamp - - else: - - return 0 - - @property def amplitude(self): """Get inferred amplitude from first and last positions.""" if self: @@ -525,9 +513,9 @@ TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeSt class TimeStampedGazeMovements(DataFeatures.TimeStampedObjectsList): """Handle timestamped gaze movements into a list""" - def __init__(self): + def __init__(self, gaze_movements: list = []): - super().__init__(GazeMovement) + DataFeatures.TimeStampedObjectsList.__init__(self, GazeMovement, gaze_movements) GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus") # Type definition for type annotation convenience @@ -619,62 +607,58 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): ts_status = TimeStampedGazeStatus() # Get last ts to terminate identification on last gaze position - last_ts, _ = ts_gaze_positions.last + last_ts = ts_gaze_positions[-1].timestamp # Iterate on gaze positions - for ts, gaze_position in ts_gaze_positions.items(): + for gaze_position in ts_gaze_positions: - finished_gaze_movement = self.identify(ts, gaze_position, terminate=(ts == last_ts)) + finished_gaze_movement = self.identify(ts, gaze_position, terminate=(gaze_position.timestamp == last_ts)) if is_fixation(finished_gaze_movement): - start_ts, start_position = finished_gaze_movement.positions.first - - ts_fixations[start_ts] = finished_gaze_movement + ts_fixations.append(finished_gaze_movement) # First gaze movement position is always shared with previous gaze movement - for ts, position in finished_gaze_movement.positions.items(): + for movement_position in finished_gaze_movement: - gaze_status = GazeStatus.from_position(position, 'Fixation', len(ts_fixations)) + gaze_status = GazeStatus.from_position(movement_position, 'Fixation', len(ts_fixations)) - if ts != start_ts: + if movement_position.timestamp != finished_gaze_movement.timestamp: - ts_status[ts] = [gaze_status] + ts_status.append([gaze_status]) else: try: - ts_status[start_ts].append(gaze_status) + ts_status[finished_gaze_movement.timestamp].append(gaze_status) except KeyError: - ts_status[start_ts] = [gaze_status] + ts_status[finished_gaze_movement.timestamp] = [gaze_status] elif is_saccade(finished_gaze_movement): - start_ts, start_position = finished_gaze_movement.positions.first - - ts_saccades[start_ts] = finished_gaze_movement + ts_saccades.append(finished_gaze_movement) # First gaze movement position is always shared with previous gaze movement - for ts, position in finished_gaze_movement.positions.items(): + for movement_position in finished_gaze_movement: gaze_status = GazeStatus.from_position(position, 'Saccade', len(ts_saccades)) - if ts != start_ts: + if movement_position.timestamp != finished_gaze_movement.timestamp: - ts_status[ts] = [gaze_status] + ts_status.append([gaze_status]) else: try: - ts_status[start_ts].append(gaze_status) + ts_status[finished_gaze_movement.timestamp].append(gaze_status) except KeyError: - ts_status[start_ts] = [gaze_status] + ts_status[finished_gaze_movement.timestamp] = [gaze_status] else: @@ -696,18 +680,16 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): assert(type(ts_gaze_positions) == TimeStampedGazePositions) # Get last ts to terminate identification on last gaze position - last_ts, _ = ts_gaze_positions.last + last_ts = ts_gaze_positions[-1] # Iterate on gaze positions - for ts, gaze_position in ts_gaze_positions.items(): - - finished_gaze_movement = self.identify(ts, gaze_position, terminate=(ts == last_ts)) + for gaze_position in ts_gaze_positions: - if finished_gaze_movement.valid: + finished_gaze_movement = self.identify(ts, gaze_position, terminate=(gaze_position.timestamp == last_ts)) - start_ts, start_position = finished_gaze_movement.positions.first + if finished_gaze_movement: - yield start_ts, finished_gaze_movement + yield finished_gaze_movement ScanStepType = TypeVar('ScanStep', bound="ScanStep") # Type definition for type annotation convenience @@ -719,34 +701,43 @@ class ScanStepError(Exception): super().__init__(message) -@dataclass(frozen=True) class ScanStep(): """Define a scan step as a fixation and a consecutive saccade. + + Parameters: + first_fixation: a fixation that comes before the next saccade. + last_saccade: a saccade that comes after the previous fixation. !!! warning - Scan step have to start by a fixation and then end by a saccade. """ - first_fixation: Fixation - """A fixation that comes before the next saccade.""" - - last_saccade: Saccade - """A saccade that comes after the previous fixation.""" + def __init__(self, first_fixation: Fixation, last_saccade: Saccade): - def __post_init__(self): + self.__first_fixation = first_fixation + self.__last_saccade = last_saccade # First movement have to be a fixation - if not is_fixation(self.first_fixation): + if not is_fixation(self.__first_fixation): raise ScanStepError('First step movement is not a fixation') # Last movement have to be a saccade - if not is_saccade(self.last_saccade): + if not is_saccade(self.__last_saccade): raise ScanStepError('Last step movement is not a saccade') @property + def first_fixation(self): + """Get scan step first fixation.""" + return self.__first_fixation + + @property + def last_saccade(self): + """Get scan step last saccade.""" + return self.__last_saccade + + @property def fixation_duration(self) -> int|float: """Time spent on AOI @@ -754,7 +745,7 @@ class ScanStep(): fixation duration """ - return self.first_fixation.duration + return self.__first_fixation.duration @property def duration(self) -> int|float: @@ -764,7 +755,7 @@ class ScanStep(): duration """ - return self.first_fixation.duration + self.last_saccade.duration + return self.__first_fixation.duration + self.__last_saccade.duration ScanPathType = TypeVar('ScanPathType', bound="ScanPathType") # Type definition for type annotation convenience @@ -937,24 +928,23 @@ class AOIScanStepError(Exception): self.aoi = aoi -@dataclass(frozen=True) class AOIScanStep(): """Define an aoi scan step as a set of successive gaze movements onto a same AOI. - !!! warning - - Aoi scan step have to start by a fixation and then end by a saccade.""" - - movements: TimeStampedGazeMovements - """All movements over an AOI and the last saccade that comes out.""" + Parameters: + movements: all movements over an AOI and the last saccade that comes out. + aoi: AOI name + letter: AOI unique letter to ease sequence analysis. - aoi: str = field(default='') - """AOI name.""" + !!! warning + Aoi scan step have to start by a fixation and then end by a saccade. + """ - letter: str = field(default='') - """AOI unique letter to ease sequence analysis.""" + def __init__(self, movements: TimeStampedGazeMovements, aoi: str = '', letter: str = ''): - def __post_init__(self): + self.__movements = movements + self.__aoi = aoi + self.__letter = letter # First movement have to be a fixation if not is_fixation(self.first_fixation): @@ -967,18 +957,29 @@ class AOIScanStep(): raise AOIScanStepError('Last step movement is not a saccade', self.aoi) @property + def movements(self): + """Get AOI scan step movements.""" + return self.__movements + + @property + def aoi(self): + """Get AOI scan step aoi.""" + return self.__aoi + + @property + def letter(self): + """Get AOI scan step letter.""" + return self.__letter + + @property def first_fixation(self): """First fixation on AOI.""" - - _, first_movement = self.movements.first - return first_movement + return self.movements[0] @property def last_saccade(self): """Last saccade that comes out AOI.""" - - _, last_movement = self.movements.last - return last_movement + return self.movements[-1] @property def fixation_duration(self) -> int|float: @@ -987,14 +988,7 @@ class AOIScanStep(): Returns: fixation duration """ - - # Timestamp of first position of first fixation - first_ts, _ = self.first_fixation.positions.first - - # Timestamp of first position of last saccade - last_ts, _ = self.last_saccade.positions.first - - return last_ts - first_ts + return self.last_saccade[0].timestamp - self.first_fixation[0].timestamp @property def duration(self) -> int|float: @@ -1003,14 +997,7 @@ class AOIScanStep(): Returns: duration """ - - # Timestamp of first position of first fixation - first_ts, _ = self.first_fixation.positions.first - - # Timestamp of last position of last saccade - last_ts, _ = self.last_saccade.positions.last - - return last_ts - first_ts + return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp AOIScanPathType = TypeVar('AOIScanPathType', bound="AOIScanPathType") # Type definition for type annotation convenience @@ -1130,19 +1117,20 @@ class AOIScanPath(list): return self.__transition_matrix - def append_saccade(self, ts, saccade): + def append_saccade(self, saccade): """Append new saccade to aoi scan path.""" # Ignore saccade if no fixation have been stored before if len(self.__movements) > 0: - self.__movements[ts] = saccade + self.__movements.append(saccade) - def append_fixation(self, ts, fixation, looked_aoi: str) -> bool: + def append_fixation(self, fixation, looked_aoi: str) -> bool: """Append new fixation to aoi scan path and return last new aoi scan step if one have been created. !!! warning - It could raise AOIScanStepError""" + It could raise AOIScanStepError + """ # Replace None aoi by generic OutsideAOI name if looked_aoi is None: @@ -1192,14 +1180,14 @@ class AOIScanPath(list): self.__movements = TimeStampedGazeMovements() # Append new fixation - self.__movements[ts] = fixation + self.__movements.append(fixation) # Remember new aoi self.__current_aoi = looked_aoi else: # Append new fixation - self.__movements[ts] = fixation + self.__movements.append(fixation) # Remember aoi self.__current_aoi = looked_aoi -- cgit v1.1