aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py279
-rw-r--r--src/argaze/GazeFeatures.py22
2 files changed, 8 insertions, 293 deletions
diff --git a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
index 6b5ec09..07136f8 100644
--- a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
+++ b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
@@ -93,281 +93,6 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
"""Minimal duration allowed to consider a gaze movement as a fixation.
It is also used as maximal duration allowed to consider a gaze movement as a saccade."""
- def __iter__(self) -> GazeFeatures.GazeMovementType:
- """GazeMovement identification generator."""
-
- self.__last_fixation = None
- unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # While there are 2 gaze positions at least
- while len(self.__ts_gaze_positions) >= 2:
-
- # Remove all unvalid gaze positions until to find a valid one
- ts_current, gaze_position_current = self.__ts_gaze_positions.pop_first()
-
- while not gaze_position_current.valid and len(self.__ts_gaze_positions) > 0:
- ts_current, gaze_position_current = self.__ts_gaze_positions.pop_first()
-
- # Prepare to select current and next valid gaze positions until a duration threshold
- valid_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # Output last fixation after too much unvalid positions
- if self.__last_fixation != None:
-
- ts_last, gaze_position_last = self.__last_fixation.positions.last
-
- if (ts_current - ts_last) > self.duration_min_threshold:
-
- # Get last gaze position of the last fixation as it is it's out position
- last_new_ts, last_new_position = self.__last_fixation.positions.pop_last()
-
- # Update last fixation
- self.__last_fixation.update()
-
- # Append this last out position to the valid gaze positions selection
- valid_gaze_positions[last_new_ts] = last_new_position
-
- yield self.__last_fixation
- self.__last_fixation = None
-
- # Append current gaze position to valid gaze positions selection
- valid_gaze_positions[ts_current] = gaze_position_current
-
- # Store unvalid gaze positions to count them
- unvalid_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # Keep detect of last valid timestamp
- ts_last_valid = ts_current
-
- for ts_next, gaze_position_next in self.__ts_gaze_positions.items():
-
- if (ts_next - ts_current) < self.duration_min_threshold:
-
- # Store valid position
- if gaze_position_next.valid:
-
- valid_gaze_positions[ts_next] = gaze_position_next
-
- # Keep detect of last valid timestamp
- ts_last_valid = ts_next
-
- # Store non valid position
- else:
-
- unvalid_gaze_positions[ts_next] = gaze_position_next
-
- else:
-
- break
-
- # If there is at least 3 valid gaze positions selected:
- # - 1 entering position
- # - 1 staying position
- # - 1 outing position (which may become a staying position after extension)
- if len(valid_gaze_positions) >= 3:
-
- # Consider selected valid gaze positions as part of a maybe new fixation
- new_fixation = Fixation(valid_gaze_positions)
-
- # Dispersion small enough: it is a fixation ! Try to extend it
- if new_fixation.deviation_max <= self.deviation_max_threshold:
-
- # Remove valid and unvalid gaze positions as there as now stored in new fixation
- # -1 as current gaze position have already been poped
- for _ in range(len(valid_gaze_positions) + len(unvalid_gaze_positions) - 1):
- self.__ts_gaze_positions.pop_first()
-
- # Copy new fixation positions before to try to extend them
- extended_gaze_positions = new_fixation.positions.copy()
- extended_fixation = new_fixation
-
- # Are next gaze positions not too dispersed ?
- while len(self.__ts_gaze_positions) > 0:
-
- # Select and remove next gaze position
- ts_next, gaze_position_next = self.__ts_gaze_positions.pop_first()
-
- # Consider only valid next position
- if gaze_position_next.valid:
-
- # Get deviation of the nex gaze position from the new extended fixation
- deviation_from_extended = extended_fixation.point_deviation(gaze_position_next)
-
- # Extend fixation anyway even with last out position: it will be popped later.
- extended_gaze_positions[ts_next] = gaze_position_next
- extended_fixation = Fixation(extended_gaze_positions)
-
- # Stop fixation extension
- if deviation_from_extended > self.deviation_max_threshold:
-
- break
-
- # Check that consecutive unvalid gaze positions do not exceed fixation duration threshold
- elif ts_next - ts_last_valid >= self.duration_min_threshold:
-
- break
-
- # Update new fixation
- new_fixation = extended_fixation
-
- # Does a former fixation have been identified ?
- if self.__last_fixation != None:
-
- # Remove last gaze position of the new fixation as it is it's out position
- last_new_ts, last_new_position = new_fixation.positions.pop_last()
-
- # Edit inter fixations movement gaze positions
- movement_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # If such unmatched positions exist
- if len(unmatched_gaze_positions) > 0:
-
- # Inter fixations movement should:
- # - starts at last position of last fixation (this position is out so it have to be popped)
- # - stops at the first position inside new fixation
- start_movement_ts, start_position = self.__last_fixation.positions.pop_last()
- stop_movement_ts, stop_position = new_fixation.positions.pop_first()
-
- # Edit first movement gaze position
- movement_gaze_positions[start_movement_ts] = start_position
-
- # Edit movement positions with unmatched positions if there are between the 2 fixations
- start_unmatched_ts, _ = unmatched_gaze_positions.first
- end_unmatched_ts, _ = unmatched_gaze_positions.last
-
- # Does unmatched gaze positions happened between the last and the new fixation ?
- if start_unmatched_ts > start_movement_ts and end_unmatched_ts < stop_movement_ts:
-
- # Append unmatched gaze positions to saccade
- movement_gaze_positions.append(unmatched_gaze_positions)
-
- # Unmatched gaze positions happened before last fixation
- else:
-
- # Ignore them: GazeMovementIdentifier have to output movements according their time apparition
- pass
-
- # Edit last movement gaze position
- movement_gaze_positions[stop_movement_ts] = stop_position
-
- # When there is no unmatched positions between 2 fixations (*rare case)
- else:
-
- # the last fixation position is the same than the first position of the new fixation
- stop_movement_ts, stop_position = self.__last_fixation.positions.pop_last()
- start_movement_ts, start_position = self.__last_fixation.positions.pop_last()
-
- # Edit first movement gaze position
- movement_gaze_positions[start_movement_ts] = start_position
-
- # Edit last movement gaze position
- movement_gaze_positions[stop_movement_ts] = stop_position
-
- # Update last and new fixations
- self.__last_fixation.update()
- new_fixation.update()
-
- # Does new fixation overlap last fixation?
- if self.__last_fixation.overlap(new_fixation):
-
- # (*rare case)
- if len(unmatched_gaze_positions) == 0:
- self.__last_fixation.positions[start_movement_ts] = start_position
- self.__last_fixation.positions[stop_movement_ts] = stop_position
-
- # Merge new fixation into last fixation
- self.__last_fixation.merge(new_fixation)
-
- # QUESTION: What to do if the time between the two fixations it very long ?
- # It would be dangerous to set a timeout value as a fixation duration has no limit.
-
- # Forget new fixation
- new_fixation = None
-
- # NOTE: Ignore inter fixations gaze positions: there was probably noisy positions.
-
- # Otherwise,
- else:
-
- # Output last fixation
- yield self.__last_fixation
-
- # New fixation becomes the last fixation to allow further merging
- self.__last_fixation = new_fixation
-
- # Short time between fixations : this movement is a saccade
- if stop_movement_ts - start_movement_ts <= self.duration_min_threshold:
-
- # Output saccade
- yield Saccade(movement_gaze_positions)
-
- # Too much time between fixations: this movement is unknown
- else:
-
- # Output unknown movement
- yield GazeFeatures.GazeMovement(movement_gaze_positions)
-
- # Append out position to last fixation: it will be popped as start_position the next time
- self.__last_fixation.positions[last_new_ts] = last_new_position
-
- # In any case, forget former unmatched gaze positions
- unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # First fixation is stored to allow further merging
- # The movement before is outputed
- else:
-
- self.__last_fixation = new_fixation
-
- # Is there unmatched gaze positions?
- if len(unmatched_gaze_positions) > 0:
-
- start_movement_ts, _ = unmatched_gaze_positions.first
- stop_movement_ts, _ = unmatched_gaze_positions.last
-
- # Short time between fixations : this movement is a saccade
- if stop_movement_ts - start_movement_ts <= self.duration_min_threshold:
-
- # Output saccade
- yield Saccade(unmatched_gaze_positions)
-
- # Too much time between fixations: this movement is unknown
- else:
-
- # Output unknown movement
- yield GazeFeatures.GazeMovement(unmatched_gaze_positions)
-
- # In any case, forget former unmatched gaze positions
- unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions()
-
- # Dispersion too wide:
- # Current gaze position is not part of a fixation
- else:
-
- unmatched_gaze_positions[ts_current] = gaze_position_current
-
- # Only one valid gaze position selected:
- # Current gaze position is not part of a fixation
- else:
-
- unmatched_gaze_positions[ts_current] = gaze_position_current
-
- # Output last fixation
- if self.__last_fixation != None:
- yield self.__last_fixation
-
-
-""""FOR QUICK DEMO PURPOSE"""
-@dataclass
-class TobiiDataStream_GazeMovementIdentifier():
-
- deviation_max_threshold: int|float
- """Maximal distance allowed to consider a gaze movement as a fixation."""
-
- duration_min_threshold: int|float
- """Minimal duration allowed to consider a gaze movement as a fixation.
- It is also used as maximal duration allowed to consider a gaze movement as a saccade."""
-
def __post_init__(self):
self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
@@ -377,8 +102,8 @@ class TobiiDataStream_GazeMovementIdentifier():
def identify(self, ts, gaze_position):
# Ignore non valid gaze position
- #if not gaze_position.valid:
- # return
+ if not gaze_position.valid:
+ return
# Check if too much time elapsed since last gaze position
if len(self.__valid_positions) > 0:
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index 2a0096c..40ce77b 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -236,22 +236,10 @@ class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer):
class GazeMovementIdentifier():
"""Abstract class to define what should provide a gaze movement identifier."""
- def __iter__(self) -> GazeMovementType:
- raise NotImplementedError('__iter__() method not implemented')
-
- def __next__(self):
- raise NotImplementedError('__next__() method not implemented')
-
- def __call__(self, ts_gaze_positions: TimeStampedGazePositions):
-
- assert(type(ts_gaze_positions) == TimeStampedGazePositions)
+ def identify(self, ts, gaze_position) -> GazeMovementType:
+ raise NotImplementedError('identify_position() method not implemented')
- # process identification on a copy
- self.__ts_gaze_positions = ts_gaze_positions.copy()
-
- return self
-
- def identify(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[TimeStampedGazeMovementsType, TimeStampedGazeMovementsType, TimeStampedGazeStatusType]:
+ def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[TimeStampedGazeMovementsType, TimeStampedGazeMovementsType, TimeStampedGazeStatusType]:
"""Identifiy fixations and saccades from timestamped gaze positions."""
assert(type(ts_gaze_positions) == TimeStampedGazePositions)
@@ -261,7 +249,9 @@ class GazeMovementIdentifier():
ts_movements = TimeStampedGazeMovements()
ts_status = TimeStampedGazeStatus()
- for gaze_movement in self(ts_gaze_positions):
+ for ts, gaze_position in ts_gaze_positions.items():
+
+ gaze_movement = self.identify(ts, gaze_position)
if isinstance(gaze_movement, Fixation):