From da307162ca85bcd6433058528b9bd48de2ccaf93 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Mon, 11 Mar 2024 12:11:08 +0100 Subject: Making timestamp as an optional PipelineStepMethod parameter required if the first parameter is not a TimestampedObject. --- .../advanced_topics/scripting.md | 2 +- .../aruco_markers_pipeline/aoi_3d_frame.md | 4 +- .../configuration_and_execution.md | 2 +- .../configuration_and_execution.md | 2 +- .../timestamped_gaze_positions_edition.md | 20 +++---- src/argaze/ArFeatures.py | 63 +++++++++------------- src/argaze/ArUcoMarkers/ArUcoCamera.py | 19 +++---- src/argaze/ArUcoMarkers/ArUcoDetector.py | 2 +- src/argaze/ArUcoMarkers/ArUcoScene.py | 3 +- src/argaze/AreaOfInterest/AOIFeatures.py | 2 +- src/argaze/DataFeatures.py | 14 +++-- src/argaze/GazeAnalysis/Basic.py | 4 +- src/argaze/GazeAnalysis/DeviationCircleCoverage.py | 2 +- .../DispersionThresholdIdentification.py | 4 +- src/argaze/GazeAnalysis/Entropy.py | 2 +- src/argaze/GazeAnalysis/ExploreExploitRatio.py | 2 +- src/argaze/GazeAnalysis/FocusPointInside.py | 2 +- src/argaze/GazeAnalysis/KCoefficient.py | 4 +- src/argaze/GazeAnalysis/LempelZivComplexity.py | 2 +- src/argaze/GazeAnalysis/NGram.py | 2 +- src/argaze/GazeAnalysis/NearestNeighborIndex.py | 2 +- src/argaze/GazeAnalysis/TransitionMatrix.py | 2 +- .../VelocityThresholdIdentification.py | 10 ++-- src/argaze/GazeFeatures.py | 47 ++++++++++------ src/argaze/PupillAnalysis/WorkloadIndex.py | 2 +- src/argaze/PupillFeatures.py | 2 +- src/argaze/utils/demo_aruco_markers_run.py | 18 +++---- src/argaze/utils/demo_gaze_analysis_run.py | 13 ++--- 28 files changed, 121 insertions(+), 132 deletions(-) diff --git a/docs/user_guide/aruco_markers_pipeline/advanced_topics/scripting.md b/docs/user_guide/aruco_markers_pipeline/advanced_topics/scripting.md index 99f52ee..30787b5 100644 --- a/docs/user_guide/aruco_markers_pipeline/advanced_topics/scripting.md +++ b/docs/user_guide/aruco_markers_pipeline/advanced_topics/scripting.md @@ -81,7 +81,7 @@ for name, aruco_scene in aruco_camera.scenes.items(): try: # Watch image with ArUco camera - aruco_camera.watch(timestamp, image) + aruco_camera.watch(image, timestamp=timestamp) # Do something with pipeline exception except Exception as e: diff --git a/docs/user_guide/aruco_markers_pipeline/aoi_3d_frame.md b/docs/user_guide/aruco_markers_pipeline/aoi_3d_frame.md index 53d1ddb..cf4a07e 100644 --- a/docs/user_guide/aruco_markers_pipeline/aoi_3d_frame.md +++ b/docs/user_guide/aruco_markers_pipeline/aoi_3d_frame.md @@ -105,10 +105,10 @@ After camera image is passed to [ArUcoCamera.watch](../../argaze.md/#argaze.ArFe ...: # Detect ArUco markers, estimate scene pose then, project 3D AOI into camera frame - aruco_camera.watch(timestamp, image) + aruco_camera.watch(image, timestamp=timestamp) # Map watched image into ArUcoScenes frames background - aruco_camera.map(timestamp) + aruco_camera.map(timestamp=timestamp) ``` ### Analyse timestamped gaze positions into ArUcoScenes frames diff --git a/docs/user_guide/aruco_markers_pipeline/configuration_and_execution.md b/docs/user_guide/aruco_markers_pipeline/configuration_and_execution.md index 4f05beb..0349a91 100644 --- a/docs/user_guide/aruco_markers_pipeline/configuration_and_execution.md +++ b/docs/user_guide/aruco_markers_pipeline/configuration_and_execution.md @@ -110,7 +110,7 @@ Pass each camera image to [ArUcoCamera.watch](../../argaze.md/#argaze.ArFeatures try: # Detect ArUco markers, estimate scene pose then, project 3D AOI into camera frame - aruco_camera.watch(timestamp, image) + aruco_camera.watch(image, timestamp=timestamp) # Do something with pipeline exception except Exception as e: diff --git a/docs/user_guide/gaze_analysis_pipeline/configuration_and_execution.md b/docs/user_guide/gaze_analysis_pipeline/configuration_and_execution.md index de23713..633b736 100644 --- a/docs/user_guide/gaze_analysis_pipeline/configuration_and_execution.md +++ b/docs/user_guide/gaze_analysis_pipeline/configuration_and_execution.md @@ -110,7 +110,7 @@ Timestamped gaze positions have to be passed one by one to [ArFrame.look](../../ try: # Look ArFrame at a timestamped gaze position - ar_frame.look(timestamp, gaze_position) + ar_frame.look(gaze_position) # Do something with pipeline exception except Exception as e: diff --git a/docs/user_guide/gaze_analysis_pipeline/timestamped_gaze_positions_edition.md b/docs/user_guide/gaze_analysis_pipeline/timestamped_gaze_positions_edition.md index 2156f3b..4c53258 100644 --- a/docs/user_guide/gaze_analysis_pipeline/timestamped_gaze_positions_edition.md +++ b/docs/user_guide/gaze_analysis_pipeline/timestamped_gaze_positions_edition.md @@ -5,7 +5,7 @@ Whatever eye data comes from a file on disk or from a live stream, timestamped g ![Timestamped gaze positions](../../img/timestamped_gaze_positions.png) -## Import gaze positions from CSV file +## Import timestamped gaze positions from CSV file It is possible to load timestamped gaze positions from a [Pandas DataFrame](https://pandas.pydata.org/docs/getting_started/intro_tutorials/01_table_oriented.html#min-tut-01-tableoriented) object which can be loaded from a CSV file. @@ -20,13 +20,13 @@ dataframe = pandas.read_csv('gaze_positions.csv', delimiter=",", low_memory=Fals ts_gaze_positions = GazeFeatures.TimeStampedGazePositions.from_dataframe(dataframe, timestamp = 'Recording timestamp [ms]', x = 'Gaze point X [px]', y = 'Gaze point Y [px]') # Iterate over timestamped gaze positions -for timestamp, gaze_position in ts_gaze_positions.items(): +for gaze_position in ts_gaze_positions: # Do something with each timestamped gaze position ... ``` -## Edit gaze positions from live stream +## Edit timestamped gaze positions from live stream When gaze positions comes from a real time input, gaze position can be edited thanks to [GazePosition](../../argaze.md/#argaze.GazeFeatures.GazePosition) class. Besides, timestamps can be edited from the incoming data stream or, if not available, they can be edited thanks to the python [time package](https://docs.python.org/3/library/time.html). @@ -37,11 +37,8 @@ from argaze import GazeFeatures # Assuming to be inside the function where timestamp_µs, gaze_x and gaze_y values are catched ... - # Edit a second timestamp from a microsecond second timestamp - timestamp = timestamp_µs * 1e-6 - - # Define a basic gaze position - gaze_position = GazeFeatures.GazePosition((gaze_x, gaze_y)) + # Define a basic gaze position converting microsecond timestamp into second timestamp + gaze_position = GazeFeatures.GazePosition((gaze_x, gaze_y), timestamp=timestamp_µs * 1e-6) # Do something with each timestamped gaze position ... @@ -58,11 +55,8 @@ start_time = time.time() # Assuming to be inside the function where only gaze_x and gaze_y values are catched (no timestamp) ... - # Edit a millisecond timestamp - timestamp = int((time.time() - start_time) * 1e3) - - # Define a basic gaze position - gaze_position = GazeFeatures.GazePosition((gaze_x, gaze_y)) + # Define a basic gaze position with millisecond timestamp + gaze_position = GazeFeatures.GazePosition((gaze_x, gaze_y), timestamp=int((time.time() - start_time) * 1e3)) # Do something with each timestamped gaze position ... diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 8889155..fb22afa 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -384,7 +384,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): ) @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()): + def look(self, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()): """ Project timestamped gaze movement into layer. @@ -392,7 +392,6 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Be aware that gaze movement positions are in the same range of value than aoi_scene size attribute. Parameters: - timestamp: method call timestamp (unit does'nt matter) gaze_movement: gaze movement to project """ @@ -412,7 +411,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Update looked aoi thanks to aoi matcher # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally - self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement) + self.__looked_aoi_name, _ = self.__aoi_matcher.match(self.__aoi_scene, gaze_movement, timestamp=gaze_movement.timestamp) # Valid and finished gaze movement has been identified if gaze_movement and gaze_movement.finished: @@ -431,7 +430,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Analyze aoi scan path for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.__aoi_scan_path_analyzers.items(): - aoi_scan_path_analyzer.analyze(timestamp, self.__aoi_scan_path) + aoi_scan_path_analyzer.analyze(self.__aoi_scan_path, timestamp=gaze_movement.timestamp) # Update aoi scan path analyzed state self.__aoi_scan_path_analyzed = True @@ -875,24 +874,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): ) @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]: + def look(self, timestamped_gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]: """ Project timestamped gaze position into frame. !!! warning Be aware that gaze positions are in the same range of value than size attribute. - !!! note - This method timestamps incoming gaze position. - Parameters: - timestamp: method call timestamp (unit does'nt matter) - gaze_position: gaze position to project + timestamped_gaze_position: gaze position to project """ - # Timestamp gaze position - gaze_position.timestamp = timestamp - # Use frame lock feature with self._lock: @@ -905,18 +897,18 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Apply gaze position calibration if self.__gaze_position_calibrator is not None: - self.__calibrated_gaze_position = self.__gaze_position_calibrator.apply(gaze_position) + self.__calibrated_gaze_position = self.__gaze_position_calibrator.apply(timestamped_gaze_position) # Or update gaze position at least else: - self.__calibrated_gaze_position = gaze_position + self.__calibrated_gaze_position = timestamped_gaze_position # Identify gaze movement if self.__gaze_movement_identifier is not None: # Identify finished gaze movement - self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) + self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified if self.__identified_gaze_movement and self.__identified_gaze_movement.finished: @@ -941,7 +933,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Analyze aoi scan path for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items(): - scan_path_analyzer.analyze(timestamp, self.__scan_path) + scan_path_analyzer.analyze(self.__scan_path, timestamp=self.__identified_gaze_movement.timestamp) # Update scan path analyzed state self.__scan_path_analyzed = True @@ -958,13 +950,13 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]]) # Update heatmap image - self.__heatmap.update(timestamp, self.__calibrated_gaze_position * scale) + self.__heatmap.update(self.__calibrated_gaze_position * scale, timestamp=self.__calibrated_gaze_position.timestamp) # Look layers with valid identified gaze movement # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally for layer_name, layer in self.__layers.items(): - layer.look(timestamp, self.__identified_gaze_movement) + layer.look(self.__identified_gaze_movement) def __image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ @@ -1253,11 +1245,10 @@ class ArScene(DataFeatures.PipelineStepObject): ) @DataFeatures.PipelineStepMethod - def estimate_pose(self, timestamp: int|float, detected_features: any) -> Tuple[numpy.array, numpy.array, any]: + def estimate_pose(self, detected_features: any) -> Tuple[numpy.array, numpy.array, any]: """Define abstract estimate scene pose method. Parameters: - timestamp: method call timestamp (unit does'nt matter) detected_features: any features detected by parent ArCamera that will help in scene pose estimation. Returns: @@ -1269,11 +1260,10 @@ class ArScene(DataFeatures.PipelineStepObject): raise NotImplementedError('estimate_pose() method not implemented') @DataFeatures.PipelineStepMethod - def project(self, timestamp: int|float, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]: + def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]: """Project layers according estimated pose and optional field of view clipping angles. Parameters: - timestamp: method call timestamp (unit does'nt matter) tvec: translation vector rvec: rotation vector visual_hfov: horizontal field of view clipping angle @@ -1433,32 +1423,30 @@ class ArCamera(ArFrame): } @DataFeatures.PipelineStepMethod - def watch(self, timestamp: int|float, image: numpy.array): + def watch(self, image: numpy.array): """Detect AR features from image and project scenes into camera frame. Parameters: - timestamp: method call timestamp (unit does'nt matter) image: image where to extract AR features """ raise NotImplementedError('watch() method not implemented') @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition): + def look(self, timestamped_gaze_position: GazeFeatures.GazePosition): """Project timestamped gaze position into each scene frames. !!! warning watch method needs to be called first. Parameters: - timestamp: method call timestamp (unit does'nt matter) - gaze_position: gaze position to project + timestamped_gaze_position: gaze position to project """ - # Project gaze position into camera frame + # Project timestamped gaze position into camera frame # NOTE: the call to super().look method uses unwrap option to disable observers notification # as they are already notified that this look method is called. Cf DataFeatures.PipelineStepMethod.wrapper. - super().look(timestamp, gaze_position, unwrap=True) + super().look(timestamped_gaze_position, unwrap=True) # Use camera frame lock feature with self._lock: @@ -1473,18 +1461,18 @@ class ArCamera(ArFrame): aoi_2d = camera_layer.aoi_scene[scene_frame.name] - if gaze_position: + if timestamped_gaze_position: # TODO?: Should we prefer to use camera frame AOIMatcher object? - if aoi_2d.contains_point(gaze_position): + if aoi_2d.contains_point(timestamped_gaze_position): - inner_x, inner_y = aoi_2d.clockwise().inner_axis(*gaze_position) + inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position) # QUESTION: How to project gaze precision? - inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) + inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp) # Project inner gaze position into scene frame - scene_frame.look(timestamp, inner_gaze_position * scene_frame.size) + scene_frame.look(inner_gaze_position * scene_frame.size) # Ignore missing aoi in camera frame layer projection except KeyError as e: @@ -1492,14 +1480,11 @@ class ArCamera(ArFrame): pass @DataFeatures.PipelineStepMethod - def map(self, timestamp: int|float): + def map(self): """Project camera frame background into scene frames background. !!! warning watch method needs to be called first. - - Parameters: - timestamp: method call timestamp (unit does'nt matter) """ # Use camera frame lock feature diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 99f530f..c5f8892 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -132,18 +132,14 @@ class ArUcoCamera(ArFeatures.ArCamera): ) @DataFeatures.PipelineStepMethod - def watch(self, timestamp: int|float, image: numpy.array): - """Detect environment aruco markers from image and project scenes into camera frame. - - !!! note - This method timestamps camera frame and its layers. - """ + def watch(self, image: numpy.array): + """Detect environment aruco markers from image and project scenes into camera frame.""" # Use camera frame locker feature with self._lock: # Detect aruco markers - self.__aruco_detector.detect_markers(timestamp, image) + self.__aruco_detector.detect_markers(image, timestamp=self.timestamp) # Fill camera frame background with image self.background = image @@ -170,10 +166,10 @@ class ArUcoCamera(ArFeatures.ArCamera): ''' # Estimate scene pose from detected scene markers - tvec, rmat, _ = scene.estimate_pose(timestamp, self.__aruco_detector.detected_markers) + tvec, rmat, _ = scene.estimate_pose(self.__aruco_detector.detected_markers, timestamp=self.timestamp) # Project scene into camera frame according estimated pose - for layer_name, layer_projection in scene.project(timestamp, tvec, rmat, self.visual_hfov, self.visual_vfov): + for layer_name, layer_projection in scene.project(tvec, rmat, self.visual_hfov, self.visual_vfov, timestamp=self.timestamp): try: @@ -181,15 +177,12 @@ class ArUcoCamera(ArFeatures.ArCamera): self.layers[layer_name].aoi_scene |= layer_projection # Timestamp camera layer - self.layers[layer_name].timestamp = timestamp + self.layers[layer_name].timestamp = self.timestamp except KeyError: pass - # Timestamp camera frame - self.timestamp = timestamp - def __image(self, draw_detected_markers: dict = None, draw_scenes: dict = None, draw_optic_parameters_grid: dict = None, **kwargs: dict) -> numpy.array: """Get frame image with ArUco detection visualisation. diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py index ff86ee9..19b0f45 100644 --- a/src/argaze/ArUcoMarkers/ArUcoDetector.py +++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py @@ -258,7 +258,7 @@ class ArUcoDetector(DataFeatures.PipelineStepObject): ) @DataFeatures.PipelineStepMethod - def detect_markers(self, timestamp: int|float, image: numpy.array): + def detect_markers(self, image: numpy.array): """Detect all ArUco markers into an image. !!! danger "DON'T MIRROR IMAGE" diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py index 84de39e..2bec7f2 100644 --- a/src/argaze/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -100,11 +100,10 @@ class ArUcoScene(ArFeatures.ArScene): ) @DataFeatures.PipelineStepMethod - def estimate_pose(self, timestamp: int|float, detected_markers: dict) -> Tuple[numpy.array, numpy.array, dict]: + def estimate_pose(self, detected_markers: dict) -> Tuple[numpy.array, numpy.array, dict]: """Estimate scene pose from detected ArUco markers. Parameters: - timestamp: method call timestamp (unit does'nt matter) detected_markers: dictionary with all detected markers Returns: diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index dbecfc1..c7e5193 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -605,7 +605,7 @@ class Heatmap(DataFeatures.PipelineStepObject): self.__point_spread_buffer_size = self.buffer @DataFeatures.PipelineStepMethod - def update(self, timestamp: int|float, point: tuple): + def update(self, point: tuple): """Update heatmap image.""" point_spread = self.point_spread(point) diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 849601f..6d471e4 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -752,17 +752,23 @@ def PipelineStepMethod(method): PipelineStepMethod must have a timestamp as first argument. """ - def wrapper(self, timestamp, *args, unwrap: bool = False, **kwargs): + def wrapper(self, *args, timestamp: int|float = None, unwrap: bool = False, **kwargs): """Wrap pipeline step method to measure execution time. Parameters: - timestamp: PipelineStepMethod must define timestamp as first parameter. args: Any arguments defined by PipelineStepMethod. + timestamp: Optional method call timestamp (unit does'nt matter) if first args parameter is not a TimestampedObject instance. unwrap: Extra arguments used in wrapper function to call wrapped method directly. """ + if timestamp is None: + + if isinstance(args[0], TimestampedObject): + + timestamp = args[0].timestamp + if unwrap: - return method(self, timestamp, *args, **kwargs) + return method(self, *args, **kwargs) # Initialize execution time assessment start = time.perf_counter() @@ -772,7 +778,7 @@ def PipelineStepMethod(method): try: # Execute wrapped method - result = method(self, timestamp, *args, **kwargs) + result = method(self, *args, **kwargs) except Exception as e: diff --git a/src/argaze/GazeAnalysis/Basic.py b/src/argaze/GazeAnalysis/Basic.py index 54135d4..f22db56 100644 --- a/src/argaze/GazeAnalysis/Basic.py +++ b/src/argaze/GazeAnalysis/Basic.py @@ -27,7 +27,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__step_fixation_durations_average = 0 @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, scan_path: GazeFeatures.ScanPathType): + def analyze(self, scan_path: GazeFeatures.ScanPathType): self.__path_duration = scan_path.duration @@ -73,7 +73,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__aoi_fixation_distribution = {} @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, aoi_scan_path: GazeFeatures.ScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.ScanPathType): self.__path_duration = aoi_scan_path.duration diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py index 3849d59..c4c5b33 100644 --- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py +++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py @@ -44,7 +44,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__matched_region = None @DataFeatures.PipelineStepMethod - def match(self, timestamp: int|float, aoi_scene, gaze_movement) -> Tuple[str, AOIFeatures.AreaOfInterest]: + def match(self, aoi_scene, gaze_movement) -> Tuple[str, AOIFeatures.AreaOfInterest]: """Returns AOI with the maximal fixation's deviation circle coverage if above coverage threshold.""" if GazeFeatures.is_fixation(gaze_movement): diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 2cb2acf..84d14e7 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -140,7 +140,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return self.__duration_min_threshold @DataFeatures.PipelineStepMethod - def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: + def identify(self, gaze_position, terminate=False) -> GazeMovementType: # Ignore empty gaze position if not gaze_position: @@ -152,7 +152,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): ts_last = self.__valid_positions[-1].timestamp - if (timestamp - ts_last) > self.__duration_min_threshold: + if (gaze_position.timestamp - ts_last) > self.__duration_min_threshold: # Get last movement last_movement = self.current_gaze_movement.finish() diff --git a/src/argaze/GazeAnalysis/Entropy.py b/src/argaze/GazeAnalysis/Entropy.py index fef3546..242257e 100644 --- a/src/argaze/GazeAnalysis/Entropy.py +++ b/src/argaze/GazeAnalysis/Entropy.py @@ -42,7 +42,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__transition_entropy = -1 @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, aoi_scan_path: GazeFeatures.AOIScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/ExploreExploitRatio.py b/src/argaze/GazeAnalysis/ExploreExploitRatio.py index 1f7fad0..5516349 100644 --- a/src/argaze/GazeAnalysis/ExploreExploitRatio.py +++ b/src/argaze/GazeAnalysis/ExploreExploitRatio.py @@ -34,7 +34,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__explore_exploit_ratio = 0. @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, scan_path: GazeFeatures.ScanPathType): + def analyze(self, scan_path: GazeFeatures.ScanPathType): assert(len(scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/FocusPointInside.py b/src/argaze/GazeAnalysis/FocusPointInside.py index 19b8c27..0358fae 100644 --- a/src/argaze/GazeAnalysis/FocusPointInside.py +++ b/src/argaze/GazeAnalysis/FocusPointInside.py @@ -37,7 +37,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__matched_gaze_movement = None @DataFeatures.PipelineStepMethod - def match(self, timestamp: int|float, aoi_scene, gaze_movement) -> Tuple[str, AOIFeatures.AreaOfInterest]: + def match(self, aoi_scene, gaze_movement) -> Tuple[str, AOIFeatures.AreaOfInterest]: """Returns AOI containing fixation focus point.""" if GazeFeatures.is_fixation(gaze_movement): diff --git a/src/argaze/GazeAnalysis/KCoefficient.py b/src/argaze/GazeAnalysis/KCoefficient.py index 40e3ddd..41338a3 100644 --- a/src/argaze/GazeAnalysis/KCoefficient.py +++ b/src/argaze/GazeAnalysis/KCoefficient.py @@ -31,7 +31,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__K = 0 @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, scan_path: GazeFeatures.ScanPathType): + def analyze(self, scan_path: GazeFeatures.ScanPathType): assert(len(scan_path) > 1) @@ -88,7 +88,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__K = 0 @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, aoi_scan_path: GazeFeatures.AOIScanPathType) -> float: + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType) -> float: assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/LempelZivComplexity.py b/src/argaze/GazeAnalysis/LempelZivComplexity.py index 53d4285..f6a49ab 100644 --- a/src/argaze/GazeAnalysis/LempelZivComplexity.py +++ b/src/argaze/GazeAnalysis/LempelZivComplexity.py @@ -32,7 +32,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__lempel_ziv_complexity = 0 @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, aoi_scan_path: GazeFeatures.AOIScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/NGram.py b/src/argaze/GazeAnalysis/NGram.py index 049da7d..2526123 100644 --- a/src/argaze/GazeAnalysis/NGram.py +++ b/src/argaze/GazeAnalysis/NGram.py @@ -36,7 +36,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__ngrams_count = {} @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, aoi_scan_path: GazeFeatures.AOIScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/NearestNeighborIndex.py b/src/argaze/GazeAnalysis/NearestNeighborIndex.py index e42dea2..72df516 100644 --- a/src/argaze/GazeAnalysis/NearestNeighborIndex.py +++ b/src/argaze/GazeAnalysis/NearestNeighborIndex.py @@ -36,7 +36,7 @@ class ScanPathAnalyzer(GazeFeatures.ScanPathAnalyzer): self.__nearest_neighbor_index = 0 @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, scan_path: GazeFeatures.ScanPathType): + def analyze(self, scan_path: GazeFeatures.ScanPathType): assert(len(scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/TransitionMatrix.py b/src/argaze/GazeAnalysis/TransitionMatrix.py index 5248480..d001947 100644 --- a/src/argaze/GazeAnalysis/TransitionMatrix.py +++ b/src/argaze/GazeAnalysis/TransitionMatrix.py @@ -34,7 +34,7 @@ class AOIScanPathAnalyzer(GazeFeatures.AOIScanPathAnalyzer): self.__transition_matrix_density = 0. @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, aoi_scan_path: GazeFeatures.AOIScanPathType): + def analyze(self, aoi_scan_path: GazeFeatures.AOIScanPathType): assert(len(aoi_scan_path) > 1) diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index b2e3b89..a54cee1 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -140,7 +140,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return self.__duration_min_threshold @DataFeatures.PipelineStepMethod - def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: + def identify(self, gaze_position, terminate=False) -> GazeMovementType: # Ignore empty gaze position if not gaze_position: @@ -150,16 +150,16 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Store first valid position if self.__last_ts < 0: - self.__last_ts = timestamp + self.__last_ts = gaze_position.timestamp self.__last_position = gaze_position return GazeFeatures.GazeMovement() # Check if too much time elapsed since last gaze position - if (timestamp - self.__last_ts) > self.duration_min_threshold: + if (gaze_position.timestamp - self.__last_ts) > self.duration_min_threshold: # Remember last position - self.__last_ts = timestamp + self.__last_ts = gaze_position.timestamp self.__last_position = gaze_position # Get last movement @@ -176,7 +176,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): velocity = abs(gaze_position.distance(self.__last_position) / (timestamp - self.__last_ts)) # Remember last position - self.__last_ts = timestamp + self.__last_ts = gaze_position.timestamp self.__last_position = gaze_position # Velocity is greater than threshold diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index fea1331..fb56935 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -85,14 +85,17 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): !!! note The returned position precision is the maximal precision. + + !!! note + The returned position timestamp is the self object timestamp. """ if self.__precision is not None and position.precision is not None: - return GazePosition(numpy.array(self) + numpy.array(position), precision = max(self.__precision, position.precision)) + return GazePosition(numpy.array(self) + numpy.array(position), precision = max(self.__precision, position.precision), timestamp=self.timestamp) else: - return GazePosition(numpy.array(self) + numpy.array(position)) + return GazePosition(numpy.array(self) + numpy.array(position), timestamp=self.timestamp) __radd__ = __add__ @@ -101,44 +104,56 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): !!! note The returned position precision is the maximal precision. + + !!! note + The returned position timestamp is the self object timestamp. """ if self.__precision is not None and position.precision is not None: - return GazePosition(numpy.array(self) - numpy.array(position), precision = max(self.__precision, position.precision)) + return GazePosition(numpy.array(self) - numpy.array(position), precision = max(self.__precision, position.precision), timestamp=self.timestamp) else: - return GazePosition(numpy.array(self) - numpy.array(position)) + return GazePosition(numpy.array(self) - numpy.array(position), timestamp=self.timestamp) def __rsub__(self, position: GazePositionType) -> GazePositionType: """Reversed substract position. !!! note The returned position precision is the maximal precision. + + !!! note + The returned position timestamp is the self object timestamp. """ if self.__precision is not None and position.precision is not None: - return GazePosition(numpy.array(position) - numpy.array(self), precision = max(self.__precision, position.precision)) + return GazePosition(numpy.array(position) - numpy.array(self), precision = max(self.__precision, position.precision), timestamp=self.timestamp) else: - return GazePosition(numpy.array(position) - numpy.array(self)) + return GazePosition(numpy.array(position) - numpy.array(self), timestamp=self.timestamp) def __mul__(self, factor: int|float) -> GazePositionType: """Multiply position by a factor. !!! note The returned position precision is also multiplied by the factor. + + !!! note + The returned position timestamp is the self object timestamp. """ - return GazePosition(numpy.array(self) * factor, precision = self.__precision * factor if self.__precision is not None else None) + return GazePosition(numpy.array(self) * factor, precision = self.__precision * factor if self.__precision is not None else None, timestamp=self.timestamp) def __pow__(self, factor: int|float) -> GazePositionType: """Power position by a factor. !!! note The returned position precision is also powered by the factor. + + !!! note + The returned position timestamp is the self object timestamp. """ - return GazePosition(numpy.array(self) ** factor, precision = self.__precision ** factor if self.__precision is not None else None) + return GazePosition(numpy.array(self) ** factor, precision = self.__precision ** factor if self.__precision is not None else None, timestamp=self.timestamp) def distance(self, gaze_position) -> float: """Distance to another gaze positions.""" @@ -388,7 +403,8 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): @property def timestamp(self) -> int|float: """Get first position timestamp.""" - return self[0].timestamp + if self: + return self[0].timestamp def is_timestamped(self) -> bool: """If first position exist, the movement is timestamped.""" @@ -573,15 +589,14 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): super().__init__() @DataFeatures.PipelineStepMethod - def identify(self, timestamp: int|float, gaze_position: GazePosition, terminate:bool=False) -> GazeMovementType: + def identify(self, timestamped_gaze_position: GazePosition, terminate:bool=False) -> GazeMovementType: """Identify gaze movement from successive timestamped gaze positions. !!! warning "Mandatory" Each identified gaze movement have to share its first/last gaze position with previous/next gaze movement. Parameters: - timestamp: gaze position timestamp - gaze_position: new gaze position from where identification have to be done considering former gaze positions. + timestamped_gaze_position: new gaze position from where identification have to be done considering former gaze positions. terminate: allows to notify identification algorithm that given gaze position will be the last one. Returns: @@ -629,7 +644,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): # Iterate on gaze positions for gaze_position in ts_gaze_positions: - gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) + gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts)) if gaze_movement: @@ -676,7 +691,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): # Iterate on gaze positions for gaze_position in ts_gaze_positions: - gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) + gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts)) if gaze_movement: @@ -865,7 +880,7 @@ class ScanPathAnalyzer(DataFeatures.PipelineStepObject): return DataFeatures.DataDictionary(analysis) @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, scan_path: ScanPathType): + def analyze(self, scan_path: ScanPathType): """Analyze scan path.""" raise NotImplementedError('analyze() method not implemented') @@ -1221,7 +1236,7 @@ class AOIScanPathAnalyzer(DataFeatures.PipelineStepObject): return DataFeatures.DataDictionary(analysis) @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, aoi_scan_path: AOIScanPathType): + def analyze(self, aoi_scan_path: AOIScanPathType): """Analyze aoi scan path.""" raise NotImplementedError('analyze() method not implemented') diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupillAnalysis/WorkloadIndex.py index 38be6ca..f97dce3 100644 --- a/src/argaze/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze/PupillAnalysis/WorkloadIndex.py @@ -44,7 +44,7 @@ class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): return self.__period @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, pupill_diameter: PupillFeatures.PupillDiameter) -> float: + def analyze(self, pupill_diameter: PupillFeatures.PupillDiameter) -> float: """Analyze workload index from successive timestamped pupill diameters.""" # Ignore non valid pupill diameter diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py index bf3b4a9..16fbea2 100644 --- a/src/argaze/PupillFeatures.py +++ b/src/argaze/PupillFeatures.py @@ -53,7 +53,7 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a pupill diameter analyser.""" @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, pupill_diameter: PupillDiameterType) -> any: + def analyze(self, pupill_diameter: PupillDiameterType) -> any: """Analyze pupill diameter from successive timestamped pupill diameters.""" raise NotImplementedError('analyze() method not implemented') diff --git a/src/argaze/utils/demo_aruco_markers_run.py b/src/argaze/utils/demo_aruco_markers_run.py index 3ad8cca..12e2b11 100644 --- a/src/argaze/utils/demo_aruco_markers_run.py +++ b/src/argaze/utils/demo_aruco_markers_run.py @@ -66,18 +66,18 @@ def main(): # Edit millisecond timestamp timestamp = int((time.time() - start_time) * 1e3) - try: + #try: - # Project gaze position into camera - aruco_camera.look(timestamp, GazeFeatures.GazePosition((x, y))) + # Project gaze position into camera + aruco_camera.look(GazeFeatures.GazePosition((x, y), timestamp=timestamp)) - # Assess gaze analysis - gaze_analysis_time = aruco_camera.execution_times['look'] + # Assess gaze analysis + gaze_analysis_time = aruco_camera.execution_times['look'] - except Exception as e: + #except Exception as e: - print(e) - gaze_analysis_time = 0 + # print(e) + # gaze_analysis_time = 0 # Attach mouse callback to window cv2.setMouseCallback(aruco_camera.name, on_mouse_event) @@ -121,7 +121,7 @@ def main(): try: # Detect and project AR features - aruco_camera.watch(capture_time, video_image) + aruco_camera.watch(video_image, timestamp=capture_time) # Detection suceeded exception = None diff --git a/src/argaze/utils/demo_gaze_analysis_run.py b/src/argaze/utils/demo_gaze_analysis_run.py index 06e251d..bf768cc 100644 --- a/src/argaze/utils/demo_gaze_analysis_run.py +++ b/src/argaze/utils/demo_gaze_analysis_run.py @@ -47,18 +47,15 @@ def main(): # Update pointer position def on_mouse_event(event, x, y, flags, param): - # Edit millisecond timestamp - timestamp = int((time.time() - start_time) * 1e3) + #try: - try: - - # Project gaze position into frame - ar_frame.look(timestamp, GazeFeatures.GazePosition((x, y))) + # Project gaze position into frame with millisecond timestamp + ar_frame.look(GazeFeatures.GazePosition((x, y), timestamp=int((time.time() - start_time) * 1e3))) # Catch pipeline exception - except Exception as e: + #except Exception as e: - print('Gaze projection error:', e) + # print('Gaze projection error:', e) # Attach mouse callback to window cv2.setMouseCallback(ar_frame.name, on_mouse_event) -- cgit v1.1