diff options
Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r-- | src/argaze/ArFeatures.py | 104 |
1 files changed, 49 insertions, 55 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 56a941d..32d1542 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with -this program. If not, see <http://www.gnu.org/licenses/>. +this program. If not, see <https://www.gnu.org/licenses/>. """ __author__ = "Théo de la Hogue" @@ -16,24 +16,18 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Iterator, Union import logging -import json -import os -import sys -import importlib -import threading -import time import math +import os +from typing import Iterator, Union + +import cv2 +import numpy from argaze import DataFeatures, GazeFeatures from argaze.AreaOfInterest import * -from argaze.GazeAnalysis import * from argaze.utils import UtilsFeatures -import numpy -import cv2 - class PoseEstimationFailed(Exception): """ @@ -129,12 +123,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): @aoi_scene.setter def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene | str | dict): + new_aoi_scene = None + if issubclass(type(aoi_scene_value), AOIFeatures.AOIScene): new_aoi_scene = aoi_scene_value # str: relative path to file - elif type(aoi_scene_value) == str: + elif type(aoi_scene_value) is str: filepath = os.path.join(DataFeatures.get_working_directory(), aoi_scene_value) file_format = filepath.split('.')[-1] @@ -154,10 +150,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath) # dict: - elif type(aoi_scene_value) == dict: + elif type(aoi_scene_value) is dict: new_aoi_scene = AOIFeatures.AOIScene.from_dict(aoi_scene_value) + else: + + raise ValueError("Bad aoi scene value") + # Cast aoi scene to its effective dimension if new_aoi_scene.dimension == 2: @@ -213,6 +213,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """AOI scan path analyzers list.""" return self.__aoi_scan_path_analyzers + # noinspection PyUnresolvedReferences @aoi_scan_path_analyzers.setter @DataFeatures.PipelineStepAttributeSetter def aoi_scan_path_analyzers(self, aoi_scan_path_analyzers: list): @@ -245,7 +246,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): for a in self.__aoi_scan_path_analyzers: - if type(a) == property_type: + if type(a) is property_type: setattr(analyzer, name, a) found = True @@ -254,7 +255,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation - if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None: + if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path is None: self.__aoi_scan_path = GazeFeatures.ScanPath() # Edit parent @@ -311,7 +312,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__aoi_scan_path.expected_aoi = expected_aoi @DataFeatures.PipelineStepMethod - def look(self, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()): + def look(self, gaze_movement: GazeFeatures.GazeMovement = None): """ Project timestamped gaze movement into layer. @@ -321,6 +322,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Parameters: gaze_movement: gaze movement to project """ + if gaze_movement is None: + gaze_movement = GazeFeatures.GazeMovement() # Use layer lock feature with self._lock: @@ -383,8 +386,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): Parameters: image: image where to draw. - draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn) - draw_aoi_matching: AOIMatcher.draw parameters (which depends on the loaded aoi matcher module, + draw_aoi_scene: [AOI2DScene.draw][argaze.AreaOfInterest.AOI2DScene.draw] parameters (if None, no aoi scene is drawn) + draw_aoi_matching: [AOIMatcher.draw][argaze.GazeFeatures.AOIMatcher.draw] parameters (which depends on the loaded aoi matcher module, if None, no aoi matching is drawn) """ @@ -530,6 +533,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): """Scan path analyzers list.""" return self.__scan_path_analyzers + # noinspection PyUnresolvedReferences @scan_path_analyzers.setter @DataFeatures.PipelineStepAttributeSetter def scan_path_analyzers(self, scan_path_analyzers: list): @@ -562,7 +566,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): for a in self.__scan_path_analyzers: - if type(a) == property_type: + if type(a) is property_type: setattr(analyzer, name, a) found = True @@ -571,7 +575,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.') # Force scan path creation - if len(self.__scan_path_analyzers) > 0 and self.__scan_path == None: + if len(self.__scan_path_analyzers) > 0 and self.__scan_path is None: self.__scan_path = GazeFeatures.ScanPath() # Edit parent @@ -739,8 +743,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Analyze aoi scan path for scan_path_analyzer in self.__scan_path_analyzers: - scan_path_analyzer.analyze(self.__scan_path, - timestamp=self.__identified_gaze_movement.timestamp) + scan_path_analyzer.analyze(self.__scan_path, timestamp=self.__identified_gaze_movement.timestamp) # Update scan path analyzed state self.__scan_path_analyzed = True @@ -756,8 +759,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]]) # Update heatmap image - self.__heatmap.update(self.__calibrated_gaze_position * scale, - timestamp=self.__calibrated_gaze_position.timestamp) + self.__heatmap.update(self.__calibrated_gaze_position * scale, timestamp=self.__calibrated_gaze_position.timestamp) # Look layers with valid identified gaze movement # Note: don't filter valid/invalid finished/unfinished gaze movement to allow layers to reset internally @@ -765,9 +767,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): layer.look(self.__identified_gaze_movement) @DataFeatures.PipelineStepImage - def image(self, background_weight: float = None, heatmap_weight: float = None, - draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, - draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: + def image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array: """ Get background image with overlaid visualisations. @@ -901,12 +901,12 @@ class ArScene(DataFeatures.PipelineStepObject): for layer_name, layer_data in layers.items(): - if type(layer_data) == dict: + if type(layer_data) is dict: self._layers[layer_name] = ArLayer(name=layer_name, **layer_data) # str: relative path to JSON file - elif type(layer_data) == str: + elif type(layer_data) is str: self._layers[layer_name] = DataFeatures.from_json( os.path.join(DataFeatures.get_working_directory(), layer_data)) @@ -929,18 +929,22 @@ class ArScene(DataFeatures.PipelineStepObject): for frame_name, frame_data in frames.items(): - if type(frame_data) == dict: + if type(frame_data) is dict: new_frame = ArFrame(name=frame_name, **frame_data) # str: relative path to JSON file - elif type(frame_data) == str: + elif type(frame_data) is str: new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data)) # Loaded frame name have to be equals to dictionary key assert (new_frame.name == frame_name) + else: + + raise ValueError("Bad frame data.") + # Look for a scene layer with an AOI named like the frame for scene_layer_name, scene_layer in self.layers.items(): @@ -1013,8 +1017,7 @@ class ArScene(DataFeatures.PipelineStepObject): raise NotImplementedError('estimate_pose() method not implemented') @DataFeatures.PipelineStepMethod - def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> \ - Iterator[Union[str, AOI2DScene.AOI2DScene]]: + def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]: """Project layers according estimated pose and optional field of view clipping angles. Parameters: @@ -1050,6 +1053,7 @@ class ArScene(DataFeatures.PipelineStepObject): aoi_scene_copy = layer.aoi_scene.copy() # Project layer aoi scene + # noinspection PyUnresolvedReferences yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K) @@ -1246,15 +1250,13 @@ class ArCamera(ArFrame): inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position) # QUESTION: How to project gaze precision? - inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), - timestamp=timestamped_gaze_position.timestamp) + inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp) # Project inner gaze position into scene frame scene_frame.look(inner_gaze_position * scene_frame.size) # Ignore missing aoi in camera frame layer projection - except KeyError as e: - + except KeyError: pass @DataFeatures.PipelineStepMethod @@ -1375,8 +1377,7 @@ class ArContext(DataFeatures.PipelineStepObject): """Exit from ArContext.""" pass - def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None, - precision: int | float = None): + def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None, precision: int | float = None): """Request pipeline to process new gaze position at a timestamp.""" logging.debug('ArContext._process_gaze_position %s', self.name) @@ -1395,14 +1396,12 @@ class ArContext(DataFeatures.PipelineStepObject): if x is None and y is None: # Edit empty gaze position - self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp), - catch_exceptions=self.__catch_exceptions) + self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp), catch_exceptions=self.__catch_exceptions) else: # Edit gaze position - self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp), - catch_exceptions=self.__catch_exceptions) + self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp), catch_exceptions=self.__catch_exceptions) except DataFeatures.TimestampedException as e: @@ -1439,8 +1438,7 @@ class ArContext(DataFeatures.PipelineStepObject): logging.debug('\t> watch image (%i x %i)', width, height) - self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp), - catch_exceptions=self.__catch_exceptions) + self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp), catch_exceptions=self.__catch_exceptions) # TODO: make this step optional self.__pipeline.map(timestamp=timestamp, catch_exceptions=self.__catch_exceptions) @@ -1477,8 +1475,7 @@ class ArContext(DataFeatures.PipelineStepObject): if image.is_timestamped(): info_stack += 1 - cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, - (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) if issubclass(type(self.__pipeline), ArCamera): @@ -1491,8 +1488,7 @@ class ArContext(DataFeatures.PipelineStepObject): watch_time = math.nan info_stack += 1 - cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', - (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) if issubclass(type(self.__pipeline), ArFrame): @@ -1505,8 +1501,7 @@ class ArContext(DataFeatures.PipelineStepObject): look_time = math.nan info_stack += 1 - cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', - (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) + cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) if draw_exceptions: @@ -1515,8 +1510,7 @@ class ArContext(DataFeatures.PipelineStepObject): e = self.__exceptions.pop() i = len(self.__exceptions) - cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - (i) * 50), (0, 0, 127), -1) - cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, - (255, 255, 255), 1, cv2.LINE_AA) + cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - i * 50), (0, 0, 127), -1) + cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA) return image |