#!/usr/bin/env python

from dataclasses import dataclass, field
import math
import json

from argaze import DataStructures
from argaze.AreaOfInterest import AOIFeatures

import numpy
import pandas
import cv2 as cv


@dataclass(frozen=True)
class GazePosition():
    """Define gaze position as a tuple of coordinates with accuracy."""

    value: tuple[int | float] = field(default=(0, 0))
    """Position's value."""

    accuracy: float = field(default=0., kw_only=True)
    """Position's accuracy."""

    def __getitem__(self, axis: int) -> int | float:
        """Get position value along a particular axis."""

        return self.value[axis]

    def __iter__(self) -> iter:
        """Iterate over each position value axis."""

        return iter(self.value)

    def __len__(self) -> int:
        """Number of axis in position value."""

        return len(self.value)

    def __repr__(self):
        """String representation: the instance attributes dumped as JSON."""

        return json.dumps(self, ensure_ascii=False, default=vars)

    def __array__(self):
        """Cast as numpy array."""

        return numpy.array(self.value)

    @property
    def valid(self) -> bool:
        """Is the accuracy not None?"""

        return self.accuracy is not None

    def draw(self, frame, color=(0, 255, 255)):
        """Draw gaze position point and accuracy circle.

        Nothing is drawn when the position is not valid.

        Parameters:
            frame: image passed to cv.circle as drawing target.
            color: color passed to cv.circle.
        """

        if not self.valid:
            return

        # value may hold floats while cv.circle expects integer pixel
        # coordinates: round each axis before drawing.
        center = tuple(int(round(axis_value)) for axis_value in self.value)

        # Draw point at position
        cv.circle(frame, center, 2, color, -1)

        # Draw accuracy circle
        if self.accuracy > 0:
            cv.circle(frame, center, round(self.accuracy), color, 1)


class UnvalidGazePosition(GazePosition):
    """Unvalid gaze position: value is (None, None) and accuracy is None."""

    def __init__(self):

        super().__init__((None, None), accuracy=None)


class TimeStampedGazePositions(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store gaze positions."""

    def __setitem__(self, key, value: GazePosition):
        """Force value to be GazePosition (UnvalidGazePosition included)."""

        # isinstance also covers UnvalidGazePosition, which subclasses
        # GazePosition. The assert is kept so that the raised exception type
        # (AssertionError) stays the same for existing callers.
        assert isinstance(value, GazePosition)

        super().__setitem__(key, value)


@dataclass
class Movement():
    """Define abstract movement class as a buffer of timestamped positions."""

    positions: TimeStampedGazePositions
    """All timestamp gaze positions."""

    duration: float = field(init=False)
    """Inferred duration from first and last timestamps."""

    def __post_init__(self):
        """Compute the rounded duration between first and last timestamps."""

        start_position_ts, _ = self.positions.first
        end_position_ts, _ = self.positions.last

        self.duration = round(end_position_ts - start_position_ts)


Fixation = Movement
"""Define abstract fixation as movement."""

Saccade = Movement
"""Define abstract saccade as movement."""


class TimeStampedMovements(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store movements."""

    def __setitem__(self, key, value: Movement):
        """Force value to inherit from Movement."""

        # The previous check compared only the first direct base class with
        # Movement, which rejected plain Movement instances and any class
        # deriving from Movement through more than one level.
        assert isinstance(value, Movement)

        super().__setitem__(key, value)


@dataclass
class GazeStatus():
    """Define gaze status as a position belonging to an identified and indexed movement."""

    position: GazePosition
    """Gaze position"""

    movement_type: str
    """Movement type to which gaze position belongs."""

    movement_index: int
    """Movement index to which gaze positon belongs."""


class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store gaze status."""

    def __setitem__(self, key, value: GazeStatus):
        """Store value at key (no type enforcement is done here)."""

        super().__setitem__(key, value)


class MovementIdentifier():
    """Abstract class to define what should provide a movement identifier."""

    def __init__(self, ts_gaze_positions: TimeStampedGazePositions):
        """Check the argument type.

        Raises:
            ValueError: when ts_gaze_positions is not a TimeStampedGazePositions.
        """

        if not isinstance(ts_gaze_positions, TimeStampedGazePositions):
            raise ValueError('argument must be a TimeStampedGazePositions')

    def __iter__(self):
        raise NotImplementedError('__iter__() method not implemented')

    def __next__(self):
        raise NotImplementedError('__next__() method not implemented')


class DispersionBasedMovementIdentifier(MovementIdentifier):
    """Implementation of the I-DT algorithm as described in:

    Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
    saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
    on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
    71-78. DOI=http://dx.doi.org/10.1145/355017.355028
    """

    @dataclass
    class DispersionBasedFixation(Fixation):
        """Define dispersion based fixation as an algorithm specific fixation."""

        dispersion: float = field(init=False)
        euclidian: bool = field(default=True)

        centroid: tuple = field(init=False)

        def __post_init__(self):
            """Compute duration, centroid and dispersion from the positions."""

            super().__post_init__()

            timestamped_positions = list(self.positions.items())

            x_list = [gp[0] for (_, gp) in timestamped_positions]
            y_list = [gp[1] for (_, gp) in timestamped_positions]

            cx = round(numpy.mean(x_list))
            cy = round(numpy.mean(y_list))

            # select dispersion algorithm
            if self.euclidian:

                # largest euclidian distance from the (rounded) centroid
                offsets = numpy.column_stack([x_list, y_list]) - [cx, cy]
                distances = numpy.sqrt(numpy.sum(offsets**2, axis=1))

                self.dispersion = round(max(distances))

            else:

                # sum of the extents along both axis
                self.dispersion = (max(x_list) - min(x_list)) + (max(y_list) - min(y_list))

            self.centroid = (cx, cy)

    @dataclass
    class DispersionBasedSaccade(Saccade):
        """Define dispersion based saccade as an algorithm specific saccade."""

        def __post_init__(self):
            super().__post_init__()

    def __init__(self, ts_gaze_positions, dispersion_threshold = 10, duration_threshold = 100):
        """Store thresholds and a working copy of the gaze positions.

        Parameters:
            ts_gaze_positions: timestamped gaze positions to process.
            dispersion_threshold: maximal dispersion of a fixation.
            duration_threshold: time span of the initial window of a fixation
                (compared with timestamp differences).

        Raises:
            ValueError: when ts_gaze_positions is not a TimeStampedGazePositions.
        """

        super().__init__(ts_gaze_positions)

        self.__dispersion_threshold = dispersion_threshold
        self.__duration_threshold = duration_threshold

        # process identification on a copy
        self.__ts_gaze_positions = ts_gaze_positions.copy()

        self.__last_fixation = None

    def __iter__(self):
        """Movement identification generator.

        Yields fixations and, between two consecutive fixations, a saccade
        built from the end of the previous fixation and the start of the new
        one.
        """

        # while there are 2 gaze positions at least
        while len(self.__ts_gaze_positions) >= 2:

            # copy remaining timestamped gaze positions
            remaining_ts_gaze_positions = self.__ts_gaze_positions.copy()

            # select timestamped gaze position until a duration threshold
            (ts_start, gaze_position_start) = remaining_ts_gaze_positions.pop_first()

            # Invalid start position
            if not gaze_position_start.valid:

                self.__ts_gaze_positions.pop_first()
                continue

            ts_gaze_positions = TimeStampedGazePositions()
            ts_gaze_positions[ts_start] = gaze_position_start

            (ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first()

            # NOTE(review): when this loop ends because the duration threshold
            # is reached, the position at ts_current has been popped from the
            # remaining buffer but is never added to the window nor tested
            # below - confirm this is intended.
            while (ts_current - ts_start) < self.__duration_threshold:

                # Ignore non valid position
                # TODO ? Consider invalid position to not break fixation ?
                if gaze_position_current.valid:

                    ts_gaze_positions[ts_current] = gaze_position_current

                # stop when there is no more position to select
                # (explicit check instead of a bare except around pop_first)
                if len(remaining_ts_gaze_positions) == 0:
                    break

                (ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first()

            # is it a new fixation ?
            new_fixation = DispersionBasedMovementIdentifier.DispersionBasedFixation(ts_gaze_positions)

            # dispersion is small
            if new_fixation.dispersion <= self.__dispersion_threshold:

                # remove selected gaze positions
                # NOTE(review): invalid positions skipped above are not in
                # ts_gaze_positions, so fewer positions are removed here than
                # were read from the remaining buffer - confirm.
                for _ in ts_gaze_positions:
                    self.__ts_gaze_positions.pop_first()

                # are next gaze positions not too dispersed ?
                while len(remaining_ts_gaze_positions) > 0:

                    # select next gaze position
                    ts_next, position_next = remaining_ts_gaze_positions.pop_first()

                    # Invalid next position
                    if not position_next.valid:
                        continue

                    # NOTE(review): ts_gaze_positions is the buffer given to
                    # new_fixation; presumably the same object is referenced
                    # by new_fixation.positions, so a position rejected below
                    # would stay in it - verify.
                    ts_gaze_positions[ts_next] = position_next

                    # how much gaze is dispersed ?
                    updated_fixation = DispersionBasedMovementIdentifier.DispersionBasedFixation(ts_gaze_positions)

                    # dispersion becomes too wide : ignore updated fixation
                    if updated_fixation.dispersion > self.__dispersion_threshold:
                        break

                    # update new fixation
                    new_fixation = updated_fixation

                    # remove selected gaze position
                    self.__ts_gaze_positions.pop_first()

                # is the new fixation have a duration ?
                if new_fixation.duration > 0:

                    if self.__last_fixation is not None:

                        # store start and end positions in a timestamped buffer
                        # NOTE(review): pop_last()/pop_first() presumably remove
                        # these positions from the fixations' own buffers (one
                        # of them already yielded) - confirm it is intended.
                        ts_saccade_positions = TimeStampedGazePositions()

                        start_position_ts, start_position = self.__last_fixation.positions.pop_last()
                        ts_saccade_positions[start_position_ts] = start_position

                        end_position_ts, end_position = new_fixation.positions.pop_first()
                        ts_saccade_positions[end_position_ts] = end_position

                        if end_position_ts > start_position_ts:

                            new_saccade = DispersionBasedMovementIdentifier.DispersionBasedSaccade(ts_saccade_positions)

                            yield new_saccade

                    self.__last_fixation = new_fixation

                    yield new_fixation

            # dispersion too wide : consider next gaze position
            else:
                self.__ts_gaze_positions.pop_first()


@dataclass
class VisualScanStep():
    """Define a visual scan step as a start timestamp, duration, the name of the area of interest and where gaze looked at in each frame during the step."""

    timestamp: int
    duration: float
    area: str
    look_at: DataStructures.TimeStampedBuffer


class VisualScanGenerator():
    """Abstract class to define when an aoi starts to be looked and when it stops."""

    visual_scan_steps: list

    def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes):
        """Check the argument type then collect every step produced by iterating self.

        Raises:
            ValueError: when ts_aoi_scenes is not a TimeStampedAOIScenes.
        """

        if not isinstance(ts_aoi_scenes, AOIFeatures.TimeStampedAOIScenes):
            raise ValueError('argument must be a TimeStampedAOIScenes')

        # None steps are ignored
        self.visual_scan_steps = [step for step in self if step is not None]

    def __iter__(self):
        raise NotImplementedError('__iter__() method not implemented')

    def steps(self) -> list:
        """Get visual scan steps."""

        return self.visual_scan_steps

    def as_dataframe(self) -> pandas.DataFrame:
        """Convert buffer as pandas dataframe indexed and sorted by timestamp."""

        df = pandas.DataFrame.from_dict(self.visual_scan_steps)
        df.set_index('timestamp', inplace=True)
        df.sort_values(by=['timestamp'], inplace=True)

        return df

    def save_as_csv(self, filepath):
        """Write buffer content into a csv file.

        Raises:
            RuntimeError: when the dataframe conversion or the writing fails
                (the original error is chained as cause).
        """

        try:

            self.as_dataframe().to_csv(filepath, index=True)

        except Exception as error:

            raise RuntimeError(f'Can\'t write {filepath}') from error


class PointerBasedVisualScan(VisualScanGenerator):
    """Build visual scan on the basis of which AOI are looked."""

    def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_gaze_positions: TimeStampedGazePositions):
        """Copy inputs then build the visual scan.

        Raises:
            ValueError: when ts_aoi_scenes is not a TimeStampedAOIScenes.
        """

        # process identification on a copy
        self.__ts_aoi_scenes = ts_aoi_scenes.copy()
        self.__ts_gaze_positions = ts_gaze_positions.copy()

        # a dictionary to store when an aoi starts to be looked
        self.__step_dict = {}

        # build visual scan (the parent constructor iterates over self, so
        # the attributes above have to be set first)
        super().__init__(ts_aoi_scenes)

    def __iter__(self):
        """Visual scan generator function.

        Yields a VisualScanStep each time a looked aoi stops to be looked,
        then one step for each aoi still looked once all scenes are processed.
        """

        # while there is aoi scene to process
        while len(self.__ts_aoi_scenes) > 0:

            (ts_current, aoi_scene_current) = self.__ts_aoi_scenes.pop_first()

            # when aoi scene is missing
            if isinstance(aoi_scene_current, AOIFeatures.AOISceneMissing):
                continue

            # when there is no gaze position at current time
            try:
                gaze_position = self.__ts_gaze_positions[ts_current]
            except KeyError:
                continue

            # when gaze position is missing
            # (this replaces a handler for GazePositionMissing, a name which
            # is not defined in this module and so raised NameError)
            if isinstance(gaze_position, GazePosition) and not gaze_position.valid:
                continue

            for name, aoi in aoi_scene_current.items():

                looked = aoi.contains_point(gaze_position)

                if looked:

                    if name not in self.__step_dict:

                        # aoi starts to be looked
                        self.__step_dict[name] = {
                            'start': ts_current,
                            'look_at': DataStructures.TimeStampedBuffer()
                        }

                    # store where the aoi is looked for 4 corners aoi
                    if len(aoi) == 4:
                        self.__step_dict[name]['look_at'][round(ts_current)] = aoi.inner_axis(gaze_position)

                elif name in self.__step_dict:

                    ts_start = self.__step_dict[name]['start']

                    # aoi stops to be looked
                    yield VisualScanStep(round(ts_start), round(ts_current - ts_start), name, self.__step_dict[name]['look_at'])

                    # forget the aoi
                    del self.__step_dict[name]

        # close started steps
        for name, step in self.__step_dict.items():

            ts_start = step['start']

            # aoi stops to be looked
            yield VisualScanStep(round(ts_start), round(ts_current - ts_start), name, step['look_at'])


class FixationBasedVisualScan(VisualScanGenerator):
    """Build visual scan on the basis of timestamped fixations."""

    def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_fixations: TimeStampedMovements):
        """Build the visual scan then copy inputs.

        Raises:
            ValueError: when ts_aoi_scenes is not a TimeStampedAOIScenes or
                when ts_fixations is not a TimeStampedMovements.
        """

        # NOTE(review): the parent constructor iterates over self before the
        # attributes below are set; fine as long as __iter__ does not use them.
        super().__init__(ts_aoi_scenes)

        if not isinstance(ts_fixations, TimeStampedMovements):
            raise ValueError('second argument must be a GazeFeatures.TimeStampedMovements')

        # process identification on a copy
        self.__ts_aoi_scenes = ts_aoi_scenes.copy()
        self.__ts_fixations = ts_fixations.copy()

    def __iter__(self):
        """Visual scan generator function."""

        # NOTE(review): placeholder - this yields the tuple (-1, None), which
        # is stored as a step by the parent constructor; presumably not
        # implemented yet.
        yield -1, None