aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/argaze/GazeAnalysis/CoefficientK.py44
-rw-r--r--src/argaze/GazeFeatures.py123
-rw-r--r--src/argaze/utils/demo_gaze_features_run.py49
3 files changed, 200 insertions, 16 deletions
diff --git a/src/argaze/GazeAnalysis/CoefficientK.py b/src/argaze/GazeAnalysis/CoefficientK.py
index 7bcb3b3..d836864 100644
--- a/src/argaze/GazeAnalysis/CoefficientK.py
+++ b/src/argaze/GazeAnalysis/CoefficientK.py
@@ -9,8 +9,48 @@ from argaze import GazeFeatures
import numpy
@dataclass
+class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer):
+ """Implementation of Coefficient K algorithm as proposed by A. Duchowski and Krejtz, 2017.
+ """
+
+ def __post_init__(self):
+
+ pass
+
+ def analyze(self, scan_path: GazeFeatures.ScanPathType) -> Any:
+ """Analyze scan path."""
+
+ assert(len(scan_path) > 1)
+
+ durations = []
+ amplitudes = []
+
+ for scan_step in scan_path:
+
+ durations.append(scan_step.duration)
+ amplitudes.append(scan_step.last_saccade.amplitude)
+
+ durations = numpy.array(durations)
+ amplitudes = numpy.array(amplitudes)
+
+ duration_mean = numpy.mean(durations)
+ amplitude_mean = numpy.mean(amplitudes)
+
+ duration_std = numpy.std(durations)
+ amplitude_std = numpy.std(amplitudes)
+
+ Ks = []
+ for scan_step in scan_path:
+
+ Ks.append(((scan_step.duration - duration_mean) / duration_std) - ((scan_step.last_saccade.amplitude - amplitude_mean) / amplitude_std))
+
+ K = numpy.array(Ks).mean()
+
+ return K
+
+@dataclass
class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer):
- """Implementation of transition probability matrix algorithm as described by Christophe Lounis in its thesis "Monitor the monitoring: pilot assistance through gaze tracking and aoi scanning analyses".
+ """Implementation of AOI based Coefficient K algorithm as described by Christophe Lounis in its thesis "Monitor the monitoring: pilot assistance through gaze tracking and aoi scanning analyses".
"""
def __post_init__(self):
@@ -18,7 +58,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer):
pass
def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType) -> Any:
- """Analyze aoi scan."""
+ """Analyze aoi scan path."""
assert(len(aoi_scan_path) > 1)
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index f0600bd..fb72bd9 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -176,6 +176,9 @@ class GazeMovement():
return output
+FixationType = TypeVar('Fixation', bound="Fixation")
+# Type definition for type annotation convenience
+
class Fixation(GazeMovement):
"""Define abstract fixation as gaze movement."""
@@ -186,6 +189,11 @@ class Fixation(GazeMovement):
super().__post_init__()
+ def merge(self, fixation) -> FixationType:
+ """Merge another fixation into this fixation."""
+
+ raise NotImplementedError('merge() method not implemented')
+
def is_fixation(gaze_movement):
"""Is a gaze movement a fixation?"""
@@ -307,6 +315,116 @@ class GazeMovementIdentifier():
return ts_fixations, ts_saccades, ts_status
+ScanStepType = TypeVar('ScanStep', bound="ScanStep")
+# Type definition for type annotation convenience
+
+class ScanStepError(Exception):
+ """Exception raised at ScanStepError creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade."""
+
+ def __init__(self, message):
+
+ super().__init__(message)
+
+@dataclass(frozen=True)
+class ScanStep():
+ """Define a scan step as a pair of successive fixation and saccade.
+ .. warning::
+ Scan step have to start by a fixation and then end by a saccade."""
+
+ first_fixation: Fixation
+ """A fixation that comes before the next saccade."""
+
+ last_saccade: Saccade
+ """A saccade that comes after the previous fixation."""
+
+ def __post_init__(self):
+
+ # First movement have to be a fixation
+ if not is_fixation(self.first_fixation):
+
+ raise ScanStepError('First step movement is not a fixation')
+
+ # Last movement have to be a saccade
+ if not is_saccade(self.last_saccade):
+
+ raise ScanStepError('Last step movement is not a saccade')
+
+ @property
+ def duration(self):
+ """Time spent on AOI."""
+
+ # Timestamp of first position of first fixation
+ first_ts, _ = self.first_fixation.positions.first
+
+ # Timestamp of first position of last saccade
+ last_ts, _ = self.last_saccade.positions.first
+
+ return last_ts - first_ts
+
+ScanPathType = TypeVar('ScanPathType', bound="ScanPathType")
+# Type definition for type annotation convenience
+
+class ScanPath(list):
+ """List of scan steps."""
+
+ def __init__(self):
+
+ super().__init__()
+
+ self.__last_fixation = None
+
+ def append_saccade(self, ts, saccade):
+ """Append new saccade to scan path and return last new scan step if one have been created."""
+
+ # Ignore saccade if no fixation came before
+ if self.__last_fixation != None:
+
+ try:
+
+ # Edit new step
+ new_step = ScanStep(self.__last_fixation, saccade)
+
+ # Append new step
+ super().append(new_step)
+
+ # Return new step
+ return new_step
+
+ finally:
+
+ # Clear last fixation
+ self.__last_fixation = None
+
+ def append_fixation(self, ts, fixation) -> bool:
+ """Append new fixation to scan path.
+
+ .. warning::
+ It could raise ScanStepError"""
+
+ # No fixation came before
+ if self.__last_fixation == None:
+
+ self.__last_fixation = fixation
+
+ # Merge successive fixations
+ else:
+
+ try:
+ self.__last_fixation.merge(fixation)
+
+ # Merge method not implemented for this kind of fixation
+ except NotImplementedError as e:
+
+ print(e)
+
+class ScanPathAnalyzer():
+ """Abstract class to define what should provide a scan path analyzer."""
+
+ def analyze(self, scan_path: ScanPathType) -> Any:
+ """Analyze scan path."""
+
+ raise NotImplementedError('analyze() method not implemented')
+
AOIScanStepType = TypeVar('AOIScanStep', bound="AOIScanStep")
# Type definition for type annotation convenience
@@ -323,7 +441,7 @@ class AOIScanStepError(Exception):
class AOIScanStep():
"""Define a aoi scan step as a set of successive gaze movements onto a same AOI.
.. warning::
- Visual scan step have to start by a fixation and then end by a saccade."""
+ Aoi scan step have to start by a fixation and then end by a saccade."""
movements: TimeStampedGazeMovements
"""All movements over an AOI and the last saccade that comes out."""
@@ -331,9 +449,6 @@ class AOIScanStep():
aoi: str = field(default='')
"""AOI name."""
- #identifier: int = field(default=None)
- """AOI identifier."""
-
def __post_init__(self):
# First movement have to be a fixation
diff --git a/src/argaze/utils/demo_gaze_features_run.py b/src/argaze/utils/demo_gaze_features_run.py
index 022ad59..6294527 100644
--- a/src/argaze/utils/demo_gaze_features_run.py
+++ b/src/argaze/utils/demo_gaze_features_run.py
@@ -54,13 +54,19 @@ def main():
}
identification_mode = 'I-DT'
+ raw_scan_path = GazeFeatures.ScanPath()
aoi_scan_path = GazeFeatures.AOIScanPath()
tpm = TransitionProbabilityMatrix.AOIScanPathAnalyzer()
tpm_analysis = pandas.DataFrame()
- cK = CoefficientK.AOIScanPathAnalyzer()
- cK_analysis = 0
+ raw_cK_analyzer = CoefficientK.ScanPathAnalyzer()
+ raw_cK_analysis = 0
+
+ aoi_cK_analyzer = CoefficientK.AOIScanPathAnalyzer()
+ aoi_cK_analysis = 0
+
+ ck_mode = 'raw'
gaze_movement_lock = threading.Lock()
@@ -72,7 +78,8 @@ def main():
nonlocal gaze_position
nonlocal tpm_analysis
- nonlocal cK_analysis
+ nonlocal raw_cK_analysis
+ nonlocal aoi_cK_analysis
# Edit millisecond timestamp
data_ts = int((time.time() - start_ts) * 1e3)
@@ -105,17 +112,22 @@ def main():
look_at = name
break
+
+
+ # Append fixation to raw scan path
+ raw_scan_path.append_fixation(data_ts, gaze_movement)
+
try:
# Append fixation to aoi scan path
new_step = aoi_scan_path.append_fixation(data_ts, gaze_movement, look_at)
- # Analyse transition probabilities
+ # Analyse aoi scan path
if new_step and len(aoi_scan_path) > 1:
tpm_analysis = tpm.analyze(aoi_scan_path)
- cK_analysis = cK.analyze(aoi_scan_path)
+ aoi_cK_analysis = aoi_cK_analyzer.analyze(aoi_scan_path)
except GazeFeatures.AOIScanStepError as e:
@@ -123,6 +135,14 @@ def main():
elif GazeFeatures.is_saccade(gaze_movement):
+ # Append saccade to raw scan path
+ new_step = raw_scan_path.append_saccade(data_ts, gaze_movement)
+
+ # Analyse scan path
+ if new_step and len(raw_scan_path) > 1:
+
+ raw_cK_analysis = raw_cK_analyzer.analyze(raw_scan_path)
+
# Append saccade to aoi scan path
aoi_scan_path.append_saccade(data_ts, gaze_movement)
@@ -222,14 +242,23 @@ def main():
cv2.line(aoi_matrix, start_line, to_center, color, int(probability*10) + 2)
cv2.line(aoi_matrix, from_center, to_center, [55, 55, 55], 2)
- # Write cK analysis
- if cK_analysis < 0.:
+ # Write raw cK analysis
+ if raw_cK_analysis < 0.:
+
+ cv2.putText(aoi_matrix, f'Raw: Ambient attention', (20, window_size[1]-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+
+ elif raw_cK_analysis > 0.:
+
+ cv2.putText(aoi_matrix, f'Raw: Focal attention', (20, window_size[1]-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 1, cv2.LINE_AA)
+
+ # Write aoi cK analysis
+ if aoi_cK_analysis < 0.:
- cv2.putText(aoi_matrix, f'Ambient attention', (20, window_size[1]-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(aoi_matrix, f'AOI: Ambient attention', (20, window_size[1]-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- elif cK_analysis > 0.:
+ elif aoi_cK_analysis > 0.:
- cv2.putText(aoi_matrix, f'Focal attention', (20, window_size[1]-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 1, cv2.LINE_AA)
+ cv2.putText(aoi_matrix, f'AOI: Focal attention', (20, window_size[1]-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 1, cv2.LINE_AA)
# Unlock gaze movement identification
gaze_movement_lock.release()