"""Generic gaze data and class definitions."""

"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see https://www.gnu.org/licenses/.
"""

__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"

import json
import math
from collections.abc import Iterator
from typing import Any, Self

import cv2
import numpy
import pandas

from argaze import DataFeatures
from argaze.AreaOfInterest import AOIFeatures


@DataFeatures.timestamp
class GazePosition(tuple):
    """Define gaze position as a tuple of coordinates with precision.

    Parameters:
        precision: the radius of a circle around value where other same gaze position measurements could be.
        message: a string to describe why the position is what it is.
    """

    def __new__(cls, position: tuple = (), **kwargs):
        # Only the coordinates feed the tuple; keyword arguments are accepted
        # here so that they can be handled by __init__.
        return tuple.__new__(cls, position)

    def __init__(self, position: tuple = (), precision: int | float = None, message: str = None):
        self.__precision = precision
        self.__message = message

    @property
    def value(self):
        """Get position's tuple value."""
        return tuple(self)

    @property
    def precision(self):
        """Get position's precision."""
        return self.__precision

    @property
    def message(self):
        """Get position's message."""
        return self.__message

    @classmethod
    def from_dict(cls, position_data: dict) -> Self:
        """Create a gaze position from a dictionary.

        Parameters:
            position_data: dictionary whose optional 'value' entry is the position tuple
                and whose other entries are passed as keyword arguments.
        """
        # Work on a shallow copy so the caller's dictionary is not modified
        data = dict(position_data)

        if 'value' in data:
            value = data.pop('value')
            return GazePosition(value, **data)

        return GazePosition(**data)

    def __bool__(self) -> bool:
        """Is the position value valid?"""
        return len(self) > 0

    def __repr__(self):
        """String representation"""
        return json.dumps(DataFeatures.as_dict(self))

    def __merged_precision(self, position):
        """Return the maximal precision of both positions, or None if one of them is undefined."""
        # The other operand's precision is only read when this position has one
        if self.__precision is not None and position.precision is not None:
            return max(self.__precision, position.precision)

        return None

    def __add__(self, position: Self) -> Self:
        """Add position.

        !!! note
            The returned position precision is the maximal precision.

        !!! note
            The returned position timestamp is the self object timestamp.
        """
        return GazePosition(tuple(numpy.array(self) + numpy.array(position)),
                            precision=self.__merged_precision(position), timestamp=self.timestamp)

    __radd__ = __add__

    def __sub__(self, position: Self) -> Self:
        """Subtract position.

        !!! note
            The returned position precision is the maximal precision.

        !!! note
            The returned position timestamp is the self object timestamp.
        """
        return GazePosition(tuple(numpy.array(self) - numpy.array(position)),
                            precision=self.__merged_precision(position), timestamp=self.timestamp)

    def __rsub__(self, position: Self) -> Self:
        """Reversed subtract position.

        !!! note
            The returned position precision is the maximal precision.

        !!! note
            The returned position timestamp is the self object timestamp.
        """
        return GazePosition(tuple(numpy.array(position) - numpy.array(self)),
                            precision=self.__merged_precision(position), timestamp=self.timestamp)

    def __mul__(self, factor: int | float | tuple) -> Self:
        """Multiply position by a factor.

        !!! note
            The returned position precision is also multiplied by the factor.

        !!! note
            The returned position timestamp is the self object timestamp.
        """
        return GazePosition(tuple(numpy.array(self) * factor),
                            precision=self.__precision * factor if self.__precision is not None else None,
                            timestamp=self.timestamp)

    def __truediv__(self, factor: int | float | tuple) -> Self:
        """Divide position by a factor.

        !!! note
            The returned position precision is also divided by the factor.

        !!! note
            The returned position timestamp is the self object timestamp.
        """
        return GazePosition(tuple(numpy.array(self) / factor),
                            precision=self.__precision / factor if self.__precision is not None else None,
                            timestamp=self.timestamp)

    def __pow__(self, factor: int | float) -> Self:
        """Power position by a factor.

        !!! note
            The returned position precision is also powered by the factor.

        !!! note
            The returned position timestamp is the self object timestamp.
        """
        return GazePosition(tuple(numpy.array(self) ** factor),
                            precision=self.__precision ** factor if self.__precision is not None else None,
                            timestamp=self.timestamp)

    def distance(self, gaze_position) -> float:
        """Distance to another gaze position, computed from the two first coordinates."""
        distance = (self[0] - gaze_position[0]) ** 2 + (self[1] - gaze_position[1]) ** 2
        distance = numpy.sqrt(distance)

        return distance

    def overlap(self, gaze_position, both=False) -> bool:
        """Does this gaze position overlap another gaze position considering its precision?
        Set both to True to test if the other gaze position overlaps this one too."""
        distance = numpy.sqrt(numpy.sum((self - gaze_position) ** 2))

        if both:
            return distance < min(self.__precision, gaze_position.precision)

        return distance < self.__precision

    def draw(self, image: numpy.ndarray, color: tuple = None, size: int = None, draw_precision=True):
        """Draw gaze position point and precision circle.

        Parameters:
            image: where to draw
            color: color of the point and of the precision circle (the point is drawn only if a color is given)
            size: radius of the point
            draw_precision: draw the precision circle when the position has a precision
        """
        # Nothing to draw for an empty position
        if self:
            int_value = (int(self[0]), int(self[1]))

            # Draw point at position if required
            if color is not None:
                cv2.circle(image, int_value, size, color, -1)

            # Draw precision circle
            if self.__precision is not None and draw_precision:
                cv2.circle(image, int_value, round(self.__precision), color, 1)


class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList):
    """Handle timestamped gaze positions into a list."""

    def __init__(self, gaze_positions=None):
        if gaze_positions is None:
            gaze_positions = []

        DataFeatures.TimestampedObjectsList.__init__(self, GazePosition, gaze_positions)

    def values(self) -> list:
        """Get all timestamped position values as list of tuple."""
        return [tuple(ts_position) for ts_position in self]

    ''' Is it still needed as there is a TimestampedObjectsList.from_json method?
    @classmethod
    def from_json(self, json_filepath: str) -> TimeStampedGazePositions:
        """Create a TimeStampedGazePositions from .json file."""

        with open(json_filepath, encoding='utf-8') as ts_positions_file:

            json_positions = json.load(ts_positions_file)

            return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions})
    '''

    def centroid(self) -> tuple:
        """Calculate positions' centroid.

        Returns:
            centroid: centroid of all positions.
        """
        positions_array = numpy.asarray(self.values())
        centroid = numpy.mean(positions_array, axis=0)

        return (centroid[0], centroid[1])

    def distances(self, point: numpy.ndarray) -> numpy.ndarray:
        """Calculate all positions' distances to a point.

        Returns:
            distances: array with all distances to the point.
        """
        positions_array = numpy.asarray(self.values())
        distances_array = numpy.sqrt(numpy.sum((positions_array - point) ** 2, axis=1))

        return distances_array

    @classmethod
    def from_dataframe(cls, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None,
                       message: str = None) -> Self:
        """Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        Parameters:
            dataframe: the dataframe to read gaze positions from.
            timestamp: specific timestamp column label.
            x: specific x column label.
            y: specific y column label.
            precision: specific precision column label if exist.
            message: specific message column label if exist.
        """
        # Select the columns of interest (as a list of labels)
        columns = [timestamp, x, y]

        if precision is not None:
            columns.append(precision)

        if message is not None:
            columns.append(message)

        # Copy columns: the modifications below must not touch the given dataframe
        df = dataframe.loc[:, columns].copy()

        # Merge x and y columns into one 'value' column
        df['value'] = tuple(zip(df[x], df[y]))
        df.drop(columns=[x, y], inplace=True)

        # Replace tuple values containing NaN values by ()
        df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=1)

        # Handle precision data
        if precision is not None:

            # Rename precision column into 'precision' column
            df.rename(columns={precision: 'precision'}, inplace=True)

        else:

            # Append a None precision column
            df['precision'] = None

        # Handle message data
        if message is not None:

            # Rename message column into 'message' column
            df.rename(columns={message: 'message'}, inplace=True)

        else:

            # Append a None message column
            df['message'] = None

        # Rename timestamp column into 'timestamp' column
        df.rename(columns={timestamp: 'timestamp'}, inplace=True)

        # Filter duplicate timestamps (the first occurrence is kept)
        df = df[~df.timestamp.duplicated()]

        # Create timestamped gaze positions
        return TimeStampedGazePositions(df.apply(
            lambda row: GazePosition(row.value, precision=row.precision, message=row.message,
                                     timestamp=row.timestamp), axis=1))


class GazePositionCalibrationFailed(Exception):
    """Exception raised by GazePositionCalibrator."""

    def __init__(self, message):
        super().__init__(message)


class GazePositionCalibrator(DataFeatures.PipelineStepObject):
    """Abstract class to define what should provide a gaze position calibrator algorithm."""

    # noinspection PyMissingConstructor
    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):
        pass

    def store(self, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition):
        """Store observed and expected gaze positions.

        Parameters:
            observed_gaze_position: where gaze position actually is
            expected_gaze_position: where gaze position should be
        """
        raise NotImplementedError('store() method not implemented')

    def reset(self):
        """Reset observed and expected gaze positions."""
        raise NotImplementedError('reset() method not implemented')

    def calibrate(self) -> Any:
        """Process calibration from observed and expected gaze positions.

        Returns:
            calibration outputs: any data returned to assess calibration
        """
        raise NotImplementedError('calibrate() method not implemented')

    def apply(self, observed_gaze_position: GazePosition) -> GazePosition:
        """Apply calibration onto observed gaze position.

        Parameters:
            observed_gaze_position: where gaze position actually is

        Returns:
            expected_gaze_position: where gaze position should be if the calibrator is ready else, observed gaze position
        """
        raise NotImplementedError('apply() method not implemented')

    def draw(self, image: numpy.ndarray, **kwargs):
        """Draw calibration into image.

        Parameters:
            image: where to draw
        """
        raise NotImplementedError('draw() method not implemented')

    def is_calibrating(self) -> bool:
        """Is the calibration running?"""
        raise NotImplementedError('is_calibrating() method not implemented')


@DataFeatures.timestamp
class GazeMovement(TimeStampedGazePositions):
    """Define abstract gaze movement class as timestamped gaze positions list.

    !!! note
        Gaze movement timestamp is always equal to its first position timestamp.

    Parameters:
        positions: timestamp gaze positions.
        finished: is the movement finished?
        message: a string to describe why the movement is what it is.
    """

    def __new__(cls, positions: TimeStampedGazePositions = None, **kwargs):
        # noinspection PyArgumentList
        return TimeStampedGazePositions.__new__(cls, positions)

    def __init__(self, positions: TimeStampedGazePositions = None, finished: bool = False, message: str = None):
        """Initialize GazeMovement"""
        TimeStampedGazePositions.__init__(self, positions)

        self.__finished = finished
        self.__message = message

    def is_finished(self) -> bool:
        """Is the movement finished?"""
        return self.__finished

    def finish(self) -> Self:
        """Set gaze movement as finished"""
        self.__finished = True
        return self

    @property
    def message(self):
        """Get movement's message."""
        return self.__message

    @property
    def amplitude(self):
        """Get inferred amplitude from first and last positions."""
        if self:
            return numpy.linalg.norm(self[0] - self[-1])

        return 0

    def __str__(self) -> str:
        """String display"""
        if self:
            output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.is_finished()}'

            for position in self:
                output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}'

        else:
            output = f'{type(self)}'

        return output

    def draw_positions(self, image: numpy.ndarray, position_color: tuple = None, line_color: tuple = None):
        """Draw gaze movement positions with line between each position.

        Parameters:
            image: where to draw
            position_color: color of position point
            line_color: color of line between each position
        """
        positions = self.copy()

        # Walk through consecutive position pairs: the last position is only used as a line end
        for start_gaze_position, next_gaze_position in zip(positions, positions[1:]):

            # Draw line between positions if required
            if line_color is not None:
                cv2.line(image, (int(start_gaze_position[0]), int(start_gaze_position[1])),
                         (int(next_gaze_position[0]), int(next_gaze_position[1])), line_color, 1)

            # Draw position if required
            if position_color is not None:
                start_gaze_position.draw(image, position_color, draw_precision=False)

    def draw(self, image: numpy.ndarray, **kwargs):
        """Draw gaze movement into image."""
        raise NotImplementedError('draw() method not implemented')


class Fixation(GazeMovement):
    """Define abstract fixation as gaze movement."""

    def __init__(self, positions: TimeStampedGazePositions = None, finished: bool = False, message: str = None,
                 **kwargs):
        # positions defaults to None (handled by TimeStampedGazePositions.__init__) instead of
        # a single list instance shared by every call
        super().__init__(positions, finished, message, **kwargs)

        self._focus = ()

    @property
    def focus(self) -> tuple:
        """Get representative position of the fixation."""
        return self._focus

    def merge(self, fixation) -> Self:
        """Merge another fixation into this fixation."""
        raise NotImplementedError('merge() method not implemented')


def is_fixation(gaze_movement):
    """Is a gaze movement a fixation?"""
    return isinstance(gaze_movement, Fixation)


class Saccade(GazeMovement):
    """Define abstract saccade as gaze movement."""

    def __init__(self, positions: TimeStampedGazePositions = None, finished: bool = False, message: str = None,
                 **kwargs):
        # positions defaults to None (handled by TimeStampedGazePositions.__init__) instead of
        # a single list instance shared by every call
        super().__init__(positions, finished, message, **kwargs)


def is_saccade(gaze_movement):
    """Is a gaze movement a saccade?"""
    return isinstance(gaze_movement, Saccade)


class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList):
    """Handle timestamped gaze movements into a list"""

    def __init__(self, gaze_movements: list = None):
        if gaze_movements is None:
            gaze_movements = []

        DataFeatures.TimestampedObjectsList.__init__(self, GazeMovement, gaze_movements)


@DataFeatures.timestamp
class GazeStatus(list):
    """Define gaze status as a list of 1 or 2 (index, GazeMovement) tuples.

    Parameters:
        position: the position that the status represents.
    """

    def __init__(self, position: GazePosition):
        self.timestamp = position.timestamp
        self.__position = position

    @property
    def position(self) -> GazePosition:
        """Get gaze status position."""
        return self.__position

    def append(self, movement_index: int, movement_type: type):
        """Append movement index and type."""
        super().append((movement_index, movement_type))


class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList):
    """Handle timestamped gaze status into a list."""

    def __init__(self):
        super().__init__(GazeStatus)


class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
    """Abstract class to define what should provide a gaze movement identifier."""

    # noinspection PyMissingConstructor
    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):
        pass

    @DataFeatures.PipelineStepMethod
    def identify(self, timestamped_gaze_position: GazePosition, terminate: bool = False) -> GazeMovement:
        """Identify gaze movement from successive timestamped gaze positions.

        !!! warning "Mandatory"
            Each identified gaze movement have to share its first/last gaze position with previous/next gaze movement.

        Parameters:
            timestamped_gaze_position: new gaze position from where identification have to be done considering former gaze positions.
            terminate: allows to notify identification algorithm that given gaze position will be the last one.

        Returns:
            gaze_movement: identified gaze movement once it is finished otherwise it returns empty gaze movement at least.
        """
        raise NotImplementedError('identify() method not implemented')

    def current_gaze_movement(self) -> GazeMovement:
        """Get the current identified gaze movement (finished or in progress) if it exists otherwise, an empty gaze movement."""
        raise NotImplementedError('current_gaze_movement getter not implemented')

    def current_fixation(self) -> Fixation:
        """Get the current identified fixation (finished or in progress) if it exists otherwise, an empty gaze movement."""
        raise NotImplementedError('current_fixation getter not implemented')

    def current_saccade(self) -> Saccade:
        """Get the current identified saccade (finished or in progress) if it exists otherwise, an empty gaze movement."""
        raise NotImplementedError('current_saccade getter not implemented')

    def browse(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[
            TimeStampedGazeMovements, TimeStampedGazeMovements, TimeStampedGazeStatus]:
        """Identify fixations and saccades browsing timestamped gaze positions.

        Returns:
            timestamped_fixations: all fixations stored by timestamped.
            timestamped_saccades: all saccades stored by timestamped.
            timestamped_gaze_status: all gaze status stored by timestamped.
        """
        assert (type(ts_gaze_positions) == TimeStampedGazePositions)

        ts_fixations = TimeStampedGazeMovements()
        ts_saccades = TimeStampedGazeMovements()
        ts_status = TimeStampedGazeStatus()

        # Nothing to identify without any gaze position
        if len(ts_gaze_positions) == 0:
            return ts_fixations, ts_saccades, ts_status

        # Get last ts to terminate identification on last gaze position
        last_ts = ts_gaze_positions[-1].timestamp

        # Iterate on gaze positions
        for gaze_position in ts_gaze_positions:

            gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts))

            if gaze_movement:

                # First gaze movement position is always shared with previous gaze movement
                for movement_position in gaze_movement:

                    # Is a status already exist for this position?
                    gaze_status = ts_status.look_for(movement_position.timestamp)

                    if not gaze_status:
                        gaze_status = GazeStatus(movement_position)
                        ts_status.append(gaze_status)

                    # NOTE(review): the fixations count is used as index even when the movement
                    # is a saccade - confirm this is the intended index.
                    gaze_status.append(len(ts_fixations), type(gaze_movement))

                # Store gaze movement into the appropriate list
                if is_fixation(gaze_movement):
                    ts_fixations.append(gaze_movement)

                elif is_saccade(gaze_movement):
                    ts_saccades.append(gaze_movement)

        return ts_fixations, ts_saccades, ts_status

    def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> Iterator[GazeMovement]:
        """GazeMovement generator.

        Parameters:
            ts_gaze_positions: timestamped gaze positions to process.

        Yields:
            gaze_movement: each non-empty gaze movement returned by identify()
        """
        assert (type(ts_gaze_positions) == TimeStampedGazePositions)

        # Nothing to identify without any gaze position
        if len(ts_gaze_positions) == 0:
            return

        # Get last ts to terminate identification on last gaze position
        last_ts = ts_gaze_positions[-1].timestamp

        # Iterate on gaze positions
        for gaze_position in ts_gaze_positions:

            gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts))

            if gaze_movement:
                yield gaze_movement


class ScanStepError(Exception):
    """Exception raised at ScanStep creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade."""

    def __init__(self, message):
        super().__init__(message)


class ScanStep():
    """Define a scan step as a fixation and a consecutive saccade.

    Parameters:
        first_fixation: a fixation that comes before the next saccade.
        last_saccade: a saccade that comes after the previous fixation.

    !!! warning
        Scan step have to start by a fixation and then end by a saccade.
    """

    def __init__(self, first_fixation: Fixation, last_saccade: Saccade):
        self.__first_fixation = first_fixation
        self.__last_saccade = last_saccade

        # First movement have to be a fixation
        if not is_fixation(self.__first_fixation):
            raise ScanStepError('First step movement is not a fixation')

        # Last movement have to be a saccade
        if not is_saccade(self.__last_saccade):
            raise ScanStepError('Last step movement is not a saccade')

    @property
    def first_fixation(self):
        """Get scan step first fixation."""
        return self.__first_fixation

    @property
    def last_saccade(self):
        """Get scan step last saccade."""
        return self.__last_saccade

    @property
    def fixation_duration(self) -> int | float:
        """Time spent on AOI

        Returns:
            fixation duration
        """
        return self.__first_fixation.duration

    @property
    def duration(self) -> int | float:
        """Time spent on AOI and time spent to go to next AOI

        Returns:
            duration
        """
        return self.__first_fixation.duration + self.__last_saccade.duration


class ScanPath(list):
    """List of scan steps."""

    def __init__(self, duration_max: int | float = 0):
        super().__init__()

        self.__duration_max = duration_max
        self.__last_fixation = None
        self.__duration = 0

    @property
    def duration_max(self) -> float:
        """Duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration."""
        return self.__duration_max

    @duration_max.setter
    def duration_max(self, duration_max: float):
        self.__duration_max = duration_max

    @property
    def duration(self) -> int | float:
        """Sum of all scan steps duration

        Returns:
            duration
        """
        return self.__duration

    def __check_duration(self):
        """Constrain path duration to maximal duration."""
        if self.__duration_max > 0:

            # Remove oldest steps first; stop when there is no step left to remove
            while self.__duration > self.__duration_max and len(self) > 0:
                oldest_step = self.pop(0)
                self.__duration -= oldest_step.duration

    def append_saccade(self, saccade) -> ScanStep | None:
        """Append new saccade to scan path and return last new scan step if one have been created."""
        # Ignore saccade if no fixation came before
        if self.__last_fixation is not None:

            try:

                # Edit new step
                new_step = ScanStep(self.__last_fixation, saccade)

                # Append new step
                super().append(new_step)

                # Update duration
                self.__duration += new_step.duration

                # Constrain path duration to maximal duration
                self.__check_duration()

                # Return new step
                return new_step

            finally:

                # Clear last fixation, even if the step creation failed
                self.__last_fixation = None

        return None

    def append_fixation(self, fixation):
        """Append new fixation to scan path.

        !!! warning
            Consecutive fixations are ignored keeping the last fixation"""
        self.__last_fixation = fixation

    def draw(self, image: numpy.ndarray, draw_fixations: dict = None, draw_saccades: dict = None, deepness: int = 0):
        """Draw scan path into image.

        Parameters:
            image: where to draw
            draw_fixations: Fixation.draw parameters (which depends on the loaded gaze movement identifier module, if None, no fixation is drawn)
            draw_saccades: Saccade.draw parameters (which depends on the loaded gaze movement identifier module, if None, no saccade is drawn)
            deepness: number of steps back to draw
        """
        for step in self[-deepness:]:

            # Draw fixation if required
            if draw_fixations is not None:
                step.first_fixation.draw(image, **draw_fixations)

            # Draw saccade if required
            if draw_saccades is not None:
                step.last_saccade.draw(image, **draw_saccades)


class ScanPathAnalyzer(DataFeatures.PipelineStepObject):
    """Abstract class to define what should provide a scan path analyzer."""

    # noinspection PyMissingConstructor
    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):
        # Analysis are the read-only properties declared by the analyzer class itself
        self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if
                           isinstance(value, property) and value.fset is None]

    def analysis(self) -> DataFeatures.DataDictionary:
        """Get all scan path analyzer analysis as data dictionary."""
        return DataFeatures.DataDictionary({a: getattr(self, a) for a in self.__analysis})

    @DataFeatures.PipelineStepMethod
    def analyze(self, scan_path: ScanPath):
        """Analyze scan path."""
        raise NotImplementedError('analyze() method not implemented')


class AOIMatcher(DataFeatures.PipelineStepObject):
    """Abstract class to define what should provide an AOI matcher algorithm."""

    # noinspection PyMissingConstructor
    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):
        self.__exclude = []

    @property
    def exclude(self):
        """List of AOI to exclude from matching."""
        return self.__exclude

    @exclude.setter
    def exclude(self, exclude: list[str]):
        self.__exclude = exclude

    def match(self, gaze_movement: GazeMovement, aoi_scene: AOIFeatures.AOIScene) -> tuple[
            str, AOIFeatures.AreaOfInterest]:
        """Which AOI is looked in the scene?"""
        raise NotImplementedError('match() method not implemented')

    def draw(self, image: numpy.ndarray, aoi_scene: AOIFeatures.AOIScene):
        """Draw matching into image.

        Parameters:
            image: where to draw
            aoi_scene: to refresh looked aoi if required
        """
        raise NotImplementedError('draw() method not implemented')

    def looked_aoi(self) -> AOIFeatures.AreaOfInterest:
        """Get most likely looked aoi."""
        raise NotImplementedError('looked_aoi() method not implemented')

    def looked_aoi_name(self) -> str:
        """Get most likely looked aoi name."""
        raise NotImplementedError('looked_aoi_name() method not implemented')


class AOIScanStepError(Exception):
    """
    Exception raised at AOIScanStep creation if an aoi scan step doesn't start by a fixation or doesn't end by a saccade.
    """

    def __init__(self, message, aoi=''):
        super().__init__(message)

        self.aoi = aoi


class AOIScanStep():
    """Define an aoi scan step as a set of successive gaze movements onto a same AOI.

    Parameters:
        movements: all movements over an AOI and the last saccade that comes out.
        aoi: AOI name
        letter: AOI unique letter to ease sequence analysis.

    !!! warning
        Aoi scan step have to start by a fixation and then end by a saccade.
    """

    def __init__(self, movements: TimeStampedGazeMovements, aoi: str = '', letter: str = ''):
        self.__movements = movements
        self.__aoi = aoi
        self.__letter = letter

        # First movement have to be a fixation
        if not is_fixation(self.first_fixation):
            raise AOIScanStepError('First step movement is not a fixation', self.aoi)

        # Last movement have to be a saccade
        if not is_saccade(self.last_saccade):
            raise AOIScanStepError('Last step movement is not a saccade', self.aoi)

    @property
    def movements(self):
        """Get AOI scan step movements."""
        return self.__movements

    @property
    def aoi(self):
        """Get AOI scan step aoi."""
        return self.__aoi

    @property
    def letter(self):
        """Get AOI scan step letter."""
        return self.__letter

    @property
    def first_fixation(self):
        """First fixation on AOI."""
        return self.movements[0]

    @property
    def last_saccade(self):
        """Last saccade that comes out AOI."""
        return self.movements[-1]

    @property
    def fixation_duration(self) -> int | float:
        """Time spent on AOI

        Returns:
            fixation duration
        """
        return self.last_saccade[0].timestamp - self.first_fixation[0].timestamp

    @property
    def duration(self) -> int | float:
        """Time spent on AOI and time spent to go to next AOI

        Returns:
            duration
        """
        return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp


# Define strings for outside AOI case
OutsideAOI = 'GazeFeatures.OutsideAOI'


class AOIScanPath(list):
    """List of aoi scan steps over successive aoi."""

    def __init__(self, expected_aoi: list[str] = None, duration_max: int | float = 0):
        super().__init__()

        # NOTE(review): unlike the expected_aoi setter, the constructor does not prepend
        # OutsideAOI to the expected aoi list, while the setter compares new values against
        # the stored list without its first item - confirm which state is intended.
        self.__expected_aoi = [] if expected_aoi is None else expected_aoi
        self.__duration_max = duration_max
        self.__duration = 0

        self.clear()

    @property
    def expected_aoi(self):
        """List of all expected aoi."""
        return self.__expected_aoi

    @expected_aoi.setter
    def expected_aoi(self, expected_aoi: list[str] = None):
        """Edit list of all expected aoi.

        !!! warning
            This will clear the AOIScanPath
        """
        if expected_aoi is None:
            expected_aoi = []

        # Check expected aoi are not the same as previous ones (first stored item is skipped)
        previous_aoi = self.__expected_aoi[1:]

        if len(expected_aoi) == len(previous_aoi) and all(a == b for a, b in zip(expected_aoi, previous_aoi)):
            return

        # Otherwise, update expected aoi
        self.__expected_aoi = [OutsideAOI]
        self.__expected_aoi += expected_aoi

        self.clear()

    @property
    def duration_max(self) -> float:
        """Duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration."""
        return self.__duration_max

    @duration_max.setter
    def duration_max(self, duration_max: float):
        self.__duration_max = duration_max

    @property
    def duration(self) -> float:
        """Sum of all scan steps duration"""
        return self.__duration

    def __check_duration(self):
        """Constrain path duration to maximal duration."""
        if self.__duration_max > 0:

            # Remove oldest steps first; stop when there is no step left to remove
            while self.__duration > self.__duration_max and len(self) > 0:

                oldest_step = self.pop(0)
                self.__duration -= oldest_step.duration

                # Edit transition matrix
                if len(self) > 0:
                    # Decrement [index: source, columns: destination] value
                    self.__transition_matrix.loc[oldest_step.aoi, self[0].aoi] -= 1

    def clear(self):
        """Clear aoi scan steps list, duration, letter sequence and transition matrix."""
        super().clear()

        # No step left: the sum of scan steps duration is back to zero
        # noinspection PyAttributeOutsideInit
        self.__duration = 0

        # noinspection PyAttributeOutsideInit
        self.__movements = TimeStampedGazeMovements()
        # noinspection PyAttributeOutsideInit
        self.__current_aoi = ''
        # noinspection PyAttributeOutsideInit
        self.__index = ord('A')
        # noinspection PyAttributeOutsideInit
        self.__aoi_letter = {}
        # noinspection PyAttributeOutsideInit
        self.__letter_aoi = {}

        size = len(self.__expected_aoi)
        # noinspection PyAttributeOutsideInit
        self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi,
                                                    columns=self.__expected_aoi)

    def __get_aoi_letter(self, aoi):
        """Get the letter bound to an aoi, binding the next available letter at first request."""
        try:
            return self.__aoi_letter[aoi]

        except KeyError:
            letter = chr(self.__index)
            self.__aoi_letter[aoi] = letter
            self.__index += 1
            return letter

    def get_letter_aoi(self, letter):
        """Get which aoi is related to a unique letter."""
        return self.__letter_aoi[letter]

    @property
    def letter_sequence(self) -> str:
        """Convert aoi scan path into a string with unique letter per aoi step."""
        return ''.join(step.letter for step in self)

    @property
    def current_aoi(self):
        """AOI name of aoi scan step under construction"""
        return self.__current_aoi

    @property
    def transition_matrix(self) -> pandas.DataFrame:
        """[Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) where indexes are transition departures and columns are transition destinations."""
        return self.__transition_matrix

    def append_saccade(self, saccade):
        """Append new saccade to aoi scan path."""
        # Ignore saccade if no fixation have been stored before
        if len(self.__movements) > 0:
            self.__movements.append(saccade)

    def append_fixation(self, fixation, looked_aoi: str):
        """Append new fixation to aoi scan path and return last new aoi scan step if one have been created.

        !!! warning
            It could raise AOIScanStepError
        """
        # Replace None aoi by generic OutsideAOI name
        if looked_aoi is None:
            looked_aoi = OutsideAOI

        # Raise error when aoi is not expected
        elif looked_aoi not in self.__expected_aoi:
            raise AOIScanStepError('AOI not expected', looked_aoi)

        # Is it fixation onto a new aoi?
        if looked_aoi != self.__current_aoi and len(self.__movements) > 0:

            try:

                # Edit unique letter per aoi
                letter = self.__get_aoi_letter(self.__current_aoi)

                # Remember which letter identify which aoi
                self.__letter_aoi[letter] = self.__current_aoi

                # Edit new step
                new_step = AOIScanStep(self.__movements, self.__current_aoi, letter)

                # Edit transition matrix
                if len(self) > 0:
                    # Increment [index: source, columns: destination] value
                    self.__transition_matrix.loc[self[-1].aoi, self.__current_aoi] += 1

                # Append new step
                super().append(new_step)

                # Update duration
                self.__duration += new_step.duration

                # Constrain path duration to maximal duration
                self.__check_duration()

                # Return new step
                return new_step

            finally:

                # Start the next step, even if the step creation failed

                # Clear movements
                # noinspection PyAttributeOutsideInit
                self.__movements = TimeStampedGazeMovements()

                # Append new fixation
                self.__movements.append(fixation)

                # Remember new aoi
                self.__current_aoi = looked_aoi

        else:

            # Append new fixation
            self.__movements.append(fixation)

            # Remember aoi
            # noinspection PyAttributeOutsideInit
            self.__current_aoi = looked_aoi

        return None

    def fixations_count(self):
        """Get how many fixations are there in the scan path and how many fixation are there in each aoi."""
        scan_fixations_count = 0
        aoi_fixations_count = {aoi: 0 for aoi in self.__expected_aoi}

        for aoi_scan_step in self:

            # NOTE(review): every movement but the last one is counted, which seems to include
            # the saccades stored between two fixations onto the same aoi - confirm.
            step_fixations_count = len(aoi_scan_step.movements) - 1  # -1: to ignore last saccade

            scan_fixations_count += step_fixations_count
            aoi_fixations_count[aoi_scan_step.aoi] += step_fixations_count

        return scan_fixations_count, aoi_fixations_count


class AOIScanPathAnalyzer(DataFeatures.PipelineStepObject):
    """Abstract class to define what should provide an aoi scan path analyzer."""

    # noinspection PyMissingConstructor
    @DataFeatures.PipelineStepInit
    def __init__(self, **kwargs):
        # Analysis are the read-only properties declared by the analyzer class itself
        self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if
                           isinstance(value, property) and value.fset is None]

    def analysis(self) -> DataFeatures.DataDictionary:
        """Get all aoi scan path analyzer analysis as data dictionary."""
        return DataFeatures.DataDictionary({a: getattr(self, a) for a in self.__analysis})

    @DataFeatures.PipelineStepMethod
    def analyze(self, aoi_scan_path: AOIScanPath):
        """Analyze aoi scan path."""
        raise NotImplementedError('analyze() method not implemented')