#!/usr/bin/env python

from typing import TypeVar, Tuple, Any
from dataclasses import dataclass, field
import math
import ast
import json

from argaze import DataStructures

import numpy
import pandas
import cv2 as cv


@dataclass(frozen=True)
class GazePosition():
    """Define gaze position as a tuple of coordinates with precision."""

    value: tuple[int | float] = field(default=(0, 0))
    """Position's value."""

    precision: float = field(default=0., kw_only=True)
    """Position's precision represents the radius of a circle around
    this gaze position value where other same gaze position measurements could be."""

    def __getitem__(self, axis: int) -> int | float:
        """Get position value along a particular axis."""

        return self.value[axis]

    def __iter__(self) -> iter:
        """Iterate over each position value axis."""

        return iter(self.value)

    def __len__(self) -> int:
        """Number of axis in position value."""

        return len(self.value)

    def __repr__(self):
        """String representation as a JSON object built from the instance attributes."""

        return json.dumps(self, ensure_ascii=False, default=vars)

    def __array__(self, dtype=None, copy=None):
        """Cast as numpy array.

        Parameters:
            dtype: optional data type requested by numpy for the resulting array.
            copy: accepted for compatibility with the numpy array protocol;
                a new array is always built from the position value.

        Returns:
            array made of the position value.
        """

        return numpy.array(self.value, dtype=dtype)

    @property
    def valid(self) -> bool:
        """Is the precision not None?"""

        return self.precision is not None

    def overlap(self, gaze_position, both=False) -> bool:
        """Does this gaze position overlap another gaze position considering its precision?
        Set both to True to test if the other gaze position overlaps this one too.

        Parameters:
            gaze_position: the other gaze position to compare with.
            both: if True, the distance has to be lower than the smallest of the two precisions.

        Returns:
            True if the distance between the two position values is strictly lower than the precision.
        """

        # Euclidean distance between the first two axes of both positions
        dist = (self.value[0] - gaze_position.value[0])**2 + (self.value[1] - gaze_position.value[1])**2
        dist = numpy.sqrt(dist)

        if both:
            return dist < min(self.precision, gaze_position.precision)

        return dist < self.precision

    def draw(self, frame, color=(0, 255, 255), draw_precision=True):
        """Draw gaze position point and precision circle.

        Parameters:
            frame: image passed to cv.circle to draw into.
            color: color tuple passed to cv.circle.
            draw_precision: draw a circle whose radius is the rounded precision when it is positive.
        """

        # Nothing to draw for an unvalid position
        if not self.valid:
            return

        int_value = (int(self.value[0]), int(self.value[1]))

        # Draw point at position
        cv.circle(frame, int_value, 2, color, -1)

        # Draw precision circle
        if self.precision > 0 and draw_precision:
            cv.circle(frame, int_value, round(self.precision), color, 1)


class UnvalidGazePosition(GazePosition):
    """Unvalid gaze position."""

    def __init__(self, message=None):
        """Store an optional message and initialize value and precision with None."""

        # message is not a dataclass field: it is stored as a plain instance attribute
        self.message = message

        super().__init__((None, None), precision=None)


TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions")
# Type definition for type annotation convenience


class TimeStampedGazePositions(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store gaze positions."""

    def __setitem__(self, key, value: GazePosition | dict):
        """Force GazePosition storage.

        A dict value is converted first: it must have 'value' and 'precision' keys
        and becomes an UnvalidGazePosition when it also has a 'message' key.

        Raises:
            AssertionError: if the dict keys are missing or if the value to store
                is neither a GazePosition nor an UnvalidGazePosition.
        """

        # Convert dict into GazePosition
        if isinstance(value, dict):

            assert {'value', 'precision'}.issubset(value.keys())

            if 'message' in value:
                value = UnvalidGazePosition(value['message'])
            else:
                value = GazePosition(value['value'], precision=value['precision'])

        # Exact type check: subclasses other than UnvalidGazePosition are refused
        assert type(value) in (GazePosition, UnvalidGazePosition)

        super().__setitem__(key, value)

    @classmethod
    def from_json(cls, json_filepath: str) -> TimeStampedGazePositionsType:
        """Create a TimeStampedGazePositionsType from .json file.

        Parameters:
            json_filepath: path to a utf-8 encoded JSON file whose keys are timestamp literals.

        Returns:
            a new buffer of the class this method is called on.
        """

        with open(json_filepath, encoding='utf-8') as ts_buffer_file:
            json_buffer = json.load(ts_buffer_file)

        # JSON keys are strings: evaluate them as Python literals to get the timestamps back
        return cls({ast.literal_eval(ts_str): json_value for ts_str, json_value in json_buffer.items()})


GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement")
# Type definition for type annotation convenience


@dataclass(frozen=True)
class GazeMovement():
    """Define abstract gaze movement class as a buffer of timestamped positions."""

    positions: TimeStampedGazePositions
    """All timestamp gaze positions."""

    duration: float = field(init=False)
    """Inferred duration from first and last timestamps."""

    distance: float = field(init=False)
    """Inferred distance from first and last positions."""

    def __post_init__(self):
        """Infer duration and distance from first and last timestamped positions."""

        start_position_ts, start_position = self.positions.first
        end_position_ts, end_position = self.positions.last

        # Update frozen duration attribute
        object.__setattr__(self, 'duration', end_position_ts - start_position_ts)

        distance = numpy.linalg.norm(numpy.array(start_position.value) - numpy.array(end_position.value))

        # Update frozen distance attribute
        object.__setattr__(self, 'distance', distance)

    def __str__(self) -> str:
        """String display"""

        # NOTE: the 'accurracy' label is kept as is to leave the output unchanged
        parts = [f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self.positions)}']

        for ts, position in self.positions.items():
            parts.append(f'\n\t{ts}:\n\t\tvalue={position.value},\n\t\taccurracy={position.precision}')

        return ''.join(parts)


class Fixation(GazeMovement):
    """Define abstract fixation as gaze movement."""

    def __post_init__(self):

        super().__post_init__()


class Saccade(GazeMovement):
    """Define abstract saccade as gaze movement."""

    def __post_init__(self):

        super().__post_init__()


TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeStampedGazeMovements")
# Type definition for type annotation convenience


class TimeStampedGazeMovements(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store gaze movements."""

    def __setitem__(self, key, value: GazeMovement):
        """Force value to be or inherit from GazeMovement.

        Raises:
            AssertionError: if value is not a GazeMovement instance.
        """

        # Fixation and Saccade (and their subclasses) are GazeMovement instances too
        assert isinstance(value, GazeMovement)

        super().__setitem__(key, value)

    def __str__(self):
        """String display: each stored movement on its own line."""

        return ''.join(f'\n{movement}' for _, movement in self.items())


GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus")
# Type definition for type annotation convenience


@dataclass(frozen=True)
class GazeStatus(GazePosition):
    """Define gaze status as a gaze position belonging to an identified and indexed gaze movement."""

    movement_type: str = field(kw_only=True)
    """GazeMovement type to which gaze position belongs."""

    movement_index: int = field(kw_only=True)
    """GazeMovement index to which gaze position belongs."""

    @classmethod
    def from_position(cls, gaze_position: GazePosition, movement_type: str, movement_index: int) -> GazeStatusType:
        """Initialize from a gaze position instance.

        Parameters:
            gaze_position: position whose value and precision are copied.
            movement_type: type of the movement the position belongs to.
            movement_index: index of the movement the position belongs to.
        """

        return cls(
            gaze_position.value,
            precision=gaze_position.precision,
            movement_type=movement_type,
            movement_index=movement_index,
        )


TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus")
# Type definition for type annotation convenience


class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer):
    """Define timestamped buffer to store gaze status."""

    def __setitem__(self, key, value: GazeStatus):
        """Store value as is: the GazeStatus annotation is not enforced at runtime."""

        super().__setitem__(key, value)


class GazeMovementIdentifier():
    """Abstract class to define what should provide a gaze movement identifier."""

    def identify(self, ts, gaze_position, terminate=False) -> GazeMovementType:
        """Identify gaze movement from successive timestamped gaze positions.

        The optional *terminate* argument allows to notify identification algorithm that given gaze position will be the last one.

        Raises:
            NotImplementedError: always, until the method is overridden.
        """

        raise NotImplementedError('identify() method not implemented')

    def browse(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[TimeStampedGazeMovementsType, TimeStampedGazeMovementsType, TimeStampedGazeStatusType]:
        """Identify fixations and saccades browsing timestamped gaze positions.

        Parameters:
            ts_gaze_positions: buffer of timestamped gaze positions to browse.

        Returns:
            timestamped fixations, timestamped saccades and timestamped gaze status,
            where each status index is the number of movements of its type stored so far.

        Raises:
            AssertionError: if ts_gaze_positions is not exactly a TimeStampedGazePositions.
        """

        assert type(ts_gaze_positions) == TimeStampedGazePositions

        ts_fixations = TimeStampedGazeMovements()
        ts_saccades = TimeStampedGazeMovements()
        ts_status = TimeStampedGazeStatus()

        # Get last ts to terminate identification on last gaze position
        last_ts, _ = ts_gaze_positions.last

        # Iterate on gaze positions
        for ts, gaze_position in ts_gaze_positions.items():

            gaze_movement = self.identify(ts, gaze_position, terminate=(ts == last_ts))

            # Select where to store the identified movement, if any
            if isinstance(gaze_movement, Fixation):
                ts_movements, movement_type = ts_fixations, 'Fixation'

            elif isinstance(gaze_movement, Saccade):
                ts_movements, movement_type = ts_saccades, 'Saccade'

            else:
                continue

            # Movements are timestamped with their first position timestamp
            start_ts, _ = gaze_movement.positions.first
            ts_movements[start_ts] = gaze_movement

            movement_index = len(ts_movements)

            # Dedicated names to not rebind the browsing loop variables
            for position_ts, position in gaze_movement.positions.items():
                ts_status[position_ts] = GazeStatus.from_position(position, movement_type, movement_index)

        return ts_fixations, ts_saccades, ts_status


VisualScanStepType = TypeVar('VisualScanStep', bound="VisualScanStep")
# Type definition for type annotation convenience


class VisualScanStepError(Exception):
    """Exception raised at VisualScanStep creation if a visual scan step doesn't start by a fixation or doesn't end by a saccade."""

    def __init__(self, message, aoi=''):
        """Store the error message and the name of the concerned AOI."""

        super().__init__(message)

        self.aoi = aoi


@dataclass(frozen=True)
class VisualScanStep():
    """Define a visual scan step as a set of successive gaze movements onto a same AOI.

    .. warning::
        Visual scan step have to start by a fixation and then end by a saccade."""

    movements: TimeStampedGazeMovements
    """All movements over an AOI and the last saccade that comes out."""

    aoi: str = field(default='')
    """AOI name."""

    def __post_init__(self):
        """Check that movements start by a fixation and end by a saccade.

        Raises:
            VisualScanStepError: if the first movement is not a fixation or the last movement is not a saccade.
        """

        # First movement have to be a fixation
        if not isinstance(self.first_fixation, Fixation):
            raise VisualScanStepError('First step movement is not a fixation', self.aoi)

        # Last movement have to be a saccade
        if not isinstance(self.last_saccade, Saccade):
            raise VisualScanStepError('Last step movement is not a saccade', self.aoi)

    @property
    def first_fixation(self):
        """First fixation on AOI."""

        _, first_movement = self.movements.first

        return first_movement

    @property
    def last_saccade(self):
        """Last saccade that comes out AOI."""

        _, last_movement = self.movements.last

        return last_movement

    @property
    def duration(self):
        """Time spent on AOI."""

        # Timestamp of first position of first fixation
        first_ts, _ = self.first_fixation.positions.first

        # Timestamp of first position of last saccade
        last_ts, _ = self.last_saccade.positions.first

        return last_ts - first_ts


VisualScanPathType = TypeVar('VisualScanPath', bound="VisualScanPath")
# Type definition for type annotation convenience


class VisualScanPath(list):
    """List of visual scan steps over successive aoi."""

    def __init__(self):
        """Start with no step, no pending movement and no looked aoi."""

        super().__init__()

        # Movements collected since the last created step
        self.__movements = TimeStampedGazeMovements()

        # Name of the aoi looked by the last appended fixation
        self.__last_aoi = ''

    def __repr__(self):
        """String representation."""

        # Representation of the list content, not of the super() proxy object
        return super().__repr__()

    def __str__(self) -> str:
        """String display."""

        return ''.join(f'> {step.aoi} ' for step in self)

    def append_saccade(self, ts, saccade):
        """Append new saccade to visual scan path."""

        # Ignore saccade if no fixation have been stored before
        if len(self.__movements) > 0:
            self.__movements[ts] = saccade

    def append_fixation(self, ts, fixation, looked_aoi: str) -> VisualScanStep | None:
        """Append new fixation to visual scan path and return last new visual scan step if one have been created.

        Parameters:
            ts: fixation timestamp.
            fixation: fixation to append.
            looked_aoi: name of the aoi looked during the fixation.

        Returns:
            the new visual scan step when the fixation is onto a new aoi, None otherwise.

        .. warning::
            It could raise VisualScanStepError"""

        # Is it fixation onto a new aoi?
        is_new_aoi = looked_aoi != self.__last_aoi and len(self.__movements) > 0

        if not is_new_aoi:

            # Append new fixation
            self.__movements[ts] = fixation

            # Remember aoi
            self.__last_aoi = looked_aoi

            return None

        try:

            # Edit new step
            new_step = VisualScanStep(self.__movements, self.__last_aoi)

            # Append new step
            super().append(new_step)

            # Return new step
            return new_step

        finally:

            # Whether the step creation succeeded or raised, start collecting again from this fixation

            # Clear movements
            self.__movements = TimeStampedGazeMovements()

            # Append new fixation
            self.__movements[ts] = fixation

            # Remember new aoi
            self.__last_aoi = looked_aoi


class VisualScanPathAnalyzer():
    """Abstract class to define what should provide a visual scan path analyzer."""

    def analyze(self, visual_scan_path: VisualScanPathType) -> Any:
        """Analyze visual scan path.

        Raises:
            NotImplementedError: always, until the method is overridden.
        """

        raise NotImplementedError('analyze() method not implemented')