diff options
Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r-- | src/argaze/ArFeatures.py | 331 |
1 files changed, 122 insertions, 209 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 281dec8..30044a7 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -130,6 +130,9 @@ class ArLayer(DataFeatures.SharedObject): # Init current gaze movement self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() + # Init new analysis available state + self.__new_analysis_available = False + # Cast aoi scene to its effective dimension if self.aoi_scene.dimension == 2: @@ -293,7 +296,10 @@ class ArLayer(DataFeatures.SharedObject): for logger_name, logger_data in new_loggers_value.items(): - new_loggers[logger_name] = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) + logger = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) + logger.name = logger_name + + new_loggers[logger_name] = logger except KeyError: @@ -346,6 +352,13 @@ class ArLayer(DataFeatures.SharedObject): self.__parent = parent + @property + def new_analysis_available(self) -> bool: + """Is there new aoi scan path analysis to check?""" + + return self.__new_analysis_available + + @DataFeatures.PipelineStep def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> dict: """ Project timestamped gaze movement into layer. @@ -355,123 +368,65 @@ class ArLayer(DataFeatures.SharedObject): Parameters: gaze_movement: gaze movement to project - - Returns: - look_data: data dictionary - - !!! note "look data dictionary" - - **gaze_movement**: incoming gaze movement - - **looked_aoi_name**: most likely looked aoi name - - **looked_aoi**: most likely looked aoi shape - - **aoi_scan_path_analysis**: aoi scan path analysis at each new scan step if aoi_scan_path is instanciated - - **exception**: error catched during gaze movement processing """ # Lock layer exploitation self.acquire() - # Store look execution start date - look_start = time.perf_counter() + # Gather look data + look_data = locals() # Update current gaze movement self.__gaze_movement = gaze_movement - # Init looked aoi - looked_aoi_name, looked_aoi = (None, None) - - # Init aoi scan path analysis report - aoi_scan_path_analysis = {} - - # Assess pipeline execution times - execution_times = { - 'aoi_matcher': None, - 'aoi_scan_path_analyzers': {} - } - - # Catch any error - exception = None - - try: - - if self.aoi_matcher is not None: - - # Store aoi matching start date - matching_start = time.perf_counter() - - # Update looked aoi thanks to aoi matcher - # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally - looked_aoi_name, looked_aoi = self.aoi_matcher.match(self.aoi_scene, gaze_movement) + # No new analysis available by default + self.__new_analysis_available = False - # Assess aoi matching time in ms - execution_times['aoi_matcher'] = (time.perf_counter() - matching_start) * 1e3 + # Init looked aoi name + looked_aoi_name = None - # Valid and finished gaze movement has been identified - if gaze_movement.valid and gaze_movement.finished: + if self.aoi_matcher is not None: - if GazeFeatures.is_fixation(gaze_movement): + # Update looked aoi thanks to aoi matcher + # Note: don't filter valid/unvalid and finished/unfinished fixation/saccade as we don't know how the aoi matcher works internally + looked_aoi_name, _ , match_time, match_exception = self.aoi_matcher.match(self.aoi_scene, gaze_movement) - # Append fixation to aoi scan path - if self.aoi_scan_path is not None and looked_aoi_name is not None: + # Valid and finished gaze movement has been identified + if gaze_movement.valid and gaze_movement.finished: - aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, looked_aoi_name) + if GazeFeatures.is_fixation(gaze_movement): - # Is there a new step? - if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: + # Append fixation to aoi scan path + if self.aoi_scan_path is not None and looked_aoi_name is not None: - for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): + aoi_scan_step = self.aoi_scan_path.append_fixation(timestamp, gaze_movement, looked_aoi_name) - # Store aoi scan path analysis start date - aoi_scan_path_analysis_start = time.perf_counter() + # Is there a new step? + if aoi_scan_step is not None and len(self.aoi_scan_path) > 1: - # Analyze aoi scan path - aoi_scan_path_analyzer.analyze(self.aoi_scan_path) + for aoi_scan_path_analyzer_module_path, aoi_scan_path_analyzer in self.aoi_scan_path_analyzers.items(): - # Assess aoi scan path analysis time in ms - execution_times['aoi_scan_path_analyzers'][aoi_scan_path_analyzer_module_path] = (time.perf_counter() - aoi_scan_path_analysis_start) * 1e3 + # Analyze aoi scan path + analyze_time, analyze_exception = aoi_scan_path_analyzer.analyze(self.aoi_scan_path) - # Store analysis - aoi_scan_path_analysis[aoi_scan_path_analyzer_module_path] = aoi_scan_path_analyzer.analysis + # Update new analysis available state + self.__new_analysis_available = True - elif GazeFeatures.is_saccade(gaze_movement): + elif GazeFeatures.is_saccade(gaze_movement): - # Append saccade to aoi scan path - if self.aoi_scan_path is not None: + # Append saccade to aoi scan path + if self.aoi_scan_path is not None: - self.aoi_scan_path.append_saccade(timestamp, gaze_movement) - - except Exception as e: - - print('Warning: the following error occurs in ArLayer.look method:', e) - - looked_aoi_name = None - looked_aoi = None - aoi_scan_path_analysis = {} - exception = e - - # Assess total execution time in ms - execution_times['total'] = (time.perf_counter() - look_start) * 1e3 - - # Edit look data dictionary - look_data = DataFeatures.DataDictionary({ - "gaze_movement": gaze_movement, - "looked_aoi_name": looked_aoi_name, - "looked_aoi": looked_aoi, - "aoi_scan_path_analysis": DataFeatures.DataDictionary(aoi_scan_path_analysis), - "execution_times": DataFeatures.DataDictionary(execution_times), - "exception": exception - }) + self.aoi_scan_path.append_saccade(timestamp, gaze_movement) # Log look data for logger_name, logger in self.loggers.items(): - logger.emit(timestamp, DataFeatures.DataDictionary(look_data)) + logger.emit(look_data) # Unlock layer exploitation self.release() - # Return look data dictionary - return look_data - def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ Draw into image. @@ -573,7 +528,13 @@ class ArFrame(DataFeatures.SharedObject): layer.parent = self # Init current gaze position - self.__gaze_position = GazeFeatures.UnvalidGazePosition() + self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() + + # Init current gaze movement + self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + + # Init new analysis available state + self.__new_analysis_available = False @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: @@ -771,7 +732,10 @@ class ArFrame(DataFeatures.SharedObject): for logger_name, logger_data in new_loggers_value.items(): - new_loggers[logger_name] = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) + logger = DataFeatures.TimeStampedDataLogger.from_dict(logger_data) + logger.name = logger_name + + new_loggers[logger_name] = logger except KeyError: @@ -829,7 +793,26 @@ class ArFrame(DataFeatures.SharedObject): self.__parent = parent - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> DataFeatures.DataDictionary: + @property + def gaze_position(self) -> object: + """Get current calibrated gaze position""" + + return self.__calibrated_gaze_position + + @property + def gaze_movement(self) -> object: + """Get current identified gaze movement""" + + return self.__identified_gaze_movement + + @property + def new_analysis_available(self) -> bool: + """Is there new scan path analysis to check?""" + + return self.__new_analysis_available + + @DataFeatures.PipelineStep + def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): """ Project gaze position into frame. @@ -839,162 +822,92 @@ class ArFrame(DataFeatures.SharedObject): Parameters: timestamp: any number used to know when the given gaze position occurs gaze_position: gaze position to project - - Returns: - look_data: data dictionary - - !!! note "look data dictionary" - - **gaze_position**: calibrated gaze position if gaze_position_calibrator is instanciated else, given gaze position. - - **gaze_movement**: identified gaze movement from incoming consecutive timestamped gaze positions if gaze_movement_identifier is instanciated. Current gaze movement if filter_in_progress_identification is False. - - **scan_path_analysis**: scan path analysis at each new scan step if scan_path is instanciated. - - **execution_times**: all pipeline steps execution times. - - **exception**: error catched during gaze position processing. - - **layers**: data dictionary with each layer's look data. """ # Lock frame exploitation self.acquire() - # Store look execution start date - look_start = time.perf_counter() - - # No gaze movement identified by default - identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() - - # Init scan path analysis report - scan_step_analysis = {} - - # Assess pipeline execution times - execution_times = { - 'gaze_movement_identifier': None, - 'scan_path_analyzers': DataFeatures.DataDictionary({}), - 'heatmap': None - } - - # Catch any error - exception = None - - # Init layers look data report - layers_look_data = {} - - try: - - # Apply gaze position calibration - if self.gaze_position_calibrator is not None: + # Store look arguments + look_data = locals() - self.__gaze_position = self.gaze_position_calibrator.apply(gaze_position) + # No new analysis by default + self.__new_analysis_available = False - # Or update gaze position at least - else: - - self.__gaze_position = gaze_position - - # Identify gaze movement - if self.gaze_movement_identifier is not None: - - # Store movement identification start date - identification_start = time.perf_counter() - - # Identify finished gaze movement - identified_gaze_movement = self.gaze_movement_identifier.identify(timestamp, self.__gaze_position) - - # Assess movement identification time in ms - execution_times['gaze_movement_identifier'] = (time.perf_counter() - identification_start) * 1e3 - - # Valid and finished gaze movement has been identified - if identified_gaze_movement.valid and identified_gaze_movement.finished: - - if GazeFeatures.is_fixation(identified_gaze_movement): - - # Append fixation to scan path - if self.scan_path is not None: + # No gaze movement identified by default + self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() - self.scan_path.append_fixation(timestamp, identified_gaze_movement) + # Apply gaze position calibration + if self.gaze_position_calibrator is not None: - elif GazeFeatures.is_saccade(identified_gaze_movement): + self.__calibrated_gaze_position = self.gaze_position_calibrator.apply(gaze_position) - # Append saccade to scan path - if self.scan_path is not None: - - scan_step = self.scan_path.append_saccade(timestamp, identified_gaze_movement) + # Or update gaze position at least + else: - # Is there a new step? - if scan_step and len(self.scan_path) > 1: + self.__calibrated_gaze_position = gaze_position - for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): + # Identify gaze movement + if self.gaze_movement_identifier is not None: - # Store scan step analysis start date - scan_step_analysis_start = time.perf_counter() + # Identify finished gaze movement + self.__identified_gaze_movement, identify_time, identify_exception = self.gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) - # Analyze aoi scan path - scan_path_analyzer.analyze(self.scan_path) + # Valid and finished gaze movement has been identified + if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: - # Assess scan path analysis time in ms - execution_times['scan_path_analyzers'][scan_path_analyzer_module_path] = (time.perf_counter() - scan_step_analysis_start) * 1e3 + if GazeFeatures.is_fixation(self.__identified_gaze_movement): - # Store analysis - scan_step_analysis[scan_path_analyzer_module_path] = scan_path_analyzer.analysis + # Append fixation to scan path + if self.scan_path is not None: - # No valid finished gaze movement: optionnaly stop in progress identification filtering - elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: + self.scan_path.append_fixation(timestamp, self.__identified_gaze_movement) - identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement + elif GazeFeatures.is_saccade(self.__identified_gaze_movement): - # Update heatmap - if self.heatmap is not None: + # Append saccade to scan path + if self.scan_path is not None: + + scan_step = self.scan_path.append_saccade(timestamp, self.__identified_gaze_movement) - # Store heatmap start date - heatmap_start = time.perf_counter() + # Is there a new step? + if scan_step and len(self.scan_path) > 1: - # Scale gaze position value - scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) + for scan_path_analyzer_module_path, scan_path_analyzer in self.scan_path_analyzers.items(): - # Update heatmap image - self.heatmap.update(self.__gaze_position.value * scale) + # Analyze aoi scan path + analyze_time, analyze_exception = scan_path_analyzer.analyze(self.scan_path) - # Assess heatmap time in ms - execution_times['heatmap'] = (time.perf_counter() - heatmap_start) * 1e3 + # Update new analysis available state + self.__new_analysis_available = True - # Look layers with valid identified gaze movement - # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally - for layer_name, layer in self.layers.items(): + # No valid finished gaze movement: optionnaly stop in progress identification filtering + elif self.gaze_movement_identifier is not None and not self.filter_in_progress_identification: - layers_look_data[layer_name] = layer.look(timestamp, identified_gaze_movement) + self.__identified_gaze_movement = self.gaze_movement_identifier.current_gaze_movement - except Exception as e: + # Update heatmap + if self.heatmap is not None: - print('Warning: the following error occurs in ArFrame.look method:', e) + # Scale gaze position value + scale = numpy.array([self.heatmap.size[0] / self.size[0], self.heatmap.size[1] / self.size[1]]) - self.__gaze_position = GazeFeatures.UnvalidGazePosition() - identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() - scan_step_analysis = {} - exception = e - layers_look_data = {} + # Update heatmap image + update_time, update_exception = self.heatmap.update(self.__calibrated_gaze_position.value * scale) - # Assess total execution time in ms - execution_times['total'] = (time.perf_counter() - look_start) * 1e3 + # Look layers with valid identified gaze movement + # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally + for layer_name, layer in self.layers.items(): - # Edit look data dictionary - look_data = DataFeatures.DataDictionary({ - "gaze_position": self.__gaze_position, - "gaze_movement": identified_gaze_movement, - "scan_path_analysis": DataFeatures.DataDictionary(scan_step_analysis), - "execution_times": DataFeatures.DataDictionary(execution_times), - "exception": exception, - "layers": DataFeatures.DataDictionary(layers_look_data) - }) + look_time, look_exception = layer.look(timestamp, self.__identified_gaze_movement) # Log look data for logger_name, logger in self.loggers.items(): - logger.emit(timestamp, look_data) + logger.emit(look_data) # Unlock frame exploitation self.release() - # Return look data - return look_data - def __image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. @@ -1065,7 +978,7 @@ class ArFrame(DataFeatures.SharedObject): # Draw current gaze position if required if draw_gaze_positions is not None: - self.__gaze_position.draw(image, **draw_gaze_positions) + self.__calibrated_gaze_position.draw(image, **draw_gaze_positions) # Unlock frame exploitation self.release() |