aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2024-02-28 13:57:31 +0100
committerThéo de la Hogue2024-02-28 13:57:31 +0100
commit5f915a84f32405dc8bddae4ecbf95f4745af6fbc (patch)
treef882ba799e2fc8a824274fe32ac4ae48511690a9
parentd6754148e3866cd8051ff01d1dbbd5534664bc2a (diff)
downloadargaze-5f915a84f32405dc8bddae4ecbf95f4745af6fbc.zip
argaze-5f915a84f32405dc8bddae4ecbf95f4745af6fbc.tar.gz
argaze-5f915a84f32405dc8bddae4ecbf95f4745af6fbc.tar.bz2
argaze-5f915a84f32405dc8bddae4ecbf95f4745af6fbc.tar.xz
More work on TimestampedGazePositions and GazeMovements.
-rw-r--r--src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py4
-rw-r--r--src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py4
-rw-r--r--src/argaze.test/GazeFeatures.py122
-rw-r--r--src/argaze/ArFeatures.py6
-rw-r--r--src/argaze/DataFeatures.py5
-rw-r--r--src/argaze/GazeAnalysis/DispersionThresholdIdentification.py48
-rw-r--r--src/argaze/GazeFeatures.py178
7 files changed, 184 insertions, 183 deletions
diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
index b7475b5..f0d286a 100644
--- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
@@ -47,7 +47,7 @@ def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time
else:
- gaze_position = GazeFeatures.UnvalidGazePosition()
+ gaze_position = GazeFeatures.GazePosition()
# Store gaze position
ts = time.time() - start_time + start_ts
@@ -85,7 +85,7 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl
else:
- gaze_position = GazeFeatures.UnvalidGazePosition()
+ gaze_position = GazeFeatures.GazePosition()
# Store gaze position
ts = time.time() - start_time + start_ts
diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
index 425d592..24f2e3c 100644
--- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
+++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
@@ -53,7 +53,7 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float,
else:
- gaze_position = GazeFeatures.UnvalidGazePosition()
+ gaze_position = GazeFeatures.GazePosition()
# Store gaze position
ts = time.time() - start_time + start_ts
@@ -91,7 +91,7 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl
else:
- gaze_position = GazeFeatures.UnvalidGazePosition()
+ gaze_position = GazeFeatures.GazePosition()
# Store gaze position
ts = time.time() - start_time + start_ts
diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py
index fdc140d..7d18976 100644
--- a/src/argaze.test/GazeFeatures.py
+++ b/src/argaze.test/GazeFeatures.py
@@ -39,28 +39,27 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)):
return ts_gaze_positions
-@dataclass(frozen=True)
class TestFixation(GazeFeatures.Fixation):
"""Define basic fixation class for test."""
- def __post_init__(self):
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
- super().__post_init__()
+ super().__init__(positions, finished, message, **kwargs)
- points = self.positions.values()
- points_x, points_y = [p[0] for p in points], [p[1] for p in points]
- points_array = numpy.column_stack([points_x, points_y])
- centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)])
+ if positions:
- # Update frozen focus attribute using centroid
- object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1]))
+ positions_array = numpy.asarray(self.values())
+ centroid = numpy.mean(positions_array, axis=0)
+
+ # Update focus attribute using centroid
+ self.focus = (centroid[0], centroid[1])
-@dataclass(frozen=True)
class TestSaccade(GazeFeatures.Saccade):
"""Define basic saccade for test."""
- def __post_init__(self):
- super().__post_init__()
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
+
+ super().__init__(positions, finished, message, **kwargs)
class TestGazePositionClass(unittest.TestCase):
"""Test GazePosition class."""
@@ -265,11 +264,11 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase):
self.assertEqual(ts_gaze_positions[0].precision, 15)
self.assertEqual(bool(ts_gaze_positions[0]), True)
- # Check third gaze position is correctly stored and accessible as a UnvalidGazePosition
+ # Check third gaze position is correctly stored and accessible as a GazePosition
self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition)
self.assertEqual(numpy.isnan(ts_gaze_positions[2].precision), True)
self.assertEqual(bool(ts_gaze_positions[2]), False)
-
+
def test_as_dataframe(self):
"""Test inherited as_dataframe method."""
@@ -309,19 +308,19 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase):
class TestGazeMovementClass(unittest.TestCase):
"""Test GazeMovement class."""
- @unittest.skip("DEBUG")
+
def test_new(self):
"""Test GazeMovement creation."""
abstract_gaze_movement = GazeFeatures.GazeMovement(random_gaze_positions(0))
# Check abstract GazeMovement
- self.assertEqual(len(abstract_gaze_movement.positions), 0)
- self.assertEqual(abstract_gaze_movement.duration, -1)
- self.assertEqual(abstract_gaze_movement.amplitude, -1)
- self.assertEqual(abstract_gaze_movement.valid, False)
+ self.assertEqual(len(abstract_gaze_movement), 0)
+ self.assertEqual(abstract_gaze_movement.duration, 0)
+ self.assertEqual(abstract_gaze_movement.amplitude, 0)
+ self.assertEqual(bool(abstract_gaze_movement), False)
self.assertEqual(abstract_gaze_movement.finished, False)
- @unittest.skip("DEBUG")
+
def test_finish(self):
"""Test GazeMovement finishing."""
@@ -340,25 +339,22 @@ class TestGazeMovementClass(unittest.TestCase):
self.assertEqual(abstract_gaze_movement.finished, True)
self.assertEqual(abstract_gaze_movement_ref.finished, True)
-class TestUnvalidGazeMovementClass(unittest.TestCase):
- """Test UnvalidGazeMovement class."""
- @unittest.skip("DEBUG")
- def test_new(self):
- """Test UnvalidGazeMovement creation."""
+ def test_message(self):
+ """Test GazeMovement creation with message only."""
- unvalid_gaze_movement = GazeFeatures.UnvalidGazeMovement('test')
+ gaze_movement = GazeFeatures.GazeMovement(message='test')
- # Check UnvalidGazeMovement
- self.assertEqual(len(unvalid_gaze_movement.positions), 0)
- self.assertEqual(unvalid_gaze_movement.duration, -1)
- self.assertEqual(unvalid_gaze_movement.amplitude, -1)
- self.assertEqual(unvalid_gaze_movement.valid, False)
- self.assertEqual(unvalid_gaze_movement.finished, False)
- self.assertEqual(unvalid_gaze_movement.message, 'test')
+ # Check GazeMovement
+ self.assertEqual(len(gaze_movement), 0)
+ self.assertEqual(gaze_movement.duration, 0)
+ self.assertEqual(gaze_movement.amplitude, 0)
+ self.assertEqual(bool(gaze_movement), False)
+ self.assertEqual(gaze_movement.finished, False)
+ self.assertEqual(gaze_movement.message, 'test')
class TestScanStepClass(unittest.TestCase):
"""Test ScanStep class."""
- @unittest.skip("DEBUG")
+
def test_new(self):
"""Test ScanStep creation."""
@@ -371,7 +367,7 @@ class TestScanStepClass(unittest.TestCase):
self.assertEqual(scan_step.first_fixation, fixation)
self.assertEqual(scan_step.last_saccade, saccade)
self.assertGreater(scan_step.duration, 0)
-
+
def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)):
"""Build scan path"""
@@ -380,18 +376,18 @@ def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)):
for i in range(size):
fixation = TestFixation(random_gaze_positions(10, frame_dimension))
- ts, _ = fixation.positions.first
+ ts, _ = fixation.first
scan_path.append_fixation(ts, fixation)
saccade = TestSaccade(random_gaze_positions(2, frame_dimension))
- ts, _ = saccade.positions.first
+ ts, _ = saccade.first
scan_path.append_saccade(ts, saccade)
return scan_path
class TestScanPathClass(unittest.TestCase):
"""Test ScanPath class."""
- @unittest.skip("DEBUG")
+
def test_new(self):
"""Test ScanPath creation."""
@@ -400,7 +396,7 @@ class TestScanPathClass(unittest.TestCase):
self.assertEqual(len(scan_path), 0)
self.assertEqual(scan_path.duration, 0)
- @unittest.skip("DEBUG")
+
def test_append(self):
"""Test ScanPath append methods."""
@@ -408,7 +404,7 @@ class TestScanPathClass(unittest.TestCase):
# Append a saccade that should be ignored
saccade = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade.positions.first
+ ts = saccade[0].timestamp
new_step = scan_path.append_saccade(ts, saccade)
@@ -419,7 +415,7 @@ class TestScanPathClass(unittest.TestCase):
# Append first fixation
fixation_A = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_A.positions.first
+ ts = fixation_A[0].timestamp
new_step = scan_path.append_fixation(ts, fixation_A)
@@ -430,7 +426,7 @@ class TestScanPathClass(unittest.TestCase):
# Append consecutive saccade
saccade_A = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade_A.positions.first
+ ts = saccade_A[0].timestamp
new_step_A = scan_path.append_saccade(ts, saccade_A)
@@ -443,7 +439,7 @@ class TestScanPathClass(unittest.TestCase):
# Append 2 consecutive fixations then a saccade
fixation_B1 = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_B1.positions.first
+ ts = fixation_B1[0].timestamp
new_step = scan_path.append_fixation(ts, fixation_B1)
@@ -453,7 +449,7 @@ class TestScanPathClass(unittest.TestCase):
self.assertEqual(new_step, None)
fixation_B2 = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_B2.positions.first
+ ts = fixation_B2[0].timestamp
new_step = scan_path.append_fixation(ts, fixation_B2)
@@ -463,7 +459,7 @@ class TestScanPathClass(unittest.TestCase):
self.assertEqual(new_step, None)
saccade_B = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade_B.positions.first
+ ts = saccade_B[0].timestamp
new_step_B = scan_path.append_saccade(ts, saccade_B)
@@ -476,19 +472,19 @@ class TestScanPathClass(unittest.TestCase):
class TestAOIScanStepClass(unittest.TestCase):
"""Test AOIScanStep class."""
- @unittest.skip("DEBUG")
+
def test_new(self):
"""Test AOIScanStep creation."""
movements = GazeFeatures.TimeStampedGazeMovements()
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
- movements[ts] = fixation
+ ts = fixation[0].timestamp
+ movements.append(fixation)
saccade = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade.positions.first
- movements[ts] = saccade
+ ts = saccade[0].timestamp
+ movements.append(saccade)
aoi_scan_step = GazeFeatures.AOIScanStep(movements, 'Test')
@@ -505,12 +501,12 @@ class TestAOIScanStepClass(unittest.TestCase):
movements = GazeFeatures.TimeStampedGazeMovements()
saccade = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade.positions.first
- movements[ts] = saccade
+ ts = saccade[0].timestamp
+ movements.append(saccade)
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
- movements[ts] = fixation
+ ts = fixation[0].timestamp
+ movements.append(fixation)
# Check that aoi scan step creation fail
with self.assertRaises(GazeFeatures.AOIScanStepError):
@@ -528,11 +524,11 @@ def build_aoi_scan_path(expected_aoi, aoi_path):
for aoi in aoi_path:
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
+ ts, _ = fixation.first
aoi_scan_path.append_fixation(ts, fixation, aoi)
saccade = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade.positions.first
+ ts, _ = saccade.first
aoi_scan_path.append_saccade(ts, saccade)
return aoi_scan_path
@@ -555,7 +551,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append fixation on A aoi
fixation_A = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_A.positions.first
+ ts, _ = fixation_A.first
new_step = aoi_scan_path.append_fixation(ts, fixation_A, 'Foo')
@@ -566,7 +562,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append saccade
saccade_A = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade_A.positions.first
+ ts, _ = saccade_A.first
new_step = aoi_scan_path.append_saccade(ts, saccade_A)
@@ -577,7 +573,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append fixation on B aoi
fixation_B = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_B.positions.first
+ ts, _ = fixation_B.first
new_step_A = aoi_scan_path.append_fixation(ts, fixation_B, 'Bar')
@@ -588,8 +584,8 @@ class TestAOIScanPathClass(unittest.TestCase):
self.assertEqual(new_step_A.aoi, 'Foo')
self.assertEqual(new_step_A.letter, 'A')
- first_ts, _ = fixation_A.positions.first
- last_ts, _ = saccade_A.positions.last
+ first_ts, _ = fixation_A.first
+ last_ts, _ = saccade_A.last
self.assertEqual(new_step_A.duration, last_ts - first_ts)
@@ -603,7 +599,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append fixation on A aoi
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
+ ts, _ = fixation.first
new_step = aoi_scan_path.append_fixation(ts, fixation, 'Foo')
@@ -613,7 +609,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append fixation on B aoi
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
+ ts, _ = fixation.first
# Check that aoi scan step creation fail when fixation is appened after another fixation
with self.assertRaises(GazeFeatures.AOIScanStepError):
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 5fcc990..47a91e9 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -384,7 +384,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
)
@DataFeatures.PipelineStepMethod
- def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()):
+ def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()):
"""
Project timestamped gaze movement into layer.
@@ -531,7 +531,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__layers = layers
self.__image_parameters = image_parameters
- self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition()
+ self.__calibrated_gaze_position = GazeFeatures.GazePosition()
self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
self.__scan_path_analyzed = False
@@ -875,7 +875,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
)
@DataFeatures.PipelineStepMethod
- def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]:
+ def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]:
"""
Project timestamped gaze position into frame.
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index 931c21d..7c53c2a 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -133,7 +133,6 @@ class TimeStampedObjectsList(list):
def __init__(self, ts_object_type: type, ts_objects: list = []):
- super().__init__()
self.__object_type = ts_object_type
self.__object_properties = properties(self.__object_type)
@@ -384,11 +383,11 @@ class TimestampedObject():
def untimestamp(self):
"""Reset object timestamp."""
- self._timestamp = math.nan
+ self.timestamp = math.nan
def is_timestamped(self) -> bool:
"""Is the object timestamped?"""
- timestamped = not math.isnan(self._timestamp)
+ timestamped = not math.isnan(self.timestamp)
return timestamped
diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
index 6f8c554..f8e519f 100644
--- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
@@ -37,17 +37,15 @@ class Fixation(GazeFeatures.Fixation):
super().__post_init__()
- points = self.positions.values()
- points_x, points_y = [p[0] for p in points], [p[1] for p in points]
- points_array = numpy.column_stack([points_x, points_y])
- centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)])
- deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))
+ positions_array = numpy.asarray(self.positions.values())
+ centroid = numpy.mean(positions_array, axis=0)
+ deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
# Update frozen focus attribute using centroid
- object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1]))
+ object.__setattr__(self, 'focus', (centroid[0], centroid[1]))
# Update frozen deviation_max attribute
- object.__setattr__(self, 'deviation_max', max(deviations_array))
+ object.__setattr__(self, 'deviation_max', deviations_array.max())
def point_deviation(self, gaze_position) -> float:
"""Get distance of a point from the fixation's centroïd."""
@@ -57,13 +55,11 @@ class Fixation(GazeFeatures.Fixation):
def overlap(self, fixation) -> bool:
"""Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?"""
- points = fixation.positions.values()
- points_x, points_y = [p[0] for p in points], [p[1] for p in points]
- points_array = numpy.column_stack([points_x, points_y])
- centroid_array = numpy.array([self.focus[0], self.focus[1]])
- deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))
+ positions_array = numpy.asarray(self.positions.values())
+ centroid = numpy.array(list(self.focus))
+ deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
- return min(deviations_array) <= self.deviation_max
+ return deviations_array.min() <= self.deviation_max
def merge(self, fixation) -> FixationType:
"""Merge another fixation into this fixation."""
@@ -114,8 +110,8 @@ class Saccade(GazeFeatures.Saccade):
# Draw line if required
if line_color is not None:
- _, start_position = self.positions.first
- _, last_position = self.positions.last
+ _, start_position = self.positions[0]
+ _, last_position = self.positions[-1]
cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2)
@@ -148,14 +144,14 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType:
# Ignore non valid gaze position
- if not gaze_position.valid:
+ if not gaze_position:
return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish()
# Check if too much time elapsed since last valid gaze position
if len(self.__valid_positions) > 0:
- ts_last, _ = self.__valid_positions.last
+ ts_last, _ = self.__valid_positions[-1]
if (ts - ts_last) > self.duration_min_threshold:
@@ -168,16 +164,16 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
# Store valid gaze position
- self.__valid_positions[ts] = gaze_position
+ self.__valid_positions.append(gaze_position)
# Return last valid movement if exist
return last_movement
# Store gaze positions until a minimal duration
- self.__valid_positions[ts] = gaze_position
+ self.__valid_positions.append(gaze_position)
- first_ts, _ = self.__valid_positions.first
- last_ts, _ = self.__valid_positions.last
+ first_ts, _ = self.__valid_positions[0]
+ last_ts, _ = self.__valid_positions[-1]
# Once the minimal duration is reached
if last_ts - first_ts >= self.duration_min_threshold:
@@ -194,8 +190,8 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
if len(self.__saccade_positions) > 0:
# Copy oldest valid position into saccade positions
- first_ts, first_position = self.__valid_positions.first
- self.__saccade_positions[first_ts] = first_position
+ first_ts, first_position = self.__valid_positions[0]
+ self.__saccade_positions.append(first_position)
# Finish last saccade
last_saccade = self.current_saccade.finish()
@@ -218,8 +214,8 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
if len(self.__fixation_positions) > 0:
# Copy most recent fixation position into saccade positions
- last_ts, last_position = self.__fixation_positions.last
- self.__saccade_positions[last_ts] = last_position
+ last_ts, last_position = self.__fixation_positions[-1]
+ self.__saccade_positions.append(last_position)
# Finish last fixation
last_fixation = self.current_fixation.finish()
@@ -238,7 +234,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Move oldest valid position into saccade positions
first_ts, first_position = self.__valid_positions.pop_first()
- self.__saccade_positions[first_ts] = first_position
+ self.__saccade_positions.append(first_position)
# Always return unvalid gaze movement at least
return GazeFeatures.UnvalidGazeMovement()
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index 1c27540..54784ac 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -17,6 +17,7 @@ from inspect import getmembers
from argaze import DataFeatures
from argaze.AreaOfInterest import AOIFeatures
+from argaze.utils import UtilsFeatures # DEBUG
import numpy
import pandas
@@ -161,7 +162,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject):
def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True):
"""Draw gaze position point and precision circle."""
- if self.valid:
+ if self:
int_value = (int(self[0]), int(self[1]))
@@ -170,7 +171,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject):
cv2.circle(image, int_value, size, color, -1)
# Draw precision circle
- if self.__precision > 0 and draw_precision:
+ if self.__precision is not None and draw_precision:
cv2.circle(image, int_value, round(self.__precision), color, 1)
TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions")
@@ -179,9 +180,14 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt
class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList):
"""Handle timestamped gaze positions into a list"""
+ #@UtilsFeatures.PrintCallStack
def __init__(self, gaze_positions: list = []):
- super().__init__(GazePosition, gaze_positions)
+ DataFeatures.TimeStampedObjectsList.__init__(self, GazePosition, gaze_positions)
+
+ def values(self) -> list:
+ """Get all timestamped position values as list of tuple."""
+ return [tuple(ts_position) for ts_position in self]
@classmethod
def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType:
@@ -206,29 +212,24 @@ class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList):
"""
# Copy columns
- if precision:
+ columns = (timestamp, x, y)
- df = dataframe.loc[:, (timestamp, x, y, precision)]
+ if precision is not None:
- else:
+ columns += (precision,)
+
+ if message is not None:
- df = dataframe.loc[:, (timestamp, x, y)]
+ columns += (message,)
- # DEBUG
- print(df)
+ df = dataframe.loc[:, columns]
# Merge x and y columns into one 'value' column
df['value'] = tuple(zip(df[x], df[y]))
df.drop(columns=[x, y], inplace=True, axis=1)
- # DEBUG
- print(df)
-
- # Replace (NaN, NaN) values by ()
- df['value'] = df.apply(lambda row: print(row.values[1], pandas.isnull(row.values[1])), axis=True)
-
- # DEBUG
- print(df)
+ # Replace tuple values containing NaN values by ()
+ df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=True)
# Handle precision data
if precision:
@@ -358,58 +359,85 @@ class GazePositionCalibrator():
GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement")
# Type definition for type annotation convenience
-@dataclass(frozen=True)
-class GazeMovement(DataFeatures.TimestampedObject):
- """Define abstract gaze movement class as a buffer of timestamped positions."""
+class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject):
+ """Define abstract gaze movement class as timestamped gaze positions list.
- positions: TimeStampedGazePositions
- """All timestamp gaze positions."""
+ Parameters:
+ positions: timestamp gaze positions.
+ finished: is the movement finished?
+ message: a string to describe why the movement is what it is.
+ """
- duration: float = field(init=False)
- """Inferred duration from first and last timestamps."""
+ def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
- amplitude: float = field(init=False)
- """Inferred amplitude from first and last positions."""
+ return TimeStampedGazePositions.__new__(cls, positions)
- finished: bool = field(init=False, default=False)
- """Is the movement finished?"""
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
+ """Initialize GazeMovement"""
- def __post_init__(self):
+ TimeStampedGazePositions.__init__(self, positions)
+ DataFeatures.TimestampedObject.__init__(self, **kwargs)
- if self.valid:
+ self.__finished = finished
+ self.__message = message
- start_position_ts, start_position = self.positions.first
- end_position_ts, end_position = self.positions.last
+ @property
+ def timestamp(self) -> int|float:
+ """Get first position timestamp."""
+ return self[0].timestamp
- # Update frozen duration attribute
- object.__setattr__(self, 'duration', end_position_ts - start_position_ts)
+ @timestamp.setter
+ def timestamp(self, timestamp: int|float):
+ """Block gaze movment timestamp setting."""
+ raise('GazeMovement timestamp is first positon timestamp.')
- _, start_position = self.positions.first
- _, end_position = self.positions.last
+ @property
+ def finished(self) -> bool:
+ """Is the movement finished?"""
+ return self.__finished
+
+ def finish(self) -> GazeMovementType:
+ """Set gaze movement as finished"""
+ self.__finished = True
+ return self
+
+ @property
+ def message(self):
+ """Get movement's message."""
+ return self.__message
- amplitude = numpy.linalg.norm(start_position - end_position)
+ @property
+ def duration(self):
+ """Get inferred duration from first and last timestamps."""
+ if self:
- # Update frozen amplitude attribute
- object.__setattr__(self, 'amplitude', amplitude)
+ return self[-1].timestamp - self[0].timestamp
else:
- # Update frozen duration attribute
- object.__setattr__(self, 'duration', -1)
+ return 0
+
+ @property
+ def amplitude(self):
+ """Get inferred amplitude from first and last positions."""
+ if self:
- # Update frozen amplitude attribute
- object.__setattr__(self, 'amplitude', -1)
+ return numpy.linalg.norm(self[0] - self[-1])
+
+ else:
+
+ return 0
def __str__(self) -> str:
"""String display"""
- if self.valid:
+ if self:
- output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self.positions)}\n\tfinished={self.finished}'
+ output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.finished}'
- for ts, position in self.positions.items():
+ for position in self:
- output += f'\n\t{ts}:\n\t\tvalue={position},\n\t\tprecision={position.precision}'
+ output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}'
else:
@@ -417,20 +445,6 @@ class GazeMovement(DataFeatures.TimestampedObject):
return output
- @property
- def valid(self) -> bool:
- """Is there positions?"""
-
- return len(self.positions) > 0
-
- def finish(self) -> GazeMovementType:
- """Set gaze movement as finished"""
-
- # Update frozen finished attribute
- object.__setattr__(self, 'finished', True)
-
- return self
-
def draw_positions(self, image: numpy.array, position_color: tuple = None, line_color: tuple = None):
"""Draw gaze movement positions with line between each position.
@@ -439,12 +453,12 @@ class GazeMovement(DataFeatures.TimestampedObject):
line_color: color of line between each position
"""
- gaze_positions = self.positions.copy()
+ positions = self.copy()
- while len(gaze_positions) >= 2:
+ while len(positions) >= 2:
- ts_start, start_gaze_position = gaze_positions.pop_first()
- ts_next, next_gaze_position = gaze_positions.first
+ start_gaze_position = positions.pop(0)
+ next_gaze_position = positions[0]
# Draw line between positions if required
if line_color is not None:
@@ -461,31 +475,27 @@ class GazeMovement(DataFeatures.TimestampedObject):
raise NotImplementedError('draw() method not implemented')
-class UnvalidGazeMovement(GazeMovement):
- """Unvalid gaze movement."""
-
- def __init__(self, message=None):
-
- self.message = message
-
- super().__init__(TimeStampedGazePositions())
-
- def draw(self, image: numpy.array, **kwargs):
-
- pass
-
FixationType = TypeVar('Fixation', bound="Fixation")
# Type definition for type annotation convenience
class Fixation(GazeMovement):
"""Define abstract fixation as gaze movement."""
- focus: tuple = field(init=False)
- """Representative position of the fixation."""
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
- def __post_init__(self):
+ super().__init__(positions, finished, message, **kwargs)
+
+ self._focus = ()
- super().__post_init__()
+ @property
+ def focus(self) -> tuple:
+ """Get representative position of the fixation."""
+ return self._focus
+
+ @focus.setter
+ def focus(self, focus: tuple):
+ """Set representative position of the fixation."""
+ self._focus = focus
def merge(self, fixation) -> FixationType:
"""Merge another fixation into this fixation."""
@@ -500,9 +510,9 @@ def is_fixation(gaze_movement):
class Saccade(GazeMovement):
"""Define abstract saccade as gaze movement."""
- def __post_init__(self):
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
- super().__post_init__()
+ super().__init__(positions, finished, message, **kwargs)
def is_saccade(gaze_movement):
"""Is a gaze movement a saccade?"""