From 0a7154cbaa43bb14aae62fc4324ad24c60949472 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Tue, 29 Nov 2022 19:08:14 +0100 Subject: Renaming ArGazeProject into ArGazeScene. Removing useless old ArUcoSet related class. --- src/argaze.test/ArGazeProject.py | 34 -- src/argaze.test/ArGazeScene.py | 59 +++ src/argaze.test/ArUcoMarkers/ArUcoCamera.py | 7 +- src/argaze.test/ArUcoMarkers/ArUcoTracker.py | 23 + src/argaze.test/ArUcoMarkers/utils/tracker.json | 5 + src/argaze.test/utils/aoi.obj | 7 + src/argaze.test/utils/project.json | 34 -- src/argaze.test/utils/scene.json | 58 +++ src/argaze/ArGazeProject.py | 35 -- src/argaze/ArGazeScene.py | 137 ++++++ src/argaze/ArUcoMarkers/ArUcoCamera.py | 16 +- src/argaze/ArUcoMarkers/ArUcoCube.py | 79 ---- src/argaze/ArUcoMarkers/ArUcoMarker.py | 2 +- src/argaze/ArUcoMarkers/ArUcoPlan.py | 60 --- src/argaze/ArUcoMarkers/ArUcoScene.py | 487 ++++++++++++++++++++ src/argaze/ArUcoMarkers/ArUcoSet.py | 491 --------------------- src/argaze/ArUcoMarkers/ArUcoSetFactory.py | 21 - src/argaze/ArUcoMarkers/ArUcoTracker.py | 174 +++++--- src/argaze/ArUcoMarkers/__init__.py | 2 +- src/argaze/AreaOfInterest/AOI3DScene.py | 11 +- src/argaze/AreaOfInterest/AOIFeatures.py | 11 + src/argaze/__init__.py | 2 +- .../utils/tobii_segment_argaze_scene_export.py | 235 ++++++++++ src/argaze/utils/tobii_segment_aruco_set_export.py | 346 --------------- 24 files changed, 1154 insertions(+), 1182 deletions(-) delete mode 100644 src/argaze.test/ArGazeProject.py create mode 100644 src/argaze.test/ArGazeScene.py create mode 100644 src/argaze.test/ArUcoMarkers/utils/tracker.json create mode 100644 src/argaze.test/utils/aoi.obj delete mode 100644 src/argaze.test/utils/project.json create mode 100644 src/argaze.test/utils/scene.json delete mode 100644 src/argaze/ArGazeProject.py create mode 100644 src/argaze/ArGazeScene.py delete mode 100644 src/argaze/ArUcoMarkers/ArUcoCube.py delete mode 100644 src/argaze/ArUcoMarkers/ArUcoPlan.py create mode 100644 src/argaze/ArUcoMarkers/ArUcoScene.py delete mode 100644 src/argaze/ArUcoMarkers/ArUcoSet.py delete mode 100644 src/argaze/ArUcoMarkers/ArUcoSetFactory.py create mode 100644 src/argaze/utils/tobii_segment_argaze_scene_export.py delete mode 100644 src/argaze/utils/tobii_segment_aruco_set_export.py diff --git a/src/argaze.test/ArGazeProject.py b/src/argaze.test/ArGazeProject.py deleted file mode 100644 index 8e449bf..0000000 --- a/src/argaze.test/ArGazeProject.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python - -import unittest -import os - -from argaze import ArGazeProject - -import numpy - -class TestArGazeProjectClass(unittest.TestCase): - """Test ArGazeProject class.""" - - def test_from_json(self): - """Test ArGazeProject creation from json file.""" - - # Edit project file path - current_directory = os.path.dirname(os.path.abspath(__file__)) - json_filepath = os.path.join(current_directory, 'utils/project.json') - - # Load project - argaze_project = ArGazeProject.ArGazeProject.from_json(json_filepath) - - # Check project meta data - self.assertEqual(argaze_project.name, "TestProject") - - # Check ArUco camera - self.assertEqual(argaze_project.aruco_camera.rms, 1.0) - self.assertIsNone(numpy.testing.assert_array_equal(argaze_project.aruco_camera.dimensions, [1920, 1080])) - self.assertIsNone(numpy.testing.assert_array_equal(argaze_project.aruco_camera.K, [[1.0, 0.0, 1.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]])) - self.assertIsNone(numpy.testing.assert_array_equal(argaze_project.aruco_camera.D, [-1.0, -0.5, 0.0, 0.5, 1.0])) - -if __name__ == '__main__': - - unittest.main() \ No newline at end of file diff --git a/src/argaze.test/ArGazeScene.py b/src/argaze.test/ArGazeScene.py new file mode 100644 index 0000000..4d0b3ae --- /dev/null +++ b/src/argaze.test/ArGazeScene.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python + +import unittest +import os + +from argaze import ArGazeScene + +import numpy + +class TestArGazeSceneClass(unittest.TestCase): + """Test ArGazeScene class.""" + + def test_from_json(self): + """Test ArGazeScene creation from json file.""" + + # Edit scene file path + current_directory = os.path.dirname(os.path.abspath(__file__)) + json_filepath = os.path.join(current_directory, 'utils/scene.json') + + # Load scene + argaze_scene = ArGazeScene.ArGazeScene.from_json(json_filepath) + + # Check scene meta data + self.assertEqual(argaze_scene.name, "TestScene") + self.assertEqual(argaze_scene.aruco_dictionary.name, "DICT_ARUCO_ORIGINAL") + self.assertEqual(argaze_scene.aruco_marker_size, 3.0) + + # Check ArUco camera + self.assertEqual(argaze_scene.aruco_camera.rms, 1.0) + self.assertIsNone(numpy.testing.assert_array_equal(argaze_scene.aruco_camera.dimensions, [1920, 1080])) + self.assertIsNone(numpy.testing.assert_array_equal(argaze_scene.aruco_camera.K, [[1.0, 0.0, 1.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]])) + self.assertIsNone(numpy.testing.assert_array_equal(argaze_scene.aruco_camera.D, [-1.0, -0.5, 0.0, 0.5, 1.0])) + + # Check ArUco tracker + self.assertEqual(argaze_scene.aruco_tracker.tracking_data.cornerRefinementMethod, 3) + self.assertEqual(argaze_scene.aruco_tracker.tracking_data.aprilTagQuadSigma, 2) + self.assertEqual(argaze_scene.aruco_tracker.tracking_data.aprilTagDeglitch, 1) + + # Check ArUco scene + self.assertEqual(argaze_scene.aruco_scene.angle_tolerance, 1.0) + self.assertEqual(argaze_scene.aruco_scene.distance_tolerance, 2.0) + self.assertEqual(len(argaze_scene.aruco_scene.places), 2) + + # Check ArUco scene places + self.assertIsNone(numpy.testing.assert_array_equal(argaze_scene.aruco_scene.places['A'].translation, [1, 0, 0])) + self.assertIsNone(numpy.testing.assert_array_equal(argaze_scene.aruco_scene.places['A'].rotation, [0, 0, 0])) + self.assertEqual(argaze_scene.aruco_scene.places['A'].marker.identifier, 0) + + self.assertIsNone(numpy.testing.assert_array_equal(argaze_scene.aruco_scene.places['B'].translation, [0, 1, 0])) + self.assertIsNone(numpy.testing.assert_array_equal(argaze_scene.aruco_scene.places['B'].rotation, [0, 90, 0])) + self.assertEqual(argaze_scene.aruco_scene.places['B'].marker.identifier, 1) + + # Check AOI scene + self.assertEqual(len(argaze_scene.aoi_scene.items()), 1) + self.assertEqual(argaze_scene.aoi_scene['Test'].size, 4) + +if __name__ == '__main__': + + unittest.main() \ No newline at end of file diff --git a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py index 7e50e8c..ef78b04 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py @@ -18,7 +18,10 @@ class TestArUcoCameraClass(unittest.TestCase): # Check ArUco camera self.assertEqual(aruco_camera.rms, 0.0) - self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.dimensions, numpy.array([0, 0]))) + + #self.assertEqual(type(aruco_camera.K), numpy.array) + + self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.dimensions, [0, 0])) self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.K, ArUcoCamera.K0)) self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.D, ArUcoCamera.D0)) @@ -33,7 +36,7 @@ class TestArUcoCameraClass(unittest.TestCase): # Check ArUco camera self.assertEqual(aruco_camera.rms, 1.0) - self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.dimensions, numpy.array([1920, 1080]))) + self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.dimensions, [1920, 1080])) self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.K, [[1.0, 0.0, 1.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]])) self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.D, [-1.0, -0.5, 0.0, 0.5, 1.0])) diff --git a/src/argaze.test/ArUcoMarkers/ArUcoTracker.py b/src/argaze.test/ArUcoMarkers/ArUcoTracker.py index acc0000..87373ea 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoTracker.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoTracker.py @@ -9,6 +9,29 @@ from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoCamera, ArUcoTracke import cv2 as cv import numpy +class TestTrackingDataClass(unittest.TestCase): + """Test TrackingData class.""" + + def test_from_json(self): + """Test TrackingData creation from json file.""" + + # Edit traking data file path + current_directory = os.path.dirname(os.path.abspath(__file__)) + json_filepath = os.path.join(current_directory, 'utils/tracker.json') + + # Load project + tracking_data = ArUcoTracker.TrackingData.from_json(json_filepath) + + # Check data + self.assertEqual(tracking_data.cornerRefinementMethod, 3) + self.assertEqual(tracking_data.aprilTagQuadSigma, 2) + self.assertEqual(tracking_data.aprilTagDeglitch, 1) + + # Check bad data access fails + with self.assertRaises(AttributeError): + + tracking_data.unknown_data = 1 + class TestArUcoTrackerClass(unittest.TestCase): """Test ArUcoTracker class.""" diff --git a/src/argaze.test/ArUcoMarkers/utils/tracker.json b/src/argaze.test/ArUcoMarkers/utils/tracker.json new file mode 100644 index 0000000..d26a3fa --- /dev/null +++ b/src/argaze.test/ArUcoMarkers/utils/tracker.json @@ -0,0 +1,5 @@ +{ + "cornerRefinementMethod": 3, + "aprilTagQuadSigma": 2, + "aprilTagDeglitch": 1 +} \ No newline at end of file diff --git a/src/argaze.test/utils/aoi.obj b/src/argaze.test/utils/aoi.obj new file mode 100644 index 0000000..ed4554f --- /dev/null +++ b/src/argaze.test/utils/aoi.obj @@ -0,0 +1,7 @@ +o Test +v 0.000000 0.000000 0.000000 +v 1.000000 0.000000 0.000000 +v 0.000000 1.000000 0.000000 +v 1.000000 1.000000 0.000000 +s off +f 1 2 4 3 diff --git a/src/argaze.test/utils/project.json b/src/argaze.test/utils/project.json deleted file mode 100644 index e2812f2..0000000 --- a/src/argaze.test/utils/project.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "name": "TestProject", - "aruco_camera": { - "rms": 1.0, - "dimensions": [ - 1920, - 1080 - ], - "K": [ - [ - 1.0, - 0.0, - 1.0 - ], - [ - 0.0, - 1.0, - 1.0 - ], - [ - 0.0, - 0.0, - 1.0 - ] - ], - "D": [ - -1.0, - -0.5, - 0.0, - 0.5, - 1.0 - ] - } -} \ No newline at end of file diff --git a/src/argaze.test/utils/scene.json b/src/argaze.test/utils/scene.json new file mode 100644 index 0000000..1f80eac --- /dev/null +++ b/src/argaze.test/utils/scene.json @@ -0,0 +1,58 @@ +{ + "name": "TestScene", + "aruco_dictionary": "DICT_ARUCO_ORIGINAL", + "aruco_marker_size": 3.0, + "aruco_camera": { + "rms": 1.0, + "dimensions": [ + 1920, + 1080 + ], + "K": [ + [ + 1.0, + 0.0, + 1.0 + ], + [ + 0.0, + 1.0, + 1.0 + ], + [ + 0.0, + 0.0, + 1.0 + ] + ], + "D": [ + -1.0, + -0.5, + 0.0, + 0.5, + 1.0 + ] + }, + "aruco_tracker": { + "cornerRefinementMethod": 3, + "aprilTagQuadSigma": 2, + "aprilTagDeglitch": 1 + }, + "aruco_scene": { + "places": { + "A": { + "translation": [1, 0, 0], + "rotation": [0, 0, 0], + "marker": 0 + }, + "B": { + "translation": [0, 1, 0], + "rotation": [0, 90, 0], + "marker": 1 + } + }, + "angle_tolerance": 1.0, + "distance_tolerance": 2.0 + }, + "aoi_scene": "aoi.obj" +} \ No newline at end of file diff --git a/src/argaze/ArGazeProject.py b/src/argaze/ArGazeProject.py deleted file mode 100644 index 757d65c..0000000 --- a/src/argaze/ArGazeProject.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python - -from typing import TypeVar -from dataclasses import dataclass, field -import json - -from argaze.ArUcoMarkers import * -from argaze.AreaOfInterest import * - -ArGazeProjectType = TypeVar('ArGazeProject', bound="ArGazeProject") -# Type definition for type annotation convenience - -@dataclass -class ArGazeProject(): - """Define an Augmented Reality environnement thanks to ArUco markers and project gaze on it to know where is looking at.""" - - name: str - """Project name.""" - - aruco_camera: ArUcoCamera.ArUcoCamera = field(init=False, default_factory=ArUcoCamera.ArUcoCamera) - """ArUco camera ...""" - - def __init__(self, **kwargs): - - self.aruco_camera = ArUcoCamera.ArUcoCamera(**kwargs.pop('aruco_camera')) - - self.__dict__.update(kwargs) - - @classmethod - def from_json(self, json_filepath) -> ArGazeProjectType: - """Load ArGaze project from .json file.""" - - with open(json_filepath) as configuration_file: - - return ArGazeProject(**json.load(configuration_file)) \ No newline at end of file diff --git a/src/argaze/ArGazeScene.py b/src/argaze/ArGazeScene.py new file mode 100644 index 0000000..56ac18e --- /dev/null +++ b/src/argaze/ArGazeScene.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python + +from typing import TypeVar +from dataclasses import dataclass, field +import json +import os + +from argaze.ArUcoMarkers import * +from argaze.AreaOfInterest import * + +import numpy + +ArGazeSceneType = TypeVar('ArGazeScene', bound="ArGazeScene") +# Type definition for type annotation convenience + +@dataclass +class ArGazeScene(): + """Define an Augmented Reality environnement thanks to ArUco markers and project it onto incoming frames.""" + + name: str + """Project name.""" + + aruco_dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary = field(init=False, default_factory=ArUcoMarkersDictionary.ArUcoMarkersDictionary) + """ArUco markers dictionary.""" + + aruco_marker_size: float = field(init=False) + """Size of ArUco markers in centimeter.""" + + aruco_camera: ArUcoCamera.ArUcoCamera = field(init=False, default_factory=ArUcoCamera.ArUcoCamera) + """ArUco camera ...""" + + aruco_tracker: ArUcoTracker.ArUcoTracker = field(init=False, default_factory=ArUcoTracker.ArUcoTracker) + """ArUco tracker ...""" + + aruco_scene: ArUcoScene.ArUcoScene = field(init=False, default_factory=ArUcoScene.ArUcoScene) + """ArUco scene ...""" + + def __init__(self, **kwargs): + + self.aruco_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(kwargs.pop('aruco_dictionary')) + + self.aruco_marker_size = kwargs.pop('aruco_marker_size') + + self.aruco_camera = ArUcoCamera.ArUcoCamera(**kwargs.pop('aruco_camera')) + + self.aruco_tracker = ArUcoTracker.ArUcoTracker(self.aruco_dictionary, self.aruco_marker_size, self.aruco_camera, **kwargs.pop('aruco_tracker')) + + self.aruco_scene = ArUcoScene.ArUcoScene(self.aruco_dictionary, self.aruco_marker_size, **kwargs.pop('aruco_scene')) + + # Check aoi_scene value type + aoi_scene_value = kwargs.pop('aoi_scene') + + # Relative path to a .obj file + if type(aoi_scene_value) == str: + + obj_filepath = os.path.join(self.__current_directory, aoi_scene_value) + self.aoi_scene = AOI3DScene.AOI3DScene.from_obj(obj_filepath) + + # Dict of all AOI + else: + self.aoi_scene = AOI3DScene.AOI3DScene(aoi_scene_value) + + self.__dict__.update(kwargs) + + @classmethod + def from_json(self, json_filepath: str) -> ArGazeSceneType: + """Load ArGaze project from .json file.""" + + with open(json_filepath) as configuration_file: + + # Store current directory to allow relative path loading + self.__current_directory = os.path.dirname(os.path.abspath(json_filepath)) + + return ArGazeScene(**json.load(configuration_file)) + + def __str__(self) -> str: + """String display""" + + output = '' + output += f'\nArUcoCamera: {self.aruco_camera}' + output += f'\n\nArUcoTracker tracking data: {self.aruco_tracker.tracking_data}' + output += f'\n\nArUcoScene: {self.aruco_scene}' + output += f'\n\nAOIScene: {self.aoi_scene}' + + return output + + def project(self, frame, valid_markers:int = 1, visual_hfov=0): + """Project ArGazeScene into frame.""" + + # Track markers with pose estimation and draw them + self.aruco_tracker.track(frame) + + # When no marker is detected, no AOI scene projection can't be done + if len(self.aruco_tracker.tracked_markers) == 0: + + raise UserWarning('No marker detected') + + # Estimate set pose from tracked markers + tvec, rvec, success, validity, unvalid = self.aruco_scene.estimate_pose(self.aruco_tracker.tracked_markers) + + # When pose estimation fails, ignore AOI scene projection + if not success: + + raise UserWarning('Pose estimation fails') + + # Consider pose estimation only if it is validated by a given number of valid markers at least + elif validity >= valid_markers: + + # Clip AOI out of the horizontal visual field of view (optional) + if visual_hfov > 0: + + # Transform scene into camera referential + aoi_scene_camera_ref = self.aoi_scene.transform(tvec, rvec) + + # Get aoi inside vision cone field + cone_vision_height_cm = 200 # cm + cone_vision_radius_cm = numpy.tan(numpy.deg2rad(visual_hfov / 2)) * cone_vision_height_cm + + _, aoi_outside = aoi_scene_camera_ref.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) + + # Keep only aoi inside vision cone field + aoi_scene_copy = self.aoi_scene.copy(exclude=aoi_outside.keys()) + + else: + + aoi_scene_copy = self.aoi_scene.copy() + + # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it + # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable. + aoi_scene_projection = aoi_scene_copy.project(tvec, rvec, self.aruco_camera.K) + + # Warn user when the merged scene is empty + if len(aoi_scene_projection.keys()) == 0: + + raise UserWarning('AOI projection is empty') + + return aoi_scene_projection, unvalid diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index dd254d2..c535523 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -19,7 +19,7 @@ class CalibrationData(): rms: float = field(default=0) """Root Mean Square error of calibration.""" - dimensions: numpy.ndarray = field(default=numpy.array([0, 0])) + dimensions: numpy.array = field(default=numpy.array([0, 0])) """Frame dimensions in pixels from which the calibration have been done.""" K: numpy.array = field(default=K0) @@ -28,10 +28,6 @@ class CalibrationData(): D: numpy.array = field(default=D0) """Distorsion coefficients vector.""" - def __init__(self, **kwargs): - - self.__dict__.update(kwargs) - @classmethod def from_json(self, json_filepath): """Load optical parameters from .json file.""" @@ -47,6 +43,16 @@ class CalibrationData(): json.dump(self, calibration_file, ensure_ascii=False, indent=4) + def __str__(self) -> str: + """String display""" + + output = f'\n\trms: {self.rms}' + output += f'\n\tdimensions: {self.dimensions}' + output += f'\n\tK: {self.K}' + output += f'\n\tD: {self.D}' + + return output + class ArUcoCamera(CalibrationData): """Handle camera calibration process.""" diff --git a/src/argaze/ArUcoMarkers/ArUcoCube.py b/src/argaze/ArUcoMarkers/ArUcoCube.py deleted file mode 100644 index 88163d8..0000000 --- a/src/argaze/ArUcoMarkers/ArUcoCube.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python - -from typing import Tuple -from dataclasses import dataclass, field -import json -import math -import itertools - -from argaze.ArUcoMarkers import ArUcoSet - -import numpy -import cv2 as cv -import cv2.aruco as aruco - -@dataclass -class ArUcoCube(ArUcoSet.ArUcoSet): - """Define ArUco cube as a specific ArUco set.""" - - edge_size: int = field(init=False) - """Size of the cube edges in centimeter.""" - - def __init__(self, configuration_filepath): - """Define cube from a .json file.""" - - # Load generic set configuration data - super().__init__(configuration_filepath) - - # Load specific cube configuration data - with open(configuration_filepath) as configuration_file: - - # Deserialize .json - # TODO find a better way - configuration = json.load(configuration_file) - - # Load edge size - self.edge_size = configuration['edge_size'] - - def draw(self, frame, K, D, draw_places=True): - """Draw cube, axis and places.""" - - l = self.edge_size / 2 - ll = self.edge_size - - # Select color according validity score - n = 95 * self._validity if self._validity < 2 else 0 - f = 159 * self._validity if self._validity < 2 else 255 - - # Draw left face - leftPoints = numpy.float32([[-l, l, l], [-l, -l, l], [-l, -l, -l], [-l, l, -l]]).reshape(-1, 3) - leftPoints, _ = cv.projectPoints(leftPoints, self._rotation, self._translation, K, D) - leftPoints = leftPoints.astype(int) - - cv.line(frame, tuple(leftPoints[0].ravel()), tuple(leftPoints[1].ravel()), (n,n,f), 2) - cv.line(frame, tuple(leftPoints[1].ravel()), tuple(leftPoints[2].ravel()), (n,n,f), 2) - cv.line(frame, tuple(leftPoints[2].ravel()), tuple(leftPoints[3].ravel()), (n,n,f), 2) - cv.line(frame, tuple(leftPoints[3].ravel()), tuple(leftPoints[0].ravel()), (n,n,f), 2) - - # Draw top face - topPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, l, -l], [l, l, -l]]).reshape(-1, 3) - topPoints, _ = cv.projectPoints(topPoints, self._rotation, self._translation, K, D) - topPoints = topPoints.astype(int) - - cv.line(frame, tuple(topPoints[0].ravel()), tuple(topPoints[1].ravel()), (n,f,n), 2) - cv.line(frame, tuple(topPoints[1].ravel()), tuple(topPoints[2].ravel()), (n,f,n), 2) - cv.line(frame, tuple(topPoints[2].ravel()), tuple(topPoints[3].ravel()), (n,f,n), 2) - cv.line(frame, tuple(topPoints[3].ravel()), tuple(topPoints[0].ravel()), (n,f,n), 2) - - # Draw front face - frontPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, -l, l], [l, -l, l]]).reshape(-1, 3) - frontPoints, _ = cv.projectPoints(frontPoints, self._rotation, self._translation, K, D) - frontPoints = frontPoints.astype(int) - - cv.line(frame, tuple(frontPoints[0].ravel()), tuple(frontPoints[1].ravel()), (f,n,n), 2) - cv.line(frame, tuple(frontPoints[1].ravel()), tuple(frontPoints[2].ravel()), (f,n,n), 2) - cv.line(frame, tuple(frontPoints[2].ravel()), tuple(frontPoints[3].ravel()), (f,n,n), 2) - cv.line(frame, tuple(frontPoints[3].ravel()), tuple(frontPoints[0].ravel()), (f,n,n), 2) - - # Draw axis and places - super().draw(frame, K, D, draw_places) diff --git a/src/argaze/ArUcoMarkers/ArUcoMarker.py b/src/argaze/ArUcoMarkers/ArUcoMarker.py index 2daaa04..2177186 100644 --- a/src/argaze/ArUcoMarkers/ArUcoMarker.py +++ b/src/argaze/ArUcoMarkers/ArUcoMarker.py @@ -55,7 +55,7 @@ class ArUcoMarker(): # Draw marker axis if pose has been estimated if self.translation.size == 3 and self.rotation.size == 3: - cv.drawFrameAxes(frame, K, D, self.rotation, self.translation, self.size) + cv.drawFrameAxes(frame, numpy.array(K), numpy.array(D), self.rotation, self.translation, self.size) aruco.drawDetectedMarkers(frame, [self.corners], numpy.array([self.identifier])) diff --git a/src/argaze/ArUcoMarkers/ArUcoPlan.py b/src/argaze/ArUcoMarkers/ArUcoPlan.py deleted file mode 100644 index aed42b3..0000000 --- a/src/argaze/ArUcoMarkers/ArUcoPlan.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python - -from typing import Tuple -from dataclasses import dataclass, field -import json -import math -import itertools - -from argaze.ArUcoMarkers import ArUcoSet - -import numpy -import cv2 as cv -import cv2.aruco as aruco - -@dataclass -class ArUcoPlan(ArUcoSet.ArUcoSet): - """Define a ArUco plan as a specific ArUco set.""" - - width: int = field(init=False) - """Width of the plan in centimeter.""" - - height: int = field(init=False) - """Height of the plan in centimeter.""" - - def __init__(self, configuration_filepath): - """Define plan from a .json file.""" - - # Load generic set configuration data - super().__init__(configuration_filepath) - - # Load specific plan configuration data - with open(configuration_filepath) as configuration_file: - - # Deserialize .json - # TODO find a better way - configuration = json.load(configuration_file) - - # Load plan dimensions - self.width = configuration['width'] - self.height = configuration['height'] - - def draw(self, frame, K, D, draw_places=True): - """Draw plan, axis and places.""" - - # Select color according validity score - n = 95 * self._validity if self._validity < 2 else 0 - f = 159 * self._validity if self._validity < 2 else 255 - - # Draw plan - planPoints = numpy.float32([[0, 0, 0], [self.width, 0, 0], [self.width, self.height, 0], [0, self.height, 0]]).reshape(-1, 3) - planPoints, _ = cv.projectPoints(planPoints, self._rotation, self._translation, K, D) - planPoints = planPoints.astype(int) - - cv.line(frame, tuple(planPoints[0].ravel()), tuple(planPoints[1].ravel()), (f,f,f), 2) - cv.line(frame, tuple(planPoints[1].ravel()), tuple(planPoints[2].ravel()), (f,f,f), 2) - cv.line(frame, tuple(planPoints[2].ravel()), tuple(planPoints[3].ravel()), (f,f,f), 2) - cv.line(frame, tuple(planPoints[3].ravel()), tuple(planPoints[0].ravel()), (f,f,f), 2) - - # Draw axis and places - super().draw(frame, K, D, draw_places) diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py new file mode 100644 index 0000000..2134cf7 --- /dev/null +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -0,0 +1,487 @@ +#!/usr/bin/env python + +from typing import TypeVar, Tuple +from dataclasses import dataclass, field +import json +import math +import itertools + +from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoCamera + +import numpy +import cv2 as cv +import cv2.aruco as aruco + +ArUcoSceneType = TypeVar('ArUcoScene', bound="ArUcoScene") +# Type definition for type annotation convenience + +@dataclass +class Place(): + """Define a place as a pose and a marker.""" + + translation: numpy.array + """Position in set referential.""" + + rotation: numpy.array + """Rotation in set referential.""" + + marker: dict + """ArUco marker linked to the place.""" + +@dataclass +class ArUcoScene(): + """Define abstract class to handle group of ArUco markers as one unique spatial entity and estimate its pose.""" + + places: dict = field(init=False, default_factory=dict) + """All named places of the set and their ArUco markers.""" + + angle_tolerance: float = field(init=False) + """Angle error tolerance allowed to validate place pose in degree.""" + + distance_tolerance: float = field(init=False) + """Distance error tolerance allowed to validate place pose in centimeter.""" + + def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary, marker_size: float, **kwargs): + """Define set from a .json file.""" + + self.__dictionary = dictionary + self.__marker_size = marker_size + + # Load places + self.places = {} + for name, place in kwargs['places'].items(): + marker = ArUcoMarker.ArUcoMarker(self.__dictionary, place['marker'], self.__marker_size) + self.places[name] = Place(numpy.array(place['translation']).astype(numpy.float32), numpy.array(place['rotation']).astype(numpy.float32), marker) + + # Load angle tolerance + self.angle_tolerance = kwargs['angle_tolerance'] + + # Load distance tolerance + self.distance_tolerance = kwargs['distance_tolerance'] + + # Init pose data + self._translation = numpy.zeros(3) + self._rotation = numpy.zeros(3) + self._succeded = False + self._validity = 0 + + # Process markers ids to speed up further calculations + self.__identifier_cache = {} + for name, place in self.places.items(): + self.__identifier_cache[place.marker.identifier] = name + + # Process each place pose to speed up further calculations + self.__translation_cache = {} + for name, place in self.places.items(): + self.__translation_cache[name] = place.translation + + # Process each place rotation matrix to speed up further calculations + self.__rotation_cache = {} + for name, place in self.places.items(): + + # Create intrinsic rotation matrix + R = self.__make_rotation_matrix(*place.rotation) + + assert(self.__is_rotation_matrix(R)) + + # Store rotation matrix + self.__rotation_cache[name] = R + + # Process axis-angle between place combination to speed up further calculations + self.__angle_cache = {} + for (A_name, A_place), (B_name, B_place) in itertools.combinations(self.places.items(), 2): + + A = self.__rotation_cache[A_name] + B = self.__rotation_cache[B_name] + + if numpy.array_equal(A, B): + + angle = 0. + + else: + + # Rotation matrix from A place to B place + AB = B.dot(A.T) + + assert(self.__is_rotation_matrix(AB)) + + # Calculate axis-angle representation of AB rotation matrix + angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2)) + + try: + self.__angle_cache[A_name][B_name] = angle + except: + self.__angle_cache[A_name] = {B_name: angle} + + try: + self.__angle_cache[B_name][A_name] = angle + except: + self.__angle_cache[B_name] = {A_name: angle} + + # Process distance between each place combination to speed up further calculations + self.__distance_cache = {} + for (A_name, A_place), (B_name, B_place) in itertools.combinations(self.places.items(), 2): + + A = self.__translation_cache[A_name] + B = self.__translation_cache[B_name] + + # Calculate axis-angle representation of AB rotation matrix + distance = numpy.linalg.norm(B - A) + + try: + self.__distance_cache[A_name][B_name] = distance + except: + self.__distance_cache[A_name] = {B_name: distance} + + try: + self.__distance_cache[B_name][A_name] = distance + except: + self.__distance_cache[B_name] = {A_name: distance} + + @classmethod + def from_json(self, json_filepath) -> ArUcoSceneType: + """Load ArUco scene from .json file.""" + + with open(json_filepath) as configuration_file: + + return ArUcoScene(**json.load(configuration_file)) + + def __str__(self) -> str: + """String display""" + + output = f'\n\n\tDictionary: {self.__dictionary.name}' + + output += '\n\n\tIdentifier cache:' + for i, name in self.__identifier_cache.items(): + output += f'\n\t\t- {i}: {name}' + + output += '\n\n\tTranslation cache:' + for name, item in self.__translation_cache.items(): + output += f'\n\t\t- {name}: {item}' + + output += '\n\n\tRotation cache:' + for name, item in self.__rotation_cache.items(): + output += f'\n\t\t- {name}:\n{item}' + + output += '\n\n\tAngle cache:' + for A_name, A_angle_cache in self.__angle_cache.items(): + for B_name, angle in A_angle_cache.items(): + output += f'\n\t\t- {A_name}/{B_name}: {angle:3f}' + + output += '\n\n\tDistance cache:' + for A_name, A_distance_cache in self.__distance_cache.items(): + for B_name, distance in A_distance_cache.items(): + output += f'\n\t\t- {A_name}/{B_name}: {distance:3f}' + + return output + + @property + def identifiers(self) -> list: + """List all makers identifier.""" + + return list(self.__identifier_cache.keys()) + + def __make_rotation_matrix(self, x, y, z): + + # Create rotation matrix around x axis + c = numpy.cos(numpy.deg2rad(x)) + s = numpy.sin(numpy.deg2rad(x)) + Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) + + # Create rotation matrix around y axis + c = numpy.cos(numpy.deg2rad(y)) + s = numpy.sin(numpy.deg2rad(y)) + Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) + + # Create rotation matrix around z axis + c = numpy.cos(numpy.deg2rad(z)) + s = numpy.sin(numpy.deg2rad(z)) + Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) + + # Return intrinsic rotation matrix + return Rx.dot(Ry.dot(Rz)) + + def __is_rotation_matrix(self, R): + """Checks if a matrix is a valid rotation matrix.""" + + I = numpy.identity(3, dtype = R.dtype) + return numpy.linalg.norm(I - numpy.dot(R.T, R)) < 1e-6 + + def __normalise_place_pose(self, name, place, F): + + # Transform place rotation into set rotation vector + R = self.__rotation_cache[name] + rvec, _ = cv.Rodrigues(F.dot(R)) + + #print(f'{name} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}') + + # Transform place translation into set translation vector + OF = place.translation + T = self.__translation_cache[name] + FC = R.dot(F.dot(-T)) + + tvec = OF + FC + + #print(f'{name} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}') + + return rvec, tvec + + def estimate_pose(self, tracked_markers) -> Tuple[numpy.array, numpy.array, bool, int, dict]: + """Estimate set pose from tracked markers (cf ArUcoTracker.track()) + + * **Returns:** + - translation vector + - rotation vector + - pose estimation success status + - the number of places used to estimate the pose as validity score + - dict of non valid distance and angle + """ + + # Init pose data + self._translation = numpy.zeros(3) + self._rotation = numpy.zeros(3) + self._succeded = False + self._validity = 0 + self._unvalid = {} + + # Don't try to estimate pose if there is no tracked markers + if len(tracked_markers) == 0: + + return self._translation, self._rotation, self._succeded, self._validity, self._unvalid + + # Look for places related to tracked markers + tracked_places = {} + for (marker_id, marker) in tracked_markers.items(): + + try: + name = self.__identifier_cache[marker_id] + tracked_places[name] = marker + + except KeyError: + continue + + #print('-------------- ArUcoScene pose estimation --------------') + + # Pose validity checking is'nt possible when only one place of the set is tracked + if len(tracked_places.keys()) == 1: + + # Get set pose from to the unique place pose + name, place = tracked_places.popitem() + F, _ = cv.Rodrigues(place.rotation) + + self._rotation, self._translation = self.__normalise_place_pose(name, place, F) + self._succeded = True + self._validity = 1 + + #print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!') + #print(f'ArUcoScene rotation vector: {self._rotation[0][0]:3f} {self._rotation[1][0]:3f} {self._rotation[2][0]:3f}') + #print(f'ArUcoScene translation vector: {self._translation[0]:3f} {self._translation[1]:3f} {self._translation[2]:3f}') + + # Pose validity checking processes places two by two + else: + + valid_places = [] + valid_rvecs = [] + valid_tvecs = [] + + for (A_name, A_place), (B_name, B_place) in itertools.combinations(tracked_places.items(), 2): + + #print(f'** {A_name} > {B_name}') + + # Get place rotation estimation + # Use rotation matrix instead of rotation vector + A, _ = cv.Rodrigues(A_place.rotation) + B, _ = cv.Rodrigues(B_place.rotation) + + # Rotation matrix from A place to B place + AB = B.dot(A.T) + + assert(self.__is_rotation_matrix(AB)) + + # Calculate axis-angles representation of AB rotation matrix + angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2)) + expected_angle = self.__angle_cache[A_name][B_name] + + # Calculate distance between A place center and B place center + distance = numpy.linalg.norm(A_place.translation - B_place.translation) + expected_distance = self.__distance_cache[A_name][B_name] + + # Check angle and distance according given tolerance then normalise place pose + valid_angle = math.isclose(angle, expected_angle, abs_tol=self.angle_tolerance) + valid_distance = math.isclose(distance, expected_distance, abs_tol=self.distance_tolerance) + + if valid_angle and valid_distance: + + if A_name not in valid_places: + + # Remember this place is already validated + valid_places.append(A_name) + + rvec, tvec = self.__normalise_place_pose(A_name, A_place, A) + + # Store normalised place pose + valid_rvecs.append(rvec) + valid_tvecs.append(tvec) + + if B_name not in valid_places: + + # Remember this place is already validated + valid_places.append(B_name) + + rvec, tvec = self.__normalise_place_pose(B_name, B_place, B) + + # Store normalised place pose + valid_rvecs.append(rvec) + valid_tvecs.append(tvec) + + else: + + if not valid_angle: + self._unvalid[f'{A_name}/{B_name} angle'] = angle + + if not valid_distance: + self._unvalid[f'{A_name}/{B_name} distance'] = distance + + if len(valid_places) > 1: + + # Consider ArUcoScene rotation as the mean of all valid translations + # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before. + self._rotation = numpy.mean(numpy.array(valid_rvecs), axis=0) + + # Consider ArUcoScene translation as the mean of all valid translations + self._translation = numpy.mean(numpy.array(valid_tvecs), axis=0) + + #print(':::::::::::::::::::::::::::::::::::::::::::::::::::') + #print(f'ArUcoScene rotation vector: {self._rotation[0][0]:3f} {self._rotation[1][0]:3f} {self._rotation[2][0]:3f}') + #print(f'ArUcoScene translation vector: {self._translation[0]:3f} {self._translation[1]:3f} {self._translation[2]:3f}') + + self._succeded = True + self._validity = len(valid_places) + + else: + + unvalid_rvecs = [] + unvalid_tvecs = [] + + # Gather unvalid pose estimations + for name, place in tracked_places.items(): + + if name not in valid_places: + + R, _ = cv.Rodrigues(place.rotation) + rvec, tvec = self.__normalise_place_pose(name, place, R) + + unvalid_rvecs = [rvec] + unvalid_tvecs = [tvec] + + # Consider ArUcoScene rotation as the mean of all unvalid translations + # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before. + self._rotation = numpy.mean(numpy.array(unvalid_rvecs), axis=0) + + # Consider ArUcoScene translation as the mean of all unvalid translations + self._translation = numpy.mean(numpy.array(unvalid_tvecs), axis=0) + + #print(':::::::::::::::::::::::::::::::::::::::::::::::::::') + #print(f'ArUcoScene rotation vector: {self._rotation[0][0]:3f} {self._rotation[1][0]:3f} {self._rotation[2][0]:3f}') + #print(f'ArUcoScene translation vector: {self._translation[0]:3f} {self._translation[1]:3f} {self._translation[2]:3f}') + + self._succeded = False + self._validity = len(tracked_places) + + #print('----------------------------------------------------') + + return self._translation, self._rotation, self._succeded, self._validity, self._unvalid + + @property + def translation(self) -> numpy.array: + """Access to set translation vector. + + .. warning:: + Setting set translation vector implies succeded status to be True and validity score to be 0.""" + + return self._translation + + @translation.setter + def translation(self, tvec): + + self._translation = tvec + self._succeded = True + self._validity = 0 + + @property + def rotation(self) -> numpy.array: + """Access to set rotation vector. + + .. warning:: + Setting set rotation vector implies succeded status to be True and validity score to be 0.""" + + return self._translation + + @rotation.setter + def rotation(self, rvec): + + self._rotation = rvec + self._succeded = True + self._validity = 0 + + @property + def succeded(self) -> bool: + """Access to set pose estimation succeded status.""" + + return self._succeded + + @property + def validity(self) -> int: + """Access to set pose estimation validity score.""" + + return self._validity + + def draw(self, frame, K, D, draw_places=True): + """Draw set axis and places.""" + + l = self.__marker_size / 2 + ll = self.__marker_size + + # Select color according validity score + n = 95 * self._validity if self._validity < 2 else 0 + f = 159 * self._validity if self._validity < 2 else 255 + + try: + + # Draw axis + axisPoints = numpy.float32([[ll, 0, 0], [0, ll, 0], [0, 0, ll], [0, 0, 0]]).reshape(-1, 3) + axisPoints, _ = cv.projectPoints(axisPoints, self._rotation, self._translation, numpy.array(K), numpy.array(D)) + axisPoints = axisPoints.astype(int) + + cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (n,n,f), 5) # X (red) + cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (n,f,n), 5) # Y (green) + cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (f,n,n), 5) # Z (blue) + + # Draw places (optional) + if draw_places: + + for name, place in self.places.items(): + + if name != "top": + continue + + T = self.__translation_cache[name] + R = self.__rotation_cache[name] + + placePoints = (T + numpy.float32([R.dot([-l, -l, 0]), R.dot([l, -l, 0]), R.dot([l, l, 0]), R.dot([-l, l, 0])])).reshape(-1, 3) + placePoints, _ = cv.projectPoints(placePoints, self._rotation, self._translation, numpy.array(K), numpy.array(D)) + placePoints = placePoints.astype(int) + + cv.line(frame, tuple(placePoints[0].ravel()), tuple(placePoints[1].ravel()), (f,f,f), 2) + cv.line(frame, tuple(placePoints[1].ravel()), tuple(placePoints[2].ravel()), (f,f,f), 2) + cv.line(frame, tuple(placePoints[2].ravel()), tuple(placePoints[3].ravel()), (f,f,f), 2) + cv.line(frame, tuple(placePoints[3].ravel()), tuple(placePoints[0].ravel()), (f,f,f), 2) + + except Exception as e: + + print(e) + print(self._translation) + print(self._rotation) + print(self._succeded) + print(self._validity) + print(axisPoints) diff --git a/src/argaze/ArUcoMarkers/ArUcoSet.py b/src/argaze/ArUcoMarkers/ArUcoSet.py deleted file mode 100644 index df0ad65..0000000 --- a/src/argaze/ArUcoMarkers/ArUcoSet.py +++ /dev/null @@ -1,491 +0,0 @@ -#!/usr/bin/env python - -from typing import Tuple -from dataclasses import dataclass, field -import json -import math -import itertools - -from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoCamera - -import numpy -import cv2 as cv -import cv2.aruco as aruco - -@dataclass -class ArUcoSetPlace(): - """Define set place pose and marker.""" - - translation: numpy.array - """Position in set referential.""" - - rotation: numpy.array - """Rotation in set referential.""" - - marker: dict - """ArUco marker linked to the place.""" - -@dataclass -class ArUcoSet(): - """Define abstract class to handle specific ArUco markers set and estimate its pose.""" - - dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary - """ArUco dictionary of set markers.""" - - marker_size: int = field(init=False) - """Size of markers in centimeter.""" - - places: dict = field(init=False, default_factory=dict) - """All named places of the set and their ArUco markers.""" - - angle_tolerance: float = field(init=False) - """Angle error tolerance allowed to validate place pose in degree.""" - - distance_tolerance: float = field(init=False) - """Distance error tolerance allowed to validate place pose in centimeter.""" - - def __init__(self, configuration_filepath): - """Define set from a .json file.""" - - with open(configuration_filepath) as configuration_file: - - # Deserialize .json - # TODO find a better way - configuration = json.load(configuration_file) - - # Load dictionary - self.dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(configuration['dictionary']) - - # Load marker size - self.marker_size = configuration['marker_size'] - - # Load places - self.places = {} - for name, place in configuration['places'].items(): - marker = ArUcoMarker.ArUcoMarker(self.dictionary, place['marker'], self.marker_size) - self.places[name] = ArUcoSetPlace(numpy.array(place['translation']).astype(numpy.float32), numpy.array(place['rotation']).astype(numpy.float32), marker) - - # Load angle tolerance - self.angle_tolerance = configuration['angle_tolerance'] - - # Load distance tolerance - self.distance_tolerance = configuration['distance_tolerance'] - - # Init pose data - self._translation = numpy.zeros(3) - self._rotation = numpy.zeros(3) - self._succeded = False - self._validity = 0 - - # Process markers ids to speed up further calculations - self.__identifier_cache = {} - for name, place in self.places.items(): - self.__identifier_cache[place.marker.identifier] = name - - # Process each place pose to speed up further calculations - self.__translation_cache = {} - for name, place in self.places.items(): - self.__translation_cache[name] = place.translation - - # Process each place rotation matrix to speed up further calculations - self.__rotation_cache = {} - for name, place in self.places.items(): - - # Create intrinsic rotation matrix - R = self.__make_rotation_matrix(*place.rotation) - - assert(self.__is_rotation_matrix(R)) - - # Store rotation matrix - self.__rotation_cache[name] = R - - # Process axis-angle between place combination to speed up further calculations - self.__angle_cache = {} - for (A_name, A_place), (B_name, B_place) in itertools.combinations(self.places.items(), 2): - - A = self.__rotation_cache[A_name] - B = self.__rotation_cache[B_name] - - if numpy.array_equal(A, B): - - angle = 0. - - else: - - # Rotation matrix from A place to B place - AB = B.dot(A.T) - - assert(self.__is_rotation_matrix(AB)) - - # Calculate axis-angle representation of AB rotation matrix - angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2)) - - try: - self.__angle_cache[A_name][B_name] = angle - except: - self.__angle_cache[A_name] = {B_name: angle} - - try: - self.__angle_cache[B_name][A_name] = angle - except: - self.__angle_cache[B_name] = {A_name: angle} - - # Process distance between each place combination to speed up further calculations - self.__distance_cache = {} - for (A_name, A_place), (B_name, B_place) in itertools.combinations(self.places.items(), 2): - - A = self.__translation_cache[A_name] - B = self.__translation_cache[B_name] - - # Calculate axis-angle representation of AB rotation matrix - distance = numpy.linalg.norm(B - A) - - try: - self.__distance_cache[A_name][B_name] = distance - except: - self.__distance_cache[A_name] = {B_name: distance} - - try: - self.__distance_cache[B_name][A_name] = distance - except: - self.__distance_cache[B_name] = {A_name: distance} - - def __str__(self) -> str: - """Output pre-processed data as string representation.""" - - output = f'\n\n\tDictionary: {self.dictionary.name}' - - output += '\n\n\tIdentifier cache:' - for i, name in self.__identifier_cache.items(): - output += f'\n\t\t- {i}: {name}' - - output += '\n\n\tTranslation cache:' - for name, item in self.__translation_cache.items(): - output += f'\n\t\t- {name}: {item}' - - output += '\n\n\tRotation cache:' - for name, item in self.__rotation_cache.items(): - output += f'\n\t\t- {name}:\n{item}' - - output += '\n\n\tAngle cache:' - for A_name, A_angle_cache in self.__angle_cache.items(): - for B_name, angle in A_angle_cache.items(): - output += f'\n\t\t- {A_name}/{B_name}: {angle:3f}' - - output += '\n\n\tDistance cache:' - for A_name, A_distance_cache in self.__distance_cache.items(): - for B_name, distance in A_distance_cache.items(): - output += f'\n\t\t- {A_name}/{B_name}: {distance:3f}' - - return output - - @property - def identifiers(self) -> list: - """List all makers identifier.""" - - return list(self.__identifier_cache.keys()) - - def __make_rotation_matrix(self, x, y, z): - - # Create rotation matrix around x axis - c = numpy.cos(numpy.deg2rad(x)) - s = numpy.sin(numpy.deg2rad(x)) - Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) - - # Create rotation matrix around y axis - c = numpy.cos(numpy.deg2rad(y)) - s = numpy.sin(numpy.deg2rad(y)) - Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) - - # Create rotation matrix around z axis - c = numpy.cos(numpy.deg2rad(z)) - s = numpy.sin(numpy.deg2rad(z)) - Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) - - # Return intrinsic rotation matrix - return Rx.dot(Ry.dot(Rz)) - - def __is_rotation_matrix(self, R): - """Checks if a matrix is a valid rotation matrix.""" - - I = numpy.identity(3, dtype = R.dtype) - return numpy.linalg.norm(I - numpy.dot(R.T, R)) < 1e-6 - - def __normalise_place_pose(self, name, place, F): - - # Transform place rotation into set rotation vector - R = self.__rotation_cache[name] - rvec, _ = cv.Rodrigues(F.dot(R)) - - #print(f'{name} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}') - - # Transform place translation into set translation vector - OF = place.translation - T = self.__translation_cache[name] - FC = R.dot(F.dot(-T)) - - tvec = OF + FC - - #print(f'{name} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}') - - return rvec, tvec - - def estimate_pose(self, tracked_markers) -> Tuple[numpy.array, numpy.array, bool, int, dict]: - """Estimate set pose from tracked markers (cf ArUcoTracker.track()) - - * **Returns:** - - translation vector - - rotation vector - - pose estimation success status - - the number of places used to estimate the pose as validity score - - dict of non valid distance and angle - """ - - # Init pose data - self._translation = numpy.zeros(3) - self._rotation = numpy.zeros(3) - self._succeded = False - self._validity = 0 - self._unvalid = {} - - # Don't try to estimate pose if there is no tracked markers - if len(tracked_markers) == 0: - - return self._translation, self._rotation, self._succeded, self._validity, self._unvalid - - # Look for places related to tracked markers - tracked_places = {} - for (marker_id, marker) in tracked_markers.items(): - - try: - name = self.__identifier_cache[marker_id] - tracked_places[name] = marker - - except KeyError: - continue - - #print('-------------- ArUcoSet pose estimation --------------') - - # Pose validity checking is'nt possible when only one place of the set is tracked - if len(tracked_places.keys()) == 1: - - # Get set pose from to the unique place pose - name, place = tracked_places.popitem() - F, _ = cv.Rodrigues(place.rotation) - - self._rotation, self._translation = self.__normalise_place_pose(name, place, F) - self._succeded = True - self._validity = 1 - - #print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!') - #print(f'ArUcoSet rotation vector: {self._rotation[0][0]:3f} {self._rotation[1][0]:3f} {self._rotation[2][0]:3f}') - #print(f'ArUcoSet translation vector: {self._translation[0]:3f} {self._translation[1]:3f} {self._translation[2]:3f}') - - # Pose validity checking processes places two by two - else: - - valid_places = [] - valid_rvecs = [] - valid_tvecs = [] - - for (A_name, A_place), (B_name, B_place) in itertools.combinations(tracked_places.items(), 2): - - #print(f'** {A_name} > {B_name}') - - # Get place rotation estimation - # Use rotation matrix instead of rotation vector - A, _ = cv.Rodrigues(A_place.rotation) - B, _ = cv.Rodrigues(B_place.rotation) - - # Rotation matrix from A place to B place - AB = B.dot(A.T) - - assert(self.__is_rotation_matrix(AB)) - - # Calculate axis-angles representation of AB rotation matrix - angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2)) - expected_angle = self.__angle_cache[A_name][B_name] - - # Calculate distance between A place center and B place center - distance = numpy.linalg.norm(A_place.translation - B_place.translation) - expected_distance = self.__distance_cache[A_name][B_name] - - # Check angle and distance according given tolerance then normalise place pose - valid_angle = math.isclose(angle, expected_angle, abs_tol=self.angle_tolerance) - valid_distance = math.isclose(distance, expected_distance, abs_tol=self.distance_tolerance) - - if valid_angle and valid_distance: - - if A_name not in valid_places: - - # Remember this place is already validated - valid_places.append(A_name) - - rvec, tvec = self.__normalise_place_pose(A_name, A_place, A) - - # Store normalised place pose - valid_rvecs.append(rvec) - valid_tvecs.append(tvec) - - if B_name not in valid_places: - - # Remember this place is already validated - valid_places.append(B_name) - - rvec, tvec = self.__normalise_place_pose(B_name, B_place, B) - - # Store normalised place pose - valid_rvecs.append(rvec) - valid_tvecs.append(tvec) - - else: - - if not valid_angle: - self._unvalid[f'{A_name}/{B_name} angle'] = angle - - if not valid_distance: - self._unvalid[f'{A_name}/{B_name} distance'] = distance - - if len(valid_places) > 1: - - # Consider ArUcoSet rotation as the mean of all valid translations - # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before. - self._rotation = numpy.mean(numpy.array(valid_rvecs), axis=0) - - # Consider ArUcoSet translation as the mean of all valid translations - self._translation = numpy.mean(numpy.array(valid_tvecs), axis=0) - - #print(':::::::::::::::::::::::::::::::::::::::::::::::::::') - #print(f'ArUcoSet rotation vector: {self._rotation[0][0]:3f} {self._rotation[1][0]:3f} {self._rotation[2][0]:3f}') - #print(f'ArUcoSet translation vector: {self._translation[0]:3f} {self._translation[1]:3f} {self._translation[2]:3f}') - - self._succeded = True - self._validity = len(valid_places) - - else: - - unvalid_rvecs = [] - unvalid_tvecs = [] - - # Gather unvalid pose estimations - for name, place in tracked_places.items(): - - if name not in valid_places: - - R, _ = cv.Rodrigues(place.rotation) - rvec, tvec = self.__normalise_place_pose(name, place, R) - - unvalid_rvecs = [rvec] - unvalid_tvecs = [tvec] - - # Consider ArUcoSet rotation as the mean of all unvalid translations - # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before. - self._rotation = numpy.mean(numpy.array(unvalid_rvecs), axis=0) - - # Consider ArUcoSet translation as the mean of all unvalid translations - self._translation = numpy.mean(numpy.array(unvalid_tvecs), axis=0) - - #print(':::::::::::::::::::::::::::::::::::::::::::::::::::') - #print(f'ArUcoSet rotation vector: {self._rotation[0][0]:3f} {self._rotation[1][0]:3f} {self._rotation[2][0]:3f}') - #print(f'ArUcoSet translation vector: {self._translation[0]:3f} {self._translation[1]:3f} {self._translation[2]:3f}') - - self._succeded = False - self._validity = len(tracked_places) - - #print('----------------------------------------------------') - - return self._translation, self._rotation, self._succeded, self._validity, self._unvalid - - @property - def translation(self) -> numpy.array: - """Access to set translation vector. - - .. warning:: - Setting set translation vector implies succeded status to be True and validity score to be 0.""" - - return self._translation - - @translation.setter - def translation(self, tvec): - - self._translation = tvec - self._succeded = True - self._validity = 0 - - @property - def rotation(self) -> numpy.array: - """Access to set rotation vector. - - .. warning:: - Setting set rotation vector implies succeded status to be True and validity score to be 0.""" - - return self._translation - - @rotation.setter - def rotation(self, rvec): - - self._rotation = rvec - self._succeded = True - self._validity = 0 - - @property - def succeded(self) -> bool: - """Access to set pose estimation succeded status.""" - - return self._succeded - - @property - def validity(self) -> int: - """Access to set pose estimation validity score.""" - - return self._validity - - def draw(self, frame, K, D, draw_places=True): - """Draw set axis and places.""" - - l = self.marker_size / 2 - ll = self.marker_size - - # Select color according validity score - n = 95 * self._validity if self._validity < 2 else 0 - f = 159 * self._validity if self._validity < 2 else 255 - - try: - - # Draw axis - axisPoints = numpy.float32([[ll, 0, 0], [0, ll, 0], [0, 0, ll], [0, 0, 0]]).reshape(-1, 3) - axisPoints, _ = cv.projectPoints(axisPoints, self._rotation, self._translation, K, D) - axisPoints = axisPoints.astype(int) - - cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (n,n,f), 5) # X (red) - cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (n,f,n), 5) # Y (green) - cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (f,n,n), 5) # Z (blue) - - # Draw places (optional) - if draw_places: - - for name, place in self.places.items(): - - if name != "top": - continue - - T = self.__translation_cache[name] - R = self.__rotation_cache[name] - - placePoints = (T + numpy.float32([R.dot([-l, -l, 0]), R.dot([l, -l, 0]), R.dot([l, l, 0]), R.dot([-l, l, 0])])).reshape(-1, 3) - placePoints, _ = cv.projectPoints(placePoints, self._rotation, self._translation, K, D) - placePoints = placePoints.astype(int) - - cv.line(frame, tuple(placePoints[0].ravel()), tuple(placePoints[1].ravel()), (f,f,f), 2) - cv.line(frame, tuple(placePoints[1].ravel()), tuple(placePoints[2].ravel()), (f,f,f), 2) - cv.line(frame, tuple(placePoints[2].ravel()), tuple(placePoints[3].ravel()), (f,f,f), 2) - cv.line(frame, tuple(placePoints[3].ravel()), tuple(placePoints[0].ravel()), (f,f,f), 2) - - except Exception as e: - - print(e) - print(self._translation) - print(self._rotation) - print(self._succeded) - print(self._validity) - print(axisPoints) diff --git a/src/argaze/ArUcoMarkers/ArUcoSetFactory.py b/src/argaze/ArUcoMarkers/ArUcoSetFactory.py deleted file mode 100644 index 230c9b3..0000000 --- a/src/argaze/ArUcoMarkers/ArUcoSetFactory.py +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env python - -import json - -from argaze.ArUcoMarkers import ArUcoPlan, ArUcoCube - -class ArUcoSetFactory(): - """Define class to build specific ArUco markers set.""" - - @classmethod - def make(self, configuration_filepath): - - with open(configuration_filepath) as configuration_file: - - # Deserialize .json - configuration = json.load(configuration_file) - - # Load set type - set_type = configuration['type'] - - return eval(f'{set_type}.{set_type}')(configuration_filepath) \ No newline at end of file diff --git a/src/argaze/ArUcoMarkers/ArUcoTracker.py b/src/argaze/ArUcoMarkers/ArUcoTracker.py index 086eade..7909434 100644 --- a/src/argaze/ArUcoMarkers/ArUcoTracker.py +++ b/src/argaze/ArUcoMarkers/ArUcoTracker.py @@ -10,61 +10,110 @@ import numpy import cv2 as cv import cv2.aruco as aruco -ArUcoTrackerParameters = [ - 'adaptiveThreshConstant', - 'adaptiveThreshWinSizeMax', - 'adaptiveThreshWinSizeMin', - 'adaptiveThreshWinSizeStep', - 'aprilTagCriticalRad', - 'aprilTagDeglitch', - 'aprilTagMaxLineFitMse', - 'aprilTagMaxNmaxima', - 'aprilTagMinClusterPixels', - 'aprilTagMinWhiteBlackDiff', - 'aprilTagQuadDecimate', - 'aprilTagQuadSigma', - 'cornerRefinementMaxIterations', - 'cornerRefinementMethod', - 'cornerRefinementMinAccuracy', - 'cornerRefinementWinSize', - 'markerBorderBits', - 'minMarkerPerimeterRate', - 'maxMarkerPerimeterRate', - 'minMarkerDistanceRate', - 'detectInvertedMarker', - 'errorCorrectionRate', - 'maxErroneousBitsInBorderRate', - 'minCornerDistanceRate', - 'minDistanceToBorder', - 'minOtsuStdDev', - 'perspectiveRemoveIgnoredMarginPerCell', - 'perspectiveRemovePixelPerCell', - 'polygonalApproxAccuracyRate' -] -"""All parameters are detailled on [opencv page](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html)""" - ArUcoMarkerType = TypeVar('ArUcoMarker', bound="ArUcoMarker") # Type definition for type annotation convenience +TrackingDataType = TypeVar('TrackingData', bound="TrackingData") +# Type definition for type annotation convenience + +DetectorParametersType = TypeVar('') + +class TrackingData(): + """Define ArUco marker tracking data. + + .. note:: More details on [opencv page](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html) + """ + + __detector_parameters = aruco.DetectorParameters_create() + __detector_parameters_names = [ + 'adaptiveThreshConstant', + 'adaptiveThreshWinSizeMax', + 'adaptiveThreshWinSizeMin', + 'adaptiveThreshWinSizeStep', + 'aprilTagCriticalRad', + 'aprilTagDeglitch', + 'aprilTagMaxLineFitMse', + 'aprilTagMaxNmaxima', + 'aprilTagMinClusterPixels', + 'aprilTagMinWhiteBlackDiff', + 'aprilTagQuadDecimate', + 'aprilTagQuadSigma', + 'cornerRefinementMaxIterations', + 'cornerRefinementMethod', + 'cornerRefinementMinAccuracy', + 'cornerRefinementWinSize', + 'markerBorderBits', + 'minMarkerPerimeterRate', + 'maxMarkerPerimeterRate', + 'minMarkerDistanceRate', + 'detectInvertedMarker', + 'errorCorrectionRate', + 'maxErroneousBitsInBorderRate', + 'minCornerDistanceRate', + 'minDistanceToBorder', + 'minOtsuStdDev', + 'perspectiveRemoveIgnoredMarginPerCell', + 'perspectiveRemovePixelPerCell', + 'polygonalApproxAccuracyRate' + ] + + def __init__(self, **kwargs): + + for parameter, value in kwargs.items(): + + setattr(self.__detector_parameters, parameter, value) + + self.__dict__.update(kwargs) + + def __setattr__(self, parameter, value): + + setattr(self.__detector_parameters, parameter, value) + + def __getattr__(self, parameter): + + return getattr(self.__detector_parameters, parameter) + + @classmethod + def from_json(self, json_filepath) -> TrackingDataType: + """Load tracking data from .json file.""" + + with open(json_filepath) as configuration_file: + + return TrackingData(**json.load(configuration_file)) + + def __str__(self, print_all=False) -> str: + """Tracking data string representation.""" + + output = '' + + for parameter in self.__detector_parameters_names: + + if parameter in self.__dict__.keys(): + + output += f'\n\t*{parameter}: {getattr(self.__detector_parameters, parameter)}' + + elif print_all: + + output += f'\n\t{parameter}: {getattr(self.__detector_parameters, parameter)}' + + return output + + @property + def internal(self): + return self.__detector_parameters + class ArUcoTracker(): """Track ArUco markers into a frame.""" - def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary, marker_size: float, camera: ArUcoCamera.ArUcoCamera): - """Define which markers library to track and their size""" + def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary, marker_size: float, camera: ArUcoCamera.ArUcoCamera, **kwargs): + """Define which markers dictionary and size to track and camera.""" - # load ArUco markers dictionary self.__dictionary = dictionary - - # define marker length in centimeter self.__marker_size = marker_size - - # define camera self.__camera = camera - # setup ArUco detection parameters - self.__detector_parameters = aruco.DetectorParameters_create() - self.__detector_parameters.cornerRefinementMethod = aruco.CORNER_REFINE_CONTOUR # to get a better pose estimation - self.__detector_parameters_loaded = {} + # Init tracking data + self.__tracking_data = TrackingData(**kwargs) # init tracked markers data self.__tracked_markers = {} @@ -79,30 +128,17 @@ class ArUcoTracker(): self.__track_count = 0 self.__tracked_ids = [] - def load_configuration_file(self, configuration_filepath): - """Load aruco detection parameters from .json file.""" - - with open(configuration_filepath) as configuration_file: - - self.__detector_parameters_loaded = json.load(configuration_file) - - for key, value in self.__detector_parameters_loaded.items(): - - try: - setattr(self.__detector_parameters, key, value) - - except AttributeError as error: - - print(error) + @property + def marker_size(self) -> float: + """ArUco marker length to track in centimeter.""" - def print_configuration(self, print_all=False): - """Print aruco detection parameters.""" + return self.__marker_size - for parameter in ArUcoTrackerParameters: - if parameter in self.__detector_parameters_loaded.keys(): - print(f'\t*{parameter}: {getattr(self.__detector_parameters, parameter)}') - elif print_all: - print(f'\t{parameter}: {getattr(self.__detector_parameters, parameter)}') + @property + def tracking_data(self): + """ArUco marker tracking data.""" + + return self.__tracking_data def track(self, frame, estimate_pose = True, check_rotation = False): """Track ArUco markers in frame. @@ -115,14 +151,14 @@ class ArUcoTracker(): markers_corners, markers_ids, markers_rvecs, markers_tvecs, markers_points = [], [], [], [], [] # Track markers into gray picture - markers_corners, markers_ids, _ = aruco.detectMarkers(cv.cvtColor(frame, cv.COLOR_BGR2GRAY), self.__dictionary.markers, parameters = self.__detector_parameters) + markers_corners, markers_ids, _ = aruco.detectMarkers(cv.cvtColor(frame, cv.COLOR_BGR2GRAY), self.__dictionary.markers, parameters = self.__tracking_data.internal) if len(markers_corners) > 0: # Pose estimation is optional if estimate_pose: - markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(markers_corners, self.__marker_size, self.__camera.K, self.__camera.D) + markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(markers_corners, self.__marker_size, numpy.array(self.__camera.K), numpy.array(self.__camera.D)) # Gather tracked markers data and update metrics self.__track_count += 1 @@ -170,7 +206,7 @@ class ArUcoTracker(): # detect markers from gray picture gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY) - markers_corners, markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, parameters = self.__detector_parameters) + markers_corners, markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, parameters = self.__tracking_data.internal) # if all board markers are detected if len(markers_corners) == expected_markers_number: diff --git a/src/argaze/ArUcoMarkers/__init__.py b/src/argaze/ArUcoMarkers/__init__.py index af43673..f5b9ca5 100644 --- a/src/argaze/ArUcoMarkers/__init__.py +++ b/src/argaze/ArUcoMarkers/__init__.py @@ -2,4 +2,4 @@ .. include:: README.md """ __docformat__ = "restructuredtext" -__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoCamera', 'ArUcoTracker', 'ArUcoSet', 'ArUcoPlan', 'ArUcoCube', 'ArUcoSetFactory'] \ No newline at end of file +__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoCamera', 'ArUcoTracker', 'ArUcoScene'] \ No newline at end of file diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py index c5ee265..e081f57 100644 --- a/src/argaze/AreaOfInterest/AOI3DScene.py +++ b/src/argaze/AreaOfInterest/AOI3DScene.py @@ -36,9 +36,12 @@ class AOI3DScene(AOIFeatures.AOIScene): super().__init__(3, aois_3d) - def load(self, obj_filepath: str): + @classmethod + def from_obj(self, obj_filepath: str) -> AOI3DSceneType: """Load AOI3D scene from .obj file.""" + aois_3d = {} + # regex rules for .obj file parsing OBJ_RX_DICT = { 'comment': re.compile(r'#(.*)\n'), @@ -102,11 +105,13 @@ class AOI3DScene(AOIFeatures.AOIScene): # retreive all aoi3D vertices for name, face in faces.items(): aoi3D = AOIFeatures.AreaOfInterest([ vertices[i-1] for i in face ]) - self[name] = aoi3D + aois_3d[name] = aoi3D except IOError: raise IOError(f'File not found: {obj_filepath}') + return AOI3DScene(aois_3d) + def save(self, obj_filepath: str): """Save AOI3D scene into .obj file.""" @@ -192,7 +197,7 @@ class AOI3DScene(AOIFeatures.AOIScene): for name, aoi3D in self.items(): - vertices_2D, J = cv.projectPoints(aoi3D.astype(numpy.float32), R, T, K, D) + vertices_2D, J = cv.projectPoints(aoi3D.astype(numpy.float32), R, T, numpy.array(K),numpy.array(D)) aoi2D = vertices_2D.reshape((len(vertices_2D), 2)).view(AOIFeatures.AreaOfInterest) diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index de5cd48..33caa87 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -214,6 +214,17 @@ class AOIScene(): return str(self.__areas) + def __str__(self) -> str: + """String display""" + + output = '' + + for name, area in self.__areas.items(): + + output += f'\n\t{name}:\n{area}' + + return output + def items(self) -> Tuple[str, AreaOfInterest]: """Iterate over areas.""" diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py index baa46f7..1e1b5bb 100644 --- a/src/argaze/__init__.py +++ b/src/argaze/__init__.py @@ -2,4 +2,4 @@ .. include:: ../../README.md """ __docformat__ = "restructuredtext" -__all__ = ['utils','ArUcoMarkers','AreaOfInterest','GazeFeatures','DataStructures','GazeAnalysis','ArGazeProject','TobiiGlassesPro2'] \ No newline at end of file +__all__ = ['utils','ArUcoMarkers','AreaOfInterest','GazeFeatures','DataStructures','GazeAnalysis','ArGazeScene','TobiiGlassesPro2'] \ No newline at end of file diff --git a/src/argaze/utils/tobii_segment_argaze_scene_export.py b/src/argaze/utils/tobii_segment_argaze_scene_export.py new file mode 100644 index 0000000..ae42d7c --- /dev/null +++ b/src/argaze/utils/tobii_segment_argaze_scene_export.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python + +import argparse +import os, json +import math +import threading + +from argaze import * +from argaze.TobiiGlassesPro2 import * +from argaze.ArUcoMarkers import * +from argaze.AreaOfInterest import * +from argaze.utils import MiscFeatures + +import cv2 as cv +import numpy + +def make_rotation_matrix(x, y, z): + + # Create rotation matrix around x axis + c = numpy.cos(numpy.deg2rad(x)) + s = numpy.sin(numpy.deg2rad(x)) + Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) + + # Create rotation matrix around y axis + c = numpy.cos(numpy.deg2rad(y)) + s = numpy.sin(numpy.deg2rad(y)) + Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) + + # Create rotation matrix around z axis + c = numpy.cos(numpy.deg2rad(z)) + s = numpy.sin(numpy.deg2rad(z)) + Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) + + # Return intrinsic rotation matrix + return Rx.dot(Ry.dot(Rz)) + +def main(): + """ + Track ArUcoPlan into Tobii Glasses Pro 2 camera video stream. + """ + + # Manage arguments + parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) + parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path') + parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') + parser.add_argument('-p', '--project_path', metavar='ARGAZE_PROJECT', type=str, default=None, help='json argaze project filepath') + parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)') + parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction) + args = parser.parse_args() + + if args.segment_path != None: + + # Manage destination path + destination_path = '.' + if args.output != None: + + if not os.path.exists(os.path.dirname(args.output)): + + os.makedirs(os.path.dirname(args.output)) + print(f'{os.path.dirname(args.output)} folder created') + + destination_path = args.output + + else: + + destination_path = args.segment_path + + # Export into a dedicated time range folder + if args.time_range[1] != None: + timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]' + else: + timerange_path = f'[all]' + + destination_path = f'{destination_path}/{timerange_path}' + + if not os.path.exists(destination_path): + + os.makedirs(destination_path) + print(f'{destination_path} folder created') + + vs_data_filepath = f'{destination_path}/aoi.csv' + vs_video_filepath = f'{destination_path}/aoi.mp4' + + # Load a tobii segment + tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) + + # Load a tobii segment video + tobii_segment_video = tobii_segment.load_video() + print(f'Video properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') + + # Load a tobii segment data + tobii_segment_data = tobii_segment.load_data() + + print(f'Loaded data count:') + for name in tobii_segment_data.keys(): + print(f'\t{name}: {len(tobii_segment_data[name])} data') + + # Access to video timestamp data buffer + tobii_ts_vts = tobii_segment_data['VideoTimeStamp'] + + # Prepare video exportation at the same format than segment video + output_video = TobiiVideo.TobiiVideoOutput(vs_video_filepath, tobii_segment_video.stream) + + # Load argaze project + argaze_scene = ArGazeScene.ArGazeScene.from_json(args.project_path) + + print(argaze_scene) + + # Create timestamped buffer to store AOIs and primary time stamp offset + ts_offset_aois = DataStructures.TimeStampedBuffer() + + # Video and data replay loop + try: + + # Initialise progress bar + #MiscFeatures.printProgressBar(0, tobii_segment_video.duration/1e3, prefix = 'Progress:', suffix = 'Complete', length = 100) + + # Iterate on video frames + for video_ts, video_frame in tobii_segment_video.frames(): + + video_ts_ms = video_ts / 1e3 + + # Copy video frame to edit visualisation on it without disrupting aruco tracking + visu_frame = video_frame.copy() + + # Prepare to store projected AOI + projected_aois = {} + + # Process video and data frame + try: + + # Get nearest video timestamp + _, nearest_vts = tobii_ts_vts.get_last_before(video_ts) + + projected_aois['offset'] = nearest_vts + + # Hide frame left and right borders before tracking to ignore markers outside focus area + cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width/6), int(video_frame.height)), (0, 0, 0), -1) + cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - 1/6)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1) + + # Project scene into frame + scene_projection, unvalid = argaze_scene.project(video_frame.matrix, valid_markers=1, visual_hfov=TobiiSpecifications.VISUAL_HFOV) + + # DEBUG: print unvalid distances or angles + for key, value in unvalid.items(): + print(f'{video_ts}: Unvalid {key}: {value}.') + + # Store all projected aoi + for aoi_name in scene_projection.keys(): + + projected_aois[aoi_name] = numpy.rint(scene_projection[aoi_name]).astype(int) + + # Draw tracked markers + argaze_scene.aruco_tracker.draw_tracked_markers(visu_frame.matrix) + + # Draw scene projection + scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255)) + + # Catch warnings raised by project_scene method + except UserWarning as w: + + projected_aois['warning'] = w + + # Draw tracked markers + argaze_scene.aruco_tracker.draw_tracked_markers(visu_frame.matrix) + + if w == 'Pose estimation fails': + + # Draw black AOI scene + scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 0, 0)) + + cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1) + cv.putText(visu_frame.matrix, str(w), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) + + # Raised when timestamped buffer is empty + except KeyError as e: + pass + + # Store projected AOI + ts_offset_aois[video_ts] = projected_aois + + # Draw focus area + cv.rectangle(visu_frame.matrix, (int(video_frame.width/6), 0), (int(visu_frame.width*(1-1/6)), int(visu_frame.height)), (255, 150, 150), 1) + + # Draw center + cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1) + cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1) + + # Write segment timing + cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1) + cv.putText(visu_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) + + if args.window: + + # Close window using 'Esc' key + if cv.waitKey(1) == 27: + break + + # Display visualisation + cv.imshow(f'Segment {tobii_segment.id} ArUco AOI', visu_frame.matrix) + + # Write video + output_video.write(visu_frame.matrix) + + # Update Progress Bar + progress = video_ts_ms - int(args.time_range[0] * 1e3) + #MiscFeatures.printProgressBar(progress, tobii_segment_video.duration/1e3, prefix = 'Progress:', suffix = 'Complete', length = 100) + + # Exit on 'ctrl+C' interruption + except KeyboardInterrupt: + pass + + # Stop frame display + cv.destroyAllWindows() + + # End output video file + output_video.close() + + # Print aruco tracking metrics + print('\nAruco marker tracking metrics') + try_count, tracked_counts = argaze_scene.aruco_tracker.track_metrics + + for marker_id, tracked_count in tracked_counts.items(): + print(f'Markers {marker_id} has been detected in {tracked_count} / {try_count} frames ({round(100 * tracked_count / try_count, 2)} %)') + + # Export aruco aoi data + ts_offset_aois.as_dataframe().to_csv(vs_data_filepath, index=True) + print(f'Aruco AOI data saved into {vs_data_filepath}') + + # Notify when the aruco aoi video has been exported + print(f'Aruco AOI video saved into {vs_video_filepath}') + +if __name__ == '__main__': + + main() \ No newline at end of file diff --git a/src/argaze/utils/tobii_segment_aruco_set_export.py b/src/argaze/utils/tobii_segment_aruco_set_export.py deleted file mode 100644 index d93658f..0000000 --- a/src/argaze/utils/tobii_segment_aruco_set_export.py +++ /dev/null @@ -1,346 +0,0 @@ -#!/usr/bin/env python - -import argparse -import os, json -import math -import threading - -from argaze import DataStructures -from argaze import GazeFeatures -from argaze.TobiiGlassesPro2 import * -from argaze.ArUcoMarkers import * -from argaze.AreaOfInterest import * -from argaze.utils import MiscFeatures - -import cv2 as cv -import numpy - -def make_rotation_matrix(x, y, z): - - # Create rotation matrix around x axis - c = numpy.cos(numpy.deg2rad(x)) - s = numpy.sin(numpy.deg2rad(x)) - Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]]) - - # Create rotation matrix around y axis - c = numpy.cos(numpy.deg2rad(y)) - s = numpy.sin(numpy.deg2rad(y)) - Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]]) - - # Create rotation matrix around z axis - c = numpy.cos(numpy.deg2rad(z)) - s = numpy.sin(numpy.deg2rad(z)) - Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]]) - - # Return intrinsic rotation matrix - return Rx.dot(Ry.dot(Rz)) - -def main(): - """ - Track ArUcoPlan into Tobii Glasses Pro 2 camera video stream. - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path') - parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - parser.add_argument('-cc', '--camera_calibration', metavar='CAM_CALIB', type=str, default=None, help='json camera calibration filepath') - parser.add_argument('-tc', '--tracker_configuration', metavar='TRACK_CONFIG', type=str, default=None, help='json aruco tracker configuration filepath') - parser.add_argument('-as', '--aruco_set', metavar='ARUCO_SET', type=str, help='json aruco set description filepath') - parser.add_argument('-ai', '--aoi_scene', metavar='AOI_SCENE', type=str, help='obj aoi 3D scene description filepath') - parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)') - parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction) - args = parser.parse_args() - - if args.segment_path != None: - - # Manage destination path - destination_path = '.' - if args.output != None: - - if not os.path.exists(os.path.dirname(args.output)): - - os.makedirs(os.path.dirname(args.output)) - print(f'{os.path.dirname(args.output)} folder created') - - destination_path = args.output - - else: - - destination_path = args.segment_path - - # Export into a dedicated time range folder - if args.time_range[1] != None: - timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]' - else: - timerange_path = f'[all]' - - destination_path = f'{destination_path}/{timerange_path}' - - if not os.path.exists(destination_path): - - os.makedirs(destination_path) - print(f'{destination_path} folder created') - - vs_data_filepath = f'{destination_path}/aoi.csv' - vs_video_filepath = f'{destination_path}/aoi.mp4' - - # Load a tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'Video properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - # Load a tobii segment data - tobii_segment_data = tobii_segment.load_data() - - print(f'Loaded data count:') - for name in tobii_segment_data.keys(): - print(f'\t{name}: {len(tobii_segment_data[name])} data') - - # Access to video timestamp data buffer - tobii_ts_vts = tobii_segment_data['VideoTimeStamp'] - - # Access to timestamped head rotations data buffer - tobii_ts_head_rotations = tobii_segment_data['Gyroscope'] - - # Prepare video exportation at the same format than segment video - output_video = TobiiVideo.TobiiVideoOutput(vs_video_filepath, tobii_segment_video.stream) - - # Create aruco camera - aruco_camera = ArUcoCamera.ArUcoCamera() - - # Load calibration file - if args.camera_calibration != None: - - aruco_camera.load_calibration_file(args.camera_calibration) - - else: - - raise UserWarning('.json camera calibration filepath required. Use -c option.') - - # Build aruco set from its description file - aruco_set = ArUcoSetFactory.ArUcoSetFactory.make(args.aruco_set) - - print(f'\n{type(aruco_set)} cache: {aruco_set}') - - # Create aruco tracker - aruco_tracker = ArUcoTracker.ArUcoTracker(aruco_set.dictionary, aruco_set.marker_size, aruco_camera) - - # Load specific configuration file - if args.tracker_configuration != None: - - aruco_tracker.load_configuration_file(args.tracker_configuration) - - print(f'\nArUcoTracker configuration for markers detection:') - aruco_tracker.print_configuration() - - # Load AOI 3D scene centered onto aruco set - aoi3D_scene = AOI3DScene.AOI3DScene() - aoi3D_scene.load(args.aoi_scene) - - print(f'\nAOI in {os.path.basename(args.aoi_scene)} scene:') - for aoi in aoi3D_scene.keys(): - print(f'\t{aoi}') - - # Create timestamped buffer to store AOIs and primary time stamp offset - ts_offset_aois = DataStructures.TimeStampedBuffer() - - # Video and data replay loop - try: - - # Initialise progress bar - #MiscFeatures.printProgressBar(0, tobii_segment_video.duration/1e3, prefix = 'Progress:', suffix = 'Complete', length = 100) - - head_moving = False - head_movement_last = 0. - - # Iterate on video frames - for video_ts, video_frame in tobii_segment_video.frames(): - - video_ts_ms = video_ts / 1e3 - - # Copy video frame to edit visualisation on it without disrupting aruco tracking - visu_frame = video_frame.copy() - - # Process video and data frame - try: - - # Get nearest video timestamp - _, nearest_vts = tobii_ts_vts.get_last_before(video_ts) - - # Edit dictionary to store 2D aoi with primary timestamp offset and warning - all_aoi2D = { - 'offset': nearest_vts.offset, - 'warning': None - } - - # Get nearest head rotation before video timestamp and remove all head rotations before - _, nearest_head_rotation = tobii_ts_head_rotations.pop_first_until(video_ts) - - # Calculate head movement considering only head yaw and pitch - head_movement = numpy.array(nearest_head_rotation.value) - head_movement_px = head_movement.astype(int) - head_movement_norm = numpy.linalg.norm(head_movement[0:2]) - - # Draw movement vector - cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2)), (int(visu_frame.width/2) + head_movement_px[1], int(visu_frame.height/2) - head_movement_px[0]), (150, 150, 150), 3) - - # Head movement detection hysteresis - # TODO : pass the threshold value as argument - if not head_moving and head_movement_norm > 50: - head_moving = True - - if head_moving and head_movement_norm < 10: - head_moving = False - - # When head is moving, ArUco tracking could return bad pose estimation and so bad AOI scene projection - if head_moving: - - all_aoi2D['warning'] = 'Head is moving' - - ts_offset_aois[video_ts] = all_aoi2D - - raise UserWarning(all_aoi2D['warning']) - - # Hide frame left and right borders before tracking to ignore markers outside focus area - cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width/6), int(video_frame.height)), (0, 0, 0), -1) - cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - 1/6)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1) - - # Track markers with pose estimation and draw them - aruco_tracker.track(video_frame.matrix) - aruco_tracker.draw_tracked_markers(visu_frame.matrix) - - # When no marker is detected, no AOI scene projection can't be done - if len(aruco_tracker.tracked_markers) == 0: - - all_aoi2D['warning'] = 'No marker detected' - - ts_offset_aois[video_ts] = all_aoi2D - - raise UserWarning(all_aoi2D['warning']) - - # Estimate set pose from tracked markers - tvec, rvec, success, validity, unvalid = aruco_set.estimate_pose(aruco_tracker.tracked_markers) - - # Print unvalid distances or angles - for key, value in unvalid.items(): - print(f'{video_ts}: Unvalid {key}: {value}.') - - # When pose estimation fails, ignore AOI scene projection - if not success: - - # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it - # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable. - aoi2D_video_scene = aoi3D_scene.project(tvec, rvec, aruco_camera.K) - - # Draw black AOI scene - aoi2D_video_scene.draw(visu_frame.matrix, (0, 0), color=(0, 0, 0)) - - all_aoi2D['warning'] = 'Pose estimation fails' - - ts_offset_aois[video_ts] = all_aoi2D - - raise UserWarning(all_aoi2D['warning']) - - # Consider pose estimation if it is validated by 1 face at least - elif validity >= 1: - - # Transform scene into camera referential - aoi3D_camera = aoi3D_scene.transform(tvec, rvec) - - # Get aoi inside vision cone field - cone_vision_height_cm = 200 # cm - cone_vision_radius_cm = numpy.tan(numpy.deg2rad(TobiiSpecifications.VISUAL_HFOV / 2)) * cone_vision_height_cm - - aoi3D_inside, aoi3D_outside = aoi3D_camera.vision_cone(cone_vision_radius_cm, cone_vision_height_cm) - - # Keep only aoi inside vision cone field - aoi3D_scene = aoi3D_scene.copy(exclude=aoi3D_outside.keys()) - - # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it - # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable. - aoi2D_video_scene = aoi3D_scene.project(tvec, rvec, aruco_camera.K) - - # Draw AOI scene - aoi2D_video_scene.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255)) - - # Store all 2D aoi - for aoi_name in aoi2D_video_scene.keys(): - - all_aoi2D[aoi_name] = numpy.rint(aoi2D_video_scene[aoi_name]).astype(int) - - ts_offset_aois[video_ts] = all_aoi2D - - # Warn user when the merged scene is empty - if len(aoi2D_video_scene.keys()) == 0: - - all_aoi2D['warning'] = 'AOI projection is empty' - - raise UserWarning(all_aoi2D['warning']) - - # Write warning - except UserWarning as w: - - cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(w), (20, 80), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Raised when timestamped buffer is empty - except KeyError as e: - pass - - # Draw focus area - cv.rectangle(visu_frame.matrix, (int(video_frame.width/6), 0), (int(visu_frame.width*(1-1/6)), int(visu_frame.height)), (255, 150, 150), 1) - - # Draw center - cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1) - cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1) - - # Write segment timing - cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1) - cv.putText(visu_frame.matrix, f'Segment time: {int(video_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - if args.window: - - # Close window using 'Esc' key - if cv.waitKey(1) == 27: - break - - # Display visualisation - cv.imshow(f'Segment {tobii_segment.id} ArUco AOI', visu_frame.matrix) - - # Write video - output_video.write(visu_frame.matrix) - - # Update Progress Bar - progress = video_ts_ms - int(args.time_range[0] * 1e3) - #MiscFeatures.printProgressBar(progress, tobii_segment_video.duration/1e3, prefix = 'Progress:', suffix = 'Complete', length = 100) - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - # Stop frame display - cv.destroyAllWindows() - - # End output video file - output_video.close() - - # Print aruco tracking metrics - print('\nAruco marker tracking metrics') - try_count, tracked_counts = aruco_tracker.track_metrics - - for marker_id, tracked_count in tracked_counts.items(): - print(f'Markers {marker_id} has been detected in {tracked_count} / {try_count} frames ({round(100 * tracked_count / try_count, 2)} %)') - - # Export aruco aoi data - ts_offset_aois.as_dataframe().to_csv(vs_data_filepath, index=True) - print(f'Aruco AOI data saved into {vs_data_filepath}') - - # Notify when the aruco aoi video has been exported - print(f'Aruco AOI video saved into {vs_video_filepath}') - -if __name__ == '__main__': - - main() \ No newline at end of file -- cgit v1.1