#!/usr/bin/env python from typing import TypeVar, Tuple from dataclasses import dataclass, field import math from argaze import GazeFeatures import numpy FixationType = TypeVar('Fixation', bound="Fixation") # Type definition for type annotation convenience SaccadeType = TypeVar('Saccade', bound="Saccade") # Type definition for type annotation convenience @dataclass(frozen=True) class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" centroid: tuple = field(init=False) """Centroïd of all gaze positions belonging to the fixation.""" deviation_max: float = field(init=False) """Maximal gaze position distance to the centroïd.""" def __post_init__(self): super().__post_init__() self.update() def point_deviation(self, gaze_position) -> float: """Get distance of a point from the fixation's centroïd.""" return numpy.sqrt((self.centroid[0] - gaze_position.value[0])**2 + (self.centroid[1] - gaze_position.value[1])**2) def update(self): """Update fixation's centroïd then maximal gaze positions deviation from this centroïd.""" points = self.positions.values() points_x, points_y = [p[0] for p in points], [p[1] for p in points] points_array = numpy.column_stack([points_x, points_y]) centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)]) deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) # Update frozen centroid attribute object.__setattr__(self, 'centroid', (centroid_array[0], centroid_array[1])) # Update frozen deviation_max attribute object.__setattr__(self, 'deviation_max', max(deviations_array)) def overlap(self, fixation) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" points = fixation.positions.values() points_x, points_y = [p[0] for p in points], [p[1] for p in points] points_array = numpy.column_stack([points_x, points_y]) centroid_array = numpy.array([self.centroid[0], self.centroid[1]]) deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) return min(deviations_array) <= self.deviation_max def merge(self, fixation) -> FixationType: """Merge another fixation into this fixation.""" self.positions.append(fixation.positions) self.__post_init__() return self @dataclass(frozen=True) class Saccade(GazeFeatures.Saccade): """Define dispersion based saccade.""" def __post_init__(self): super().__post_init__() @dataclass class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): """Implementation of the I-DT algorithm as described in: Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and saccades in eye-tracking protocols. In Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, 71-78. [DOI=http://dx.doi.org/10.1145/355017.355028](DOI=http://dx.doi.org/10.1145/355017.355028) """ deviation_max_threshold: int|float """Maximal distance allowed to consider a gaze movement as a fixation.""" duration_min_threshold: int|float """Minimal duration allowed to consider a gaze movement as a fixation. It is also used as maximal duration allowed to consider a gaze movement as a saccade.""" def __post_init__(self): self.__valid_positions = GazeFeatures.TimeStampedGazePositions() self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() def identify(self, ts, gaze_position, terminate=False): """Identify gaze movement from successive timestamped gaze positions. The optional *terminate* argument allows to notify identification algorithm that given gaze position will be the last one. """ # Ignore non valid gaze position if not gaze_position.valid: return None if not terminate else self.current_fixation # Check if too much time elapsed since last gaze position if len(self.__valid_positions) > 0: ts_last, _ = self.__valid_positions.last if (ts - ts_last) > self.duration_min_threshold: # Clear all former gaze positions self.__valid_positions = GazeFeatures.TimeStampedGazePositions() self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Store gaze positions until a minimal duration self.__valid_positions[ts] = gaze_position first_ts, _ = self.__valid_positions.first last_ts, _ = self.__valid_positions.last # Once the minimal duration is reached if last_ts - first_ts >= self.duration_min_threshold: # Calculate the deviation of valid gaze positions deviation = Fixation(self.__valid_positions).deviation_max # Valid gaze positions deviation small enough if deviation <= self.deviation_max_threshold: # Store last saccade last_saccade = self.current_saccade # Clear saccade positions self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Copy valid gaze positions into fixation positions self.__fixation_positions = self.__valid_positions.copy() # Output last saccade return last_saccade if not terminate else self.current_fixation # Valid gaze positions deviation too wide while identifying fixation elif len(self.__fixation_positions) > 0: # Store last fixation last_fixation = self.current_fixation # Start saccade positions with current gaze position self.__saccade_positions[ts] = gaze_position # Clear fixation positions self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() # Clear valid positions self.__valid_positions = GazeFeatures.TimeStampedGazePositions() # Output last fixation return last_fixation if not terminate else self.current_saccade # Valid gaze positions deviation too wide while identifying saccade (or not) else: # Move oldest valid position into saccade positions first_ts, first_position = self.__valid_positions.pop_first() self.__saccade_positions[first_ts] = first_position @property def current_fixation(self) -> FixationType: if len(self.__fixation_positions) > 0: return Fixation(self.__fixation_positions) @property def current_saccade(self) -> SaccadeType: if len(self.__saccade_positions) > 0: return Saccade(self.__saccade_positions)