#!/usr/bin/env python """Velocity threshold identification (I-VT) module.""" __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple from dataclasses import dataclass, field import math from argaze import GazeFeatures, DataFeatures import numpy import cv2 GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement") # Type definition for type annotation convenience FixationType = TypeVar('Fixation', bound="Fixation") # Type definition for type annotation convenience SaccadeType = TypeVar('Saccade', bound="Saccade") # Type definition for type annotation convenience @dataclass(frozen=True) class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" deviation_max: float = field(init=False) """Maximal gaze position distance to the centroïd.""" def __post_init__(self): super().__post_init__() points = self.positions.values() points_x, points_y = [p[0] for p in points], [p[1] for p in points] points_array = numpy.column_stack([points_x, points_y]) centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)]) deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) # Update frozen focus attribute using centroid object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1])) # Update frozen deviation_max attribute object.__setattr__(self, 'deviation_max', max(deviations_array)) def point_deviation(self, gaze_position) -> float: """Get distance of a point from the fixation's centroïd.""" return numpy.sqrt((self.centroid[0] - gaze_position.value[0])**2 + (self.centroid[1] - gaze_position.value[1])**2) def overlap(self, fixation) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" points = fixation.positions.values() points_x, points_y = [p[0] for p in points], [p[1] for p in points] points_array = numpy.column_stack([points_x, points_y]) centroid_array = numpy.array([self.centroid[0], self.centroid[1]]) deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) return min(deviations_array) <= self.deviation_max def merge(self, fixation) -> FixationType: """Merge another fixation into this fixation.""" self.positions.append(fixation.positions) self.__post_init__() return self def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None): """Draw fixation into image. Parameters: deviation_circle_color: color of circle representing fixation's deviation duration_border_color: color of border representing fixation's duration duration_factor: how many pixels per duration unit """ # Draw duration border if required if duration_border_color is not None: cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), duration_border_color, int(self.duration * duration_factor)) # Draw deviation circle if required if deviation_circle_color is not None: cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), deviation_circle_color, -1) # Draw positions if required if draw_positions is not None: self.draw_positions(image, **draw_positions) @dataclass(frozen=True) class Saccade(GazeFeatures.Saccade): """Define dispersion based saccade.""" def __post_init__(self): super().__post_init__() def draw(self, image: numpy.array, line_color: tuple = None): """Draw saccade into image. Parameters: line_color: color of line from first position to last position """ # Draw line if required if line_color is not None: _, start_position = self.positions.first _, last_position = self.positions.last cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @dataclass class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): """Implementation of the I-VT algorithm as described in: Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and saccades in eye-tracking protocols. In Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, 71-78. [http://dx.doi.org/10.1145/355017.355028](http://dx.doi.org/10.1145/355017.355028) """ velocity_max_threshold: int|float """Maximal velocity allowed to consider a gaze movement as a fixation.""" duration_min_threshold: int|float """Minimal duration allowed to wait valid gaze positions.""" def __post_init__(self): super().__init__() self.__last_ts = -1 self.__last_position = None self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() @DataFeatures.PipelineStepMethod def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType: # Ignore non valid gaze position if not gaze_position.valid: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Store first valid position if self.__last_ts < 0: self.__last_ts = ts self.__last_position = gaze_position return GazeFeatures.GazeMovement() # Check if too much time elapsed since last gaze position if (ts - self.__last_ts) > self.duration_min_threshold: # Remember last position self.__last_ts = ts self.__last_position = gaze_position # Get last movement last_movement = self.current_gaze_movement.finish() # Clear all former gaze positions self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Return last valid movement if exist return last_movement # Velocity velocity = abs(gaze_position.distance(self.__last_position) / (ts - self.__last_ts)) # Remember last position self.__last_ts = ts self.__last_position = gaze_position # Velocity is greater than threshold if velocity > self.velocity_max_threshold: last_fixation = GazeFeatures.GazeMovement() # Does last fixation exist? if len(self.__fixation_positions) > 0: # Copy most recent fixation position into saccade positions last_ts, last_position = self.__fixation_positions.last self.__saccade_positions[last_ts] = last_position # Create last fixation last_fixation = self.current_fixation.finish() # Clear fixation positions self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() # Append to saccade positions self.__saccade_positions[ts] = gaze_position # Output last fixation return last_fixation if not terminate else self.current_saccade.finish() # Velocity is less or equals to threshold else: last_saccade = GazeFeatures.GazeMovement() # Does last saccade exist? if len(self.__saccade_positions) > 0: # Copy most recent saccade position into fixation positions last_ts, last_position = self.__saccade_positions.last self.__fixation_positions[last_ts] = last_position # Create last saccade last_saccade = self.current_saccade.finish() # Clear fixation positions self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Append to fixation positions self.__fixation_positions[ts] = gaze_position # Output last saccade return last_saccade if not terminate else self.current_fixation.finish() # Always return unvalid gaze movement at least return GazeFeatures.GazeMovement() @property def current_gaze_movement(self) -> GazeMovementType: # It shouldn't have a current fixation and a current saccade at the same time assert(not (len(self.__fixation_positions) > 0 and len(self.__saccade_positions) > 0)) if len(self.__fixation_positions) > 0: return Fixation(self.__fixation_positions) if len(self.__saccade_positions) > 0: return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least return GazeFeatures.GazeMovement() @property def current_fixation(self) -> FixationType: if len(self.__fixation_positions) > 0: return Fixation(self.__fixation_positions) # Always return unvalid gaze movement at least return GazeFeatures.GazeMovement() @property def current_saccade(self) -> SaccadeType: if len(self.__saccade_positions) > 0: return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least return GazeFeatures.GazeMovement()