#!/usr/bin/env python """ """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple from dataclasses import dataclass, field import json import os import math from argaze import DataFeatures import cv2 import matplotlib.path as mpath import numpy from shapely.geometry import Polygon from shapely.geometry.point import Point from colorama import Style, Fore AreaOfInterestType = TypeVar('AreaOfInterest', bound="AreaOfInterest") # Type definition for type annotation convenience class AreaOfInterest(numpy.ndarray): """Define Area Of Interest as an array of points of any dimension.""" def __new__(cls, points: numpy.array = numpy.empty(0)) -> AreaOfInterestType: """View casting inheritance.""" return numpy.array(points).view(AreaOfInterest) def __repr__(self): """String representation""" return repr(self.tolist()) def __str__(self): """String display""" return repr(self.tolist()) @classmethod def from_dict(cls, aoi_data: dict, working_directory: str = None) -> AreaOfInterestType: """Load attributes from dictionary. Parameters: aoi_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Get first and unique shape # TODO: allow multiple shapes to describe more complex AOI shape, shape_data = aoi_data.popitem() if shape == 'Rectangle': x = shape_data.pop('x') y = shape_data.pop('y') width = shape_data.pop('width') height = shape_data.pop('height') points = [[x, y], [x+width, y], [x+width, y+height], [x, y+height]] return AreaOfInterest(points) elif shape == 'Circle': cx = shape_data.pop('cx') cy = shape_data.pop('cy') radius = shape_data.pop('radius') # TODO: Use pygeos N = 32 points = [(math.cos(2*math.pi / N*x) * radius + cx, math.sin(2*math.pi / N*x) * radius + cy) for x in range(0, N+1)] return AreaOfInterest(points) elif shape == 'Ellipse': cx = shape_data.pop('cx') cy = shape_data.pop('cy') rx = shape_data.pop('rx') ry = shape_data.pop('ry') # TODO: Use pygeos N = 32 points = [(math.cos(2*math.pi / N*x) * rx + cx, math.sin(2*math.pi / N*x) * ry + cy) for x in range(0, N+1)] @property def dimension(self) -> int: """Number of axis coding area points positions.""" return self.shape[1] @property def points_number(self) -> int: """Number of points defining the area.""" return self.shape[0] @property def empty(self) -> bool: """Is AOI empty ?""" return self.shape[0] == 0 @property def bounds(self) -> numpy.array: """Get area's bounds.""" min_bounds = numpy.min(self, axis=0) max_bounds = numpy.max(self, axis=0) return numpy.array([min_bounds, max_bounds]) @property def center(self) -> numpy.array: """Center of mass.""" return self.mean(axis=0) @property def size(self) -> numpy.array: """Get scene size.""" min_bounds, max_bounds = self.bounds return max_bounds - min_bounds @property def area(self) -> float: """Area of the polygon defined by aoi's points.""" return Polygon(self).area @property def bounding_box(self) -> numpy.array: """Get area's bounding box. !!! warning Available for 2D AOI only.""" assert(self.points_number > 1) assert(self.dimension == 2) min_x, min_y = numpy.min(self, axis=0) max_x, max_y = numpy.max(self, axis=0) return numpy.array([(min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y)]) def clockwise(self) -> AreaOfInterestType: """Get area points in clockwise order. !!! warning Available for 2D AOI only.""" assert(self.dimension == 2) O = self.center OP = (self - O) / numpy.linalg.norm(self - O) angles = numpy.arctan2(OP[:, 1], OP[:, 0]) return self[numpy.argsort(angles)] def contains_point(self, point: tuple) -> bool: """Is a point inside area? !!! warning Available for 2D AOI only. !!! danger The AOI points must be sorted in clockwise order.""" assert(self.dimension == 2) assert(len(point) == self.dimension) return mpath.Path(self).contains_points([point])[0] def inner_axis(self, x: float, y: float) -> tuple: """Transform a point coordinates from global axis to AOI axis. !!! warning Available for 2D AOI only. !!! danger The AOI points must be sorted in clockwise order.""" assert(self.dimension == 2) Src = self Src_origin = Src[0] Src = (Src - Src_origin).reshape((len(Src)), 2).astype(numpy.float32) Dst = numpy.array([[0., 0.], [1., 0.], [1., 1.], [0., 1.]]).astype(numpy.float32) P = cv2.getPerspectiveTransform(Src, Dst) X = numpy.append(numpy.array(numpy.array([x, y]) - Src_origin), [1.0]).astype(numpy.float32) Y = numpy.dot(P, X) La = (Y/Y[2])[:-1] return tuple(numpy.around(La, 4)) def outter_axis(self, x: float, y: float) -> tuple: """Transform a point coordinates from AOI axis to global axis. !!! danger The AOI points must be sorted in clockwise order. !!! danger The AOI must be a rectangle.""" # Origin point O = self[0] # Horizontal axis vector H = self[1] - self[0] # Vertical axis vector V = self[3] - self[0] return tuple(O + x * H + y * V) def circle_intersection(self, center: tuple, radius: float) -> Tuple[numpy.array, float, float]: """Get intersection shape with a circle, intersection area / AOI area ratio and intersection area / circle area ratio. !!! warning Available for 2D AOI only. Returns: intersection shape intersection aoi ratio intersection circle ratio """ assert(self.dimension == 2) self_polygon = Polygon(self) args_circle = Point(center).buffer(radius) if self_polygon.intersects(args_circle): intersection = self_polygon.intersection(args_circle) intersection_array = numpy.array([list(xy) for xy in intersection.exterior.coords[:]]).astype(numpy.float32).view(AreaOfInterest) return intersection_array, intersection.area / self_polygon.area, intersection.area / args_circle.area else: empty_array = numpy.array([list([])]).astype(numpy.float32).view(AreaOfInterest) return empty_array, 0., 0. def draw(self, image: numpy.array, color, border_size=1): """Draw 2D AOI into image. !!! warning Available for 2D AOI only.""" assert(self.dimension == 2) if len(self) > 1: # Draw form pixels = numpy.rint(self).astype(int) cv2.line(image, pixels[-1], pixels[0], color, border_size) for A, B in zip(pixels, pixels[1:]): cv2.line(image, A, B, color, border_size) # Draw center center_pixel = numpy.rint(self.center).astype(int) cv2.circle(image, center_pixel, 1, color, -1) AOISceneType = TypeVar('AOIScene', bound="AOIScene") # Type definition for type annotation convenience class AOIScene(): """Define AOI scene as a dictionary of AOI.""" def __init__(self, dimension: int, areas: dict = None): """Initialisation.""" assert(dimension > 0) super().__init__() self.__dimension = dimension self.__areas = {} # NEVER USE {} as default function argument if areas is not None: for name, area in areas.items(): self[name] = AreaOfInterest(area) @classmethod def from_dict(cls, aoi_scene_data: dict, working_directory: str = None) -> AOISceneType: """Load attributes from dictionary. Parameters: aoi_scene_data: dictionary with attributes to load working_directory: folder path where to load files when a dictionary value is a relative filepath. """ # Load areas areas = {} for area_name, area_data in aoi_scene_data.items(): if type(area_data) == list: areas[area_name] = AreaOfInterest(area_data) elif type(area_data) == dict: areas[area_name] = AreaOfInterest.from_dict(area_data) # Default dimension is 0 dimension = 0 # Guess dimension from first area dimension (default: 2) if len(areas) > 0: dimension = list(areas.values())[0].dimension return AOIScene(dimension = dimension, areas = areas) @classmethod def from_json(self, json_filepath: str) -> AOISceneType: """ Load attributes from .json file. Parameters: json_filepath: path to json file """ with open(json_filepath) as configuration_file: aoi_scene_data = json.load(configuration_file) working_directory = os.path.dirname(json_filepath) return AOIScene.from_dict(aoi_scene_data, working_directory) def __getitem__(self, name) -> AreaOfInterest: """Get an AOI from the scene.""" return AreaOfInterest(self.__areas[name]) def __setitem__(self, name, aoi: AreaOfInterest): """Add an AOI to the scene.""" assert(aoi.dimension == self.__dimension) self.__areas[name] = AreaOfInterest(aoi) # Expose area as an attribute of the class setattr(self, name, self.__areas[name]) def __delitem__(self, key): """Remove an AOI from the scene.""" del self.__areas[key] # Stop area exposition as an attribute of the class delattr(self, key) def __or__(self, other): """Merge another scene using | operator.""" assert(other.dimension == self.__dimension) merged_areas = dict(self.__areas) merged_areas.update(other.__areas) return AOIScene(self.dimension, merged_areas) def __ror__(self, other): """Merge another scene using | operator.""" assert(other.dimension == self.__dimension) merged_areas = dict(other.__areas) merged_areas.update(self.__areas) return AOIScene(self.dimension, merged_areas) def __ior__(self, other): """Merge scene with another scene in-place using |= operator.""" assert(other.dimension == self.__dimension) self.__areas.update(other.__areas) self.__dict__.update(other.__areas) return self def __len__(self): """Get number of AOI into scene.""" return len(self.__areas) def __repr__(self): """String representation""" return str(self.__areas) def __add__(self, add_vector) -> AOISceneType: """Add vector to scene.""" assert(len(add_vector) == self.__dimension) for name, area in self.__areas.items(): self.__areas[name] = self.__areas[name] + add_vector return self # Allow n + scene operation __radd__ = __add__ def __sub__(self, sub_vector) -> AOISceneType: """Sub vector to scene.""" assert(len(sub_vector) == self.__dimension) for name, area in self.__areas.items(): self.__areas[name] = self.__areas[name] - sub_vector return self def __rsub__(self, rsub_vector) -> AOISceneType: """RSub vector to scene.""" assert(len(rsub_vector) == self.__dimension) for name, area in self.__areas.items(): self.__areas[name] = rsub_vector - self.__areas[name] return self def __mul__(self, scale_vector) -> AOISceneType: """Scale scene by a vector.""" assert(len(scale_vector) == self.__dimension) for name, area in self.__areas.items(): self.__areas[name] = self.__areas[name] * scale_vector return self # Allow n * scene operation __rmul__ = __mul__ def __truediv__(self, div_vector) -> AOISceneType: assert(len(div_vector) == self.__dimension) for name, area in self.__areas.items(): self.__areas[name] = self.__areas[name] / div_vector return self def items(self) -> Tuple[str, AreaOfInterest]: """Iterate over areas.""" return self.__areas.items() def keys(self) -> list[str]: """Get areas name.""" return self.__areas.keys() @property def dimension(self) -> int: """Dimension of the AOI in scene.""" return self.__dimension def expand(self) -> AOISceneType: """Add 1 dimension to the AOIs in scene.""" new_areas = {} for name, area in self.__areas.items(): zeros = numpy.zeros((len(self.__areas[name]), 1)) new_areas[name] = numpy.concatenate((self.__areas[name], zeros), axis=1) return AOIScene(dimension = self.__dimension + 1, areas = new_areas) @property def bounds(self) -> numpy.array: """Get scene's bounds.""" all_vertices = [] for area in self.__areas.values(): for vertice in area: all_vertices.append(vertice) all_vertices = numpy.array(all_vertices) #.astype(numpy.float32) min_bounds = numpy.min(all_vertices, axis=0) max_bounds = numpy.max(all_vertices, axis=0) return numpy.array([min_bounds, max_bounds]) @property def center(self) -> numpy.array: """Get scene's center point.""" min_bounds, max_bounds = self.bounds return (min_bounds + max_bounds) / 2 @property def size(self) -> numpy.array: """Get scene size.""" min_bounds, max_bounds = self.bounds return max_bounds - min_bounds def copy(self, exclude=[]) -> AOISceneType: """Copy scene partly excluding AOI by name.""" scene_copy = type(self)() for name, area in self.__areas.items(): if name not in exclude: scene_copy[name] = AreaOfInterest(area) #.astype(numpy.float32).view(AreaOfInterest) return scene_copy def clear(self): """Clear scene.""" self.__areas.clear() def __str__(self) -> str: """ String representation of pipeline step object. Returns: String representation """ output = '' for name, area in self.__areas.items(): output += f'{Fore.BLUE}{Style.BRIGHT}{name}{Style.RESET_ALL} ' return output class TimeStampedAOIScenes(DataFeatures.TimeStampedBuffer): """Define timestamped buffer to store AOI scenes in time.""" def __setitem__(self, ts, scene): """Force value to inherit from AOIScene.""" assert(type(scene).__bases__[0] == AOIScene) super().__setitem__(ts, scene) HeatmapType = TypeVar('Heatmap', bound="Heatmap") # Type definition for type annotation convenience @dataclass class Heatmap(DataFeatures.PipelineStepObject): """Define image to draw heatmap.""" size: tuple = field(default=(1, 1)) """Size of heatmap image in pixels.""" buffer: int = field(default=0) """Size of heatmap buffer (0 means no buffering).""" sigma: float = field(default=0.05) """Point spread factor.""" def __post_init__(self): super().__init__() self.__rX, self.__rY = self.size # Init coordinates self.__Sx = numpy.linspace(0., self.__rX/self.__rY, self.__rX) self.__Sy = numpy.linspace(0., 1., self.__rY) # Init heatmap image self.clear() def point_spread(self, point: tuple): """Draw gaussian point spread into image.""" div = -2 * self.sigma**2 x = point[0] / self.__rY # we use rY not rX !!! y = point[1] / self.__rY dX2 = (self.__Sx - x)**2 dY2 = (self.__Sy - y)**2 v_dX, v_dY = numpy.array(numpy.meshgrid(dX2, dY2)).reshape(2, -1) return numpy.exp((v_dX + v_dY) / div).reshape(self.__rY, self.__rX) def clear(self): """Clear heatmap image.""" self.__point_spread_sum = numpy.zeros((self.__rY, self.__rX)) self.__point_spread_buffer = [] self.__point_spread_buffer_size = self.buffer @DataFeatures.PipelineStepMethod def update(self, timestamp: int|float, point: tuple): """Update heatmap image.""" point_spread = self.point_spread(point) # Sum point spread self.__point_spread_sum += point_spread # If point spread buffering enabled if self.buffer > 0: self.__point_spread_buffer.append(point_spread) # Remove oldest point spread buffer image if len(self.__point_spread_buffer) > self.buffer: self.__point_spread_sum -= self.__point_spread_buffer.pop(0) # Edit heatmap gray = (255 * self.__point_spread_sum / numpy.max(self.__point_spread_sum)).astype(numpy.uint8) self.__image = cv2.applyColorMap(gray, cv2.COLORMAP_JET) @property def image(self): """Get heatmap image.""" try: return self.__image except AttributeError: return numpy.zeros((self.__rY, self.__rX, 3)).astype(numpy.uint8)