From cb090f1202e21e2851a494c87c5a48d0dea3133f Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 28 Apr 2022 09:13:40 +0200 Subject: Making MovementIdentifier as a generator of fixations and saccades. --- src/argaze/GazeFeatures.py | 97 +++++++++++++--------- src/argaze/utils/export_tobii_segment_fixations.py | 93 --------------------- 2 files changed, 60 insertions(+), 130 deletions(-) delete mode 100644 src/argaze/utils/export_tobii_segment_fixations.py diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 24b4b0b..63a1e55 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -8,8 +8,6 @@ from argaze.AreaOfInterest import AOIFeatures import numpy -FIXATION_MAX_DURATION = 1000 - @dataclass class GazePosition(): """Define gaze position.""" @@ -24,32 +22,43 @@ class TimeStampedGazePositions(DataStructures.TimeStampedBuffer): """Define timestamped buffer to store gaze positions.""" def __setitem__(self, key, value: GazePosition): - """Force value to be a GazePosition""" - if type(value) != GazePosition: - raise ValueError('value must be a GazePosition') - super().__setitem__(key, value) @dataclass class Fixation(): - """Define fixation""" + """Define gaze fixation.""" duration: float dispersion: float - centroid: tuple((float, float)) + centroid: GazePosition + positions: TimeStampedGazePositions class TimeStampedFixations(DataStructures.TimeStampedBuffer): """Define timestamped buffer to store fixations.""" def __setitem__(self, key, value: Fixation): - """Force value to be a Fixation""" - if type(value) != Fixation: - raise ValueError('value must be a Fixation') + super().__setitem__(key, value) + +@dataclass +class Saccade(): + """Define gaze saccade.""" + + duration: float + start_position: GazePosition + end_position: GazePosition + +class TimeStampedSaccades(DataStructures.TimeStampedBuffer): + """Define timestamped buffer to store saccades.""" + + def __setitem__(self, key, value: Saccade): + """Force value to be a Saccade""" + if not isinstance(value, Saccade): + raise ValueError('value must be a Saccade') super().__setitem__(key, value) -class FixationIdentifier(): - """Abstract class to define what should provide a fixation identifier.""" +class MovementIdentifier(): + """Abstract class to define what should provide a movement identifier.""" def __init__(self, ts_gaze_positions: TimeStampedGazePositions): @@ -65,17 +74,22 @@ class FixationIdentifier(): def identify(self): fixations = GazeFeatures.TimeStampedFixations() + saccades = GazeFeatures.TimeStampedSaccades() for ts, item in self: - if item == None: - continue + if isinstance(item, GazeFeatures.Fixation): + fixations[ts] = item - fixations[ts] = item + elif isinstance(item, GazeFeatures.Saccade): + saccades[ts] = item - return fixations + else: + continue + + return fixations, saccades -class DispersionBasedFixationIdentifier(FixationIdentifier): +class DispersionBasedMovementIdentifier(MovementIdentifier): """Implementation of the I-DT algorithm as described in: Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and @@ -94,6 +108,9 @@ class DispersionBasedFixationIdentifier(FixationIdentifier): # process identification on a copy self.__ts_gaze_positions = ts_gaze_positions.copy() + self.__last_fixation = None + self.__last_fixation_ts = -1 + def __getEuclideanDispersion(self, ts_gaze_positions_list): """Euclidian dispersion algorithm""" @@ -122,13 +139,10 @@ class DispersionBasedFixationIdentifier(FixationIdentifier): return (max(x_list) - min(x_list)) + (max(y_list) - min(y_list)) def __iter__(self): - """Start fixation identification""" - return self - - def __next__(self): + """Movement identification generator.""" # while there are 2 gaze positions at least - if len(self.__ts_gaze_positions) >= 2: + while len(self.__ts_gaze_positions) >= 2: # copy remaining timestamped gaze positions remaining_ts_gaze_positions = self.__ts_gaze_positions.copy() @@ -185,27 +199,36 @@ class DispersionBasedFixationIdentifier(FixationIdentifier): ts_list = [ts for (ts, gp) in ts_gaze_positions_list] duration = ts_list[-1] - ts_list[0] - if duration > FIXATION_MAX_DURATION: - duration = FIXATION_MAX_DURATION - if duration > 0: - # return timestamp and fixation - return ts_list[0], Fixation(duration, dispersion, (cx, cy)) + # store all positions in a timestamped buffer + ts_gaze_positions = TimeStampedGazePositions() - return -1, None + for (ts, gp) in ts_gaze_positions_list: + ts_gaze_positions[ts] = gp - # dispersion too wide : consider next gaze position - else: - self.__ts_gaze_positions.pop_first() + new_fixation = Fixation(duration, dispersion, GazePosition(cx, cy), ts_gaze_positions) + new_fixation_ts = ts_list[0] + + if self.__last_fixation != None: - # if no fixation found, go to next - return -1, None + new_saccade_ts = self.__last_fixation_ts + self.__last_fixation.duration + new_saccade_duration = new_fixation_ts - new_saccade_ts - else: - raise StopIteration + if new_saccade_duration > 0: - return -1, None + new_saccade = Saccade(new_saccade_duration, self.__last_fixation.positions.pop_last()[1], new_fixation.positions.pop_first()[1]) + + yield new_saccade_ts, new_saccade + + self.__last_fixation = new_fixation + self.__last_fixation_ts = new_fixation_ts + + yield new_fixation_ts, new_fixation + + # dispersion too wide : consider next gaze position + else: + self.__ts_gaze_positions.pop_first() @dataclass class VisualScanStep(): diff --git a/src/argaze/utils/export_tobii_segment_fixations.py b/src/argaze/utils/export_tobii_segment_fixations.py deleted file mode 100644 index b257a88..0000000 --- a/src/argaze/utils/export_tobii_segment_fixations.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python - -import argparse -import bisect -import os - -from argaze import GazeFeatures -from argaze.TobiiGlassesPro2 import TobiiEntities -from argaze.utils import MiscFeatures - -def main(): - """ - Analyse Tobii segment fixations - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='path to a tobii segment folder') - parser.add_argument('-r', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - parser.add_argument('-d', '--dispersion_threshold', metavar='DISPERSION_THRESHOLD', type=int, default=10, help='dispersion threshold in pixel') - parser.add_argument('-t', '--duration_threshold', metavar='DURATION_THRESHOLD', type=int, default=100, help='duration threshold in millisecond') - parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)') - args = parser.parse_args() - - if args.segment_path != None: - - # Manage destination path - if args.output != None: - - if not os.path.exists(os.path.dirname(args.output)): - - os.makedirs(os.path.dirname(args.output)) - print(f'{os.path.dirname(args.output)} folder created') - - fixations_filepath = f'{args.output}/fixations.csv' - - else: - - fixations_filepath = f'{args.segment_path}/fixations.csv' - - # Load a tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1000000), int(args.time_range[1] * 1000000) if args.time_range[1] != None else None) - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'Video duration: {tobii_segment_video.get_duration()/1000000}, width: {tobii_segment_video.get_width()}, height: {tobii_segment_video.get_height()}') - - # Load a tobii segment data - tobii_segment_data = tobii_segment.load_data() - print(f'Data keys: {tobii_segment_data.keys()}') - - # Access to timestamped gaze position data buffer - tobii_ts_gaze_positions = tobii_segment_data.gidx_l_gp - print(f'{len(tobii_ts_gaze_positions)} gaze positions loaded') - - # Format tobii gaze data into generic gaze data and store them using millisecond unit timestamp - generic_ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - for ts, tobii_data in tobii_ts_gaze_positions.items(): - generic_data = GazeFeatures.GazePosition(tobii_data.gp[0] * tobii_segment_video.get_width(), tobii_data.gp[1] * tobii_segment_video.get_height()) - generic_ts_gaze_positions[ts/1000] = generic_data - - print(f'Dispersion threshold: {args.dispersion_threshold}') - print(f'Duration threshold: {args.duration_threshold}') - - # Start fixation identification - fixation_analyser = GazeFeatures.DispersionBasedFixationIdentifier(generic_ts_gaze_positions, args.dispersion_threshold, args.duration_threshold) - fixations = GazeFeatures.TimeStampedFixations() - - # Initialise progress bar - MiscFeatures.printProgressBar(0, int(tobii_segment_video.get_duration()/1000), prefix = 'Progress:', suffix = 'Complete', length = 100) - - for ts, item in fixation_analyser: - - if item == None: - continue - - fixations[ts] = item - - # Update Progress Bar - progress = ts - int(args.time_range[0] * 1000) - MiscFeatures.printProgressBar(progress, int(tobii_segment_video.get_duration()/1000), prefix = 'Progress:', suffix = 'Complete', length = 100) - - print(f'\n{len(fixations)} fixations found') - - # Export fixations analysis results - fixations.export_as_csv(fixations_filepath) - - print(f'Fixations saved into {fixations_filepath}') - -if __name__ == '__main__': - - main() \ No newline at end of file -- cgit v1.1