aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py3
-rw-r--r--src/argaze.test/GazeFeatures.py43
-rw-r--r--src/argaze.test/PupillFeatures.py60
-rw-r--r--src/argaze/ArFeatures.py26
-rw-r--r--src/argaze/DataFeatures.py46
-rw-r--r--src/argaze/GazeAnalysis/DeviationCircleCoverage.py2
-rw-r--r--src/argaze/GazeAnalysis/DispersionThresholdIdentification.py10
-rw-r--r--src/argaze/GazeAnalysis/VelocityThresholdIdentification.py4
-rw-r--r--src/argaze/GazeFeatures.py26
-rw-r--r--src/argaze/PupillAnalysis/WorkloadIndex.py44
-rw-r--r--src/argaze/PupillFeatures.py78
11 files changed, 138 insertions, 204 deletions
diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
index 156f6f1..311f31b 100644
--- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
@@ -113,6 +113,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2)
ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions)
+ # DEBUG
+ print(gaze_movement_identifier)
+
# Check result size
self.assertEqual(len(ts_fixations), 1)
self.assertEqual(len(ts_saccades), 0)
diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py
index c6ccfca..e678093 100644
--- a/src/argaze.test/GazeFeatures.py
+++ b/src/argaze.test/GazeFeatures.py
@@ -31,6 +31,9 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)):
# Edit gaze position
random_gaze_position = GazeFeatures.GazePosition((random.random() * frame_dimension[0], random.random() * frame_dimension[1]), precision=5)
+ # DEBUG
+ print('random_gaze_position', type(random_gaze_position), random_gaze_position.__class__.__bases__)
+
# Timestamp gaze position
random_gaze_position.timestamp = time.time()
@@ -375,13 +378,11 @@ def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)):
for i in range(size):
- fixation = TestFixation(random_gaze_positions(10, frame_dimension))
- ts, _ = fixation.first
- scan_path.append_fixation(ts, fixation)
+ fixation = TestFixation(random_gaze_positions(10, frame_dimension), timestamp=i)
+ scan_path.append_fixation(fixation)
- saccade = TestSaccade(random_gaze_positions(2, frame_dimension))
- ts, _ = saccade.first
- scan_path.append_saccade(ts, saccade)
+ saccade = TestSaccade(random_gaze_positions(2, frame_dimension), timestamp=i+1)
+ scan_path.append_saccade(saccade)
return scan_path
@@ -404,9 +405,7 @@ class TestScanPathClass(unittest.TestCase):
# Append a saccade that should be ignored
saccade = TestSaccade(random_gaze_positions(2))
- ts = saccade[0].timestamp
-
- new_step = scan_path.append_saccade(ts, saccade)
+ new_step = scan_path.append_saccade(saccade)
# Check that no scan step have been created yet
self.assertEqual(len(scan_path), 0)
@@ -415,9 +414,7 @@ class TestScanPathClass(unittest.TestCase):
# Append first fixation
fixation_A = TestFixation(random_gaze_positions(10))
- ts = fixation_A[0].timestamp
-
- new_step = scan_path.append_fixation(ts, fixation_A)
+ new_step = scan_path.append_fixation(fixation_A)
# Check that no scan step have been created yet
self.assertEqual(len(scan_path), 0)
@@ -426,9 +423,7 @@ class TestScanPathClass(unittest.TestCase):
# Append consecutive saccade
saccade_A = TestSaccade(random_gaze_positions(2))
- ts = saccade_A[0].timestamp
-
- new_step_A = scan_path.append_saccade(ts, saccade_A)
+ new_step_A = scan_path.append_saccade(saccade_A)
# Check that new scan step have been created
self.assertEqual(len(scan_path), 1)
@@ -439,9 +434,7 @@ class TestScanPathClass(unittest.TestCase):
# Append 2 consecutive fixations then a saccade
fixation_B1 = TestFixation(random_gaze_positions(10))
- ts = fixation_B1[0].timestamp
-
- new_step = scan_path.append_fixation(ts, fixation_B1)
+ new_step = scan_path.append_fixation(fixation_B1)
# Check that no scan step have been created yet
self.assertEqual(len(scan_path), 1)
@@ -449,9 +442,7 @@ class TestScanPathClass(unittest.TestCase):
self.assertEqual(new_step, None)
fixation_B2 = TestFixation(random_gaze_positions(10))
- ts = fixation_B2[0].timestamp
-
- new_step = scan_path.append_fixation(ts, fixation_B2)
+ new_step = scan_path.append_fixation(fixation_B2)
# Check that no scan step have been created yet
self.assertEqual(len(scan_path), 1)
@@ -459,9 +450,7 @@ class TestScanPathClass(unittest.TestCase):
self.assertEqual(new_step, None)
saccade_B = TestSaccade(random_gaze_positions(2))
- ts = saccade_B[0].timestamp
-
- new_step_B = scan_path.append_saccade(ts, saccade_B)
+ new_step_B = scan_path.append_saccade(saccade_B)
# Check that new scan step have been created
self.assertEqual(len(scan_path), 2)
@@ -521,12 +510,12 @@ def build_aoi_scan_path(expected_aoi, aoi_path):
# Append a hidden last step to allow last given step creation
aoi_path.append(aoi_path[-2])
- for aoi in aoi_path:
+ for i, aoi in enumerate(aoi_path):
- fixation = TestFixation(random_gaze_positions(10))
+ fixation = TestFixation(random_gaze_positions(10), timestamp=i)
aoi_scan_path.append_fixation(fixation, aoi)
- saccade = TestSaccade(random_gaze_positions(2))
+ saccade = TestSaccade(random_gaze_positions(2), timestamp=i+1)
aoi_scan_path.append_saccade(saccade)
return aoi_scan_path
diff --git a/src/argaze.test/PupillFeatures.py b/src/argaze.test/PupillFeatures.py
index f0e8e1b..9cf26eb 100644
--- a/src/argaze.test/PupillFeatures.py
+++ b/src/argaze.test/PupillFeatures.py
@@ -8,6 +8,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"
import unittest
+import math
from argaze import PupillFeatures
@@ -43,14 +44,12 @@ class TestPupillDiameterClass(unittest.TestCase):
# Check empty PupillDiameter
empty_pupill_diameter = PupillFeatures.PupillDiameter()
- self.assertEqual(empty_pupill_diameter.value, 0.)
- self.assertEqual(empty_pupill_diameter.valid, False)
+ self.assertEqual(empty_pupill_diameter, math.nan)
# Check float PupillDiameter
float_pupill_diameter = PupillFeatures.PupillDiameter(1.23)
- self.assertEqual(float_pupill_diameter.value, 1.23)
- self.assertEqual(float_pupill_diameter.valid, True)
+ self.assertEqual(float_pupill_diameter, 1.23)
def test_properties(self):
"""Test PupillDiameter properties cannot be modified after creation."""
@@ -60,32 +59,16 @@ class TestPupillDiameterClass(unittest.TestCase):
# Check that pupill diameter value setting fails
with self.assertRaises(AttributeError):
- pupill_diameter.value = 123
+ pupill_diameter = 123
- self.assertNotEqual(pupill_diameter.value, 123)
- self.assertEqual(pupill_diameter.value, 0.)
+ self.assertNotEqual(pupill_diameter, 123)
+ self.assertEqual(pupill_diameter, math.nan)
def test___repr__(self):
"""Test PupillDiameter string representation."""
# Check empty PupillDiameter representation
- self.assertEqual(repr(PupillFeatures.PupillDiameter()), "{\"value\": 0.0}")
-
-class TestUnvalidPupillDiameterClass(unittest.TestCase):
- """Test UnvalidPupillDiameter class."""
-
- def test_new(self):
- """Test UnvalidPupillDiameter creation."""
-
- unvalid_pupill_diameter = PupillFeatures.UnvalidPupillDiameter()
-
- self.assertEqual(unvalid_pupill_diameter.value, 0.)
- self.assertEqual(unvalid_pupill_diameter.valid, False)
-
- def test___repr__(self):
- """Test UnvalidPupillDiameter string representation."""
-
- self.assertEqual(repr(PupillFeatures.UnvalidPupillDiameter()), "{\"message\": null, \"value\": 0.0}")
+ self.assertEqual(repr(PupillFeatures.PupillDiameter()), "{\"value\": NaN}")
class TestTimeStampedPupillDiametersClass(unittest.TestCase):
"""Test TimeStampedPupillDiameters class."""
@@ -93,22 +76,23 @@ class TestTimeStampedPupillDiametersClass(unittest.TestCase):
def test___setitem__(self):
"""Test __setitem__ method."""
- ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters()
- ts_pupill_diameters[0] = PupillFeatures.PupillDiameter()
- ts_pupill_diameters[1] = PupillFeatures.UnvalidPupillDiameter()
- ts_pupill_diameters[2] = {"value": 1.23}
+ ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters([
+ PupillFeatures.PupillDiameter(),
+ PupillFeatures.PupillDiameter(0.63),
+ {"value": 1.23}
+ ])
- # Check PupillDiameter is correctly stored and accessible as a PupillDiameter
+ # Check empty PupillDiameter is correctly stored and accessible as a PupillDiameter
self.assertIsInstance(ts_pupill_diameters[0], PupillFeatures.PupillDiameter)
- self.assertEqual(ts_pupill_diameters[0].valid, False)
+ self.assertEqual(ts_pupill_diameters[0], math.nan)
- # Check UnvalidPupillDiameter is correctly stored and accessible as a UnvalidPupillDiameter
- self.assertIsInstance(ts_pupill_diameters[1], PupillFeatures.UnvalidPupillDiameter)
- self.assertEqual(ts_pupill_diameters[1].valid, False)
+ # Check PupillDiameter is correctly stored and accessible as a PupillDiameter
+ self.assertIsInstance(ts_pupill_diameters[1], PupillFeatures.PupillDiameter)
+ self.assertEqual(ts_pupill_diameters[0], 0.63)
- # Check dict with "value" and "precision" keys is correctly stored and accessible as a PupillDiameter
+ # Check dict with "value" key is correctly stored and accessible as a PupillDiameter
self.assertIsInstance(ts_pupill_diameters[2], PupillFeatures.PupillDiameter)
- self.assertEqual(ts_pupill_diameters[2].valid, True)
+ self.assertEqual(ts_pupill_diameters[0], 1.23)
# Check that bad data type insertion fails
with self.assertRaises(AssertionError):
@@ -125,11 +109,11 @@ class TestTimeStampedPupillDiametersClass(unittest.TestCase):
ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters()
- self.assertEqual(repr(PupillFeatures.TimeStampedPupillDiameters()), "{}")
+ self.assertEqual(repr(PupillFeatures.TimeStampedPupillDiameters()), "[]")
- ts_pupill_diameters[0] = PupillFeatures.PupillDiameter()
+ ts_pupill_diameters.append(PupillFeatures.PupillDiameter())
- self.assertEqual(repr(ts_pupill_diameters), "{\"0\": {\"value\": 0.0}}")
+ self.assertEqual(repr(ts_pupill_diameters), "[{\"value\": NaN, \"timestamp\": 0}]")
ts_pupill_diameters[0] = PupillFeatures.UnvalidPupillDiameter()
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index b3ecad6..8005d48 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -415,7 +415,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement)
# Valid and finished gaze movement has been identified
- if gaze_movement.valid and gaze_movement.finished:
+ if gaze_movement and gaze_movement.finished:
if GazeFeatures.is_fixation(gaze_movement):
@@ -908,22 +908,22 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Identify gaze movement
if self.__gaze_movement_identifier is not None:
-
+
# Identify finished gaze movement
self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position)
# Valid and finished gaze movement has been identified
- if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished:
-
+ if self.__identified_gaze_movement and self.__identified_gaze_movement.finished:
+
if GazeFeatures.is_fixation(self.__identified_gaze_movement):
-
+
# Append fixation to scan path
if self.__scan_path is not None:
-
+
self.__scan_path.append_fixation(self.__identified_gaze_movement)
elif GazeFeatures.is_saccade(self.__identified_gaze_movement):
-
+
# Append saccade to scan path
if self.__scan_path is not None:
@@ -931,10 +931,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Is there a new step?
if scan_step and len(self.__scan_path) > 1:
-
+
# Analyze aoi scan path
for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items():
-
+
scan_path_analyzer.analyze(timestamp, self.__scan_path)
# Update scan path analyzed state
@@ -1013,12 +1013,16 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Draw current fixation if required
if draw_fixations is not None and self.__gaze_movement_identifier is not None:
- self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations)
+ if self.__gaze_movement_identifier.current_fixation:
+
+ self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations)
# Draw current saccade if required
if draw_saccades is not None and self.__gaze_movement_identifier is not None:
- self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades)
+ if self.__gaze_movement_identifier.current_saccade:
+
+ self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades)
# Draw layers if required
if draw_layers is not None:
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index ff9baec..8df991b 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -149,20 +149,15 @@ class TimestampedObject():
@timestamp.setter
def timestamp(self, timestamp: int|float):
"""Set object timestamp."""
-
- assert(type(timestamp) == int or type(timestamp) == float)
-
self._timestamp = timestamp
def untimestamp(self):
"""Reset object timestamp."""
- self.timestamp = math.nan
+ self._timestamp = math.nan
def is_timestamped(self) -> bool:
"""Is the object timestamped?"""
- timestamped = not math.isnan(self.timestamp)
-
- return timestamped
+ return not math.isnan(self._timestamp)
class TimestampedObjectsList(list):
"""Handle timestamped object into a list.
@@ -188,7 +183,7 @@ class TimestampedObjectsList(list):
def append(self, ts_object: TimestampedObjectType|dict):
"""Append timestamped object."""
-
+
# Convert dict into GazePosition
if type(ts_object) == dict:
@@ -200,7 +195,7 @@ class TimestampedObjectsList(list):
if not issubclass(ts_object.__class__, self.__object_type):
raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance')
-
+
assert(ts_object.is_timestamped())
super().append(ts_object)
@@ -421,42 +416,15 @@ class TimestampedObjectsList(list):
return legend_patches
class SharedObject(TimestampedObject):
- """Abstract class to enable multiple threads sharing and timestamp management."""
+ """Abstract class to enable multiple threads sharing for timestamped object."""
- def __init__(self):
+ def __init__(self, timestamp: int|float = math.nan):
- super().__init__()
+ TimestampedObject.__init__(self, timestamp)
self._lock = threading.Lock()
self._execution_times = {}
self._exceptions = {}
- @property
- def lock(self) -> threading.Lock:
- """Get shared object lock object."""
- return self._lock
-
- @property
- def timestamp(self) -> int|float:
- """Get shared object timestamp."""
- with self._lock:
- return super().timestamp
-
- @timestamp.setter
- def timestamp(self, timestamp: int|float):
- """Set shared object timestamp."""
- with self._lock:
- super().timestamp = timestamp
-
- def untimestamp(self):
- """Reset shared object timestamp."""
- with self._lock:
- self.timestamp = math.nan
-
- def is_timestamped(self) -> bool:
- """Is the object timestamped?"""
- with self._lock:
- return super().is_timestamped()
-
class PipelineStepObject():
"""
Define class to assess pipeline step methods execution time and observe them.
diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
index 62b5e9a..3849d59 100644
--- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
+++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
@@ -107,7 +107,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher):
self.__reset()
- elif not gaze_movement.valid:
+ elif not gaze_movement:
self.__reset()
diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
index 13529e7..2b89cf6 100644
--- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
@@ -100,8 +100,8 @@ class Saccade(GazeFeatures.Saccade):
# Draw line if required
if line_color is not None:
- start_position = self.positions[0]
- last_position = self.positions[-1]
+ start_position = self[0]
+ last_position = self[-1]
cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2)
@@ -142,12 +142,12 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
@DataFeatures.PipelineStepMethod
def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType:
-
+
# Ignore empty gaze position
if not gaze_position:
return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish()
-
+
# Check if too much time elapsed since last valid gaze position
if self.__valid_positions:
@@ -171,7 +171,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Store gaze positions until a minimal duration
self.__valid_positions.append(gaze_position)
-
+
# Once the minimal duration is reached
if self.__valid_positions.duration >= self.__duration_min_threshold:
diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
index c1d448a..a95905f 100644
--- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
@@ -99,8 +99,8 @@ class Saccade(GazeFeatures.Saccade):
# Draw line if required
if line_color is not None:
- start_position = self.positions[0]
- last_position = self.positions[-1]
+ start_position = self[0]
+ last_position = self[-1]
cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2)
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index eac9e5c..6a02142 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -34,13 +34,13 @@ class GazePosition(tuple, DataFeatures.TimestampedObject):
message: a string to describe why the the position is what it is.
"""
- def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, **kwargs):
+ def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan):
return tuple.__new__(cls, position)
- def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, **kwargs):
+ def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan):
- DataFeatures.TimestampedObject.__init__(self, **kwargs)
+ DataFeatures.TimestampedObject.__init__(self, timestamp)
self.__precision = precision
self.__message = message
@@ -178,7 +178,7 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt
# Type definition for type annotation convenience
class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList):
- """Handle timestamped gaze positions into a list"""
+ """Handle timestamped gaze positions into a list."""
def __init__(self, gaze_positions: list = []):
@@ -188,6 +188,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList):
"""Get all timestamped position values as list of tuple."""
return [tuple(ts_position) for ts_position in self]
+ ''' Is it still needed as there is a TimestampedObjectsList.from_json method?
@classmethod
def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType:
"""Create a TimeStampedGazePositionsType from .json file."""
@@ -197,6 +198,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList):
json_positions = json.load(ts_positions_file)
return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions})
+ '''
@classmethod
def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> TimeStampedGazePositionsType:
@@ -367,15 +369,15 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject):
message: a string to describe why the movement is what it is.
"""
- def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
+ def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan):
return TimeStampedGazePositions.__new__(cls, positions)
- def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan):
"""Initialize GazeMovement"""
TimeStampedGazePositions.__init__(self, positions)
- DataFeatures.TimestampedObject.__init__(self, **kwargs)
+ DataFeatures.TimestampedObject.__init__(self, timestamp)
self.__finished = finished
self.__message = message
@@ -385,9 +387,13 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject):
"""Get first position timestamp."""
return self[0].timestamp
+ def is_timestamped(self) -> bool:
+ """If first position exist, the movement is timestamped."""
+ return bool(self)
+
@timestamp.setter
def timestamp(self, timestamp: int|float):
- """Block gaze movment timestamp setting."""
+ """Block gaze movement timestamp setting."""
raise('GazeMovement timestamp is first positon timestamp.')
@property
@@ -774,7 +780,7 @@ class ScanPath(list):
self.__duration -= oldest_step.duration
- def append_saccade(self, ts, saccade) -> ScanStepType:
+ def append_saccade(self, saccade) -> ScanStepType:
"""Append new saccade to scan path and return last new scan step if one have been created."""
# Ignore saccade if no fixation came before
@@ -802,7 +808,7 @@ class ScanPath(list):
# Clear last fixation
self.__last_fixation = None
- def append_fixation(self, ts, fixation):
+ def append_fixation(self, fixation):
"""Append new fixation to scan path.
!!! warning
Consecutives fixations are ignored keeping the last fixation"""
diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupillAnalysis/WorkloadIndex.py
index 1f3c586..f97dce3 100644
--- a/src/argaze/PupillAnalysis/WorkloadIndex.py
+++ b/src/argaze/PupillAnalysis/WorkloadIndex.py
@@ -15,51 +15,61 @@ from argaze import PupillFeatures
import numpy
-@dataclass
class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer):
- """Periodic average of pupill diameter variations to pupill diameter reference value."""
+ """Periodic average of pupill diameter variations to pupill diameter reference value.
- reference: PupillFeatures.PupillDiameter
- """ """
+ Parameters:
+ reference: base line value.
+ period: identification period length.
+ """
+ def __init__(self, reference: PupillFeatures.PupillDiameter, period: int|float = 1):
- period: int | float = field(default=1)
- """Identification period length."""
+ assert(not math.isnan(self.__reference))
- def __post_init__(self):
-
- assert(self.reference.valid)
+ self.__reference = reference
+ self.__period = period
self.__variations_sum = 0.
self.__variations_number = 0
self.__last_ts = 0
+ @property
+ def reference(self) -> PupillFeatures.PupillDiameter:
+ """Get workload index reference."""
+ return self.__reference
+
+ @property
+ def period(self) -> int|float:
+ """Get workload index period."""
+ return self.__period
+
@DataFeatures.PipelineStepMethod
- def analyze(self, ts: int|float, pupill_diameter) -> float:
+ def analyze(self, pupill_diameter: PupillFeatures.PupillDiameter) -> float:
"""Analyze workload index from successive timestamped pupill diameters."""
# Ignore non valid pupill diameter
- if not pupill_diameter.valid:
+ if not math.isnan(pupill_diameter):
return None
- if ts - self.__last_ts >= self.period:
+ if pupill_diameter.timestamp - self.__last_ts >= self.__period:
- if self.__variations_number > 0 and self.reference.value > 0.:
+ if self.__variations_number > 0 and self.__reference.value > 0.:
- workload_index = (self.__variations_sum / self.__variations_number) / self.reference.value
+ workload_index = (self.__variations_sum / self.__variations_number) / self.__reference.value
else:
workload_index = 0.
- self.__variations_sum = pupill_diameter.value - self.reference.value
+ self.__variations_sum = pupill_diameter.value - self.__reference.value
self.__variations_number = 1
- self.__last_ts = ts
+ self.__last_ts = pupill_diameter.timestamp
return workload_index
else:
- self.__variations_sum += pupill_diameter.value - self.reference.value
+ self.__variations_sum += pupill_diameter.value - self.__reference.value
self.__variations_number += 1
\ No newline at end of file
diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py
index d8f9331..492e7ca 100644
--- a/src/argaze/PupillFeatures.py
+++ b/src/argaze/PupillFeatures.py
@@ -10,71 +10,41 @@ __license__ = "BSD"
from typing import TypeVar
from dataclasses import dataclass, field
import json
+import math
from argaze import DataFeatures
-@dataclass(frozen=True)
-class PupillDiameter():
- """Define pupill diameter as ..."""
-
- value: float = field(default=0.)
- """Pupill diameter value."""
-
- @property
- def valid(self) -> bool:
- """Is the value not 0"""
-
- return self.value != 0.
-
- def __repr__(self):
- """String representation"""
+PupillDiameterType = TypeVar('PupillDiameter', bound="PupillDiameter")
+# Type definition for type annotation convenience
- return json.dumps(self, ensure_ascii = False, default=vars)
+class PupillDiameter(float, DataFeatures.TimestampedObject):
+ """Define pupill diameter as a single float value.
-class UnvalidPupillDiameter(PupillDiameter):
- """Unvalid pupill diameter."""
+ Parameters:
+ value: pupill diameter value.
+ """
+ def __new__(cls, value: float = math.nan, **kwargs):
- def __init__(self, message=None):
+ return float.__new__(cls, value)
- self.message = message
+ def __init__(self, value: float = math.nan, **kwargs):
- super().__init__(0.)
+ super().__init__(**kwargs)
+ @property
+ def value(self):
+ """Get pupill diameter value."""
+ return float(self)
+
TimeStampedPupillDiametersType = TypeVar('TimeStampedPupillDiameters', bound="TimeStampedPupillDiameters")
# Type definition for type annotation convenience
-class TimeStampedPupillDiameters(DataFeatures.TimeStampedBuffer):
- """Define timestamped buffer to store pupill diameters."""
-
- def __setitem__(self, key, value: PupillDiameter|dict):
- """Force PupillDiameter storage."""
-
- # Convert dict into PupillDiameter
- if type(value) == dict:
-
- assert(set(['value']).issubset(value.keys()))
-
- if 'message' in value.keys():
-
- value = UnvalidPupillDiameter(value['message'])
-
- else:
-
- value = PupillDiameter(value['value'])
-
- assert(type(value) == PupillDiameter or type(value) == UnvalidPupillDiameter)
-
- super().__setitem__(key, value)
-
- @classmethod
- def from_json(self, json_filepath: str) -> TimeStampedPupillDiametersType:
- """Create a TimeStampedPupillDiametersType from .json file."""
-
- with open(json_filepath, encoding='utf-8') as ts_buffer_file:
+class TimeStampedPupillDiameters(DataFeatures.TimestampedObjectsList):
+ """Handle timestamped pupill diamters into a list."""
- json_buffer = json.load(ts_buffer_file)
+ def __init__(self, pupill_diameters: list = []):
- return TimeStampedPupillDiameters({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer})
+ DataFeatures.TimestampedObjectsList.__init__(self, PupillDiameter, pupill_diameters)
TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer")
# Type definition for type annotation convenience
@@ -83,7 +53,7 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject):
"""Abstract class to define what should provide a pupill diameter analyser."""
@DataFeatures.PipelineStepMethod
- def analyze(self, timestamp: int|float, pupill_diameter, float) -> float:
+ def analyze(self, pupill_diameter: PupillDiameterType) -> any:
"""Analyze pupill diameter from successive timestamped pupill diameters."""
raise NotImplementedError('analyze() method not implemented')
@@ -96,9 +66,9 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject):
ts_analyzis = DataFeatures.TimeStampedBuffer()
# Iterate on pupill diameters
- for ts, pupill_diameter in ts_pupill_diameters.items():
+ for pupill_diameter in ts_pupill_diameters:
- analysis = self.analyze(ts, pupill_diameter)
+ analysis = self.analyze(pupill_diameter)
if analysis is not None: