aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/ArFeatures.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/argaze/ArFeatures.py')
-rw-r--r--src/argaze/ArFeatures.py104
1 files changed, 49 insertions, 55 deletions
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 56a941d..32d1542 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -16,24 +16,18 @@ __credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
-from typing import Iterator, Union
import logging
-import json
-import os
-import sys
-import importlib
-import threading
-import time
import math
+import os
+from typing import Iterator, Union
+
+import cv2
+import numpy
from argaze import DataFeatures, GazeFeatures
from argaze.AreaOfInterest import *
-from argaze.GazeAnalysis import *
from argaze.utils import UtilsFeatures
-import numpy
-import cv2
-
class PoseEstimationFailed(Exception):
"""
@@ -129,12 +123,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@aoi_scene.setter
def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene | str | dict):
+ new_aoi_scene = None
+
if issubclass(type(aoi_scene_value), AOIFeatures.AOIScene):
new_aoi_scene = aoi_scene_value
# str: relative path to file
- elif type(aoi_scene_value) == str:
+ elif type(aoi_scene_value) is str:
filepath = os.path.join(DataFeatures.get_working_directory(), aoi_scene_value)
file_format = filepath.split('.')[-1]
@@ -154,10 +150,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath)
# dict:
- elif type(aoi_scene_value) == dict:
+ elif type(aoi_scene_value) is dict:
new_aoi_scene = AOIFeatures.AOIScene.from_dict(aoi_scene_value)
+ else:
+
+ raise ValueError("Bad aoi scene value")
+
# Cast aoi scene to its effective dimension
if new_aoi_scene.dimension == 2:
@@ -213,6 +213,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""AOI scan path analyzers list."""
return self.__aoi_scan_path_analyzers
+ # noinspection PyUnresolvedReferences
@aoi_scan_path_analyzers.setter
@DataFeatures.PipelineStepAttributeSetter
def aoi_scan_path_analyzers(self, aoi_scan_path_analyzers: list):
@@ -245,7 +246,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
for a in self.__aoi_scan_path_analyzers:
- if type(a) == property_type:
+ if type(a) is property_type:
setattr(analyzer, name, a)
found = True
@@ -254,7 +255,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
- if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None:
+ if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path is None:
self.__aoi_scan_path = GazeFeatures.ScanPath()
# Edit parent
@@ -311,7 +312,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__aoi_scan_path.expected_aoi = expected_aoi
@DataFeatures.PipelineStepMethod
- def look(self, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()):
+ def look(self, gaze_movement: GazeFeatures.GazeMovement = None):
"""
Project timestamped gaze movement into layer.
@@ -321,6 +322,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
Parameters:
gaze_movement: gaze movement to project
"""
+ if gaze_movement is None:
+ gaze_movement = GazeFeatures.GazeMovement()
# Use layer lock feature
with self._lock:
@@ -383,8 +386,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
Parameters:
image: image where to draw.
- draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn)
- draw_aoi_matching: AOIMatcher.draw parameters (which depends on the loaded aoi matcher module,
+ draw_aoi_scene: [AOI2DScene.draw][argaze.AreaOfInterest.AOI2DScene.draw] parameters (if None, no aoi scene is drawn)
+ draw_aoi_matching: [AOIMatcher.draw][argaze.GazeFeatures.AOIMatcher.draw] parameters (which depends on the loaded aoi matcher module,
if None, no aoi matching is drawn)
"""
@@ -530,6 +533,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""Scan path analyzers list."""
return self.__scan_path_analyzers
+ # noinspection PyUnresolvedReferences
@scan_path_analyzers.setter
@DataFeatures.PipelineStepAttributeSetter
def scan_path_analyzers(self, scan_path_analyzers: list):
@@ -562,7 +566,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
for a in self.__scan_path_analyzers:
- if type(a) == property_type:
+ if type(a) is property_type:
setattr(analyzer, name, a)
found = True
@@ -571,7 +575,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
- if len(self.__scan_path_analyzers) > 0 and self.__scan_path == None:
+ if len(self.__scan_path_analyzers) > 0 and self.__scan_path is None:
self.__scan_path = GazeFeatures.ScanPath()
# Edit parent
@@ -739,8 +743,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Analyze aoi scan path
for scan_path_analyzer in self.__scan_path_analyzers:
- scan_path_analyzer.analyze(self.__scan_path,
- timestamp=self.__identified_gaze_movement.timestamp)
+ scan_path_analyzer.analyze(self.__scan_path, timestamp=self.__identified_gaze_movement.timestamp)
# Update scan path analyzed state
self.__scan_path_analyzed = True
@@ -756,8 +759,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]])
# Update heatmap image
- self.__heatmap.update(self.__calibrated_gaze_position * scale,
- timestamp=self.__calibrated_gaze_position.timestamp)
+ self.__heatmap.update(self.__calibrated_gaze_position * scale, timestamp=self.__calibrated_gaze_position.timestamp)
# Look layers with valid identified gaze movement
# Note: don't filter valid/invalid finished/unfinished gaze movement to allow layers to reset internally
@@ -765,9 +767,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
layer.look(self.__identified_gaze_movement)
@DataFeatures.PipelineStepImage
- def image(self, background_weight: float = None, heatmap_weight: float = None,
- draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None,
- draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array:
+ def image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array:
"""
Get background image with overlaid visualisations.
@@ -901,12 +901,12 @@ class ArScene(DataFeatures.PipelineStepObject):
for layer_name, layer_data in layers.items():
- if type(layer_data) == dict:
+ if type(layer_data) is dict:
self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)
# str: relative path to JSON file
- elif type(layer_data) == str:
+ elif type(layer_data) is str:
self._layers[layer_name] = DataFeatures.from_json(
os.path.join(DataFeatures.get_working_directory(), layer_data))
@@ -929,18 +929,22 @@ class ArScene(DataFeatures.PipelineStepObject):
for frame_name, frame_data in frames.items():
- if type(frame_data) == dict:
+ if type(frame_data) is dict:
new_frame = ArFrame(name=frame_name, **frame_data)
# str: relative path to JSON file
- elif type(frame_data) == str:
+ elif type(frame_data) is str:
new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data))
# Loaded frame name have to be equals to dictionary key
assert (new_frame.name == frame_name)
+ else:
+
+ raise ValueError("Bad frame data.")
+
# Look for a scene layer with an AOI named like the frame
for scene_layer_name, scene_layer in self.layers.items():
@@ -1013,8 +1017,7 @@ class ArScene(DataFeatures.PipelineStepObject):
raise NotImplementedError('estimate_pose() method not implemented')
@DataFeatures.PipelineStepMethod
- def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> \
- Iterator[Union[str, AOI2DScene.AOI2DScene]]:
+ def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]:
"""Project layers according estimated pose and optional field of view clipping angles.
Parameters:
@@ -1050,6 +1053,7 @@ class ArScene(DataFeatures.PipelineStepObject):
aoi_scene_copy = layer.aoi_scene.copy()
# Project layer aoi scene
+ # noinspection PyUnresolvedReferences
yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)
@@ -1246,15 +1250,13 @@ class ArCamera(ArFrame):
inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)
# QUESTION: How to project gaze precision?
- inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y),
- timestamp=timestamped_gaze_position.timestamp)
+ inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)
# Project inner gaze position into scene frame
scene_frame.look(inner_gaze_position * scene_frame.size)
# Ignore missing aoi in camera frame layer projection
- except KeyError as e:
-
+ except KeyError:
pass
@DataFeatures.PipelineStepMethod
@@ -1375,8 +1377,7 @@ class ArContext(DataFeatures.PipelineStepObject):
"""Exit from ArContext."""
pass
- def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None,
- precision: int | float = None):
+ def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None, precision: int | float = None):
"""Request pipeline to process new gaze position at a timestamp."""
logging.debug('ArContext._process_gaze_position %s', self.name)
@@ -1395,14 +1396,12 @@ class ArContext(DataFeatures.PipelineStepObject):
if x is None and y is None:
# Edit empty gaze position
- self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp),
- catch_exceptions=self.__catch_exceptions)
+ self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp), catch_exceptions=self.__catch_exceptions)
else:
# Edit gaze position
- self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp),
- catch_exceptions=self.__catch_exceptions)
+ self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp), catch_exceptions=self.__catch_exceptions)
except DataFeatures.TimestampedException as e:
@@ -1439,8 +1438,7 @@ class ArContext(DataFeatures.PipelineStepObject):
logging.debug('\t> watch image (%i x %i)', width, height)
- self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp),
- catch_exceptions=self.__catch_exceptions)
+ self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp), catch_exceptions=self.__catch_exceptions)
# TODO: make this step optional
self.__pipeline.map(timestamp=timestamp, catch_exceptions=self.__catch_exceptions)
@@ -1477,8 +1475,7 @@ class ArContext(DataFeatures.PipelineStepObject):
if image.is_timestamped():
info_stack += 1
- cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1,
- (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
if issubclass(type(self.__pipeline), ArCamera):
@@ -1491,8 +1488,7 @@ class ArContext(DataFeatures.PipelineStepObject):
watch_time = math.nan
info_stack += 1
- cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz',
- (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
if issubclass(type(self.__pipeline), ArFrame):
@@ -1505,8 +1501,7 @@ class ArContext(DataFeatures.PipelineStepObject):
look_time = math.nan
info_stack += 1
- cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz',
- (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
if draw_exceptions:
@@ -1515,8 +1510,7 @@ class ArContext(DataFeatures.PipelineStepObject):
e = self.__exceptions.pop()
i = len(self.__exceptions)
- cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - (i) * 50), (0, 0, 127), -1)
- cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1,
- (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - i * 50), (0, 0, 127), -1)
+ cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
return image