From 49d6cf63d02b80d10c96cfe2edc6fd197558c899 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Mon, 10 Jul 2023 14:08:30 +0200 Subject: Improving and testing gaze movement identification. Now each movement have to share its last position with the next movement. --- .../DispersionThresholdIdentification.py | 122 +++++++++++++++------ .../DispersionThresholdIdentification.py | 55 ++++++---- src/argaze/GazeFeatures.py | 46 +++++++- 3 files changed, 164 insertions(+), 59 deletions(-) (limited to 'src') diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index f1d02d6..09eb58b 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -27,6 +27,10 @@ def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time for i in range(0, size): + # Sleep a random time + sleep_time = random.random() * (max_time - min_time) + min_time + time.sleep(sleep_time) + # Check position validity valid = True if len(validity) > i: @@ -48,10 +52,6 @@ def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time ts = time.time() - start_time + start_ts ts_gaze_positions[ts] = gaze_position - # Sleep a random time - sleep_time = random.random() * (max_time - min_time) + min_time - time.sleep(sleep_time) - return ts_gaze_positions def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: float, max_time: float, start_ts: float = 0., validity: list = []): @@ -65,6 +65,10 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl for i in range(0, size): + # Sleep a random time + sleep_time = random.random() * (max_time - min_time) + min_time + time.sleep(sleep_time) + # Check position validity valid = True if len(validity) > i: @@ -86,10 +90,6 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl ts = time.time() - start_time + start_ts ts_gaze_positions[ts] = gaze_position - # Sleep a random time - sleep_time = random.random() * (max_time - min_time) + min_time - time.sleep(sleep_time) - return ts_gaze_positions class TestDispersionThresholdIdentificationClass(unittest.TestCase): @@ -121,7 +121,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertGreaterEqual(fixation.duration, size * min_time) self.assertLessEqual(fixation.duration, size * max_time) self.assertLessEqual(fixation.finished, True) - + def test_fixation_and_direct_saccade_identification(self): """Test DispersionThresholdIdentification fixation and saccade identification.""" @@ -157,20 +157,20 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check first saccade ts, saccade = ts_saccades.pop_first() - self.assertEqual(len(saccade.positions.keys()), 1) - self.assertGreaterEqual(saccade.duration, 0.) - self.assertLessEqual(saccade.duration, 0.) + self.assertEqual(len(saccade.positions.keys()), 2) + self.assertGreaterEqual(saccade.duration, min_time) + self.assertLessEqual(saccade.duration, max_time + min_time) self.assertLessEqual(saccade.finished, True) # Check second fixation ts, fixation = ts_fixations.pop_first() - self.assertEqual(len(fixation.positions.keys()), size-1) + self.assertEqual(len(fixation.positions.keys()), size) self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, size * min_time) self.assertLessEqual(fixation.duration, size * max_time) self.assertLessEqual(fixation.finished, True) - + def test_fixation_and_short_saccade_identification(self): """Test DispersionThresholdIdentification fixation and saccade identification.""" @@ -209,9 +209,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check first saccade ts, saccade = ts_saccades.pop_first() - self.assertEqual(len(saccade.positions.keys()), move) + self.assertEqual(len(saccade.positions.keys()), move + 2) self.assertGreaterEqual(saccade.duration, min_time) - self.assertLessEqual(saccade.duration, max_time) + self.assertLessEqual(saccade.duration, max_time + 2 * min_time) self.assertLessEqual(saccade.finished, True) # Check second fixation @@ -222,7 +222,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertGreaterEqual(fixation.duration, size * min_time) self.assertLessEqual(fixation.duration, size * max_time) self.assertLessEqual(fixation.finished, True) - + def test_invalid_gaze_position(self): """Test DispersionThresholdIdentification fixation and saccade identification with invalid gaze position.""" @@ -260,7 +260,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertGreaterEqual(fixation.duration, 5 * min_time) self.assertLessEqual(fixation.duration, 5 * max_time) self.assertLessEqual(fixation.finished, True) - + def test_fixation_overlapping(self): """Test Fixation overlap function.""" @@ -321,6 +321,64 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.duration, 2 * size * max_time) self.assertLessEqual(fixation.finished, True) + def test_identification_browsing(self): + """Test DispersionThresholdIdentification identification browsing.""" + + size = 10 + center_A = (0, 0) + center_B = (50, 50) + deviation_max = 10 + min_time = 0.01 + max_time = 0.1 + + ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + + ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) + + gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) + + # Get last ts to terminate identification on last gaze position + last_ts, _ = ts_gaze_positions.last + + # Iterate on gaze positions + for ts, gaze_position in ts_gaze_positions.items(): + + finished_gaze_movement = gaze_movement_identifier.identify(ts, gaze_position, terminate=(ts == last_ts)) + + if GazeFeatures.is_fixation(finished_gaze_movement): + + self.assertEqual(len(finished_gaze_movement.positions.keys()), size) + self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max) + self.assertGreaterEqual(finished_gaze_movement.duration, size * min_time) + self.assertLessEqual(finished_gaze_movement.duration, size * max_time) + self.assertLessEqual(finished_gaze_movement.finished, True) + + elif GazeFeatures.is_saccade(finished_gaze_movement): + + self.assertEqual(len(finished_gaze_movement.positions.keys()), 2) + self.assertGreaterEqual(finished_gaze_movement.duration, min_time) + self.assertLessEqual(finished_gaze_movement.duration, max_time + min_time) + self.assertLessEqual(finished_gaze_movement.finished, True) + + # Check that last gaze position date is not equal to given gaze position date + if finished_gaze_movement.valid: + + last_ts, _ = finished_gaze_movement.positions.last + + self.assertNotEqual(last_ts, ts) + + # Check that last gaze position date of current fixation is equal to given gaze position date + # NOTE: This is not true for saccade as there is a minimal time window while the gaze movement is unknown + current_gaze_movement = gaze_movement_identifier.current_gaze_movement + if current_gaze_movement.valid: + + if GazeFeatures.is_fixation(current_gaze_movement): + + last_ts, _ = current_gaze_movement.positions.last + + self.assertEqual(last_ts, ts) + def test_identification_generator(self): """Test DispersionThresholdIdentification identification using generator.""" @@ -337,27 +395,23 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) - - fixation_count = 0 - - for ts, gaze_movement in gaze_movement_identifier(ts_gaze_positions): - if GazeFeatures.is_fixation(gaze_movement): + for ts, finished_gaze_movement in gaze_movement_identifier(ts_gaze_positions): - self.assertEqual(len(gaze_movement.positions.keys()), size-fixation_count) - self.assertLessEqual(gaze_movement.deviation_max, deviation_max) - self.assertGreaterEqual(gaze_movement.duration, size * min_time) - self.assertLessEqual(gaze_movement.duration, size * max_time) - self.assertLessEqual(gaze_movement.finished, True) + if GazeFeatures.is_fixation(finished_gaze_movement): - fixation_count += 1 + self.assertEqual(len(finished_gaze_movement.positions.keys()), size) + self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max) + self.assertGreaterEqual(finished_gaze_movement.duration, size * min_time) + self.assertLessEqual(finished_gaze_movement.duration, size * max_time) + self.assertLessEqual(finished_gaze_movement.finished, True) - elif GazeFeatures.is_saccade(gaze_movement): + elif GazeFeatures.is_saccade(finished_gaze_movement): - self.assertEqual(len(gaze_movement.positions.keys()), 1) - self.assertGreaterEqual(gaze_movement.duration, 0.) - self.assertLessEqual(gaze_movement.duration, 0.) - self.assertLessEqual(gaze_movement.finished, True) + self.assertEqual(len(finished_gaze_movement.positions.keys()), 2) + self.assertGreaterEqual(finished_gaze_movement.duration, min_time) + self.assertLessEqual(finished_gaze_movement.duration, max_time + min_time) + self.assertLessEqual(finished_gaze_movement.finished, True) if __name__ == '__main__': diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index f29858a..ae7105e 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -126,13 +126,15 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish() - # Check if too much time elapsed since last gaze position + # Check if too much time elapsed since last valid gaze position if len(self.__valid_positions) > 0: ts_last, _ = self.__valid_positions.last if (ts - ts_last) > self.duration_min_threshold: + # What about last valid positions !? + # Get last movement last_movement = self.current_saccade.finish() if len(self.__fixation_positions) == 0 else self.current_fixation.finish() @@ -162,11 +164,20 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Valid gaze positions deviation small enough if deviation <= self.deviation_max_threshold: - # Store last saccade - last_saccade = self.current_saccade.finish() + last_saccade = GazeFeatures.UnvalidGazeMovement() - # Clear saccade positions - self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() + # Is there saccade positions? + if len(self.__saccade_positions) > 0: + + # Copy oldest valid position into saccade positions + first_ts, first_position = self.__valid_positions.first + self.__saccade_positions[first_ts] = first_position + + # Finish last saccade + last_saccade = self.current_saccade.finish() + + # Clear saccade positions + self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Copy valid gaze positions into fixation positions self.__fixation_positions = self.__valid_positions.copy() @@ -174,26 +185,32 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Output last saccade return last_saccade if not terminate else self.current_fixation.finish() - # Valid gaze positions deviation too wide while identifying fixation - elif len(self.__fixation_positions) > 0: + # Valid gaze positions deviation too wide + else: - # Store last fixation - last_fixation = self.current_fixation.finish() + last_fixation = GazeFeatures.UnvalidGazeMovement() - # Start saccade positions with current gaze position - self.__saccade_positions[ts] = gaze_position + # Is there fixation positions? + if len(self.__fixation_positions) > 0: - # Clear fixation positions - self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() + # Copy most recent fixation position into saccade positions + last_ts, last_position = self.__fixation_positions.last + self.__saccade_positions[last_ts] = last_position - # Clear valid positions - self.__valid_positions = GazeFeatures.TimeStampedGazePositions() + # Finish last fixation + last_fixation = self.current_fixation.finish() - # Output last fixation - return last_fixation if not terminate else self.current_saccade.finish() + # Clear fixation positions + self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() - # Valid gaze positions deviation too wide while identifying saccade (or not) - else: + # Clear valid positions + self.__valid_positions = GazeFeatures.TimeStampedGazePositions() + + # Store current gaze position + self.__valid_positions[ts] = gaze_position + + # Output last fixation + return last_fixation if not terminate else self.current_saccade.finish() # Move oldest valid position into saccade positions first_ts, first_position = self.__valid_positions.pop_first() diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index bb99cf3..4602218 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -401,12 +401,16 @@ class GazeMovementIdentifier(): """Abstract class to define what should provide a gaze movement identifier.""" def identify(self, ts, gaze_position, terminate=False) -> Tuple[GazeMovementType, GazeMovementType]: - """Identify gaze movement from successive timestamped gaze positions. + """Identify gaze movement from successive timestamped gaze positions. + Each identified gaze movement should share its first/last gaze position with previous/next gaze movement. - The optional *terminate* argument allows to notify identification algorithm that given gaze position will be the last one. + Parameters: + ts: + gaze_position: + terminate: allows to notify identification algorithm that given gaze position will be the last one. - Returns: - finished_gaze_movement: identified gaze movement once it is finished + Returns: + finished_gaze_movement: identified gaze movement once it is finished otherwise it returns unvalid gaze movement. """ raise NotImplementedError('identify() method not implemented') @@ -452,9 +456,24 @@ class GazeMovementIdentifier(): ts_fixations[start_ts] = finished_gaze_movement + # First gaze movement position is always shared with previous gaze movement for ts, position in finished_gaze_movement.positions.items(): - ts_status[ts] = GazeStatus.from_position(position, 'Fixation', len(ts_fixations)) + gaze_status = GazeStatus.from_position(position, 'Fixation', len(ts_fixations)) + + if ts != start_ts: + + ts_status[ts] = [gaze_status] + + else: + + try: + + ts_status[start_ts].append(gaze_status) + + except KeyError: + + ts_status[start_ts] = [gaze_status] elif is_saccade(finished_gaze_movement): @@ -462,9 +481,24 @@ class GazeMovementIdentifier(): ts_saccades[start_ts] = finished_gaze_movement + # First gaze movement position is always shared with previous gaze movement for ts, position in finished_gaze_movement.positions.items(): - ts_status[ts] = GazeStatus.from_position(position, 'Saccade', len(ts_saccades)) + gaze_status = GazeStatus.from_position(position, 'Saccade', len(ts_saccades)) + + if ts != start_ts: + + ts_status[ts] = [gaze_status] + + else: + + try: + + ts_status[start_ts].append(gaze_status) + + except KeyError: + + ts_status[start_ts] = [gaze_status] else: -- cgit v1.1