aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThéo de la Hogue2024-02-29 10:36:47 +0100
committerThéo de la Hogue2024-02-29 10:36:47 +0100
commitfaa6d8acf3c9e4d11a3ee84df2d5a48501befd68 (patch)
treec908a9c1df99053f32f8d4d50994cca1bf87bc62 /src
parent10969554e3126c65d19e408b06b3169f35b81e41 (diff)
downloadargaze-faa6d8acf3c9e4d11a3ee84df2d5a48501befd68.zip
argaze-faa6d8acf3c9e4d11a3ee84df2d5a48501befd68.tar.gz
argaze-faa6d8acf3c9e4d11a3ee84df2d5a48501befd68.tar.bz2
argaze-faa6d8acf3c9e4d11a3ee84df2d5a48501befd68.tar.xz
Fixing DispersionThresholdIdentification test.
Diffstat (limited to 'src')
-rw-r--r--src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py54
-rw-r--r--src/argaze/DataFeatures.py16
-rw-r--r--src/argaze/GazeAnalysis/DispersionThresholdIdentification.py106
-rw-r--r--src/argaze/GazeFeatures.py101
4 files changed, 128 insertions, 149 deletions
diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
index 7e74c1d..07496c3 100644
--- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
@@ -121,12 +121,12 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check fixation
fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
-
+
def test_fixation_and_direct_saccade_identification(self):
"""Test DispersionThresholdIdentification fixation and saccade identification."""
@@ -153,7 +153,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check first fixation
fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
@@ -162,7 +162,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check first saccade
saccade = ts_saccades.pop(0)
- self.assertEqual(len(saccade.positions.keys()), 2)
+ self.assertEqual(len(saccade), 2)
self.assertGreaterEqual(saccade.duration, min_time)
self.assertLessEqual(saccade.duration, max_time)
self.assertLessEqual(saccade.finished, True)
@@ -174,7 +174,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check second fixation
fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
@@ -183,7 +183,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check that last position of a movement is equal to first position of next movement
self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp)
self.assertEqual(saccade[-1].value, fixation[0].value)
-
+
def test_fixation_and_short_saccade_identification(self):
"""Test DispersionThresholdIdentification fixation and saccade identification."""
@@ -213,7 +213,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check first fixation
fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
@@ -222,7 +222,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check first saccade
saccade = ts_saccades.pop(0)
- self.assertEqual(len(saccade.positions.keys()), move + 2)
+ self.assertEqual(len(saccade), move + 2)
self.assertGreaterEqual(saccade.duration, (move + 1) * min_time)
self.assertLessEqual(saccade.duration, (move + 1) * max_time)
self.assertLessEqual(saccade.finished, True)
@@ -234,7 +234,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check second fixation
fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
@@ -243,9 +243,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check that last position of a movement is equal to first position of next movement
self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp)
self.assertEqual(saccade[-1].value, fixation[0].value)
-
- def test_invalid_gaze_position(self):
- """Test DispersionThresholdIdentification fixation and saccade identification with invalid gaze position."""
+
+ def test_empty_gaze_position(self):
+ """Test DispersionThresholdIdentification fixation and saccade identification with empty gaze position."""
size = 15
center = (0, 0)
@@ -267,7 +267,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check first fixation
fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), 7)
+ self.assertEqual(len(fixation), 7)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, 6 * min_time)
self.assertLessEqual(fixation.duration, 6 * max_time)
@@ -276,12 +276,12 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check second fixation
fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), 5)
+ self.assertEqual(len(fixation), 5)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, 4 * min_time)
self.assertLessEqual(fixation.duration, 4 * max_time)
self.assertLessEqual(fixation.finished, True)
-
+
def test_fixation_overlapping(self):
"""Test Fixation overlap function."""
@@ -336,12 +336,12 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check unique fixation
fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size * 2)
+ self.assertEqual(len(fixation), size * 2)
#self.assertGreaterEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (2 * size - 1) * min_time)
self.assertLessEqual(fixation.duration, (2 * size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
-
+
def test_identification_browsing(self):
"""Test DispersionThresholdIdentification identification browsing."""
@@ -355,7 +355,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B
gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2)
@@ -369,7 +369,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
if GazeFeatures.is_fixation(finished_gaze_movement):
- self.assertEqual(len(finished_gaze_movement.positions.keys()), size)
+ self.assertEqual(len(finished_gaze_movement), size)
self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max)
self.assertGreaterEqual(finished_gaze_movement.duration, (size-1) * min_time)
self.assertLessEqual(finished_gaze_movement.duration, (size-1) * max_time)
@@ -377,13 +377,13 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
elif GazeFeatures.is_saccade(finished_gaze_movement):
- self.assertEqual(len(finished_gaze_movement.positions.keys()), 2)
+ self.assertEqual(len(finished_gaze_movement), 2)
self.assertGreaterEqual(finished_gaze_movement.duration, min_time)
self.assertLessEqual(finished_gaze_movement.duration, max_time)
self.assertLessEqual(finished_gaze_movement.finished, True)
# Check that last gaze position date is not equal to given gaze position date
- if finished_gaze_movement.valid:
+ if finished_gaze_movement:
last_ts = finished_gaze_movement[-1].timestamp
@@ -392,14 +392,14 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
# Check that last gaze position date of current fixation is equal to given gaze position date
# NOTE: This is not true for saccade as, for I-DT, there is a minimal time window while the gaze movement is unknown
current_gaze_movement = gaze_movement_identifier.current_gaze_movement
- if current_gaze_movement.valid:
+ if current_gaze_movement:
if GazeFeatures.is_fixation(current_gaze_movement):
last_ts = current_gaze_movement[-1].timestamp
self.assertEqual(last_ts, gaze_position.timestamp)
-
+
def test_identification_generator(self):
"""Test DispersionThresholdIdentification identification using generator."""
@@ -413,15 +413,15 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B
gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2)
- for ts, finished_gaze_movement in gaze_movement_identifier(ts_gaze_positions):
+ for finished_gaze_movement in gaze_movement_identifier(ts_gaze_positions):
if GazeFeatures.is_fixation(finished_gaze_movement):
- self.assertEqual(len(finished_gaze_movement.positions.keys()), size)
+ self.assertEqual(len(finished_gaze_movement), size)
self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max)
self.assertGreaterEqual(finished_gaze_movement.duration, size * min_time)
self.assertLessEqual(finished_gaze_movement.duration, size * max_time)
@@ -429,7 +429,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
elif GazeFeatures.is_saccade(finished_gaze_movement):
- self.assertEqual(len(finished_gaze_movement.positions.keys()), 2)
+ self.assertEqual(len(finished_gaze_movement), 2)
self.assertGreaterEqual(finished_gaze_movement.duration, 2 * min_time)
self.assertLessEqual(finished_gaze_movement.duration, 2 * max_time)
self.assertLessEqual(finished_gaze_movement.finished, True)
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index 6be946f..ff9baec 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -205,9 +205,17 @@ class TimestampedObjectsList(list):
super().append(ts_object)
+ def look_for(self, timestamp: TimeStampType) -> TimestampedObjectType:
+ """Look for object at given timestamp."""
+ for ts_object in self:
+
+ if ts_object.timestamp == timestamp:
+
+ return ts_object
+
def __add__(self, ts_objects: list = []) -> TimestampedObjectsListType:
"""Append timestamped objects list."""
-
+
for ts_object in ts_objects:
self.append(ts_object)
@@ -774,7 +782,7 @@ def PipelineStepMethod(method):
PipelineStepMethod must have a timestamp as first argument.
"""
- def wrapper(self, timestamp, *args, unwrap: bool = False):
+ def wrapper(self, timestamp, *args, unwrap: bool = False, **kwargs):
"""Wrap pipeline step method to measure execution time.
Parameters:
@@ -784,7 +792,7 @@ def PipelineStepMethod(method):
"""
if unwrap:
- return method(self, timestamp, *args)
+ return method(self, timestamp, *args, **kwargs)
# Initialize execution time assessment
start = time.perf_counter()
@@ -794,7 +802,7 @@ def PipelineStepMethod(method):
try:
# Execute wrapped method
- result = method(self, timestamp, *args)
+ result = method(self, timestamp, *args, **kwargs)
except Exception as e:
diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
index a452e98..c85e576 100644
--- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
@@ -26,49 +26,39 @@ FixationType = TypeVar('Fixation', bound="Fixation")
SaccadeType = TypeVar('Saccade', bound="Saccade")
# Type definition for type annotation convenience
-@dataclass(frozen=True)
class Fixation(GazeFeatures.Fixation):
"""Define dispersion based fixation."""
- deviation_max: float = field(init=False)
- """Maximal gaze position distance to the centroïd."""
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
- def __post_init__(self):
+ super().__init__(positions, finished, message, **kwargs)
- super().__post_init__()
+ if positions:
- positions_array = numpy.asarray(self.positions.values())
- centroid = numpy.mean(positions_array, axis=0)
- deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
-
- # Update frozen focus attribute using centroid
- object.__setattr__(self, 'focus', (centroid[0], centroid[1]))
+ positions_array = numpy.asarray(self.values())
+ centroid = numpy.mean(positions_array, axis=0)
+ deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
- # Update frozen deviation_max attribute
- object.__setattr__(self, 'deviation_max', deviations_array.max())
+ # Set focus as positions centroid
+ self.focus = (centroid[0], centroid[1])
- def point_deviation(self, gaze_position) -> float:
- """Get distance of a point from the fixation's centroïd."""
+ # Set deviation_max attribute
+ self.__deviation_max = deviations_array.max()
- return numpy.sqrt((self.focus[0] - gaze_position.value[0])**2 + (self.focus[1] - gaze_position.value[1])**2)
+ @property
+ def deviation_max(self):
+ """Get fixation's maximal deviation."""
+ return self.__deviation_max
- def overlap(self, fixation) -> bool:
+ def overlap(self, fixation: FixationType) -> bool:
"""Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?"""
-
- positions_array = numpy.asarray(self.positions.values())
- centroid = numpy.array(list(self.focus))
+
+ positions_array = numpy.asarray(fixation.values())
+ centroid = numpy.mean(self.focus, axis=0)
deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
- return deviations_array.min() <= self.deviation_max
-
- def merge(self, fixation) -> FixationType:
- """Merge another fixation into this fixation."""
-
- self.positions.append(fixation.positions)
- self.__post_init__()
-
- return self
-
+ return min(deviations_array) <= self.deviation_max
+
def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None):
"""Draw fixation into image.
@@ -93,12 +83,12 @@ class Fixation(GazeFeatures.Fixation):
self.draw_positions(image, **draw_positions)
-@dataclass(frozen=True)
class Saccade(GazeFeatures.Saccade):
"""Define dispersion based saccade."""
- def __post_init__(self):
- super().__post_init__()
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
+
+ super().__init__(positions, finished, message, **kwargs)
def draw(self, image: numpy.array, line_color: tuple = None):
"""Draw saccade into image.
@@ -115,7 +105,6 @@ class Saccade(GazeFeatures.Saccade):
cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2)
-@dataclass
class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
"""Implementation of the I-DT algorithm as described in:
@@ -123,25 +112,36 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
*Identifying fixations and saccades in eye-tracking protocols.*
Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA'00, 71-78).
[https://doi.org/10.1145/355017.355028](https://doi.org/10.1145/355017.355028)
+
+ Parameters:
+ deviation_max_threshold: Maximal distance allowed to consider a gaze movement as a fixation.
+ duration_min_threshold: Minimal duration allowed to consider a gaze movement as a fixation. \
+ It is also used as maximal duration allowed to wait valid gaze positions.
"""
- deviation_max_threshold: int|float
- """Maximal distance allowed to consider a gaze movement as a fixation."""
-
- duration_min_threshold: int|float
- """Minimal duration allowed to consider a gaze movement as a fixation.
- It is also used as maximal duration allowed to wait valid gaze positions."""
-
- def __post_init__(self):
+ def __init__(self, deviation_max_threshold: int|float, duration_min_threshold: int|float):
super().__init__()
+ self.__deviation_max_threshold = deviation_max_threshold
+ self.__duration_min_threshold = duration_min_threshold
+
self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
+ @property
+ def deviation_max_threshold(self):
+ """Get identifier's deviation max threshold."""
+ return self.__deviation_max_threshold
+
+ @property
+ def duration_min_threshold(self):
+ """Get identifier duration min threshold."""
+ return self.__duration_min_threshold
+
@DataFeatures.PipelineStepMethod
- def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType:
+ def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType:
# Ignore non valid gaze position
if not gaze_position:
@@ -151,9 +151,9 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Check if too much time elapsed since last valid gaze position
if len(self.__valid_positions) > 0:
- ts_last, _ = self.__valid_positions[-1]
+ ts_last = self.__valid_positions[-1].timestamp
- if (ts - ts_last) > self.duration_min_threshold:
+ if (timestamp - ts_last) > self.__duration_min_threshold:
# Get last movement
last_movement = self.current_gaze_movement.finish()
@@ -172,17 +172,14 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Store gaze positions until a minimal duration
self.__valid_positions.append(gaze_position)
- first_ts, _ = self.__valid_positions[0]
- last_ts, _ = self.__valid_positions[-1]
-
# Once the minimal duration is reached
- if last_ts - first_ts >= self.duration_min_threshold:
+ if self.__valid_positions.duration >= self.__duration_min_threshold:
# Calculate the deviation of valid gaze positions
deviation = Fixation(self.__valid_positions).deviation_max
# Valid gaze positions deviation small enough
- if deviation <= self.deviation_max_threshold:
+ if deviation <= self.__deviation_max_threshold:
last_saccade = GazeFeatures.GazeMovement()
@@ -190,8 +187,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
if len(self.__saccade_positions) > 0:
# Copy oldest valid position into saccade positions
- first_ts, first_position = self.__valid_positions[0]
- self.__saccade_positions.append(first_position)
+ self.__saccade_positions.append(self.__valid_positions[0])
# Finish last saccade
last_saccade = self.current_saccade.finish()
@@ -214,8 +210,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
if len(self.__fixation_positions) > 0:
# Copy most recent fixation position into saccade positions
- last_ts, last_position = self.__fixation_positions[-1]
- self.__saccade_positions.append(last_position)
+ self.__saccade_positions.append(self.__fixation_positions[-1])
# Finish last fixation
last_fixation = self.current_fixation.finish()
@@ -233,8 +228,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
return last_fixation if not terminate else self.current_saccade.finish()
# Move oldest valid position into saccade positions
- first_ts, first_position = self.__valid_positions.pop_first()
- self.__saccade_positions.append(first_position)
+ self.__saccade_positions.append(self.__valid_positions.pop(0))
# Always return unvalid gaze movement at least
return GazeFeatures.GazeMovement()
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index 71c643e..eac9e5c 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -520,31 +520,34 @@ class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList):
GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus")
# Type definition for type annotation convenience
-@dataclass(frozen=True)
-class GazeStatus(GazePosition):
- """Define gaze status as a gaze position belonging to an identified and indexed gaze movement."""
+class GazeStatus(list, DataFeatures.TimestampedObject):
+ """Define gaze status as a list of 1 or 2 (index, GazeMovementType) tuples.
- movement_type: str = field(kw_only=True)
- """GazeMovement type to which gaze position belongs."""
+ Parameters:
+ position: the position that the status represents.
+ """
- movement_index: int = field(kw_only=True)
- """GazeMovement index to which gaze positon belongs."""
+ def __init__(self, position: GazePosition):
- @classmethod
- def from_position(cls, gaze_position: GazePosition, movement_type: str, movement_index: int) -> GazeStatusType:
- """Initialize from a gaze position instance."""
+ DataFeatures.TimestampedObject.__init__(self, timestamp=position.timestamp)
+
+ self.__position = position
+
+ @property
+ def position(self) -> GazePosition:
+ """Get gaze status position."""
+ return self.__position
+
+ def append(self, movement_index: int, movement_type:type):
+ """Append movement index and type."""
- return cls(gaze_position, precision=gaze_position.precision, movement_type=movement_type, movement_index=movement_index)
+ super().append((movement_index, movement_type))
TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus")
# Type definition for type annotation convenience
class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList):
- """Handle timestamped gaze movements into a list
-
- !!! note
- List of gaze status are required as a gaze position can belongs to two consecutive gaze movements as last and first position.
- """
+ """Handle timestamped gaze status into a list."""
def __init__(self):
@@ -568,7 +571,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
terminate: allows to notify identification algorithm that given gaze position will be the last one.
Returns:
- finished_gaze_movement: identified gaze movement once it is finished otherwise it returns unvalid gaze movement.
+ gaze_movement: identified gaze movement once it is finished otherwise it returns empty gaze movement.
"""
raise NotImplementedError('identify() method not implemented')
@@ -612,57 +615,31 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
# Iterate on gaze positions
for gaze_position in ts_gaze_positions:
- finished_gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts))
-
- if is_fixation(finished_gaze_movement):
+ gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts))
- ts_fixations.append(finished_gaze_movement)
+ if gaze_movement:
# First gaze movement position is always shared with previous gaze movement
- for movement_position in finished_gaze_movement:
-
- gaze_status = GazeStatus.from_position(movement_position, 'Fixation', len(ts_fixations))
-
- if movement_position.timestamp != finished_gaze_movement.timestamp:
-
- ts_status.append([gaze_status])
-
- else:
-
- try:
-
- ts_status[finished_gaze_movement.timestamp].append(gaze_status)
-
- except KeyError:
-
- ts_status[finished_gaze_movement.timestamp] = [gaze_status]
-
- elif is_saccade(finished_gaze_movement):
-
- ts_saccades.append(finished_gaze_movement)
-
- # First gaze movement position is always shared with previous gaze movement
- for movement_position in finished_gaze_movement:
-
- gaze_status = GazeStatus.from_position(position, 'Saccade', len(ts_saccades))
-
- if movement_position.timestamp != finished_gaze_movement.timestamp:
-
- ts_status.append([gaze_status])
+ for movement_position in gaze_movement:
- else:
+ # Is a status already exist for this position?
+ gaze_status = ts_status.look_for(movement_position.timestamp)
- try:
+ if not gaze_status:
+
+ gaze_status = GazeStatus(movement_position)
+ ts_status.append(gaze_status)
- ts_status[finished_gaze_movement.timestamp].append(gaze_status)
+ gaze_status.append(len(ts_fixations), type(gaze_movement))
- except KeyError:
+ # Store gaze movment into the appropriate list
+ if is_fixation(gaze_movement):
- ts_status[finished_gaze_movement.timestamp] = [gaze_status]
+ ts_fixations.append(gaze_movement)
- else:
+ elif is_saccade(gaze_movement):
- continue
+ ts_saccades.append(gaze_movement)
return ts_fixations, ts_saccades, ts_status
@@ -674,7 +651,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
Returns:
timestamp: first gaze position date of identified gaze movement
- finished_gaze_movement: identified gaze movement once it is finished
+ gaze_movement: identified gaze movement once it is finished
"""
assert(type(ts_gaze_positions) == TimeStampedGazePositions)
@@ -685,11 +662,11 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
# Iterate on gaze positions
for gaze_position in ts_gaze_positions:
- finished_gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts))
+ gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts))
- if finished_gaze_movement:
+ if gaze_movement:
- yield finished_gaze_movement
+ yield gaze_movement
ScanStepType = TypeVar('ScanStep', bound="ScanStep")
# Type definition for type annotation convenience