"""Velocity threshold identification (I-VT) module. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import cv2 import numpy from argaze import GazeFeatures, DataFeatures class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): super().__init__(positions, finished, message, **kwargs) if positions: positions_array = numpy.asarray(self.values()) centroid = numpy.mean(positions_array, axis=0) deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) # Set focus as positions centroid self.focus = (centroid[0], centroid[1]) # Set deviation_max attribute self.__deviation_max = deviations_array.max() @property def deviation_max(self): """Get fixation's maximal deviation.""" return self.__deviation_max def is_overlapping(self, fixation: GazeFeatures.Fixation) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" positions_array = numpy.asarray(fixation.values()) centroid = numpy.mean(self.focus, axis=0) deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) return min(deviations_array) <= self.deviation_max def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None): """Draw fixation into image. Parameters: image: where to draw deviation_circle_color: color of circle representing fixation's deviation duration_border_color: color of border representing fixation's duration duration_factor: how many pixels per duration unit draw_positions: """ # Draw duration border if required if duration_border_color is not None: cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), duration_border_color, int(self.duration * duration_factor)) # Draw deviation circle if required if deviation_circle_color is not None: cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), deviation_circle_color, -1) # Draw positions if required if draw_positions is not None: self.draw_positions(image, **draw_positions) class Saccade(GazeFeatures.Saccade): """Define dispersion based saccade.""" def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): super().__init__(positions, finished, message, **kwargs) def draw(self, image: numpy.array, line_color: tuple = None): """Draw saccade into image. Parameters: image: where to draw line_color: color of line from first position to last position """ # Draw line if required if line_color is not None: start_position = self[0] last_position = self[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): """Implementation of the I-VT algorithm as described in: Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and saccades in eye-tracking protocols. In Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, 71-78. [http://dx.doi.org/10.1145/355017.355028](http://dx.doi.org/10.1145/355017.355028) """ @DataFeatures.PipelineStepInit def __init__(self, **kwargs): # Init GazeMovementIdentifier class super().__init__() self.__velocity_max_threshold = 0 self.__duration_min_threshold = 0 self.__last_ts = -1 self.__last_position = None self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() @property def velocity_max_threshold(self) -> int|float: """Maximal velocity allowed to consider a gaze movement as a fixation.""" return self.__velocity_max_threshold @velocity_max_threshold.setter def velocity_max_threshold(self, velocity_max_threshold: int|float): self.__velocity_max_threshold = velocity_max_threshold @property def duration_min_threshold(self) -> int|float: """Minimal duration allowed to wait valid gaze positions.""" return self.__duration_min_threshold @duration_min_threshold.setter def duration_min_threshold(self, duration_min_threshold: int|float): self.__duration_min_threshold = duration_min_threshold @DataFeatures.PipelineStepMethod def identify(self, gaze_position, terminate=False) -> GazeFeatures.GazeMovement: # Ignore empty gaze position if not gaze_position: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation().finish() # Store first valid position if self.__last_ts < 0: self.__last_ts = gaze_position.timestamp self.__last_position = gaze_position return GazeFeatures.GazeMovement() # Check if too much time elapsed since last gaze position if (gaze_position.timestamp - self.__last_ts) > self.duration_min_threshold: # Remember last position self.__last_ts = gaze_position.timestamp self.__last_position = gaze_position # Get last movement last_movement = self.current_gaze_movement().finish() # Clear all former gaze positions self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Return last valid movement if exist return last_movement # Velocity velocity = abs(gaze_position.distance(self.__last_position) / (gaze_position.timestamp - self.__last_ts)) # Remember last position self.__last_ts = gaze_position.timestamp self.__last_position = gaze_position # Velocity is greater than threshold if velocity > self.velocity_max_threshold: last_fixation = GazeFeatures.GazeMovement() # Does last fixation exist? if len(self.__fixation_positions) > 0: # Copy most recent fixation position into saccade positions self.__saccade_positions.append(self.__fixation_positions[-1]) # Create last fixation last_fixation = self.current_fixation().finish() # Clear fixation positions self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() # Append to saccade positions self.__saccade_positions.append(gaze_position) # Output last fixation return last_fixation if not terminate else self.current_saccade().finish() # Velocity is less or equals to threshold else: last_saccade = GazeFeatures.GazeMovement() # Does last saccade exist? if self.__saccade_positions: # Copy most recent saccade position into fixation positions self.__fixation_positions.append(self.__saccade_positions[-1]) # Create last saccade last_saccade = self.current_saccade().finish() # Clear fixation positions self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Append to fixation positions self.__fixation_positions.append(gaze_position) # Output last saccade return last_saccade if not terminate else self.current_fixation().finish() def current_gaze_movement(self) -> GazeFeatures.GazeMovement: # It shouldn't have a current fixation and a current saccade at the same time assert(not (self.__fixation_positions and self.__saccade_positions)) if self.__fixation_positions: return Fixation(self.__fixation_positions) if len(self.__saccade_positions) > 1: return Saccade(self.__saccade_positions) # Always return empty gaze movement at least return GazeFeatures.GazeMovement() def current_fixation(self) -> GazeFeatures.GazeMovement: if self.__fixation_positions: return Fixation(self.__fixation_positions) # Always return empty gaze movement at least return GazeFeatures.GazeMovement() def current_saccade(self) -> GazeFeatures.GazeMovement: if len(self.__saccade_positions) > 1: return Saccade(self.__saccade_positions) # Always return empty gaze movement at least return GazeFeatures.GazeMovement()