From 5f915a84f32405dc8bddae4ecbf95f4745af6fbc Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 13:57:31 +0100 Subject: More work on TimestampedGazePositions and GazeMovements. --- .../DispersionThresholdIdentification.py | 4 +- .../VelocityThresholdIdentification.py | 4 +- src/argaze.test/GazeFeatures.py | 122 +++++++------- src/argaze/ArFeatures.py | 6 +- src/argaze/DataFeatures.py | 5 +- .../DispersionThresholdIdentification.py | 48 +++--- src/argaze/GazeFeatures.py | 178 +++++++++++---------- 7 files changed, 184 insertions(+), 183 deletions(-) (limited to 'src') diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index b7475b5..f0d286a 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -47,7 +47,7 @@ def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time else: - gaze_position = GazeFeatures.UnvalidGazePosition() + gaze_position = GazeFeatures.GazePosition() # Store gaze position ts = time.time() - start_time + start_ts @@ -85,7 +85,7 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl else: - gaze_position = GazeFeatures.UnvalidGazePosition() + gaze_position = GazeFeatures.GazePosition() # Store gaze position ts = time.time() - start_time + start_ts diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py index 425d592..24f2e3c 100644 --- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py @@ -53,7 +53,7 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, else: - gaze_position = GazeFeatures.UnvalidGazePosition() + gaze_position = GazeFeatures.GazePosition() # Store gaze position ts = time.time() - start_time + start_ts @@ -91,7 +91,7 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl else: - gaze_position = GazeFeatures.UnvalidGazePosition() + gaze_position = GazeFeatures.GazePosition() # Store gaze position ts = time.time() - start_time + start_ts diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index fdc140d..7d18976 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -39,28 +39,27 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)): return ts_gaze_positions -@dataclass(frozen=True) class TestFixation(GazeFeatures.Fixation): """Define basic fixation class for test.""" - def __post_init__(self): + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): - super().__post_init__() + super().__init__(positions, finished, message, **kwargs) - points = self.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)]) + if positions: - # Update frozen focus attribute using centroid - object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1])) + positions_array = numpy.asarray(self.values()) + centroid = numpy.mean(positions_array, axis=0) + + # Update focus attribute using centroid + self.focus = (centroid[0], centroid[1]) -@dataclass(frozen=True) class TestSaccade(GazeFeatures.Saccade): """Define basic saccade for test.""" - def __post_init__(self): - super().__post_init__() + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): + + super().__init__(positions, finished, message, **kwargs) class TestGazePositionClass(unittest.TestCase): """Test GazePosition class.""" @@ -265,11 +264,11 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase): self.assertEqual(ts_gaze_positions[0].precision, 15) self.assertEqual(bool(ts_gaze_positions[0]), True) - # Check third gaze position is correctly stored and accessible as a UnvalidGazePosition + # Check third gaze position is correctly stored and accessible as a GazePosition self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition) self.assertEqual(numpy.isnan(ts_gaze_positions[2].precision), True) self.assertEqual(bool(ts_gaze_positions[2]), False) - + def test_as_dataframe(self): """Test inherited as_dataframe method.""" @@ -309,19 +308,19 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase): class TestGazeMovementClass(unittest.TestCase): """Test GazeMovement class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test GazeMovement creation.""" abstract_gaze_movement = GazeFeatures.GazeMovement(random_gaze_positions(0)) # Check abstract GazeMovement - self.assertEqual(len(abstract_gaze_movement.positions), 0) - self.assertEqual(abstract_gaze_movement.duration, -1) - self.assertEqual(abstract_gaze_movement.amplitude, -1) - self.assertEqual(abstract_gaze_movement.valid, False) + self.assertEqual(len(abstract_gaze_movement), 0) + self.assertEqual(abstract_gaze_movement.duration, 0) + self.assertEqual(abstract_gaze_movement.amplitude, 0) + self.assertEqual(bool(abstract_gaze_movement), False) self.assertEqual(abstract_gaze_movement.finished, False) - @unittest.skip("DEBUG") + def test_finish(self): """Test GazeMovement finishing.""" @@ -340,25 +339,22 @@ class TestGazeMovementClass(unittest.TestCase): self.assertEqual(abstract_gaze_movement.finished, True) self.assertEqual(abstract_gaze_movement_ref.finished, True) -class TestUnvalidGazeMovementClass(unittest.TestCase): - """Test UnvalidGazeMovement class.""" - @unittest.skip("DEBUG") - def test_new(self): - """Test UnvalidGazeMovement creation.""" + def test_message(self): + """Test GazeMovement creation with message only.""" - unvalid_gaze_movement = GazeFeatures.UnvalidGazeMovement('test') + gaze_movement = GazeFeatures.GazeMovement(message='test') - # Check UnvalidGazeMovement - self.assertEqual(len(unvalid_gaze_movement.positions), 0) - self.assertEqual(unvalid_gaze_movement.duration, -1) - self.assertEqual(unvalid_gaze_movement.amplitude, -1) - self.assertEqual(unvalid_gaze_movement.valid, False) - self.assertEqual(unvalid_gaze_movement.finished, False) - self.assertEqual(unvalid_gaze_movement.message, 'test') + # Check GazeMovement + self.assertEqual(len(gaze_movement), 0) + self.assertEqual(gaze_movement.duration, 0) + self.assertEqual(gaze_movement.amplitude, 0) + self.assertEqual(bool(gaze_movement), False) + self.assertEqual(gaze_movement.finished, False) + self.assertEqual(gaze_movement.message, 'test') class TestScanStepClass(unittest.TestCase): """Test ScanStep class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test ScanStep creation.""" @@ -371,7 +367,7 @@ class TestScanStepClass(unittest.TestCase): self.assertEqual(scan_step.first_fixation, fixation) self.assertEqual(scan_step.last_saccade, saccade) self.assertGreater(scan_step.duration, 0) - + def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)): """Build scan path""" @@ -380,18 +376,18 @@ def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)): for i in range(size): fixation = TestFixation(random_gaze_positions(10, frame_dimension)) - ts, _ = fixation.positions.first + ts, _ = fixation.first scan_path.append_fixation(ts, fixation) saccade = TestSaccade(random_gaze_positions(2, frame_dimension)) - ts, _ = saccade.positions.first + ts, _ = saccade.first scan_path.append_saccade(ts, saccade) return scan_path class TestScanPathClass(unittest.TestCase): """Test ScanPath class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test ScanPath creation.""" @@ -400,7 +396,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(len(scan_path), 0) self.assertEqual(scan_path.duration, 0) - @unittest.skip("DEBUG") + def test_append(self): """Test ScanPath append methods.""" @@ -408,7 +404,7 @@ class TestScanPathClass(unittest.TestCase): # Append a saccade that should be ignored saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.positions.first + ts = saccade[0].timestamp new_step = scan_path.append_saccade(ts, saccade) @@ -419,7 +415,7 @@ class TestScanPathClass(unittest.TestCase): # Append first fixation fixation_A = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_A.positions.first + ts = fixation_A[0].timestamp new_step = scan_path.append_fixation(ts, fixation_A) @@ -430,7 +426,7 @@ class TestScanPathClass(unittest.TestCase): # Append consecutive saccade saccade_A = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade_A.positions.first + ts = saccade_A[0].timestamp new_step_A = scan_path.append_saccade(ts, saccade_A) @@ -443,7 +439,7 @@ class TestScanPathClass(unittest.TestCase): # Append 2 consecutive fixations then a saccade fixation_B1 = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_B1.positions.first + ts = fixation_B1[0].timestamp new_step = scan_path.append_fixation(ts, fixation_B1) @@ -453,7 +449,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(new_step, None) fixation_B2 = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_B2.positions.first + ts = fixation_B2[0].timestamp new_step = scan_path.append_fixation(ts, fixation_B2) @@ -463,7 +459,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(new_step, None) saccade_B = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade_B.positions.first + ts = saccade_B[0].timestamp new_step_B = scan_path.append_saccade(ts, saccade_B) @@ -476,19 +472,19 @@ class TestScanPathClass(unittest.TestCase): class TestAOIScanStepClass(unittest.TestCase): """Test AOIScanStep class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test AOIScanStep creation.""" movements = GazeFeatures.TimeStampedGazeMovements() fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first - movements[ts] = fixation + ts = fixation[0].timestamp + movements.append(fixation) saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.positions.first - movements[ts] = saccade + ts = saccade[0].timestamp + movements.append(saccade) aoi_scan_step = GazeFeatures.AOIScanStep(movements, 'Test') @@ -505,12 +501,12 @@ class TestAOIScanStepClass(unittest.TestCase): movements = GazeFeatures.TimeStampedGazeMovements() saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.positions.first - movements[ts] = saccade + ts = saccade[0].timestamp + movements.append(saccade) fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first - movements[ts] = fixation + ts = fixation[0].timestamp + movements.append(fixation) # Check that aoi scan step creation fail with self.assertRaises(GazeFeatures.AOIScanStepError): @@ -528,11 +524,11 @@ def build_aoi_scan_path(expected_aoi, aoi_path): for aoi in aoi_path: fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first + ts, _ = fixation.first aoi_scan_path.append_fixation(ts, fixation, aoi) saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.positions.first + ts, _ = saccade.first aoi_scan_path.append_saccade(ts, saccade) return aoi_scan_path @@ -555,7 +551,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on A aoi fixation_A = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_A.positions.first + ts, _ = fixation_A.first new_step = aoi_scan_path.append_fixation(ts, fixation_A, 'Foo') @@ -566,7 +562,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append saccade saccade_A = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade_A.positions.first + ts, _ = saccade_A.first new_step = aoi_scan_path.append_saccade(ts, saccade_A) @@ -577,7 +573,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on B aoi fixation_B = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_B.positions.first + ts, _ = fixation_B.first new_step_A = aoi_scan_path.append_fixation(ts, fixation_B, 'Bar') @@ -588,8 +584,8 @@ class TestAOIScanPathClass(unittest.TestCase): self.assertEqual(new_step_A.aoi, 'Foo') self.assertEqual(new_step_A.letter, 'A') - first_ts, _ = fixation_A.positions.first - last_ts, _ = saccade_A.positions.last + first_ts, _ = fixation_A.first + last_ts, _ = saccade_A.last self.assertEqual(new_step_A.duration, last_ts - first_ts) @@ -603,7 +599,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on A aoi fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first + ts, _ = fixation.first new_step = aoi_scan_path.append_fixation(ts, fixation, 'Foo') @@ -613,7 +609,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on B aoi fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first + ts, _ = fixation.first # Check that aoi scan step creation fail when fixation is appened after another fixation with self.assertRaises(GazeFeatures.AOIScanStepError): diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 5fcc990..47a91e9 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -384,7 +384,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): ) @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): + def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()): """ Project timestamped gaze movement into layer. @@ -531,7 +531,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__layers = layers self.__image_parameters = image_parameters - self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() + self.__calibrated_gaze_position = GazeFeatures.GazePosition() self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() self.__scan_path_analyzed = False @@ -875,7 +875,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): ) @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]: + def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]: """ Project timestamped gaze position into frame. diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 931c21d..7c53c2a 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -133,7 +133,6 @@ class TimeStampedObjectsList(list): def __init__(self, ts_object_type: type, ts_objects: list = []): - super().__init__() self.__object_type = ts_object_type self.__object_properties = properties(self.__object_type) @@ -384,11 +383,11 @@ class TimestampedObject(): def untimestamp(self): """Reset object timestamp.""" - self._timestamp = math.nan + self.timestamp = math.nan def is_timestamped(self) -> bool: """Is the object timestamped?""" - timestamped = not math.isnan(self._timestamp) + timestamped = not math.isnan(self.timestamp) return timestamped diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 6f8c554..f8e519f 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -37,17 +37,15 @@ class Fixation(GazeFeatures.Fixation): super().__post_init__() - points = self.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)]) - deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) + positions_array = numpy.asarray(self.positions.values()) + centroid = numpy.mean(positions_array, axis=0) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) # Update frozen focus attribute using centroid - object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1])) + object.__setattr__(self, 'focus', (centroid[0], centroid[1])) # Update frozen deviation_max attribute - object.__setattr__(self, 'deviation_max', max(deviations_array)) + object.__setattr__(self, 'deviation_max', deviations_array.max()) def point_deviation(self, gaze_position) -> float: """Get distance of a point from the fixation's centroïd.""" @@ -57,13 +55,11 @@ class Fixation(GazeFeatures.Fixation): def overlap(self, fixation) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" - points = fixation.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([self.focus[0], self.focus[1]]) - deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) + positions_array = numpy.asarray(self.positions.values()) + centroid = numpy.array(list(self.focus)) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) - return min(deviations_array) <= self.deviation_max + return deviations_array.min() <= self.deviation_max def merge(self, fixation) -> FixationType: """Merge another fixation into this fixation.""" @@ -114,8 +110,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - _, start_position = self.positions.first - _, last_position = self.positions.last + _, start_position = self.positions[0] + _, last_position = self.positions[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -148,14 +144,14 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType: # Ignore non valid gaze position - if not gaze_position.valid: + if not gaze_position: return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish() # Check if too much time elapsed since last valid gaze position if len(self.__valid_positions) > 0: - ts_last, _ = self.__valid_positions.last + ts_last, _ = self.__valid_positions[-1] if (ts - ts_last) > self.duration_min_threshold: @@ -168,16 +164,16 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Store valid gaze position - self.__valid_positions[ts] = gaze_position + self.__valid_positions.append(gaze_position) # Return last valid movement if exist return last_movement # Store gaze positions until a minimal duration - self.__valid_positions[ts] = gaze_position + self.__valid_positions.append(gaze_position) - first_ts, _ = self.__valid_positions.first - last_ts, _ = self.__valid_positions.last + first_ts, _ = self.__valid_positions[0] + last_ts, _ = self.__valid_positions[-1] # Once the minimal duration is reached if last_ts - first_ts >= self.duration_min_threshold: @@ -194,8 +190,8 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): if len(self.__saccade_positions) > 0: # Copy oldest valid position into saccade positions - first_ts, first_position = self.__valid_positions.first - self.__saccade_positions[first_ts] = first_position + first_ts, first_position = self.__valid_positions[0] + self.__saccade_positions.append(first_position) # Finish last saccade last_saccade = self.current_saccade.finish() @@ -218,8 +214,8 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): if len(self.__fixation_positions) > 0: # Copy most recent fixation position into saccade positions - last_ts, last_position = self.__fixation_positions.last - self.__saccade_positions[last_ts] = last_position + last_ts, last_position = self.__fixation_positions[-1] + self.__saccade_positions.append(last_position) # Finish last fixation last_fixation = self.current_fixation.finish() @@ -238,7 +234,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Move oldest valid position into saccade positions first_ts, first_position = self.__valid_positions.pop_first() - self.__saccade_positions[first_ts] = first_position + self.__saccade_positions.append(first_position) # Always return unvalid gaze movement at least return GazeFeatures.UnvalidGazeMovement() diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 1c27540..54784ac 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -17,6 +17,7 @@ from inspect import getmembers from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures +from argaze.utils import UtilsFeatures # DEBUG import numpy import pandas @@ -161,7 +162,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True): """Draw gaze position point and precision circle.""" - if self.valid: + if self: int_value = (int(self[0]), int(self[1])) @@ -170,7 +171,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): cv2.circle(image, int_value, size, color, -1) # Draw precision circle - if self.__precision > 0 and draw_precision: + if self.__precision is not None and draw_precision: cv2.circle(image, int_value, round(self.__precision), color, 1) TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions") @@ -179,9 +180,14 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList): """Handle timestamped gaze positions into a list""" + #@UtilsFeatures.PrintCallStack def __init__(self, gaze_positions: list = []): - super().__init__(GazePosition, gaze_positions) + DataFeatures.TimeStampedObjectsList.__init__(self, GazePosition, gaze_positions) + + def values(self) -> list: + """Get all timestamped position values as list of tuple.""" + return [tuple(ts_position) for ts_position in self] @classmethod def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType: @@ -206,29 +212,24 @@ class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList): """ # Copy columns - if precision: + columns = (timestamp, x, y) - df = dataframe.loc[:, (timestamp, x, y, precision)] + if precision is not None: - else: + columns += (precision,) + + if message is not None: - df = dataframe.loc[:, (timestamp, x, y)] + columns += (message,) - # DEBUG - print(df) + df = dataframe.loc[:, columns] # Merge x and y columns into one 'value' column df['value'] = tuple(zip(df[x], df[y])) df.drop(columns=[x, y], inplace=True, axis=1) - # DEBUG - print(df) - - # Replace (NaN, NaN) values by () - df['value'] = df.apply(lambda row: print(row.values[1], pandas.isnull(row.values[1])), axis=True) - - # DEBUG - print(df) + # Replace tuple values containing NaN values by () + df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=True) # Handle precision data if precision: @@ -358,58 +359,85 @@ class GazePositionCalibrator(): GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement") # Type definition for type annotation convenience -@dataclass(frozen=True) -class GazeMovement(DataFeatures.TimestampedObject): - """Define abstract gaze movement class as a buffer of timestamped positions.""" +class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): + """Define abstract gaze movement class as timestamped gaze positions list. - positions: TimeStampedGazePositions - """All timestamp gaze positions.""" + Parameters: + positions: timestamp gaze positions. + finished: is the movement finished? + message: a string to describe why the movement is what it is. + """ - duration: float = field(init=False) - """Inferred duration from first and last timestamps.""" + def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): - amplitude: float = field(init=False) - """Inferred amplitude from first and last positions.""" + return TimeStampedGazePositions.__new__(cls, positions) - finished: bool = field(init=False, default=False) - """Is the movement finished?""" + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + """Initialize GazeMovement""" - def __post_init__(self): + TimeStampedGazePositions.__init__(self, positions) + DataFeatures.TimestampedObject.__init__(self, **kwargs) - if self.valid: + self.__finished = finished + self.__message = message - start_position_ts, start_position = self.positions.first - end_position_ts, end_position = self.positions.last + @property + def timestamp(self) -> int|float: + """Get first position timestamp.""" + return self[0].timestamp - # Update frozen duration attribute - object.__setattr__(self, 'duration', end_position_ts - start_position_ts) + @timestamp.setter + def timestamp(self, timestamp: int|float): + """Block gaze movment timestamp setting.""" + raise('GazeMovement timestamp is first positon timestamp.') - _, start_position = self.positions.first - _, end_position = self.positions.last + @property + def finished(self) -> bool: + """Is the movement finished?""" + return self.__finished + + def finish(self) -> GazeMovementType: + """Set gaze movement as finished""" + self.__finished = True + return self + + @property + def message(self): + """Get movement's message.""" + return self.__message - amplitude = numpy.linalg.norm(start_position - end_position) + @property + def duration(self): + """Get inferred duration from first and last timestamps.""" + if self: - # Update frozen amplitude attribute - object.__setattr__(self, 'amplitude', amplitude) + return self[-1].timestamp - self[0].timestamp else: - # Update frozen duration attribute - object.__setattr__(self, 'duration', -1) + return 0 + + @property + def amplitude(self): + """Get inferred amplitude from first and last positions.""" + if self: - # Update frozen amplitude attribute - object.__setattr__(self, 'amplitude', -1) + return numpy.linalg.norm(self[0] - self[-1]) + + else: + + return 0 def __str__(self) -> str: """String display""" - if self.valid: + if self: - output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self.positions)}\n\tfinished={self.finished}' + output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.finished}' - for ts, position in self.positions.items(): + for position in self: - output += f'\n\t{ts}:\n\t\tvalue={position},\n\t\tprecision={position.precision}' + output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}' else: @@ -417,20 +445,6 @@ class GazeMovement(DataFeatures.TimestampedObject): return output - @property - def valid(self) -> bool: - """Is there positions?""" - - return len(self.positions) > 0 - - def finish(self) -> GazeMovementType: - """Set gaze movement as finished""" - - # Update frozen finished attribute - object.__setattr__(self, 'finished', True) - - return self - def draw_positions(self, image: numpy.array, position_color: tuple = None, line_color: tuple = None): """Draw gaze movement positions with line between each position. @@ -439,12 +453,12 @@ class GazeMovement(DataFeatures.TimestampedObject): line_color: color of line between each position """ - gaze_positions = self.positions.copy() + positions = self.copy() - while len(gaze_positions) >= 2: + while len(positions) >= 2: - ts_start, start_gaze_position = gaze_positions.pop_first() - ts_next, next_gaze_position = gaze_positions.first + start_gaze_position = positions.pop(0) + next_gaze_position = positions[0] # Draw line between positions if required if line_color is not None: @@ -461,31 +475,27 @@ class GazeMovement(DataFeatures.TimestampedObject): raise NotImplementedError('draw() method not implemented') -class UnvalidGazeMovement(GazeMovement): - """Unvalid gaze movement.""" - - def __init__(self, message=None): - - self.message = message - - super().__init__(TimeStampedGazePositions()) - - def draw(self, image: numpy.array, **kwargs): - - pass - FixationType = TypeVar('Fixation', bound="Fixation") # Type definition for type annotation convenience class Fixation(GazeMovement): """Define abstract fixation as gaze movement.""" - focus: tuple = field(init=False) - """Representative position of the fixation.""" + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): - def __post_init__(self): + super().__init__(positions, finished, message, **kwargs) + + self._focus = () - super().__post_init__() + @property + def focus(self) -> tuple: + """Get representative position of the fixation.""" + return self._focus + + @focus.setter + def focus(self, focus: tuple): + """Set representative position of the fixation.""" + self._focus = focus def merge(self, fixation) -> FixationType: """Merge another fixation into this fixation.""" @@ -500,9 +510,9 @@ def is_fixation(gaze_movement): class Saccade(GazeMovement): """Define abstract saccade as gaze movement.""" - def __post_init__(self): + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): - super().__post_init__() + super().__init__(positions, finished, message, **kwargs) def is_saccade(gaze_movement): """Is a gaze movement a saccade?""" -- cgit v1.1