aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/argaze/GazeAnalysis/DispersionThresholdIdentification.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
new file mode 100644
index 0000000..cb29055
--- /dev/null
+++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
@@ -0,0 +1,189 @@
+#!/usr/bin/env python
+
+from typing import TypeVar, Tuple
+from dataclasses import dataclass, field
+import math
+
+from argaze import GazeFeatures
+
+import numpy
+
+FixationType = TypeVar('Fixation', bound="Fixation")
+# Type definition for type annotation convenience
+
+SaccadeType = TypeVar('Saccade', bound="Saccade")
+# Type definition for type annotation convenience
+
+@dataclass(frozen=True)
+class Fixation(GazeFeatures.Fixation):
+ """Define dispersion based fixation."""
+
+ centroid: tuple = field(init=False)
+ """Centroïd of all gaze positions belonging to the fixation."""
+
+ deviation_max: float = field(init=False)
+ """Maximal gaze position distance to the centroïd."""
+
+ def __post_init__(self):
+
+ super().__post_init__()
+
+ self.update()
+
+ def point_deviation(self, gaze_position) -> float:
+ """Get distance of a point from the fixation's centroïd."""
+
+ return numpy.sqrt((self.centroid[0] - gaze_position.value[0])**2 + (self.centroid[1] - gaze_position.value[1])**2)
+
+ def update(self):
+ """Update fixation's centroïd then maximal gaze positions deviation from this centroïd."""
+
+ points = self.positions.values()
+ points_x, points_y = [p[0] for p in points], [p[1] for p in points]
+ points_array = numpy.column_stack([points_x, points_y])
+ centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)])
+ deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))
+
+ # Update frozen centroid attribute
+ object.__setattr__(self, 'centroid', (centroid_array[0], centroid_array[1]))
+
+ # Update frozen deviation_max attribute
+ object.__setattr__(self, 'deviation_max', max(deviations_array))
+
+ def overlap(self, fixation) -> bool:
+ """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?"""
+
+ points = fixation.positions.values()
+ points_x, points_y = [p[0] for p in points], [p[1] for p in points]
+ points_array = numpy.column_stack([points_x, points_y])
+ centroid_array = numpy.array([self.centroid[0], self.centroid[1]])
+ deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))
+
+ return min(deviations_array) <= self.deviation_max
+
+ def merge(self, fixation) -> FixationType:
+ """Merge another fixation into this fixation."""
+
+ self.positions.append(fixation.positions)
+ self.__post_init__()
+
+ return self
+
+@dataclass(frozen=True)
+class Saccade(GazeFeatures.Saccade):
+ """Define dispersion based saccade."""
+
+ def __post_init__(self):
+ super().__post_init__()
+
+@dataclass
+class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
+ """Implementation of the I-DT algorithm as described in:
+
+ Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
+ saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
+ on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
+ 71-78. [DOI=http://dx.doi.org/10.1145/355017.355028](DOI=http://dx.doi.org/10.1145/355017.355028)
+ """
+
+ deviation_max_threshold: int|float
+ """Maximal distance allowed to consider a gaze movement as a fixation."""
+
+ duration_min_threshold: int|float
+ """Minimal duration allowed to consider a gaze movement as a fixation.
+ It is also used as maximal duration allowed to consider a gaze movement as a saccade."""
+
+ def __post_init__(self):
+
+ self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
+ self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
+ self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
+
+ def identify(self, ts, gaze_position, terminate=False):
+ """Identify gaze movement from successive timestamped gaze positions.
+
+ The optional *terminate* argument allows to notify identification algorithm that given gaze position will be the last one.
+ """
+
+ # Ignore non valid gaze position
+ if not gaze_position.valid:
+
+ return None if not terminate else self.current_fixation
+
+ # Check if too much time elapsed since last gaze position
+ if len(self.__valid_positions) > 0:
+
+ ts_last, _ = self.__valid_positions.last
+
+ if (ts - ts_last) > self.duration_min_threshold:
+
+ # Clear all former gaze positions
+ self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
+ self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
+ self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
+
+ # Store gaze positions until a minimal duration
+ self.__valid_positions[ts] = gaze_position
+
+ first_ts, _ = self.__valid_positions.first
+ last_ts, _ = self.__valid_positions.last
+
+ # Once the minimal duration is reached
+ if last_ts - first_ts >= self.duration_min_threshold:
+
+ # Calculate the deviation of valid gaze positions
+ deviation = Fixation(self.__valid_positions).deviation_max
+
+ # Valid gaze positions deviation small enough
+ if deviation <= self.deviation_max_threshold:
+
+ # Store last saccade
+ last_saccade = self.current_saccade
+
+ # Clear saccade positions
+ self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
+
+ # Copy valid gaze positions into fixation positions
+ self.__fixation_positions = self.__valid_positions.copy()
+
+ # Output last saccade
+ return last_saccade if not terminate else self.current_fixation
+
+ # Valid gaze positions deviation too wide while identifying fixation
+ elif len(self.__fixation_positions) > 0:
+
+ # Store last fixation
+ last_fixation = self.current_fixation
+
+ # Start saccade positions with current gaze position
+ self.__saccade_positions[ts] = gaze_position
+
+ # Clear fixation positions
+ self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
+
+ # Clear valid positions
+ self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
+
+ # Output last fixation
+ return last_fixation if not terminate else self.current_saccade
+
+ # Valid gaze positions deviation too wide while identifying saccade (or not)
+ else:
+
+ # Move oldest valid position into saccade positions
+ first_ts, first_position = self.__valid_positions.pop_first()
+ self.__saccade_positions[first_ts] = first_position
+
+ @property
+ def current_fixation(self) -> FixationType:
+
+ if len(self.__fixation_positions) > 0:
+
+ return Fixation(self.__fixation_positions)
+
+ @property
+ def current_saccade(self) -> SaccadeType:
+
+ if len(self.__saccade_positions) > 0:
+
+ return Saccade(self.__saccade_positions)