From 9b1c0e0b6a134fd90163fefaba94b1f4ad87247c Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Mon, 20 Feb 2023 17:06:41 +0100 Subject: Changing GazeMovementIdentifier interface. --- .../DispersionBasedGazeMovementIdentifier.py | 279 +-------------------- src/argaze/GazeFeatures.py | 22 +- 2 files changed, 8 insertions(+), 293 deletions(-) (limited to 'src') diff --git a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py index 6b5ec09..07136f8 100644 --- a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py +++ b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py @@ -93,281 +93,6 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): """Minimal duration allowed to consider a gaze movement as a fixation. It is also used as maximal duration allowed to consider a gaze movement as a saccade.""" - def __iter__(self) -> GazeFeatures.GazeMovementType: - """GazeMovement identification generator.""" - - self.__last_fixation = None - unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # While there are 2 gaze positions at least - while len(self.__ts_gaze_positions) >= 2: - - # Remove all unvalid gaze positions until to find a valid one - ts_current, gaze_position_current = self.__ts_gaze_positions.pop_first() - - while not gaze_position_current.valid and len(self.__ts_gaze_positions) > 0: - ts_current, gaze_position_current = self.__ts_gaze_positions.pop_first() - - # Prepare to select current and next valid gaze positions until a duration threshold - valid_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # Output last fixation after too much unvalid positions - if self.__last_fixation != None: - - ts_last, gaze_position_last = self.__last_fixation.positions.last - - if (ts_current - ts_last) > self.duration_min_threshold: - - # Get last gaze position of the last fixation as it is it's out position - last_new_ts, last_new_position = self.__last_fixation.positions.pop_last() - - # Update last fixation - self.__last_fixation.update() - - # Append this last out position to the valid gaze positions selection - valid_gaze_positions[last_new_ts] = last_new_position - - yield self.__last_fixation - self.__last_fixation = None - - # Append current gaze position to valid gaze positions selection - valid_gaze_positions[ts_current] = gaze_position_current - - # Store unvalid gaze positions to count them - unvalid_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # Keep detect of last valid timestamp - ts_last_valid = ts_current - - for ts_next, gaze_position_next in self.__ts_gaze_positions.items(): - - if (ts_next - ts_current) < self.duration_min_threshold: - - # Store valid position - if gaze_position_next.valid: - - valid_gaze_positions[ts_next] = gaze_position_next - - # Keep detect of last valid timestamp - ts_last_valid = ts_next - - # Store non valid position - else: - - unvalid_gaze_positions[ts_next] = gaze_position_next - - else: - - break - - # If there is at least 3 valid gaze positions selected: - # - 1 entering position - # - 1 staying position - # - 1 outing position (which may become a staying position after extension) - if len(valid_gaze_positions) >= 3: - - # Consider selected valid gaze positions as part of a maybe new fixation - new_fixation = Fixation(valid_gaze_positions) - - # Dispersion small enough: it is a fixation ! Try to extend it - if new_fixation.deviation_max <= self.deviation_max_threshold: - - # Remove valid and unvalid gaze positions as there as now stored in new fixation - # -1 as current gaze position have already been poped - for _ in range(len(valid_gaze_positions) + len(unvalid_gaze_positions) - 1): - self.__ts_gaze_positions.pop_first() - - # Copy new fixation positions before to try to extend them - extended_gaze_positions = new_fixation.positions.copy() - extended_fixation = new_fixation - - # Are next gaze positions not too dispersed ? - while len(self.__ts_gaze_positions) > 0: - - # Select and remove next gaze position - ts_next, gaze_position_next = self.__ts_gaze_positions.pop_first() - - # Consider only valid next position - if gaze_position_next.valid: - - # Get deviation of the nex gaze position from the new extended fixation - deviation_from_extended = extended_fixation.point_deviation(gaze_position_next) - - # Extend fixation anyway even with last out position: it will be popped later. - extended_gaze_positions[ts_next] = gaze_position_next - extended_fixation = Fixation(extended_gaze_positions) - - # Stop fixation extension - if deviation_from_extended > self.deviation_max_threshold: - - break - - # Check that consecutive unvalid gaze positions do not exceed fixation duration threshold - elif ts_next - ts_last_valid >= self.duration_min_threshold: - - break - - # Update new fixation - new_fixation = extended_fixation - - # Does a former fixation have been identified ? - if self.__last_fixation != None: - - # Remove last gaze position of the new fixation as it is it's out position - last_new_ts, last_new_position = new_fixation.positions.pop_last() - - # Edit inter fixations movement gaze positions - movement_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # If such unmatched positions exist - if len(unmatched_gaze_positions) > 0: - - # Inter fixations movement should: - # - starts at last position of last fixation (this position is out so it have to be popped) - # - stops at the first position inside new fixation - start_movement_ts, start_position = self.__last_fixation.positions.pop_last() - stop_movement_ts, stop_position = new_fixation.positions.pop_first() - - # Edit first movement gaze position - movement_gaze_positions[start_movement_ts] = start_position - - # Edit movement positions with unmatched positions if there are between the 2 fixations - start_unmatched_ts, _ = unmatched_gaze_positions.first - end_unmatched_ts, _ = unmatched_gaze_positions.last - - # Does unmatched gaze positions happened between the last and the new fixation ? - if start_unmatched_ts > start_movement_ts and end_unmatched_ts < stop_movement_ts: - - # Append unmatched gaze positions to saccade - movement_gaze_positions.append(unmatched_gaze_positions) - - # Unmatched gaze positions happened before last fixation - else: - - # Ignore them: GazeMovementIdentifier have to output movements according their time apparition - pass - - # Edit last movement gaze position - movement_gaze_positions[stop_movement_ts] = stop_position - - # When there is no unmatched positions between 2 fixations (*rare case) - else: - - # the last fixation position is the same than the first position of the new fixation - stop_movement_ts, stop_position = self.__last_fixation.positions.pop_last() - start_movement_ts, start_position = self.__last_fixation.positions.pop_last() - - # Edit first movement gaze position - movement_gaze_positions[start_movement_ts] = start_position - - # Edit last movement gaze position - movement_gaze_positions[stop_movement_ts] = stop_position - - # Update last and new fixations - self.__last_fixation.update() - new_fixation.update() - - # Does new fixation overlap last fixation? - if self.__last_fixation.overlap(new_fixation): - - # (*rare case) - if len(unmatched_gaze_positions) == 0: - self.__last_fixation.positions[start_movement_ts] = start_position - self.__last_fixation.positions[stop_movement_ts] = stop_position - - # Merge new fixation into last fixation - self.__last_fixation.merge(new_fixation) - - # QUESTION: What to do if the time between the two fixations it very long ? - # It would be dangerous to set a timeout value as a fixation duration has no limit. - - # Forget new fixation - new_fixation = None - - # NOTE: Ignore inter fixations gaze positions: there was probably noisy positions. - - # Otherwise, - else: - - # Output last fixation - yield self.__last_fixation - - # New fixation becomes the last fixation to allow further merging - self.__last_fixation = new_fixation - - # Short time between fixations : this movement is a saccade - if stop_movement_ts - start_movement_ts <= self.duration_min_threshold: - - # Output saccade - yield Saccade(movement_gaze_positions) - - # Too much time between fixations: this movement is unknown - else: - - # Output unknown movement - yield GazeFeatures.GazeMovement(movement_gaze_positions) - - # Append out position to last fixation: it will be popped as start_position the next time - self.__last_fixation.positions[last_new_ts] = last_new_position - - # In any case, forget former unmatched gaze positions - unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # First fixation is stored to allow further merging - # The movement before is outputed - else: - - self.__last_fixation = new_fixation - - # Is there unmatched gaze positions? - if len(unmatched_gaze_positions) > 0: - - start_movement_ts, _ = unmatched_gaze_positions.first - stop_movement_ts, _ = unmatched_gaze_positions.last - - # Short time between fixations : this movement is a saccade - if stop_movement_ts - start_movement_ts <= self.duration_min_threshold: - - # Output saccade - yield Saccade(unmatched_gaze_positions) - - # Too much time between fixations: this movement is unknown - else: - - # Output unknown movement - yield GazeFeatures.GazeMovement(unmatched_gaze_positions) - - # In any case, forget former unmatched gaze positions - unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # Dispersion too wide: - # Current gaze position is not part of a fixation - else: - - unmatched_gaze_positions[ts_current] = gaze_position_current - - # Only one valid gaze position selected: - # Current gaze position is not part of a fixation - else: - - unmatched_gaze_positions[ts_current] = gaze_position_current - - # Output last fixation - if self.__last_fixation != None: - yield self.__last_fixation - - -""""FOR QUICK DEMO PURPOSE""" -@dataclass -class TobiiDataStream_GazeMovementIdentifier(): - - deviation_max_threshold: int|float - """Maximal distance allowed to consider a gaze movement as a fixation.""" - - duration_min_threshold: int|float - """Minimal duration allowed to consider a gaze movement as a fixation. - It is also used as maximal duration allowed to consider a gaze movement as a saccade.""" - def __post_init__(self): self.__valid_positions = GazeFeatures.TimeStampedGazePositions() @@ -377,8 +102,8 @@ class TobiiDataStream_GazeMovementIdentifier(): def identify(self, ts, gaze_position): # Ignore non valid gaze position - #if not gaze_position.valid: - # return + if not gaze_position.valid: + return # Check if too much time elapsed since last gaze position if len(self.__valid_positions) > 0: diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 2a0096c..40ce77b 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -236,22 +236,10 @@ class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer): class GazeMovementIdentifier(): """Abstract class to define what should provide a gaze movement identifier.""" - def __iter__(self) -> GazeMovementType: - raise NotImplementedError('__iter__() method not implemented') - - def __next__(self): - raise NotImplementedError('__next__() method not implemented') - - def __call__(self, ts_gaze_positions: TimeStampedGazePositions): - - assert(type(ts_gaze_positions) == TimeStampedGazePositions) + def identify(self, ts, gaze_position) -> GazeMovementType: + raise NotImplementedError('identify_position() method not implemented') - # process identification on a copy - self.__ts_gaze_positions = ts_gaze_positions.copy() - - return self - - def identify(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[TimeStampedGazeMovementsType, TimeStampedGazeMovementsType, TimeStampedGazeStatusType]: + def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[TimeStampedGazeMovementsType, TimeStampedGazeMovementsType, TimeStampedGazeStatusType]: """Identifiy fixations and saccades from timestamped gaze positions.""" assert(type(ts_gaze_positions) == TimeStampedGazePositions) @@ -261,7 +249,9 @@ class GazeMovementIdentifier(): ts_movements = TimeStampedGazeMovements() ts_status = TimeStampedGazeStatus() - for gaze_movement in self(ts_gaze_positions): + for ts, gaze_position in ts_gaze_positions.items(): + + gaze_movement = self.identify(ts, gaze_position) if isinstance(gaze_movement, Fixation): -- cgit v1.1