aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2024-02-29 16:40:52 +0100
committerThéo de la Hogue2024-02-29 16:40:52 +0100
commitaddbaaef4265688916f355d4cde24c96f0605cf9 (patch)
tree56913dc313697f794eef9691760baad9a4df2273
parent9c42e9f1ee8208e14dadcb73cf030a9baef236ed (diff)
parentac8cc60d27a57c892354214b04327878b511cc44 (diff)
downloadargaze-addbaaef4265688916f355d4cde24c96f0605cf9.zip
argaze-addbaaef4265688916f355d4cde24c96f0605cf9.tar.gz
argaze-addbaaef4265688916f355d4cde24c96f0605cf9.tar.bz2
argaze-addbaaef4265688916f355d4cde24c96f0605cf9.tar.xz
Merge branch 'fast_idt' into logging
-rw-r--r--docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md4
-rw-r--r--src/argaze.test/ArUcoMarkers/ArUcoCamera.py83
-rw-r--r--src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj7
-rw-r--r--src/argaze.test/ArUcoMarkers/utils/aruco_camera.json39
-rw-r--r--src/argaze.test/DataFeatures.py464
-rw-r--r--src/argaze.test/DataLog/FileWriter.py37
-rw-r--r--src/argaze.test/DataLog/__init__.py0
-rw-r--r--src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py154
-rw-r--r--src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py127
-rw-r--r--src/argaze.test/GazeFeatures.py358
-rw-r--r--src/argaze.test/PupillFeatures.py60
-rw-r--r--src/argaze.test/utils/ts_buffer.json1
-rw-r--r--src/argaze.test/utils/ts_data_file.json82
-rw-r--r--src/argaze/ArFeatures.py70
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoCamera.py4
-rw-r--r--src/argaze/AreaOfInterest/AOIFeatures.py10
-rw-r--r--src/argaze/DataFeatures.py403
-rw-r--r--src/argaze/GazeAnalysis/DeviationCircleCoverage.py2
-rw-r--r--src/argaze/GazeAnalysis/DispersionThresholdIdentification.py156
-rw-r--r--src/argaze/GazeAnalysis/LinearRegression.py6
-rw-r--r--src/argaze/GazeAnalysis/VelocityThresholdIdentification.py145
-rw-r--r--src/argaze/GazeFeatures.py635
-rw-r--r--src/argaze/PupillAnalysis/WorkloadIndex.py44
-rw-r--r--src/argaze/PupillFeatures.py78
24 files changed, 1509 insertions, 1460 deletions
diff --git a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md
index c3c0266..11e561e 100644
--- a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md
+++ b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md
@@ -80,7 +80,7 @@ Calling [ArFrame.look](../../../argaze.md/#argaze.ArFeatures.ArFrame.look) metho
... ar_frame.last_gaze_position
# Check if a gaze movement has been identified
- if ar_frame.last_gaze_movement.valid and ar_frame.last_gaze_movement.finished:
+ if ar_frame.last_gaze_movement and ar_frame.last_gaze_movement.finished:
# Do something with identified fixation
if GazeFeatures.is_fixation(ar_frame.last_gaze_movement):
@@ -125,7 +125,7 @@ This is the last calibrated [GazePosition](../../../argaze.md/#argaze.GazeFeatur
### *ar_frame.last_gaze_movement*
-Last [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement) identified by [ArFrame.gaze_movement_identifier](../../../argaze.md/#argaze.ArFeatures.ArFrame) object from incoming consecutive timestamped gaze positions. If no gaze movement have been identified, it returns an [UnvalidGazeMovement](../../../argaze.md/#argaze.GazeFeatures.UnvalidGazeMovement).
+Last [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement) identified by [ArFrame.gaze_movement_identifier](../../../argaze.md/#argaze.ArFeatures.ArFrame) object from incoming consecutive timestamped gaze positions. If no gaze movement have been identified, it returns an empty [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement).
This could also be the current gaze movement if [ArFrame.filter_in_progress_identification](../../../argaze.md/#argaze.ArFeatures.ArFrame) attribute is false.
In that case, the last gaze movement *finished* flag is false.
diff --git a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py
index 6145f40..a3c5943 100644
--- a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py
+++ b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py
@@ -25,49 +25,46 @@ class TestArUcoCameraClass(unittest.TestCase):
json_filepath = os.path.join(current_directory, 'utils/aruco_camera.json')
# Load test aruco camera
- aruco_camera = ArUcoCamera.ArUcoCamera.from_json(json_filepath)
-
- # Check aruco camera meta data
- self.assertEqual(aruco_camera.name, "TestArUcoCamera")
-
- # Check ArUco detector
- self.assertEqual(aruco_camera.aruco_detector.dictionary.name, "DICT_ARUCO_ORIGINAL")
- self.assertEqual(aruco_camera.aruco_detector.marker_size, 3.0)
- self.assertEqual(aruco_camera.aruco_detector.parameters.cornerRefinementMethod, 3)
- self.assertEqual(aruco_camera.aruco_detector.parameters.aprilTagQuadSigma, 2)
- self.assertEqual(aruco_camera.aruco_detector.parameters.aprilTagDeglitch, 1)
-
- # Check ArUco detector optic parameters
- self.assertEqual(aruco_camera.aruco_detector.optic_parameters.rms, 1.0)
- self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.dimensions, [1920, 1080]))
- self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.K, [[1.0, 0.0, 1.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]]))
- self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.D, [-1.0, -0.5, 0.0, 0.5, 1.0]))
-
- # Check camera scenes
- self.assertEqual(len(aruco_camera.scenes), 2)
- self.assertIsNone(numpy.testing.assert_array_equal(list(aruco_camera.scenes.keys()), ["TestSceneA", "TestSceneB"]))
-
- # Load test scene
- ar_scene = aruco_camera.scenes["TestSceneA"]
-
- # Check Aruco scene
- self.assertEqual(len(ar_scene.aruco_markers_group.places), 2)
- self.assertIsNone(numpy.testing.assert_array_equal(ar_scene.aruco_markers_group.places[0].translation, [1, 0, 0]))
- self.assertIsNone(numpy.testing.assert_array_equal(ar_scene.aruco_markers_group.places[0].rotation, [[1.,0.,0.],[0.,1.,0.],[0.,0.,1.]]))
- self.assertEqual(ar_scene.aruco_markers_group.places[0].marker.identifier, 0)
-
- self.assertIsNone(numpy.testing.assert_array_equal(ar_scene.aruco_markers_group.places[1].translation, [0, 1, 0]))
- self.assertIsNone(numpy.testing.assert_array_almost_equal(ar_scene.aruco_markers_group.places[1].rotation, [[0.,0.,1.],[0., 1.,0.],[-1.,0.,0.]]))
- self.assertEqual(ar_scene.aruco_markers_group.places[1].marker.identifier, 1)
-
- # Check AOI scene
- self.assertEqual(len(ar_scene.aoi_scene.items()), 1)
- self.assertEqual(ar_scene.aoi_scene['Test'].points_number, 4)
- self.assertIsNone(numpy.testing.assert_array_equal(ar_scene.aoi_scene['Test'].size, [1., 1., 0.]))
-
- # Check ArScene
- self.assertEqual(ar_scene.angle_tolerance, 1.0)
- self.assertEqual(ar_scene.distance_tolerance, 2.0)
+ with ArUcoCamera.ArUcoCamera.from_json(json_filepath) as aruco_camera:
+
+ # Check aruco camera meta data
+ self.assertEqual(aruco_camera.name, "TestArUcoCamera")
+
+ # Check ArUco detector
+ self.assertEqual(aruco_camera.aruco_detector.dictionary.name, "DICT_ARUCO_ORIGINAL")
+ self.assertEqual(aruco_camera.aruco_detector.parameters.cornerRefinementMethod, 3)
+ self.assertEqual(aruco_camera.aruco_detector.parameters.aprilTagQuadSigma, 2)
+ self.assertEqual(aruco_camera.aruco_detector.parameters.aprilTagDeglitch, 1)
+
+ # Check ArUco detector optic parameters
+ self.assertEqual(aruco_camera.aruco_detector.optic_parameters.rms, 1.0)
+ self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.dimensions, [1920, 1080]))
+ self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.K, [[1.0, 0.0, 1.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]]))
+ self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.D, [-1.0, -0.5, 0.0, 0.5, 1.0]))
+
+ # Check camera scenes
+ self.assertEqual(len(aruco_camera.scenes), 2)
+ self.assertIsNone(numpy.testing.assert_array_equal(list(aruco_camera.scenes.keys()), ["TestSceneA", "TestSceneB"]))
+
+ # Load test scene
+ ar_scene = aruco_camera.scenes["TestSceneA"]
+
+ # Check Aruco scene
+ self.assertEqual(len(ar_scene.aruco_markers_group.places), 2)
+ self.assertIsNone(numpy.testing.assert_allclose(ar_scene.aruco_markers_group.places[0].corners[0], [-0.5, 1.5, 0.], rtol=0, atol=1e-3))
+ self.assertEqual(ar_scene.aruco_markers_group.places[0].marker.identifier, 0)
+
+ self.assertIsNone(numpy.testing.assert_allclose(ar_scene.aruco_markers_group.places[1].corners[0], [0., 2.5, -1.5], rtol=0, atol=1e-3))
+ self.assertEqual(ar_scene.aruco_markers_group.places[1].marker.identifier, 1)
+
+ # Check layers and AOI scene
+ self.assertEqual(len(ar_scene.layers.items()), 1)
+ self.assertEqual(len(ar_scene.layers["Main"].aoi_scene), 1)
+ self.assertEqual(ar_scene.layers["Main"].aoi_scene['Test'].points_number, 4)
+
+ # Check ArScene
+ self.assertEqual(ar_scene.angle_tolerance, 1.0)
+ self.assertEqual(ar_scene.distance_tolerance, 2.0)
if __name__ == '__main__':
diff --git a/src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj b/src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj
new file mode 100644
index 0000000..92e85bd
--- /dev/null
+++ b/src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj
@@ -0,0 +1,7 @@
+o Test
+v 0.000000 0.000000 0.000000
+v 25.000000 0.000000 0.000000
+v 0.000000 14.960000 0.000000
+v 25.000000 14.960000 0.000000
+s off
+f 1 2 4 3
diff --git a/src/argaze.test/ArUcoMarkers/utils/aruco_camera.json b/src/argaze.test/ArUcoMarkers/utils/aruco_camera.json
index 7648916..980dc9f 100644
--- a/src/argaze.test/ArUcoMarkers/utils/aruco_camera.json
+++ b/src/argaze.test/ArUcoMarkers/utils/aruco_camera.json
@@ -1,10 +1,10 @@
{
"name": "TestArUcoCamera",
+ "size": [1920, 1080],
"aruco_detector": {
"dictionary": {
"name": "DICT_ARUCO_ORIGINAL"
},
- "marker_size": 3.0,
"optic_parameters": {
"rms": 1.0,
"dimensions": [
@@ -45,45 +45,54 @@
"scenes": {
"TestSceneA" : {
"aruco_markers_group": {
- "marker_size": 3.0,
- "dictionary": {
- "name": "DICT_ARUCO_ORIGINAL"
- },
+ "dictionary": "DICT_ARUCO_ORIGINAL",
"places": {
"0": {
"translation": [1, 0, 0],
- "rotation": [0, 0, 0]
+ "rotation": [0, 0, 0],
+ "size": 3.0
},
"1": {
"translation": [0, 1, 0],
- "rotation": [0, 90, 0]
+ "rotation": [0, 90, 0],
+ "size": 3.0
}
}
},
- "aoi_scene": "aoi.obj",
+ "layers": {
+ "Main" : {
+ "aoi_scene": "aoi_3d.obj"
+ }
+ },
"angle_tolerance": 1.0,
"distance_tolerance": 2.0
},
"TestSceneB" : {
"aruco_markers_group": {
- "marker_size": 3.0,
- "dictionary": {
- "name": "DICT_ARUCO_ORIGINAL"
- },
+ "dictionary": "DICT_ARUCO_ORIGINAL",
"places": {
"0": {
"translation": [1, 0, 0],
- "rotation": [0, 0, 0]
+ "rotation": [0, 0, 0],
+ "size": 3.0
},
"1": {
"translation": [0, 1, 0],
- "rotation": [0, 90, 0]
+ "rotation": [0, 90, 0],
+ "size": 3.0
}
}
},
- "aoi_scene": "aoi.obj",
+ "layers": {
+ "Main" : {
+ "aoi_scene": "aoi_3d.obj"
+ }
+ },
"angle_tolerance": 1.0,
"distance_tolerance": 2.0
}
+ },
+ "layers": {
+ "Main": {}
}
} \ No newline at end of file
diff --git a/src/argaze.test/DataFeatures.py b/src/argaze.test/DataFeatures.py
index b30c560..c64ad4c 100644
--- a/src/argaze.test/DataFeatures.py
+++ b/src/argaze.test/DataFeatures.py
@@ -16,334 +16,360 @@ from argaze import DataFeatures
import pandas
import numpy
-def random_data_buffer(size, data_keys):
- """ Generate a random TimeStampedBuffer for testing purpose.
+class BasicDataClass(DataFeatures.TimestampedObject):
+ """Define a basic dataclass for testing purpose."""
+
+ def __init__(self, value: tuple = (), message: str = None, **kwargs):
+
+ DataFeatures.TimestampedObject.__init__(self, **kwargs)
+
+ self.__value = value
+ self.__message = message
+
+ @property
+ def value(self):
+ return self.__value
+
+ @property
+ def message(self):
+ return self.__message
+
+def random_data_list(size):
+ """ Generate a random TimestampedObjectsList for testing purpose.
Timestamps are current time.
Values are tuples containing an expected value and a random value.
"""
-
import random
import time
- ts_buffer = DataFeatures.TimeStampedBuffer()
+ data_list = []
for i in range(0, size):
# Edit data
- random_data = {}
- for key in data_keys:
- random_data[key] = (i, random.random())
+ random_data = BasicDataClass((i, random.random()), f'test_{i}')
+
+ # Timestamp data
+ random_data.timestamp = time.time()
# Store data
- ts_buffer[time.time()] = random_data
+ data_list.append(random_data)
time.sleep(0.0001)
- return ts_buffer
+ return DataFeatures.TimestampedObjectsList(BasicDataClass, data_list)
-@dataclass()
-class BasicDataClass():
- """Define a basic dataclass for testing purpose."""
-
- value: tuple
+# DEBUG
+'''
+print('test_as_dataframe: export ts_data_file.json')
+current_directory = os.path.dirname(os.path.abspath(__file__))
+json_filepath = os.path.join(current_directory, 'utils/ts_data_file.json')
+random_data_list(10).to_json(json_filepath)
+'''
-class TestTimeStampedBufferClass(unittest.TestCase):
- """Test TimeStampedBuffer class."""
+class TestTimestampedObjectsListClass(unittest.TestCase):
+ """Test TimestampedObjectsList class."""
def test_new(self):
- """Test TimeStampedBuffer creation."""
+ """Test TimestampedObjectsList creation."""
+
+ # Check TimestampedObjectsList length after creation
+ self.assertEqual(len(DataFeatures.TimestampedObjectsList(BasicDataClass)), 0)
+
+ # Check TimestampedObjectsList timestamps after creation
+ self.assertEqual(DataFeatures.TimestampedObjectsList(BasicDataClass).timestamps(), [])
+
+ # Check TimestampedObjectsList items after creation
+ self.assertEqual(DataFeatures.TimestampedObjectsList(BasicDataClass), [])
+
+ # Check that TimestampedObjectsList creation fails when data are not timestamped
+ with self.assertRaises(ValueError):
+
+ data_list = [BasicDataClass((0, 0))]
+ DataFeatures.TimestampedObjectsList(BasicDataClass, data_list)
- # Check TimeStampedBuffer length after creation
- self.assertEqual(len(DataFeatures.TimeStampedBuffer()), 0)
- self.assertEqual(len(DataFeatures.TimeStampedBuffer({0: ""})), 1)
- self.assertEqual(len(DataFeatures.TimeStampedBuffer({0.1: ""})), 1)
- self.assertEqual(len(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})), 2)
+ def test_as_dataframe(self):
+ """Test TimestampedObjectsList as_dataframe method."""
- # Check TimeStampedBuffer keys after creation
- self.assertEqual(list(DataFeatures.TimeStampedBuffer().keys()), [])
- self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: ""}).keys()), [0])
- self.assertEqual(list(DataFeatures.TimeStampedBuffer({0.1: ""}).keys()), [0.1])
- self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).keys()), [0, 1])
+ data_frame = random_data_list(10).as_dataframe()
- # Check TimeStampedBuffer items after creation
- self.assertEqual(list(DataFeatures.TimeStampedBuffer().items()), [])
- self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: ""}).items()), [(0, "")])
- self.assertEqual(list(DataFeatures.TimeStampedBuffer({0.1: ""}).items()), [(0.1, "")])
- self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).items()), [(0, "A"), (1, "B")])
+ # Check dataframe conversion
+ self.assertEqual(data_frame.index.name, "timestamp")
+ self.assertEqual(data_frame.index.size, 10)
+ self.assertEqual(data_frame.columns.size, 2)
+ self.assertEqual(data_frame.index.dtype, 'float64')
+ self.assertEqual(data_frame["value"].dtype, 'object')
+ self.assertEqual(data_frame["message"].dtype, 'object')
- # Check that TimeStampedBuffer creation fails when keys are not numbers
- with self.assertRaises(AssertionError):
+ # Check data exclusion option
+ data_frame = random_data_list(10).as_dataframe(exclude=["value"])
- DataFeatures.TimeStampedBuffer({"first": ""})
+ self.assertEqual(data_frame.index.name, "timestamp")
+ self.assertEqual(data_frame.index.size, 10)
+ self.assertEqual(data_frame.columns.size, 1)
+ # Check dataframe split option
+ data_frame = random_data_list(10).as_dataframe(split={"value": ["value_0", "value_1"]})
+
+ self.assertEqual(data_frame.index.name, "timestamp")
+ self.assertEqual(data_frame.index.size, 10)
+ self.assertEqual(data_frame.columns.size, 3)
+ self.assertEqual(data_frame["value_0"].dtype, 'int64')
+ self.assertEqual(data_frame["value_1"].dtype, 'float64')
+
def test_from_dataframe(self):
- """Test TimeStampedBuffer creation from pandas dataframe."""
+ """Test TimestampedObjectsList creation from pandas dataframe."""
- ts_buffer = random_data_buffer(10, ["data_A", "data_B", "data_C"])
+ data_frame = random_data_list(10).as_dataframe()
# Check dataframe conversion
- ts_buffer_from_df = DataFeatures.TimeStampedBuffer.from_dataframe(ts_buffer.as_dataframe())
-
- self.assertEqual(len(ts_buffer_from_df), 10)
+ data_list = DataFeatures.TimestampedObjectsList.from_dataframe(BasicDataClass, data_frame)
+ self.assertEqual(len(data_list), 10)
+
def test_from_json(self):
- """Test TimeStampedBuffer creation from json file."""
+ """Test TimestampedObjectsList creation from json file."""
# Edit dataframe csv file path
current_directory = os.path.dirname(os.path.abspath(__file__))
- json_filepath = os.path.join(current_directory, 'utils/ts_buffer.json')
+ json_filepath = os.path.join(current_directory, 'utils/ts_data_file.json')
- # Load TimeStampedBuffer from json file
- ts_buffer = DataFeatures.TimeStampedBuffer.from_json(json_filepath)
-
- self.assertEqual(len(ts_buffer), 3)
+ # Load TimestampedObjectsList from json file
+ data_list = DataFeatures.TimestampedObjectsList.from_json(BasicDataClass, json_filepath)
+ self.assertEqual(len(data_list), 10)
+ self.assertEqual(type(data_list[0]), BasicDataClass)
+
def test___repr__(self):
- """Test TimeStampedBuffer string representation."""
+ """Test TimestampedObjectsList string representation."""
- self.assertEqual(repr(DataFeatures.TimeStampedBuffer()), "{}")
- self.assertEqual(repr(DataFeatures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}")
- self.assertEqual(repr(DataFeatures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}")
+ self.assertEqual(repr(DataFeatures.TimestampedObjectsList(BasicDataClass)), "[]")
- data = BasicDataClass((123, 456))
- ts_buffer = DataFeatures.TimeStampedBuffer({0: data})
-
- self.assertEqual(repr(ts_buffer), "{\"0\": {\"value\": [123, 456]}}")
-
- array = numpy.zeros(3)
- ts_buffer = DataFeatures.TimeStampedBuffer({0: array})
-
- self.assertEqual(repr(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}")
+ data_list = [BasicDataClass((0, 0), 'test', timestamp=0)]
+ self.assertEqual(repr(DataFeatures.TimestampedObjectsList(BasicDataClass, data_list)), "[{\"value\": [0, 0], \"message\": \"test\", \"timestamp\": 0}]")
def test___str__(self):
- """Test TimeStampedBuffer string representation."""
-
- self.assertEqual(str(DataFeatures.TimeStampedBuffer()), "{}")
- self.assertEqual(str(DataFeatures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}")
- self.assertEqual(str(DataFeatures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}")
+ """Test TimestampedObjectsList string representation."""
- data = BasicDataClass((123, 456))
- ts_buffer = DataFeatures.TimeStampedBuffer({0: data})
+ self.assertEqual(str(DataFeatures.TimestampedObjectsList(BasicDataClass)), "[]")
- self.assertEqual(str(ts_buffer), "{\"0\": {\"value\": [123, 456]}}")
-
- array = numpy.zeros(3)
- ts_buffer = DataFeatures.TimeStampedBuffer({0: array})
-
- self.assertEqual(str(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}")
+ data_list = [BasicDataClass((0, 0), 'test', timestamp=0)]
+ self.assertEqual(str(DataFeatures.TimestampedObjectsList(BasicDataClass, data_list)), "[{\"value\": [0, 0], \"message\": \"test\", \"timestamp\": 0}]")
def test_append(self):
- """Test TimeStampedBuffer append method."""
-
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})
- ts_buffer_next = DataFeatures.TimeStampedBuffer({2: "C", 3: "D"})
-
- self.assertEqual(len(ts_buffer.append(ts_buffer_next)), 4)
- self.assertEqual(list(ts_buffer.append(ts_buffer_next).keys()), [0, 1, 2, 3])
+ """Test TimestampedObjectsList append method."""
- def test_first(self):
- """Test TimeStampedBuffer first property."""
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass)
+ next_data = BasicDataClass((0, 0), 'test', timestamp=0)
- self.assertEqual(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).first, (0, "A"))
+ data_list.append(next_data)
- # Check that accessing to first item of an empty TimeStampedBuffer fails
- with self.assertRaises(IndexError):
+ self.assertEqual(len(data_list), 1)
+ self.assertEqual(data_list.timestamps(), [0])
+
+ def test_pop_first(self):
+ """Test TimestampedObjectsList pop_first method."""
- DataFeatures.TimeStampedBuffer().first
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1)
+ ])
- def test_pop_first(self):
- """Test TimeStampedBuffer pop_first method."""
+ first = data_list.pop(0)
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})
+ self.assertEqual(len(data_list), 1)
+ self.assertEqual(first.message, "A")
+ self.assertEqual(first.timestamp, 0)
+ self.assertEqual(data_list[0].message, "B")
+ self.assertEqual(data_list[0].timestamp, 1)
- self.assertEqual(ts_buffer.pop_first(), (0, "A"))
- self.assertEqual(len(ts_buffer), 1)
- self.assertEqual(ts_buffer.first, (1, "B"))
-
def test_pop_last_until(self):
- """Test TimeStampedBuffer pop_last_until method."""
+ """Test TimestampedObjectsList pop_last_until method."""
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1),
+ BasicDataClass(message="C", timestamp=2),
+ BasicDataClass(message="D", timestamp=3)
+ ])
# Check pop until an existing timestamp
- pop_last_until_2 = ts_buffer.pop_last_until(2)
+ pop_last_until_2 = data_list.pop_last_until(2)
- self.assertEqual(pop_last_until_2, (2, "C"))
- self.assertEqual(len(ts_buffer), 2)
- self.assertEqual(ts_buffer.first, (2, "C"))
+ self.assertEqual(pop_last_until_2.message, "C")
+ self.assertEqual(pop_last_until_2.timestamp, 2)
+ self.assertEqual(len(data_list), 2)
+ self.assertEqual(data_list[0].message, "C")
+ self.assertEqual(data_list[0].timestamp, 2)
# Check first until an none existing timestamp
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
-
- pop_last_until_1dot5 = ts_buffer.pop_last_until(1.5)
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1),
+ BasicDataClass(message="C", timestamp=2),
+ BasicDataClass(message="D", timestamp=3)
+ ])
+
+ pop_last_until_1dot5 = data_list.pop_last_until(1.5)
+
+ self.assertEqual(pop_last_until_1dot5.message, "B")
+ self.assertEqual(pop_last_until_1dot5.timestamp, 1)
+ self.assertEqual(len(data_list), 3)
+ self.assertEqual(data_list[0].message, "B")
+ self.assertEqual(data_list[0].timestamp, 1)
- self.assertEqual(pop_last_until_1dot5, (1, "B"))
- self.assertEqual(len(ts_buffer), 3)
- self.assertEqual(ts_buffer.first, (1, "B"))
-
def test_pop_last_before(self):
- """Test TimeStampedBuffer pop_last_before method."""
+ """Test TimestampedObjectsList pop_last_before method."""
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1),
+ BasicDataClass(message="C", timestamp=2),
+ BasicDataClass(message="D", timestamp=3)
+ ])
# Check pop until an existing timestamp
- last_before_2 = ts_buffer.pop_last_before(2)
+ last_before_2 = data_list.pop_last_before(2)
- self.assertEqual(last_before_2, (1, "B"))
- self.assertEqual(len(ts_buffer), 2)
- self.assertEqual(ts_buffer.first, (2, "C"))
+ self.assertEqual(last_before_2.message, "B")
+ self.assertEqual(last_before_2.timestamp, 1)
+ self.assertEqual(len(data_list), 2)
+ self.assertEqual(data_list[0].message, "C")
+ self.assertEqual(data_list[0].timestamp, 2)
# Check pop until an none existing timestamp
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
-
- first_until_1dot5 = ts_buffer.pop_last_before(1.5)
-
- self.assertEqual(first_until_1dot5, (1, "B"))
- self.assertEqual(len(ts_buffer), 2)
- self.assertEqual(ts_buffer.first, (2, "C"))
-
- def test_last(self):
- """Test TimeStampedBuffer last property."""
-
- self.assertEqual(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).last, (1, "B"))
-
- # Check that accessing to last item of an empty TimeStampedBuffer fails
- with self.assertRaises(IndexError):
-
- DataFeatures.TimeStampedBuffer().last
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1),
+ BasicDataClass(message="C", timestamp=2),
+ BasicDataClass(message="D", timestamp=3)
+ ])
+
+ first_until_1dot5 = data_list.pop_last_before(1.5)
+
+ self.assertEqual(first_until_1dot5.message, "B")
+ self.assertEqual(first_until_1dot5.timestamp, 1)
+ self.assertEqual(len(data_list), 2)
+ self.assertEqual(data_list[0].message, "C")
+ self.assertEqual(data_list[0].timestamp, 2)
def test_pop_last(self):
- """Test TimeStampedBuffer pop_last method."""
+ """Test TimestampedObjectsList pop_last method."""
+
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1)
+ ])
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})
+ last = data_list.pop(-1)
- self.assertEqual(ts_buffer.pop_last(), (1, "B"))
- self.assertEqual(len(ts_buffer), 1)
- self.assertEqual(ts_buffer.last, (0, "A"))
+ self.assertEqual(len(data_list), 1)
+ self.assertEqual(last.message, "B")
+ self.assertEqual(last.timestamp, 1)
+ self.assertEqual(data_list[0].message, "A")
+ self.assertEqual(data_list[0].timestamp, 0)
def test_get_first_from(self):
- """Test TimeStampedBuffer get_first_from method."""
+ """Test TimestampedObjectsList get_first_from method."""
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1),
+ BasicDataClass(message="C", timestamp=2),
+ BasicDataClass(message="D", timestamp=3)
+ ])
- get_first_from_1 = ts_buffer.get_first_from(1)
+ get_first_from_1 = data_list.get_first_from(1)
- self.assertEqual(get_first_from_1, (1, "B"))
- self.assertEqual(len(ts_buffer), 4)
+ self.assertEqual(get_first_from_1.message, "B")
+ self.assertEqual(get_first_from_1.timestamp, 1)
+ self.assertEqual(len(data_list), 4)
- get_first_from_1dot5 = ts_buffer.get_first_from(1.5)
+ get_first_from_1dot5 = data_list.get_first_from(1.5)
- self.assertEqual(get_first_from_1dot5, (2, "C"))
+ self.assertEqual(get_first_from_1dot5.message, "C")
+ self.assertEqual(get_first_from_1dot5.timestamp, 2)
- get_first_from_0 = ts_buffer.get_first_from(0)
+ get_first_from_0 = data_list.get_first_from(0)
- self.assertEqual(get_first_from_0, (0, "A"))
+ self.assertEqual(get_first_from_0.message, "A")
+ self.assertEqual(get_first_from_0.timestamp, 0)
# Check that accessing to lately timestamp fails
with self.assertRaises(KeyError):
- ts_buffer.get_first_from(4)
+ data_list.get_first_from(4)
def test_get_last_before(self):
- """Test TimeStampedBuffer get_last_before method."""
+ """Test TimestampedObjectsList get_last_before method."""
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1),
+ BasicDataClass(message="C", timestamp=2),
+ BasicDataClass(message="D", timestamp=3)
+ ])
- get_last_before_2 = ts_buffer.get_last_before(2)
+ get_last_before_2 = data_list.get_last_before(2)
- self.assertEqual(get_last_before_2, (1, "B"))
- self.assertEqual(len(ts_buffer), 4)
+ self.assertEqual(get_last_before_2.message, "B")
+ self.assertEqual(get_last_before_2.timestamp, 1)
+ self.assertEqual(len(data_list), 4)
- get_last_before_1dot5 = ts_buffer.get_last_before(1.5)
+ get_last_before_1dot5 = data_list.get_last_before(1.5)
- self.assertEqual(get_last_before_1dot5, (1, "B"))
+ self.assertEqual(get_last_before_1dot5.message,"B")
+ self.assertEqual(get_last_before_1dot5.timestamp, 1)
- get_last_before_4 = ts_buffer.get_last_before(4)
+ get_last_before_4 = data_list.get_last_before(4)
- self.assertEqual(get_last_before_4, (3, "D"))
+ self.assertEqual(get_last_before_4.message, "D")
+ self.assertEqual(get_last_before_4.timestamp, 3)
# Check that accessing to early timestamp fails
with self.assertRaises(KeyError):
- ts_buffer.get_last_before(-1)
-
-
+ data_list.get_last_before(-1)
+
def test_get_last_until(self):
- """Test TimeStampedBuffer get_last_until method."""
+ """Test TimestampedObjectsList get_last_until method."""
- ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+ data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \
+ [
+ BasicDataClass(message="A", timestamp=0),
+ BasicDataClass(message="B", timestamp=1),
+ BasicDataClass(message="C", timestamp=2),
+ BasicDataClass(message="D", timestamp=3)
+ ])
- get_last_until_2 = ts_buffer.get_last_until(2)
+ get_last_until_2 = data_list.get_last_until(2)
- self.assertEqual(get_last_until_2, (2, "C"))
- self.assertEqual(len(ts_buffer), 4)
+ self.assertEqual(get_last_until_2.message, "C")
+ self.assertEqual(get_last_until_2.timestamp, 2)
+ self.assertEqual(len(data_list), 4)
- get_last_until_1dot5 = ts_buffer.get_last_until(1.5)
+ get_last_until_1dot5 = data_list.get_last_until(1.5)
- self.assertEqual(get_last_until_1dot5, (1, "B"))
+ self.assertEqual(get_last_until_1dot5.message, "B")
+ self.assertEqual(get_last_until_1dot5.timestamp, 1)
- get_last_until_4 = ts_buffer.get_last_until(4)
+ get_last_until_4 = data_list.get_last_until(4)
- self.assertEqual(get_last_until_4, (3, "D"))
+ self.assertEqual(get_last_until_4.message, "D")
+ self.assertEqual(get_last_until_4.timestamp, 3)
# Check that accessing to early timestamp fails
with self.assertRaises(KeyError):
- ts_buffer.get_last_until(-1)
-
- def test_as_dataframe(self):
- """Test TimeStampedBuffer as_dataframe method."""
-
- ts_buffer = random_data_buffer(10, ["data_A", "data_B", "data_C"])
-
- # Check dataframe conversion
- ts_buffer_dataframe = ts_buffer.as_dataframe()
-
- self.assertEqual(ts_buffer_dataframe.index.name, "timestamp")
- self.assertEqual(ts_buffer_dataframe.index.size, 10)
-
- self.assertEqual(ts_buffer_dataframe.columns.size, 3)
- self.assertEqual(ts_buffer_dataframe.columns[0], "data_A")
- self.assertEqual(ts_buffer_dataframe.columns[1], "data_B")
- self.assertEqual(ts_buffer_dataframe.columns[2], "data_C")
-
- self.assertEqual(ts_buffer_dataframe.index.dtype, 'float64')
- self.assertEqual(ts_buffer_dataframe["data_A"].dtype, 'object')
- self.assertEqual(ts_buffer_dataframe["data_B"].dtype, 'object')
- self.assertEqual(ts_buffer_dataframe["data_C"].dtype, 'object')
-
- # Check data exclusion option
- ts_buffer_dataframe = ts_buffer.as_dataframe(exclude=["data_B"])
-
- self.assertEqual(ts_buffer_dataframe.index.name, "timestamp")
- self.assertEqual(ts_buffer_dataframe.index.size, 10)
-
- self.assertEqual(ts_buffer_dataframe.columns.size, 2)
- self.assertEqual(ts_buffer_dataframe.columns[0], "data_A")
- self.assertEqual(ts_buffer_dataframe.columns[1], "data_C")
-
- # Check dataframe split option
- ts_buffer_dataframe = ts_buffer.as_dataframe(split={"data_B": ["data_B0", "data_B1"]})
-
- self.assertEqual(ts_buffer_dataframe.index.name, "timestamp")
- self.assertEqual(ts_buffer_dataframe.index.size, 10)
-
- self.assertEqual(ts_buffer_dataframe.columns.size, 4)
- self.assertEqual(ts_buffer_dataframe.columns[0], "data_A")
- self.assertEqual(ts_buffer_dataframe.columns[1], "data_B0")
- self.assertEqual(ts_buffer_dataframe.columns[2], "data_B1")
- self.assertEqual(ts_buffer_dataframe.columns[3], "data_C")
-
- # Check dataframe conversion with dataclass
- data = BasicDataClass((123, 456))
- ts_buffer_dataframe = DataFeatures.TimeStampedBuffer({0: data}).as_dataframe()
-
- self.assertEqual(ts_buffer_dataframe.index.name, "timestamp")
- self.assertEqual(ts_buffer_dataframe.index.size, 1)
-
- self.assertEqual(ts_buffer_dataframe.columns.size, 1)
- self.assertEqual(ts_buffer_dataframe.columns[0], "value")
-
+ data_list.get_last_until(-1)
if __name__ == '__main__':
diff --git a/src/argaze.test/DataLog/FileWriter.py b/src/argaze.test/DataLog/FileWriter.py
deleted file mode 100644
index 648385c..0000000
--- a/src/argaze.test/DataLog/FileWriter.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/usr/bin/env python
-
-""" """
-
-__author__ = "Théo de la Hogue"
-__credits__ = []
-__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
-__license__ = "BSD"
-
-import unittest
-import os
-
-from argaze import DataFeatures
-from argaze.DataLog import FileWriter
-from argaze.utils import UtilsFeatures
-
-DataFeaturesTest = UtilsFeatures.importFromTestPackage('DataFeatures')
-
-class TestTimeStampedDataLogger(unittest.TestCase):
- """Test DataLogger class."""
-
- def test_creation(self):
- """Test logger creation."""
-
- file_writer = FileWriter.TimeStampedDataLogger(path='./_export/logs/data.txt', separator=',')
-
- # Check file creation
- self.assertEqual(os.path.exists('./_export/logs/data.txt'), True)
-
- # Write into file
- file_writer.emit(0, 'A')
- file_writer.emit(1, 'B')
- file_writer.emit(2, 'C')
-
-if __name__ == '__main__':
-
- unittest.main() \ No newline at end of file
diff --git a/src/argaze.test/DataLog/__init__.py b/src/argaze.test/DataLog/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/src/argaze.test/DataLog/__init__.py
+++ /dev/null
diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
index b7475b5..156f6f1 100644
--- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
@@ -47,11 +47,13 @@ def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time
else:
- gaze_position = GazeFeatures.UnvalidGazePosition()
+ gaze_position = GazeFeatures.GazePosition()
+
+ # Timestamp gaze position
+ gaze_position.timestamp = time.time() - start_time + start_ts
# Store gaze position
- ts = time.time() - start_time + start_ts
- ts_gaze_positions[ts] = gaze_position
+ ts_gaze_positions.append(gaze_position)
return ts_gaze_positions
@@ -85,11 +87,13 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl
else:
- gaze_position = GazeFeatures.UnvalidGazePosition()
+ gaze_position = GazeFeatures.GazePosition()
+
+ # Timestamp gaze position
+ gaze_position.timestamp = time.time() - start_time + start_ts
# Store gaze position
- ts = time.time() - start_time + start_ts
- ts_gaze_positions[ts] = gaze_position
+ ts_gaze_positions.append(gaze_position)
return ts_gaze_positions
@@ -115,14 +119,14 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), size)
# Check fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
-
+
def test_fixation_and_direct_saccade_identification(self):
"""Test DispersionThresholdIdentification fixation and saccade identification."""
@@ -134,9 +138,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
max_time = 0.1
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0])
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B
gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2)
ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions)
@@ -147,45 +151,39 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), size * 2)
# Check first fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
# Check first saccade
- ts, saccade = ts_saccades.pop_first()
+ saccade = ts_saccades.pop(0)
- self.assertEqual(len(saccade.positions.keys()), 2)
+ self.assertEqual(len(saccade), 2)
self.assertGreaterEqual(saccade.duration, min_time)
self.assertLessEqual(saccade.duration, max_time)
self.assertLessEqual(saccade.finished, True)
# Check that last position of a movement is equal to first position of next movement
- last_ts, last_position = fixation.positions.last
- first_ts, first_position = saccade.positions.first
-
- self.assertEqual(last_ts, first_ts)
- self.assertEqual(last_position.value, first_position.value)
+ self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp)
+ self.assertEqual(fixation[-1].value, saccade[0].value)
# Check second fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
# Check that last position of a movement is equal to first position of next movement
- last_ts, last_position = saccade.positions.last
- first_ts, first_position = fixation.positions.first
-
- self.assertEqual(last_ts, first_ts)
- self.assertEqual(last_position.value, first_position.value)
-
+ self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp)
+ self.assertEqual(saccade[-1].value, fixation[0].value)
+
def test_fixation_and_short_saccade_identification(self):
"""Test DispersionThresholdIdentification fixation and saccade identification."""
@@ -199,10 +197,10 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
max_time = 0.1
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A.last[0])
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions.last[0])
+ ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A[-1].timestamp)
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_move_positions).append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_move_positions + ts_gaze_positions_B
gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2)
ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions)
@@ -213,47 +211,41 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), size * 2 + move)
# Check first fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
# Check first saccade
- ts, saccade = ts_saccades.pop_first()
+ saccade = ts_saccades.pop(0)
- self.assertEqual(len(saccade.positions.keys()), move + 2)
+ self.assertEqual(len(saccade), move + 2)
self.assertGreaterEqual(saccade.duration, (move + 1) * min_time)
self.assertLessEqual(saccade.duration, (move + 1) * max_time)
self.assertLessEqual(saccade.finished, True)
# Check that last position of a movement is equal to first position of next movement
- last_ts, last_position = fixation.positions.last
- first_ts, first_position = saccade.positions.first
-
- self.assertEqual(last_ts, first_ts)
- self.assertEqual(last_position.value, first_position.value)
+ self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp)
+ self.assertEqual(fixation[-1].value, saccade[0].value)
# Check second fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
# Check that last position of a movement is equal to first position of next movement
- last_ts, last_position = saccade.positions.last
- first_ts, first_position = fixation.positions.first
-
- self.assertEqual(last_ts, first_ts)
- self.assertEqual(last_position.value, first_position.value)
-
- def test_invalid_gaze_position(self):
- """Test DispersionThresholdIdentification fixation and saccade identification with invalid gaze position."""
+ self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp)
+ self.assertEqual(saccade[-1].value, fixation[0].value)
+
+ def test_empty_gaze_position(self):
+ """Test DispersionThresholdIdentification fixation and saccade identification with empty gaze position."""
size = 15
center = (0, 0)
@@ -273,23 +265,23 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), size-3)
# Check first fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), 7)
+ self.assertEqual(len(fixation), 7)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, 6 * min_time)
self.assertLessEqual(fixation.duration, 6 * max_time)
self.assertLessEqual(fixation.finished, True)
# Check second fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), 5)
+ self.assertEqual(len(fixation), 5)
self.assertLessEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, 4 * min_time)
self.assertLessEqual(fixation.duration, 4 * max_time)
self.assertLessEqual(fixation.finished, True)
-
+
def test_fixation_overlapping(self):
"""Test Fixation overlap function."""
@@ -302,8 +294,8 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
max_time = 0.1
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0])
- ts_gaze_positions_C = build_gaze_fixation(size, center_C, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_B.last[0])
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
+ ts_gaze_positions_C = build_gaze_fixation(size, center_C, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_B[-1].timestamp)
fixation_A = DispersionThresholdIdentification.Fixation(ts_gaze_positions_A)
fixation_B = DispersionThresholdIdentification.Fixation(ts_gaze_positions_B)
@@ -329,7 +321,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
max_time = 0.1
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0])
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
@@ -342,14 +334,14 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), size*2)
# Check unique fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size * 2)
+ self.assertEqual(len(fixation), size * 2)
#self.assertGreaterEqual(fixation.deviation_max, deviation_max)
self.assertGreaterEqual(fixation.duration, (2 * size - 1) * min_time)
self.assertLessEqual(fixation.duration, (2 * size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
-
+
def test_identification_browsing(self):
"""Test DispersionThresholdIdentification identification browsing."""
@@ -361,23 +353,20 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
max_time = 0.1
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0])
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B
gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2)
- # Get last ts to terminate identification on last gaze position
- last_ts, _ = ts_gaze_positions.last
-
# Iterate on gaze positions
- for ts, gaze_position in ts_gaze_positions.items():
+ for gaze_position in ts_gaze_positions:
- finished_gaze_movement = gaze_movement_identifier.identify(ts, gaze_position, terminate=(ts == last_ts))
+ finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == ts_gaze_positions[-1].timestamp))
if GazeFeatures.is_fixation(finished_gaze_movement):
- self.assertEqual(len(finished_gaze_movement.positions.keys()), size)
+ self.assertEqual(len(finished_gaze_movement), size)
self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max)
self.assertGreaterEqual(finished_gaze_movement.duration, (size-1) * min_time)
self.assertLessEqual(finished_gaze_movement.duration, (size-1) * max_time)
@@ -385,29 +374,20 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
elif GazeFeatures.is_saccade(finished_gaze_movement):
- self.assertEqual(len(finished_gaze_movement.positions.keys()), 2)
+ self.assertEqual(len(finished_gaze_movement), 2)
self.assertGreaterEqual(finished_gaze_movement.duration, min_time)
self.assertLessEqual(finished_gaze_movement.duration, max_time)
self.assertLessEqual(finished_gaze_movement.finished, True)
- # Check that last gaze position date is not equal to given gaze position date
- if finished_gaze_movement.valid:
-
- last_ts, _ = finished_gaze_movement.positions.last
-
- self.assertNotEqual(last_ts, ts)
-
# Check that last gaze position date of current fixation is equal to given gaze position date
# NOTE: This is not true for saccade as, for I-DT, there is a minimal time window while the gaze movement is unknown
current_gaze_movement = gaze_movement_identifier.current_gaze_movement
- if current_gaze_movement.valid:
+ if current_gaze_movement:
if GazeFeatures.is_fixation(current_gaze_movement):
- last_ts, _ = current_gaze_movement.positions.last
-
- self.assertEqual(last_ts, ts)
-
+ self.assertEqual(current_gaze_movement[-1].timestamp, gaze_position.timestamp)
+
def test_identification_generator(self):
"""Test DispersionThresholdIdentification identification using generator."""
@@ -419,17 +399,17 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
max_time = 0.1
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0])
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B
gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2)
- for ts, finished_gaze_movement in gaze_movement_identifier(ts_gaze_positions):
+ for finished_gaze_movement in gaze_movement_identifier(ts_gaze_positions):
if GazeFeatures.is_fixation(finished_gaze_movement):
- self.assertEqual(len(finished_gaze_movement.positions.keys()), size)
+ self.assertEqual(len(finished_gaze_movement), size)
self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max)
self.assertGreaterEqual(finished_gaze_movement.duration, size * min_time)
self.assertLessEqual(finished_gaze_movement.duration, size * max_time)
@@ -437,7 +417,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase):
elif GazeFeatures.is_saccade(finished_gaze_movement):
- self.assertEqual(len(finished_gaze_movement.positions.keys()), 2)
+ self.assertEqual(len(finished_gaze_movement), 2)
self.assertGreaterEqual(finished_gaze_movement.duration, 2 * min_time)
self.assertLessEqual(finished_gaze_movement.duration, 2 * max_time)
self.assertLessEqual(finished_gaze_movement.finished, True)
diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
index 425d592..262cfc0 100644
--- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
+++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
@@ -17,8 +17,8 @@ from argaze.GazeAnalysis import VelocityThresholdIdentification
import numpy
-def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, min_time: float, max_time: float, start_ts: float = 0., validity: list = []):
- """ Generate N TimeStampedGazePositions strating from a starting position for testing purpose.
+def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time: float, max_time: float, start_ts: float = 0., validity: list = []):
+ """ Generate N TimeStampedGazePsoitions dispersed around a center point for testing purpose.
Timestamps are current time after random sleep (second).
GazePositions are random values.
"""
@@ -26,8 +26,6 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float,
start_time = time.time()
- last_valid_position = start_position
-
for i in range(0, size):
# Sleep a random time
@@ -43,21 +41,19 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float,
if valid:
# Edit gaze position
- random_x = last_valid_position[0] + deviation_max * (random.random() - 0.5) / math.sqrt(2)
- random_y = last_valid_position[1] + deviation_max * (random.random() - 0.5) / math.sqrt(2)
-
+ random_x = center[0] + deviation_max * (random.random() - 0.5) / math.sqrt(2)
+ random_y = center[1] + deviation_max * (random.random() - 0.5) / math.sqrt(2)
gaze_position = GazeFeatures.GazePosition((random_x, random_y))
- # Remember last valid gaze position
- last_valid_position = gaze_position.value
-
else:
- gaze_position = GazeFeatures.UnvalidGazePosition()
+ gaze_position = GazeFeatures.GazePosition()
+
+ # Timestamp gaze position
+ gaze_position.timestamp = time.time() - start_time + start_ts
# Store gaze position
- ts = time.time() - start_time + start_ts
- ts_gaze_positions[ts] = gaze_position
+ ts_gaze_positions.append(gaze_position)
return ts_gaze_positions
@@ -91,11 +87,13 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl
else:
- gaze_position = GazeFeatures.UnvalidGazePosition()
+ gaze_position = GazeFeatures.GazePosition()
+
+ # Timestamp gaze position
+ gaze_position.timestamp = time.time() - start_time + start_ts
# Store gaze position
- ts = time.time() - start_time + start_ts
- ts_gaze_positions[ts] = gaze_position
+ ts_gaze_positions.append(gaze_position)
return ts_gaze_positions
@@ -122,9 +120,9 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), size - 1)
# Check fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size - 1)
+ self.assertEqual(len(fixation), size - 1)
self.assertGreaterEqual(fixation.duration, (size - 2) * min_time)
self.assertLessEqual(fixation.duration, (size - 2) * max_time)
self.assertLessEqual(fixation.finished, True)
@@ -141,9 +139,9 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase):
velocity_max = deviation_max / min_time
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0])
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B
gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2)
ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions)
@@ -154,42 +152,36 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), size * 2 - 1)
# Check first fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size - 1)
+ self.assertEqual(len(fixation), size - 1)
self.assertGreaterEqual(fixation.duration, (size - 2) * min_time)
self.assertLessEqual(fixation.duration, (size - 2) * max_time)
self.assertLessEqual(fixation.finished, True)
# Check first saccade
- ts, saccade = ts_saccades.pop_first()
+ saccade = ts_saccades.pop(0)
- self.assertEqual(len(saccade.positions.keys()), 2)
+ self.assertEqual(len(saccade), 2)
self.assertGreaterEqual(saccade.duration, min_time)
self.assertLessEqual(saccade.duration, max_time)
self.assertLessEqual(saccade.finished, True)
# Check that last position of a movement is equal to first position of next movement
- last_ts, last_position = fixation.positions.last
- first_ts, first_position = saccade.positions.first
-
- self.assertEqual(last_ts, first_ts)
- self.assertEqual(last_position.value, first_position.value)
+ self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp)
+ self.assertEqual(fixation[-1].value, saccade[0].value)
# Check second fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
# Check that last position of a movement is equal to first position of next movement
- last_ts, last_position = saccade.positions.last
- first_ts, first_position = fixation.positions.first
-
- self.assertEqual(last_ts, first_ts)
- self.assertEqual(last_position.value, first_position.value)
+ self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp)
+ self.assertEqual(saccade[-1].value, fixation[0].value)
def test_fixation_and_short_saccade_identification(self):
"""Test VelocityThresholdIdentification fixation and saccade identification."""
@@ -205,10 +197,10 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase):
velocity_max = deviation_max / min_time
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A.last[0])
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions.last[0])
+ ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A[-1].timestamp)
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_move_positions).append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_move_positions + ts_gaze_positions_B
gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2)
ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions)
@@ -219,42 +211,36 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), 2 * size + move - 1)
# Check first fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size - 1) # BUG: NOT ALWAYS TRUE !!!
+ self.assertEqual(len(fixation), size - 1) # BUG: NOT ALWAYS TRUE !!!
self.assertGreaterEqual(fixation.duration, (size - 2) * min_time)
self.assertLessEqual(fixation.duration, (size - 2) * max_time)
self.assertLessEqual(fixation.finished, True)
# Check first saccade
- ts, saccade = ts_saccades.pop_first()
+ saccade = ts_saccades.pop(0)
- self.assertEqual(len(saccade.positions.keys()), move + 2)
+ self.assertEqual(len(saccade), move + 2)
self.assertGreaterEqual(saccade.duration, (move + 1) * min_time)
self.assertLessEqual(saccade.duration, (move + 1) * max_time)
self.assertLessEqual(saccade.finished, True)
# Check that last position of a movement is equal to first position of next movement
- last_ts, last_position = fixation.positions.last
- first_ts, first_position = saccade.positions.first
-
- self.assertEqual(last_ts, first_ts)
- self.assertEqual(last_position.value, first_position.value)
+ self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp)
+ self.assertEqual(fixation[-1].value, saccade[0].value)
# Check second fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), size)
+ self.assertEqual(len(fixation), size)
self.assertGreaterEqual(fixation.duration, (size - 1) * min_time)
self.assertLessEqual(fixation.duration, (size - 1) * max_time)
self.assertLessEqual(fixation.finished, True)
# Check that last position of a movement is equal to first position of next movement
- last_ts, last_position = saccade.positions.last
- first_ts, first_position = fixation.positions.first
-
- self.assertEqual(last_ts, first_ts)
- self.assertEqual(last_position.value, first_position.value)
+ self.assertEqual(saccade[-1], fixation[0])
+ self.assertEqual(saccade[-1].value, fixation[0].value)
def test_invalid_gaze_position(self):
"""Test VelocityThresholdIdentification fixation and saccade identification with invalid gaze position."""
@@ -278,17 +264,17 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase):
self.assertEqual(len(ts_status), len(validity)-5)
# Check first fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), 6)
+ self.assertEqual(len(fixation), 6)
self.assertGreaterEqual(fixation.duration, 5 * min_time)
self.assertLessEqual(fixation.duration, 5 * max_time)
self.assertLessEqual(fixation.finished, True)
# Check second fixation
- ts, fixation = ts_fixations.pop_first()
+ fixation = ts_fixations.pop(0)
- self.assertEqual(len(fixation.positions.keys()), 4)
+ self.assertEqual(len(fixation), 4)
self.assertGreaterEqual(fixation.duration, 3 * min_time)
self.assertLessEqual(fixation.duration, 3 * max_time)
self.assertLessEqual(fixation.finished, True)
@@ -305,34 +291,27 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase):
velocity_max = deviation_max / min_time
ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time)
- ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0])
+ ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp)
- ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B)
+ ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B
gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2)
-
- # Get last ts to terminate identification on last gaze position
- last_ts, _ = ts_gaze_positions.last
# Iterate on gaze positions
- for ts, gaze_position in ts_gaze_positions.items():
+ for gaze_position in ts_gaze_positions:
- finished_gaze_movement = gaze_movement_identifier.identify(ts, gaze_position, terminate=(ts == last_ts))
+ finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == ts_gaze_positions[-1]))
# Check that last gaze position date is not equal to given gaze position date
- if finished_gaze_movement.valid:
+ if finished_gaze_movement:
- last_ts, _ = finished_gaze_movement.positions.last
-
- self.assertNotEqual(last_ts, ts)
+ self.assertNotEqual(finished_gaze_movement[-1].timestamp, gaze_position.timestamp)
# Check that last gaze position date of current movement is equal to given gaze position date
current_gaze_movement = gaze_movement_identifier.current_gaze_movement
- if current_gaze_movement.valid:
-
- last_ts, _ = current_gaze_movement.positions.last
+ if current_gaze_movement:
- self.assertEqual(last_ts, ts)
+ self.assertEqual(current_gaze_movement[-1].timestamp, gaze_position.timestamp)
if __name__ == '__main__':
diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py
index b41c7c7..035c76a 100644
--- a/src/argaze.test/GazeFeatures.py
+++ b/src/argaze.test/GazeFeatures.py
@@ -29,79 +29,79 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)):
for i in range(0, size):
# Edit gaze position
- random_gaze_position = GazeFeatures.GazePosition((random.random() * frame_dimension[0], random.random() * frame_dimension[1]))
+ random_gaze_position = GazeFeatures.GazePosition((random.random() * frame_dimension[0], random.random() * frame_dimension[1]), precision=5)
- # Store gaze position
- ts_gaze_positions[time.time()] = random_gaze_position
+ # Timestamp gaze position
+ random_gaze_position.timestamp = time.time()
+
+ # Store timestamped gaze position
+ ts_gaze_positions.append(random_gaze_position)
return ts_gaze_positions
-@dataclass(frozen=True)
class TestFixation(GazeFeatures.Fixation):
"""Define basic fixation class for test."""
- def __post_init__(self):
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
+
+ super().__init__(positions, finished, message, **kwargs)
- super().__post_init__()
+ if positions:
- points = self.positions.values()
- points_x, points_y = [p[0] for p in points], [p[1] for p in points]
- points_array = numpy.column_stack([points_x, points_y])
- centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)])
+ positions_array = numpy.asarray(self.values())
+ centroid = numpy.mean(positions_array, axis=0)
- # Update frozen focus attribute using centroid
- object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1]))
+ # Update focus attribute using centroid
+ self.focus = (centroid[0], centroid[1])
-@dataclass(frozen=True)
class TestSaccade(GazeFeatures.Saccade):
"""Define basic saccade for test."""
- def __post_init__(self):
- super().__post_init__()
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
+
+ super().__init__(positions, finished, message, **kwargs)
class TestGazePositionClass(unittest.TestCase):
"""Test GazePosition class."""
-
+
def test_new(self):
"""Test GazePosition creation."""
# Check empty GazePosition
- empty_gaze_position = GazeFeatures.GazePosition()
+ empty_gaze_position = GazeFeatures.GazePosition(message="empty for test")
- self.assertEqual(empty_gaze_position.value, (0, 0))
- self.assertEqual(empty_gaze_position[0], 0)
- self.assertEqual(empty_gaze_position[1], 0)
-
- for v in empty_gaze_position:
- self.assertEqual(v, 0)
-
- self.assertEqual(empty_gaze_position.precision, 0.)
- self.assertEqual(len(empty_gaze_position), 2)
- self.assertEqual(empty_gaze_position.valid, True)
- self.assertEqual(numpy.array(empty_gaze_position).shape, (2,))
+ self.assertEqual(empty_gaze_position, ())
+ self.assertEqual(empty_gaze_position, ())
+ self.assertEqual(empty_gaze_position.precision, None)
+ self.assertEqual(empty_gaze_position.message, "empty for test")
+ self.assertEqual(len(empty_gaze_position), 0)
+ self.assertEqual(bool(empty_gaze_position), False)
+ self.assertEqual(numpy.array(empty_gaze_position).shape, (0,))
# Check integer GazePosition
int_gaze_position = GazeFeatures.GazePosition((123, 456), precision=55)
- self.assertEqual(int_gaze_position.value, (123, 456))
+ self.assertEqual(int_gaze_position, (123, 456))
self.assertEqual(int_gaze_position[0], 123)
self.assertEqual(int_gaze_position[1], 456)
+ self.assertEqual(int_gaze_position, (123, 456))
self.assertEqual(int_gaze_position.precision, 55)
self.assertEqual(len(int_gaze_position), 2)
- self.assertEqual(int_gaze_position.valid, True)
- self.assertEqual(numpy.array(empty_gaze_position).shape, (2,))
+ self.assertEqual(bool(int_gaze_position), True)
+ self.assertEqual(numpy.array(int_gaze_position).shape, (2,))
# Check float GazePosition
float_gaze_position = GazeFeatures.GazePosition((1.23, 4.56), precision=5.5)
- self.assertEqual(float_gaze_position.value, (1.23, 4.56))
+ self.assertEqual(float_gaze_position, (1.23, 4.56))
self.assertEqual(float_gaze_position[0], 1.23)
self.assertEqual(float_gaze_position[1], 4.56)
+ self.assertEqual(float_gaze_position, (1.23, 4.56))
self.assertEqual(float_gaze_position.precision, 5.5)
self.assertEqual(len(float_gaze_position), 2)
- self.assertEqual(float_gaze_position.valid, True)
- self.assertEqual(numpy.array(empty_gaze_position).shape, (2,))
-
+ self.assertEqual(bool(float_gaze_position), True)
+ self.assertEqual(numpy.array(float_gaze_position).shape, (2,))
+
def test_properties(self):
"""Test GazePosition properties cannot be modified after creation."""
@@ -110,11 +110,29 @@ class TestGazePositionClass(unittest.TestCase):
# Check that gaze position value setting fails
with self.assertRaises(AttributeError):
- gaze_position.value = (123, 456)
+ gaze_position.value = (12, 34)
+ # WARNING: gaze_position = (12, 34) is possible !!!!
+ # How to prevent this?
- self.assertNotEqual(gaze_position.value, (123, 456))
- self.assertEqual(gaze_position.value, (0, 0))
+ self.assertNotEqual(gaze_position, (12, 34))
+ self.assertEqual(gaze_position, ())
+ # Check that gaze position precision setting fails
+ with self.assertRaises(AttributeError):
+
+ gaze_position.precision = 5
+
+ self.assertNotEqual(gaze_position.precision, 5)
+ self.assertEqual(gaze_position.precision, None)
+
+ # Check that gaze position message setting fails
+ with self.assertRaises(AttributeError):
+
+ gaze_position.message = "later setting"
+
+ self.assertNotEqual(gaze_position.message, "later setting")
+ self.assertEqual(gaze_position.message, None)
+
def test_overlapping(self):
"""Test GazePosition overlap method."""
@@ -133,83 +151,80 @@ class TestGazePositionClass(unittest.TestCase):
self.assertFalse(gaze_position_C.overlap(gaze_position_A))
self.assertFalse(gaze_position_C.overlap(gaze_position_B))
-
+
def test___repr__(self):
"""Test GazePosition string representation."""
# Check empty GazePosition representation
- self.assertEqual(repr(GazeFeatures.GazePosition()), "{\"value\": [0, 0], \"precision\": 0.0}")
+ self.assertEqual(repr(GazeFeatures.GazePosition()), "{\"value\": [], \"timestamp\": NaN}")
-class TestUnvalidGazePositionClass(unittest.TestCase):
- """Test UnvalidGazePosition class."""
+ # Check GazePosition representation
+ self.assertEqual(repr(GazeFeatures.GazePosition((12, 345), precision=50, message="ok")), \
+ "{\"value\": [12, 345], \"precision\": 50, \"message\": \"ok\", \"timestamp\": NaN}")
+class TestTimeStampedGazePositionsClass(unittest.TestCase):
+ """Test TimeStampedGazePositions class."""
+
def test_new(self):
- """Test UnvalidGazePosition creation."""
-
- import math
-
- unvalid_gaze_position = GazeFeatures.UnvalidGazePosition()
-
- self.assertEqual(unvalid_gaze_position.value, (None, None))
- self.assertEqual(unvalid_gaze_position.precision, None)
- self.assertEqual(unvalid_gaze_position.valid, False)
+ """Test TimeStampedGazePositions creation."""
- def test___repr__(self):
- """Test UnvalidGazePosition string representation."""
-
- self.assertEqual(repr(GazeFeatures.UnvalidGazePosition()), "{\"message\": null, \"value\": [null, null], \"precision\": null}")
+ empty_gaze_position = GazeFeatures.GazePosition()
+ empty_gaze_position.timestamp = 100
-class TestTimeStampedGazePositionsClass(unittest.TestCase):
- """Test TimeStampedGazePositions class."""
+ gaze_position_with_message = GazeFeatures.GazePosition((12, 345), message="second position")
+ gaze_position_with_message.timestamp = 200
- def test___setitem__(self):
- """Test __setitem__ method."""
+ dict_gaze_position = {"timestamp": 300, "value": (0, 0), "precision": 0.}
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
- ts_gaze_positions[0] = GazeFeatures.GazePosition()
- ts_gaze_positions[1] = GazeFeatures.UnvalidGazePosition()
- ts_gaze_positions[2] = {"value": (0, 0), "precision": 0.}
+ ts_gaze_positions = GazeFeatures.TimeStampedGazePositions([empty_gaze_position, gaze_position_with_message, dict_gaze_position])
- # Check GazePosition is correctly stored and accessible as a GazePosition
+ # Check first GazePosition is correctly stored and accessible as a GazePosition
self.assertIsInstance(ts_gaze_positions[0], GazeFeatures.GazePosition)
- self.assertEqual(ts_gaze_positions[0].valid, True)
+ self.assertEqual(bool(ts_gaze_positions[0]), False)
+ self.assertEqual(ts_gaze_positions[0].timestamp, 100)
- # Check UnvalidGazePosition is correctly stored and accessible as a UnvalidGazePosition
- self.assertIsInstance(ts_gaze_positions[1], GazeFeatures.UnvalidGazePosition)
- self.assertEqual(ts_gaze_positions[1].valid, False)
+ # Check second GazePosition is correctly stored and accessible as a GazePosition
+ self.assertIsInstance(ts_gaze_positions[1], GazeFeatures.GazePosition)
+ self.assertEqual(bool(ts_gaze_positions[1]), True)
+ self.assertEqual(ts_gaze_positions[1].timestamp, 200)
- # Check dict with "value" and "precision" keys is correctly stored and accessible as a GazePosition
+ # Check third GazePosition from dict is correctly stored and accessible as a GazePosition
self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition)
- self.assertEqual(ts_gaze_positions[2].valid, True)
+ self.assertEqual(bool(ts_gaze_positions[2]), True)
+ self.assertEqual(ts_gaze_positions[2].timestamp, 300)
# Check that bad data type insertion fails
- with self.assertRaises(AssertionError):
+ with self.assertRaises(TypeError):
- ts_gaze_positions[3] = "This string is not a gaze position value."
+ ts_gaze_positions.append("This string is not a gaze position value.")
# Check that dict with bad keys insertion fails
- with self.assertRaises(AssertionError):
+ with self.assertRaises(TypeError):
- ts_gaze_positions[4] = {"bad_key": (0, 0), "precision": 0.}
+ ts_gaze_positions.append({"bad_key": (0, 0), "precision": 0.})
# Check final lenght
self.assertEqual(len(ts_gaze_positions), 3)
-
+
def test___repr__(self):
"""Test inherited string representation."""
ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
- self.assertEqual(repr(GazeFeatures.TimeStampedGazePositions()), "{}")
+ self.assertEqual(repr(GazeFeatures.TimeStampedGazePositions()), "[]")
- ts_gaze_positions[0] = GazeFeatures.GazePosition()
-
- self.assertEqual(repr(ts_gaze_positions), "{\"0\": {\"value\": [0, 0], \"precision\": 0.0}}")
+ empty_gaze_position = GazeFeatures.GazePosition()
+ empty_gaze_position.timestamp = 100
+ ts_gaze_positions.append(empty_gaze_position)
- ts_gaze_positions[0] = GazeFeatures.UnvalidGazePosition()
+ self.assertEqual(repr(ts_gaze_positions), "[{\"value\": [], \"timestamp\": 100}]")
- self.assertEqual(repr(ts_gaze_positions), "{\"0\": {\"message\": null, \"value\": [null, null], \"precision\": null}}")
+ full_gaze_position = GazeFeatures.GazePosition((12, 345), precision=50, message="ok")
+ full_gaze_position.timestamp = 200
+ ts_gaze_positions[0] = full_gaze_position
+ self.assertEqual(repr(ts_gaze_positions), "[{\"value\": [12, 345], \"precision\": 50, \"message\": \"ok\", \"timestamp\": 200}]")
+
def test_from_dataframe(self):
"""Test from_dataframe classmethod."""
@@ -226,13 +241,13 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase):
# Check first gaze position is correctly stored and accessible as a GazePosition
self.assertIsInstance(ts_gaze_positions[0], GazeFeatures.GazePosition)
- self.assertEqual(ts_gaze_positions[0].precision, 0)
- self.assertEqual(ts_gaze_positions[0].valid, True)
+ self.assertEqual(ts_gaze_positions[0].precision, None)
+ self.assertEqual(bool(ts_gaze_positions[0]), True)
- # Check third gaze position is correctly stored and accessible as a UnvalidGazePosition
- self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.UnvalidGazePosition)
+ # Check third gaze position is correctly stored and accessible as a GazePosition
+ self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition)
self.assertEqual(ts_gaze_positions[2].precision, None)
- self.assertEqual(ts_gaze_positions[2].valid, False)
+ self.assertEqual(bool(ts_gaze_positions[2]), False)
data = {'Specific timestamp label': [0, 1, 2, 3, 4],
'Specific gaze position x label': [0, 10, numpy.nan, 30, 40],
@@ -249,12 +264,12 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase):
# Check first gaze position is correctly stored and accessible as a GazePosition
self.assertIsInstance(ts_gaze_positions[0], GazeFeatures.GazePosition)
self.assertEqual(ts_gaze_positions[0].precision, 15)
- self.assertEqual(ts_gaze_positions[0].valid, True)
+ self.assertEqual(bool(ts_gaze_positions[0]), True)
- # Check third gaze position is correctly stored and accessible as a UnvalidGazePosition
- self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.UnvalidGazePosition)
- self.assertEqual(ts_gaze_positions[2].precision, None)
- self.assertEqual(ts_gaze_positions[2].valid, False)
+ # Check third gaze position is correctly stored and accessible as a GazePosition
+ self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition)
+ self.assertEqual(numpy.isnan(ts_gaze_positions[2].precision), True)
+ self.assertEqual(bool(ts_gaze_positions[2]), False)
def test_as_dataframe(self):
"""Test inherited as_dataframe method."""
@@ -265,27 +280,33 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase):
self.assertEqual(ts_gaze_positions_dataframe.index.name, "timestamp")
self.assertEqual(ts_gaze_positions_dataframe.index.size, 10)
- self.assertEqual(ts_gaze_positions_dataframe.columns.size, 2)
+ self.assertEqual(ts_gaze_positions_dataframe.columns.size, 3)
self.assertEqual(ts_gaze_positions_dataframe.columns[0], "value")
self.assertEqual(ts_gaze_positions_dataframe.columns[1], "precision")
+ self.assertEqual(ts_gaze_positions_dataframe.columns[2], "message")
self.assertEqual(ts_gaze_positions_dataframe["value"].dtype, 'object')
- self.assertEqual(ts_gaze_positions_dataframe["precision"].dtype, 'float64')
+ self.assertEqual(ts_gaze_positions_dataframe["precision"].dtype, 'int64')
+ self.assertEqual(ts_gaze_positions_dataframe["message"].dtype, 'O') # Python object type
- # Check unvalid position conversion
- ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
- ts_gaze_positions[0] = GazeFeatures.UnvalidGazePosition()
+ # Check empty position conversion
+ empty_gaze_position = GazeFeatures.GazePosition()
+ empty_gaze_position.timestamp = 100
+
+ ts_gaze_positions = GazeFeatures.TimeStampedGazePositions([empty_gaze_position])
ts_gaze_positions_dataframe = ts_gaze_positions.as_dataframe()
self.assertEqual(ts_gaze_positions_dataframe.index.name, "timestamp")
self.assertEqual(ts_gaze_positions_dataframe.index.size, 1)
- self.assertEqual(ts_gaze_positions_dataframe.columns.size, 2)
+ self.assertEqual(ts_gaze_positions_dataframe.columns.size, 3)
self.assertEqual(ts_gaze_positions_dataframe.columns[0], "value")
self.assertEqual(ts_gaze_positions_dataframe.columns[1], "precision")
+ self.assertEqual(ts_gaze_positions_dataframe.columns[2], "message")
self.assertEqual(ts_gaze_positions_dataframe["value"].dtype, 'object')
self.assertEqual(ts_gaze_positions_dataframe["precision"].dtype, 'O') # Python object type
+ self.assertEqual(ts_gaze_positions_dataframe["message"].dtype, 'O') # Python object type
class TestGazeMovementClass(unittest.TestCase):
"""Test GazeMovement class."""
@@ -296,10 +317,10 @@ class TestGazeMovementClass(unittest.TestCase):
abstract_gaze_movement = GazeFeatures.GazeMovement(random_gaze_positions(0))
# Check abstract GazeMovement
- self.assertEqual(len(abstract_gaze_movement.positions), 0)
- self.assertEqual(abstract_gaze_movement.duration, -1)
- self.assertEqual(abstract_gaze_movement.amplitude, -1)
- self.assertEqual(abstract_gaze_movement.valid, False)
+ self.assertEqual(len(abstract_gaze_movement), 0)
+ self.assertEqual(abstract_gaze_movement.duration, 0)
+ self.assertEqual(abstract_gaze_movement.amplitude, 0)
+ self.assertEqual(bool(abstract_gaze_movement), False)
self.assertEqual(abstract_gaze_movement.finished, False)
def test_finish(self):
@@ -320,25 +341,22 @@ class TestGazeMovementClass(unittest.TestCase):
self.assertEqual(abstract_gaze_movement.finished, True)
self.assertEqual(abstract_gaze_movement_ref.finished, True)
-class TestUnvalidGazeMovementClass(unittest.TestCase):
- """Test UnvalidGazeMovement class."""
-
- def test_new(self):
- """Test UnvalidGazeMovement creation."""
+ def test_message(self):
+ """Test GazeMovement creation with message only."""
- unvalid_gaze_movement = GazeFeatures.UnvalidGazeMovement('test')
+ gaze_movement = GazeFeatures.GazeMovement(message='test')
- # Check UnvalidGazeMovement
- self.assertEqual(len(unvalid_gaze_movement.positions), 0)
- self.assertEqual(unvalid_gaze_movement.duration, -1)
- self.assertEqual(unvalid_gaze_movement.amplitude, -1)
- self.assertEqual(unvalid_gaze_movement.valid, False)
- self.assertEqual(unvalid_gaze_movement.finished, False)
- self.assertEqual(unvalid_gaze_movement.message, 'test')
+ # Check GazeMovement
+ self.assertEqual(len(gaze_movement), 0)
+ self.assertEqual(gaze_movement.duration, 0)
+ self.assertEqual(gaze_movement.amplitude, 0)
+ self.assertEqual(bool(gaze_movement), False)
+ self.assertEqual(gaze_movement.finished, False)
+ self.assertEqual(gaze_movement.message, 'test')
class TestScanStepClass(unittest.TestCase):
"""Test ScanStep class."""
-
+
def test_new(self):
"""Test ScanStep creation."""
@@ -351,7 +369,7 @@ class TestScanStepClass(unittest.TestCase):
self.assertEqual(scan_step.first_fixation, fixation)
self.assertEqual(scan_step.last_saccade, saccade)
self.assertGreater(scan_step.duration, 0)
-
+
def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)):
"""Build scan path"""
@@ -359,19 +377,17 @@ def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)):
for i in range(size):
- fixation = TestFixation(random_gaze_positions(10, frame_dimension))
- ts, _ = fixation.positions.first
- scan_path.append_fixation(ts, fixation)
+ fixation = TestFixation(random_gaze_positions(10, frame_dimension), timestamp=i)
+ scan_path.append_fixation(fixation)
- saccade = TestSaccade(random_gaze_positions(2, frame_dimension))
- ts, _ = saccade.positions.first
- scan_path.append_saccade(ts, saccade)
+ saccade = TestSaccade(random_gaze_positions(2, frame_dimension), timestamp=i+1)
+ scan_path.append_saccade(saccade)
return scan_path
class TestScanPathClass(unittest.TestCase):
"""Test ScanPath class."""
-
+
def test_new(self):
"""Test ScanPath creation."""
@@ -380,7 +396,7 @@ class TestScanPathClass(unittest.TestCase):
self.assertEqual(len(scan_path), 0)
self.assertEqual(scan_path.duration, 0)
-
+
def test_append(self):
"""Test ScanPath append methods."""
@@ -388,9 +404,7 @@ class TestScanPathClass(unittest.TestCase):
# Append a saccade that should be ignored
saccade = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade.positions.first
-
- new_step = scan_path.append_saccade(ts, saccade)
+ new_step = scan_path.append_saccade(saccade)
# Check that no scan step have been created yet
self.assertEqual(len(scan_path), 0)
@@ -399,9 +413,7 @@ class TestScanPathClass(unittest.TestCase):
# Append first fixation
fixation_A = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_A.positions.first
-
- new_step = scan_path.append_fixation(ts, fixation_A)
+ new_step = scan_path.append_fixation(fixation_A)
# Check that no scan step have been created yet
self.assertEqual(len(scan_path), 0)
@@ -410,9 +422,7 @@ class TestScanPathClass(unittest.TestCase):
# Append consecutive saccade
saccade_A = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade_A.positions.first
-
- new_step_A = scan_path.append_saccade(ts, saccade_A)
+ new_step_A = scan_path.append_saccade(saccade_A)
# Check that new scan step have been created
self.assertEqual(len(scan_path), 1)
@@ -423,9 +433,7 @@ class TestScanPathClass(unittest.TestCase):
# Append 2 consecutive fixations then a saccade
fixation_B1 = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_B1.positions.first
-
- new_step = scan_path.append_fixation(ts, fixation_B1)
+ new_step = scan_path.append_fixation(fixation_B1)
# Check that no scan step have been created yet
self.assertEqual(len(scan_path), 1)
@@ -433,9 +441,7 @@ class TestScanPathClass(unittest.TestCase):
self.assertEqual(new_step, None)
fixation_B2 = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_B2.positions.first
-
- new_step = scan_path.append_fixation(ts, fixation_B2)
+ new_step = scan_path.append_fixation(fixation_B2)
# Check that no scan step have been created yet
self.assertEqual(len(scan_path), 1)
@@ -443,9 +449,7 @@ class TestScanPathClass(unittest.TestCase):
self.assertEqual(new_step, None)
saccade_B = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade_B.positions.first
-
- new_step_B = scan_path.append_saccade(ts, saccade_B)
+ new_step_B = scan_path.append_saccade(saccade_B)
# Check that new scan step have been created
self.assertEqual(len(scan_path), 2)
@@ -456,19 +460,19 @@ class TestScanPathClass(unittest.TestCase):
class TestAOIScanStepClass(unittest.TestCase):
"""Test AOIScanStep class."""
-
+
def test_new(self):
"""Test AOIScanStep creation."""
movements = GazeFeatures.TimeStampedGazeMovements()
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
- movements[ts] = fixation
+ ts = fixation[0].timestamp
+ movements.append(fixation)
saccade = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade.positions.first
- movements[ts] = saccade
+ ts = saccade[0].timestamp
+ movements.append(saccade)
aoi_scan_step = GazeFeatures.AOIScanStep(movements, 'Test')
@@ -478,19 +482,19 @@ class TestAOIScanStepClass(unittest.TestCase):
self.assertEqual(aoi_scan_step.first_fixation, fixation)
self.assertEqual(aoi_scan_step.last_saccade, saccade)
self.assertGreater(aoi_scan_step.duration, 0)
-
+
def test_error(self):
"""Test AOIScanStep creation error."""
movements = GazeFeatures.TimeStampedGazeMovements()
saccade = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade.positions.first
- movements[ts] = saccade
+ ts = saccade[0].timestamp
+ movements.append(saccade)
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
- movements[ts] = fixation
+ ts = fixation[0].timestamp
+ movements.append(fixation)
# Check that aoi scan step creation fail
with self.assertRaises(GazeFeatures.AOIScanStepError):
@@ -505,21 +509,19 @@ def build_aoi_scan_path(expected_aoi, aoi_path):
# Append a hidden last step to allow last given step creation
aoi_path.append(aoi_path[-2])
- for aoi in aoi_path:
+ for i, aoi in enumerate(aoi_path):
- fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
- aoi_scan_path.append_fixation(ts, fixation, aoi)
+ fixation = TestFixation(random_gaze_positions(10), timestamp=i)
+ aoi_scan_path.append_fixation(fixation, aoi)
- saccade = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade.positions.first
- aoi_scan_path.append_saccade(ts, saccade)
+ saccade = TestSaccade(random_gaze_positions(2), timestamp=i+1)
+ aoi_scan_path.append_saccade(saccade)
return aoi_scan_path
class TestAOIScanPathClass(unittest.TestCase):
"""Test AOIScanPath class."""
-
+
def test_new(self):
"""Test AOIScanPath creation."""
@@ -527,7 +529,7 @@ class TestAOIScanPathClass(unittest.TestCase):
aoi_scan_path = GazeFeatures.AOIScanPath()
self.assertEqual(len(aoi_scan_path), 0)
-
+
def test_append(self):
"""Test AOIScanPath append methods."""
@@ -535,9 +537,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append fixation on A aoi
fixation_A = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_A.positions.first
-
- new_step = aoi_scan_path.append_fixation(ts, fixation_A, 'Foo')
+ new_step = aoi_scan_path.append_fixation(fixation_A, 'Foo')
# Check that no aoi scan step have been created yet
self.assertEqual(len(aoi_scan_path), 0)
@@ -546,9 +546,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append saccade
saccade_A = TestSaccade(random_gaze_positions(2))
- ts, _ = saccade_A.positions.first
-
- new_step = aoi_scan_path.append_saccade(ts, saccade_A)
+ new_step = aoi_scan_path.append_saccade(saccade_A)
# Check that no aoi scan step have been created yet
self.assertEqual(len(aoi_scan_path), 0)
@@ -557,9 +555,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append fixation on B aoi
fixation_B = TestFixation(random_gaze_positions(10))
- ts, _ = fixation_B.positions.first
-
- new_step_A = aoi_scan_path.append_fixation(ts, fixation_B, 'Bar')
+ new_step_A = aoi_scan_path.append_fixation(fixation_B, 'Bar')
# Check a first aoi scan step have been created once a new fixation is appened
self.assertEqual(len(aoi_scan_path), 1)
@@ -568,14 +564,11 @@ class TestAOIScanPathClass(unittest.TestCase):
self.assertEqual(new_step_A.aoi, 'Foo')
self.assertEqual(new_step_A.letter, 'A')
- first_ts, _ = fixation_A.positions.first
- last_ts, _ = saccade_A.positions.last
-
- self.assertEqual(new_step_A.duration, last_ts - first_ts)
+ self.assertEqual(new_step_A.duration, saccade_A[-1].timestamp - fixation_A[0].timestamp)
# Check letter affectation
self.assertEqual(aoi_scan_path.get_letter_aoi('A'), 'Foo')
-
+
def test_append_error(self):
"""Test AOIScanPath append error."""
@@ -583,9 +576,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append fixation on A aoi
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
-
- new_step = aoi_scan_path.append_fixation(ts, fixation, 'Foo')
+ new_step = aoi_scan_path.append_fixation(fixation, 'Foo')
# Check that no aoi scan step have been created yet
self.assertEqual(len(aoi_scan_path), 0)
@@ -593,18 +584,17 @@ class TestAOIScanPathClass(unittest.TestCase):
# Append fixation on B aoi
fixation = TestFixation(random_gaze_positions(10))
- ts, _ = fixation.positions.first
# Check that aoi scan step creation fail when fixation is appened after another fixation
with self.assertRaises(GazeFeatures.AOIScanStepError):
- new_step = aoi_scan_path.append_fixation(ts, fixation, 'Bar')
+ new_step = aoi_scan_path.append_fixation(fixation, 'Bar')
# Check that unexpected aoi scan step creation fail
with self.assertRaises(GazeFeatures.AOIScanStepError):
- new_step = aoi_scan_path.append_fixation(ts, fixation, 'Shu')
-
+ new_step = aoi_scan_path.append_fixation(fixation, 'Shu')
+
def test_letter_index_and_string_reprentation(self):
"""Test AOIScanPath letter index and string representation feature."""
@@ -633,7 +623,7 @@ class TestAOIScanPathClass(unittest.TestCase):
# Check letter sequence representation
self.assertEqual(aoi_scan_path.letter_sequence, 'ABCA')
-
+
def test_transition_matrix(self):
"""Test AOIScanPath transition matrix feature."""
@@ -652,7 +642,7 @@ class TestAOIScanPathClass(unittest.TestCase):
self.assertEqual(aoi_scan_path.transition_matrix['Shu']['Foo'], 0)
self.assertEqual(aoi_scan_path.transition_matrix['Shu']['Bar'], 1)
-
+
def test_transition_matrix(self):
"""Test AOIScanPath fixations count feature."""
diff --git a/src/argaze.test/PupillFeatures.py b/src/argaze.test/PupillFeatures.py
index f0e8e1b..9cf26eb 100644
--- a/src/argaze.test/PupillFeatures.py
+++ b/src/argaze.test/PupillFeatures.py
@@ -8,6 +8,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"
import unittest
+import math
from argaze import PupillFeatures
@@ -43,14 +44,12 @@ class TestPupillDiameterClass(unittest.TestCase):
# Check empty PupillDiameter
empty_pupill_diameter = PupillFeatures.PupillDiameter()
- self.assertEqual(empty_pupill_diameter.value, 0.)
- self.assertEqual(empty_pupill_diameter.valid, False)
+ self.assertEqual(empty_pupill_diameter, math.nan)
# Check float PupillDiameter
float_pupill_diameter = PupillFeatures.PupillDiameter(1.23)
- self.assertEqual(float_pupill_diameter.value, 1.23)
- self.assertEqual(float_pupill_diameter.valid, True)
+ self.assertEqual(float_pupill_diameter, 1.23)
def test_properties(self):
"""Test PupillDiameter properties cannot be modified after creation."""
@@ -60,32 +59,16 @@ class TestPupillDiameterClass(unittest.TestCase):
# Check that pupill diameter value setting fails
with self.assertRaises(AttributeError):
- pupill_diameter.value = 123
+ pupill_diameter = 123
- self.assertNotEqual(pupill_diameter.value, 123)
- self.assertEqual(pupill_diameter.value, 0.)
+ self.assertNotEqual(pupill_diameter, 123)
+ self.assertEqual(pupill_diameter, math.nan)
def test___repr__(self):
"""Test PupillDiameter string representation."""
# Check empty PupillDiameter representation
- self.assertEqual(repr(PupillFeatures.PupillDiameter()), "{\"value\": 0.0}")
-
-class TestUnvalidPupillDiameterClass(unittest.TestCase):
- """Test UnvalidPupillDiameter class."""
-
- def test_new(self):
- """Test UnvalidPupillDiameter creation."""
-
- unvalid_pupill_diameter = PupillFeatures.UnvalidPupillDiameter()
-
- self.assertEqual(unvalid_pupill_diameter.value, 0.)
- self.assertEqual(unvalid_pupill_diameter.valid, False)
-
- def test___repr__(self):
- """Test UnvalidPupillDiameter string representation."""
-
- self.assertEqual(repr(PupillFeatures.UnvalidPupillDiameter()), "{\"message\": null, \"value\": 0.0}")
+ self.assertEqual(repr(PupillFeatures.PupillDiameter()), "{\"value\": NaN}")
class TestTimeStampedPupillDiametersClass(unittest.TestCase):
"""Test TimeStampedPupillDiameters class."""
@@ -93,22 +76,23 @@ class TestTimeStampedPupillDiametersClass(unittest.TestCase):
def test___setitem__(self):
"""Test __setitem__ method."""
- ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters()
- ts_pupill_diameters[0] = PupillFeatures.PupillDiameter()
- ts_pupill_diameters[1] = PupillFeatures.UnvalidPupillDiameter()
- ts_pupill_diameters[2] = {"value": 1.23}
+ ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters([
+ PupillFeatures.PupillDiameter(),
+ PupillFeatures.PupillDiameter(0.63),
+ {"value": 1.23}
+ ])
- # Check PupillDiameter is correctly stored and accessible as a PupillDiameter
+ # Check empty PupillDiameter is correctly stored and accessible as a PupillDiameter
self.assertIsInstance(ts_pupill_diameters[0], PupillFeatures.PupillDiameter)
- self.assertEqual(ts_pupill_diameters[0].valid, False)
+ self.assertEqual(ts_pupill_diameters[0], math.nan)
- # Check UnvalidPupillDiameter is correctly stored and accessible as a UnvalidPupillDiameter
- self.assertIsInstance(ts_pupill_diameters[1], PupillFeatures.UnvalidPupillDiameter)
- self.assertEqual(ts_pupill_diameters[1].valid, False)
+ # Check PupillDiameter is correctly stored and accessible as a PupillDiameter
+ self.assertIsInstance(ts_pupill_diameters[1], PupillFeatures.PupillDiameter)
+ self.assertEqual(ts_pupill_diameters[0], 0.63)
- # Check dict with "value" and "precision" keys is correctly stored and accessible as a PupillDiameter
+ # Check dict with "value" key is correctly stored and accessible as a PupillDiameter
self.assertIsInstance(ts_pupill_diameters[2], PupillFeatures.PupillDiameter)
- self.assertEqual(ts_pupill_diameters[2].valid, True)
+ self.assertEqual(ts_pupill_diameters[0], 1.23)
# Check that bad data type insertion fails
with self.assertRaises(AssertionError):
@@ -125,11 +109,11 @@ class TestTimeStampedPupillDiametersClass(unittest.TestCase):
ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters()
- self.assertEqual(repr(PupillFeatures.TimeStampedPupillDiameters()), "{}")
+ self.assertEqual(repr(PupillFeatures.TimeStampedPupillDiameters()), "[]")
- ts_pupill_diameters[0] = PupillFeatures.PupillDiameter()
+ ts_pupill_diameters.append(PupillFeatures.PupillDiameter())
- self.assertEqual(repr(ts_pupill_diameters), "{\"0\": {\"value\": 0.0}}")
+ self.assertEqual(repr(ts_pupill_diameters), "[{\"value\": NaN, \"timestamp\": 0}]")
ts_pupill_diameters[0] = PupillFeatures.UnvalidPupillDiameter()
diff --git a/src/argaze.test/utils/ts_buffer.json b/src/argaze.test/utils/ts_buffer.json
deleted file mode 100644
index b1433de..0000000
--- a/src/argaze.test/utils/ts_buffer.json
+++ /dev/null
@@ -1 +0,0 @@
-{"0":{"timestamp":0,"int_value":1,"float_value":0.000001,"string_value":"A","list_value":[[0,0],[0,1],[1,1],[1,0]],"tuple_value":[0,0],"nan_value":1.0,"json_value":{"0":"A","1":"B","2":"C","3":"D"}},"1":{"timestamp":1,"int_value":2,"float_value":0.000002,"string_value":"B","list_value":[[0,0],[0,2],[2,2],[2,0]],"tuple_value":[1,1],"nan_value":null,"json_value":{"0":"A","1":"B","2":"C","3":"D"}},"2":{"timestamp":2,"int_value":3,"float_value":0.000003,"string_value":"C","list_value":[[0,0],[0,3],[3,3],[3,0]],"tuple_value":[2,2],"nan_value":1.0,"json_value":{"0":"A","1":"B","2":"C","3":"D"}}} \ No newline at end of file
diff --git a/src/argaze.test/utils/ts_data_file.json b/src/argaze.test/utils/ts_data_file.json
new file mode 100644
index 0000000..d69dd77
--- /dev/null
+++ b/src/argaze.test/utils/ts_data_file.json
@@ -0,0 +1,82 @@
+[
+ {
+ "value": [
+ 0,
+ 0.04245991513702008
+ ],
+ "message": "test_0",
+ "timestamp": 1709160858.387703
+ },
+ {
+ "value": [
+ 1,
+ 0.6761490271896192
+ ],
+ "message": "test_1",
+ "timestamp": 1709160858.38784
+ },
+ {
+ "value": [
+ 2,
+ 0.046407850274610474
+ ],
+ "message": "test_2",
+ "timestamp": 1709160858.387973
+ },
+ {
+ "value": [
+ 3,
+ 0.9378067398496651
+ ],
+ "message": "test_3",
+ "timestamp": 1709160858.388105
+ },
+ {
+ "value": [
+ 4,
+ 0.4197936347606107
+ ],
+ "message": "test_4",
+ "timestamp": 1709160858.388236
+ },
+ {
+ "value": [
+ 5,
+ 0.26937423401632943
+ ],
+ "message": "test_5",
+ "timestamp": 1709160858.3883672
+ },
+ {
+ "value": [
+ 6,
+ 0.9478731386524537
+ ],
+ "message": "test_6",
+ "timestamp": 1709160858.3884978
+ },
+ {
+ "value": [
+ 7,
+ 0.39010865778889914
+ ],
+ "message": "test_7",
+ "timestamp": 1709160858.388629
+ },
+ {
+ "value": [
+ 8,
+ 0.4100480317631575
+ ],
+ "message": "test_8",
+ "timestamp": 1709160858.388763
+ },
+ {
+ "value": [
+ 9,
+ 0.5900791904864906
+ ],
+ "message": "test_9",
+ "timestamp": 1709160858.388895
+ }
+] \ No newline at end of file
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 5fcc990..abe4ed7 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -123,7 +123,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers
self.__draw_parameters = draw_parameters
- self.__gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ self.__gaze_movement = GazeFeatures.GazeMovement()
self.__looked_aoi_name = None
self.__aoi_scan_path_analyzed = False
@@ -384,7 +384,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
)
@DataFeatures.PipelineStepMethod
- def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()):
+ def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()):
"""
Project timestamped gaze movement into layer.
@@ -415,7 +415,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement)
# Valid and finished gaze movement has been identified
- if gaze_movement.valid and gaze_movement.finished:
+ if gaze_movement and gaze_movement.finished:
if GazeFeatures.is_fixation(gaze_movement):
@@ -423,7 +423,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# TODO: add an option to filter None looked_aoi_name or not
if self.__aoi_scan_path is not None:
- aoi_scan_step = self.__aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name)
+ aoi_scan_step = self.__aoi_scan_path.append_fixation(gaze_movement, self.__looked_aoi_name)
# Is there a new step?
if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1:
@@ -441,7 +441,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Append saccade to aoi scan path
if self.__aoi_scan_path is not None:
- self.__aoi_scan_path.append_saccade(timestamp, gaze_movement)
+ self.__aoi_scan_path.append_saccade(gaze_movement)
def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None):
"""
@@ -531,8 +531,8 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__layers = layers
self.__image_parameters = image_parameters
- self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition()
- self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ self.__calibrated_gaze_position = GazeFeatures.GazePosition()
+ self.__identified_gaze_movement = GazeFeatures.GazeMovement()
self.__scan_path_analyzed = False
# Edit pipeline step objects parent
@@ -875,23 +875,29 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
)
@DataFeatures.PipelineStepMethod
- def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]:
+ def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]:
"""
Project timestamped gaze position into frame.
!!! warning
Be aware that gaze positions are in the same range of value than size attribute.
+ !!! note
+ This method timestamps incoming gaze position.
+
Parameters:
timestamp: method call timestamp (unit does'nt matter)
gaze_position: gaze position to project
"""
+ # Timestamp gaze position
+ gaze_position.timestamp = timestamp
+
# Use frame lock feature
with self._lock:
# No gaze movement identified by default
- self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement()
+ self.__identified_gaze_movement = GazeFeatures.GazeMovement()
# Reset scan path analyzed state
self.__scan_path_analyzed = False
@@ -908,33 +914,33 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Identify gaze movement
if self.__gaze_movement_identifier is not None:
-
+
# Identify finished gaze movement
self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position)
# Valid and finished gaze movement has been identified
- if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished:
-
+ if self.__identified_gaze_movement and self.__identified_gaze_movement.finished:
+
if GazeFeatures.is_fixation(self.__identified_gaze_movement):
-
+
# Append fixation to scan path
if self.__scan_path is not None:
-
- self.__scan_path.append_fixation(timestamp, self.__identified_gaze_movement)
+
+ self.__scan_path.append_fixation(self.__identified_gaze_movement)
elif GazeFeatures.is_saccade(self.__identified_gaze_movement):
-
+
# Append saccade to scan path
if self.__scan_path is not None:
- scan_step = self.__scan_path.append_saccade(timestamp, self.__identified_gaze_movement)
+ scan_step = self.__scan_path.append_saccade(self.__identified_gaze_movement)
# Is there a new step?
if scan_step and len(self.__scan_path) > 1:
-
+
# Analyze aoi scan path
for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items():
-
+
scan_path_analyzer.analyze(timestamp, self.__scan_path)
# Update scan path analyzed state
@@ -952,7 +958,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]])
# Update heatmap image
- self.__heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale)
+ self.__heatmap.update(timestamp, self.__calibrated_gaze_position * scale)
# Look layers with valid identified gaze movement
# Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally
@@ -1013,12 +1019,16 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Draw current fixation if required
if draw_fixations is not None and self.__gaze_movement_identifier is not None:
- self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations)
+ if self.__gaze_movement_identifier.current_fixation:
+
+ self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations)
# Draw current saccade if required
if draw_saccades is not None and self.__gaze_movement_identifier is not None:
- self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades)
+ if self.__gaze_movement_identifier.current_saccade:
+
+ self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades)
# Draw layers if required
if draw_layers is not None:
@@ -1460,16 +1470,18 @@ class ArCamera(ArFrame):
aoi_2d = camera_layer.aoi_scene[scene_frame.name]
- # TODO?: Should we prefer to use camera frame AOIMatcher object?
- if aoi_2d.contains_point(gaze_position.value):
+ if gaze_position:
+
+ # TODO?: Should we prefer to use camera frame AOIMatcher object?
+ if aoi_2d.contains_point(gaze_position):
- inner_x, inner_y = aoi_2d.clockwise().inner_axis(*gaze_position.value)
+ inner_x, inner_y = aoi_2d.clockwise().inner_axis(*gaze_position)
- # QUESTION: How to project gaze precision?
- inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y))
+ # QUESTION: How to project gaze precision?
+ inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y))
- # Project inner gaze position into scene frame
- scene_frame.look(timestamp, inner_gaze_position * scene_frame.size)
+ # Project inner gaze position into scene frame
+ scene_frame.look(timestamp, inner_gaze_position * scene_frame.size)
# Ignore missing aoi in camera frame layer projection
except KeyError as e:
diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py
index 7f30252..f6297a8 100644
--- a/src/argaze/ArUcoMarkers/ArUcoCamera.py
+++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py
@@ -187,8 +187,8 @@ class ArUcoCamera(ArFeatures.ArCamera):
pass
- # Timestamp camera frame
- self.timestamp = timestamp
+ # Timestamp camera frame
+ self.timestamp = timestamp
def __image(self, draw_detected_markers: dict = None, draw_scenes: dict = None, draw_optic_parameters_grid: dict = None, **kwargs: dict) -> numpy.array:
"""Get frame image with ArUco detection visualisation.
diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py
index 6149da4..dbecfc1 100644
--- a/src/argaze/AreaOfInterest/AOIFeatures.py
+++ b/src/argaze/AreaOfInterest/AOIFeatures.py
@@ -553,16 +553,6 @@ class AOIScene():
return output
-class TimeStampedAOIScenes(DataFeatures.TimeStampedBuffer):
- """Define timestamped buffer to store AOI scenes in time."""
-
- def __setitem__(self, ts, scene):
- """Force value to inherit from AOIScene."""
-
- assert(type(scene).__bases__[0] == AOIScene)
-
- super().__setitem__(ts, scene)
-
HeatmapType = TypeVar('Heatmap', bound="Heatmap")
# Type definition for type annotation convenience
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index edbf8e9..849601f 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -30,10 +30,10 @@ from colorama import Style, Fore
TimeStampType = TypeVar('TimeStamp', int, float)
"""Type definition for timestamp as integer or float values."""
-DataType = TypeVar('Data')
-"""Type definition for data to store anything in time."""
+TimestampedObjectType = TypeVar('TimestampedObject', bound="TimestampedObject")
+# Type definition for type annotation convenience
-TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer")
+TimestampedObjectsListType = TypeVar('TimestampedObjectsList', bound="TimestampedObjectsList")
# Type definition for type annotation convenience
def module_path(obj) -> str:
@@ -45,6 +45,39 @@ def module_path(obj) -> str:
"""
return obj.__class__.__module__
+def properties(cls) -> list:
+ """get class properties name."""
+
+ properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)]
+
+ for base in cls.__bases__:
+
+ for name, item in base.__dict__.items():
+
+ if isinstance(item, property):
+
+ properties.append(name)
+
+ return properties
+
+def as_dict(obj, filter: bool=True) -> dict:
+ """Export object as dictionary.
+
+ Parameters:
+ filter: remove None attribute values.
+ """
+ _dict = {}
+
+ for p in properties(obj.__class__):
+
+ v = getattr(obj, p)
+
+ if not filter or v is not None:
+
+ _dict[p] = v
+
+ return _dict
+
class JsonEncoder(json.JSONEncoder):
"""Specific ArGaze JSON Encoder."""
@@ -90,178 +123,132 @@ class JsonEncoder(json.JSONEncoder):
return public_dict
-class TimeStampedBuffer(collections.OrderedDict):
- """Ordered dictionary to handle timestamped data.
- ```
- {
- timestamp1: data1,
- timestamp2: data2,
- ...
- }
- ```
-
- !!! warning
-
- Timestamps must be numbers.
-
- !!! warning "Timestamps are not sorted internally"
-
- Data are considered to be stored according at their coming time.
- """
-
- def __new__(cls, args = None):
- """Inheritance"""
-
- return super(TimeStampedBuffer, cls).__new__(cls)
+class DataDictionary(dict):
+ """Enable dot.notation access to dictionary attributes"""
- def __setitem__(self, ts: TimeStampType, data: DataType):
- """Store data at given timestamp."""
+ __getattr__ = dict.get
+ __setattr__ = dict.__setitem__
+ __delattr__ = dict.__delitem__
- assert(type(ts) == int or type(ts) == float)
+class TimestampedObject():
+ """Abstract class to enable timestamp management."""
- super().__setitem__(ts, data)
+ def __init__(self, timestamp: int|float = math.nan):
+ """Initialize TimestampedObject."""
+ self._timestamp = timestamp
def __repr__(self):
- """String representation"""
-
- return json.dumps(self, ensure_ascii=False, cls=JsonEncoder)
-
- def __str__(self):
- """String representation"""
-
- return json.dumps(self, ensure_ascii=False, cls=JsonEncoder)
-
- def append(self, timestamped_buffer: TimeStampedBufferType) -> TimeStampedBufferType:
- """Append a timestamped buffer."""
-
- for ts, value in timestamped_buffer.items():
- self[ts] = value
-
- return self
+ """String representation."""
+ return json.dumps(as_dict(self))
@property
- def first(self) -> Tuple[TimeStampType, DataType]:
- """Easing access to first item."""
-
- return list(self.items())[0]
-
- def pop_first(self) -> Tuple[TimeStampType, DataType]:
- """Easing FIFO access mode."""
-
- return self.popitem(last=False)
+ def timestamp(self) -> int|float:
+ """Get object timestamp."""
+ return self._timestamp
- def pop_last_until(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]:
- """Pop all item until a given timestamped value and return the first after."""
+ @timestamp.setter
+ def timestamp(self, timestamp: int|float):
+ """Set object timestamp."""
+ self._timestamp = timestamp
- # get last item before given timestamp
- earliest_ts, earliest_value = self.get_last_until(ts)
+ def untimestamp(self):
+ """Reset object timestamp."""
+ self._timestamp = math.nan
- first_ts, first_value = self.first
+ def is_timestamped(self) -> bool:
+ """Is the object timestamped?"""
+ return not math.isnan(self._timestamp)
- while first_ts < earliest_ts:
- self.pop_first()
- first_ts, first_value = self.first
-
- return first_ts, first_value
+class TimestampedObjectsList(list):
+ """Handle timestamped object into a list.
- def pop_last_before(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]:
- """Pop all item before a given timestamped value and return the last one."""
+ !!! warning "Timestamped objects are not sorted internally"
+
+ Timestamped objects are considered to be stored according at their coming time.
+ """
- # get last item before given timestamp
- earliest_ts, earliest_value = self.get_last_before(ts)
+ def __init__(self, ts_object_type: type, ts_objects: list = []):
- popep_ts, poped_value = self.pop_first()
+ self.__object_type = ts_object_type
+ self.__object_properties = properties(self.__object_type)
- while popep_ts != earliest_ts:
- popep_ts, poped_value = self.pop_first()
+ for ts_object in ts_objects:
- return popep_ts, poped_value
+ self.append(ts_object)
@property
- def last(self) -> Tuple[TimeStampType, DataType]:
- """Easing access to last item."""
+ def object_type(self):
+ """Get object type handled by the list."""
+ return self.__object_type
- return list(self.items())[-1]
+ def append(self, ts_object: TimestampedObjectType|dict):
+ """Append timestamped object."""
+
+ # Convert dict into GazePosition
+ if type(ts_object) == dict:
- def pop_last(self) -> Tuple[TimeStampType, DataType]:
- """Easing FIFO access mode."""
+ ts_object = self.__object_type.from_dict(ts_object)
- return self.popitem(last=True)
+ # Check object type
+ if type(ts_object) != self.__object_type:
- def get_first_from(self, ts) -> Tuple[TimeStampType, DataType]:
- """Retreive first item timestamp from a given timestamp value."""
+ if not issubclass(ts_object.__class__, self.__object_type):
- ts_list = list(self.keys())
- first_from_index = bisect.bisect_left(ts_list, ts)
+ raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance')
+
+ if not ts_object.is_timestamped():
- if first_from_index < len(self):
+ raise ValueError(f'object is not timestamped')
+
+ super().append(ts_object)
- first_from_ts = ts_list[first_from_index]
-
- return first_from_ts, self[first_from_ts]
+ def look_for(self, timestamp: TimeStampType) -> TimestampedObjectType:
+ """Look for object at given timestamp."""
+ for ts_object in self:
- else:
-
- raise KeyError(f'No data stored after {ts} timestamp.')
-
- def get_last_before(self, ts) -> Tuple[TimeStampType, DataType]:
- """Retreive last item timestamp before a given timestamp value."""
+ if ts_object.timestamp == timestamp:
- ts_list = list(self.keys())
- last_before_index = bisect.bisect_left(ts_list, ts) - 1
+ return ts_object
- if last_before_index >= 0:
+ def __add__(self, ts_objects: list = []) -> TimestampedObjectsListType:
+ """Append timestamped objects list."""
- last_before_ts = ts_list[last_before_index]
-
- return last_before_ts, self[last_before_ts]
-
- else:
-
- raise KeyError(f'No data stored before {ts} timestamp.')
-
- def get_last_until(self, ts) -> Tuple[TimeStampType, DataType]:
- """Retreive last item timestamp until a given timestamp value."""
+ for ts_object in ts_objects:
- ts_list = list(self.keys())
- last_until_index = bisect.bisect_right(ts_list, ts) - 1
+ self.append(ts_object)
- if last_until_index >= 0:
-
- last_until_ts = ts_list[last_until_index]
-
- return last_until_ts, self[last_until_ts]
-
- else:
-
- raise KeyError(f'No data stored until {ts} timestamp.')
-
- @classmethod
- def from_json(self, json_filepath: str) -> TimeStampedBufferType:
- """Create a TimeStampedBuffer from .json file."""
+ return self
- with open(json_filepath, encoding='utf-8') as ts_buffer_file:
+ @property
+ def duration(self):
+ """Get inferred duration from first and last timestamps."""
+ if self:
- json_buffer = json.load(ts_buffer_file)
+ return self[-1].timestamp - self[0].timestamp
- return TimeStampedBuffer({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer})
+ else:
- def to_json(self, json_filepath: str):
- """Save a TimeStampedBuffer to .json file."""
+ return 0
- with open(json_filepath, 'w', encoding='utf-8') as ts_buffer_file:
+ def timestamps(self):
+ """Get all timestamps in list."""
+ return [ts_object.timestamp for ts_object in self]
- json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder)
+ def tuples(self) -> list:
+ """Get all timestamped objects as list of tuple."""
+ return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self]
@classmethod
- def from_dataframe(self, dataframe: pandas.DataFrame, exclude=[]) -> TimeStampedBufferType:
- """Create a TimeStampedBuffer from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)."""
+ def from_dataframe(self, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> TimestampedObjectsListType:
+ """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)."""
dataframe.drop(exclude, inplace=True, axis=True)
assert(dataframe.index.name == 'timestamp')
- return TimeStampedBuffer(dataframe.to_dict('index'))
+ object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in dataframe.to_dict('index').items()]
+
+ return TimestampedObjectsList(ts_object_type, object_list)
def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame:
"""Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
@@ -278,7 +265,7 @@ class TimeStampedBuffer(collections.OrderedDict):
Timestamps are stored as index column called 'timestamp'.
"""
- df = pandas.DataFrame.from_dict(self.values())
+ df = pandas.DataFrame(self.tuples(), columns=self.__object_properties)
# Exclude columns
df.drop(exclude, inplace=True, axis=True)
@@ -307,11 +294,104 @@ class TimeStampedBuffer(collections.OrderedDict):
df = df[splited_columns]
# Append timestamps as index column
- df['timestamp'] = self.keys()
+ df['timestamp'] = self.timestamps()
df.set_index('timestamp', inplace=True)
return df
+ @classmethod
+ def from_json(self, ts_object_type: type, json_filepath: str) -> TimestampedObjectsListType:
+ """Create a TimestampedObjectsList from .json file."""
+
+ with open(json_filepath, encoding='utf-8') as ts_objects_file:
+
+ json_ts_objects = json.load(ts_objects_file)
+
+ return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects])
+
+ def to_json(self, json_filepath: str):
+ """Save a TimestampedObjectsList to .json file."""
+
+ with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file:
+
+ json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ')
+
+ def __repr__(self):
+ """String representation"""
+ return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,)
+
+ def __str__(self):
+ """String representation"""
+ return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,)
+
+ def pop_last_until(self, timestamp: TimeStampType) -> TimestampedObjectType:
+ """Pop all item until a given timestamped value and return the first after."""
+
+ # get last item before given timestamp
+ earliest_value = self.get_last_until(timestamp)
+
+ while self[0].timestamp < earliest_value.timestamp:
+
+ self.pop(0)
+
+ return self[0]
+
+ def pop_last_before(self, timestamp: TimeStampType) -> TimestampedObjectType:
+ """Pop all item before a given timestamped value and return the last one."""
+
+ # get last item before given timestamp
+ earliest_value = self.get_last_before(timestamp)
+
+ poped_value = self.pop(0)
+
+ while poped_value.timestamp != earliest_value.timestamp:
+
+ poped_value = self.pop(0)
+
+ return poped_value
+
+ def get_first_from(self, timestamp: TimeStampType) -> TimestampedObjectType:
+ """Retreive first item timestamp from a given timestamp value."""
+
+ ts_list = self.timestamps()
+ first_from_index = bisect.bisect_left(ts_list, timestamp)
+
+ if first_from_index < len(self):
+
+ return self[ts_list[first_from_index]]
+
+ else:
+
+ raise KeyError(f'No data stored after {timestamp} timestamp.')
+
+ def get_last_before(self, timestamp: TimeStampType) -> TimestampedObjectType:
+ """Retreive last item timestamp before a given timestamp value."""
+
+ ts_list = self.timestamps()
+ last_before_index = bisect.bisect_left(ts_list, timestamp) - 1
+
+ if last_before_index >= 0:
+
+ return self[ts_list[last_before_index]]
+
+ else:
+
+ raise KeyError(f'No data stored before {timestamp} timestamp.')
+
+ def get_last_until(self, timestamp: TimeStampType) -> TimestampedObjectType:
+ """Retreive last item timestamp until a given timestamp value."""
+
+ ts_list = self.timestamps()
+ last_until_index = bisect.bisect_right(ts_list, timestamp) - 1
+
+ if last_until_index >= 0:
+
+ return self[ts_list[last_until_index]]
+
+ else:
+
+ raise KeyError(f'No data stored until {timestamp} timestamp.')
+
def plot(self, names=[], colors=[], split={}, samples=None) -> list:
"""Plot as [matplotlib](https://matplotlib.org/) time chart."""
@@ -337,59 +417,16 @@ class TimeStampedBuffer(collections.OrderedDict):
return legend_patches
-class DataDictionary(dict):
- """Enable dot.notation access to dictionary attributes"""
-
- __getattr__ = dict.get
- __setattr__ = dict.__setitem__
- __delattr__ = dict.__delitem__
-
-class SharedObject():
- """Abstract class to enable multiple threads sharing and timestamp management."""
+class SharedObject(TimestampedObject):
+ """Abstract class to enable multiple threads sharing for timestamped object."""
- def __init__(self):
+ def __init__(self, timestamp: int|float = math.nan):
+ TimestampedObject.__init__(self, timestamp)
self._lock = threading.Lock()
- self._timestamp = math.nan
self._execution_times = {}
self._exceptions = {}
- @property
- def lock(self) -> threading.Lock:
- """Get shared object lock object."""
- return self._lock
-
- @property
- def timestamp(self) -> int|float:
- """Get shared object timestamp."""
- self._lock.acquire()
- timestamp = self._timestamp
- self._lock.release()
-
- return timestamp
-
- @timestamp.setter
- def timestamp(self, timestamp: int|float):
- """Set shared object timestamp."""
- self._lock.acquire()
- self._timestamp = timestamp
- self._lock.release()
-
- def untimestamp(self):
- """Reset shared object timestamp."""
- self._lock.acquire()
- self._timestamp = math.nan
- self._lock.release()
-
- @property
- def timestamped(self) -> bool:
- """Is the object timestamped?"""
- self._lock.acquire()
- timestamped = not math.isnan(self._timestamp)
- self._lock.release()
-
- return timestamped
-
class PipelineStepObject():
"""
Define class to assess pipeline step methods execution time and observe them.
@@ -715,7 +752,7 @@ def PipelineStepMethod(method):
PipelineStepMethod must have a timestamp as first argument.
"""
- def wrapper(self, timestamp, *args, unwrap: bool = False):
+ def wrapper(self, timestamp, *args, unwrap: bool = False, **kwargs):
"""Wrap pipeline step method to measure execution time.
Parameters:
@@ -725,7 +762,7 @@ def PipelineStepMethod(method):
"""
if unwrap:
- return method(self, timestamp, *args)
+ return method(self, timestamp, *args, **kwargs)
# Initialize execution time assessment
start = time.perf_counter()
@@ -735,7 +772,7 @@ def PipelineStepMethod(method):
try:
# Execute wrapped method
- result = method(self, timestamp, *args)
+ result = method(self, timestamp, *args, **kwargs)
except Exception as e:
diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
index 62b5e9a..3849d59 100644
--- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
+++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
@@ -107,7 +107,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher):
self.__reset()
- elif not gaze_movement.valid:
+ elif not gaze_movement:
self.__reset()
diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
index 6f8c554..2b89cf6 100644
--- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
@@ -26,53 +26,39 @@ FixationType = TypeVar('Fixation', bound="Fixation")
SaccadeType = TypeVar('Saccade', bound="Saccade")
# Type definition for type annotation convenience
-@dataclass(frozen=True)
class Fixation(GazeFeatures.Fixation):
"""Define dispersion based fixation."""
- deviation_max: float = field(init=False)
- """Maximal gaze position distance to the centroïd."""
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
- def __post_init__(self):
+ super().__init__(positions, finished, message, **kwargs)
- super().__post_init__()
+ if positions:
- points = self.positions.values()
- points_x, points_y = [p[0] for p in points], [p[1] for p in points]
- points_array = numpy.column_stack([points_x, points_y])
- centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)])
- deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))
+ positions_array = numpy.asarray(self.values())
+ centroid = numpy.mean(positions_array, axis=0)
+ deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
- # Update frozen focus attribute using centroid
- object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1]))
+ # Set focus as positions centroid
+ self.focus = (centroid[0], centroid[1])
- # Update frozen deviation_max attribute
- object.__setattr__(self, 'deviation_max', max(deviations_array))
+ # Set deviation_max attribute
+ self.__deviation_max = deviations_array.max()
- def point_deviation(self, gaze_position) -> float:
- """Get distance of a point from the fixation's centroïd."""
-
- return numpy.sqrt((self.focus[0] - gaze_position.value[0])**2 + (self.focus[1] - gaze_position.value[1])**2)
+ @property
+ def deviation_max(self):
+ """Get fixation's maximal deviation."""
+ return self.__deviation_max
- def overlap(self, fixation) -> bool:
+ def overlap(self, fixation: FixationType) -> bool:
"""Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?"""
-
- points = fixation.positions.values()
- points_x, points_y = [p[0] for p in points], [p[1] for p in points]
- points_array = numpy.column_stack([points_x, points_y])
- centroid_array = numpy.array([self.focus[0], self.focus[1]])
- deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))
+
+ positions_array = numpy.asarray(fixation.values())
+ centroid = numpy.mean(self.focus, axis=0)
+ deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
return min(deviations_array) <= self.deviation_max
-
- def merge(self, fixation) -> FixationType:
- """Merge another fixation into this fixation."""
-
- self.positions.append(fixation.positions)
- self.__post_init__()
-
- return self
-
+
def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None):
"""Draw fixation into image.
@@ -97,12 +83,12 @@ class Fixation(GazeFeatures.Fixation):
self.draw_positions(image, **draw_positions)
-@dataclass(frozen=True)
class Saccade(GazeFeatures.Saccade):
"""Define dispersion based saccade."""
- def __post_init__(self):
- super().__post_init__()
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
+
+ super().__init__(positions, finished, message, **kwargs)
def draw(self, image: numpy.array, line_color: tuple = None):
"""Draw saccade into image.
@@ -114,12 +100,11 @@ class Saccade(GazeFeatures.Saccade):
# Draw line if required
if line_color is not None:
- _, start_position = self.positions.first
- _, last_position = self.positions.last
+ start_position = self[0]
+ last_position = self[-1]
cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2)
-@dataclass
class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
"""Implementation of the I-DT algorithm as described in:
@@ -127,37 +112,48 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
*Identifying fixations and saccades in eye-tracking protocols.*
Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA'00, 71-78).
[https://doi.org/10.1145/355017.355028](https://doi.org/10.1145/355017.355028)
+
+ Parameters:
+ deviation_max_threshold: Maximal distance allowed to consider a gaze movement as a fixation.
+ duration_min_threshold: Minimal duration allowed to consider a gaze movement as a fixation. \
+ It is also used as maximal duration allowed to wait valid gaze positions.
"""
- deviation_max_threshold: int|float
- """Maximal distance allowed to consider a gaze movement as a fixation."""
-
- duration_min_threshold: int|float
- """Minimal duration allowed to consider a gaze movement as a fixation.
- It is also used as maximal duration allowed to wait valid gaze positions."""
-
- def __post_init__(self):
+ def __init__(self, deviation_max_threshold: int|float, duration_min_threshold: int|float):
super().__init__()
+ self.__deviation_max_threshold = deviation_max_threshold
+ self.__duration_min_threshold = duration_min_threshold
+
self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
- @DataFeatures.PipelineStepMethod
- def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType:
+ @property
+ def deviation_max_threshold(self):
+ """Get identifier's deviation max threshold."""
+ return self.__deviation_max_threshold
- # Ignore non valid gaze position
- if not gaze_position.valid:
+ @property
+ def duration_min_threshold(self):
+ """Get identifier duration min threshold."""
+ return self.__duration_min_threshold
+
+ @DataFeatures.PipelineStepMethod
+ def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType:
+
+ # Ignore empty gaze position
+ if not gaze_position:
- return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish()
-
+ return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish()
+
# Check if too much time elapsed since last valid gaze position
- if len(self.__valid_positions) > 0:
+ if self.__valid_positions:
- ts_last, _ = self.__valid_positions.last
+ ts_last = self.__valid_positions[-1].timestamp
- if (ts - ts_last) > self.duration_min_threshold:
+ if (timestamp - ts_last) > self.__duration_min_threshold:
# Get last movement
last_movement = self.current_gaze_movement.finish()
@@ -168,34 +164,30 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
# Store valid gaze position
- self.__valid_positions[ts] = gaze_position
+ self.__valid_positions.append(gaze_position)
# Return last valid movement if exist
return last_movement
# Store gaze positions until a minimal duration
- self.__valid_positions[ts] = gaze_position
-
- first_ts, _ = self.__valid_positions.first
- last_ts, _ = self.__valid_positions.last
-
+ self.__valid_positions.append(gaze_position)
+
# Once the minimal duration is reached
- if last_ts - first_ts >= self.duration_min_threshold:
+ if self.__valid_positions.duration >= self.__duration_min_threshold:
# Calculate the deviation of valid gaze positions
deviation = Fixation(self.__valid_positions).deviation_max
# Valid gaze positions deviation small enough
- if deviation <= self.deviation_max_threshold:
+ if deviation <= self.__deviation_max_threshold:
- last_saccade = GazeFeatures.UnvalidGazeMovement()
+ last_saccade = GazeFeatures.GazeMovement()
# Is there saccade positions?
- if len(self.__saccade_positions) > 0:
+ if self.__saccade_positions:
# Copy oldest valid position into saccade positions
- first_ts, first_position = self.__valid_positions.first
- self.__saccade_positions[first_ts] = first_position
+ self.__saccade_positions.append(self.__valid_positions[0])
# Finish last saccade
last_saccade = self.current_saccade.finish()
@@ -212,14 +204,13 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Valid gaze positions deviation too wide
else:
- last_fixation = GazeFeatures.UnvalidGazeMovement()
+ last_fixation = GazeFeatures.GazeMovement()
# Is there fixation positions?
- if len(self.__fixation_positions) > 0:
+ if self.__fixation_positions:
# Copy most recent fixation position into saccade positions
- last_ts, last_position = self.__fixation_positions.last
- self.__saccade_positions[last_ts] = last_position
+ self.__saccade_positions.append(self.__fixation_positions[-1])
# Finish last fixation
last_fixation = self.current_fixation.finish()
@@ -231,25 +222,24 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
self.__valid_positions = GazeFeatures.TimeStampedGazePositions()
# Store current gaze position
- self.__valid_positions[ts] = gaze_position
+ self.__valid_positions.append(gaze_position)
# Output last fixation
return last_fixation if not terminate else self.current_saccade.finish()
# Move oldest valid position into saccade positions
- first_ts, first_position = self.__valid_positions.pop_first()
- self.__saccade_positions[first_ts] = first_position
+ self.__saccade_positions.append(self.__valid_positions.pop(0))
# Always return unvalid gaze movement at least
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
@property
def current_gaze_movement(self) -> GazeMovementType:
# It shouldn't have a current fixation and a current saccade at the same time
- assert(not (len(self.__fixation_positions) > 0 and len(self.__saccade_positions) > 0))
+ assert(not (self.__fixation_positions and len(self.__saccade_positions) > 1))
- if len(self.__fixation_positions) > 0:
+ if self.__fixation_positions:
return Fixation(self.__fixation_positions)
@@ -258,16 +248,16 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
return Saccade(self.__saccade_positions)
# Always return unvalid gaze movement at least
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
@property
def current_fixation(self) -> FixationType:
- if len(self.__fixation_positions) > 0:
+ if self.__fixation_positions:
return Fixation(self.__fixation_positions)
# Always return unvalid gaze movement at least
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
@property
def current_saccade(self) -> SaccadeType:
@@ -277,4 +267,4 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
return Saccade(self.__saccade_positions)
# Always return unvalid gaze movement at least
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
diff --git a/src/argaze/GazeAnalysis/LinearRegression.py b/src/argaze/GazeAnalysis/LinearRegression.py
index 414832a..717e8a3 100644
--- a/src/argaze/GazeAnalysis/LinearRegression.py
+++ b/src/argaze/GazeAnalysis/LinearRegression.py
@@ -45,8 +45,8 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator):
def store(self, timestamp: int|float, observed_gaze_position: GazeFeatures.GazePosition, expected_gaze_position: GazeFeatures.GazePosition):
"""Store observed and expected gaze positions."""
- self.__observed_positions.append(observed_gaze_position.value)
- self.__expected_positions.append(expected_gaze_position.value)
+ self.__observed_positions.append(observed_gaze_position)
+ self.__expected_positions.append(expected_gaze_position)
def reset(self):
"""Reset observed and expected gaze positions."""
@@ -78,7 +78,7 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator):
if not self.calibrating:
- return GazeFeatures.GazePosition(self.__linear_regression.predict(numpy.array([gaze_position.value]))[0], precision=gaze_position.precision)
+ return GazeFeatures.GazePosition(self.__linear_regression.predict(numpy.array([gaze_position]))[0], precision=gaze_position.precision)
else:
diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
index d246db4..a95905f 100644
--- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
@@ -25,53 +25,39 @@ FixationType = TypeVar('Fixation', bound="Fixation")
SaccadeType = TypeVar('Saccade', bound="Saccade")
# Type definition for type annotation convenience
-@dataclass(frozen=True)
class Fixation(GazeFeatures.Fixation):
"""Define dispersion based fixation."""
- deviation_max: float = field(init=False)
- """Maximal gaze position distance to the centroïd."""
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
- def __post_init__(self):
+ super().__init__(positions, finished, message, **kwargs)
- super().__post_init__()
+ if positions:
- points = self.positions.values()
- points_x, points_y = [p[0] for p in points], [p[1] for p in points]
- points_array = numpy.column_stack([points_x, points_y])
- centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)])
- deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))
+ positions_array = numpy.asarray(self.values())
+ centroid = numpy.mean(positions_array, axis=0)
+ deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
- # Update frozen focus attribute using centroid
- object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1]))
+ # Set focus as positions centroid
+ self.focus = (centroid[0], centroid[1])
- # Update frozen deviation_max attribute
- object.__setattr__(self, 'deviation_max', max(deviations_array))
+ # Set deviation_max attribute
+ self.__deviation_max = deviations_array.max()
- def point_deviation(self, gaze_position) -> float:
- """Get distance of a point from the fixation's centroïd."""
-
- return numpy.sqrt((self.centroid[0] - gaze_position.value[0])**2 + (self.centroid[1] - gaze_position.value[1])**2)
+ @property
+ def deviation_max(self):
+ """Get fixation's maximal deviation."""
+ return self.__deviation_max
- def overlap(self, fixation) -> bool:
+ def overlap(self, fixation: FixationType) -> bool:
"""Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?"""
-
- points = fixation.positions.values()
- points_x, points_y = [p[0] for p in points], [p[1] for p in points]
- points_array = numpy.column_stack([points_x, points_y])
- centroid_array = numpy.array([self.centroid[0], self.centroid[1]])
- deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))
+
+ positions_array = numpy.asarray(fixation.values())
+ centroid = numpy.mean(self.focus, axis=0)
+ deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1))
return min(deviations_array) <= self.deviation_max
-
- def merge(self, fixation) -> FixationType:
- """Merge another fixation into this fixation."""
-
- self.positions.append(fixation.positions)
- self.__post_init__()
-
- return self
-
+
def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None):
"""Draw fixation into image.
@@ -85,7 +71,7 @@ class Fixation(GazeFeatures.Fixation):
if duration_border_color is not None:
cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), duration_border_color, int(self.duration * duration_factor))
-
+
# Draw deviation circle if required
if deviation_circle_color is not None:
@@ -96,12 +82,12 @@ class Fixation(GazeFeatures.Fixation):
self.draw_positions(image, **draw_positions)
-@dataclass(frozen=True)
class Saccade(GazeFeatures.Saccade):
"""Define dispersion based saccade."""
- def __post_init__(self):
- super().__post_init__()
+ def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs):
+
+ super().__init__(positions, finished, message, **kwargs)
def draw(self, image: numpy.array, line_color: tuple = None):
"""Draw saccade into image.
@@ -113,8 +99,8 @@ class Saccade(GazeFeatures.Saccade):
# Draw line if required
if line_color is not None:
- _, start_position = self.positions.first
- _, last_position = self.positions.last
+ start_position = self[0]
+ last_position = self[-1]
cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2)
@@ -126,45 +112,56 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
71-78. [http://dx.doi.org/10.1145/355017.355028](http://dx.doi.org/10.1145/355017.355028)
- """
-
- velocity_max_threshold: int|float
- """Maximal velocity allowed to consider a gaze movement as a fixation."""
- duration_min_threshold: int|float
- """Minimal duration allowed to wait valid gaze positions."""
+ Parameters:
+ velocity_max_threshold: Maximal velocity allowed to consider a gaze movement as a fixation.
+ duration_min_threshold: Minimal duration allowed to wait valid gaze positions.
+ """
- def __post_init__(self):
+ def __init__(self, velocity_max_threshold: int|float, duration_min_threshold: int|float):
super().__init__()
+ self.__velocity_max_threshold = velocity_max_threshold
+ self.__duration_min_threshold = duration_min_threshold
+
self.__last_ts = -1
self.__last_position = None
self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
+ @property
+ def velocity_max_threshold(self):
+ """Get identifier's velocity max threshold."""
+ return self.__velocity_max_threshold
+
+ @property
+ def duration_min_threshold(self):
+ """Get identifier duration min threshold."""
+ return self.__duration_min_threshold
+
@DataFeatures.PipelineStepMethod
- def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType:
+ def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType:
- # Ignore non valid gaze position
- if not gaze_position.valid:
+ # Ignore empty gaze position
+ if not gaze_position:
- return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish()
+ return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish()
# Store first valid position
if self.__last_ts < 0:
- self.__last_ts = ts
+ self.__last_ts = timestamp
self.__last_position = gaze_position
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
# Check if too much time elapsed since last gaze position
- if (ts - self.__last_ts) > self.duration_min_threshold:
+ if (timestamp - self.__last_ts) > self.duration_min_threshold:
# Remember last position
- self.__last_ts = ts
+ self.__last_ts = timestamp
self.__last_position = gaze_position
# Get last movement
@@ -178,23 +175,22 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
return last_movement
# Velocity
- velocity = abs(gaze_position.distance(self.__last_position) / (ts - self.__last_ts))
+ velocity = abs(gaze_position.distance(self.__last_position) / (timestamp - self.__last_ts))
# Remember last position
- self.__last_ts = ts
+ self.__last_ts = timestamp
self.__last_position = gaze_position
# Velocity is greater than threshold
if velocity > self.velocity_max_threshold:
- last_fixation = GazeFeatures.UnvalidGazeMovement()
+ last_fixation = GazeFeatures.GazeMovement()
# Does last fixation exist?
if len(self.__fixation_positions) > 0:
# Copy most recent fixation position into saccade positions
- last_ts, last_position = self.__fixation_positions.last
- self.__saccade_positions[last_ts] = last_position
+ self.__saccade_positions.append(self.__fixation_positions[-1])
# Create last fixation
last_fixation = self.current_fixation.finish()
@@ -203,7 +199,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
self.__fixation_positions = GazeFeatures.TimeStampedGazePositions()
# Append to saccade positions
- self.__saccade_positions[ts] = gaze_position
+ self.__saccade_positions.append(gaze_position)
# Output last fixation
return last_fixation if not terminate else self.current_saccade.finish()
@@ -211,14 +207,13 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
# Velocity is less or equals to threshold
else:
- last_saccade = GazeFeatures.UnvalidGazeMovement()
+ last_saccade = GazeFeatures.GazeMovement()
# Does last saccade exist?
- if len(self.__saccade_positions) > 0:
+ if self.__saccade_positions:
# Copy most recent saccade position into fixation positions
- last_ts, last_position = self.__saccade_positions.last
- self.__fixation_positions[last_ts] = last_position
+ self.__fixation_positions.append(self.__saccade_positions[-1])
# Create last saccade
last_saccade = self.current_saccade.finish()
@@ -227,47 +222,47 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
self.__saccade_positions = GazeFeatures.TimeStampedGazePositions()
# Append to fixation positions
- self.__fixation_positions[ts] = gaze_position
+ self.__fixation_positions.append(gaze_position)
# Output last saccade
return last_saccade if not terminate else self.current_fixation.finish()
# Always return unvalid gaze movement at least
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
@property
def current_gaze_movement(self) -> GazeMovementType:
# It shouldn't have a current fixation and a current saccade at the same time
- assert(not (len(self.__fixation_positions) > 0 and len(self.__saccade_positions) > 0))
+ assert(not (self.__fixation_positions and self.__saccade_positions))
- if len(self.__fixation_positions) > 0:
+ if self.__fixation_positions:
return Fixation(self.__fixation_positions)
- if len(self.__saccade_positions) > 0:
+ if len(self.__saccade_positions) > 1:
return Saccade(self.__saccade_positions)
# Always return unvalid gaze movement at least
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
@property
def current_fixation(self) -> FixationType:
- if len(self.__fixation_positions) > 0:
+ if self.__fixation_positions:
return Fixation(self.__fixation_positions)
# Always return unvalid gaze movement at least
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
@property
def current_saccade(self) -> SaccadeType:
- if len(self.__saccade_positions) > 0:
+ if len(self.__saccade_positions) > 1:
return Saccade(self.__saccade_positions)
# Always return unvalid gaze movement at least
- return GazeFeatures.UnvalidGazeMovement()
+ return GazeFeatures.GazeMovement()
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index cfd7419..263f793 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -17,6 +17,7 @@ from inspect import getmembers
from argaze import DataFeatures
from argaze.AreaOfInterest import AOIFeatures
+from argaze.utils import UtilsFeatures # DEBUG
import numpy
import pandas
@@ -25,57 +26,124 @@ import cv2
GazePositionType = TypeVar('GazePosition', bound="GazePosition")
# Type definition for type annotation convenience
-@dataclass(frozen=True)
-class GazePosition():
- """Define gaze position as a tuple of coordinates with precision."""
+class GazePosition(tuple, DataFeatures.TimestampedObject):
+ """Define gaze position as a tuple of coordinates with precision.
- value: tuple[int | float] = field(default=(0, 0))
- """Position's value."""
+ Parameters:
+ precision: the radius of a circle around value where other same gaze position measurements could be.
+ message: a string to describe why the the position is what it is.
+ """
- precision: float = field(default=0., kw_only=True)
- """Position's precision represents the radius of a circle around \
- this gaze position value where other same gaze position measurements could be."""
+ def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan):
- def __getitem__(self, axis: int) -> int | float:
- """Get position value along a particular axis."""
+ return tuple.__new__(cls, position)
- return self.value[axis]
+ def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan):
- def __iter__(self) -> iter:
- """Iterate over each position value axis."""
+ DataFeatures.TimestampedObject.__init__(self, timestamp)
+ self.__precision = precision
+ self.__message = message
- return iter(self.value)
-
- def __len__(self) -> int:
- """Number of axis in position value."""
+ @property
+ def value(self):
+ """Get position's tuple value."""
+ return tuple(self)
+
+ @property
+ def precision(self):
+ """Get position's precision."""
+ return self.__precision
+
+ @property
+ def message(self):
+ """Get position's message."""
+ return self.__message
+
+ @classmethod
+ def from_dict(self, position_data: dict) -> GazePositionType:
+
+ if 'value' in position_data.keys():
- return len(self.value)
+ value = position_data.pop('value')
+ return GazePosition(value, **position_data)
+
+ else:
+
+ return GazePosition(**position_data)
+
+ def __bool__(self) -> bool:
+ """Is the position value valid?"""
+ return len(self) > 0
def __repr__(self):
"""String representation"""
- return json.dumps(self, ensure_ascii = False, default=vars)
+ return json.dumps(DataFeatures.as_dict(self))
+
+ def __add__(self, position: GazePositionType) -> GazePositionType:
+ """Add position.
- def __mul__(self, value) -> GazePositionType:
- """Multiply gaze position."""
+ !!! note
+ The returned position precision is the maximal precision.
+ """
+ if self.__precision is not None and position.precision is not None:
- return GazePosition(numpy.array(self.value) * value, precision= self.precision * numpy.linalg.norm(value))
+ return GazePosition(numpy.array(self) + numpy.array(position), precision = max(self.__precision, position.precision))
- def __array__(self):
- """Cast as numpy array."""
+ else:
- return numpy.array(self.value)
+ return GazePosition(numpy.array(self) + numpy.array(position))
- @property
- def valid(self) -> bool:
- """Is the precision not None?"""
+ __radd__ = __add__
+
+ def __sub__(self, position: GazePositionType) -> GazePositionType:
+ """Substract position.
+
+ !!! note
+ The returned position precision is the maximal precision.
+ """
+ if self.__precision is not None and position.precision is not None:
+
+ return GazePosition(numpy.array(self) - numpy.array(position), precision = max(self.__precision, position.precision))
+
+ else:
+
+ return GazePosition(numpy.array(self) - numpy.array(position))
+
+ def __rsub__(self, position: GazePositionType) -> GazePositionType:
+ """Reversed substract position.
+
+ !!! note
+ The returned position precision is the maximal precision.
+ """
+ if self.__precision is not None and position.precision is not None:
+
+ return GazePosition(numpy.array(position) - numpy.array(self), precision = max(self.__precision, position.precision))
+
+ else:
- return self.precision is not None
+ return GazePosition(numpy.array(position) - numpy.array(self))
+
+ def __mul__(self, factor: int|float) -> GazePositionType:
+ """Multiply position by a factor.
+
+ !!! note
+ The returned position precision is also multiplied by the factor.
+ """
+ return GazePosition(numpy.array(self) * factor, precision = self.__precision * factor if self.__precision is not None else None)
+
+ def __pow__(self, factor: int|float) -> GazePositionType:
+ """Power position by a factor.
+
+ !!! note
+ The returned position precision is also powered by the factor.
+ """
+ return GazePosition(numpy.array(self) ** factor, precision = self.__precision ** factor if self.__precision is not None else None)
def distance(self, gaze_position) -> float:
"""Distance to another gaze positions."""
- distance = (self.value[0] - gaze_position.value[0])**2 + (self.value[1] - gaze_position.value[1])**2
+ distance = (self[0] - gaze_position[0])**2 + (self[1] - gaze_position[1])**2
distance = numpy.sqrt(distance)
return distance
@@ -84,82 +152,56 @@ class GazePosition():
"""Does this gaze position overlap another gaze position considering its precision?
Set both to True to test if the other gaze position overlaps this one too."""
- distance = (self.value[0] - gaze_position.value[0])**2 + (self.value[1] - gaze_position.value[1])**2
- distance = numpy.sqrt(distance)
+ distance = numpy.sqrt(numpy.sum((self - gaze_position)**2))
if both:
- return distance < min(self.precision, gaze_position.precision)
+ return distance < min(self.__precision, gaze_position.precision)
else:
- return distance < self.precision
+ return distance < self.__precision
def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True):
"""Draw gaze position point and precision circle."""
- if self.valid:
+ if self:
- int_value = (int(self.value[0]), int(self.value[1]))
+ int_value = (int(self[0]), int(self[1]))
# Draw point at position if required
if color is not None:
cv2.circle(image, int_value, size, color, -1)
# Draw precision circle
- if self.precision > 0 and draw_precision:
- cv2.circle(image, int_value, round(self.precision), color, 1)
-
-class UnvalidGazePosition(GazePosition):
- """Unvalid gaze position."""
-
- def __init__(self, message=None):
-
- self.message = message
-
- super().__init__((None, None), precision=None)
+ if self.__precision is not None and draw_precision:
+ cv2.circle(image, int_value, round(self.__precision), color, 1)
TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions")
# Type definition for type annotation convenience
-class TimeStampedGazePositions(DataFeatures.TimeStampedBuffer):
- """Define timestamped buffer to store gaze positions."""
-
- def __setitem__(self, key, value: GazePosition|dict):
- """Force GazePosition storage."""
-
- # Convert dict into GazePosition
- if type(value) == dict:
-
- assert(set(['value', 'precision']).issubset(value.keys()))
-
- if math.isnan(value['precision']):
-
- if 'message' in value.keys():
-
- value = UnvalidGazePosition(value['message'])
-
- else :
-
- value = UnvalidGazePosition()
-
- else:
-
- value = GazePosition(value['value'], precision=value['precision'])
+class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList):
+ """Handle timestamped gaze positions into a list."""
+
+ def __init__(self, gaze_positions: list = []):
- assert(type(value) == GazePosition or type(value) == UnvalidGazePosition)
+ DataFeatures.TimestampedObjectsList.__init__(self, GazePosition, gaze_positions)
- super().__setitem__(key, value)
+ def values(self) -> list:
+ """Get all timestamped position values as list of tuple."""
+ return [tuple(ts_position) for ts_position in self]
+ ''' Is it still needed as there is a TimestampedObjectsList.from_json method?
@classmethod
def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType:
"""Create a TimeStampedGazePositionsType from .json file."""
- with open(json_filepath, encoding='utf-8') as ts_buffer_file:
+ with open(json_filepath, encoding='utf-8') as ts_positions_file:
- json_buffer = json.load(ts_buffer_file)
+ json_positions = json.load(ts_positions_file)
- return TimeStampedGazePositions({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer})
+ return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions})
+ '''
@classmethod
- def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None) -> TimeStampedGazePositionsType:
+ def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> TimeStampedGazePositionsType:
"""Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
Parameters:
@@ -167,40 +209,59 @@ class TimeStampedGazePositions(DataFeatures.TimeStampedBuffer):
x: specific x column label.
y: specific y column label.
precision: specific precision column label if exist.
+ message: specific message column label if exist.
"""
# Copy columns
- if precision:
+ columns = (timestamp, x, y)
- df = dataframe.loc[:, (timestamp, x, y, precision)]
+ if precision is not None:
- else:
+ columns += (precision,)
+
+ if message is not None:
+
+ columns += (message,)
- df = dataframe.loc[:, (timestamp, x, y)]
+ df = dataframe.loc[:, columns]
# Merge x and y columns into one 'value' column
df['value'] = tuple(zip(df[x], df[y]))
- df.drop(columns= [x, y], inplace=True, axis=1)
+ df.drop(columns=[x, y], inplace=True, axis=1)
+
+ # Replace tuple values containing NaN values by ()
+ df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=True)
# Handle precision data
if precision:
- # Rename precision column into 'precision' column
+ # Rename precision column into 'precision' column
df.rename(columns={precision: 'precision'}, inplace=True)
else:
- # Append a precision column where precision is NaN if value is a tuple of NaN else 0
- df['precision'] = df.apply(lambda row: numpy.nan if math.isnan(row.value[0]) or math.isnan(row.value[1]) else 0, axis=True)
+ # Append a None precision column
+ df['precision'] = df.apply(lambda row: None, axis=True)
+
+ # Handle message data
+ if message:
+
+ # Rename message column into 'message' column
+ df.rename(columns={precision: 'message'}, inplace=True)
+
+ else:
- # Rename timestamp column into 'timestamp' column then use it as index
+ # Append a None message column
+ df['message'] = df.apply(lambda row: None, axis=True)
+
+ # Rename timestamp column into 'timestamp' column
df.rename(columns={timestamp: 'timestamp'}, inplace=True)
- df.set_index('timestamp', inplace=True)
# Filter duplicate timestamps
- df = df[df.index.duplicated() == False]
+ df = df[df.timestamp.duplicated() == False]
- return TimeStampedGazePositions(df.to_dict('index'))
+ # Create timestamped gaze positions
+ return TimeStampedGazePositions(df.apply(lambda row: GazePosition(row.value, precision=row.precision, message=row.message, timestamp=row.timestamp), axis=True))
class GazePositionCalibrationFailed(Exception):
"""Exception raised by GazePositionCalibrator."""
@@ -299,58 +360,81 @@ class GazePositionCalibrator():
GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement")
# Type definition for type annotation convenience
-@dataclass(frozen=True)
-class GazeMovement():
- """Define abstract gaze movement class as a buffer of timestamped positions."""
+class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject):
+ """Define abstract gaze movement class as timestamped gaze positions list.
- positions: TimeStampedGazePositions
- """All timestamp gaze positions."""
+ !!! note
+ Gaze movement timestamp is always equal to its first position timestamp.
- duration: float = field(init=False)
- """Inferred duration from first and last timestamps."""
+ Parameters:
+ positions: timestamp gaze positions.
+ finished: is the movement finished?
+ message: a string to describe why the movement is what it is.
+ """
- amplitude: float = field(init=False)
- """Inferred amplitude from first and last positions."""
+ def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan):
- finished: bool = field(init=False, default=False)
- """Is the movement finished?"""
+ return TimeStampedGazePositions.__new__(cls, positions)
- def __post_init__(self):
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan):
+ """Initialize GazeMovement"""
- if self.valid:
+ TimeStampedGazePositions.__init__(self, positions)
+ DataFeatures.TimestampedObject.__init__(self, timestamp)
- start_position_ts, start_position = self.positions.first
- end_position_ts, end_position = self.positions.last
+ self.__finished = finished
+ self.__message = message
- # Update frozen duration attribute
- object.__setattr__(self, 'duration', end_position_ts - start_position_ts)
+ @property
+ def timestamp(self) -> int|float:
+ """Get first position timestamp."""
+ return self[0].timestamp
- _, start_position = self.positions.first
- _, end_position = self.positions.last
+ def is_timestamped(self) -> bool:
+ """If first position exist, the movement is timestamped."""
+ return bool(self)
- amplitude = numpy.linalg.norm( numpy.array(start_position.value) - numpy.array(end_position.value))
+ @timestamp.setter
+ def timestamp(self, timestamp: int|float):
+ """Block gaze movement timestamp setting."""
+ raise('GazeMovement timestamp is first positon timestamp.')
- # Update frozen amplitude attribute
- object.__setattr__(self, 'amplitude', amplitude)
+ @property
+ def finished(self) -> bool:
+ """Is the movement finished?"""
+ return self.__finished
- else:
+ def finish(self) -> GazeMovementType:
+ """Set gaze movement as finished"""
+ self.__finished = True
+ return self
- # Update frozen duration attribute
- object.__setattr__(self, 'duration', -1)
+ @property
+ def message(self):
+ """Get movement's message."""
+ return self.__message
+
+ @property
+ def amplitude(self):
+ """Get inferred amplitude from first and last positions."""
+ if self:
+
+ return numpy.linalg.norm(self[0] - self[-1])
+
+ else:
- # Update frozen amplitude attribute
- object.__setattr__(self, 'amplitude', -1)
+ return 0
def __str__(self) -> str:
"""String display"""
- if self.valid:
+ if self:
- output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self.positions)}\n\tfinished={self.finished}'
+ output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.finished}'
- for ts, position in self.positions.items():
+ for position in self:
- output += f'\n\t{ts}:\n\t\tvalue={position.value},\n\t\tprecision={position.precision}'
+ output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}'
else:
@@ -358,20 +442,6 @@ class GazeMovement():
return output
- @property
- def valid(self) -> bool:
- """Is there positions?"""
-
- return len(self.positions) > 0
-
- def finish(self) -> GazeMovementType:
- """Set gaze movement as finished"""
-
- # Update frozen finished attribute
- object.__setattr__(self, 'finished', True)
-
- return self
-
def draw_positions(self, image: numpy.array, position_color: tuple = None, line_color: tuple = None):
"""Draw gaze movement positions with line between each position.
@@ -380,12 +450,12 @@ class GazeMovement():
line_color: color of line between each position
"""
- gaze_positions = self.positions.copy()
+ positions = self.copy()
- while len(gaze_positions) >= 2:
+ while len(positions) >= 2:
- ts_start, start_gaze_position = gaze_positions.pop_first()
- ts_next, next_gaze_position = gaze_positions.first
+ start_gaze_position = positions.pop(0)
+ next_gaze_position = positions[0]
# Draw line between positions if required
if line_color is not None:
@@ -402,31 +472,27 @@ class GazeMovement():
raise NotImplementedError('draw() method not implemented')
-class UnvalidGazeMovement(GazeMovement):
- """Unvalid gaze movement."""
-
- def __init__(self, message=None):
-
- self.message = message
-
- super().__init__(TimeStampedGazePositions())
-
- def draw(self, image: numpy.array, **kwargs):
-
- pass
-
FixationType = TypeVar('Fixation', bound="Fixation")
# Type definition for type annotation convenience
class Fixation(GazeMovement):
"""Define abstract fixation as gaze movement."""
- focus: tuple = field(init=False)
- """Representative position of the fixation."""
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
+
+ super().__init__(positions, finished, message, **kwargs)
- def __post_init__(self):
+ self._focus = ()
- super().__post_init__()
+ @property
+ def focus(self) -> tuple:
+ """Get representative position of the fixation."""
+ return self._focus
+
+ @focus.setter
+ def focus(self, focus: tuple):
+ """Set representative position of the fixation."""
+ self._focus = focus
def merge(self, fixation) -> FixationType:
"""Merge another fixation into this fixation."""
@@ -441,9 +507,9 @@ def is_fixation(gaze_movement):
class Saccade(GazeMovement):
"""Define abstract saccade as gaze movement."""
- def __post_init__(self):
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
- super().__post_init__()
+ super().__init__(positions, finished, message, **kwargs)
def is_saccade(gaze_movement):
"""Is a gaze movement a saccade?"""
@@ -453,59 +519,48 @@ def is_saccade(gaze_movement):
TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeStampedGazeMovements")
# Type definition for type annotation convenience
-class TimeStampedGazeMovements(DataFeatures.TimeStampedBuffer):
- """Define timestamped buffer to store gaze movements."""
-
- def __setitem__(self, key, value: GazeMovement):
- """Force value to be or inherit from GazeMovement."""
+class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList):
+ """Handle timestamped gaze movements into a list"""
- assert(isinstance(value, GazeMovement) or type(value).__bases__[0] == Fixation or type(value).__bases__[0] == Saccade)
+ def __init__(self, gaze_movements: list = []):
- super().__setitem__(key, value)
+ DataFeatures.TimestampedObjectsList.__init__(self, GazeMovement, gaze_movements)
- def __str__(self):
-
- output = ''
- for ts, item in self.items():
+GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus")
+# Type definition for type annotation convenience
- output += f'\n{item}'
+class GazeStatus(list, DataFeatures.TimestampedObject):
+ """Define gaze status as a list of 1 or 2 (index, GazeMovementType) tuples.
- return output
+ Parameters:
+ position: the position that the status represents.
+ """
-GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus")
-# Type definition for type annotation convenience
+ def __init__(self, position: GazePosition):
-@dataclass(frozen=True)
-class GazeStatus(GazePosition):
- """Define gaze status as a gaze position belonging to an identified and indexed gaze movement."""
+ DataFeatures.TimestampedObject.__init__(self, timestamp=position.timestamp)
- movement_type: str = field(kw_only=True)
- """GazeMovement type to which gaze position belongs."""
+ self.__position = position
- movement_index: int = field(kw_only=True)
- """GazeMovement index to which gaze positon belongs."""
+ @property
+ def position(self) -> GazePosition:
+ """Get gaze status position."""
+ return self.__position
- @classmethod
- def from_position(cls, gaze_position: GazePosition, movement_type: str, movement_index: int) -> GazeStatusType:
- """Initialize from a gaze position instance."""
+ def append(self, movement_index: int, movement_type:type):
+ """Append movement index and type."""
- return cls(gaze_position.value, precision=gaze_position.precision, movement_type=movement_type, movement_index=movement_index)
+ super().append((movement_index, movement_type))
TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus")
# Type definition for type annotation convenience
-class TimeStampedGazeStatus(DataFeatures.TimeStampedBuffer):
- """Define timestamped buffer to store list of gaze statusa.
+class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList):
+ """Handle timestamped gaze status into a list."""
- !!! note
- List of gaze status are required as a gaze position can belongs to two consecutive gaze movements as last and first position.
- """
-
- def __setitem__(self, key, value: list):
-
- assert(isinstance(value, list))
+ def __init__(self):
- super().__setitem__(key, value)
+ super().__init__(GazeStatus)
class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
"""Abstract class to define what should provide a gaze movement identifier."""
@@ -525,7 +580,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
terminate: allows to notify identification algorithm that given gaze position will be the last one.
Returns:
- finished_gaze_movement: identified gaze movement once it is finished otherwise it returns unvalid gaze movement.
+ gaze_movement: identified gaze movement once it is finished otherwise it returns empty gaze movement.
"""
raise NotImplementedError('identify() method not implemented')
@@ -564,66 +619,36 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
ts_status = TimeStampedGazeStatus()
# Get last ts to terminate identification on last gaze position
- last_ts, _ = ts_gaze_positions.last
+ last_ts = ts_gaze_positions[-1].timestamp
# Iterate on gaze positions
- for ts, gaze_position in ts_gaze_positions.items():
-
- finished_gaze_movement = self.identify(ts, gaze_position, terminate=(ts == last_ts))
+ for gaze_position in ts_gaze_positions:
- if is_fixation(finished_gaze_movement):
+ gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts))
- start_ts, start_position = finished_gaze_movement.positions.first
-
- ts_fixations[start_ts] = finished_gaze_movement
+ if gaze_movement:
# First gaze movement position is always shared with previous gaze movement
- for ts, position in finished_gaze_movement.positions.items():
-
- gaze_status = GazeStatus.from_position(position, 'Fixation', len(ts_fixations))
-
- if ts != start_ts:
-
- ts_status[ts] = [gaze_status]
-
- else:
+ for movement_position in gaze_movement:
- try:
+ # Is a status already exist for this position?
+ gaze_status = ts_status.look_for(movement_position.timestamp)
- ts_status[start_ts].append(gaze_status)
+ if not gaze_status:
+
+ gaze_status = GazeStatus(movement_position)
+ ts_status.append(gaze_status)
- except KeyError:
+ gaze_status.append(len(ts_fixations), type(gaze_movement))
- ts_status[start_ts] = [gaze_status]
-
- elif is_saccade(finished_gaze_movement):
-
- start_ts, start_position = finished_gaze_movement.positions.first
-
- ts_saccades[start_ts] = finished_gaze_movement
-
- # First gaze movement position is always shared with previous gaze movement
- for ts, position in finished_gaze_movement.positions.items():
+ # Store gaze movment into the appropriate list
+ if is_fixation(gaze_movement):
- gaze_status = GazeStatus.from_position(position, 'Saccade', len(ts_saccades))
+ ts_fixations.append(gaze_movement)
- if ts != start_ts:
+ elif is_saccade(gaze_movement):
- ts_status[ts] = [gaze_status]
-
- else:
-
- try:
-
- ts_status[start_ts].append(gaze_status)
-
- except KeyError:
-
- ts_status[start_ts] = [gaze_status]
-
- else:
-
- continue
+ ts_saccades.append(gaze_movement)
return ts_fixations, ts_saccades, ts_status
@@ -635,24 +660,22 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
Returns:
timestamp: first gaze position date of identified gaze movement
- finished_gaze_movement: identified gaze movement once it is finished
+ gaze_movement: identified gaze movement once it is finished
"""
assert(type(ts_gaze_positions) == TimeStampedGazePositions)
# Get last ts to terminate identification on last gaze position
- last_ts, _ = ts_gaze_positions.last
+ last_ts = ts_gaze_positions[-1]
# Iterate on gaze positions
- for ts, gaze_position in ts_gaze_positions.items():
-
- finished_gaze_movement = self.identify(ts, gaze_position, terminate=(ts == last_ts))
+ for gaze_position in ts_gaze_positions:
- if finished_gaze_movement.valid:
+ gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts))
- start_ts, start_position = finished_gaze_movement.positions.first
+ if gaze_movement:
- yield start_ts, finished_gaze_movement
+ yield gaze_movement
ScanStepType = TypeVar('ScanStep', bound="ScanStep")
# Type definition for type annotation convenience
@@ -664,34 +687,43 @@ class ScanStepError(Exception):
super().__init__(message)
-@dataclass(frozen=True)
class ScanStep():
"""Define a scan step as a fixation and a consecutive saccade.
+
+ Parameters:
+ first_fixation: a fixation that comes before the next saccade.
+ last_saccade: a saccade that comes after the previous fixation.
!!! warning
-
Scan step have to start by a fixation and then end by a saccade.
"""
- first_fixation: Fixation
- """A fixation that comes before the next saccade."""
+ def __init__(self, first_fixation: Fixation, last_saccade: Saccade):
- last_saccade: Saccade
- """A saccade that comes after the previous fixation."""
-
- def __post_init__(self):
+ self.__first_fixation = first_fixation
+ self.__last_saccade = last_saccade
# First movement have to be a fixation
- if not is_fixation(self.first_fixation):
+ if not is_fixation(self.__first_fixation):
raise ScanStepError('First step movement is not a fixation')
# Last movement have to be a saccade
- if not is_saccade(self.last_saccade):
+ if not is_saccade(self.__last_saccade):
raise ScanStepError('Last step movement is not a saccade')
@property
+ def first_fixation(self):
+ """Get scan step first fixation."""
+ return self.__first_fixation
+
+ @property
+ def last_saccade(self):
+ """Get scan step last saccade."""
+ return self.__last_saccade
+
+ @property
def fixation_duration(self) -> int|float:
"""Time spent on AOI
@@ -699,7 +731,7 @@ class ScanStep():
fixation duration
"""
- return self.first_fixation.duration
+ return self.__first_fixation.duration
@property
def duration(self) -> int|float:
@@ -709,7 +741,7 @@ class ScanStep():
duration
"""
- return self.first_fixation.duration + self.last_saccade.duration
+ return self.__first_fixation.duration + self.__last_saccade.duration
ScanPathType = TypeVar('ScanPathType', bound="ScanPathType")
# Type definition for type annotation convenience
@@ -751,7 +783,7 @@ class ScanPath(list):
self.__duration -= oldest_step.duration
- def append_saccade(self, ts, saccade) -> ScanStepType:
+ def append_saccade(self, saccade) -> ScanStepType:
"""Append new saccade to scan path and return last new scan step if one have been created."""
# Ignore saccade if no fixation came before
@@ -779,7 +811,7 @@ class ScanPath(list):
# Clear last fixation
self.__last_fixation = None
- def append_fixation(self, ts, fixation):
+ def append_fixation(self, fixation):
"""Append new fixation to scan path.
!!! warning
Consecutives fixations are ignored keeping the last fixation"""
@@ -882,24 +914,23 @@ class AOIScanStepError(Exception):
self.aoi = aoi
-@dataclass(frozen=True)
class AOIScanStep():
"""Define an aoi scan step as a set of successive gaze movements onto a same AOI.
- !!! warning
-
- Aoi scan step have to start by a fixation and then end by a saccade."""
-
- movements: TimeStampedGazeMovements
- """All movements over an AOI and the last saccade that comes out."""
+ Parameters:
+ movements: all movements over an AOI and the last saccade that comes out.
+ aoi: AOI name
+ letter: AOI unique letter to ease sequence analysis.
- aoi: str = field(default='')
- """AOI name."""
+ !!! warning
+ Aoi scan step have to start by a fixation and then end by a saccade.
+ """
- letter: str = field(default='')
- """AOI unique letter to ease sequence analysis."""
+ def __init__(self, movements: TimeStampedGazeMovements, aoi: str = '', letter: str = ''):
- def __post_init__(self):
+ self.__movements = movements
+ self.__aoi = aoi
+ self.__letter = letter
# First movement have to be a fixation
if not is_fixation(self.first_fixation):
@@ -912,18 +943,29 @@ class AOIScanStep():
raise AOIScanStepError('Last step movement is not a saccade', self.aoi)
@property
+ def movements(self):
+ """Get AOI scan step movements."""
+ return self.__movements
+
+ @property
+ def aoi(self):
+ """Get AOI scan step aoi."""
+ return self.__aoi
+
+ @property
+ def letter(self):
+ """Get AOI scan step letter."""
+ return self.__letter
+
+ @property
def first_fixation(self):
"""First fixation on AOI."""
-
- _, first_movement = self.movements.first
- return first_movement
+ return self.movements[0]
@property
def last_saccade(self):
"""Last saccade that comes out AOI."""
-
- _, last_movement = self.movements.last
- return last_movement
+ return self.movements[-1]
@property
def fixation_duration(self) -> int|float:
@@ -932,14 +974,7 @@ class AOIScanStep():
Returns:
fixation duration
"""
-
- # Timestamp of first position of first fixation
- first_ts, _ = self.first_fixation.positions.first
-
- # Timestamp of first position of last saccade
- last_ts, _ = self.last_saccade.positions.first
-
- return last_ts - first_ts
+ return self.last_saccade[0].timestamp - self.first_fixation[0].timestamp
@property
def duration(self) -> int|float:
@@ -948,14 +983,7 @@ class AOIScanStep():
Returns:
duration
"""
-
- # Timestamp of first position of first fixation
- first_ts, _ = self.first_fixation.positions.first
-
- # Timestamp of last position of last saccade
- last_ts, _ = self.last_saccade.positions.last
-
- return last_ts - first_ts
+ return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp
AOIScanPathType = TypeVar('AOIScanPathType', bound="AOIScanPathType")
# Type definition for type annotation convenience
@@ -1075,19 +1103,20 @@ class AOIScanPath(list):
return self.__transition_matrix
- def append_saccade(self, ts, saccade):
+ def append_saccade(self, saccade):
"""Append new saccade to aoi scan path."""
# Ignore saccade if no fixation have been stored before
if len(self.__movements) > 0:
- self.__movements[ts] = saccade
+ self.__movements.append(saccade)
- def append_fixation(self, ts, fixation, looked_aoi: str) -> bool:
+ def append_fixation(self, fixation, looked_aoi: str) -> bool:
"""Append new fixation to aoi scan path and return last new aoi scan step if one have been created.
!!! warning
- It could raise AOIScanStepError"""
+ It could raise AOIScanStepError
+ """
# Replace None aoi by generic OutsideAOI name
if looked_aoi is None:
@@ -1137,14 +1166,14 @@ class AOIScanPath(list):
self.__movements = TimeStampedGazeMovements()
# Append new fixation
- self.__movements[ts] = fixation
+ self.__movements.append(fixation)
# Remember new aoi
self.__current_aoi = looked_aoi
else:
# Append new fixation
- self.__movements[ts] = fixation
+ self.__movements.append(fixation)
# Remember aoi
self.__current_aoi = looked_aoi
diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupillAnalysis/WorkloadIndex.py
index 1f3c586..f97dce3 100644
--- a/src/argaze/PupillAnalysis/WorkloadIndex.py
+++ b/src/argaze/PupillAnalysis/WorkloadIndex.py
@@ -15,51 +15,61 @@ from argaze import PupillFeatures
import numpy
-@dataclass
class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer):
- """Periodic average of pupill diameter variations to pupill diameter reference value."""
+ """Periodic average of pupill diameter variations to pupill diameter reference value.
- reference: PupillFeatures.PupillDiameter
- """ """
+ Parameters:
+ reference: base line value.
+ period: identification period length.
+ """
+ def __init__(self, reference: PupillFeatures.PupillDiameter, period: int|float = 1):
- period: int | float = field(default=1)
- """Identification period length."""
+ assert(not math.isnan(self.__reference))
- def __post_init__(self):
-
- assert(self.reference.valid)
+ self.__reference = reference
+ self.__period = period
self.__variations_sum = 0.
self.__variations_number = 0
self.__last_ts = 0
+ @property
+ def reference(self) -> PupillFeatures.PupillDiameter:
+ """Get workload index reference."""
+ return self.__reference
+
+ @property
+ def period(self) -> int|float:
+ """Get workload index period."""
+ return self.__period
+
@DataFeatures.PipelineStepMethod
- def analyze(self, ts: int|float, pupill_diameter) -> float:
+ def analyze(self, pupill_diameter: PupillFeatures.PupillDiameter) -> float:
"""Analyze workload index from successive timestamped pupill diameters."""
# Ignore non valid pupill diameter
- if not pupill_diameter.valid:
+ if not math.isnan(pupill_diameter):
return None
- if ts - self.__last_ts >= self.period:
+ if pupill_diameter.timestamp - self.__last_ts >= self.__period:
- if self.__variations_number > 0 and self.reference.value > 0.:
+ if self.__variations_number > 0 and self.__reference.value > 0.:
- workload_index = (self.__variations_sum / self.__variations_number) / self.reference.value
+ workload_index = (self.__variations_sum / self.__variations_number) / self.__reference.value
else:
workload_index = 0.
- self.__variations_sum = pupill_diameter.value - self.reference.value
+ self.__variations_sum = pupill_diameter.value - self.__reference.value
self.__variations_number = 1
- self.__last_ts = ts
+ self.__last_ts = pupill_diameter.timestamp
return workload_index
else:
- self.__variations_sum += pupill_diameter.value - self.reference.value
+ self.__variations_sum += pupill_diameter.value - self.__reference.value
self.__variations_number += 1
\ No newline at end of file
diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py
index d8f9331..492e7ca 100644
--- a/src/argaze/PupillFeatures.py
+++ b/src/argaze/PupillFeatures.py
@@ -10,71 +10,41 @@ __license__ = "BSD"
from typing import TypeVar
from dataclasses import dataclass, field
import json
+import math
from argaze import DataFeatures
-@dataclass(frozen=True)
-class PupillDiameter():
- """Define pupill diameter as ..."""
-
- value: float = field(default=0.)
- """Pupill diameter value."""
-
- @property
- def valid(self) -> bool:
- """Is the value not 0"""
-
- return self.value != 0.
-
- def __repr__(self):
- """String representation"""
+PupillDiameterType = TypeVar('PupillDiameter', bound="PupillDiameter")
+# Type definition for type annotation convenience
- return json.dumps(self, ensure_ascii = False, default=vars)
+class PupillDiameter(float, DataFeatures.TimestampedObject):
+ """Define pupill diameter as a single float value.
-class UnvalidPupillDiameter(PupillDiameter):
- """Unvalid pupill diameter."""
+ Parameters:
+ value: pupill diameter value.
+ """
+ def __new__(cls, value: float = math.nan, **kwargs):
- def __init__(self, message=None):
+ return float.__new__(cls, value)
- self.message = message
+ def __init__(self, value: float = math.nan, **kwargs):
- super().__init__(0.)
+ super().__init__(**kwargs)
+ @property
+ def value(self):
+ """Get pupill diameter value."""
+ return float(self)
+
TimeStampedPupillDiametersType = TypeVar('TimeStampedPupillDiameters', bound="TimeStampedPupillDiameters")
# Type definition for type annotation convenience
-class TimeStampedPupillDiameters(DataFeatures.TimeStampedBuffer):
- """Define timestamped buffer to store pupill diameters."""
-
- def __setitem__(self, key, value: PupillDiameter|dict):
- """Force PupillDiameter storage."""
-
- # Convert dict into PupillDiameter
- if type(value) == dict:
-
- assert(set(['value']).issubset(value.keys()))
-
- if 'message' in value.keys():
-
- value = UnvalidPupillDiameter(value['message'])
-
- else:
-
- value = PupillDiameter(value['value'])
-
- assert(type(value) == PupillDiameter or type(value) == UnvalidPupillDiameter)
-
- super().__setitem__(key, value)
-
- @classmethod
- def from_json(self, json_filepath: str) -> TimeStampedPupillDiametersType:
- """Create a TimeStampedPupillDiametersType from .json file."""
-
- with open(json_filepath, encoding='utf-8') as ts_buffer_file:
+class TimeStampedPupillDiameters(DataFeatures.TimestampedObjectsList):
+ """Handle timestamped pupill diamters into a list."""
- json_buffer = json.load(ts_buffer_file)
+ def __init__(self, pupill_diameters: list = []):
- return TimeStampedPupillDiameters({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer})
+ DataFeatures.TimestampedObjectsList.__init__(self, PupillDiameter, pupill_diameters)
TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer")
# Type definition for type annotation convenience
@@ -83,7 +53,7 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject):
"""Abstract class to define what should provide a pupill diameter analyser."""
@DataFeatures.PipelineStepMethod
- def analyze(self, timestamp: int|float, pupill_diameter, float) -> float:
+ def analyze(self, pupill_diameter: PupillDiameterType) -> any:
"""Analyze pupill diameter from successive timestamped pupill diameters."""
raise NotImplementedError('analyze() method not implemented')
@@ -96,9 +66,9 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject):
ts_analyzis = DataFeatures.TimeStampedBuffer()
# Iterate on pupill diameters
- for ts, pupill_diameter in ts_pupill_diameters.items():
+ for pupill_diameter in ts_pupill_diameters:
- analysis = self.analyze(ts, pupill_diameter)
+ analysis = self.analyze(pupill_diameter)
if analysis is not None: