From 1a3aac125980019ae86493782795569327bc8eaa Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 11:09:32 +0100 Subject: Fixing VelocityThresholdIdentification tests. --- .../DispersionThresholdIdentification.py | 16 +-- .../VelocityThresholdIdentification.py | 123 ++++++++------------ .../DispersionThresholdIdentification.py | 18 +-- .../VelocityThresholdIdentification.py | 129 ++++++++++----------- 4 files changed, 124 insertions(+), 162 deletions(-) diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index 07496c3..156f6f1 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -359,13 +359,10 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) - # Get last ts to terminate identification on last gaze position - last_ts = ts_gaze_positions[-1].timestamp - # Iterate on gaze positions for gaze_position in ts_gaze_positions: - finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) + finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == ts_gaze_positions[-1].timestamp)) if GazeFeatures.is_fixation(finished_gaze_movement): @@ -382,13 +379,6 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(finished_gaze_movement.duration, max_time) self.assertLessEqual(finished_gaze_movement.finished, True) - # Check that last gaze position date is not equal to given gaze position date - if finished_gaze_movement: - - last_ts = finished_gaze_movement[-1].timestamp - - self.assertNotEqual(last_ts, gaze_position.timestamp) - # Check that last gaze position date of current fixation is equal to given gaze position date # NOTE: This is not true for saccade as, for I-DT, there is a minimal time window while the gaze movement is unknown current_gaze_movement = gaze_movement_identifier.current_gaze_movement @@ -396,9 +386,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): if GazeFeatures.is_fixation(current_gaze_movement): - last_ts = current_gaze_movement[-1].timestamp - - self.assertEqual(last_ts, gaze_position.timestamp) + self.assertEqual(current_gaze_movement[-1].timestamp, gaze_position.timestamp) def test_identification_generator(self): """Test DispersionThresholdIdentification identification using generator.""" diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py index 24f2e3c..262cfc0 100644 --- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py @@ -17,8 +17,8 @@ from argaze.GazeAnalysis import VelocityThresholdIdentification import numpy -def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, min_time: float, max_time: float, start_ts: float = 0., validity: list = []): - """ Generate N TimeStampedGazePositions strating from a starting position for testing purpose. +def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time: float, max_time: float, start_ts: float = 0., validity: list = []): + """ Generate N TimeStampedGazePsoitions dispersed around a center point for testing purpose. Timestamps are current time after random sleep (second). GazePositions are random values. """ @@ -26,8 +26,6 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, start_time = time.time() - last_valid_position = start_position - for i in range(0, size): # Sleep a random time @@ -43,21 +41,19 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, if valid: # Edit gaze position - random_x = last_valid_position[0] + deviation_max * (random.random() - 0.5) / math.sqrt(2) - random_y = last_valid_position[1] + deviation_max * (random.random() - 0.5) / math.sqrt(2) - + random_x = center[0] + deviation_max * (random.random() - 0.5) / math.sqrt(2) + random_y = center[1] + deviation_max * (random.random() - 0.5) / math.sqrt(2) gaze_position = GazeFeatures.GazePosition((random_x, random_y)) - # Remember last valid gaze position - last_valid_position = gaze_position.value - else: gaze_position = GazeFeatures.GazePosition() + # Timestamp gaze position + gaze_position.timestamp = time.time() - start_time + start_ts + # Store gaze position - ts = time.time() - start_time + start_ts - ts_gaze_positions[ts] = gaze_position + ts_gaze_positions.append(gaze_position) return ts_gaze_positions @@ -93,9 +89,11 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl gaze_position = GazeFeatures.GazePosition() + # Timestamp gaze position + gaze_position.timestamp = time.time() - start_time + start_ts + # Store gaze position - ts = time.time() - start_time + start_ts - ts_gaze_positions[ts] = gaze_position + ts_gaze_positions.append(gaze_position) return ts_gaze_positions @@ -122,9 +120,9 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size - 1) # Check fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size - 1) + self.assertEqual(len(fixation), size - 1) self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) self.assertLessEqual(fixation.finished, True) @@ -141,9 +139,9 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): velocity_max = deviation_max / min_time ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) @@ -154,42 +152,36 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size * 2 - 1) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size - 1) + self.assertEqual(len(fixation), size - 1) self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) self.assertLessEqual(fixation.finished, True) # Check first saccade - ts, saccade = ts_saccades.pop_first() + saccade = ts_saccades.pop(0) - self.assertEqual(len(saccade.positions.keys()), 2) + self.assertEqual(len(saccade), 2) self.assertGreaterEqual(saccade.duration, min_time) self.assertLessEqual(saccade.duration, max_time) self.assertLessEqual(saccade.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = fixation.positions.last - first_ts, first_position = saccade.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) + self.assertEqual(fixation[-1].value, saccade[0].value) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) self.assertLessEqual(fixation.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = saccade.positions.last - first_ts, first_position = fixation.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) + self.assertEqual(saccade[-1].value, fixation[0].value) def test_fixation_and_short_saccade_identification(self): """Test VelocityThresholdIdentification fixation and saccade identification.""" @@ -205,10 +197,10 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): velocity_max = deviation_max / min_time ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A.last[0]) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions.last[0]) + ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A[-1].timestamp) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_move_positions).append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_move_positions + ts_gaze_positions_B gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) @@ -219,42 +211,36 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), 2 * size + move - 1) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size - 1) # BUG: NOT ALWAYS TRUE !!! + self.assertEqual(len(fixation), size - 1) # BUG: NOT ALWAYS TRUE !!! self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) self.assertLessEqual(fixation.finished, True) # Check first saccade - ts, saccade = ts_saccades.pop_first() + saccade = ts_saccades.pop(0) - self.assertEqual(len(saccade.positions.keys()), move + 2) + self.assertEqual(len(saccade), move + 2) self.assertGreaterEqual(saccade.duration, (move + 1) * min_time) self.assertLessEqual(saccade.duration, (move + 1) * max_time) self.assertLessEqual(saccade.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = fixation.positions.last - first_ts, first_position = saccade.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) + self.assertEqual(fixation[-1].value, saccade[0].value) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) self.assertLessEqual(fixation.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = saccade.positions.last - first_ts, first_position = fixation.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(saccade[-1], fixation[0]) + self.assertEqual(saccade[-1].value, fixation[0].value) def test_invalid_gaze_position(self): """Test VelocityThresholdIdentification fixation and saccade identification with invalid gaze position.""" @@ -278,17 +264,17 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), len(validity)-5) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), 6) + self.assertEqual(len(fixation), 6) self.assertGreaterEqual(fixation.duration, 5 * min_time) self.assertLessEqual(fixation.duration, 5 * max_time) self.assertLessEqual(fixation.finished, True) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), 4) + self.assertEqual(len(fixation), 4) self.assertGreaterEqual(fixation.duration, 3 * min_time) self.assertLessEqual(fixation.duration, 3 * max_time) self.assertLessEqual(fixation.finished, True) @@ -305,34 +291,27 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): velocity_max = deviation_max / min_time ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2) - - # Get last ts to terminate identification on last gaze position - last_ts, _ = ts_gaze_positions.last # Iterate on gaze positions - for ts, gaze_position in ts_gaze_positions.items(): + for gaze_position in ts_gaze_positions: - finished_gaze_movement = gaze_movement_identifier.identify(ts, gaze_position, terminate=(ts == last_ts)) + finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == ts_gaze_positions[-1])) # Check that last gaze position date is not equal to given gaze position date - if finished_gaze_movement.valid: + if finished_gaze_movement: - last_ts, _ = finished_gaze_movement.positions.last - - self.assertNotEqual(last_ts, ts) + self.assertNotEqual(finished_gaze_movement[-1].timestamp, gaze_position.timestamp) # Check that last gaze position date of current movement is equal to given gaze position date current_gaze_movement = gaze_movement_identifier.current_gaze_movement - if current_gaze_movement.valid: - - last_ts, _ = current_gaze_movement.positions.last + if current_gaze_movement: - self.assertEqual(last_ts, ts) + self.assertEqual(current_gaze_movement[-1].timestamp, gaze_position.timestamp) if __name__ == '__main__': diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index c85e576..13529e7 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -100,8 +100,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - _, start_position = self.positions[0] - _, last_position = self.positions[-1] + start_position = self.positions[0] + last_position = self.positions[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -143,13 +143,13 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @DataFeatures.PipelineStepMethod def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: - # Ignore non valid gaze position + # Ignore empty gaze position if not gaze_position: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Check if too much time elapsed since last valid gaze position - if len(self.__valid_positions) > 0: + if self.__valid_positions: ts_last = self.__valid_positions[-1].timestamp @@ -184,7 +184,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): last_saccade = GazeFeatures.GazeMovement() # Is there saccade positions? - if len(self.__saccade_positions) > 0: + if self.__saccade_positions: # Copy oldest valid position into saccade positions self.__saccade_positions.append(self.__valid_positions[0]) @@ -207,7 +207,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): last_fixation = GazeFeatures.GazeMovement() # Is there fixation positions? - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: # Copy most recent fixation position into saccade positions self.__saccade_positions.append(self.__fixation_positions[-1]) @@ -237,9 +237,9 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): def current_gaze_movement(self) -> GazeMovementType: # It shouldn't have a current fixation and a current saccade at the same time - assert(not (len(self.__fixation_positions) > 0 and len(self.__saccade_positions) > 0)) + assert(not (self.__fixation_positions and len(self.__saccade_positions) > 1)) - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: return Fixation(self.__fixation_positions) @@ -252,7 +252,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @property def current_fixation(self) -> FixationType: - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: return Fixation(self.__fixation_positions) diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index 971ba9b..c1d448a 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -25,53 +25,39 @@ FixationType = TypeVar('Fixation', bound="Fixation") SaccadeType = TypeVar('Saccade', bound="Saccade") # Type definition for type annotation convenience -@dataclass(frozen=True) class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" - deviation_max: float = field(init=False) - """Maximal gaze position distance to the centroïd.""" + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): - def __post_init__(self): + super().__init__(positions, finished, message, **kwargs) - super().__post_init__() + if positions: - points = self.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)]) - deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) + positions_array = numpy.asarray(self.values()) + centroid = numpy.mean(positions_array, axis=0) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) - # Update frozen focus attribute using centroid - object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1])) + # Set focus as positions centroid + self.focus = (centroid[0], centroid[1]) - # Update frozen deviation_max attribute - object.__setattr__(self, 'deviation_max', max(deviations_array)) + # Set deviation_max attribute + self.__deviation_max = deviations_array.max() - def point_deviation(self, gaze_position) -> float: - """Get distance of a point from the fixation's centroïd.""" - - return numpy.sqrt((self.centroid[0] - gaze_position.value[0])**2 + (self.centroid[1] - gaze_position.value[1])**2) + @property + def deviation_max(self): + """Get fixation's maximal deviation.""" + return self.__deviation_max - def overlap(self, fixation) -> bool: + def overlap(self, fixation: FixationType) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" - - points = fixation.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([self.centroid[0], self.centroid[1]]) - deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) + + positions_array = numpy.asarray(fixation.values()) + centroid = numpy.mean(self.focus, axis=0) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) return min(deviations_array) <= self.deviation_max - - def merge(self, fixation) -> FixationType: - """Merge another fixation into this fixation.""" - - self.positions.append(fixation.positions) - self.__post_init__() - - return self - + def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None): """Draw fixation into image. @@ -85,7 +71,7 @@ class Fixation(GazeFeatures.Fixation): if duration_border_color is not None: cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), duration_border_color, int(self.duration * duration_factor)) - + # Draw deviation circle if required if deviation_circle_color is not None: @@ -96,12 +82,12 @@ class Fixation(GazeFeatures.Fixation): self.draw_positions(image, **draw_positions) -@dataclass(frozen=True) class Saccade(GazeFeatures.Saccade): """Define dispersion based saccade.""" - def __post_init__(self): - super().__post_init__() + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): + + super().__init__(positions, finished, message, **kwargs) def draw(self, image: numpy.array, line_color: tuple = None): """Draw saccade into image. @@ -113,8 +99,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - _, start_position = self.positions.first - _, last_position = self.positions.last + start_position = self.positions[0] + last_position = self.positions[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -126,45 +112,56 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): saccades in eye-tracking protocols. In Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, 71-78. [http://dx.doi.org/10.1145/355017.355028](http://dx.doi.org/10.1145/355017.355028) - """ - - velocity_max_threshold: int|float - """Maximal velocity allowed to consider a gaze movement as a fixation.""" - duration_min_threshold: int|float - """Minimal duration allowed to wait valid gaze positions.""" + Parameters: + velocity_max_threshold: Maximal velocity allowed to consider a gaze movement as a fixation. + duration_min_threshold: Minimal duration allowed to wait valid gaze positions. + """ - def __post_init__(self): + def __init__(self, velocity_max_threshold: int|float, duration_min_threshold: int|float): super().__init__() + self.__velocity_max_threshold = velocity_max_threshold + self.__duration_min_threshold = duration_min_threshold + self.__last_ts = -1 self.__last_position = None self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() + @property + def velocity_max_threshold(self): + """Get identifier's velocity max threshold.""" + return self.__velocity_max_threshold + + @property + def duration_min_threshold(self): + """Get identifier duration min threshold.""" + return self.__duration_min_threshold + @DataFeatures.PipelineStepMethod - def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType: + def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: - # Ignore non valid gaze position - if not gaze_position.valid: + # Ignore empty gaze position + if not gaze_position: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Store first valid position if self.__last_ts < 0: - self.__last_ts = ts + self.__last_ts = timestamp self.__last_position = gaze_position return GazeFeatures.GazeMovement() # Check if too much time elapsed since last gaze position - if (ts - self.__last_ts) > self.duration_min_threshold: + if (timestamp - self.__last_ts) > self.duration_min_threshold: # Remember last position - self.__last_ts = ts + self.__last_ts = timestamp self.__last_position = gaze_position # Get last movement @@ -178,10 +175,10 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return last_movement # Velocity - velocity = abs(gaze_position.distance(self.__last_position) / (ts - self.__last_ts)) + velocity = abs(gaze_position.distance(self.__last_position) / (timestamp - self.__last_ts)) # Remember last position - self.__last_ts = ts + self.__last_ts = timestamp self.__last_position = gaze_position # Velocity is greater than threshold @@ -193,8 +190,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): if len(self.__fixation_positions) > 0: # Copy most recent fixation position into saccade positions - last_ts, last_position = self.__fixation_positions.last - self.__saccade_positions[last_ts] = last_position + self.__saccade_positions.append(self.__fixation_positions[-1]) # Create last fixation last_fixation = self.current_fixation.finish() @@ -203,7 +199,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() # Append to saccade positions - self.__saccade_positions[ts] = gaze_position + self.__saccade_positions.append(gaze_position) # Output last fixation return last_fixation if not terminate else self.current_saccade.finish() @@ -214,11 +210,10 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): last_saccade = GazeFeatures.GazeMovement() # Does last saccade exist? - if len(self.__saccade_positions) > 0: + if self.__saccade_positions: # Copy most recent saccade position into fixation positions - last_ts, last_position = self.__saccade_positions.last - self.__fixation_positions[last_ts] = last_position + self.__fixation_positions.append(self.__saccade_positions[-1]) # Create last saccade last_saccade = self.current_saccade.finish() @@ -227,7 +222,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Append to fixation positions - self.__fixation_positions[ts] = gaze_position + self.__fixation_positions.append(gaze_position) # Output last saccade return last_saccade if not terminate else self.current_fixation.finish() @@ -239,13 +234,13 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): def current_gaze_movement(self) -> GazeMovementType: # It shouldn't have a current fixation and a current saccade at the same time - assert(not (len(self.__fixation_positions) > 0 and len(self.__saccade_positions) > 0)) + assert(not (self.__fixation_positions and self.__saccade_positions)) - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: return Fixation(self.__fixation_positions) - if len(self.__saccade_positions) > 0: + if len(self.__saccade_positions) > 1: return Saccade(self.__saccade_positions) @@ -255,7 +250,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @property def current_fixation(self) -> FixationType: - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: return Fixation(self.__fixation_positions) @@ -265,7 +260,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @property def current_saccade(self) -> SaccadeType: - if len(self.__saccade_positions) > 0: + if len(self.__saccade_positions) > 1: return Saccade(self.__saccade_positions) -- cgit v1.1