#!/usr/bin/env python
"""Gaze features: gaze positions, fixation identification and visual scans."""

from dataclasses import dataclass
import math
from typing import Tuple

from argaze import DataStructures
from argaze.AreaOfInterest import AOIFeatures

import numpy

# Upper bound applied to the duration of an identified fixation. It is
# compared against timestamp differences, so it uses the timestamp unit
# (presumably milliseconds -- TODO confirm against the caller).
FIXATION_MAX_DURATION = 1000


@dataclass
class GazePosition:
    """Define gaze position."""

    x: float
    y: float

    def as_tuple(self):
        """Return the position as an (x, y) tuple."""
        return (self.x, self.y)


class TimeStampedGazePositions(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store gaze positions."""

    def __setitem__(self, key, value: GazePosition):
        """Force value to be a GazePosition.

        Raises:
            ValueError: if value is not a GazePosition.
        """
        if not isinstance(value, GazePosition):
            raise ValueError('value must be a GazePosition')

        super().__setitem__(key, value)


@dataclass
class Fixation:
    """Define fixation as a duration, a dispersion and an (x, y) centroid."""

    duration: float
    dispersion: float
    centroid: Tuple[float, float]


class TimeStampedFixations(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store fixations."""

    def __setitem__(self, key, value: Fixation):
        """Force value to be a Fixation.

        Raises:
            ValueError: if value is not a Fixation.
        """
        if not isinstance(value, Fixation):
            raise ValueError('value must be a Fixation')

        super().__setitem__(key, value)


class FixationIdentifier:
    """Abstract class to define what should provide a fixation identifier.

    Subclasses implement the iterator protocol and produce
    (timestamp, Fixation) pairs; a pair whose second item is None means
    "no fixation at this step" and is skipped by identify().
    """

    def __init__(self, ts_gaze_positions: TimeStampedGazePositions):
        """Check the input buffer type.

        Raises:
            ValueError: if ts_gaze_positions is not a TimeStampedGazePositions.
        """
        if not isinstance(ts_gaze_positions, TimeStampedGazePositions):
            raise ValueError('argument must be a TimeStampedGazePositions')

    def __iter__(self):
        raise NotImplementedError('__iter__() method not implemented')

    def __next__(self):
        raise NotImplementedError('__next__() method not implemented')

    def identify(self):
        """Run the identifier to exhaustion.

        Returns:
            A TimeStampedFixations holding every fixation produced by
            iterating over self, stored under the timestamp it came with.
        """
        # This module *is* GazeFeatures, so the class is referenced
        # directly; the former `GazeFeatures.` prefix was an undefined name.
        fixations = TimeStampedFixations()

        for ts, item in self:
            if item is None:
                continue

            fixations[ts] = item

        return fixations


class DispersionBasedFixationIdentifier(FixationIdentifier):
    """Implementation of the I-DT algorithm as described in:

    Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations
    and saccades in eye-tracking protocols. In Proceedings of the 2000
    symposium on Eye tracking research & applications (ETRA '00).
    ACM, New York, NY, USA, 71-78.
    DOI=http://dx.doi.org/10.1145/355017.355028
    """

    def __init__(self, ts_gaze_positions, dispersion_threshold = 10, duration_threshold = 100):
        """Store thresholds and take a private copy of the gaze positions.

        Args:
            ts_gaze_positions: TimeStampedGazePositions to analyse.
            dispersion_threshold: maximal dispersion of a fixation, compared
                against distances computed from gaze coordinates.
            duration_threshold: length of the initial time window, compared
                against timestamp differences.
        """
        super().__init__(ts_gaze_positions)

        self.__dispersion_threshold = dispersion_threshold
        self.__duration_threshold = duration_threshold

        # process identification on a copy
        self.__ts_gaze_positions = ts_gaze_positions.copy()

    def __getEuclideanDispersion(self, ts_gaze_positions_list):
        """Euclidean dispersion algorithm.

        Args:
            ts_gaze_positions_list: non-empty list of
                (timestamp, GazePosition) pairs.

        Returns:
            (dispersion, cx, cy) where (cx, cy) is the mean position and
            dispersion is the largest distance from a position to it.
        """
        x_list = [gp.x for (ts, gp) in ts_gaze_positions_list]
        y_list = [gp.y for (ts, gp) in ts_gaze_positions_list]

        cx = numpy.mean(x_list)
        cy = numpy.mean(y_list)

        points = numpy.column_stack([x_list, y_list])
        distances = numpy.sqrt(numpy.sum((points - [cx, cy])**2, axis=1))

        return max(distances), cx, cy

    def __getDispersion(self, ts_gaze_positions_list):
        """Basic dispersion algorithm: x extent plus y extent."""
        # TODO : allow to select this algorithm

        x_list = [gp.x for (ts, gp) in ts_gaze_positions_list]
        y_list = [gp.y for (ts, gp) in ts_gaze_positions_list]

        return (max(x_list) - min(x_list)) + (max(y_list) - min(y_list))

    def __iter__(self):
        """Start fixation identification."""
        return self

    def __next__(self):
        """Perform one identification step.

        Returns:
            (timestamp, Fixation) when a fixation is found, using the
            timestamp of its first gaze position, or (-1, None) when this
            step found none.

        Raises:
            StopIteration: when fewer than 2 gaze positions remain.
        """
        # 2 gaze positions at least are needed
        if len(self.__ts_gaze_positions) < 2:
            raise StopIteration

        # Work on a copy: the instance buffer is only advanced once gaze
        # positions are accepted into a fixation (or the first is rejected).
        remaining_ts_gaze_positions = self.__ts_gaze_positions.copy()

        # select timestamped gaze positions until the duration threshold
        (ts_start, gaze_position_start) = remaining_ts_gaze_positions.pop_first()
        ts_gaze_positions_list = [(ts_start, gaze_position_start)]

        # First gaze position found at or beyond the duration threshold.
        # It is kept so that it becomes the first candidate when extending
        # the fixation, instead of being silently dropped.
        pending = None

        while len(remaining_ts_gaze_positions) > 0:
            candidate = remaining_ts_gaze_positions.pop_first()

            if (candidate[0] - ts_start) >= self.__duration_threshold:
                pending = candidate
                break

            ts_gaze_positions_list.append(candidate)

        # how much gaze is dispersed ?
        dispersion, cx, cy = self.__getEuclideanDispersion(ts_gaze_positions_list)

        # dispersion too wide : consider next gaze position
        if dispersion > self.__dispersion_threshold:
            self.__ts_gaze_positions.pop_first()
            return -1, None

        # little dispersion : remove selected gaze positions
        for _ in ts_gaze_positions_list:
            self.__ts_gaze_positions.pop_first()

        # are next gaze positions not too dispersed ?
        while pending is not None or len(remaining_ts_gaze_positions) > 0:

            # select next gaze position
            if pending is None:
                pending = remaining_ts_gaze_positions.pop_first()

            ts_gaze_positions_list.append(pending)
            pending = None

            new_dispersion, new_cx, new_cy = self.__getEuclideanDispersion(ts_gaze_positions_list)

            # dispersion too wide : remove last gaze position
            if new_dispersion > self.__dispersion_threshold:
                ts_gaze_positions_list.pop(-1)
                break

            # store new dispersion data
            dispersion, cx, cy = new_dispersion, new_cx, new_cy

            # remove selected gaze position
            self.__ts_gaze_positions.pop_first()

        # we have a new fixation
        ts_first = ts_gaze_positions_list[0][0]
        ts_last = ts_gaze_positions_list[-1][0]
        duration = min(ts_last - ts_first, FIXATION_MAX_DURATION)

        if duration > 0:
            # return timestamp and fixation
            return ts_first, Fixation(duration, dispersion, (cx, cy))

        # if no fixation found, go to next
        return -1, None


@dataclass
class VisualScanStep:
    """Define a visual scan step as a duration, the name of the area of
    interest and where gaze looked at in each frame during the step."""

    duration: float
    area: str
    look_at: DataStructures.TimeStampedBuffer


class TimeStampedVisualScanSteps(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store visual scan steps."""

    def __setitem__(self, key, value: VisualScanStep):
        """Force value to be a VisualScanStep.

        Raises:
            ValueError: if value is not a VisualScanStep.
        """
        if not isinstance(value, VisualScanStep):
            raise ValueError('value must be a VisualScanStep')

        super().__setitem__(key, value)


class VisualScanGenerator:
    """Abstract class to define when an aoi starts to be looked and when it stops."""

    def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes):
        """Check the input buffer type.

        Raises:
            ValueError: if ts_aoi_scenes is not a TimeStampedAOIScenes.
        """
        if not isinstance(ts_aoi_scenes, AOIFeatures.TimeStampedAOIScenes):
            raise ValueError('argument must be a TimeStampedAOIScenes')

    def __iter__(self):
        raise NotImplementedError('__iter__() method not implemented')

    def build(self):
        """Collect every step produced by iterating over self.

        Returns:
            A TimeStampedVisualScanSteps whose items are sorted; pairs
            whose step is None are skipped.
        """
        visual_scan_steps = TimeStampedVisualScanSteps()

        for ts, step in self:
            if step is None:
                continue

            visual_scan_steps[ts] = step

        # Steps are yielded when they end, so they are re-ordered here.
        return TimeStampedVisualScanSteps(sorted(visual_scan_steps.items()))


class PointerBasedVisualScan(VisualScanGenerator):
    """Build visual scan on the basis of which AOI are looked."""

    def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_gaze_positions: TimeStampedGazePositions):
        """Take private copies of the aoi scenes and gaze positions."""
        super().__init__(ts_aoi_scenes)

        # process identification on a copy
        self.__ts_aoi_scenes = ts_aoi_scenes.copy()
        self.__ts_gaze_positions = ts_gaze_positions.copy()

        # a dictionary to store when an aoi starts to be looked
        self.__step_dict = {}

    def __iter__(self):
        """Visual scan generator function.

        Yields:
            (start timestamp, VisualScanStep) each time an aoi that was
            being looked stops being looked.
        """
        # while there is aoi scene to process
        while len(self.__ts_aoi_scenes) > 0:

            (ts_current, aoi_scene_current) = self.__ts_aoi_scenes.pop_first()

            # Only the lookup is guarded, so that a KeyError raised by aoi
            # code below is not mistaken for a missing gaze position.
            try:
                gaze_position = self.__ts_gaze_positions[ts_current]
            except KeyError:
                # ignore missing gaze position
                continue

            for name, aoi in aoi_scene_current.areas.items():

                if aoi.looked(gaze_position):

                    if name not in self.__step_dict:
                        # aoi starts to be looked
                        self.__step_dict[name] = {
                            'start': ts_current,
                            'look_at': DataStructures.TimeStampedBuffer()
                        }

                    # store where the aoi is looked
                    self.__step_dict[name]['look_at'][ts_current] = aoi.look_at(gaze_position).tolist()

                elif name in self.__step_dict:

                    ts_start = self.__step_dict[name]['start']

                    # aoi stops to be looked
                    yield ts_start, VisualScanStep(ts_current - ts_start, name, self.__step_dict[name]['look_at'])

                    # forget the aoi
                    del self.__step_dict[name]


class FixationBasedVisualScan(VisualScanGenerator):
    """Build visual scan on the basis of timestamped fixations."""

    def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_fixations: TimeStampedFixations):
        """Take private copies of the aoi scenes and fixations.

        Raises:
            ValueError: if ts_fixations is not a TimeStampedFixations.
        """
        super().__init__(ts_aoi_scenes)

        if not isinstance(ts_fixations, TimeStampedFixations):
            raise ValueError('second argument must be a GazeFeatures.TimeStampedFixations')

        # process identification on a copy
        self.__ts_aoi_scenes = ts_aoi_scenes.copy()
        self.__ts_fixations = ts_fixations.copy()

    def __iter__(self):
        """Visual scan generator function.

        Placeholder: yields a single (-1, None) pair, which build() skips.
        """
        yield -1, None