From 5e7c357a857a9d3ddcd689d8027468bd9fa80e25 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 24 Nov 2022 08:41:17 +0100 Subject: Moving specific gaze analysis algorithm into a dedicated GazeAnalysis submodule. --- .../DispersionBasedGazeMovementIdentifier.py | 186 ++++++++++++++++ src/argaze/GazeAnalysis/README.md | 9 + src/argaze/GazeAnalysis/__init__.py | 5 + src/argaze/GazeFeatures.py | 248 +++++++-------------- src/argaze/__init__.py | 2 +- 5 files changed, 285 insertions(+), 165 deletions(-) create mode 100644 src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py create mode 100644 src/argaze/GazeAnalysis/README.md create mode 100644 src/argaze/GazeAnalysis/__init__.py diff --git a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py new file mode 100644 index 0000000..fd46d7e --- /dev/null +++ b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python + +from dataclasses import dataclass, field +import math + +from argaze import GazeFeatures + +import numpy + +@dataclass(frozen=True) +class Fixation(GazeFeatures.Fixation): + """Define dispersion based fixation.""" + + dispersion: float = field(init=False) + """Dispersion of all gaze positions belonging to the fixation.""" + + euclidian: bool = field(default=True) + """Does the distance is calculated in euclidian way.""" + + centroid: tuple = field(init=False) + """Centroïd of all gaze positions belonging to the fixation.""" + + def __post_init__(self): + + super().__post_init__() + + x_list = [gp[0] for (ts, gp) in list(self.positions.items())] + y_list = [gp[1] for (ts, gp) in list(self.positions.items())] + + cx = numpy.mean(x_list) + cy = numpy.mean(y_list) + + # Select dispersion algorithm + if self.euclidian: + + c = [cx, cy] + points = numpy.column_stack([x_list, y_list]) + + dist = (points - c)**2 + dist = numpy.sum(dist, axis=1) + dist = numpy.sqrt(dist) + + __dispersion = max(dist) + + else: + + __dispersion = (max(x_list) - min(x_list)) + (max(y_list) - min(y_list)) + + # Update frozen dispersion attribute + object.__setattr__(self, 'dispersion', __dispersion) + + # Update frozen centroid attribute + object.__setattr__(self, 'centroid', (cx, cy)) + +@dataclass(frozen=True) +class Saccade(GazeFeatures.Saccade): + """Define dispersion based saccade.""" + + def __post_init__(self): + super().__post_init__() + +@dataclass +class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): + """Implementation of the I-DT algorithm as described in: + + Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and + saccades in eye-tracking protocols. In Proceedings of the 2000 symposium + on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, + 71-78. [DOI=http://dx.doi.org/10.1145/355017.355028](DOI=http://dx.doi.org/10.1145/355017.355028) + """ + + dispersion_threshold: int|float + """Maximal distance allowed to consider several gaze positions as a fixation.""" + + duration_threshold: int|float + """Minimal duration allowed to consider several gaze positions as a fixation.""" + + def __iter__(self) -> GazeFeatures.GazeMovementType: + """GazeMovement identification generator.""" + + self.__last_fixation = None + + # while there are 2 gaze positions at least + while len(self.__ts_gaze_positions) >= 2: + + # copy remaining timestamped gaze positions + remaining_ts_gaze_positions = self.__ts_gaze_positions.copy() + + # select timestamped gaze position until a duration threshold + ts_start, gaze_position_start = remaining_ts_gaze_positions.pop_first() + + # Invalid start position + if not gaze_position_start.valid: + + self.__ts_gaze_positions.pop_first() + continue + + ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() + ts_gaze_positions[ts_start] = gaze_position_start + + # Select next position + ts_next, gaze_position_next = remaining_ts_gaze_positions.first + + while (ts_next - ts_start) < self.duration_threshold: + + # Ignore non valid position + # TODO ? Consider invalid position to not break fixation ? + if gaze_position_next.valid: + + # Store selected position + ts, gaze_position = remaining_ts_gaze_positions.pop_first() + ts_gaze_positions[ts] = gaze_position + + try: + # Read next position + ts_next, gaze_position_next = remaining_ts_gaze_positions.first + + except: + break + + # is it a new fixation ? + new_fixation = Fixation(ts_gaze_positions) + + # dispersion is small : extending fixation + if new_fixation.dispersion <= self.dispersion_threshold: + + # remove selected gaze positions + for gp in ts_gaze_positions: + self.__ts_gaze_positions.pop_first() + + # extend fixation position from a copy + ts_gaze_positions_extension = ts_gaze_positions.copy() + + # are next gaze positions not too dispersed ? + while len(remaining_ts_gaze_positions) > 0: + + # Select next gaze position + ts_next, gaze_position_next = remaining_ts_gaze_positions.first + + # Invalid next position + if not gaze_position_next.valid: + continue + + ts_gaze_positions_extension[ts_next] = gaze_position_next + + # how much gaze is dispersed ? + extended_fixation = Fixation(ts_gaze_positions_extension) + + # dispersion becomes too wide : ignore extended fixation + if extended_fixation.dispersion > self.dispersion_threshold: + break + + # update new fixation + new_fixation = Fixation(ts_gaze_positions_extension.copy()) + + # remove selected gaze position + remaining_ts_gaze_positions.pop_first() + self.__ts_gaze_positions.pop_first() + + # is the new fixation have a duration ? + if new_fixation.duration > 0: + + if self.__last_fixation != None: + + # store start and end positions in a timestamped buffer + ts_saccade_positions = GazeFeatures.TimeStampedGazePositions() + + start_position_ts, start_position = self.__last_fixation.positions.last + ts_saccade_positions[start_position_ts] = start_position + + end_position_ts, end_position = new_fixation.positions.first + ts_saccade_positions[end_position_ts] = end_position + + if end_position_ts > start_position_ts: + + new_saccade = Saccade(ts_saccade_positions) + + yield new_saccade + + self.__last_fixation = new_fixation + + yield new_fixation + + # dispersion too wide : consider next gaze position + else: + self.__ts_gaze_positions.pop_first() diff --git a/src/argaze/GazeAnalysis/README.md b/src/argaze/GazeAnalysis/README.md new file mode 100644 index 0000000..fd778e4 --- /dev/null +++ b/src/argaze/GazeAnalysis/README.md @@ -0,0 +1,9 @@ +Class interface to work with various gaze analysis algorithms. + +# Gaze movements identification algorithms + +*"The act of classifying eye movements into distinct events is, on a general level, driven by a desire to isolate different intervals of the data stream strongly correlated with certain oculomotor or cognitive properties."* Citation from ["One algorithm to rule them all? An evaluation and discussion of ten eye movement event-detection algorithms"](https://link.springer.com/article/10.3758/s13428-016-0738-9) article. + +## Dispersion based gaze movement identifier (I-DT) + +The code is based on the implementation of the I-DT algorithm as described in ["Identifying fixations and saccades in eye-tracking protocols"](http://dx.doi.org/10.1145/355017.355028) article. \ No newline at end of file diff --git a/src/argaze/GazeAnalysis/__init__.py b/src/argaze/GazeAnalysis/__init__.py new file mode 100644 index 0000000..4643641 --- /dev/null +++ b/src/argaze/GazeAnalysis/__init__.py @@ -0,0 +1,5 @@ +""" +.. include:: README.md +""" +__docformat__ = "restructuredtext" +__all__ = ['DispersionBasedGazeMovementIdentifier'] \ No newline at end of file diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 1e7ba9b..de30735 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +from typing import TypeVar, Tuple from dataclasses import dataclass, field import math import json @@ -87,7 +88,10 @@ class TimeStampedGazePositions(DataStructures.TimeStampedBuffer): super().__setitem__(key, value) -@dataclass +GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement") +# Type definition for type annotation convenience + +@dataclass(frozen=True) class GazeMovement(): """Define abstract gaze movement class as a buffer of timestamped positions.""" @@ -102,221 +106,137 @@ class GazeMovement(): start_position_ts, start_position = self.positions.first end_position_ts, end_position = self.positions.last - self.duration = round(end_position_ts - start_position_ts) - -Fixation = GazeMovement -"""Define abstract fixation as gaze movement.""" - -Saccade = GazeMovement -"""Define abstract saccade as gaze movement.""" + # Update frozen duration attribute + object.__setattr__(self, 'duration', end_position_ts - start_position_ts) -class TimeStampedGazeMovements(DataStructures.TimeStampedBuffer): - """Define timestamped buffer to store gaze movements.""" - - def __setitem__(self, key, value: GazeMovement): - """Force value to inherit from GazeMovement.""" + def __str__(self) -> str: + """String display""" - assert(type(value).__bases__[0] == GazeMovement) + output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self.positions)}' - super().__setitem__(key, value) + for ts, position in self.positions.items(): -@dataclass -class GazeStatus(): - """Define gaze status as a position belonging to an identified and indexed gaze movement.""" + output += f'\n\t{ts}:\n\t\tvalue={position.value},\n\t\taccurracy={position.accuracy}' - position: GazePosition - """Gaze position""" + return output - movement_type: str - """GazeMovement type to which gaze position belongs.""" +class Fixation(GazeMovement): + """Define abstract fixation as gaze movement.""" - movement_index: int - """GazeMovement index to which gaze positon belongs.""" + def __post_init__(self): -class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer): - """Define timestamped buffer to store gaze status.""" + super().__post_init__() - def __setitem__(self, key, value: GazeStatus): - super().__setitem__(key, value) +class Saccade(GazeMovement): + """Define abstract saccade as gaze movement.""" -class GazeMovementIdentifier(): - """Abstract class to define what should provide a gaze movement identifier.""" + def __post_init__(self): - def __init__(self, ts_gaze_positions: TimeStampedGazePositions): + super().__post_init__() - if type(ts_gaze_positions) != TimeStampedGazePositions: - raise ValueError('argument must be a TimeStampedGazePositions') +TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeStampedGazeMovements") +# Type definition for type annotation convenience - def __iter__(self): - raise NotImplementedError('__iter__() method not implemented') +class TimeStampedGazeMovements(DataStructures.TimeStampedBuffer): + """Define timestamped buffer to store gaze movements.""" - def __next__(self): - raise NotImplementedError('__next__() method not implemented') + def __setitem__(self, key, value: GazeMovement): + """Force value to inherit from GazeMovement.""" -class DispersionBasedGazeMovementIdentifier(GazeMovementIdentifier): - """Implementation of the I-DT algorithm as described in: - - Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and - saccades in eye-tracking protocols. In Proceedings of the 2000 symposium - on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, - 71-78. DOI=http://dx.doi.org/10.1145/355017.355028 - """ + assert(type(value).__bases__[0] == Fixation or type(value).__bases__[0] == Saccade) - @dataclass - class DispersionBasedFixation(Fixation): - """Define dispersion based fixation as an algorithm specific fixation.""" + super().__setitem__(key, value) - dispersion: float = field(init=False) - euclidian: bool = field(default=True) + def __str__(self): - centroid: tuple = field(init=False) + output = '' + for ts, item in self.items(): - def __post_init__(self): + output += f'\n{item}' - super().__post_init__() + return output - x_list = [gp[0] for (ts, gp) in list(self.positions.items())] - y_list = [gp[1] for (ts, gp) in list(self.positions.items())] +GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus") +# Type definition for type annotation convenience - cx = round(numpy.mean(x_list)) - cy = round(numpy.mean(y_list)) +@dataclass(frozen=True) +class GazeStatus(GazePosition): + """Define gaze status as a gaze position belonging to an identified and indexed gaze movement.""" - # select dispersion algorithm - if self.euclidian: + movement_type: str = field(kw_only=True) + """GazeMovement type to which gaze position belongs.""" - c = [cx, cy] - points = numpy.column_stack([x_list, y_list]) + movement_index: int = field(kw_only=True) + """GazeMovement index to which gaze positon belongs.""" - dist = (points - c)**2 - dist = numpy.sum(dist, axis=1) - dist = numpy.sqrt(dist) + @classmethod + def from_position(cls, gaze_position: GazePosition, movement_type: str, movement_index: int) -> GazeStatusType: + """Initialize from a gaze position instance.""" - self.dispersion = round(max(dist)) + return cls(gaze_position.value, accuracy=gaze_position.accuracy, movement_type=movement_type, movement_index=movement_index) - else: +TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus") +# Type definition for type annotation convenience - self.dispersion = (max(x_list) - min(x_list)) + (max(y_list) - min(y_list)) +class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer): + """Define timestamped buffer to store gaze status.""" - self.centroid = (cx, cy) + def __setitem__(self, key, value: GazeStatus): + super().__setitem__(key, value) - @dataclass - class DispersionBasedSaccade(Saccade): - """Define dispersion based saccade as an algorithm specific saccade.""" +class GazeMovementIdentifier(): + """Abstract class to define what should provide a gaze movement identifier.""" - def __post_init__(self): - super().__post_init__() + def __iter__(self) -> GazeMovementType: + raise NotImplementedError('__iter__() method not implemented') - def __init__(self, ts_gaze_positions, dispersion_threshold: float, duration_threshold: float): + def __next__(self): + raise NotImplementedError('__next__() method not implemented') - super().__init__(ts_gaze_positions) + def __call__(self, ts_gaze_positions: TimeStampedGazePositions): - self.__dispersion_threshold = dispersion_threshold - self.__duration_threshold = duration_threshold + assert(type(ts_gaze_positions) == TimeStampedGazePositions) # process identification on a copy self.__ts_gaze_positions = ts_gaze_positions.copy() - self.__last_fixation = None + return self - def __iter__(self): - """GazeMovement identification generator.""" - - # while there are 2 gaze positions at least - while len(self.__ts_gaze_positions) >= 2: - - # copy remaining timestamped gaze positions - remaining_ts_gaze_positions = self.__ts_gaze_positions.copy() + def identify(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[TimeStampedGazeMovementsType, TimeStampedGazeMovementsType, TimeStampedGazeStatusType]: + """Identifiy fixations and saccades from timestamped gaze positions.""" - # select timestamped gaze position until a duration threshold - (ts_start, gaze_position_start) = remaining_ts_gaze_positions.pop_first() + assert(type(ts_gaze_positions) == TimeStampedGazePositions) - # Invalid start position - if not gaze_position_start.valid: - - self.__ts_gaze_positions.pop_first() - continue + ts_fixations = TimeStampedGazeMovements() + ts_saccades = TimeStampedGazeMovements() + ts_status = TimeStampedGazeStatus() - ts_gaze_positions = TimeStampedGazePositions() - ts_gaze_positions[ts_start] = gaze_position_start + for gaze_movement in self(ts_gaze_positions): - (ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first() + if isinstance(gaze_movement, Fixation): - while (ts_current - ts_start) < self.__duration_threshold: + start_ts, start_position = gaze_movement.positions.first - # Ignore non valid position - # TODO ? Consider invalid position to not break fixation ? - if gaze_position_current.valid: - - ts_gaze_positions[ts_current] = gaze_position_current - - try: - (ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first() - - except: - break + ts_fixations[start_ts] = gaze_movement - # is it a new fixation ? - new_fixation = DispersionBasedGazeMovementIdentifier.DispersionBasedFixation(ts_gaze_positions) + for ts, position in gaze_movement.positions.items(): - # dispersion is small - if new_fixation.dispersion <= self.__dispersion_threshold: + ts_status[ts] = GazeStatus.from_position(position, 'Fixation', len(ts_fixations)) - # remove selected gaze positions - for gp in ts_gaze_positions: - self.__ts_gaze_positions.pop_first() + elif isinstance(gaze_movement, Saccade): - # are next gaze positions not too dispersed ? - while len(remaining_ts_gaze_positions) > 0: + start_ts, start_position = gaze_movement.positions.first + end_ts, end_position = gaze_movement.positions.last + + ts_saccades[start_ts] = gaze_movement - # select next gaze position - ts_next, position_next = remaining_ts_gaze_positions.pop_first() + ts_status[start_ts] = GazeStatus.from_position(start_position, 'Saccade', len(ts_saccades)) + ts_status[end_ts] = GazeStatus.from_position(end_position, 'Saccade', len(ts_saccades)) - # Invalid next position - if not position_next.valid: - continue - - ts_gaze_positions[ts_next] = position_next - - # how much gaze is dispersed ? - updated_fixation = DispersionBasedGazeMovementIdentifier.DispersionBasedFixation(ts_gaze_positions) - - # dispersion is becomes too wide : ignore updated fixation - if updated_fixation.dispersion > self.__dispersion_threshold: - break - - # update new fixation - new_fixation = updated_fixation - - # remove selected gaze position - self.__ts_gaze_positions.pop_first() - - # is the new fixation have a duration ? - if new_fixation.duration > 0: - - if self.__last_fixation != None: - - # store start and end positions in a timestamped buffer - ts_saccade_positions = TimeStampedGazePositions() - - start_position_ts, start_position = self.__last_fixation.positions.pop_last() - ts_saccade_positions[start_position_ts] = start_position - - end_position_ts, end_position = new_fixation.positions.pop_first() - ts_saccade_positions[end_position_ts] = end_position - - if end_position_ts > start_position_ts: - - new_saccade = DispersionBasedGazeMovementIdentifier.DispersionBasedSaccade(ts_saccade_positions) - - yield new_saccade - - self.__last_fixation = new_fixation - - yield new_fixation - - # dispersion too wide : consider next gaze position else: - self.__ts_gaze_positions.pop_first() + continue + + return ts_fixations, ts_saccades, ts_status @dataclass class VisualScanStep(): diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py index 57ccd2c..4945206 100644 --- a/src/argaze/__init__.py +++ b/src/argaze/__init__.py @@ -2,4 +2,4 @@ .. include:: ../../README.md """ __docformat__ = "restructuredtext" -__all__ = ['utils','ArUcoMarkers','AreaOfInterest','GazeFeatures','DataStructures','TobiiGlassesPro2'] \ No newline at end of file +__all__ = ['utils','ArUcoMarkers','AreaOfInterest','GazeFeatures','DataStructures','GazeAnalysis','TobiiGlassesPro2'] \ No newline at end of file -- cgit v1.1