From fdaaa5be7e99252cf0f25342b04485576a11bcea Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Mon, 30 Jan 2023 15:39:28 +0100 Subject: Adding a demo specific DispersionBasedGazeMovementIdentifier to show real time identification. --- .../DispersionBasedGazeMovementIdentifier.py | 111 ++++++++++++++++++++- 1 file changed, 109 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py index e3562a5..849223c 100644 --- a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py +++ b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import TypeVar, Tuple from dataclasses import dataclass, field import math @@ -7,6 +8,12 @@ from argaze import GazeFeatures import numpy +FixationType = TypeVar('Fixation', bound="Fixation") +# Type definition for type annotation convenience + +SaccadeType = TypeVar('Saccade', bound="Saccade") +# Type definition for type annotation convenience + @dataclass(frozen=True) class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" @@ -43,7 +50,7 @@ class Fixation(GazeFeatures.Fixation): # Update frozen deviation_max attribute object.__setattr__(self, 'deviation_max', max(deviations_array)) - def overlap(self, fixation) -> list: + def overlap(self, fixation) -> bool: """Does a gaze position from another fixation have a deviation to this fixation centroïd smaller than maximal deviation?""" points = fixation.positions.values() @@ -54,12 +61,14 @@ class Fixation(GazeFeatures.Fixation): return min(deviations_array) <= self.deviation_max - def merge(self, fixation) -> float: + def merge(self, fixation) -> FixationType: """Merge another fixation into this fixation.""" self.positions.append(fixation.positions) self.__post_init__() + return self + @dataclass(frozen=True) class Saccade(GazeFeatures.Saccade): """Define dispersion based saccade.""" @@ -346,3 +355,101 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Output last fixation if self.__last_fixation != None: yield self.__last_fixation + + +""""FOR QUICK DEMO PURPOSE""" +@dataclass +class TobiiDataStream_GazeMovementIdentifier(): + + deviation_max_threshold: int|float + """Maximal distance allowed to consider a gaze movement as a fixation.""" + + duration_min_threshold: int|float + """Minimal duration allowed to consider a gaze movement as a fixation. + It is also used as maximal duration allowed to consider a gaze movement as a saccade.""" + + def __post_init__(self): + + self.__valid_gaze_positions = GazeFeatures.TimeStampedGazePositions() + self.__current_fixation = None + self.__current_saccade = None + + def identify(self, ts, gaze_position): + + # Check if too much time elapsed since last gaze position + if len(self.__valid_gaze_positions) > 0: + + ts_last, _ = self.__valid_gaze_positions.last + + if (ts - ts_last) > self.duration_min_threshold: + + # Clear valid positions + self.__valid_gaze_positions = GazeFeatures.TimeStampedGazePositions() + + # Clear current movements + self.__current_fixation = None + self.__current_saccade = None + + # Store gaze positions until a minimal duration + self.__valid_gaze_positions[ts] = gaze_position + + first_ts, _ = self.__valid_gaze_positions.first + last_ts, _ = self.__valid_gaze_positions.last + + # Once the minimal duration is reached + if last_ts - first_ts >= self.duration_min_threshold: + + # Calculate the deviation of valid gaze positions + extended_fixation = Fixation(self.__valid_gaze_positions) + deviation = extended_fixation.deviation_max + + # Valid gaze positions deviation small enough + if deviation <= self.deviation_max_threshold: + + # Clear current saccade + # Question : Should we emit a SaccadeStop event? + self.__current_saccade = None + + # Start/extend current fixation + # Question : Should we emit a FixationStart event? + self.__current_fixation = extended_fixation + + # Valid gaze positions deviation too wide while extending fixation + elif self.__current_fixation != None: + + # Create saccade from last position of current fixation to current gaze position + # Question : Should we emit a SaccadeStart event? + saccade_positions = GazeFeatures.TimeStampedGazePositions() + last_ts, last_position = self.__current_fixation.positions.last + saccade_positions[last_ts] = last_position + saccade_positions[ts] = gaze_position + + self.__current_saccade = Saccade(saccade_positions) + + # Clear current fixation + # Question : Should we emit a FixationStop event? + self.__current_fixation = None + + # Reset valid positions with current gaze position + self.__valid_gaze_positions = GazeFeatures.TimeStampedGazePositions() + self.__valid_gaze_positions[ts] = gaze_position + + # Valid gaze positions deviation too wide with no current fixation + else: + + # Create saccade with valid gaze positions + self.__current_saccade = Saccade(self.__valid_gaze_positions) + + # Reset valid positions with current gaze position + self.__valid_gaze_positions = GazeFeatures.TimeStampedGazePositions() + self.__valid_gaze_positions[ts] = gaze_position + + @property + def current_fixation(self) -> FixationType: + + return self.__current_fixation + + @property + def current_saccade(self) -> SaccadeType: + + return self.__current_saccade -- cgit v1.1