From f989212bf56a45600cfa17686664bc9375749d6c Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Tue, 16 May 2023 11:13:52 +0200 Subject: Adding a new scan path type and processing coefficient K on it. --- src/argaze/GazeAnalysis/CoefficientK.py | 44 ++++++++++- src/argaze/GazeFeatures.py | 123 ++++++++++++++++++++++++++++- src/argaze/utils/demo_gaze_features_run.py | 49 +++++++++--- 3 files changed, 200 insertions(+), 16 deletions(-) diff --git a/src/argaze/GazeAnalysis/CoefficientK.py b/src/argaze/GazeAnalysis/CoefficientK.py index 7bcb3b3..d836864 100644 --- a/src/argaze/GazeAnalysis/CoefficientK.py +++ b/src/argaze/GazeAnalysis/CoefficientK.py @@ -9,8 +9,48 @@ from argaze import GazeFeatures import numpy @dataclass +class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): + """Implementation of Coefficient K algorithm as proposed by A. Duchowski and Krejtz, 2017. + """ + + def __post_init__(self): + + pass + + def analyze(self, scan_path: GazeFeatures.ScanPathType) -> Any: + """Analyze scan path.""" + + assert(len(scan_path) > 1) + + durations = [] + amplitudes = [] + + for scan_step in scan_path: + + durations.append(scan_step.duration) + amplitudes.append(scan_step.last_saccade.amplitude) + + durations = numpy.array(durations) + amplitudes = numpy.array(amplitudes) + + duration_mean = numpy.mean(durations) + amplitude_mean = numpy.mean(amplitudes) + + duration_std = numpy.std(durations) + amplitude_std = numpy.std(amplitudes) + + Ks = [] + for scan_step in scan_path: + + Ks.append(((scan_step.duration - duration_mean) / duration_std) - ((scan_step.last_saccade.amplitude - amplitude_mean) / amplitude_std)) + + K = numpy.array(Ks).mean() + + return K + +@dataclass class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): - """Implementation of transition probability matrix algorithm as described by Christophe Lounis in its thesis "Monitor the monitoring: pilot assistance through gaze tracking and aoi scanning analyses". + """Implementation of AOI based Coefficient K algorithm as described by Christophe Lounis in its thesis "Monitor the monitoring: pilot assistance through gaze tracking and aoi scanning analyses". """ def __post_init__(self): @@ -18,7 +58,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): pass def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType) -> Any: - """Analyze aoi scan.""" + """Analyze aoi scan path.""" assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index f0600bd..fb72bd9 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -176,6 +176,9 @@ class GazeMovement(): return output +FixationType = TypeVar('Fixation', bound="Fixation") +# Type definition for type annotation convenience + class Fixation(GazeMovement): """Define abstract fixation as gaze movement.""" @@ -186,6 +189,11 @@ class Fixation(GazeMovement): super().__post_init__() + def merge(self, fixation) -> FixationType: + """Merge another fixation into this fixation.""" + + raise NotImplementedError('merge() method not implemented') + def is_fixation(gaze_movement): """Is a gaze movement a fixation?""" @@ -307,6 +315,116 @@ class GazeMovementIdentifier(): return ts_fixations, ts_saccades, ts_status +ScanStepType = TypeVar('ScanStep', bound="ScanStep") +# Type definition for type annotation convenience + +class ScanStepError(Exception): + """Exception raised at ScanStepError creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade.""" + + def __init__(self, message): + + super().__init__(message) + +@dataclass(frozen=True) +class ScanStep(): + """Define a scan step as a pair of successive fixation and saccade. + .. warning:: + Scan step have to start by a fixation and then end by a saccade.""" + + first_fixation: Fixation + """A fixation that comes before the next saccade.""" + + last_saccade: Saccade + """A saccade that comes after the previous fixation.""" + + def __post_init__(self): + + # First movement have to be a fixation + if not is_fixation(self.first_fixation): + + raise ScanStepError('First step movement is not a fixation') + + # Last movement have to be a saccade + if not is_saccade(self.last_saccade): + + raise ScanStepError('Last step movement is not a saccade') + + @property + def duration(self): + """Time spent on AOI.""" + + # Timestamp of first position of first fixation + first_ts, _ = self.first_fixation.positions.first + + # Timestamp of first position of last saccade + last_ts, _ = self.last_saccade.positions.first + + return last_ts - first_ts + +ScanPathType = TypeVar('ScanPathType', bound="ScanPathType") +# Type definition for type annotation convenience + +class ScanPath(list): + """List of scan steps.""" + + def __init__(self): + + super().__init__() + + self.__last_fixation = None + + def append_saccade(self, ts, saccade): + """Append new saccade to scan path and return last new scan step if one have been created.""" + + # Ignore saccade if no fixation came before + if self.__last_fixation != None: + + try: + + # Edit new step + new_step = ScanStep(self.__last_fixation, saccade) + + # Append new step + super().append(new_step) + + # Return new step + return new_step + + finally: + + # Clear last fixation + self.__last_fixation = None + + def append_fixation(self, ts, fixation) -> bool: + """Append new fixation to scan path. + + .. warning:: + It could raise ScanStepError""" + + # No fixation came before + if self.__last_fixation == None: + + self.__last_fixation = fixation + + # Merge successive fixations + else: + + try: + self.__last_fixation.merge(fixation) + + # Merge method not implemented for this kind of fixation + except NotImplementedError as e: + + print(e) + +class ScanPathAnalyzer(): + """Abstract class to define what should provide a scan path analyzer.""" + + def analyze(self, scan_path: ScanPathType) -> Any: + """Analyze scan path.""" + + raise NotImplementedError('analyze() method not implemented') + AOIScanStepType = TypeVar('AOIScanStep', bound="AOIScanStep") # Type definition for type annotation convenience @@ -323,7 +441,7 @@ class AOIScanStepError(Exception): class AOIScanStep(): """Define a aoi scan step as a set of successive gaze movements onto a same AOI. .. warning:: - Visual scan step have to start by a fixation and then end by a saccade.""" + Aoi scan step have to start by a fixation and then end by a saccade.""" movements: TimeStampedGazeMovements """All movements over an AOI and the last saccade that comes out.""" @@ -331,9 +449,6 @@ class AOIScanStep(): aoi: str = field(default='') """AOI name.""" - #identifier: int = field(default=None) - """AOI identifier.""" - def __post_init__(self): # First movement have to be a fixation diff --git a/src/argaze/utils/demo_gaze_features_run.py b/src/argaze/utils/demo_gaze_features_run.py index 022ad59..6294527 100644 --- a/src/argaze/utils/demo_gaze_features_run.py +++ b/src/argaze/utils/demo_gaze_features_run.py @@ -54,13 +54,19 @@ def main(): } identification_mode = 'I-DT' + raw_scan_path = GazeFeatures.ScanPath() aoi_scan_path = GazeFeatures.AOIScanPath() tpm = TransitionProbabilityMatrix.AOIScanPathAnalyzer() tpm_analysis = pandas.DataFrame() - cK = CoefficientK.AOIScanPathAnalyzer() - cK_analysis = 0 + raw_cK_analyzer = CoefficientK.ScanPathAnalyzer() + raw_cK_analysis = 0 + + aoi_cK_analyzer = CoefficientK.AOIScanPathAnalyzer() + aoi_cK_analysis = 0 + + ck_mode = 'raw' gaze_movement_lock = threading.Lock() @@ -72,7 +78,8 @@ def main(): nonlocal gaze_position nonlocal tpm_analysis - nonlocal cK_analysis + nonlocal raw_cK_analysis + nonlocal aoi_cK_analysis # Edit millisecond timestamp data_ts = int((time.time() - start_ts) * 1e3) @@ -105,17 +112,22 @@ def main(): look_at = name break + + + # Append fixation to raw scan path + raw_scan_path.append_fixation(data_ts, gaze_movement) + try: # Append fixation to aoi scan path new_step = aoi_scan_path.append_fixation(data_ts, gaze_movement, look_at) - # Analyse transition probabilities + # Analyse aoi scan path if new_step and len(aoi_scan_path) > 1: tpm_analysis = tpm.analyze(aoi_scan_path) - cK_analysis = cK.analyze(aoi_scan_path) + aoi_cK_analysis = aoi_cK_analyzer.analyze(aoi_scan_path) except GazeFeatures.AOIScanStepError as e: @@ -123,6 +135,14 @@ def main(): elif GazeFeatures.is_saccade(gaze_movement): + # Append saccade to raw scan path + new_step = raw_scan_path.append_saccade(data_ts, gaze_movement) + + # Analyse scan path + if new_step and len(raw_scan_path) > 1: + + raw_cK_analysis = raw_cK_analyzer.analyze(raw_scan_path) + # Append saccade to aoi scan path aoi_scan_path.append_saccade(data_ts, gaze_movement) @@ -222,14 +242,23 @@ def main(): cv2.line(aoi_matrix, start_line, to_center, color, int(probability*10) + 2) cv2.line(aoi_matrix, from_center, to_center, [55, 55, 55], 2) - # Write cK analysis - if cK_analysis < 0.: + # Write raw cK analysis + if raw_cK_analysis < 0.: + + cv2.putText(aoi_matrix, f'Raw: Ambient attention', (20, window_size[1]-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + + elif raw_cK_analysis > 0.: + + cv2.putText(aoi_matrix, f'Raw: Focal attention', (20, window_size[1]-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 1, cv2.LINE_AA) + + # Write aoi cK analysis + if aoi_cK_analysis < 0.: - cv2.putText(aoi_matrix, f'Ambient attention', (20, window_size[1]-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) + cv2.putText(aoi_matrix, f'AOI: Ambient attention', (20, window_size[1]-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA) - elif cK_analysis > 0.: + elif aoi_cK_analysis > 0.: - cv2.putText(aoi_matrix, f'Focal attention', (20, window_size[1]-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 1, cv2.LINE_AA) + cv2.putText(aoi_matrix, f'AOI: Focal attention', (20, window_size[1]-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 1, cv2.LINE_AA) # Unlock gaze movement identification gaze_movement_lock.release() -- cgit v1.1