diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/argaze.test/AreaOfInterest/AOIFeatures.py | 10 | ||||
-rw-r--r-- | src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py | 28 | ||||
-rw-r--r-- | src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py | 18 | ||||
-rw-r--r-- | src/argaze.test/GazeFeatures.py | 12 | ||||
-rw-r--r-- | src/argaze/ArFeatures.py | 28 | ||||
-rw-r--r-- | src/argaze/AreaOfInterest/AOIFeatures.py | 10 | ||||
-rw-r--r-- | src/argaze/DataFeatures.py | 4 | ||||
-rw-r--r-- | src/argaze/GazeAnalysis/DispersionThresholdIdentification.py | 2 | ||||
-rw-r--r-- | src/argaze/GazeAnalysis/LinearRegression.py | 55 | ||||
-rw-r--r-- | src/argaze/GazeAnalysis/VelocityThresholdIdentification.py | 2 | ||||
-rw-r--r-- | src/argaze/GazeFeatures.py | 42 | ||||
-rw-r--r-- | src/argaze/utils/demo_data/demo_frame_logger.py | 4 | ||||
-rw-r--r-- | src/argaze/utils/demo_data/demo_layer_logger.py | 2 |
13 files changed, 93 insertions, 124 deletions
diff --git a/src/argaze.test/AreaOfInterest/AOIFeatures.py b/src/argaze.test/AreaOfInterest/AOIFeatures.py index cb8fb52..baebadf 100644 --- a/src/argaze.test/AreaOfInterest/AOIFeatures.py +++ b/src/argaze.test/AreaOfInterest/AOIFeatures.py @@ -23,12 +23,12 @@ class TestAreaOfInterestClass(unittest.TestCase): # Check that 0D AreaOfInterest creation is considered as empty aoi_0D = AOIFeatures.AreaOfInterest() - self.assertTrue(aoi_0D.empty) + self.assertTrue(aoi_0D.is_empty()) # Check 1 point 1D AreaOfInterest creation aoi_1D = AOIFeatures.AreaOfInterest([[0]]) - self.assertFalse(aoi_1D.empty) + self.assertFalse(aoi_1D.is_empty()) self.assertEqual(aoi_1D.dimension, 1) self.assertEqual(aoi_1D.points_number, 1) self.assertIsNone(numpy.testing.assert_array_equal(aoi_1D.bounds, [[0], [0]])) @@ -41,7 +41,7 @@ class TestAreaOfInterestClass(unittest.TestCase): # Check 2 points 1D AreaOfInterest creation aoi_1D = AOIFeatures.AreaOfInterest([[0], [1]]) - self.assertFalse(aoi_1D.empty) + self.assertFalse(aoi_1D.is_empty()) self.assertEqual(aoi_1D.dimension, 1) self.assertEqual(aoi_1D.points_number, 2) self.assertIsNone(numpy.testing.assert_array_equal(aoi_1D.bounds, [[0], [1]])) @@ -54,7 +54,7 @@ class TestAreaOfInterestClass(unittest.TestCase): # Check 4 points 2D AreaOfInterest creation aoi_2D = AOIFeatures.AreaOfInterest([[0, 0], [0, 1], [1, 0], [1, 1]]) - self.assertFalse(aoi_2D.empty) + self.assertFalse(aoi_2D.is_empty()) self.assertEqual(aoi_2D.dimension, 2) self.assertEqual(aoi_2D.points_number, 4) self.assertIsNone(numpy.testing.assert_array_equal(aoi_2D.bounds, [[0, 0], [1, 1]])) @@ -66,7 +66,7 @@ class TestAreaOfInterestClass(unittest.TestCase): # Check 8 points 3D AreaOfInterest creation aoi_3D = AOIFeatures.AreaOfInterest([[0, 0, 0], [0, 1, 0], [1, 0, 0], [1, 1, 0], [0, 0, 1], [0, 1, 1], [1, 0, 1], [1, 1, 1]]) - self.assertFalse(aoi_3D.empty) + self.assertFalse(aoi_3D.is_empty()) self.assertEqual(aoi_3D.dimension, 3) self.assertEqual(aoi_3D.points_number, 8) self.assertIsNone(numpy.testing.assert_array_equal(aoi_3D.bounds, [[0, 0, 0], [1, 1, 1]])) diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index 156f6f1..0bb8ed7 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -125,7 +125,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) def test_fixation_and_direct_saccade_identification(self): """Test DispersionThresholdIdentification fixation and saccade identification.""" @@ -157,7 +157,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check first saccade saccade = ts_saccades.pop(0) @@ -165,7 +165,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(saccade), 2) self.assertGreaterEqual(saccade.duration, min_time) self.assertLessEqual(saccade.duration, max_time) - self.assertLessEqual(saccade.finished, True) + self.assertLessEqual(saccade.is_finished(), True) # Check that last position of a movement is equal to first position of next movement self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) @@ -178,7 +178,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check that last position of a movement is equal to first position of next movement self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) @@ -217,7 +217,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check first saccade saccade = ts_saccades.pop(0) @@ -225,7 +225,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(saccade), move + 2) self.assertGreaterEqual(saccade.duration, (move + 1) * min_time) self.assertLessEqual(saccade.duration, (move + 1) * max_time) - self.assertLessEqual(saccade.finished, True) + self.assertLessEqual(saccade.is_finished(), True) # Check that last position of a movement is equal to first position of next movement self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) @@ -238,7 +238,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check that last position of a movement is equal to first position of next movement self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) @@ -271,7 +271,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, 6 * min_time) self.assertLessEqual(fixation.duration, 6 * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check second fixation fixation = ts_fixations.pop(0) @@ -280,7 +280,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, 4 * min_time) self.assertLessEqual(fixation.duration, 4 * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) def test_fixation_overlapping(self): """Test Fixation overlap function.""" @@ -340,7 +340,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): #self.assertGreaterEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (2 * size - 1) * min_time) self.assertLessEqual(fixation.duration, (2 * size - 1) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) def test_identification_browsing(self): """Test DispersionThresholdIdentification identification browsing.""" @@ -370,14 +370,14 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max) self.assertGreaterEqual(finished_gaze_movement.duration, (size-1) * min_time) self.assertLessEqual(finished_gaze_movement.duration, (size-1) * max_time) - self.assertLessEqual(finished_gaze_movement.finished, True) + self.assertLessEqual(finished_gaze_movement.is_finished(), True) elif GazeFeatures.is_saccade(finished_gaze_movement): self.assertEqual(len(finished_gaze_movement), 2) self.assertGreaterEqual(finished_gaze_movement.duration, min_time) self.assertLessEqual(finished_gaze_movement.duration, max_time) - self.assertLessEqual(finished_gaze_movement.finished, True) + self.assertLessEqual(finished_gaze_movement.is_finished(), True) # Check that last gaze position date of current fixation is equal to given gaze position date # NOTE: This is not true for saccade as, for I-DT, there is a minimal time window while the gaze movement is unknown @@ -413,14 +413,14 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max) self.assertGreaterEqual(finished_gaze_movement.duration, size * min_time) self.assertLessEqual(finished_gaze_movement.duration, size * max_time) - self.assertLessEqual(finished_gaze_movement.finished, True) + self.assertLessEqual(finished_gaze_movement.is_finished(), True) elif GazeFeatures.is_saccade(finished_gaze_movement): self.assertEqual(len(finished_gaze_movement), 2) self.assertGreaterEqual(finished_gaze_movement.duration, 2 * min_time) self.assertLessEqual(finished_gaze_movement.duration, 2 * max_time) - self.assertLessEqual(finished_gaze_movement.finished, True) + self.assertLessEqual(finished_gaze_movement.is_finished(), True) if __name__ == '__main__': diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py index 262cfc0..1c7f7e3 100644 --- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py @@ -125,7 +125,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(fixation), size - 1) self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) def test_fixation_and_direct_saccade_identification(self): """Test VelocityThresholdIdentification fixation and saccade identification.""" @@ -157,7 +157,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(fixation), size - 1) self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check first saccade saccade = ts_saccades.pop(0) @@ -165,7 +165,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(saccade), 2) self.assertGreaterEqual(saccade.duration, min_time) self.assertLessEqual(saccade.duration, max_time) - self.assertLessEqual(saccade.finished, True) + self.assertLessEqual(saccade.is_finished(), True) # Check that last position of a movement is equal to first position of next movement self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) @@ -177,7 +177,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(fixation), size) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check that last position of a movement is equal to first position of next movement self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) @@ -216,7 +216,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(fixation), size - 1) # BUG: NOT ALWAYS TRUE !!! self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check first saccade saccade = ts_saccades.pop(0) @@ -224,7 +224,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(saccade), move + 2) self.assertGreaterEqual(saccade.duration, (move + 1) * min_time) self.assertLessEqual(saccade.duration, (move + 1) * max_time) - self.assertLessEqual(saccade.finished, True) + self.assertLessEqual(saccade.is_finished(), True) # Check that last position of a movement is equal to first position of next movement self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) @@ -236,7 +236,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(fixation), size) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check that last position of a movement is equal to first position of next movement self.assertEqual(saccade[-1], fixation[0]) @@ -269,7 +269,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(fixation), 6) self.assertGreaterEqual(fixation.duration, 5 * min_time) self.assertLessEqual(fixation.duration, 5 * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) # Check second fixation fixation = ts_fixations.pop(0) @@ -277,7 +277,7 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(fixation), 4) self.assertGreaterEqual(fixation.duration, 3 * min_time) self.assertLessEqual(fixation.duration, 3 * max_time) - self.assertLessEqual(fixation.finished, True) + self.assertLessEqual(fixation.is_finished(), True) def test_identification_browsing(self): """Test VelocityThresholdIdentification identification browsing.""" diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index 035c76a..2d052ab 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -321,7 +321,7 @@ class TestGazeMovementClass(unittest.TestCase): self.assertEqual(abstract_gaze_movement.duration, 0) self.assertEqual(abstract_gaze_movement.amplitude, 0) self.assertEqual(bool(abstract_gaze_movement), False) - self.assertEqual(abstract_gaze_movement.finished, False) + self.assertEqual(abstract_gaze_movement.is_finished(), False) def test_finish(self): """Test GazeMovement finishing.""" @@ -331,15 +331,15 @@ class TestGazeMovementClass(unittest.TestCase): abstract_gaze_movement_ref = abstract_gaze_movement # Check abstract GazeMovement and its reference - self.assertEqual(abstract_gaze_movement.finished, False) - self.assertEqual(abstract_gaze_movement_ref.finished, False) + self.assertEqual(abstract_gaze_movement.is_finished(), False) + self.assertEqual(abstract_gaze_movement_ref.is_finished(), False) # Set gaze movement as finished abstract_gaze_movement.finish() # Check abstract GazeMovement and its reference - self.assertEqual(abstract_gaze_movement.finished, True) - self.assertEqual(abstract_gaze_movement_ref.finished, True) + self.assertEqual(abstract_gaze_movement.is_finished(), True) + self.assertEqual(abstract_gaze_movement_ref.is_finished(), True) def test_message(self): """Test GazeMovement creation with message only.""" @@ -351,7 +351,7 @@ class TestGazeMovementClass(unittest.TestCase): self.assertEqual(gaze_movement.duration, 0) self.assertEqual(gaze_movement.amplitude, 0) self.assertEqual(bool(gaze_movement), False) - self.assertEqual(gaze_movement.finished, False) + self.assertEqual(gaze_movement.is_finished(), False) self.assertEqual(gaze_movement.message, 'test') class TestScanStepClass(unittest.TestCase): diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index fb22afa..b5d1012 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -195,12 +195,11 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): return self.__draw_parameters @property - def last_looked_aoi_name(self) -> bool: + def last_looked_aoi_name(self) -> str: """Get last looked aoi name.""" return self.__looked_aoi_name - @property - def analysis_available(self) -> bool: + def is_analysis_available(self) -> bool: """Are aoi scan path analysis ready?""" return self.__aoi_scan_path_analyzed @@ -414,7 +413,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__looked_aoi_name, _ = self.__aoi_matcher.match(self.__aoi_scene, gaze_movement, timestamp=gaze_movement.timestamp) # Valid and finished gaze movement has been identified - if gaze_movement and gaze_movement.finished: + if gaze_movement and gaze_movement.is_finished(): if GazeFeatures.is_fixation(gaze_movement): @@ -624,8 +623,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """Get last identified gaze movement""" return self.__identified_gaze_movement - @property - def analysis_available(self) -> bool: + def is_analysis_available(self) -> bool: """Are scan path analysis ready?""" return self.__scan_path_analyzed @@ -700,12 +698,24 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # JSON file format if file_format == 'json': - new_gaze_position_calibrator = GazeFeatures.GazePositionCalibrator.from_json(filepath) + with open(filepath) as file: + + gaze_movement_calibrator_value = json.load(file) # dict: else: - new_gaze_position_calibrator = GazeFeatures.GazePositionCalibrator.from_dict(gaze_position_calibrator_value) + gaze_movement_calibrator_value = frame_data.pop('gaze_movement_identifier') + + # Create gaze position calibrator + gaze_position_calibrator_module_path, gaze_position_calibrator_parameters = gaze_movement_calibrator_value.popitem() + + # Prepend argaze.GazeAnalysis path when a single name is provided + if len(gaze_position_calibrator_module_path.split('.')) == 1: + gaze_position_calibrator_module_path = f'argaze.GazeAnalysis.{gaze_position_calibrator_module_path}' + + gaze_position_calibrator_module = importlib.import_module(gaze_position_calibrator_module_path) + new_gaze_position_calibrator = gaze_position_calibrator_module.GazePositionCalibrator(**gaze_position_calibrator_parameters) except KeyError: @@ -911,7 +921,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified - if self.__identified_gaze_movement and self.__identified_gaze_movement.finished: + if self.__identified_gaze_movement and self.__identified_gaze_movement.is_finished(): if GazeFeatures.is_fixation(self.__identified_gaze_movement): diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index c7e5193..616f088 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -93,25 +93,20 @@ class AreaOfInterest(numpy.ndarray): @property def dimension(self) -> int: """Number of axis coding area points positions.""" - return self.shape[1] @property def points_number(self) -> int: """Number of points defining the area.""" - return self.shape[0] - @property - def empty(self) -> bool: + def is_empty(self) -> bool: """Is AOI empty ?""" - return self.shape[0] == 0 @property def bounds(self) -> numpy.array: """Get area's bounds.""" - min_bounds = numpy.min(self, axis=0) max_bounds = numpy.max(self, axis=0) @@ -120,13 +115,11 @@ class AreaOfInterest(numpy.ndarray): @property def center(self) -> numpy.array: """Center of mass.""" - return self.mean(axis=0) @property def size(self) -> numpy.array: """Get scene size.""" - min_bounds, max_bounds = self.bounds return max_bounds - min_bounds @@ -134,7 +127,6 @@ class AreaOfInterest(numpy.ndarray): @property def area(self) -> float: """Area of the polygon defined by aoi's points.""" - return Polygon(self).area @property diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 6d471e4..60b7c71 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -444,7 +444,7 @@ class PipelineStepObject(): # Init private attribute self.__name = name - self.__working_directory = None + self.__working_directory = working_directory self.__observers = observers if observers is not None else {} self.__execution_times = {} self.__properties = {} @@ -640,7 +640,7 @@ class PipelineStepObject(): # Open file with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: - json.dump({DataFeatures.module_path(self):DataFeatures.JsonEncoder().default(self)}, object_file, ensure_ascii=False, indent=4) + json.dump({module_path(self):as_dict(self)}, object_file, ensure_ascii=False, indent=4) # QUESTION: maybe we need two saving mode? #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 84d14e7..745b62c 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -49,7 +49,7 @@ class Fixation(GazeFeatures.Fixation): """Get fixation's maximal deviation.""" return self.__deviation_max - def overlap(self, fixation: FixationType) -> bool: + def is_overlapping(self, fixation: FixationType) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" positions_array = numpy.asarray(fixation.values()) diff --git a/src/argaze/GazeAnalysis/LinearRegression.py b/src/argaze/GazeAnalysis/LinearRegression.py index 717e8a3..d788f6f 100644 --- a/src/argaze/GazeAnalysis/LinearRegression.py +++ b/src/argaze/GazeAnalysis/LinearRegression.py @@ -9,9 +9,8 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple -from dataclasses import dataclass, field -from argaze import GazeFeatures +from argaze import DataFeatures, GazeFeatures from sklearn.linear_model import LinearRegression import numpy @@ -20,7 +19,6 @@ import cv2 GazePositionType = TypeVar('GazePositionType', bound="GazePositionType") # Type definition for type annotation convenience -@dataclass class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator): """Implementation of linear regression algorithm as described in: @@ -28,29 +26,41 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator): *Time- and space-efficient eye tracker calibration.* Proceedings of the 11th ACM Symposium on Eye Tracking Research & Applications (ETRA'19, 1-8). [https://dl.acm.org/doi/pdf/10.1145/3314111.3319818](https://dl.acm.org/doi/pdf/10.1145/3314111.3319818) - """ - coefficients: numpy.array = field(default_factory=lambda : numpy.array([[1., 0.], [0., 1.]])) - """Linear regression coefficients""" + Parameters: + coefficients: linear regression coefficients. + intercept: linear regression intercept value. + """ - intercept: numpy.array = field(default_factory=lambda : numpy.array([0., 0.])) - """Linear regression intercept value""" + def __init__(self, coefficients: list = [[1., 0.], [0., 1.]], intercept: list = [0., 0.]): - def __post_init__(self): + super().__init__() self.__linear_regression = LinearRegression() - self.__linear_regression.coef_ = numpy.array(self.coefficients) - self.__linear_regression.intercept_ = numpy.array(self.intercept) + self.__linear_regression.coef_ = numpy.array(coefficients) + self.__linear_regression.intercept_ = numpy.array(intercept) - def store(self, timestamp: int|float, observed_gaze_position: GazeFeatures.GazePosition, expected_gaze_position: GazeFeatures.GazePosition): - """Store observed and expected gaze positions.""" + @property + def coefficients(self) -> list: + """Get linear regression coefficients.""" + return self.__linear_regression.coef_.tolist() + + @property + def intercept(self): + """Get linear regression intercept value.""" + return self.__linear_regression.intercept_.tolist() + + def is_calibrating(self) -> bool: + """Is the calibration running?""" + return self.__linear_regression is None + def store(self, observed_gaze_position: GazeFeatures.GazePosition, expected_gaze_position: GazeFeatures.GazePosition): + """Store observed and expected gaze positions.""" self.__observed_positions.append(observed_gaze_position) self.__expected_positions.append(expected_gaze_position) def reset(self): """Reset observed and expected gaze positions.""" - self.__observed_positions = [] self.__expected_positions = [] self.__linear_regression = None @@ -61,24 +71,16 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator): Returns: score: the score of linear regression """ - self.__linear_regression = LinearRegression().fit(self.__observed_positions, self.__expected_positions) - # Update frozen coefficients attribute - object.__setattr__(self, 'coefficients', self.__linear_regression.coef_) - - # Update frozen intercept attribute - object.__setattr__(self, 'intercept', self.__linear_regression.intercept_) - # Return calibrated gaze position return self.__linear_regression.score(self.__observed_positions, self.__expected_positions) def apply(self, gaze_position: GazeFeatures.GazePosition) -> GazePositionType: """Apply calibration onto observed gaze position.""" - if not self.calibrating: - return GazeFeatures.GazePosition(self.__linear_regression.predict(numpy.array([gaze_position]))[0], precision=gaze_position.precision) + return GazeFeatures.GazePosition(self.__linear_regression.predict(numpy.array([gaze_position]))[0], precision=gaze_position.precision, timestamp=gaze_position.timestamp) else: @@ -86,7 +88,6 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator): def draw(self, image: numpy.array, size: tuple, resolution: tuple, line_color: tuple = (0, 0, 0), thickness: int = 1): """Draw calibration field.""" - width, height = size if width * height > 0: @@ -104,9 +105,3 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator): end = self.apply(GazeFeatures.GazePosition(start)).value cv2.line(image, (int(start[0]), int(start[1])), (int(end[0]), int(end[1])), line_color, thickness) - - @property - def calibrating(self) -> bool: - """Is the calibration running?""" - - return self.__linear_regression is None
\ No newline at end of file diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index a54cee1..bfe04fa 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -48,7 +48,7 @@ class Fixation(GazeFeatures.Fixation): """Get fixation's maximal deviation.""" return self.__deviation_max - def overlap(self, fixation: FixationType) -> bool: + def is_overlapping(self, fixation: FixationType) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" positions_array = numpy.asarray(fixation.values()) diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index fb56935..cdd29a3 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -288,43 +288,17 @@ class GazePositionCalibrationFailed(Exception): GazePositionCalibratorType = TypeVar('GazePositionCalibrator', bound="GazePositionCalibrator") # Type definition for type annotation convenience -@dataclass -class GazePositionCalibrator(): +class GazePositionCalibrator(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a gaze position calibrator algorithm.""" - @classmethod - def from_dict(cls, calibrator_data: dict) -> GazePositionCalibratorType: - """Load gaze position calibrator from dictionary. - - Parameters: - calibrator_data: dictionary with class name and attributes to load - """ - gaze_position_calibrator_module_path, gaze_position_calibrator_parameters = calibrator_data.popitem() - - # Prepend argaze.GazeAnalysis path when a single name is provided - if len(gaze_position_calibrator_module_path.split('.')) == 1: - gaze_position_calibrator_module_path = f'argaze.GazeAnalysis.{gaze_position_calibrator_module_path}' - - gaze_position_calibrator_module = importlib.import_module(gaze_position_calibrator_module_path) - return gaze_position_calibrator_module.GazePositionCalibrator(**gaze_position_calibrator_parameters) - - @classmethod - def from_json(self, json_filepath: str) -> GazePositionCalibratorType: - """Load calibrator from .json file.""" - - # Remember file path to ease rewriting - self.__json_filepath = json_filepath - - # Open file - with open(self.__json_filepath) as calibration_file: + def __init__(self): - return GazePositionCalibrator.from_dict(json.load(calibration_file)) + super().__init__() - def store(self, timestamp: int|float, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition): + def store(self, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition): """Store observed and expected gaze positions. Parameters: - timestamp: time of observed gaze position observed_gaze_position: where gaze position actually is expected_gaze_position: where gaze position should be """ @@ -366,8 +340,7 @@ class GazePositionCalibrator(): raise NotImplementedError('draw() method not implemented') - @property - def calibrating(self) -> bool: + def is_calibrating(self) -> bool: """Is the calibration running?""" raise NotImplementedError('ready getter not implemented') @@ -415,8 +388,7 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): """Block gaze movement timestamp setting.""" raise('GazeMovement timestamp is first positon timestamp.') - @property - def finished(self) -> bool: + def is_finished(self) -> bool: """Is the movement finished?""" return self.__finished @@ -446,7 +418,7 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): if self: - output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.finished}' + output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.is_finished()}' for position in self: diff --git a/src/argaze/utils/demo_data/demo_frame_logger.py b/src/argaze/utils/demo_data/demo_frame_logger.py index 57bc149..2bb4bdc 100644 --- a/src/argaze/utils/demo_data/demo_frame_logger.py +++ b/src/argaze/utils/demo_data/demo_frame_logger.py @@ -16,7 +16,7 @@ class FixationLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.FileWriter """Log fixations.""" # Log fixations - if GazeFeatures.is_fixation(frame.last_gaze_movement) and frame.last_gaze_movement.finished: + if GazeFeatures.is_fixation(frame.last_gaze_movement) and frame.last_gaze_movement.is_finished(): log = ( timestamp, @@ -32,7 +32,7 @@ class ScanPathAnalysisLogger(DataFeatures.PipelineStepObserver, UtilsFeatures.Fi def on_look(self, timestamp, frame, exception): """Log scan path metrics.""" - if frame.analysis_available: + if frame.is_analysis_available(): log = ( timestamp, diff --git a/src/argaze/utils/demo_data/demo_layer_logger.py b/src/argaze/utils/demo_data/demo_layer_logger.py index 47cf577..d702ac1 100644 --- a/src/argaze/utils/demo_data/demo_layer_logger.py +++ b/src/argaze/utils/demo_data/demo_layer_logger.py @@ -15,7 +15,7 @@ class AOIScanPathAnalysisLogger(DataFeatures.PipelineStepObserver, UtilsFeatures def on_look(self, timestamp, layer, exception): """Log aoi scan path metrics""" - if layer.analysis_available: + if layer.is_analysis_available(): log = ( timestamp, |