aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoCube.py445
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoPlan.py60
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoSet.py448
-rw-r--r--src/argaze/ArUcoMarkers/__init__.py2
-rw-r--r--src/argaze/ArUcoMarkers/utils/aruco_cube.json24
-rw-r--r--src/argaze/ArUcoMarkers/utils/aruco_plan.json30
-rw-r--r--src/argaze/utils/tobii_stream_aruco_cube_display.py (renamed from src/argaze/utils/tobii_stream_arcube_display.py)5
-rw-r--r--src/argaze/utils/tobii_stream_aruco_plan_display.py360
8 files changed, 968 insertions, 406 deletions
diff --git a/src/argaze/ArUcoMarkers/ArUcoCube.py b/src/argaze/ArUcoMarkers/ArUcoCube.py
index 00f353a..88163d8 100644
--- a/src/argaze/ArUcoMarkers/ArUcoCube.py
+++ b/src/argaze/ArUcoMarkers/ArUcoCube.py
@@ -6,435 +6,74 @@ import json
import math
import itertools
-from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker
+from argaze.ArUcoMarkers import ArUcoSet
import numpy
import cv2 as cv
import cv2.aruco as aruco
@dataclass
-class ArUcoCubeFace():
- """Define cube face pose and marker."""
-
- translation: numpy.array
- """Position in cube referential."""
-
- rotation: numpy.array
- """Rotation in cube referential."""
-
- marker: dict
- """ArUco marker linked to the face."""
-
-@dataclass
-class ArUcoCube():
- """Define a cube with ArUco markers on each face and estimate its pose."""
-
- dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary
- """ArUco dictionary of cube markers."""
-
- marker_size: int = field(init=False)
- """Size of markers in centimeter."""
+class ArUcoCube(ArUcoSet.ArUcoSet):
+ """Define ArUco cube as a specific ArUco set."""
edge_size: int = field(init=False)
"""Size of the cube edges in centimeter."""
- faces: dict = field(init=False, default_factory=dict)
- """All named faces of the cube and their ArUco markers."""
-
- angle_tolerance: float = field(init=False)
- """Angle error tolerance allowed to validate face pose in degree."""
-
- distance_tolerance: float = field(init=False)
- """Distance error tolerance allowed to validate face pose in centimeter."""
-
def __init__(self, configuration_filepath):
"""Define cube from a .json file."""
+ # Load generic set configuration data
+ super().__init__(configuration_filepath)
+
+ # Load specific cube configuration data
with open(configuration_filepath) as configuration_file:
# Deserialize .json
# TODO find a better way
configuration = json.load(configuration_file)
- # Load dictionary
- self.dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(configuration['dictionary'])
-
- # Load marker size
- self.marker_size = configuration['marker_size']
-
# Load edge size
self.edge_size = configuration['edge_size']
- # Load faces
- self.faces = {}
- for name, face in configuration['faces'].items():
- marker = ArUcoMarker.ArUcoMarker(self.dictionary, face['marker'], self.marker_size)
- self.faces[name] = ArUcoCubeFace(numpy.array(face['translation']).astype(numpy.float32), numpy.array(face['rotation']).astype(numpy.float32), marker)
-
- # Load angle tolerance
- self.angle_tolerance = configuration['angle_tolerance']
-
- # Load distance tolerance
- self.distance_tolerance = configuration['distance_tolerance']
-
- # Init pose data
- self.__translation = numpy.zeros(3)
- self.__rotation = numpy.zeros(3)
- self.__succeded = False
- self.__validity = 0
-
- # Process markers ids to speed up further calculations
- self.__identifier_cache = {}
- for name, face in self.faces.items():
- self.__identifier_cache[face.marker.identifier] = name
-
- # Process each face pose to speed up further calculations
- self.__translation_cache = {}
- for name, face in self.faces.items():
- self.__translation_cache[name] = face.translation * self.edge_size / 2
-
- # Process each face rotation matrix to speed up further calculations
- self.__rotation_cache = {}
- for name, face in self.faces.items():
-
- # Create intrinsic rotation matrix
- R = self.__make_rotation_matrix(*face.rotation)
-
- assert(self.__is_rotation_matrix(R))
-
- # Store rotation matrix
- self.__rotation_cache[name] = R
-
- # Process each axis-angle face combination to speed up further calculations
- self.__angle_cache = {}
- for (A_name, A_face), (B_name, B_face) in itertools.combinations(self.faces.items(), 2):
-
- A = self.__rotation_cache[A_name]
- B = self.__rotation_cache[B_name]
-
- # Rotation matrix from A face to B face
- AB = B.dot(A.T)
-
- assert(self.__is_rotation_matrix(AB))
-
- # Calculate axis-angle representation of AB rotation matrix
- angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2))
-
- try:
- self.__angle_cache[A_name][B_name] = angle
- except:
- self.__angle_cache[A_name] = {B_name: angle}
-
- try:
- self.__angle_cache[B_name][A_name] = angle
- except:
- self.__angle_cache[B_name] = {A_name: angle}
-
- # Process distance between face combination to speed up further calculations
- self.__distance_cache = numpy.linalg.norm(numpy.array([0, 0, self.edge_size/2]) - numpy.array([0, self.edge_size/2, 0]))
-
- def print_cache(self):
- """Print pre-processed data."""
-
- print('\nIdentifier cache:')
- for i, name in self.__identifier_cache.items():
- print(f'- {i}: {name}')
-
- print('\nTranslation cache:')
- for name, item in self.__translation_cache.items():
- print(f'- {name}: {item}')
-
- print('\nRotation cache:')
- for name, item in self.__rotation_cache.items():
- print(f'- {name}:\n{item}')
-
- print('\nAngle cache:')
- for A_name, A_angle_cache in self.__angle_cache.items():
- for B_name, angle in A_angle_cache.items():
- print(f'- {A_name}/{B_name}: {angle:3f}')
-
- print(f'\nDistance cache: {self.__distance_cache}')
-
- def __make_rotation_matrix(self, x, y, z):
-
- # Create rotation matrix around x axis
- c = numpy.cos(numpy.deg2rad(x))
- s = numpy.sin(numpy.deg2rad(x))
- Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]])
-
- # Create rotation matrix around y axis
- c = numpy.cos(numpy.deg2rad(y))
- s = numpy.sin(numpy.deg2rad(y))
- Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
-
- # Create rotation matrix around z axis
- c = numpy.cos(numpy.deg2rad(z))
- s = numpy.sin(numpy.deg2rad(z))
- Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
-
- # Return intrinsic rotation matrix
- return Rx.dot(Ry.dot(Rz))
-
- def __is_rotation_matrix(self, R):
- """Checks if a matrix is a valid rotation matrix."""
-
- I = numpy.identity(3, dtype = R.dtype)
- return numpy.linalg.norm(I - numpy.dot(R.T, R)) < 1e-6
-
- def __normalise_face_pose(self, name, face, F):
-
- # Transform face rotation into cube rotation vector
- R = self.__rotation_cache[name]
- rvec, _ = cv.Rodrigues(F.dot(R))
-
- #print(f'{name} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}')
-
- # Transform face translation into cube translation vector
- OF = face.translation
- T = self.__translation_cache[name]
- FC = F.dot(R.dot(T))
-
- tvec = OF + FC
-
- #print(f'{name} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}')
-
- return rvec, tvec
-
- def estimate_pose(self, tracked_markers) -> Tuple[numpy.array, numpy.array, bool, int]:
- """Estimate cube pose from tracked markers (cf ArUcoTracker.track())
-
- * **Returns:**
- - translation vector
- - rotation vector
- - pose estimation success status
- - the number of faces used to estimate the pose as validity score
- """
-
- # Init pose data
- self.__translation = numpy.zeros(3)
- self.__rotation = numpy.zeros(3)
- self.__succeded = False
- self.__validity = 0
-
- # Don't try to estimate pose if there is no tracked markers
- if len(tracked_markers) == 0:
-
- return self.__translation, self.__rotation, self.__succeded, self.__validity
-
- # Look for faces related to tracked markers
- tracked_faces = {}
- for (marker_id, marker) in tracked_markers.items():
-
- try:
- name = self.__identifier_cache[marker_id]
- tracked_faces[name] = marker
-
- except KeyError:
- continue
-
- #print('-------------- ArUcoCube pose estimation --------------')
-
- # Pose validity checking is'nt possible when only one face of the cube is tracked
- if len(tracked_faces.keys()) == 1:
-
- # Get arcube pose from to the unique face pose
- name, face = tracked_faces.popitem()
- F, _ = cv.Rodrigues(face.rotation)
-
- self.__rotation, self.__translation = self.__normalise_face_pose(name,face, F)
- self.__succeded = True
- self.__validity = 1
-
- #print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
- #print(f'arcube rotation vector: {self.__rotation[0][0]:3f} {self.__rotation[1][0]:3f} {self.__rotation[2][0]:3f}')
- #print(f'arcube translation vector: {self.__translation[0]:3f} {self.__translation[1]:3f} {self.__translation[2]:3f}')
-
- # Pose validity checking processes faces two by two
- else:
-
- valid_faces = []
- valid_rvecs = []
- valid_tvecs = []
-
- for (A_name, A_face), (B_name, B_face) in itertools.combinations(tracked_faces.items(), 2):
-
- #print(f'** {A_name} > {B_name}')
-
- # Get face rotation estimation
- # Use rotation matrix instead of rotation vector
- A, _ = cv.Rodrigues(A_face.rotation)
- B, _ = cv.Rodrigues(B_face.rotation)
-
- # Rotation matrix from A face to B face
- AB = B.dot(A.T)
-
- assert(self.__is_rotation_matrix(AB))
-
- # Calculate axis-angles representation of AB rotation matrix
- angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2))
-
- #print('rotation angle:')
- #print(angle)
-
- expected_angle = self.__angle_cache[A_name][B_name]
-
- #print('expected angle:')
- #print(expected_angle)
-
- # Calculate distance between A face center and B face center
- distance = numpy.linalg.norm(A_face.translation - B_face.translation)
- expected_distance = self.__distance_cache
-
- # Check angle and distance according given tolerance then normalise face pose
- valid_angle = math.isclose(angle, expected_angle, abs_tol=self.angle_tolerance)
- valid_distance = math.isclose(distance, expected_distance, abs_tol=self.distance_tolerance)
-
- if valid_angle and valid_distance:
-
- if A_name not in valid_faces:
-
- # Remember this face is already validated
- valid_faces.append(A_name)
-
- rvec, tvec = self.__normalise_face_pose(A_name, A_face, A)
-
- # Store normalised face pose
- valid_rvecs.append(rvec)
- valid_tvecs.append(tvec)
-
- if B_name not in valid_faces:
-
- # Remember this face is already validated
- valid_faces.append(B_name)
-
- rvec, tvec = self.__normalise_face_pose(B_name, B_face, B)
-
- # Store normalised face pose
- valid_rvecs.append(rvec)
- valid_tvecs.append(tvec)
-
- if len(valid_faces) > 1:
-
- # Consider arcube rotation as the mean of all valid translations
- # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before.
- self.__rotation = numpy.mean(numpy.array(valid_rvecs), axis=0)
-
- # Consider arcube translation as the mean of all valid translations
- self.__translation = numpy.mean(numpy.array(valid_tvecs), axis=0)
-
- #print(':::::::::::::::::::::::::::::::::::::::::::::::::::')
- #print(f'arcube rotation vector: {self.__rotation[0][0]:3f} {self.__rotation[1][0]:3f} {self.__rotation[2][0]:3f}')
- #print(f'arcube translation vector: {self.__translation[0]:3f} {self.__translation[1]:3f} {self.__translation[2]:3f}')
-
- self.__succeded = True
- self.__validity = len(valid_faces)
-
- #print('----------------------------------------------------')
-
- return self.__translation, self.__rotation, self.__succeded, self.__validity
-
- @property
- def translation(self) -> numpy.array:
- """Access to cube translation vector.
-
- .. warning::
- Setting cube translation vector implies succeded status to be True and validity score to be 0."""
-
- return self.__translation
-
- @translation.setter
- def translation(self, tvec):
-
- self.__translation = tvec
- self.__succeded = True
- self.__validity = 0
-
- @property
- def rotation(self) -> numpy.array:
- """Access to cube rotation vector.
-
- .. warning::
- Setting cube rotation vector implies succeded status to be True and validity score to be 0."""
-
- return self.__translation
-
- @rotation.setter
- def rotation(self, rvec):
-
- self.__rotation = rvec
- self.__succeded = True
- self.__validity = 0
-
- @property
- def succeded(self) -> bool:
- """Access to cube pose estimation succeded status."""
-
- return self.__succeded
-
- @property
- def validity(self) -> int:
- """Access to cube pose estimation validity score."""
-
- return self.__validity
-
- def draw(self, frame, K, D, draw_faces=True):
- """Draw cube axis and faces."""
+ def draw(self, frame, K, D, draw_places=True):
+ """Draw cube, axis and places."""
l = self.edge_size / 2
ll = self.edge_size
# Select color according validity score
- n = 95 * self.__validity if self.__validity < 2 else 0
- f = 159 * self.__validity if self.__validity < 2 else 255
-
- try:
-
- # Draw axis
- axisPoints = numpy.float32([[ll, 0, 0], [0, ll, 0], [0, 0, ll], [0, 0, 0]]).reshape(-1, 3)
- axisPoints, _ = cv.projectPoints(axisPoints, self.__rotation, self.__translation, K, D)
- axisPoints = axisPoints.astype(int)
+ n = 95 * self._validity if self._validity < 2 else 0
+ f = 159 * self._validity if self._validity < 2 else 255
- cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (n,n,f), 5) # X (red)
- cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (n,f,n), 5) # Y (green)
- cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (f,n,n), 5) # Z (blue)
-
- if draw_faces:
-
- # Draw left face
- leftPoints = numpy.float32([[-l, l, l], [-l, -l, l], [-l, -l, -l], [-l, l, -l]]).reshape(-1, 3)
- leftPoints, _ = cv.projectPoints(leftPoints, self.__rotation, self.__translation, K, D)
- leftPoints = leftPoints.astype(int)
-
- cv.line(frame, tuple(leftPoints[0].ravel()), tuple(leftPoints[1].ravel()), (n,n,f), 2)
- cv.line(frame, tuple(leftPoints[1].ravel()), tuple(leftPoints[2].ravel()), (n,n,f), 2)
- cv.line(frame, tuple(leftPoints[2].ravel()), tuple(leftPoints[3].ravel()), (n,n,f), 2)
- cv.line(frame, tuple(leftPoints[3].ravel()), tuple(leftPoints[0].ravel()), (n,n,f), 2)
-
- # Draw top face
- topPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, l, -l], [l, l, -l]]).reshape(-1, 3)
- topPoints, _ = cv.projectPoints(topPoints, self.__rotation, self.__translation, K, D)
- topPoints = topPoints.astype(int)
-
- cv.line(frame, tuple(topPoints[0].ravel()), tuple(topPoints[1].ravel()), (n,f,n), 2)
- cv.line(frame, tuple(topPoints[1].ravel()), tuple(topPoints[2].ravel()), (n,f,n), 2)
- cv.line(frame, tuple(topPoints[2].ravel()), tuple(topPoints[3].ravel()), (n,f,n), 2)
- cv.line(frame, tuple(topPoints[3].ravel()), tuple(topPoints[0].ravel()), (n,f,n), 2)
-
- # Draw front face
- frontPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, -l, l], [l, -l, l]]).reshape(-1, 3)
- frontPoints, _ = cv.projectPoints(frontPoints, self.__rotation, self.__translation, K, D)
- frontPoints = frontPoints.astype(int)
-
- cv.line(frame, tuple(frontPoints[0].ravel()), tuple(frontPoints[1].ravel()), (f,n,n), 2)
- cv.line(frame, tuple(frontPoints[1].ravel()), tuple(frontPoints[2].ravel()), (f,n,n), 2)
- cv.line(frame, tuple(frontPoints[2].ravel()), tuple(frontPoints[3].ravel()), (f,n,n), 2)
- cv.line(frame, tuple(frontPoints[3].ravel()), tuple(frontPoints[0].ravel()), (f,n,n), 2)
-
- except Exception as e:
+ # Draw left face
+ leftPoints = numpy.float32([[-l, l, l], [-l, -l, l], [-l, -l, -l], [-l, l, -l]]).reshape(-1, 3)
+ leftPoints, _ = cv.projectPoints(leftPoints, self._rotation, self._translation, K, D)
+ leftPoints = leftPoints.astype(int)
+
+ cv.line(frame, tuple(leftPoints[0].ravel()), tuple(leftPoints[1].ravel()), (n,n,f), 2)
+ cv.line(frame, tuple(leftPoints[1].ravel()), tuple(leftPoints[2].ravel()), (n,n,f), 2)
+ cv.line(frame, tuple(leftPoints[2].ravel()), tuple(leftPoints[3].ravel()), (n,n,f), 2)
+ cv.line(frame, tuple(leftPoints[3].ravel()), tuple(leftPoints[0].ravel()), (n,n,f), 2)
+
+ # Draw top face
+ topPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, l, -l], [l, l, -l]]).reshape(-1, 3)
+ topPoints, _ = cv.projectPoints(topPoints, self._rotation, self._translation, K, D)
+ topPoints = topPoints.astype(int)
+
+ cv.line(frame, tuple(topPoints[0].ravel()), tuple(topPoints[1].ravel()), (n,f,n), 2)
+ cv.line(frame, tuple(topPoints[1].ravel()), tuple(topPoints[2].ravel()), (n,f,n), 2)
+ cv.line(frame, tuple(topPoints[2].ravel()), tuple(topPoints[3].ravel()), (n,f,n), 2)
+ cv.line(frame, tuple(topPoints[3].ravel()), tuple(topPoints[0].ravel()), (n,f,n), 2)
+
+ # Draw front face
+ frontPoints = numpy.float32([[l, l, l], [-l, l, l], [-l, -l, l], [l, -l, l]]).reshape(-1, 3)
+ frontPoints, _ = cv.projectPoints(frontPoints, self._rotation, self._translation, K, D)
+ frontPoints = frontPoints.astype(int)
+
+ cv.line(frame, tuple(frontPoints[0].ravel()), tuple(frontPoints[1].ravel()), (f,n,n), 2)
+ cv.line(frame, tuple(frontPoints[1].ravel()), tuple(frontPoints[2].ravel()), (f,n,n), 2)
+ cv.line(frame, tuple(frontPoints[2].ravel()), tuple(frontPoints[3].ravel()), (f,n,n), 2)
+ cv.line(frame, tuple(frontPoints[3].ravel()), tuple(frontPoints[0].ravel()), (f,n,n), 2)
- print(e)
- print(self.__translation)
- print(self.__rotation)
- print(self.__succeded)
- print(self.__validity)
- print(axisPoints)
+ # Draw axis and places
+ super().draw(frame, K, D, draw_places)
diff --git a/src/argaze/ArUcoMarkers/ArUcoPlan.py b/src/argaze/ArUcoMarkers/ArUcoPlan.py
new file mode 100644
index 0000000..aed42b3
--- /dev/null
+++ b/src/argaze/ArUcoMarkers/ArUcoPlan.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+
+from typing import Tuple
+from dataclasses import dataclass, field
+import json
+import math
+import itertools
+
+from argaze.ArUcoMarkers import ArUcoSet
+
+import numpy
+import cv2 as cv
+import cv2.aruco as aruco
+
+@dataclass
+class ArUcoPlan(ArUcoSet.ArUcoSet):
+ """Define a ArUco plan as a specific ArUco set."""
+
+ width: int = field(init=False)
+ """Width of the plan in centimeter."""
+
+ height: int = field(init=False)
+ """Height of the plan in centimeter."""
+
+ def __init__(self, configuration_filepath):
+ """Define plan from a .json file."""
+
+ # Load generic set configuration data
+ super().__init__(configuration_filepath)
+
+ # Load specific plan configuration data
+ with open(configuration_filepath) as configuration_file:
+
+ # Deserialize .json
+ # TODO find a better way
+ configuration = json.load(configuration_file)
+
+ # Load plan dimensions
+ self.width = configuration['width']
+ self.height = configuration['height']
+
+ def draw(self, frame, K, D, draw_places=True):
+ """Draw plan, axis and places."""
+
+ # Select color according validity score
+ n = 95 * self._validity if self._validity < 2 else 0
+ f = 159 * self._validity if self._validity < 2 else 255
+
+ # Draw plan
+ planPoints = numpy.float32([[0, 0, 0], [self.width, 0, 0], [self.width, self.height, 0], [0, self.height, 0]]).reshape(-1, 3)
+ planPoints, _ = cv.projectPoints(planPoints, self._rotation, self._translation, K, D)
+ planPoints = planPoints.astype(int)
+
+ cv.line(frame, tuple(planPoints[0].ravel()), tuple(planPoints[1].ravel()), (f,f,f), 2)
+ cv.line(frame, tuple(planPoints[1].ravel()), tuple(planPoints[2].ravel()), (f,f,f), 2)
+ cv.line(frame, tuple(planPoints[2].ravel()), tuple(planPoints[3].ravel()), (f,f,f), 2)
+ cv.line(frame, tuple(planPoints[3].ravel()), tuple(planPoints[0].ravel()), (f,f,f), 2)
+
+ # Draw axis and places
+ super().draw(frame, K, D, draw_places)
diff --git a/src/argaze/ArUcoMarkers/ArUcoSet.py b/src/argaze/ArUcoMarkers/ArUcoSet.py
new file mode 100644
index 0000000..2eeea32
--- /dev/null
+++ b/src/argaze/ArUcoMarkers/ArUcoSet.py
@@ -0,0 +1,448 @@
+#!/usr/bin/env python
+
+from typing import Tuple
+from dataclasses import dataclass, field
+import json
+import math
+import itertools
+
+from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoCamera
+
+import numpy
+import cv2 as cv
+import cv2.aruco as aruco
+
+@dataclass
+class ArUcoSetPlace():
+ """Define set place pose and marker."""
+
+ translation: numpy.array
+ """Position in set referential."""
+
+ rotation: numpy.array
+ """Rotation in set referential."""
+
+ marker: dict
+ """ArUco marker linked to the place."""
+
+@dataclass
+class ArUcoSet():
+ """Define abstract class to handle specific ArUco markers set and estimate its pose."""
+
+ dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary
+ """ArUco dictionary of set markers."""
+
+ marker_size: int = field(init=False)
+ """Size of markers in centimeter."""
+
+ places: dict = field(init=False, default_factory=dict)
+ """All named places of the set and their ArUco markers."""
+
+ angle_tolerance: float = field(init=False)
+ """Angle error tolerance allowed to validate place pose in degree."""
+
+ distance_tolerance: float = field(init=False)
+ """Distance error tolerance allowed to validate place pose in centimeter."""
+
+ def __init__(self, configuration_filepath):
+ """Define set from a .json file."""
+
+ with open(configuration_filepath) as configuration_file:
+
+ # Deserialize .json
+ # TODO find a better way
+ configuration = json.load(configuration_file)
+
+ # Load dictionary
+ self.dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(configuration['dictionary'])
+
+ # Load marker size
+ self.marker_size = configuration['marker_size']
+
+ # Load places
+ self.places = {}
+ for name, place in configuration['places'].items():
+ marker = ArUcoMarker.ArUcoMarker(self.dictionary, place['marker'], self.marker_size)
+ self.places[name] = ArUcoSetPlace(numpy.array(place['translation']).astype(numpy.float32), numpy.array(place['rotation']).astype(numpy.float32), marker)
+
+ # Load angle tolerance
+ self.angle_tolerance = configuration['angle_tolerance']
+
+ # Load distance tolerance
+ self.distance_tolerance = configuration['distance_tolerance']
+
+ # Init pose data
+ self._translation = numpy.zeros(3)
+ self._rotation = numpy.zeros(3)
+ self._succeded = False
+ self._validity = 0
+
+ # Process markers ids to speed up further calculations
+ self.__identifier_cache = {}
+ for name, place in self.places.items():
+ self.__identifier_cache[place.marker.identifier] = name
+
+ # Process each place pose to speed up further calculations
+ self.__translation_cache = {}
+ for name, place in self.places.items():
+ self.__translation_cache[name] = place.translation
+
+ # Process each place rotation matrix to speed up further calculations
+ self.__rotation_cache = {}
+ for name, place in self.places.items():
+
+ # Create intrinsic rotation matrix
+ R = self.__make_rotation_matrix(*place.rotation)
+
+ assert(self.__is_rotation_matrix(R))
+
+ # Store rotation matrix
+ self.__rotation_cache[name] = R
+
+ # Process axis-angle between place combination to speed up further calculations
+ self.__angle_cache = {}
+ for (A_name, A_place), (B_name, B_place) in itertools.combinations(self.places.items(), 2):
+
+ A = self.__rotation_cache[A_name]
+ B = self.__rotation_cache[B_name]
+
+ if numpy.array_equal(A, B):
+
+ print('A.all() == B.all()')
+ angle = 0.
+
+ else:
+
+ # Rotation matrix from A place to B place
+ AB = B.dot(A.T)
+
+ assert(self.__is_rotation_matrix(AB))
+
+ # Calculate axis-angle representation of AB rotation matrix
+ angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2))
+
+ try:
+ self.__angle_cache[A_name][B_name] = angle
+ except:
+ self.__angle_cache[A_name] = {B_name: angle}
+
+ try:
+ self.__angle_cache[B_name][A_name] = angle
+ except:
+ self.__angle_cache[B_name] = {A_name: angle}
+
+ # Process distance between each place combination to speed up further calculations
+ self.__distance_cache = {}
+ for (A_name, A_place), (B_name, B_place) in itertools.combinations(self.places.items(), 2):
+
+ A = self.__translation_cache[A_name]
+ B = self.__translation_cache[B_name]
+
+ # Calculate axis-angle representation of AB rotation matrix
+ distance = numpy.linalg.norm(B - A)
+
+ try:
+ self.__distance_cache[A_name][B_name] = distance
+ except:
+ self.__distance_cache[A_name] = {B_name: distance}
+
+ try:
+ self.__distance_cache[B_name][A_name] = distance
+ except:
+ self.__distance_cache[B_name] = {A_name: distance}
+
+ def print_cache(self):
+ """Print pre-processed data."""
+
+ print('\nIdentifier cache:')
+ for i, name in self.__identifier_cache.items():
+ print(f'- {i}: {name}')
+
+ print('\nTranslation cache:')
+ for name, item in self.__translation_cache.items():
+ print(f'- {name}: {item}')
+
+ print('\nRotation cache:')
+ for name, item in self.__rotation_cache.items():
+ print(f'- {name}:\n{item}')
+
+ print('\nAngle cache:')
+ for A_name, A_angle_cache in self.__angle_cache.items():
+ for B_name, angle in A_angle_cache.items():
+ print(f'- {A_name}/{B_name}: {angle:3f}')
+
+ print('\nDistance cache:')
+ for A_name, A_distance_cache in self.__distance_cache.items():
+ for B_name, distance in A_distance_cache.items():
+ print(f'- {A_name}/{B_name}: {distance:3f}')
+
+ def __make_rotation_matrix(self, x, y, z):
+
+ # Create rotation matrix around x axis
+ c = numpy.cos(numpy.deg2rad(x))
+ s = numpy.sin(numpy.deg2rad(x))
+ Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]])
+
+ # Create rotation matrix around y axis
+ c = numpy.cos(numpy.deg2rad(y))
+ s = numpy.sin(numpy.deg2rad(y))
+ Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
+
+ # Create rotation matrix around z axis
+ c = numpy.cos(numpy.deg2rad(z))
+ s = numpy.sin(numpy.deg2rad(z))
+ Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
+
+ # Return intrinsic rotation matrix
+ return Rx.dot(Ry.dot(Rz))
+
+ def __is_rotation_matrix(self, R):
+ """Checks if a matrix is a valid rotation matrix."""
+
+ I = numpy.identity(3, dtype = R.dtype)
+ return numpy.linalg.norm(I - numpy.dot(R.T, R)) < 1e-6
+
+ def __normalise_place_pose(self, name, place, F):
+
+ # Transform place rotation into set rotation vector
+ R = self.__rotation_cache[name]
+ rvec, _ = cv.Rodrigues(F.dot(R))
+
+ #print(f'{name} rotation vector: {rvec[0][0]:3f} {rvec[1][0]:3f} {rvec[2][0]:3f}')
+
+ # Transform place translation into set translation vector
+ OF = place.translation
+ T = self.__translation_cache[name]
+ FC = R.dot(F.dot(-T))
+
+ tvec = OF + FC
+
+ #print(f'{name} translation vector: {tvec[0]:3f} {tvec[1]:3f} {tvec[2]:3f}')
+
+ return rvec, tvec
+
+ def estimate_pose(self, tracked_markers) -> Tuple[numpy.array, numpy.array, bool, int]:
+ """Estimate set pose from tracked markers (cf ArUcoTracker.track())
+
+ * **Returns:**
+ - translation vector
+ - rotation vector
+ - pose estimation success status
+ - the number of places used to estimate the pose as validity score
+ """
+
+ # Init pose data
+ self._translation = numpy.zeros(3)
+ self._rotation = numpy.zeros(3)
+ self._succeded = False
+ self._validity = 0
+
+ # Don't try to estimate pose if there is no tracked markers
+ if len(tracked_markers) == 0:
+
+ return self._translation, self._rotation, self._succeded, self._validity
+
+ # Look for places related to tracked markers
+ tracked_places = {}
+ for (marker_id, marker) in tracked_markers.items():
+
+ try:
+ name = self.__identifier_cache[marker_id]
+ tracked_places[name] = marker
+
+ except KeyError:
+ continue
+
+ #print('-------------- ArUcoSet pose estimation --------------')
+
+ # Pose validity checking is'nt possible when only one place of the set is tracked
+ if len(tracked_places.keys()) == 1:
+
+ # Get set pose from to the unique place pose
+ name, place = tracked_places.popitem()
+ F, _ = cv.Rodrigues(place.rotation)
+
+ self._rotation, self._translation = self.__normalise_place_pose(name, place, F)
+ self._succeded = True
+ self._validity = 1
+
+ #print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
+ #print(f'arset rotation vector: {self._rotation[0][0]:3f} {self._rotation[1][0]:3f} {self._rotation[2][0]:3f}')
+ #print(f'arset translation vector: {self._translation[0]:3f} {self._translation[1]:3f} {self._translation[2]:3f}')
+
+ # Pose validity checking processes places two by two
+ else:
+
+ valid_places = []
+ valid_rvecs = []
+ valid_tvecs = []
+
+ for (A_name, A_place), (B_name, B_place) in itertools.combinations(tracked_places.items(), 2):
+
+ #print(f'** {A_name} > {B_name}')
+
+ # Get place rotation estimation
+ # Use rotation matrix instead of rotation vector
+ A, _ = cv.Rodrigues(A_place.rotation)
+ B, _ = cv.Rodrigues(B_place.rotation)
+
+ # Rotation matrix from A place to B place
+ AB = B.dot(A.T)
+
+ assert(self.__is_rotation_matrix(AB))
+
+ # Calculate axis-angles representation of AB rotation matrix
+ angle = numpy.rad2deg(numpy.arccos((numpy.trace(AB) - 1) / 2))
+ expected_angle = self.__angle_cache[A_name][B_name]
+
+ #print('angle:', angle)
+ #print('expected angle:', expected_angle)
+
+ # Calculate distance between A place center and B place center
+ distance = numpy.linalg.norm(A_place.translation - B_place.translation)
+ expected_distance = self.__distance_cache[A_name][B_name]
+
+ #print('distance: ', distance)
+ #print('expected distance: ', expected_distance)
+
+ # Check angle and distance according given tolerance then normalise place pose
+ valid_angle = math.isclose(angle, expected_angle, abs_tol=self.angle_tolerance)
+ valid_distance = math.isclose(distance, expected_distance, abs_tol=self.distance_tolerance)
+
+ if valid_angle and valid_distance:
+
+ if A_name not in valid_places:
+
+ # Remember this place is already validated
+ valid_places.append(A_name)
+
+ rvec, tvec = self.__normalise_place_pose(A_name, A_place, A)
+
+ # Store normalised place pose
+ valid_rvecs.append(rvec)
+ valid_tvecs.append(tvec)
+
+ if B_name not in valid_places:
+
+ # Remember this place is already validated
+ valid_places.append(B_name)
+
+ rvec, tvec = self.__normalise_place_pose(B_name, B_place, B)
+
+ # Store normalised place pose
+ valid_rvecs.append(rvec)
+ valid_tvecs.append(tvec)
+
+ if len(valid_places) > 1:
+
+ # Consider arset rotation as the mean of all valid translations
+ # !!! WARNING !!! This is a bad hack : processing rotations average is a very complex problem that needs to well define the distance calculation method before.
+ self._rotation = numpy.mean(numpy.array(valid_rvecs), axis=0)
+
+ # Consider arset translation as the mean of all valid translations
+ self._translation = numpy.mean(numpy.array(valid_tvecs), axis=0)
+
+ #print(':::::::::::::::::::::::::::::::::::::::::::::::::::')
+ #print(f'arset rotation vector: {self._rotation[0][0]:3f} {self._rotation[1][0]:3f} {self._rotation[2][0]:3f}')
+ #print(f'arset translation vector: {self._translation[0]:3f} {self._translation[1]:3f} {self._translation[2]:3f}')
+
+ self._succeded = True
+ self._validity = len(valid_places)
+
+ #print('----------------------------------------------------')
+
+ return self._translation, self._rotation, self._succeded, self._validity
+
+ @property
+ def translation(self) -> numpy.array:
+ """Access to set translation vector.
+
+ .. warning::
+ Setting set translation vector implies succeded status to be True and validity score to be 0."""
+
+ return self._translation
+
+ @translation.setter
+ def translation(self, tvec):
+
+ self._translation = tvec
+ self._succeded = True
+ self._validity = 0
+
+ @property
+ def rotation(self) -> numpy.array:
+ """Access to set rotation vector.
+
+ .. warning::
+ Setting set rotation vector implies succeded status to be True and validity score to be 0."""
+
+ return self._translation
+
+ @rotation.setter
+ def rotation(self, rvec):
+
+ self._rotation = rvec
+ self._succeded = True
+ self._validity = 0
+
+ @property
+ def succeded(self) -> bool:
+ """Access to set pose estimation succeded status."""
+
+ return self._succeded
+
+ @property
+ def validity(self) -> int:
+ """Access to set pose estimation validity score."""
+
+ return self._validity
+
+ def draw(self, frame, K, D, draw_places=True):
+ """Draw set axis and places."""
+
+ l = self.marker_size / 2
+ ll = self.marker_size
+
+ # Select color according validity score
+ n = 95 * self._validity if self._validity < 2 else 0
+ f = 159 * self._validity if self._validity < 2 else 255
+
+ try:
+
+ # Draw axis
+ axisPoints = numpy.float32([[ll, 0, 0], [0, ll, 0], [0, 0, ll], [0, 0, 0]]).reshape(-1, 3)
+ axisPoints, _ = cv.projectPoints(axisPoints, self._rotation, self._translation, K, D)
+ axisPoints = axisPoints.astype(int)
+
+ cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (n,n,f), 5) # X (red)
+ cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (n,f,n), 5) # Y (green)
+ cv.line(frame, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (f,n,n), 5) # Z (blue)
+
+ # Draw places (optional)
+ if draw_places:
+
+ for name, place in self.places.items():
+
+ if name != "top":
+ continue
+
+ T = self.__translation_cache[name]
+ R = self.__rotation_cache[name]
+
+ placePoints = (T + numpy.float32([R.dot([-l, -l, 0]), R.dot([l, -l, 0]), R.dot([l, l, 0]), R.dot([-l, l, 0])])).reshape(-1, 3)
+ placePoints, _ = cv.projectPoints(placePoints, self._rotation, self._translation, K, D)
+ placePoints = placePoints.astype(int)
+
+ cv.line(frame, tuple(placePoints[0].ravel()), tuple(placePoints[1].ravel()), (f,f,f), 2)
+ cv.line(frame, tuple(placePoints[1].ravel()), tuple(placePoints[2].ravel()), (f,f,f), 2)
+ cv.line(frame, tuple(placePoints[2].ravel()), tuple(placePoints[3].ravel()), (f,f,f), 2)
+ cv.line(frame, tuple(placePoints[3].ravel()), tuple(placePoints[0].ravel()), (f,f,f), 2)
+
+ except Exception as e:
+
+ print(e)
+ print(self._translation)
+ print(self._rotation)
+ print(self._succeded)
+ print(self._validity)
+ print(axisPoints)
diff --git a/src/argaze/ArUcoMarkers/__init__.py b/src/argaze/ArUcoMarkers/__init__.py
index 3a74eeb..73d0851 100644
--- a/src/argaze/ArUcoMarkers/__init__.py
+++ b/src/argaze/ArUcoMarkers/__init__.py
@@ -2,4 +2,4 @@
.. include:: README.md
"""
__docformat__ = "restructuredtext"
-__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoCamera', 'ArUcoTracker', 'ArUcoCube'] \ No newline at end of file
+__all__ = ['ArUcoMarkersDictionary', 'ArUcoMarker', 'ArUcoBoard', 'ArUcoCamera', 'ArUcoTracker', 'ArUcoSet', 'ArUcoPlan', 'ArUcoCube', ] \ No newline at end of file
diff --git a/src/argaze/ArUcoMarkers/utils/aruco_cube.json b/src/argaze/ArUcoMarkers/utils/aruco_cube.json
new file mode 100644
index 0000000..a8df443
--- /dev/null
+++ b/src/argaze/ArUcoMarkers/utils/aruco_cube.json
@@ -0,0 +1,24 @@
+{
+ "dictionary": "DICT_APRILTAG_16h5",
+ "marker_size": 5.2,
+ "edge_size": 6,
+ "places": {
+ "front": {
+ "translation": [0, 0, 3],
+ "rotation": [0, 0, 0],
+ "marker": 2
+ },
+ "top": {
+ "translation": [0, 3, 0],
+ "rotation": [90, 0, 0],
+ "marker": 1
+ },
+ "left": {
+ "translation": [-3, 0, 0],
+ "rotation": [0, 0, 0],
+ "marker": 3
+ }
+ },
+ "angle_tolerance": 1,
+ "distance_tolerance": 0.5
+} \ No newline at end of file
diff --git a/src/argaze/ArUcoMarkers/utils/aruco_plan.json b/src/argaze/ArUcoMarkers/utils/aruco_plan.json
new file mode 100644
index 0000000..1ab05b2
--- /dev/null
+++ b/src/argaze/ArUcoMarkers/utils/aruco_plan.json
@@ -0,0 +1,30 @@
+{
+ "dictionary": "DICT_APRILTAG_16h5",
+ "marker_size": 5,
+ "width": 29.7,
+ "height": 21,
+ "places": {
+ "lower_left": {
+ "translation": [0, 0, 0],
+ "rotation": [0, 0, 0],
+ "marker": 2
+ },
+ "upper_left": {
+ "translation": [0, 21, 0],
+ "rotation": [0, 0, 0],
+ "marker": 3
+ },
+ "upper_right": {
+ "translation": [29.7, 21, 0],
+ "rotation": [0, 0, 0],
+ "marker": 4
+ },
+ "lower_right": {
+ "translation": [29.7, 0, 0],
+ "rotation": [0, 0, 0],
+ "marker": 5
+ }
+ },
+ "angle_tolerance": 5.0,
+ "distance_tolerance": 1.0
+} \ No newline at end of file
diff --git a/src/argaze/utils/tobii_stream_arcube_display.py b/src/argaze/utils/tobii_stream_aruco_cube_display.py
index 9f4a196..e310308 100644
--- a/src/argaze/utils/tobii_stream_arcube_display.py
+++ b/src/argaze/utils/tobii_stream_aruco_cube_display.py
@@ -224,12 +224,13 @@ def main():
# Track markers with pose estimation
aruco_tracker.track(video_frame.matrix)
+ aruco_tracker.draw_tracked_markers(visu_frame.matrix)
# Estimate cube pose from tracked markers
tvec, rvec, success, validity = aruco_cube.estimate_pose(aruco_tracker.tracked_markers)
# Cube pose estimation succeed and is validated by 2 faces at least
- if success and validity >= 2:
+ if success and validity >= 1:
# Lock tobii imu updates
tobii_imu_lock.acquire()
@@ -306,7 +307,7 @@ def main():
#aruco_tracker.draw_tracked_markers(visu_frame.matrix)
# Draw cube pose estimation (without camera distorsion)
- aruco_cube.draw(visu_frame.matrix, aruco_camera.K, aruco_camera.D, draw_faces=False)
+ aruco_cube.draw(visu_frame.matrix, aruco_camera.K, aruco_camera.D, draw_places=True)
# Warn about cube pose validity
if not aruco_cube_validity:
diff --git a/src/argaze/utils/tobii_stream_aruco_plan_display.py b/src/argaze/utils/tobii_stream_aruco_plan_display.py
new file mode 100644
index 0000000..16fc8ef
--- /dev/null
+++ b/src/argaze/utils/tobii_stream_aruco_plan_display.py
@@ -0,0 +1,360 @@
+#!/usr/bin/env python
+
+import argparse
+import os, json
+import math
+import threading
+
+from argaze import DataStructures
+from argaze import GazeFeatures
+from argaze.TobiiGlassesPro2 import *
+from argaze.ArUcoMarkers import *
+from argaze.AreaOfInterest import *
+from argaze.utils import MiscFeatures
+
+import cv2 as cv
+import numpy
+
+def make_rotation_matrix(x, y, z):
+
+ # Create rotation matrix around x axis
+ c = numpy.cos(numpy.deg2rad(x))
+ s = numpy.sin(numpy.deg2rad(x))
+ Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]])
+
+ # Create rotation matrix around y axis
+ c = numpy.cos(numpy.deg2rad(y))
+ s = numpy.sin(numpy.deg2rad(y))
+ Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
+
+ # Create rotation matrix around z axis
+ c = numpy.cos(numpy.deg2rad(z))
+ s = numpy.sin(numpy.deg2rad(z))
+ Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
+
+ # Return intrinsic rotation matrix
+ return Rx.dot(Ry.dot(Rz))
+
+def main():
+ """
+ Track ArUcoPlan into Tobii Glasses Pro 2 camera video stream.
+ """
+
+ # Manage arguments
+ parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
+ parser.add_argument('-t', '--tobii_ip', metavar='TOBII_IP', type=str, default=None, help='tobii glasses ip')
+ parser.add_argument('-c', '--camera_calibration', metavar='CAM_CALIB', type=str, default=None, help='json camera calibration filepath')
+ parser.add_argument('-p', '--aruco_tracker_configuration', metavar='TRACK_CONFIG', type=str, default=None, help='json aruco tracker configuration filepath')
+ parser.add_argument('-i', '--imu_calibration', metavar='IMU_CALIB', type=str, default=None, help='json imu calibration filepath')
+ parser.add_argument('-ac', '--aruco_plan', metavar='ARUCO_PLAN', type=str, help='json aruco plan description filepath')
+ parser.add_argument('-s', '--aoi_scene', metavar='AOI_SCENE', type=str, help='obj aoi 3D scene description filepath')
+ parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction)
+ args = parser.parse_args()
+
+ # Create tobii controller (with auto discovery network process if no ip argument is provided)
+ print('\nLooking for a Tobii Glasses Pro 2 device ...')
+
+ try:
+
+ tobii_controller = TobiiController.TobiiController(args.tobii_ip)
+ print(f'Tobii Glasses Pro 2 device found at {tobii_controller.address} address.')
+
+ except ConnectionError as e:
+
+ print(e)
+ exit()
+
+ # Setup camera at 25 fps to work on Full HD video stream
+ tobii_controller.set_scene_camera_freq_25()
+
+ # Print current confirugration
+ print(f'Tobii Glasses Pro 2 configuration:')
+ for key, value in tobii_controller.get_configuration().items():
+ print(f'\t{key}: {value}')
+
+ # Enable tobii data stream
+ tobii_data_stream = tobii_controller.enable_data_stream()
+
+ # Enable tobii video stream
+ tobii_video_stream = tobii_controller.enable_video_stream()
+
+ # Load aruco plan description
+ aruco_plan = ArUcoPlan.ArUcoPlan(args.aruco_plan)
+ aruco_plan.print_cache()
+
+ # Load AOI 3D scene centered onto aruco plan
+ aoi3D_scene = AOI3DScene.AOI3DScene()
+ aoi3D_scene.load(args.aoi_scene)
+
+ print(f'\nAOI in {os.path.basename(args.aoi_scene)} scene related to ArPlan:')
+ for aoi in aoi3D_scene.keys():
+ print(f'\t{aoi}')
+
+ # Create aruco camera
+ aruco_camera = ArUcoCamera.ArUcoCamera()
+
+ # Load calibration file
+ if args.camera_calibration != None:
+
+ aruco_camera.load_calibration_file(args.camera_calibration)
+
+ else:
+
+ raise UserWarning('.json camera calibration filepath required. Use -c option.')
+
+ # Create aruco tracker
+ aruco_tracker = ArUcoTracker.ArUcoTracker(aruco_plan.dictionary, aruco_plan.marker_size, aruco_camera)
+
+ # Load specific configuration file
+ if args.aruco_tracker_configuration != None:
+
+ aruco_tracker.load_configuration_file(args.aruco_tracker_configuration)
+
+ print(f'\nArUcoTracker configuration for markers detection:')
+ aruco_tracker.print_configuration()
+
+ # Create tobii imu handler to track head pose changes when arcuco plan pose can't be estimated
+ # So, the resulting head pose is relative to last pose estimation
+ tobii_imu = TobiiInertialMeasureUnit.TobiiInertialMeasureUnit()
+
+ # Load optional imu calibration file
+ if args.imu_calibration != None:
+
+ tobii_imu.load_calibration_file(args.imu_calibration)
+
+ # Init tobii imu lock
+ tobii_imu_lock = threading.Lock()
+
+ # TEST : DIFF ACC
+ last_accl = numpy.zeros(3)
+
+ # Init data timestamped in millisecond
+ data_ts_ms = 0
+
+ # Assess temporal performance
+ loop_chrono = MiscFeatures.TimeProbe()
+ loop_ps = 0
+
+ def data_stream_callback(data_ts, data_object, data_object_type):
+
+ nonlocal tobii_imu
+ nonlocal tobii_imu_lock
+ nonlocal data_ts_ms
+
+ #TEST
+ nonlocal last_accl
+
+ data_ts_ms = data_ts / 1e3
+
+ # Don't update imu when it is used
+ if tobii_imu_lock.locked():
+ return
+
+ # Lock tobii imu updates
+ tobii_imu_lock.acquire()
+
+ match data_object_type:
+
+ case 'Gyroscope':
+
+ data_object = tobii_imu.apply_gyroscope_offset(data_object)
+
+ tobii_imu.update_rotation(data_ts, data_object)
+
+
+ case 'Accelerometer':
+ pass
+ '''
+ print('raw accelerometer(m/s2)=', data_object.value)
+
+ # TEST :
+ diff_accl = last_accl - numpy.array(data_object.value)
+ last_accl = numpy.array(data_object.value)
+ print('\tdiff(cm/s2)=', 100 * numpy.linalg.norm(diff_accl))
+
+ # TEST : ignore acceleration double
+ if numpy.linalg.norm(diff_accl) > 0.:
+
+ data_object = tobii_imu.apply_accelerometer_coefficients(data_object)
+
+ print('corrected accelerometer(m/s2)=', data_object.value)
+
+ print('current plumb=', tobii_imu.get_plumb())
+
+ data_object = tobii_imu.apply_plumb(data_object)
+
+ print('corrected accelerometer - gravity(m/s2)=', data_object.value)
+ print('\tnorm(cm/s2)=', 100 * numpy.linalg.norm(data_object.value))
+
+ tobii_imu.update_translation(data_ts, data_object)
+ '''
+ # Unlock tobii imu updates
+ tobii_imu_lock.release()
+
+ tobii_data_stream.reading_callback = data_stream_callback
+
+ # Start streaming
+ tobii_controller.start_streaming()
+
+ # Live video stream capture loop
+ try:
+
+ # Assess loop performance
+ loop_chrono = MiscFeatures.TimeProbe()
+ fps = 0
+
+ # Track aruco plan pose
+ aruco_plan_tvec = numpy.zeros(3)
+ aruco_plan_rvec = numpy.zeros(3)
+ aruco_plan_success = False
+ aruco_plan_validity = False
+ aruco_plan_ts_ms = 0
+
+ while tobii_video_stream.is_alive():
+
+ # Read video stream
+ video_ts, video_frame = tobii_video_stream.read()
+ video_ts_ms = video_ts / 1e3
+
+ # Copy video frame to edit visualisation on it without disrupting aruco tracking
+ visu_frame = video_frame.copy()
+
+ # Process video and data frame
+ try:
+
+ # Track markers with pose estimation
+ aruco_tracker.track(video_frame.matrix)
+ #aruco_tracker.draw_tracked_markers(visu_frame.matrix)
+
+ # Estimate plan pose from tracked markers
+ tvec, rvec, success, validity = aruco_plan.estimate_pose(aruco_tracker.tracked_markers)
+
+ # Plan pose estimation succeed and is validated by 1 faces at least
+ if success and validity >= 1:
+
+ # Lock tobii imu updates
+ tobii_imu_lock.acquire()
+
+ # Reset head rotation, translation and translation speed (cm/s)
+ # Note : head translation speed is also estimated thanks to accelerometer sensor (see upward)
+ tobii_imu.reset_rotation()
+ #tobii_imu.reset_translation(translation_speed = (tvec - aruco_plan_tvec) / (video_ts_ms - aruco_plan_ts_ms))
+
+ # Create a rotation matrix to transform plan rotation from camera referential to imu referential
+ F = make_rotation_matrix(*TobiiInertialMeasureUnit.CAMERA_TO_IMU_ROTATION_VECTOR)
+ R, _ = cv.Rodrigues(rvec)
+ rvec_flipped, _ = cv.Rodrigues(F.dot(R))
+
+ # Update head plumb orientation with flipped plan orientation
+ tobii_imu.rotate_plumb(rvec_flipped)
+
+ # Unlock tobii imu updates
+ tobii_imu_lock.release()
+
+ # Store plan pose
+ aruco_plan_tvec = tvec
+ aruco_plan_rvec = rvec
+ aruco_plan_success = success
+ aruco_plan_validity = validity
+ aruco_plan_ts_ms = video_ts_ms
+
+ # Plan pose estimation fails
+ elif aruco_plan_success:
+
+ # Use tobii glasses inertial sensors to estimate plan pose from last estimated pose
+
+ # Translate plan into imu referential
+ imu_tvec = aruco_plan_tvec + numpy.array(TobiiInertialMeasureUnit.CAMERA_TO_IMU_TRANSLATION_VECTOR)
+
+ # Translate plan according head translation
+ imu_tvec = imu_tvec + tobii_imu.translation
+
+ # Rotate plan around imu origin according head rotation
+ imu_rvec = tobii_imu.rotation * numpy.array([-1., -1., 1.])
+ imu_R = make_rotation_matrix(*imu_rvec)
+ new_imu_tvec = imu_tvec.dot(imu_R)
+
+ # Translate back plan into camera referential
+ new_tvec = new_imu_tvec - numpy.array(TobiiInertialMeasureUnit.CAMERA_TO_IMU_TRANSLATION_VECTOR)
+
+ # Rotate plan orientation (supposing plan top is up in )
+ imu_rvec = tobii_imu.rotation * numpy.array([1., -1., 1.])
+ imu_R = make_rotation_matrix(*imu_rvec)
+
+ C, _ = cv.Rodrigues(aruco_plan_rvec)
+ C = C.dot(imu_R)
+ new_rvec, _ = cv.Rodrigues(C)
+ #new_rvec = aruco_plan_rvec
+
+ # Set plan pose estimation
+ aruco_plan.translation = new_tvec
+ aruco_plan.rotation = new_rvec
+
+ else:
+
+ raise UserWarning('Plan pose estimation fails.')
+
+ # Project AOI 3 scene onto camera frame
+
+ # DON'T APPLY CAMERA DISTORSION : it projects points which are far from the frame into it
+ # This hack isn't realistic but as the gaze will mainly focus on centered AOI, where the distorsion is low, it is acceptable.
+ aoi2D_scene = aoi3D_scene.project(aruco_plan.translation, aruco_plan.rotation, aruco_camera.K)
+
+ # Draw projected scene
+ #aoi2D_scene.draw(visu_frame.matrix)
+
+ # Draw markers pose estimation
+ #aruco_tracker.draw_tracked_markers(visu_frame.matrix)
+
+ # Draw plan pose estimation (without camera distorsion)
+ aruco_plan.draw(visu_frame.matrix, aruco_camera.K, ArUcoCamera.D0, draw_places=True)
+
+ # Warn about plan pose validity
+ if not aruco_plan_validity:
+
+ raise UserWarning('Plan pose estimation is not validated.')
+
+ # Write warning
+ except UserWarning as w:
+
+ cv.rectangle(visu_frame.matrix, (0, 100), (600, 150), (127, 127, 127), -1)
+ cv.putText(visu_frame.matrix, str(w), (20, 140), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
+
+ # Assess loop performance
+ lap_time, lap_counter, elapsed_time = loop_chrono.lap()
+
+ # Update fps each 10 loops
+ if lap_counter >= 10:
+
+ loop_ps = 1e3 * lap_counter / elapsed_time
+ loop_chrono.restart()
+
+ # Draw center
+ cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1)
+ cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1)
+
+ # Write stream timing
+ cv.rectangle(visu_frame.matrix, (0, 0), (1100, 50), (63, 63, 63), -1)
+ cv.putText(visu_frame.matrix, f'Data stream time: {int(data_ts_ms)} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'Video delay: {int(data_ts_ms - video_ts_ms)} ms', (550, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ cv.putText(visu_frame.matrix, f'Fps: {int(loop_ps)}', (950, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+
+ cv.imshow(f'Stream ArUcoPlan', visu_frame.matrix)
+
+ # Close window using 'Esc' key
+ if cv.waitKey(1) == 27:
+ break
+
+ # Exit on 'ctrl+C' interruption
+ except KeyboardInterrupt:
+ pass
+
+ # Stop frame display
+ cv.destroyAllWindows()
+
+ # Stop streaming
+ tobii_controller.stop_streaming()
+
+if __name__ == '__main__':
+
+ main() \ No newline at end of file