Diffstat (limited to 'src')
-rw-r--r--  src/argaze.test/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py  156
-rw-r--r--  src/argaze.test/GazeFeatures.py  14
-rw-r--r--  src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py  191
-rw-r--r--  src/argaze/GazeAnalysis/README.md  9
-rw-r--r--  src/argaze/GazeAnalysis/__init__.py  5
-rw-r--r--  src/argaze/GazeFeatures.py  269
-rw-r--r--  src/argaze/__init__.py  2
-rw-r--r--  src/argaze/utils/README.md  8
-rw-r--r--  src/argaze/utils/tobii_segment_aruco_aoi_edit.py  8
-rw-r--r--  src/argaze/utils/tobii_segment_aruco_aoi_export.py  24
-rw-r--r--  src/argaze/utils/tobii_segment_gaze_movements_export.py  115
-rw-r--r--  src/argaze/utils/tobii_stream_aruco_aoi_display.py  18
12 files changed, 562 insertions, 257 deletions
diff --git a/src/argaze.test/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py b/src/argaze.test/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
new file mode 100644
index 0000000..01b2aad
--- /dev/null
+++ b/src/argaze.test/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+
+import unittest
+import random
+import time
+
+from argaze import GazeFeatures
+from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier
+
+import numpy
+
+def build_gaze_fixation(size: int, center: tuple, dispersion: float, start_time: float, min_time: float, max_time: float, validity: list = []):
+ """ Generate N TimeStampedGazePsoitions dispersed around a center point for testing purpose.
+ Timestamps are current time after random sleep (second).
+ GazePositions are random values.
+ """
+ ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
+
+ for i in range(0, size):
+
+ # Check position validity
+ valid = True
+ if len(validity) > i:
+
+ valid = validity[i]
+
+ if valid:
+
+ # Edit gaze position
+ random_x = center[0] + dispersion * (random.random() - 0.5)
+ random_y = center[1] + dispersion * (random.random() - 0.5)
+ gaze_position = GazeFeatures.GazePosition((random_x, random_y))
+
+ else:
+
+ gaze_position = GazeFeatures.UnvalidGazePosition()
+
+ # Store gaze position
+ ts = time.time() - start_time
+ ts_gaze_positions[ts] = gaze_position
+
+ # Sleep a random time
+ sleep_time = random.random() * (max_time - min_time) + min_time
+ time.sleep(sleep_time)
+
+ return ts_gaze_positions
+
+class TestDispersionBasedGazeMovementIdentifierClass(unittest.TestCase):
+ """Test DispersionBasedGazeMovementIdentifier class."""
+
+ def test_fixation_identification(self):
+ """Test DispersionBasedGazeMovementIdentifier fixation identification."""
+
+ size = 10
+ center = (0, 0)
+ dispersion = 10
+ start_time = time.time()
+ min_time = 0.01
+ max_time = 0.1
+
+ ts_gaze_positions = build_gaze_fixation(size, center, dispersion, start_time, min_time, max_time)
+ gaze_movement_identifier = DispersionBasedGazeMovementIdentifier.GazeMovementIdentifier(dispersion_threshold=dispersion, duration_threshold=min_time*2)
+ ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.identify(ts_gaze_positions)
+
+ # Check result size
+ self.assertEqual(len(ts_fixations), 1)
+ self.assertEqual(len(ts_saccades), 0)
+ self.assertEqual(len(ts_status), size)
+
+ # Check fixation
+ ts, fixation = ts_fixations.pop_first()
+
+ self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertLessEqual(fixation.dispersion, dispersion)
+ self.assertGreaterEqual(fixation.duration, size * min_time)
+ self.assertLessEqual(fixation.duration, size * max_time)
+
+ def test_fixation_and_saccade_identification(self):
+ """Test DispersionBasedGazeMovementIdentifier fixation and saccade identification."""
+
+ size = 10
+ center_A = (0, 0)
+ center_B = (50, 50)
+ dispersion = 10
+ start_time = time.time()
+ min_time = 0.01
+ max_time = 0.1
+
+ ts_gaze_positions_A = build_gaze_fixation(size, center_A, dispersion, start_time, min_time, max_time)
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, dispersion, start_time, min_time, max_time)
+
+ ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
+
+ gaze_movement_identifier = DispersionBasedGazeMovementIdentifier.GazeMovementIdentifier(dispersion_threshold=dispersion, duration_threshold=min_time*2)
+ ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.identify(ts_gaze_positions)
+
+ # Check result size
+ self.assertEqual(len(ts_fixations), 2)
+ self.assertEqual(len(ts_saccades), 1)
+ self.assertEqual(len(ts_status), size*2)
+
+ # Check first fixation
+ ts, fixation = ts_fixations.pop_first()
+
+ self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertLessEqual(fixation.dispersion, dispersion)
+ self.assertGreaterEqual(fixation.duration, size * min_time)
+ self.assertLessEqual(fixation.duration, size * max_time)
+
+ # Check first saccade
+ ts, saccade = ts_saccades.pop_first()
+
+ self.assertEqual(len(saccade.positions.keys()), 2)
+ self.assertGreaterEqual(saccade.duration, min_time)
+ self.assertLessEqual(saccade.duration, max_time)
+
+ # Check second fixation
+ ts, fixation = ts_fixations.pop_first()
+
+ self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertLessEqual(fixation.dispersion, dispersion)
+ self.assertGreaterEqual(fixation.duration, size * min_time)
+ self.assertLessEqual(fixation.duration, size * max_time)
+
+ def test_invalid_gaze_position(self):
+ """Test DispersionBasedGazeMovementIdentifier fixation and saccade identification with invalid gaze position."""
+
+ size = 10
+ center = (0, 0)
+ dispersion = 10
+ start_time = time.time()
+ min_time = 0.01
+ max_time = 0.1
+ validity = [True, True, True, True, False, False, True, True, True, True]
+
+ ts_gaze_positions = build_gaze_fixation(size, center, dispersion, start_time, min_time, max_time, validity)
+
+ gaze_movement_identifier = DispersionBasedGazeMovementIdentifier.GazeMovementIdentifier(dispersion_threshold=dispersion, duration_threshold=min_time*2)
+ ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.identify(ts_gaze_positions)
+
+ # Check result size
+ self.assertEqual(len(ts_fixations), 1)
+ self.assertEqual(len(ts_saccades), 0)
+ self.assertEqual(len(ts_status), size-2)
+
+ # Check fixation
+ ts, fixation = ts_fixations.pop_first()
+
+ self.assertEqual(len(fixation.positions.keys()), size-2)
+ self.assertLessEqual(fixation.dispersion, dispersion)
+ self.assertGreaterEqual(fixation.duration, size * min_time)
+ self.assertLessEqual(fixation.duration, size * max_time)
+
+if __name__ == '__main__':
+
+ unittest.main()
\ No newline at end of file
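The new test above exercises the identifier end to end. For quick reference, a minimal usage sketch of the same API (sample points and thresholds are illustrative; the exact output depends on them):

```python
from argaze import GazeFeatures
from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier

# Two clusters of positions (timestamps in seconds), far enough apart
# to be read as two fixations separated by a saccade
ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
for ts, point in [(0.00, (0, 0)), (0.05, (2, 1)), (0.10, (1, 2)),
                  (0.15, (40, 40)), (0.20, (41, 42)), (0.25, (42, 41))]:
    ts_gaze_positions[ts] = GazeFeatures.GazePosition(point)

identifier = DispersionBasedGazeMovementIdentifier.GazeMovementIdentifier(
    dispersion_threshold=10, duration_threshold=0.1)
ts_fixations, ts_saccades, ts_status = identifier.identify(ts_gaze_positions)

print(f'{len(ts_fixations)} fixation(s), {len(ts_saccades)} saccade(s)')
```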
diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py
index 5a3c2d9..dd0717c 100644
--- a/src/argaze.test/GazeFeatures.py
+++ b/src/argaze.test/GazeFeatures.py
@@ -117,6 +117,7 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase):
ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
ts_gaze_positions[0] = GazeFeatures.GazePosition()
ts_gaze_positions[1] = GazeFeatures.UnvalidGazePosition()
+ ts_gaze_positions[2] = {"value": (0, 0), "accuracy": 0.}
# Check GazePosition is correctly stored and accessible as a GazePosition
self.assertIsInstance(ts_gaze_positions[0], GazeFeatures.GazePosition)
@@ -126,13 +127,22 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase):
self.assertIsInstance(ts_gaze_positions[1], GazeFeatures.UnvalidGazePosition)
self.assertEqual(ts_gaze_positions[1].valid, False)
+ # Check dict with "value" and "accuracy" keys is correctly stored and accessible as a GazePosition
+ self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition)
+ self.assertEqual(ts_gaze_positions[2].valid, True)
+
# Check that bad data type insertion fails
with self.assertRaises(AssertionError):
- ts_gaze_positions[2] = "This string is not a gaze position value."
+ ts_gaze_positions[3] = "This string is not a gaze position value."
+
+ # Check that dict with bad keys insertion fails
+ with self.assertRaises(AssertionError):
+
+ ts_gaze_positions[4] = {"bad_key": (0, 0), "accuracy": 0.}
# Check final length
- self.assertEqual(len(ts_gaze_positions), 2)
+ self.assertEqual(len(ts_gaze_positions), 3)
def test___repr__(self):
"""Test inherited string representation."""
diff --git a/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
new file mode 100644
index 0000000..4fd5aab
--- /dev/null
+++ b/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python
+
+from dataclasses import dataclass, field
+import math
+
+from argaze import GazeFeatures
+
+import numpy
+
+@dataclass(frozen=True)
+class Fixation(GazeFeatures.Fixation):
+ """Define dispersion based fixation."""
+
+ dispersion: float = field(init=False)
+ """Dispersion of all gaze positions belonging to the fixation."""
+
+ euclidian: bool = field(default=True)
+ """Does the distance is calculated in euclidian way."""
+
+ centroid: tuple = field(init=False)
+ """Centroïd of all gaze positions belonging to the fixation."""
+
+ def __post_init__(self):
+
+ super().__post_init__()
+
+ x_list = [gp[0] for (ts, gp) in list(self.positions.items())]
+ y_list = [gp[1] for (ts, gp) in list(self.positions.items())]
+
+ cx = numpy.mean(x_list)
+ cy = numpy.mean(y_list)
+
+ # Select dispersion algorithm
+ if self.euclidian:
+
+ c = [cx, cy]
+ points = numpy.column_stack([x_list, y_list])
+
+ dist = (points - c)**2
+ dist = numpy.sum(dist, axis=1)
+ dist = numpy.sqrt(dist)
+
+ __dispersion = max(dist)
+
+ else:
+
+ __dispersion = (max(x_list) - min(x_list)) + (max(y_list) - min(y_list))
+
+ # Update frozen dispersion attribute
+ object.__setattr__(self, 'dispersion', __dispersion)
+
+ # Update frozen centroid attribute
+ object.__setattr__(self, 'centroid', (cx, cy))
+
+@dataclass(frozen=True)
+class Saccade(GazeFeatures.Saccade):
+ """Define dispersion based saccade."""
+
+ def __post_init__(self):
+ super().__post_init__()
+
+@dataclass
+class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
+ """Implementation of the I-DT algorithm as described in:
+
+ Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
+ saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
+ on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
+ 71-78. [DOI=10.1145/355017.355028](http://dx.doi.org/10.1145/355017.355028)
+ """
+
+ dispersion_threshold: int|float
+ """Maximal distance allowed to consider several gaze positions as a fixation."""
+
+ duration_threshold: int|float
+ """Minimal duration allowed to consider several gaze positions as a fixation."""
+
+ def __iter__(self) -> GazeFeatures.GazeMovementType:
+ """GazeMovement identification generator."""
+
+ self.__last_fixation = None
+
+ # while there are at least 2 gaze positions
+ while len(self.__ts_gaze_positions) >= 2:
+
+ # copy remaining timestamped gaze positions
+ remaining_ts_gaze_positions = self.__ts_gaze_positions.copy()
+
+ # select timestamped gaze position until a duration threshold
+ ts_start, gaze_position_start = remaining_ts_gaze_positions.pop_first()
+
+ # Ignore invalid start position
+ if not gaze_position_start.valid:
+ self.__ts_gaze_positions.pop_first()
+ continue
+
+ ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
+ ts_gaze_positions[ts_start] = gaze_position_start
+
+ # Select next position
+ ts_next, gaze_position_next = remaining_ts_gaze_positions.first
+
+ while (ts_next - ts_start) < self.duration_threshold:
+
+ # Ignore invalid position
+ # Should invalid positions be kept so they don't break the fixation?
+ if gaze_position_next.valid:
+
+ # Store selected position
+ ts, gaze_position = remaining_ts_gaze_positions.pop_first()
+ ts_gaze_positions[ts] = gaze_position
+
+ else:
+
+ remaining_ts_gaze_positions.pop_first()
+
+ try:
+ # Read next position
+ ts_next, gaze_position_next = remaining_ts_gaze_positions.first
+
+ except:
+ break
+
+ # is it a new fixation ?
+ new_fixation = Fixation(ts_gaze_positions)
+
+ # dispersion is small : extending fixation
+ if new_fixation.dispersion <= self.dispersion_threshold:
+
+ # remove selected gaze positions
+ for gp in ts_gaze_positions:
+ self.__ts_gaze_positions.pop_first()
+
+ # extend fixation position from a copy
+ ts_gaze_positions_extension = ts_gaze_positions.copy()
+
+ # are next gaze positions not too dispersed ?
+ while len(remaining_ts_gaze_positions) > 0:
+
+ # Select next gaze position
+ ts_next, gaze_position_next = remaining_ts_gaze_positions.first
+
+ # Ignore invalid position
+ # Should invalid positions be kept so they don't break the fixation?
+ if not gaze_position_next.valid:
+ remaining_ts_gaze_positions.pop_first()
+ continue
+
+ ts_gaze_positions_extension[ts_next] = gaze_position_next
+
+ # how much gaze is dispersed ?
+ extended_fixation = Fixation(ts_gaze_positions_extension)
+
+ # dispersion becomes too wide : ignore extended fixation
+ if extended_fixation.dispersion > self.dispersion_threshold:
+ break
+
+ # update new fixation
+ new_fixation = Fixation(ts_gaze_positions_extension.copy())
+
+ # remove selected gaze position
+ remaining_ts_gaze_positions.pop_first()
+ self.__ts_gaze_positions.pop_first()
+
+ # does the new fixation have a duration ?
+ if new_fixation.duration > 0:
+
+ if self.__last_fixation is not None:
+
+ # store start and end positions in a timestamped buffer
+ ts_saccade_positions = GazeFeatures.TimeStampedGazePositions()
+
+ start_position_ts, start_position = self.__last_fixation.positions.last
+ ts_saccade_positions[start_position_ts] = start_position
+
+ end_position_ts, end_position = new_fixation.positions.first
+ ts_saccade_positions[end_position_ts] = end_position
+
+ if end_position_ts > start_position_ts:
+
+ new_saccade = Saccade(ts_saccade_positions)
+
+ yield new_saccade
+
+ self.__last_fixation = new_fixation
+
+ yield new_fixation
+
+ # dispersion too wide : consider next gaze position
+ else:
+ self.__ts_gaze_positions.pop_first()
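As a side note, the two dispersion measures selected by the `euclidian` flag in `Fixation.__post_init__` can be paraphrased as a standalone helper (a simplified sketch, not the module code):

```python
import numpy

def dispersion(points, euclidian=True):
    """Dispersion of 2D points: maximal Euclidean distance to the
    centroid, or the I-DT spread (max - min) summed over x and y."""
    x = numpy.array([p[0] for p in points], dtype=float)
    y = numpy.array([p[1] for p in points], dtype=float)
    if euclidian:
        cx, cy = x.mean(), y.mean()
        return float(numpy.sqrt((x - cx)**2 + (y - cy)**2).max())
    return float((x.max() - x.min()) + (y.max() - y.min()))

print(dispersion([(0, 0), (10, 0), (5, 8)]))         # Euclidean
print(dispersion([(0, 0), (10, 0), (5, 8)], False))  # I-DT spread
```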
diff --git a/src/argaze/GazeAnalysis/README.md b/src/argaze/GazeAnalysis/README.md
new file mode 100644
index 0000000..fd778e4
--- /dev/null
+++ b/src/argaze/GazeAnalysis/README.md
@@ -0,0 +1,9 @@
+Class interface to work with various gaze analysis algorithms.
+
+# Gaze movements identification algorithms
+
+*"The act of classifying eye movements into distinct events is, on a general level, driven by a desire to isolate different intervals of the data stream strongly correlated with certain oculomotor or cognitive properties."* Citation from ["One algorithm to rule them all? An evaluation and discussion of ten eye movement event-detection algorithms"](https://link.springer.com/article/10.3758/s13428-016-0738-9) article.
+
+## Dispersion based gaze movement identifier (I-DT)
+
+The code is based on the implementation of the I-DT algorithm as described in the ["Identifying fixations and saccades in eye-tracking protocols"](http://dx.doi.org/10.1145/355017.355028) article.
\ No newline at end of file
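In outline, I-DT fills a time window up to the duration threshold, grows it while the dispersion stays under the threshold (a fixation), and otherwise slides one sample forward. A compact, self-contained paraphrase of that loop, assuming plain (timestamp, (x, y)) tuples rather than the module's classes:

```python
def spread(points):
    """I-DT dispersion: (max x - min x) + (max y - min y)."""
    xs = [p[0] for p in points]
    ys = [p[1] for p in points]
    return (max(xs) - min(xs)) + (max(ys) - min(ys))

def idt(samples, dispersion_threshold, duration_threshold):
    """samples: (timestamp, (x, y)) tuples sorted by timestamp.
    Returns fixations as (start timestamp, end timestamp) couples."""
    fixations, i = [], 0
    while i < len(samples) - 1:
        # Fill a window covering at least the duration threshold
        j = i + 1
        while j < len(samples) - 1 and samples[j][0] - samples[i][0] < duration_threshold:
            j += 1
        window = [p for _, p in samples[i:j + 1]]
        if spread(window) <= dispersion_threshold:
            # Grow the window while the dispersion stays acceptable
            while j + 1 < len(samples) and spread(window + [samples[j + 1][1]]) <= dispersion_threshold:
                j += 1
                window.append(samples[j][1])
            fixations.append((samples[i][0], samples[j][0]))
            i = j + 1
        else:
            # Too dispersed: slide one sample forward
            i += 1
    return fixations
```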
diff --git a/src/argaze/GazeAnalysis/__init__.py b/src/argaze/GazeAnalysis/__init__.py
new file mode 100644
index 0000000..4643641
--- /dev/null
+++ b/src/argaze/GazeAnalysis/__init__.py
@@ -0,0 +1,5 @@
+"""
+.. include:: README.md
+"""
+__docformat__ = "restructuredtext"
+__all__ = ['DispersionBasedGazeMovementIdentifier']
\ No newline at end of file
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index 33c10db..de30735 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+from typing import TypeVar, Tuple
from dataclasses import dataclass, field
import math
import json
@@ -73,16 +74,26 @@ class UnvalidGazePosition(GazePosition):
class TimeStampedGazePositions(DataStructures.TimeStampedBuffer):
"""Define timestamped buffer to store gaze positions."""
- def __setitem__(self, key, value: GazePosition):
- """Force value to be GazePosition."""
+ def __setitem__(self, key, value: GazePosition|dict):
+ """Force GazePosition storage."""
+
+ # Convert dict into GazePosition
+ if type(value) == dict:
+
+ assert(set(["value", "accuracy"]).issubset(value.keys()))
+
+ value = GazePosition(value["value"], accuracy=value["accuracy"])
assert(type(value) == GazePosition or type(value) == UnvalidGazePosition)
super().__setitem__(key, value)
-@dataclass
-class Movement():
- """Define abstract movement class as a buffer of timestamped positions."""
+GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement")
+# Type definition for type annotation convenience
+
+@dataclass(frozen=True)
+class GazeMovement():
+ """Define abstract gaze movement class as a buffer of timestamped positions."""
positions: TimeStampedGazePositions
"""All timestamp gaze positions."""
@@ -95,221 +106,137 @@ class Movement():
start_position_ts, start_position = self.positions.first
end_position_ts, end_position = self.positions.last
- self.duration = round(end_position_ts - start_position_ts)
+ # Update frozen duration attribute
+ object.__setattr__(self, 'duration', end_position_ts - start_position_ts)
-Fixation = Movement
-"""Define abstract fixation as movement."""
+ def __str__(self) -> str:
+ """String display"""
-Saccade = Movement
-"""Define abstract saccade as movement."""
+ output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self.positions)}'
-class TimeStampedMovements(DataStructures.TimeStampedBuffer):
- """Define timestamped buffer to store movements."""
+ for ts, position in self.positions.items():
- def __setitem__(self, key, value: Movement):
- """Force value to inherit from Movement."""
+ output += f'\n\t{ts}:\n\t\tvalue={position.value},\n\t\taccuracy={position.accuracy}'
- assert(type(value).__bases__[0] == Movement)
+ return output
- super().__setitem__(key, value)
+class Fixation(GazeMovement):
+ """Define abstract fixation as gaze movement."""
-@dataclass
-class GazeStatus():
- """Define gaze status as a position belonging to an identified and indexed movement."""
-
- position: GazePosition
- """Gaze position"""
+ def __post_init__(self):
- movement_type: str
- """Movement type to which gaze position belongs."""
+ super().__post_init__()
- movement_index: int
- """Movement index to which gaze positon belongs."""
+class Saccade(GazeMovement):
+ """Define abstract saccade as gaze movement."""
-class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer):
- """Define timestamped buffer to store gaze status."""
-
- def __setitem__(self, key, value: GazeStatus):
- super().__setitem__(key, value)
+ def __post_init__(self):
-class MovementIdentifier():
- """Abstract class to define what should provide a movement identifier."""
+ super().__post_init__()
- def __init__(self, ts_gaze_positions: TimeStampedGazePositions):
+TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeStampedGazeMovements")
+# Type definition for type annotation convenience
- if type(ts_gaze_positions) != TimeStampedGazePositions:
- raise ValueError('argument must be a TimeStampedGazePositions')
+class TimeStampedGazeMovements(DataStructures.TimeStampedBuffer):
+ """Define timestamped buffer to store gaze movements."""
- def __iter__(self):
- raise NotImplementedError('__iter__() method not implemented')
+ def __setitem__(self, key, value: GazeMovement):
+ """Force value to inherit from GazeMovement."""
- def __next__(self):
- raise NotImplementedError('__next__() method not implemented')
-
-class DispersionBasedMovementIdentifier(MovementIdentifier):
- """Implementation of the I-DT algorithm as described in:
-
- Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
- saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
- on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
- 71-78. DOI=http://dx.doi.org/10.1145/355017.355028
- """
+ assert(type(value).__bases__[0] == Fixation or type(value).__bases__[0] == Saccade)
- @dataclass
- class DispersionBasedFixation(Fixation):
- """Define dispersion based fixation as an algorithm specific fixation."""
+ super().__setitem__(key, value)
- dispersion: float = field(init=False)
- euclidian: bool = field(default=True)
+ def __str__(self):
- centroid: tuple = field(init=False)
+ output = ''
+ for ts, item in self.items():
- def __post_init__(self):
+ output += f'\n{item}'
- super().__post_init__()
+ return output
- x_list = [gp[0] for (ts, gp) in list(self.positions.items())]
- y_list = [gp[1] for (ts, gp) in list(self.positions.items())]
+GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus")
+# Type definition for type annotation convenience
- cx = round(numpy.mean(x_list))
- cy = round(numpy.mean(y_list))
+@dataclass(frozen=True)
+class GazeStatus(GazePosition):
+ """Define gaze status as a gaze position belonging to an identified and indexed gaze movement."""
- # select dispersion algorithm
- if self.euclidian:
+ movement_type: str = field(kw_only=True)
+ """GazeMovement type to which gaze position belongs."""
- c = [cx, cy]
- points = numpy.column_stack([x_list, y_list])
+ movement_index: int = field(kw_only=True)
+ """GazeMovement index to which gaze positon belongs."""
- dist = (points - c)**2
- dist = numpy.sum(dist, axis=1)
- dist = numpy.sqrt(dist)
+ @classmethod
+ def from_position(cls, gaze_position: GazePosition, movement_type: str, movement_index: int) -> GazeStatusType:
+ """Initialize from a gaze position instance."""
- self.dispersion = round(max(dist))
+ return cls(gaze_position.value, accuracy=gaze_position.accuracy, movement_type=movement_type, movement_index=movement_index)
- else:
+TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus")
+# Type definition for type annotation convenience
- self.dispersion = (max(x_list) - min(x_list)) + (max(y_list) - min(y_list))
+class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer):
+ """Define timestamped buffer to store gaze status."""
- self.centroid = (cx, cy)
+ def __setitem__(self, key, value: GazeStatus):
+ super().__setitem__(key, value)
- @dataclass
- class DispersionBasedSaccade(Saccade):
- """Define dispersion based saccade as an algorithm specific saccade."""
+class GazeMovementIdentifier():
+ """Abstract class to define what should provide a gaze movement identifier."""
- def __post_init__(self):
- super().__post_init__()
+ def __iter__(self) -> GazeMovementType:
+ raise NotImplementedError('__iter__() method not implemented')
- def __init__(self, ts_gaze_positions, dispersion_threshold = 10, duration_threshold = 100):
+ def __next__(self):
+ raise NotImplementedError('__next__() method not implemented')
- super().__init__(ts_gaze_positions)
+ def __call__(self, ts_gaze_positions: TimeStampedGazePositions):
- self.__dispersion_threshold = dispersion_threshold
- self.__duration_threshold = duration_threshold
+ assert(type(ts_gaze_positions) == TimeStampedGazePositions)
# process identification on a copy
self.__ts_gaze_positions = ts_gaze_positions.copy()
- self.__last_fixation = None
-
- def __iter__(self):
- """Movement identification generator."""
-
- # while there are 2 gaze positions at least
- while len(self.__ts_gaze_positions) >= 2:
+ return self
- # copy remaining timestamped gaze positions
- remaining_ts_gaze_positions = self.__ts_gaze_positions.copy()
+ def identify(self, ts_gaze_positions: TimeStampedGazePositions) -> Tuple[TimeStampedGazeMovementsType, TimeStampedGazeMovementsType, TimeStampedGazeStatusType]:
+ """Identifiy fixations and saccades from timestamped gaze positions."""
- # select timestamped gaze position until a duration threshold
- (ts_start, gaze_position_start) = remaining_ts_gaze_positions.pop_first()
+ assert(type(ts_gaze_positions) == TimeStampedGazePositions)
- # Invalid start position
- if not gaze_position_start.valid:
-
- self.__ts_gaze_positions.pop_first()
- continue
+ ts_fixations = TimeStampedGazeMovements()
+ ts_saccades = TimeStampedGazeMovements()
+ ts_status = TimeStampedGazeStatus()
- ts_gaze_positions = TimeStampedGazePositions()
- ts_gaze_positions[ts_start] = gaze_position_start
-
- (ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first()
-
- while (ts_current - ts_start) < self.__duration_threshold:
-
- # Ignore non valid position
- # TODO ? Consider invalid position to not break fixation ?
- if gaze_position_current.valid:
-
- ts_gaze_positions[ts_current] = gaze_position_current
-
- try:
- (ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first()
-
- except:
- break
+ for gaze_movement in self(ts_gaze_positions):
- # is it a new fixation ?
- new_fixation = DispersionBasedMovementIdentifier.DispersionBasedFixation(ts_gaze_positions)
+ if isinstance(gaze_movement, Fixation):
- # dispersion is small
- if new_fixation.dispersion <= self.__dispersion_threshold:
+ start_ts, start_position = gaze_movement.positions.first
- # remove selected gaze positions
- for gp in ts_gaze_positions:
- self.__ts_gaze_positions.pop_first()
+ ts_fixations[start_ts] = gaze_movement
- # are next gaze positions not too dispersed ?
- while len(remaining_ts_gaze_positions) > 0:
+ for ts, position in gaze_movement.positions.items():
- # select next gaze position
- ts_next, position_next = remaining_ts_gaze_positions.pop_first()
+ ts_status[ts] = GazeStatus.from_position(position, 'Fixation', len(ts_fixations))
- # Invalid next position
- if not position_next.valid:
- continue
+ elif isinstance(gaze_movement, Saccade):
- ts_gaze_positions[ts_next] = position_next
+ start_ts, start_position = gaze_movement.positions.first
+ end_ts, end_position = gaze_movement.positions.last
+
+ ts_saccades[start_ts] = gaze_movement
- # how much gaze is dispersed ?
- updated_fixation = DispersionBasedMovementIdentifier.DispersionBasedFixation(ts_gaze_positions)
+ ts_status[start_ts] = GazeStatus.from_position(start_position, 'Saccade', len(ts_saccades))
+ ts_status[end_ts] = GazeStatus.from_position(end_position, 'Saccade', len(ts_saccades))
- # dispersion is becomes too wide : ignore updated fixation
- if updated_fixation.dispersion > self.__dispersion_threshold:
- break
-
- # update new fixation
- new_fixation = updated_fixation
-
- # remove selected gaze position
- self.__ts_gaze_positions.pop_first()
-
- # is the new fixation have a duration ?
- if new_fixation.duration > 0:
-
- if self.__last_fixation != None:
-
- # store start and end positions in a timestamped buffer
- ts_saccade_positions = TimeStampedGazePositions()
-
- start_position_ts, start_position = self.__last_fixation.positions.pop_last()
- ts_saccade_positions[start_position_ts] = start_position
-
- end_position_ts, end_position = new_fixation.positions.pop_first()
- ts_saccade_positions[end_position_ts] = end_position
-
- if end_position_ts > start_position_ts:
-
- new_saccade = DispersionBasedMovementIdentifier.DispersionBasedSaccade(ts_saccade_positions)
-
- yield new_saccade
-
- self.__last_fixation = new_fixation
-
- yield new_fixation
-
- # dispersion too wide : consider next gaze position
else:
- self.__ts_gaze_positions.pop_first()
+ continue
+
+ return ts_fixations, ts_saccades, ts_status
@dataclass
class VisualScanStep():
@@ -458,12 +385,12 @@ class PointerBasedVisualScan(VisualScanGenerator):
class FixationBasedVisualScan(VisualScanGenerator):
"""Build visual scan on the basis of timestamped fixations."""
- def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_fixations: TimeStampedMovements):
+ def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_fixations: TimeStampedGazeMovements):
super().__init__(ts_aoi_scenes)
- if type(ts_fixations) != TimeStampedMovements:
- raise ValueError('second argument must be a GazeFeatures.TimeStampedMovements')
+ if type(ts_fixations) != TimeStampedGazeMovements:
+ raise ValueError('second argument must be a GazeFeatures.TimeStampedGazeMovements')
# process identification on a copy
self.__ts_aoi_scenes = ts_aoi_scenes.copy()
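Beyond the renames, this hunk adds two small conveniences worth illustrating: TimeStampedGazePositions now coerces a dict with "value" and "accuracy" keys into a GazePosition, and GazeStatus gains a from_position constructor. A short sketch using only the API added above:

```python
from argaze import GazeFeatures

ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()

# A dict with "value" and "accuracy" keys is coerced into a GazePosition
ts_gaze_positions[0] = {"value": (15, 27), "accuracy": 2.}
assert isinstance(ts_gaze_positions[0], GazeFeatures.GazePosition)

# A GazeStatus keeps the position data and records the movement it belongs to
status = GazeFeatures.GazeStatus.from_position(ts_gaze_positions[0], 'Fixation', 1)
print(status.movement_type, status.movement_index)
```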
diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py
index 57ccd2c..4945206 100644
--- a/src/argaze/__init__.py
+++ b/src/argaze/__init__.py
@@ -2,4 +2,4 @@
.. include:: ../../README.md
"""
__docformat__ = "restructuredtext"
-__all__ = ['utils','ArUcoMarkers','AreaOfInterest','GazeFeatures','DataStructures','TobiiGlassesPro2']
\ No newline at end of file
+__all__ = ['utils','ArUcoMarkers','AreaOfInterest','GazeFeatures','DataStructures','GazeAnalysis','TobiiGlassesPro2']
\ No newline at end of file
diff --git a/src/argaze/utils/README.md b/src/argaze/utils/README.md
index 7bc56bd..9be98f4 100644
--- a/src/argaze/utils/README.md
+++ b/src/argaze/utils/README.md
@@ -84,13 +84,13 @@ python ./src/argaze/utils/tobii_segment_gaze_movements_export.py -s SEGMENT_PATH
# Tobii with ArUco
-Track ArUco markers into Tobii camera video stream (-t IP_ADDRESS). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame:
+Track ArUco markers (-md MARKER_DICT -ms MARKER_SIZE) in the Tobii camera video stream (-t IP_ADDRESS). Load the aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relative to its detected ArUco markers, then project the scene into the camera frame:
```
-python ./src/argaze/utils/tobii_stream_aruco_aoi_display.py -t IP_ADDRESS -c export/tobii_camera.json -ms 5 -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}'
+python ./src/argaze/utils/tobii_stream_aruco_aoi_display.py -t IP_ADDRESS -c export/tobii_camera.json -md MARKER_DICT -ms MARKER_SIZE -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}'
```
-Track ArUco markers into a Tobii camera video segment (-s SEGMENT_PATH) into a time range selection (-r IN OUT). Load aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relatively to its detected ArUco markers then project the scene into camera frame. Export aoi video and data as a aruco_aoi.csv, aruco_aoi.mp4 files:
+Track ArUco markers (-md MARKER_DICT -ms MARKER_SIZE) in a Tobii camera video segment (-s SEGMENT_PATH) within a time range selection (-r IN OUT). Load the aoi scene .obj file related to each marker (-mi MARKER_ID, PATH_TO_AOI_SCENE), position each scene virtually relative to its detected ArUco markers, then project the scene into the camera frame. Export aoi video and data as aruco_aoi.csv and aruco_aoi.mp4 files:
```
-python ./src/argaze/utils/tobii_segment_aruco_aoi_export.py -s SEGMENT_PATH -c export/tobii_camera.json -r IN OUT -ms 5 -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}'
+python ./src/argaze/utils/tobii_segment_aruco_aoi_export.py -s SEGMENT_PATH -c export/tobii_camera.json -md MARKER_DICT -ms MARKER_SIZE -mi '{"MARKER_ID":"PATH_TO_AOI_SCENE.obj",...}' -r IN OUT
```
diff --git a/src/argaze/utils/tobii_segment_aruco_aoi_edit.py b/src/argaze/utils/tobii_segment_aruco_aoi_edit.py
index 278d3ae..fc27b97 100644
--- a/src/argaze/utils/tobii_segment_aruco_aoi_edit.py
+++ b/src/argaze/utils/tobii_segment_aruco_aoi_edit.py
@@ -287,16 +287,16 @@ def main():
# Write rotation matrix
R, _ = cv.Rodrigues(aoi3D_scene_edit['rotation'])
cv.putText(visu_frame.matrix, f'Rotation matrix:', (20, 160), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{R[0][0]:.3f} {R[0][1]:.3f} {R[0][2]:.3f}', (40, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'{R[0][0]:.3f} {R[0][1]:.3f} {R[0][2]:.3f}', (40, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
cv.putText(visu_frame.matrix, f'{R[1][0]:.3f} {R[1][1]:.3f} {R[1][2]:.3f}', (40, 240), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{R[2][0]:.3f} {R[2][1]:.3f} {R[2][2]:.3f}', (40, 280), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'{R[2][0]:.3f} {R[2][1]:.3f} {R[2][2]:.3f}', (40, 280), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
# Write translation vector
T = aoi3D_scene_edit['translation']
cv.putText(visu_frame.matrix, f'Translation vector:', (20, 320), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{T[0]:.3f}', (40, 360), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'{T[0]:.3f}', (40, 360), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
cv.putText(visu_frame.matrix, f'{T[1]:.3f}', (40, 400), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv.LINE_AA)
- cv.putText(visu_frame.matrix, f'{T[2]:.3f}', (40, 440), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'{T[2]:.3f}', (40, 440), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 1, cv.LINE_AA)
# DON'T APPLY CAMERA DISTORTION : it projects points which are far from the frame into it
# This hack isn't realistic, but as the gaze will mainly focus on centered AOI, where the distortion is low, it is acceptable.
diff --git a/src/argaze/utils/tobii_segment_aruco_aoi_export.py b/src/argaze/utils/tobii_segment_aruco_aoi_export.py
index 9f6ae78..7adba2f 100644
--- a/src/argaze/utils/tobii_segment_aruco_aoi_export.py
+++ b/src/argaze/utils/tobii_segment_aruco_aoi_export.py
@@ -124,21 +124,23 @@ def main():
aoi2D_visu_scenes = {}
all_aois_names = []
- for marker_id, aoi_scene_filepath in args.marker_id_scene.items():
+ if args.marker_id_scene is not None:
- marker_id = int(marker_id)
-
- aoi3D_scenes[marker_id] = AOI3DScene.AOI3DScene()
- aoi3D_scenes[marker_id].load(aoi_scene_filepath)
+ for marker_id, aoi_scene_filepath in args.marker_id_scene.items():
- print(f'AOI in {os.path.basename(aoi_scene_filepath)} scene related to marker #{marker_id}:')
- for aoi in aoi3D_scenes[marker_id].keys():
+ marker_id = int(marker_id)
+
+ aoi3D_scenes[marker_id] = AOI3DScene.AOI3DScene()
+ aoi3D_scenes[marker_id].load(aoi_scene_filepath)
+
+ print(f'AOI in {os.path.basename(aoi_scene_filepath)} scene related to marker #{marker_id}:')
+ for aoi in aoi3D_scenes[marker_id].keys():
- print(f'\t{aoi}')
+ print(f'\t{aoi}')
- # Store aoi name once
- if aoi not in all_aois_names:
- all_aois_names.append(aoi)
+ # Store aoi name once
+ if aoi not in all_aois_names:
+ all_aois_names.append(aoi)
def aoi3D_scene_selector(marker_id):
return aoi3D_scenes.get(marker_id, None)
diff --git a/src/argaze/utils/tobii_segment_gaze_movements_export.py b/src/argaze/utils/tobii_segment_gaze_movements_export.py
index fee8960..1ffc836 100644
--- a/src/argaze/utils/tobii_segment_gaze_movements_export.py
+++ b/src/argaze/utils/tobii_segment_gaze_movements_export.py
@@ -4,6 +4,7 @@ import argparse
import os
from argaze import GazeFeatures
+from argaze.GazeAnalysis import DispersionBasedGazeMovementIdentifier
from argaze.TobiiGlassesPro2 import TobiiEntities, TobiiVideo, TobiiSpecifications
from argaze.utils import MiscFeatures
@@ -55,8 +56,8 @@ def main():
os.makedirs(destination_path)
print(f'{destination_path} folder created')
- fixations_filepath = f'{destination_path}/movements_fixations.csv'
- saccades_filepath = f'{destination_path}/movements_saccades.csv'
+ fixations_filepath = f'{destination_path}/gaze_fixations.csv'
+ saccades_filepath = f'{destination_path}/gaze_saccades.csv'
gaze_status_filepath = f'{destination_path}/gaze_status.csv'
gaze_status_video_filepath = f'{destination_path}/gaze_status.mp4'
@@ -89,7 +90,7 @@ def main():
# Test gaze position validity
if tobii_gaze_position.validity == 0:
- gaze_position_pixel = GazeFeatures.GazePosition( (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height)) )
+ gaze_position_px = (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height))
# Get gaze position 3D at same gaze position timestamp
tobii_gaze_position_3d = tobii_ts_gaze_positions_3d.pop(ts)
@@ -100,70 +101,70 @@ def main():
gaze_accuracy_mm = numpy.sin(numpy.deg2rad(TobiiSpecifications.ACCURACY)) * tobii_gaze_position_3d.value[2]
tobii_camera_hfov_mm = numpy.sin(numpy.deg2rad(TobiiSpecifications.CAMERA_HFOV)) * tobii_gaze_position_3d.value[2]
- gaze_position_pixel.accuracy = round(tobii_segment_video.width * float(gaze_accuracy_mm) / float(tobii_camera_hfov_mm))
+ gaze_accuracy_px = round(tobii_segment_video.width * float(gaze_accuracy_mm) / float(tobii_camera_hfov_mm))
- # Store gaze position using millisecond unit timestamp
- ts_gaze_positions[ts/1e3] = gaze_position_pixel
+ # Store gaze position
+ ts_gaze_positions[ts] = GazeFeatures.GazePosition(gaze_position_px, accuracy=gaze_accuracy_px)
continue
# Store unvalid gaze position for further movement processing
- ts_gaze_positions[ts/1e3] = GazeFeatures.UnvalidGazePosition()
+ ts_gaze_positions[ts] = GazeFeatures.UnvalidGazePosition()
- print(f'Movement identifier parameters:')
+ print(f'GazeMovement identifier parameters:')
print(f'\tDispersion threshold = {args.dispersion_threshold}')
print(f'\tDuration threshold = {args.duration_threshold}')
# Start movement identification
- movement_identifier = GazeFeatures.DispersionBasedMovementIdentifier(ts_gaze_positions, args.dispersion_threshold, args.duration_threshold)
- fixations = GazeFeatures.TimeStampedMovements()
- saccades = GazeFeatures.TimeStampedMovements()
- gaze_status = GazeFeatures.TimeStampedGazeStatus()
+ movement_identifier = DispersionBasedGazeMovementIdentifier.GazeMovementIdentifier(args.dispersion_threshold, args.duration_threshold*1e3)
+ ts_fixations = GazeFeatures.TimeStampedGazeMovements()
+ ts_saccades = GazeFeatures.TimeStampedGazeMovements()
+ ts_status = GazeFeatures.TimeStampedGazeStatus()
# Initialise progress bar
- MiscFeatures.printProgressBar(0, int(tobii_segment_video.duration/1e3), prefix = 'Movements identification:', suffix = 'Complete', length = 100)
+ MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = 'GazeMovements identification:', suffix = 'Complete', length = 100)
- for item in movement_identifier:
+ for gaze_movement in movement_identifier(ts_gaze_positions):
- if isinstance(item, GazeFeatures.DispersionBasedMovementIdentifier.DispersionBasedFixation):
+ if isinstance(gaze_movement, DispersionBasedGazeMovementIdentifier.Fixation):
- start_ts, start_position = item.positions.first
+ start_ts, start_position = gaze_movement.positions.first
- fixations[start_ts] = item
+ ts_fixations[start_ts] = gaze_movement
- for ts, position in item.positions.items():
+ for ts, position in gaze_movement.positions.items():
- gaze_status[ts] = GazeFeatures.GazeStatus(position, 'Fixation', len(fixations))
+ ts_status[ts] = GazeFeatures.GazeStatus.from_position(position, 'Fixation', len(ts_fixations))
- elif isinstance(item, GazeFeatures.DispersionBasedMovementIdentifier.DispersionBasedSaccade):
+ elif isinstance(gaze_movement, DispersionBasedGazeMovementIdentifier.Saccade):
- start_ts, start_position = item.positions.first
- end_ts, end_position = item.positions.last
+ start_ts, start_position = gaze_movement.positions.first
+ end_ts, end_position = gaze_movement.positions.last
- saccades[start_ts] = item
+ ts_saccades[start_ts] = gaze_movement
- gaze_status[start_ts] = GazeFeatures.GazeStatus(start_position, 'Saccade', len(saccades))
- gaze_status[end_ts] = GazeFeatures.GazeStatus(end_position, 'Saccade', len(saccades))
+ ts_status[start_ts] = GazeFeatures.GazeStatus.from_position(start_position, 'Saccade', len(ts_saccades))
+ ts_status[end_ts] = GazeFeatures.GazeStatus.from_position(end_position, 'Saccade', len(ts_saccades))
else:
continue
# Update Progress Bar
- progress = ts - int(args.time_range[0] * 1e3)
- MiscFeatures.printProgressBar(progress, int(tobii_segment_video.duration/1e3), prefix = 'Movements identification:', suffix = 'Complete', length = 100)
+ progress = ts - int(args.time_range[0] * 1e6)
+ MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'GazeMovements identification:', suffix = 'Complete', length = 100)
- print(f'\n{len(fixations)} fixations and {len(saccades)} saccades found')
+ print(f'\n{len(ts_fixations)} fixations and {len(ts_saccades)} saccades found')
# Export fixations analysis
- fixations.as_dataframe().to_csv(fixations_filepath, index=True)
+ ts_fixations.as_dataframe().to_csv(fixations_filepath, index=True)
print(f'Fixations saved into {fixations_filepath}')
# Export saccades analysis
- saccades.as_dataframe().to_csv(saccades_filepath, index=True)
+ ts_saccades.as_dataframe().to_csv(saccades_filepath, index=True)
print(f'Saccades saved into {saccades_filepath}')
# Export gaze status analysis
- gaze_status.as_dataframe().to_csv(gaze_status_filepath, index=True)
+ ts_status.as_dataframe().to_csv(gaze_status_filepath, index=True)
print(f'Gaze status saved into {gaze_status_filepath}')
# Prepare video exportation at the same format than segment video
@@ -173,47 +174,49 @@ def main():
try:
# Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration/1e3, prefix = 'Video with movements processing:', suffix = 'Complete', length = 100)
+ MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = 'Video with movements processing:', suffix = 'Complete', length = 100)
- current_fixation_ts, current_fixation = fixations.pop_first()
+ current_fixation_ts, current_fixation = ts_fixations.pop_first()
current_fixation_time_counter = 0
- current_saccade_ts, current_saccade = saccades.pop_first()
-
+ current_saccade_ts, current_saccade = ts_saccades.pop_first()
+
# Iterate on video frames
for video_ts, video_frame in tobii_segment_video.frames():
- video_ts_ms = video_ts / 1e3
-
- # Draw current fixation
- if len(fixations) > 0:
+ # While current time belongs to the current fixation
+ if video_ts >= current_fixation_ts and video_ts < current_fixation_ts + current_fixation.duration:
- if video_ts_ms > current_fixation_ts + current_fixation.duration:
+ current_fixation_time_counter += 1
- current_fixation_ts, current_fixation = fixations.pop_first()
- current_fixation_time_counter = 1
+ # Draw current fixation
+ cv.circle(video_frame.matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.dispersion), (0, 255, 0), current_fixation_time_counter)
- # Draw saccade
- if len(saccades) > 0:
+ # Check next fixation
+ elif video_ts >= current_fixation_ts + current_fixation.duration and len(ts_fixations) > 0:
- if video_ts_ms > current_saccade_ts + current_saccade.duration:
+ current_fixation_ts, current_fixation = ts_fixations.pop_first()
+ current_fixation_time_counter = 0
- current_saccade_ts, current_saccade = saccades.pop_first()
- start_ts, start_position = current_saccade.positions.pop_first()
- end_ts, end_position = current_saccade.positions.pop_first()
+ # While current time belongs to the current saccade
+ if video_ts >= current_saccade_ts and current_fixation_time_counter == 0:
- cv.line(video_frame.matrix, start_position, end_position, (0, 0, 255), 2)
+ start_ts, start_position = current_saccade.positions.first
+ end_ts, end_position = current_saccade.positions.last
- else:
+ # Draw saccade
+ cv.line(video_frame.matrix, start_position, end_position, (0, 0, 255), 2)
- current_fixation_time_counter += 1
+ # Check next saccade
+ elif video_ts >= current_saccade_ts + current_saccade.duration and len(ts_saccades) > 0:
- cv.circle(video_frame.matrix, current_fixation.centroid, current_fixation.dispersion, (0, 255, 0), current_fixation_time_counter)
+ current_saccade_ts, current_saccade = ts_saccades.pop_first()
+ # Check next gaze
try:
# Get closest gaze position before video timestamp and remove all gaze positions before
- _, nearest_gaze_position = ts_gaze_positions.pop_first_until(video_ts_ms)
+ _, nearest_gaze_position = ts_gaze_positions.pop_first_until(video_ts)
# Draw gaze
nearest_gaze_position.draw(video_frame.matrix)
@@ -224,7 +227,7 @@ def main():
# Write segment timing
cv.rectangle(video_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(video_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ cv.putText(video_frame.matrix, f'Segment time: {int(video_ts/1e3)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
# Write movement identification parameters
cv.rectangle(video_frame.matrix, (0, 90), (550, 150), (63, 63, 63), -1)
@@ -244,8 +247,8 @@ def main():
output_video.write(video_frame.matrix)
# Update Progress Bar
- progress = video_ts_ms - int(args.time_range[0] * 1e3)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration/1e3, prefix = 'Video with movements processing:', suffix = 'Complete', length = 100)
+ progress = video_ts - int(args.time_range[0] * 1e6)
+ MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Video with movements processing:', suffix = 'Complete', length = 100)
# Exit on 'ctrl+C' interruption
except KeyboardInterrupt:
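The accuracy conversion above maps the tracker's angular accuracy to pixels through the gaze point depth: both the accuracy and the camera's horizontal field of view are turned into millimeters at that depth, and their ratio scales the video width. A worked sketch with illustrative numbers (the ACCURACY and CAMERA_HFOV values below are assumptions, not the TobiiSpecifications constants):

```python
import numpy

ACCURACY = 1.0       # angular gaze accuracy (degrees) — illustrative
CAMERA_HFOV = 82.0   # camera horizontal field of view (degrees) — illustrative

distance_mm = 650.0     # gaze point depth from the 3D gaze position
video_width_px = 1920

gaze_accuracy_mm = numpy.sin(numpy.deg2rad(ACCURACY)) * distance_mm
camera_hfov_mm = numpy.sin(numpy.deg2rad(CAMERA_HFOV)) * distance_mm
gaze_accuracy_px = round(video_width_px * gaze_accuracy_mm / camera_hfov_mm)
print(f'{gaze_accuracy_px} px')  # about 34 px with these numbers
```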
diff --git a/src/argaze/utils/tobii_stream_aruco_aoi_display.py b/src/argaze/utils/tobii_stream_aruco_aoi_display.py
index c5becff..7eb5196 100644
--- a/src/argaze/utils/tobii_stream_aruco_aoi_display.py
+++ b/src/argaze/utils/tobii_stream_aruco_aoi_display.py
@@ -90,17 +90,19 @@ def main():
aoi3D_scenes = {}
aoi2D_visu_scenes = {}
- for marker_id, aoi_scene_filepath in args.marker_id_scene.items():
+ if args.marker_id_scene is not None:
- marker_id = int(marker_id)
-
- aoi3D_scenes[marker_id] = AOI3DScene.AOI3DScene()
- aoi3D_scenes[marker_id].load(aoi_scene_filepath)
+ for marker_id, aoi_scene_filepath in args.marker_id_scene.items():
- print(f'AOI in {os.path.basename(aoi_scene_filepath)} scene related to marker #{marker_id}:')
- for aoi in aoi3D_scenes[marker_id].keys():
+ marker_id = int(marker_id)
+
+ aoi3D_scenes[marker_id] = AOI3DScene.AOI3DScene()
+ aoi3D_scenes[marker_id].load(aoi_scene_filepath)
+
+ print(f'AOI in {os.path.basename(aoi_scene_filepath)} scene related to marker #{marker_id}:')
+ for aoi in aoi3D_scenes[marker_id].keys():
- print(f'\t{aoi}')
+ print(f'\t{aoi}')
def aoi3D_scene_selector(marker_id):
return aoi3D_scenes.get(marker_id, None)