diff options
-rw-r--r-- | src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py | 143 |
1 files changed, 90 insertions, 53 deletions
diff --git a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py index fc565c6..b60291c 100644 --- a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py +++ b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py @@ -73,6 +73,13 @@ class Saccade(GazeFeatures.Saccade): def __post_init__(self): super().__post_init__() +@dataclass(frozen=True) +class UnknownGazeMovement(GazeFeatures.UnknownGazeMovement): + """Define dispersion based unknown movement.""" + + def __post_init__(self): + super().__post_init__() + @dataclass class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): """Implementation of the I-DT algorithm as described in: @@ -93,124 +100,146 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): """GazeMovement identification generator.""" self.__last_fixation = None + moving_gaze_positions = GazeFeatures.TimeStampedGazePositions() - # while there are 2 gaze positions at least + # While there are 2 gaze positions at least while len(self.__ts_gaze_positions) >= 2: - # copy remaining timestamped gaze positions - remaining_ts_gaze_positions = self.__ts_gaze_positions.copy() + # Copy remaining gaze positions + remaining_gaze_positions = self.__ts_gaze_positions.copy() - # select timestamped gaze position until a duration threshold - ts_start, gaze_position_start = remaining_ts_gaze_positions.pop_first() + # Select gaze position until a duration threshold + ts_start, gaze_position_start = remaining_gaze_positions.pop_first() - # Ignore non valid start position - if not gaze_position_start.valid: + # Ignore and remove non valid start positions + while not gaze_position_start.valid: + ts_start, gaze_position_start = remaining_gaze_positions.pop_first() self.__ts_gaze_positions.pop_first() - continue - ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() - ts_gaze_positions[ts_start] = gaze_position_start + valid_gaze_positions = GazeFeatures.TimeStampedGazePositions() + valid_gaze_positions[ts_start] = gaze_position_start + + unvalid_gaze_positions = GazeFeatures.TimeStampedGazePositions() # Select next position - ts_next, gaze_position_next = remaining_ts_gaze_positions.first + ts_next, gaze_position_next = remaining_gaze_positions.first while (ts_next - ts_start) < self.duration_threshold: - # Ignore non valid position - # Should we consider invalid position to not break fixation ? + # Store valid position if gaze_position_next.valid: - # Store selected position - ts, gaze_position = remaining_ts_gaze_positions.pop_first() - ts_gaze_positions[ts] = gaze_position + ts, valid_gaze_position = remaining_gaze_positions.pop_first() + valid_gaze_positions[ts] = valid_gaze_position + # Store non valid position else: - remaining_ts_gaze_positions.pop_first() + ts, unvalid_gaze_position = remaining_gaze_positions.pop_first() + unvalid_gaze_positions[ts] = unvalid_gaze_position try: # Read next position - ts_next, gaze_position_next = remaining_ts_gaze_positions.first + ts_next, gaze_position_next = remaining_gaze_positions.first except: break - # is it a new fixation ? - new_fixation = Fixation(ts_gaze_positions) + # Consider the last valid gaze positions as a new fixation + new_fixation = Fixation(valid_gaze_positions) - # dispersion is small : extending fixation + # Dispersion is small : extending fixation if new_fixation.dispersion <= self.dispersion_threshold: - # remove selected gaze positions - for gp in ts_gaze_positions: + # Remove valid gaze positions + for ts, gp in valid_gaze_positions.items(): + self.__ts_gaze_positions.pop(ts) - self.__ts_gaze_positions.pop_first() + # Remove unvalid gaze positions + for ts, gp in unvalid_gaze_positions.items(): + self.__ts_gaze_positions.pop(ts) - # extend fixation position from a copy - ts_gaze_positions_extension = ts_gaze_positions.copy() + # Copy new fixation positions before to try to extend them + extended_gaze_positions = new_fixation.positions.copy() - # are next gaze positions not too dispersed ? - while len(remaining_ts_gaze_positions) > 0: + # Are next gaze positions not too dispersed ? + while len(remaining_gaze_positions) > 0: # Select next gaze position - ts_next, gaze_position_next = remaining_ts_gaze_positions.first + ts_next, gaze_position_next = remaining_gaze_positions.first - # Ignore non valid position - # Should we consider invalid position to not break fixation ? + # Ignore and remove non valid next positions if not gaze_position_next.valid: - remaining_ts_gaze_positions.pop_first() + ts_next, gaze_position_next = remaining_gaze_positions.pop_first() self.__ts_gaze_positions.pop_first() continue - ts_gaze_positions_extension[ts_next] = gaze_position_next + extended_gaze_positions[ts_next] = gaze_position_next # how much gaze is dispersed ? - extended_fixation = Fixation(ts_gaze_positions_extension) + extended_fixation = Fixation(extended_gaze_positions) # dispersion becomes too wide : ignore extended fixation if extended_fixation.dispersion > self.dispersion_threshold: break # update new fixation - new_fixation = Fixation(ts_gaze_positions_extension.copy()) + new_fixation = Fixation(extended_gaze_positions.copy()) # remove selected gaze position - remaining_ts_gaze_positions.pop_first() + remaining_gaze_positions.pop_first() self.__ts_gaze_positions.pop_first() - # is the new fixation have a duration ? + # Is the new fixation have a duration ? if new_fixation.duration > 0: - # does a former fixation have been identified ? + # Does a former fixation have been identified ? if self.__last_fixation != None: - # merge new fixation if it overlaps last fixation + # Merge new fixation if it overlaps last fixation if self.__last_fixation.overlap(new_fixation): self.__last_fixation.merge(new_fixation) new_fixation = None - # else output last fixation and create a saccade + # Else output last fixation and consider moving gaze positions buffer else: yield self.__last_fixation - - # store start and end positions in a timestamped buffer - ts_saccade_positions = GazeFeatures.TimeStampedGazePositions() - start_position_ts, start_position = self.__last_fixation.positions.last - ts_saccade_positions[start_position_ts] = start_position + # Is there a new movement ? + if len(moving_gaze_positions) > 0: - end_position_ts, end_position = new_fixation.positions.first - ts_saccade_positions[end_position_ts] = end_position + # Check first and last gaze position of the moving positions buffer + start_position_ts, start_position = moving_gaze_positions.first + end_position_ts, end_position = moving_gaze_positions.last - if end_position_ts > start_position_ts: + if not moving_gaze_positions.first[0] > self.__last_fixation.positions.last[0]: + print('first moving_gaze_positions not after last self.__last_fixation.positions') + print(moving_gaze_positions.first[0], self.__last_fixation.positions.last[0]) - new_saccade = Saccade(ts_saccade_positions) + if not moving_gaze_positions.last[0] < new_fixation.positions.first[0]: + print('last moving_gaze_positions not before first new_fixation.positions') + print(moving_gaze_positions.last[0], new_fixation.positions.first[0]) - yield new_saccade + # Saccade shouldn't be longer than fixation + # TODO : add a saccade duration threshold attribute ? + if end_position_ts - start_position_ts < self.duration_threshold: + + yield Saccade(moving_gaze_positions) + + # This movement is unknown + else: + + # This unknown movement should be dispersed + assert(Fixation(moving_gaze_positions).dispersion > self.dispersion_threshold) + + yield UnknownGazeMovement(moving_gaze_positions) + + # Forget former moving gaze positions + moving_gaze_positions = GazeFeatures.TimeStampedGazePositions() self.__last_fixation = new_fixation @@ -219,10 +248,18 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__last_fixation = new_fixation yield self.__last_fixation - # dispersion too wide : consider next gaze position + # Forget former moving gaze positions + moving_gaze_positions = GazeFeatures.TimeStampedGazePositions() + + # Dispersion too wide : + # - This gaze position is part of a movement but not a fixation + # - Consider next gaze position else: - self.__ts_gaze_positions.pop_first() - # output last fixation + ts_moving, moving_gaze_position = self.__ts_gaze_positions.pop_first() + moving_gaze_positions[ts_moving] = moving_gaze_position + + # Output last fixation if self.__last_fixation != None: + yield self.__last_fixation |