From 9e829bfef38e181372f96ac0497161e598b616ef Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 25 Apr 2024 15:30:21 +0200 Subject: Fixing VelocityThresholdIdentification. --- .../VelocityThresholdIdentification.py | 248 ++++++++++++--------- 1 file changed, 140 insertions(+), 108 deletions(-) diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index 10a7ceb..57b8a28 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -1,4 +1,4 @@ -"""Velocity threshold identification (I-VT) module.""" +"""Dispersion threshold identification (I-DT) module.""" """ This program is free software: you can redistribute it and/or modify it under @@ -17,35 +17,28 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -import cv2 -import numpy +import logging +import math from argaze import GazeFeatures, DataFeatures +import cv2 +import numpy class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" - def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): - - super().__init__(positions, finished, message, **kwargs) + def __init__(self, focus: tuple = (), velocity_max: float = math.nan, **kwargs): - if positions: + super().__init__(**kwargs) - positions_array = numpy.asarray(self.values()) - centroid = numpy.mean(positions_array, axis=0) - deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) - - # Set focus as positions centroid - self.focus = (centroid[0], centroid[1]) - - # Set deviation_max attribute - self.__deviation_max = deviations_array.max() + self._focus = focus + self.__velocity_max = velocity_max @property - def deviation_max(self): - """Get fixation's maximal deviation.""" - return self.__deviation_max + def velocity_max(self) -> float: + """Fixation's maximal velocity.""" + return self.__velocity_max def is_overlapping(self, fixation: GazeFeatures.Fixation) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" @@ -56,12 +49,13 @@ class Fixation(GazeFeatures.Fixation): return min(deviations_array) <= self.deviation_max - def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None): + def draw(self, image: numpy.array, amplitude_circle_color: tuple = None, duration_border_color: tuple = None, + duration_factor: float = 1., draw_positions: dict = None): """Draw fixation into image. Parameters: image: where to draw - deviation_circle_color: color of circle representing fixation's deviation + amplitude_circle_color: color of circle representing fixation's amplitude duration_border_color: color of border representing fixation's duration duration_factor: how many pixels per duration unit draw_positions: @@ -70,12 +64,12 @@ class Fixation(GazeFeatures.Fixation): # Draw duration border if required if duration_border_color is not None: - cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), duration_border_color, int(self.duration * duration_factor)) + cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.amplitude), duration_border_color, int(self.duration * duration_factor)) - # Draw deviation circle if required - if deviation_circle_color is not None: + # Draw amplitude circle if required + if amplitude_circle_color is not None: - cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), deviation_circle_color, -1) + cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.amplitude), amplitude_circle_color, -1) # Draw positions if required if draw_positions is not None: @@ -85,11 +79,11 @@ class Fixation(GazeFeatures.Fixation): class Saccade(GazeFeatures.Saccade): """Define dispersion based saccade.""" - def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = None, **kwargs): - super().__init__(positions, finished, message, **kwargs) + super().__init__(positions, **kwargs) - def draw(self, image: numpy.array, line_color: tuple = None): + def draw(self, image: numpy.array, line_color: tuple = None, draw_positions: dict = None): """Draw saccade into image. Parameters: @@ -105,13 +99,18 @@ class Saccade(GazeFeatures.Saccade): cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) + # Draw positions if required + if draw_positions is not None: + + self.draw_positions(image, **draw_positions) + class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): - """Implementation of the I-VT algorithm as described in: - - Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and - saccades in eye-tracking protocols. In Proceedings of the 2000 symposium - on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, - 71-78. [http://dx.doi.org/10.1145/355017.355028](http://dx.doi.org/10.1145/355017.355028) + """Implementation of the I-DT algorithm as described in: + + **Dario D. Salvucci and Joseph H. Goldberg (2000).** + *Identifying fixations and saccades in eye-tracking protocols.* + Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA'00, 71-78). + [https://doi.org/10.1145/355017.355028](https://doi.org/10.1145/355017.355028) """ @DataFeatures.PipelineStepInit @@ -123,11 +122,13 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__velocity_max_threshold = 0 self.__duration_min_threshold = 0 - self.__last_ts = -1 - self.__last_position = None + self.__valid_positions = GazeFeatures.TimeStampedGazePositions() + + self.__centroid = () + self.__velocities = [] - self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() - self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() + self.__fixation = Fixation() + self.__saccade = Saccade() @property def velocity_max_threshold(self) -> int|float: @@ -148,123 +149,154 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): def duration_min_threshold(self, duration_min_threshold: int|float): self.__duration_min_threshold = duration_min_threshold - + @DataFeatures.PipelineStepMethod def identify(self, gaze_position, terminate=False) -> GazeFeatures.GazeMovement: - # Ignore empty gaze position - if not gaze_position: + # When too much time elapsed since the last valid gaze position + if self.__valid_positions: - return GazeFeatures.GazeMovement() if not terminate else self.current_fixation().finish() + elapsed_time = gaze_position.timestamp - self.__valid_positions[-1].timestamp - # Store first valid position - if self.__last_ts < 0: + if elapsed_time > self.__duration_min_threshold: - self.__last_ts = gaze_position.timestamp - self.__last_position = gaze_position + try: - return GazeFeatures.GazeMovement() + # Finish and return current gaze movement + return self.current_gaze_movement().finish() - # Check if too much time elapsed since last gaze position - if (gaze_position.timestamp - self.__last_ts) > self.duration_min_threshold: + finally: - # Remember last position - self.__last_ts = gaze_position.timestamp - self.__last_position = gaze_position + # Reset valid gaze positions + self.__valid_positions = GazeFeatures.TimeStampedGazePositions() - # Get last movement - last_movement = self.current_gaze_movement().finish() + # Reset centroid and velocities + self.__centroid = () + self.__velocities = [] - # Clear all former gaze positions - self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() - self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() - - # Return last valid movement if exist - return last_movement + # Clear gaze movements + self.__fixation = Fixation() + self.__saccade = Saccade() - # Velocity - velocity = abs(gaze_position.distance(self.__last_position) / (gaze_position.timestamp - self.__last_ts)) + # Consider only valid gaze position + if gaze_position: + + # Update centroid and velocities + if not self.__valid_positions: - # Remember last position - self.__last_ts = gaze_position.timestamp - self.__last_position = gaze_position + self.__centroid = gaze_position + self.__velocities = [] - # Velocity is greater than threshold - if velocity > self.velocity_max_threshold: + else: - last_fixation = GazeFeatures.GazeMovement() + self.__centroid = self.__centroid + (gaze_position - self.__centroid) / (len(self.__valid_positions) + 1) - # Does last fixation exist? - if len(self.__fixation_positions) > 0: + if gaze_position.timestamp > self.__valid_positions[-1].timestamp: - # Copy most recent fixation position into saccade positions - self.__saccade_positions.append(self.__fixation_positions[-1]) + delta_d = gaze_position.distance(self.__valid_positions[-1]) + delta_t = (gaze_position.timestamp - self.__valid_positions[-1].timestamp) + self.__velocities.append( delta_d / delta_t ) - # Create last fixation - last_fixation = self.current_fixation().finish() + else: - # Clear fixation positions - self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() + self.__velocities.append( math.nan ) - # Append to saccade positions - self.__saccade_positions.append(gaze_position) + # Store valid gaze position + self.__valid_positions.append(gaze_position) - # Output last fixation - return last_fixation if not terminate else self.current_saccade().finish() + # Once the minimal duration is reached + if self.__valid_positions.duration >= self.__duration_min_threshold: - # Velocity is less or equals to threshold - else: + velocity_max = max(self.__velocities) - last_saccade = GazeFeatures.GazeMovement() + logging.debug("%s: velocity_max %f", DataFeatures.get_class_path(self), velocity_max) - # Does last saccade exist? - if self.__saccade_positions: + # Maximal velocity small enough + if velocity_max <= self.__velocity_max_threshold: - # Copy most recent saccade position into fixation positions - self.__fixation_positions.append(self.__saccade_positions[-1]) + # Make valid gaze positions as current fixation + self.__fixation = Fixation(positions=self.__valid_positions, focus=self.__centroid, velocity_max=velocity_max) - # Create last saccade - last_saccade = self.current_saccade().finish() + # Is there a current saccade? + if self.__saccade: - # Clear fixation positions - self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() + try: - # Append to fixation positions - self.__fixation_positions.append(gaze_position) + # Share first fixation position with current saccade + self.__saccade.append(self.__fixation[0]) - # Output last saccade - return last_saccade if not terminate else self.current_fixation().finish() + # Finish and return the current saccade + return self.__saccade.finish() + + # Clear saccade after the return + finally: + + self.__saccade = Saccade() + + # Maximal velocity too high + else: + + # Is there a current fixation? + if self.__fixation: + + try: + + # Share last fixation position with current saccade + self.__saccade.append(self.__fixation[-1]) + + # Clear valid positions + self.__valid_positions = GazeFeatures.TimeStampedGazePositions() + + # Finish and return the current fixation + return self.__fixation.finish() + + # Clear fixation after the return + finally: + + self.__fixation = Fixation() + + # No fixation case: + + # Remove oldest valid position + old_gaze_position = self.__valid_positions.pop(0) + + # Move oldest valid position into current saccade + self.__saccade.append(old_gaze_position) + + # Update centroid and velocities + self.__centroid = self.__centroid - (old_gaze_position - self.__centroid) / (len(self.__valid_positions) + 1) + self.__velocities.pop(0) + + # Return current gaze movement + return self.current_gaze_movement() if not terminate else self.current_gaze_movement().finish() def current_gaze_movement(self) -> GazeFeatures.GazeMovement: - # It shouldn't have a current fixation and a current saccade at the same time - assert(not (self.__fixation_positions and self.__saccade_positions)) + if self.__fixation: - if self.__fixation_positions: + return self.__fixation - return Fixation(self.__fixation_positions) + if len(self.__saccade) > 1: - if len(self.__saccade_positions) > 1: - - return Saccade(self.__saccade_positions) + return self.__saccade # Always return empty gaze movement at least return GazeFeatures.GazeMovement() - + def current_fixation(self) -> GazeFeatures.GazeMovement: - if self.__fixation_positions: + if self.__fixation: - return Fixation(self.__fixation_positions) + return self.__fixation # Always return empty gaze movement at least return GazeFeatures.GazeMovement() def current_saccade(self) -> GazeFeatures.GazeMovement: - if len(self.__saccade_positions) > 1: - - return Saccade(self.__saccade_positions) + if len(self.__saccade) > 1: + + return self.__saccade # Always return empty gaze movement at least return GazeFeatures.GazeMovement() -- cgit v1.1