""" """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple import json import os from collections import Counter import time from argaze import DataFeatures from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoOpticCalibrator import numpy import cv2 as cv from cv2 import aruco ArUcoMarkerDictionaryType = TypeVar('ArUcoMarkerDictionary', bound="ArUcoMarkerDictionary") # Type definition for type annotation convenience ArUcoMarkerType = TypeVar('ArUcoMarker', bound="ArUcoMarker") # Type definition for type annotation convenience OpticParametersType = TypeVar('OpticParameters', bound="OpticParameters") # Type definition for type annotation convenience DetectorParametersType = TypeVar('DetectorParameters', bound="DetectorParameters") # Type definition for type annotation convenience ArUcoDetectorType = TypeVar('ArUcoDetector', bound="ArUcoDetector") # Type definition for type annotation convenience class DetectorParameters(): """Wrapper class around ArUco marker detector parameters. !!! note More details on [opencv page](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html) """ __parameters = aruco.DetectorParameters() __parameters_names = [ 'adaptiveThreshConstant', 'adaptiveThreshWinSizeMax', 'adaptiveThreshWinSizeMin', 'adaptiveThreshWinSizeStep', 'aprilTagCriticalRad', 'aprilTagDeglitch', 'aprilTagMaxLineFitMse', 'aprilTagMaxNmaxima', 'aprilTagMinClusterPixels', 'aprilTagMinWhiteBlackDiff', 'aprilTagQuadDecimate', 'aprilTagQuadSigma', 'cornerRefinementMaxIterations', 'cornerRefinementMethod', 'cornerRefinementMinAccuracy', 'cornerRefinementWinSize', 'markerBorderBits', 'minMarkerPerimeterRate', 'maxMarkerPerimeterRate', 'minMarkerDistanceRate', 'detectInvertedMarker', 'errorCorrectionRate', 'maxErroneousBitsInBorderRate', 'minCornerDistanceRate', 'minDistanceToBorder', 'minOtsuStdDev', 'perspectiveRemoveIgnoredMarginPerCell', 'perspectiveRemovePixelPerCell', 'polygonalApproxAccuracyRate', 'useAruco3Detection' ] def __init__(self, **kwargs): for parameter, value in kwargs.items(): setattr(self.__parameters, parameter, value) self.__dict__.update(kwargs) def __setattr__(self, parameter, value): setattr(self.__parameters, parameter, value) def __getattr__(self, parameter): return getattr(self.__parameters, parameter) @classmethod def from_json(self, json_filepath) -> DetectorParametersType: """Load detector parameters from .json file.""" with open(json_filepath) as configuration_file: return DetectorParameters(**json.load(configuration_file)) def __str__(self) -> str: """Detector parameters string representation.""" return f'{self}' def __format__(self, spec: str) -> str: """Formated detector parameters string representation. Parameters: spec: 'modified' to get only modified parameters. """ output = '' for parameter in self.__parameters_names: if parameter in self.__dict__.keys(): output += f'\t*{parameter}: {getattr(self.__parameters, parameter)}\n' elif spec == "": output += f'\t{parameter}: {getattr(self.__parameters, parameter)}\n' return output @property def internal(self): return self.__parameters class ArUcoDetector(DataFeatures.PipelineStepObject): """OpenCV ArUco library wrapper.""" def __init__(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary = None, optic_parameters: ArUcoOpticCalibrator.OpticParameters = None, parameters: DetectorParameters = None, **kwargs): """Initialize ArUcoDetector. Parameters: dictionary: ArUco markers dictionary to detect. optic_parameters: Optic parameters to use for ArUco detection into image. parameters: ArUco detector parameters. """ # Init parent class super().__init__(**kwargs) # Init private attributes self.__dictionary = dictionary self.__optic_parameters = optic_parameters self.__parameters = parameters # Init detected markers data self.__detected_markers = {} # Init detected board data self.__board = None self.__board_corners_number = 0 self.__board_corners = [] self.__board_corners_ids = [] @property def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary: """Get aruco detector's dictionary object.""" return self.__dictionary @property def optic_parameters(self) -> ArUcoOpticCalibrator.OpticParameters: """Get aruco detector's opetic parameters object.""" return self.__optic_parameters @optic_parameters.setter def optic_parameters(self, value: ArUcoOpticCalibrator.OpticParameters): """Set aruco detector's opetic parameters object.""" self.__optic_parameters = value @property def parameters(self) -> DetectorParameters: """Get aruco detector's parameters object.""" return self.__parameters @parameters.setter def parameters(self, value: DetectorParameters): """Set aruco detector's parameters object.""" self.__parameters = value @classmethod def from_dict(cls, aruco_detector_data: dict, working_directory: str) -> ArUcoDetectorType: """Load ArUcoDetector attributes from dictionary. Parameters: aruco_detector_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load ArUco dictionary dictionary_value = aruco_detector_data.pop('dictionary') # str: dictionary name if type(dictionary_value) == str: new_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(dictionary_value) # dict: else: new_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(**dictionary_value) # Load optic parameters try: optic_parameters_value = aruco_detector_data.pop('optic_parameters') # str: relative path to .json file if type(optic_parameters_value) == str: optic_parameters_value = os.path.join(working_directory, optic_parameters_value) new_optic_parameters = ArUcoOpticCalibrator.OpticParameters.from_json(optic_parameters_value) # dict: else: new_optic_parameters = ArUcoOpticCalibrator.OpticParameters(**optic_parameters_value) except KeyError: new_optic_parameters = None # Load ArUco detector parameters try: # Check detector parameters value type parameters_value = aruco_detector_data.pop('parameters') # str: relative path to .json file if type(parameters_value) == str: parameters_value = os.path.join(working_directory, parameters_value) new_parameters = DetectorParameters.from_json(parameters_value) # dict: else: new_parameters = DetectorParameters(**parameters_value) except KeyError: new_parameters = DetectorParameters() # Load temporary pipeline step object from aruco_detector_data then export it as dict temp_pipeline_step_object_data = DataFeatures.PipelineStepObject.from_dict(aruco_detector_data, working_directory).as_dict() # Create aruco detector return ArUcoDetector( \ new_dictionary, \ new_optic_parameters, \ new_parameters, \ **temp_pipeline_step_object_data \ ) @DataFeatures.PipelineStepMethod def detect_markers(self, image: numpy.array): """Detect all ArUco markers into an image. !!! danger "DON'T MIRROR IMAGE" It makes the markers detection to fail. !!! danger "DON'T UNDISTORED IMAGE" Camera intrisic parameters and distorsion coefficients are used later during pose estimation. """ # Reset detected markers data self.__detected_markers, detected_markers_corners, detected_markers_ids = {}, [], [] # Detect markers into gray picture detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY), self.__dictionary.markers, parameters = self.__parameters.internal) # Is there detected markers ? if len(detected_markers_corners) > 0: # Transform markers ids array into list detected_markers_ids = detected_markers_ids.T[0] for i, marker_id in enumerate(detected_markers_ids): marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id) marker.corners = detected_markers_corners[i][0] # No pose estimation: call estimate_markers_pose to get one marker.translation = numpy.empty([0]) marker.rotation = numpy.empty([0]) marker.points = numpy.empty([0]) self.__detected_markers[marker_id] = marker def estimate_markers_pose(self, size: float, ids: list = []): """Estimate pose detected markers pose considering a marker size. Parameters: size: size of markers in centimeters. ids: markers id list to select detected markers. """ # Is there detected markers ? if len(self.__detected_markers) > 0: # Select all markers by default if len(ids) == 0: ids = self.__detected_markers.keys() # Prepare data for aruco.estimatePoseSingleMarkers function selected_markers_corners = tuple() selected_markers_ids = [] for marker_id, marker in self.__detected_markers.items(): if marker_id in ids: selected_markers_corners += (marker.corners,) selected_markers_ids.append(marker_id) # Estimate pose of selected markers if len(selected_markers_corners) > 0: markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners, size, numpy.array(self.__optic_parameters.K), numpy.array(self.__optic_parameters.D)) for i, marker_id in enumerate(selected_markers_ids): marker = self.__detected_markers[marker_id] marker.translation = markers_tvecs[i][0] marker.rotation, _ = cv.Rodrigues(markers_rvecs[i][0]) marker.size = size marker.points = markers_points.reshape(4, 3).dot(marker.rotation) - marker.translation def detected_markers(self) -> dict[ArUcoMarkerType]: """Access to detected markers dictionary.""" return self.__detected_markers def detected_markers_number(self) -> int: """Return detected markers number.""" return len(list(self.__detected_markers.keys())) def draw_detected_markers(self, image: numpy.array, draw_marker: dict = None): """Draw detected markers. Parameters: image: image where to draw draw_marker: ArucoMarker.draw parameters (if None, no marker drawn) """ if draw_marker is not None: for marker_id, marker in self.__detected_markers.items(): marker.draw(image, self.__optic_parameters.K, self.__optic_parameters.D, **draw_marker) def detect_board(self, image: numpy.array, board, expected_markers_number): """Detect ArUco markers board in image setting up the number of detected markers needed to agree detection. !!! danger "DON'T MIRROR IMAGE" It makes the markers detection to fail. """ # detect markers from gray picture gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY) detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, parameters = self.__parameters.internal) # if all board markers are detected if len(detected_markers_corners) == expected_markers_number: self.__board = board self.__board_corners_number, self.__board_corners, self.__board_corners_ids = aruco.interpolateCornersCharuco(detected_markers_corners, detected_markers_ids, gray, self.__board.model) else: self.__board = None self.__board_corners_number = 0 self.__board_corners = [] self.__board_corners_ids = [] def draw_board(self, image: numpy.array): """Draw detected board corners in image.""" if self.__board != None: cv.drawChessboardCorners(image, ((self.__board.size[0] - 1 ), (self.__board.size[1] - 1)), self.__board_corners, True) def board_corners_number(self) -> int: """Get detected board corners number.""" return self.__board_corners_number def board_corners_identifier(self) -> list[int]: """Get detected board corners identifier.""" return self.__board_corners_ids def board_corners(self) -> list: """Get detected board corners.""" return self.__board_corners class Observer(DataFeatures.PipelineStepObserver): """Define ArUcoDetector observer to count how many times detection succeeded and how many times markers are detected.""" def __init__(self): """Initialize marker detection metrics.""" self.__try_count = 0 self.__success_count = 0 self.__detected_ids = [] @property def metrics(self) -> Tuple[int, dict]: """Get marker detection metrics. Returns: number of detect function call dict with number of detection for each marker identifier """ return self.__try_count, self.__success_count, Counter(self.__detected_ids) def reset(self): """Reset marker detection metrics.""" self.__try_count = 0 self.__success_count = 0 self.__detected_ids = [] def on_detect_markers(self, timestamp, aruco_detector, exception): """Update ArUco markers detection metrics.""" self.__try_count += 1 detected_markers_list = list(aruco_detector.detected_markers().keys()) if len(detected_markers_list): self.__success_count += 1 self.__detected_ids.extend(detected_markers_list)