aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/argaze/GazeAnalysis/VelocityThresholdIdentification.py248
1 files changed, 140 insertions, 108 deletions
diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
index 10a7ceb..57b8a28 100644
--- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
@@ -1,4 +1,4 @@
-"""Velocity threshold identification (I-VT) module."""
+"""Dispersion threshold identification (I-DT) module."""
"""
This program is free software: you can redistribute it and/or modify it under
@@ -17,35 +17,28 @@ __credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
-import cv2
-import numpy
+import logging
+import math
from argaze import GazeFeatures, DataFeatures
+import cv2
+import numpy
class Fixation(GazeFeatures.Fixation):
"""Define dispersion based fixation."""
- def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
-
- super().__init__(positions, finished, message, **kwargs)
+ def __init__(self, focus: tuple = (), velocity_max: float = math.nan, **kwargs):
- if positions:
+ super().__init__(**kwargs)
- positions_array = numpy.asarray(self.values())
- centroid = numpy.mean(positions_array, axis=0)
- deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
-
- # Set focus as positions centroid
- self.focus = (centroid[0], centroid[1])
-
- # Set deviation_max attribute
- self.__deviation_max = deviations_array.max()
+ self._focus = focus
+ self.__velocity_max = velocity_max
@property
- def deviation_max(self):
- """Get fixation's maximal deviation."""
- return self.__deviation_max
+ def velocity_max(self) -> float:
+ """Fixation's maximal velocity."""
+ return self.__velocity_max
def is_overlapping(self, fixation: GazeFeatures.Fixation) -> bool:
"""Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?"""
@@ -56,12 +49,13 @@ class Fixation(GazeFeatures.Fixation):
return min(deviations_array) <= self.deviation_max
- def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None):
+ def draw(self, image: numpy.array, amplitude_circle_color: tuple = None, duration_border_color: tuple = None,
+ duration_factor: float = 1., draw_positions: dict = None):
"""Draw fixation into image.
Parameters:
image: where to draw
- deviation_circle_color: color of circle representing fixation's deviation
+ amplitude_circle_color: color of circle representing fixation's amplitude
duration_border_color: color of border representing fixation's duration
duration_factor: how many pixels per duration unit
draw_positions:
@@ -70,12 +64,12 @@ class Fixation(GazeFeatures.Fixation):
# Draw duration border if required
if duration_border_color is not None:
- cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), duration_border_color, int(self.duration * duration_factor))
+ cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.amplitude), duration_border_color, int(self.duration * duration_factor))
- # Draw deviation circle if required
- if deviation_circle_color is not None:
+ # Draw amplitude circle if required
+ if amplitude_circle_color is not None:
- cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), deviation_circle_color, -1)
+ cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.amplitude), amplitude_circle_color, -1)
# Draw positions if required
if draw_positions is not None:
@@ -85,11 +79,11 @@ class Fixation(GazeFeatures.Fixation):
class Saccade(GazeFeatures.Saccade):
"""Define dispersion based saccade."""
- def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = None, **kwargs):
- super().__init__(positions, finished, message, **kwargs)
+ super().__init__(positions, **kwargs)
- def draw(self, image: numpy.array, line_color: tuple = None):
+ def draw(self, image: numpy.array, line_color: tuple = None, draw_positions: dict = None):
"""Draw saccade into image.
Parameters:
@@ -105,13 +99,18 @@ class Saccade(GazeFeatures.Saccade):
cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2)
+ # Draw positions if required
+ if draw_positions is not None:
+
+ self.draw_positions(image, **draw_positions)
+
class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
- """Implementation of the I-VT algorithm as described in:
-
- Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
- saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
- on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
- 71-78. [http://dx.doi.org/10.1145/355017.355028](http://dx.doi.org/10.1145/355017.355028)
+ """Implementation of the I-DT algorithm as described in:
+
+ **Dario D. Salvucci and Joseph H. Goldberg (2000).**
+ *Identifying fixations and saccades in eye-tracking protocols.*
+ Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA'00, 71-78).
+ [https://doi.org/10.1145/355017.355028](https://doi.org/10.1145/355017.355028)
"""
@DataFeatures.PipelineStepInit
@@ -123,11 +122,13 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
self.__velocity_max_threshold = 0
self.__duration_min_threshold = 0
- self.__last_ts = -1
- self.__last_position = None
+ self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
+
+ self.__centroid = ()
+ self.__velocities = []
- self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
- self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
+ self.__fixation = Fixation()
+ self.__saccade = Saccade()
@property
def velocity_max_threshold(self) -> int|float:
@@ -148,123 +149,154 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
def duration_min_threshold(self, duration_min_threshold: int|float):
self.__duration_min_threshold = duration_min_threshold
-
+
@DataFeatures.PipelineStepMethod
def identify(self, gaze_position, terminate=False) -> GazeFeatures.GazeMovement:
- # Ignore empty gaze position
- if not gaze_position:
+ # When too much time elapsed since the last valid gaze position
+ if self.__valid_positions:
- return GazeFeatures.GazeMovement() if not terminate else self.current_fixation().finish()
+ elapsed_time = gaze_position.timestamp - self.__valid_positions[-1].timestamp
- # Store first valid position
- if self.__last_ts < 0:
+ if elapsed_time > self.__duration_min_threshold:
- self.__last_ts = gaze_position.timestamp
- self.__last_position = gaze_position
+ try:
- return GazeFeatures.GazeMovement()
+ # Finish and return current gaze movement
+ return self.current_gaze_movement().finish()
- # Check if too much time elapsed since last gaze position
- if (gaze_position.timestamp - self.__last_ts) > self.duration_min_threshold:
+ finally:
- # Remember last position
- self.__last_ts = gaze_position.timestamp
- self.__last_position = gaze_position
+ # Reset valid gaze positions
+ self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
- # Get last movement
- last_movement = self.current_gaze_movement().finish()
+ # Reset centroid and velocities
+ self.__centroid = ()
+ self.__velocities = []
- # Clear all former gaze positions
- self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
- self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
-
- # Return last valid movement if exist
- return last_movement
+ # Clear gaze movements
+ self.__fixation = Fixation()
+ self.__saccade = Saccade()
- # Velocity
- velocity = abs(gaze_position.distance(self.__last_position) / (gaze_position.timestamp - self.__last_ts))
+ # Consider only valid gaze position
+ if gaze_position:
+
+ # Update centroid and velocities
+ if not self.__valid_positions:
- # Remember last position
- self.__last_ts = gaze_position.timestamp
- self.__last_position = gaze_position
+ self.__centroid = gaze_position
+ self.__velocities = []
- # Velocity is greater than threshold
- if velocity > self.velocity_max_threshold:
+ else:
- last_fixation = GazeFeatures.GazeMovement()
+ self.__centroid = self.__centroid + (gaze_position - self.__centroid) / (len(self.__valid_positions) + 1)
- # Does last fixation exist?
- if len(self.__fixation_positions) > 0:
+ if gaze_position.timestamp > self.__valid_positions[-1].timestamp:
- # Copy most recent fixation position into saccade positions
- self.__saccade_positions.append(self.__fixation_positions[-1])
+ delta_d = gaze_position.distance(self.__valid_positions[-1])
+ delta_t = (gaze_position.timestamp - self.__valid_positions[-1].timestamp)
+ self.__velocities.append( delta_d / delta_t )
- # Create last fixation
- last_fixation = self.current_fixation().finish()
+ else:
- # Clear fixation positions
- self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
+ self.__velocities.append( math.nan )
- # Append to saccade positions
- self.__saccade_positions.append(gaze_position)
+ # Store valid gaze position
+ self.__valid_positions.append(gaze_position)
- # Output last fixation
- return last_fixation if not terminate else self.current_saccade().finish()
+ # Once the minimal duration is reached
+ if self.__valid_positions.duration >= self.__duration_min_threshold:
- # Velocity is less or equals to threshold
- else:
+ velocity_max = max(self.__velocities)
- last_saccade = GazeFeatures.GazeMovement()
+ logging.debug("%s: velocity_max %f", DataFeatures.get_class_path(self), velocity_max)
- # Does last saccade exist?
- if self.__saccade_positions:
+ # Maximal velocity small enough
+ if velocity_max <= self.__velocity_max_threshold:
- # Copy most recent saccade position into fixation positions
- self.__fixation_positions.append(self.__saccade_positions[-1])
+ # Make valid gaze positions as current fixation
+ self.__fixation = Fixation(positions=self.__valid_positions, focus=self.__centroid, velocity_max=velocity_max)
- # Create last saccade
- last_saccade = self.current_saccade().finish()
+ # Is there a current saccade?
+ if self.__saccade:
- # Clear fixation positions
- self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
+ try:
- # Append to fixation positions
- self.__fixation_positions.append(gaze_position)
+ # Share first fixation position with current saccade
+ self.__saccade.append(self.__fixation[0])
- # Output last saccade
- return last_saccade if not terminate else self.current_fixation().finish()
+ # Finish and return the current saccade
+ return self.__saccade.finish()
+
+ # Clear saccade after the return
+ finally:
+
+ self.__saccade = Saccade()
+
+ # Maximal velocity too high
+ else:
+
+ # Is there a current fixation?
+ if self.__fixation:
+
+ try:
+
+ # Share last fixation position with current saccade
+ self.__saccade.append(self.__fixation[-1])
+
+ # Clear valid positions
+ self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
+
+ # Finish and return the current fixation
+ return self.__fixation.finish()
+
+ # Clear fixation after the return
+ finally:
+
+ self.__fixation = Fixation()
+
+ # No fixation case:
+
+ # Remove oldest valid position
+ old_gaze_position = self.__valid_positions.pop(0)
+
+ # Move oldest valid position into current saccade
+ self.__saccade.append(old_gaze_position)
+
+ # Update centroid and velocities
+ self.__centroid = self.__centroid - (old_gaze_position - self.__centroid) / (len(self.__valid_positions) + 1)
+ self.__velocities.pop(0)
+
+ # Return current gaze movement
+ return self.current_gaze_movement() if not terminate else self.current_gaze_movement().finish()
def current_gaze_movement(self) -> GazeFeatures.GazeMovement:
- # It shouldn't have a current fixation and a current saccade at the same time
- assert(not (self.__fixation_positions and self.__saccade_positions))
+ if self.__fixation:
- if self.__fixation_positions:
+ return self.__fixation
- return Fixation(self.__fixation_positions)
+ if len(self.__saccade) > 1:
- if len(self.__saccade_positions) > 1:
-
- return Saccade(self.__saccade_positions)
+ return self.__saccade
# Always return empty gaze movement at least
return GazeFeatures.GazeMovement()
-
+
def current_fixation(self) -> GazeFeatures.GazeMovement:
- if self.__fixation_positions:
+ if self.__fixation:
- return Fixation(self.__fixation_positions)
+ return self.__fixation
# Always return empty gaze movement at least
return GazeFeatures.GazeMovement()
def current_saccade(self) -> GazeFeatures.GazeMovement:
- if len(self.__saccade_positions) > 1:
-
- return Saccade(self.__saccade_positions)
+ if len(self.__saccade) > 1:
+
+ return self.__saccade
# Always return empty gaze movement at least
return GazeFeatures.GazeMovement()