From 2718252c06e7e23c1500e568769dea70e409a04f Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Tue, 9 Aug 2022 17:49:37 +0200 Subject: rewriting movement identification classes. --- src/argaze/DataStructures.py | 8 + src/argaze/GazeFeatures.py | 237 ++++++++------------- src/argaze/utils/export_tobii_segment_movements.py | 46 ++-- 3 files changed, 128 insertions(+), 163 deletions(-) diff --git a/src/argaze/DataStructures.py b/src/argaze/DataStructures.py index 637a007..15811c2 100644 --- a/src/argaze/DataStructures.py +++ b/src/argaze/DataStructures.py @@ -38,6 +38,10 @@ class TimeStampedBuffer(collections.OrderedDict): for ts, value in timestamped_buffer.items(): self[ts] = value + def get_first(self): + + return list(self.items())[0] + def pop_first(self): """Easing FIFO access mode""" return self.popitem(last=False) @@ -59,6 +63,10 @@ class TimeStampedBuffer(collections.OrderedDict): return popep_ts, poped_value + def get_last(self): + + return list(self.items())[-1] + def pop_last(self): """Easing FIFO access mode""" return self.popitem(last=True) diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 8d3d93f..fc07ca2 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from dataclasses import dataclass +from dataclasses import dataclass, field import math from argaze import DataStructures @@ -19,63 +19,43 @@ class TimeStampedGazePositions(DataStructures.TimeStampedBuffer): super().__setitem__(key, value) @dataclass -class Fixation(): - """Define gaze fixation.""" +class Movement(): + """Define abstract movement class.""" - index: int - duration: float - dispersion: float - centroid: GazePosition positions: TimeStampedGazePositions + duration: float = field(init=False) -class TimeStampedFixations(DataStructures.TimeStampedBuffer): - """Define timestamped buffer to store fixations.""" - - def __setitem__(self, key, value: Fixation): - """Force value to be a Fixation""" - - if not isinstance(value, Fixation): - raise ValueError('value must be a Fixation') + def __post_init__(self): - super().__setitem__(key, value) - -@dataclass -class Saccade(): - """Define gaze saccade.""" + start_position_ts, start_position = self.positions.get_first() + end_position_ts, end_position = self.positions.get_last() - index: int - duration: float - start_position: GazePosition - end_position: GazePosition + self.duration = round(end_position_ts - start_position_ts) -class TimeStampedSaccades(DataStructures.TimeStampedBuffer): - """Define timestamped buffer to store saccades.""" +Fixation = Movement +"""Define abstract fixation as movement.""" - def __setitem__(self, key, value: Saccade): - """Force value to be a Saccade""" +Saccade = Movement +"""Define abstract saccade as movement.""" - if not isinstance(value, Saccade): - raise ValueError('value must be a Saccade') +class TimeStampedMovements(DataStructures.TimeStampedBuffer): + """Define timestamped buffer to store movements.""" + def __setitem__(self, key, value: Movement): super().__setitem__(key, value) @dataclass -class Movement(): - """Define movement.""" +class GazeStatus(): + """Define gaze status as a position belonging to an identified and indexed movement.""" - index: int - type: str position: GazePosition + movement_type: str + movement_index: int -class TimeStampedMovements(DataStructures.TimeStampedBuffer): - """Define timestamped buffer to store movement.""" - - def __setitem__(self, key, value: Movement): - """Force value to be a Movement""" - - if not isinstance(value, Movement): - raise ValueError('value must be a Movement') +class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer): + """Define timestamped buffer to store gaze status.""" + def __setitem__(self, key, value: GazeStatus): super().__setitem__(key, value) class MovementIdentifier(): @@ -92,42 +72,58 @@ class MovementIdentifier(): def __next__(self): raise NotImplementedError('__next__() method not implemented') - def identify(self): +class DispersionBasedMovementIdentifier(MovementIdentifier): + """Implementation of the I-DT algorithm as described in: + + Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and + saccades in eye-tracking protocols. In Proceedings of the 2000 symposium + on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, + 71-78. DOI=http://dx.doi.org/10.1145/355017.355028 + """ + + @dataclass + class DispersionBasedFixation(Fixation): + """Define dispersion based fixation as an algorithm specific fixation.""" - fixations = TimeStampedFixations() - saccades = TimeStampedSaccades() - movements = TimeStampedMovement() + dispersion: float = field(init=False) + euclidian: bool = field(default=True) - for ts, item in self: + centroid: GazePosition = field(init=False) - if isinstance(item, GazeFeatures.Fixation): + def __post_init__(self): - fixations[ts] = item + super().__post_init__() - for ts, position in item.positions.items(): + x_list = [gp[0] for (ts, gp) in list(self.positions.items())] + y_list = [gp[1] for (ts, gp) in list(self.positions.items())] - movements[ts] = Movement(item.index, type(item).__name__, position) + cx = round(numpy.mean(x_list)) + cy = round(numpy.mean(y_list)) - elif isinstance(item, GazeFeatures.Saccade): + # select dispersion algorithm + if self.euclidian: - saccades[ts] = item + c = [cx, cy] + points = numpy.column_stack([x_list, y_list]) - movements[ts] = Movement(item.index, type(item).__name__, item.start_position) - movements[ts + item.duration] = Movement(item.index, type(item).__name__, item.end_position) + dist = (points - c)**2 + dist = numpy.sum(dist, axis=1) + dist = numpy.sqrt(dist) + + self.dispersion = round(max(dist)) else: - continue - return fixations, saccades, movements + self.dispersion = (max(x_list) - min(x_list)) + (max(y_list) - min(y_list)) -class DispersionBasedMovementIdentifier(MovementIdentifier): - """Implementation of the I-DT algorithm as described in: - - Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and - saccades in eye-tracking protocols. In Proceedings of the 2000 symposium - on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, - 71-78. DOI=http://dx.doi.org/10.1145/355017.355028 - """ + self.centroid = (cx, cy) + + @dataclass + class DispersionBasedSaccade(Saccade): + """Define dispersion based saccade as an algorithm specific saccade.""" + + def __post_init__(self): + super().__post_init__() def __init__(self, ts_gaze_positions, dispersion_threshold = 10, duration_threshold = 100): @@ -140,37 +136,6 @@ class DispersionBasedMovementIdentifier(MovementIdentifier): self.__ts_gaze_positions = ts_gaze_positions.copy() self.__last_fixation = None - self.__last_fixation_ts = -1 - - self.__fixations_count = 0 - self.__saccades_count = 0 - - def __getEuclideanDispersion(self, ts_gaze_positions_list): - """Euclidian dispersion algorithm""" - - x_list = [gp[0] for (ts, gp) in ts_gaze_positions_list] - y_list = [gp[1] for (ts, gp) in ts_gaze_positions_list] - - cx = numpy.mean(x_list) - cy = numpy.mean(y_list) - c = [cx, cy] - - points = numpy.column_stack([x_list, y_list]) - - dist = (points - c)**2 - dist = numpy.sum(dist, axis=1) - dist = numpy.sqrt(dist) - - return round(max(dist)), cx, cy - - def __getDispersion(self, ts_gaze_positions_list): - """Basic dispersion algorithm""" - # TODO : allow to select this algorithm - - x_list = [gp.x for (ts, gp) in ts_gaze_positions_list] - y_list = [gp.y for (ts, gp) in ts_gaze_positions_list] - - return (max(x_list) - min(x_list)) + (max(y_list) - min(y_list)) def __iter__(self): """Movement identification generator.""" @@ -185,87 +150,71 @@ class DispersionBasedMovementIdentifier(MovementIdentifier): (ts_start, gaze_position_start) = remaining_ts_gaze_positions.pop_first() (ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first() - ts_gaze_positions_list = [(ts_start, gaze_position_start)] + ts_gaze_positions = TimeStampedGazePositions() + ts_gaze_positions[ts_start] = gaze_position_start while (ts_current - ts_start) < self.__duration_threshold: - ts_gaze_positions_list.append( (ts_current, gaze_position_current) ) + ts_gaze_positions[ts_current] = gaze_position_current if len(remaining_ts_gaze_positions) > 0: (ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first() else: break - # how much gaze is dispersed ? - dispersion, cx, cy = self.__getEuclideanDispersion(ts_gaze_positions_list) + # is it a new fixation ? + new_fixation = DispersionBasedMovementIdentifier.DispersionBasedFixation(ts_gaze_positions) - # little dispersion - if dispersion <= self.__dispersion_threshold: + # dispersion is small + if new_fixation.dispersion <= self.__dispersion_threshold: # remove selected gaze positions - for gp in ts_gaze_positions_list: + for gp in ts_gaze_positions: self.__ts_gaze_positions.pop_first() # are next gaze positions not too dispersed ? while len(remaining_ts_gaze_positions) > 0: # select next gaze position - ts_gaze_positions_list.append(remaining_ts_gaze_positions.pop_first()) - - new_dispersion, new_cx, new_cy = self.__getEuclideanDispersion(ts_gaze_positions_list) + ts_next, position_next = remaining_ts_gaze_positions.pop_first() + ts_gaze_positions[ts_next] = position_next - # dispersion too wide - if new_dispersion > self.__dispersion_threshold: + # how much gaze is dispersed ? + updated_fixation = DispersionBasedMovementIdentifier.DispersionBasedFixation(ts_gaze_positions) - # remove last gaze position - ts_gaze_positions_list.pop(-1) + # dispersion is becomes too wide : ignore updated fixation + if updated_fixation.dispersion > self.__dispersion_threshold: break - # store new dispersion data - dispersion = new_dispersion - cx = new_cx - cy = new_cy + # update new fixation + new_fixation = updated_fixation # remove selected gaze position self.__ts_gaze_positions.pop_first() - # we have a new fixation - ts_list = [ts for (ts, gp) in ts_gaze_positions_list] - duration = ts_list[-1] - ts_list[0] - - if duration > 0: - - # store all positions in a timestamped buffer - ts_gaze_positions = TimeStampedGazePositions() - - for (ts, gp) in ts_gaze_positions_list: - ts_gaze_positions[round(ts)] = gp - - self.__fixations_count += 1 - - new_fixation = Fixation(self.__fixations_count, round(duration), dispersion, (round(cx), round(cy)), ts_gaze_positions) - new_fixation_ts = ts_list[0] + # is the new fixation have a duration ? + if new_fixation.duration > 0: if self.__last_fixation != None: - new_saccade_ts = self.__last_fixation_ts + self.__last_fixation.duration - new_saccade_duration = new_fixation_ts - new_saccade_ts + # store start and end positions in a timestamped buffer + ts_saccade_positions = TimeStampedGazePositions() + + start_position_ts, start_position = self.__last_fixation.positions.pop_last() + ts_saccade_positions[start_position_ts] = start_position - if new_saccade_duration > 0: + end_position_ts, end_position = new_fixation.positions.pop_first() + ts_saccade_positions[end_position_ts] = end_position - start_position_ts, start_position = self.__last_fixation.positions.pop_last() - end_position_ts, end_position = new_fixation.positions.pop_first() + if end_position_ts > start_position_ts: - self.__saccades_count += 1 - - new_saccade = Saccade(self.__saccades_count, round(new_saccade_duration), start_position, end_position) + new_saccade = DispersionBasedMovementIdentifier.DispersionBasedSaccade(ts_saccade_positions) - yield round(new_saccade_ts), new_saccade + yield new_saccade self.__last_fixation = new_fixation - self.__last_fixation_ts = new_fixation_ts - yield round(new_fixation_ts), new_fixation + yield new_fixation # dispersion too wide : consider next gaze position else: @@ -393,12 +342,12 @@ class PointerBasedVisualScan(VisualScanGenerator): class FixationBasedVisualScan(VisualScanGenerator): """Build visual scan on the basis of timestamped fixations.""" - def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_fixations: TimeStampedFixations): + def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_fixations: TimeStampedMovements): super().__init__(ts_aoi_scenes) - if type(ts_fixations) != TimeStampedFixations: - raise ValueError('second argument must be a GazeFeatures.TimeStampedFixations') + if type(ts_fixations) != TimeStampedMovements: + raise ValueError('second argument must be a GazeFeatures.TimeStampedMovements') # process identification on a copy self.__ts_aoi_scenes = ts_aoi_scenes.copy() diff --git a/src/argaze/utils/export_tobii_segment_movements.py b/src/argaze/utils/export_tobii_segment_movements.py index 7222414..ee41a60 100644 --- a/src/argaze/utils/export_tobii_segment_movements.py +++ b/src/argaze/utils/export_tobii_segment_movements.py @@ -41,10 +41,11 @@ def main(): destination_path = args.segment_path - gaze_video_filepath = f'{destination_path}/movements.mp4' fixations_filepath = f'{destination_path}/movements_fixations.csv' saccades_filepath = f'{destination_path}/movements_saccades.csv' - movements_filepath = f'{destination_path}/movements.csv' + + gaze_status_filepath = f'{destination_path}/gaze_status.csv' + gaze_status_video_filepath = f'{destination_path}/gaze_status.mp4' # Load a tobii segment tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) @@ -75,29 +76,34 @@ def main(): # Start movement identification movement_identifier = GazeFeatures.DispersionBasedMovementIdentifier(ts_gaze_positions, args.dispersion_threshold, args.duration_threshold) - fixations = GazeFeatures.TimeStampedFixations() - saccades = GazeFeatures.TimeStampedSaccades() - movements = GazeFeatures.TimeStampedMovements() + fixations = GazeFeatures.TimeStampedMovements() + saccades = GazeFeatures.TimeStampedMovements() + gaze_status = GazeFeatures.TimeStampedGazeStatus() # Initialise progress bar MiscFeatures.printProgressBar(0, int(tobii_segment_video.get_duration()/1000), prefix = 'Movements identification:', suffix = 'Complete', length = 100) - for ts, item in movement_identifier: + for item in movement_identifier: + + if isinstance(item, GazeFeatures.DispersionBasedMovementIdentifier.DispersionBasedFixation): - if isinstance(item, GazeFeatures.Fixation): + start_ts, start_position = item.positions.get_first() - fixations[ts] = item + fixations[start_ts] = item for ts, position in item.positions.items(): - movements[ts] = GazeFeatures.Movement(item.index, type(item).__name__, position) + gaze_status[ts] = GazeFeatures.GazeStatus(position, 'Fixation', len(fixations)) - elif isinstance(item, GazeFeatures.Saccade): + elif isinstance(item, GazeFeatures.DispersionBasedMovementIdentifier.DispersionBasedSaccade): - saccades[ts] = item + start_ts, start_position = item.positions.get_first() + end_ts, end_position = item.positions.get_last() + + saccades[start_ts] = item - movements[ts] = GazeFeatures.Movement(item.index, type(item).__name__, item.start_position) - movements[ts + item.duration] = GazeFeatures.Movement(item.index, type(item).__name__, item.end_position) + gaze_status[start_ts] = GazeFeatures.GazeStatus(start_position, 'Saccade', len(saccades)) + gaze_status[end_ts] = GazeFeatures.GazeStatus(end_position, 'Saccade', len(saccades)) else: continue @@ -116,12 +122,12 @@ def main(): saccades.export_as_csv(saccades_filepath) print(f'Saccades saved into {saccades_filepath}') - # Export movements analysis - movements.export_as_csv(movements_filepath) - print(f'Movements saved into {movements_filepath}') + # Export gaze status analysis + gaze_status.export_as_csv(gaze_status_filepath) + print(f'Gaze status saved into {gaze_status_filepath}') # Prepare video exportation at the same format than segment video - output_video = TobiiVideo.TobiiVideoOutput(gaze_video_filepath, tobii_segment_video.get_stream()) + output_video = TobiiVideo.TobiiVideoOutput(gaze_status_video_filepath, tobii_segment_video.get_stream()) # Video and data loop try: @@ -160,8 +166,10 @@ def main(): if video_ts_ms > current_saccade_ts + current_saccade.duration: current_saccade_ts, current_saccade = saccades.pop_first() + start_ts, start_position = current_saccade.positions.pop_first() + end_ts, end_position = current_saccade.positions.pop_first() - cv.line(video_frame.matrix, current_saccade.start_position, current_saccade.end_position, (0, 0, 255), 2) + cv.line(video_frame.matrix, start_position, end_position, (0, 0, 255), 2) else: @@ -203,7 +211,7 @@ def main(): # End output video file output_video.close() - print(f'\nVideo with movements saved into {gaze_video_filepath}') + print(f'\nVideo with movements saved into {gaze_status_video_filepath}') if __name__ == '__main__': -- cgit v1.1