"""Miscellaneous data features. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" from typing import Self import os import sys import logging import traceback import importlib import collections import json import bisect import threading import math import time import pandas import numpy import cv2 import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches from colorama import Style, Fore def module_path(obj) -> str: """ Get object module path. Returns: module path """ return obj.__class__.__module__ def get_class(class_path: str) -> object: """Get class object from 'path.to.class' string. Parameters: class_path: a 'path.to.class' string. Returns: class: a 'path.to.class' class. """ parts = class_path.split('.') module = ".".join(parts[:-1]) m = __import__(module) for comp in parts[1:]: m = getattr(m, comp) return m def properties(cls) -> list: """get class properties name.""" properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)] for base in cls.__bases__: for name, item in base.__dict__.items(): if isinstance(item, property): properties.append(name) return properties def as_dict(obj, filter: bool=True) -> dict: """Export object as dictionary. Parameters: filter: remove None attribute values. """ _dict = {} for p in properties(obj.__class__): v = getattr(obj, p) if not filter or v is not None: _dict[p] = v return _dict class JsonEncoder(json.JSONEncoder): """Specific ArGaze JSON Encoder.""" def default(self, obj): """default implementation to serialize object.""" # numpy cases if isinstance(obj, numpy.integer): return int(obj) elif isinstance(obj, numpy.floating): return float(obj) elif isinstance(obj, numpy.ndarray): return obj.tolist() # default case try: return json.JSONEncoder.default(self, obj) # class case except: # ignore attribute starting with _ public_dict = {} for k, v in vars(obj).items(): if not k.startswith('_'): # numpy cases if isinstance(v, numpy.integer): v = int(v) elif isinstance(v, numpy.floating): v = float(v) elif isinstance(v, numpy.ndarray): v = v.tolist() public_dict[k] = v return public_dict class DataDictionary(dict): """Enable dot.notation access to dictionary attributes""" __getattr__ = dict.get __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ class TimestampedObject(): """Abstract class to enable timestamp management.""" def __init__(self, timestamp: int|float = math.nan): """Initialize TimestampedObject.""" self._timestamp = timestamp def __repr__(self): """String representation.""" return json.dumps(as_dict(self)) @property def timestamp(self) -> int|float: """Get object timestamp.""" return self._timestamp @timestamp.setter def timestamp(self, timestamp: int|float): """Set object timestamp.""" self._timestamp = timestamp def untimestamp(self): """Reset object timestamp.""" self._timestamp = math.nan def is_timestamped(self) -> bool: """Is the object timestamped?""" return not math.isnan(self._timestamp) class TimestampedObjectsList(list): """Handle timestamped object into a list. !!! warning "Timestamped objects are not sorted internally" Timestamped objects are considered to be stored according at their coming time. """ def __init__(self, ts_object_type: type, ts_objects: list = []): self.__object_type = ts_object_type self.__object_properties = properties(self.__object_type) for ts_object in ts_objects: self.append(ts_object) @property def object_type(self): """Get object type handled by the list.""" return self.__object_type def append(self, ts_object: TimestampedObject|dict): """Append timestamped object.""" # Convert dict into GazePosition if type(ts_object) == dict: ts_object = self.__object_type.from_dict(ts_object) # Check object type if type(ts_object) != self.__object_type: if not issubclass(ts_object.__class__, self.__object_type): raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') if not ts_object.is_timestamped(): raise ValueError(f'object is not timestamped') super().append(ts_object) def look_for(self, timestamp: int|float) -> TimestampedObject: """Look for object at given timestamp.""" for ts_object in self: if ts_object.timestamp == timestamp: return ts_object def __add__(self, ts_objects: list = []) -> Self: """Append timestamped objects list.""" for ts_object in ts_objects: self.append(ts_object) return self @property def duration(self): """Get inferred duration from first and last timestamps.""" if self: return self[-1].timestamp - self[0].timestamp else: return 0 def timestamps(self): """Get all timestamps in list.""" return [ts_object.timestamp for ts_object in self] def tuples(self) -> list: """Get all timestamped objects as list of tuple.""" return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] @classmethod def from_dataframe(self, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self: """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" dataframe.drop(exclude, inplace=True, axis=True) assert(dataframe.index.name == 'timestamp') object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in dataframe.to_dict('index').items()] return TimestampedObjectsList(ts_object_type, object_list) def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). The optional *split* argument allows tuple values to be stored in dedicated columns. For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} !!! warning "Values must be dictionaries" Each key is stored as a column name. !!! note Timestamps are stored as index column called 'timestamp'. """ df = pandas.DataFrame(self.tuples(), columns=self.__object_properties) # Exclude columns df.drop(exclude, inplace=True, axis=True) # Split columns if len(split) > 0: splited_columns = [] for column in df.columns: if column in split.keys(): df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) df.drop(column, inplace=True, axis=True) for new_column in split[column]: splited_columns.append(new_column) else: splited_columns.append(column) # Reorder splited columns df = df[splited_columns] # Append timestamps as index column df['timestamp'] = self.timestamps() df.set_index('timestamp', inplace=True) return df @classmethod def from_json(self, ts_object_type: type, json_filepath: str) -> Self: """Create a TimestampedObjectsList from .json file.""" with open(json_filepath, encoding='utf-8') as ts_objects_file: json_ts_objects = json.load(ts_objects_file) return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects]) def to_json(self, json_filepath: str): """Save a TimestampedObjectsList to .json file.""" with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file: json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ') def __repr__(self): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) def __str__(self): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) def pop_last_until(self, timestamp: int|float) -> TimestampedObject: """Pop all item until a given timestamped value and return the first after.""" # get last item before given timestamp earliest_value = self.get_last_until(timestamp) while self[0].timestamp < earliest_value.timestamp: self.pop(0) return self[0] def pop_last_before(self, timestamp: int|float) -> TimestampedObject: """Pop all item before a given timestamped value and return the last one.""" # get last item before given timestamp earliest_value = self.get_last_before(timestamp) poped_value = self.pop(0) while poped_value.timestamp != earliest_value.timestamp: poped_value = self.pop(0) return poped_value def get_first_from(self, timestamp: int|float) -> TimestampedObject: """Retreive first item timestamp from a given timestamp value.""" ts_list = self.timestamps() first_from_index = bisect.bisect_left(ts_list, timestamp) if first_from_index < len(self): return self[ts_list[first_from_index]] else: raise KeyError(f'No data stored after {timestamp} timestamp.') def get_last_before(self, timestamp: int|float) -> TimestampedObject: """Retreive last item timestamp before a given timestamp value.""" ts_list = self.timestamps() last_before_index = bisect.bisect_left(ts_list, timestamp) - 1 if last_before_index >= 0: return self[ts_list[last_before_index]] else: raise KeyError(f'No data stored before {timestamp} timestamp.') def get_last_until(self, timestamp: int|float) -> TimestampedObject: """Retreive last item timestamp until a given timestamp value.""" ts_list = self.timestamps() last_until_index = bisect.bisect_right(ts_list, timestamp) - 1 if last_until_index >= 0: return self[ts_list[last_until_index]] else: raise KeyError(f'No data stored until {timestamp} timestamp.') def plot(self, names=[], colors=[], split={}, samples=None) -> list: """Plot as [matplotlib](https://matplotlib.org/) time chart.""" df = self.as_dataframe(split=split) legend_patches = [] # decimate data if samples != None: if samples < len(df): step = int(len(df) / samples) + 1 df = df.iloc[::step, :] for name, color in zip(names, colors): markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) mpyplot.setp(markerline, color=color, linewidth=1, markersize = 1) mpyplot.setp(stemlines, color=color, linewidth=1) mpyplot.setp(baseline, color=color, linewidth=1) legend_patches.append(mpatches.Patch(color=color, label=name.upper())) return legend_patches class SharedObject(TimestampedObject): """Abstract class to enable multiple threads sharing for timestamped object.""" def __init__(self, timestamp: int|float = math.nan): TimestampedObject.__init__(self, timestamp) self._lock = threading.Lock() self._execution_times = {} self._exceptions = {} def PipelineStepInit(method): """Define a decorator use into PipelineStepObject class to declare pipeline step init method.""" def wrapper(self, **kwargs): """Wrap pipeline step init method to update PipelineStepObject attributes with arguments after init call. Parameters: kwargs: any arguments defined by PipelineStepMethodInit. """ method(self, **kwargs) self.update_attributes(kwargs) return wrapper def PipelineStepAttributeSetter(method): """Define a decorator use into PipelineStepObject class to declare pipeline step attribute setter.""" def wrapper(self, new_value, unwrap: bool = False): """Wrap pipeline step attribute setter to load attribute from file. Parameters: new_value: value used to set attribute. unwrap: call wrapped method directly. """ if unwrap: return method(self, new_value) # Get new value type new_value_type = type(new_value) # Check setter annotations to get expected value type try: expected_value_type = list(method.__annotations__.values())[0] except KeyError: raise(ValueError(f'Missing annotations in {method.__name__}: {method.__annotations__}')) logging.debug('@PipelineStepAttributeSetter %s.%s.setter(%s) with %s', type(self).__name__, method.__name__, expected_value_type.__name__, new_value_type.__name__) # Define function to load dict values def load_dict(data: dict) -> any: logging.debug('\t> load %s from %s', expected_value_type.__name__, new_value_type.__name__) # Check if json keys are PipelineStepObject class and store them in a list new_objects_list = [] for key, value in data.items(): try: new_class = get_class(key) except ValueError as e: # Keys are not class name if str(e) == 'Empty module name': break else: raise(e) logging.debug('\t+ create %s object from key using value as argument', key) new_objects_list.append( new_class(**value) ) # Only one object have been loaded: pass the object if it is a subclass of expected type if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): return new_objects_list[0] # Pass non empty objects list elif len(new_objects_list) > 0: return new_objects_list # Otherwise, data are parameters of the expected class logging.debug('\t+ create %s object using %s as argument', expected_value_type.__name__, new_value_type.__name__) return expected_value_type(**data) # String not expected: load value from file if new_value_type == str and new_value_type != expected_value_type: split_point = new_value.split('.') # String have a dot inside: file path with format if len(split_point) > 1: file_format = split_point[-1] logging.debug('\t> %s is a path to a %s file', new_value, file_format.upper()) filepath = os.path.join(self.working_directory, new_value) # Load image from JPG and PNG formats if file_format == 'jpg' or file_format == 'png': return method(self, cv2.imread(filepath)) # Load image from OBJ formats elif file_format == 'obj': return method(self, expected_value_type.from_obj(filepath)) # Load object from JSON file elif file_format == 'json': with open(filepath) as file: return method(self, load_dict(json.load(file))) # No point inside string: identifier name else: logging.debug('\t> %s is an identifier', new_value) logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__) return method(self, expected_value_type(new_value)) # Dict not expected: load value from dict if new_value_type == dict and expected_value_type != dict: return method(self, load_dict(new_value)) # Otherwise, pass new value to setter method logging.debug('\t> use %s value as passed', new_value_type.__name__) method(self, new_value) return wrapper class PipelineStepObject(): """ Define class to assess pipeline step methods execution time and observe them. """ @PipelineStepInit def __init__(self, **kwargs): """Initialize PipelineStepObject.""" logging.debug('PipelineStepObject.__init__ %s %s', type(self).__name__, kwargs['name'] if 'name' in kwargs else '') # Init private attribute self.__name = None self.__working_directory = None self.__observers = [] self.__execution_times = {} # Parent attribute will be setup later by parent it self self.__parent = None def __enter__(self): """At with statement start.""" # Start children pipeline step objects for child in self.children: child.__enter__() # Start observers for observer in self.__observers: observer.__enter__() return self def __exit__(self, exception_type, exception_value, exception_traceback): """At with statement end.""" # End observers for observer in self.__observers: observer.__exit__(exception_type, exception_value, exception_traceback) # End children pipeline step objects for child in self.children: child.__exit__(exception_type, exception_value, exception_traceback) def update_attributes(self, object_data: dict): """Update pipeline step object attributes with dictionary.""" for key, value in object_data.items(): logging.debug('PipelineStepObject.update_attributes %s.%s with %s value', type(self).__name__, key, type(value).__name__) setattr(self, key, value) @property def name(self) -> str: """Get pipeline step object's name.""" return self.__name @name.setter def name(self, name: str): """Set pipeline step object's name.""" self.__name = name @property def working_directory(self) -> str: """Get pipeline step object's working directory. This path will be joined to relative file path.""" return self.__working_directory @working_directory.setter def working_directory(self, working_directory: str): """Set pipeline step object's working directory.""" # Append working directory to the Python path if working_directory is not None: sys.path.append(working_directory) self.__working_directory = working_directory @property def parent(self) -> object: """Get pipeline step object's parent object.""" return self.__parent @parent.setter def parent(self, parent: object): """Set layer's parent object.""" self.__parent = parent @property def observers(self) -> list: """Pipeline step object observers list.""" return self.__observers @observers.setter @PipelineStepAttributeSetter def observers(self, observers: list): # Edit new observers dictionary self.__observers = observers @property def execution_times(self): """Get pipeline step object observers execution times dictionary.""" return self.__execution_times def as_dict(self) -> dict: """Export PipelineStepObject attributes as dictionary. Returns: object_data: dictionary with pipeline step object attributes values. """ return { "name": self.__name, "observers": self.__observers } @classmethod def from_json(cls, configuration_filepath: str, patch_filepath: str = None) -> object: """ Load instance from .json file. Parameters: configuration_filepath: path to json configuration file patch_filepath: path to json patch file to modify any configuration entries """ logging.debug('%s.from_json', cls.__name__) # Load configuration from JSON file with open(configuration_filepath) as configuration_file: # Edit object_data with working directory as first key object_data = { 'working_directory': os.path.dirname(configuration_filepath) } object_data.update(json.load(configuration_file)) # Apply patch to configuration if required if patch_filepath is not None: with open(patch_filepath) as patch_file: patch_data = json.load(patch_file) import collections.abc def update(d, u): for k, v in u.items(): if isinstance(v, collections.abc.Mapping): d[k] = update(d.get(k, {}), v) elif v is None: del d[k] else: d[k] = v return d object_data = update(object_data, patch_data) # Instanciate class return cls(**object_data) def to_json(self, json_filepath: str = None): """Save pipeline step object into .json file.""" # Remember file path to ease rewriting if json_filepath is not None: self.__json_filepath = json_filepath # Open file with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: json.dump({module_path(self):as_dict(self)}, object_file, ensure_ascii=False, indent=4) # QUESTION: maybe we need two saving mode? #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) def __str__(self) -> str: """ String representation of pipeline step object. Returns: String representation """ tabs = self.tabulation output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n' if self.__name is not None: output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n' if self.__parent is not None: output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n' if len(self.__observers): output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n' for observer in self.__observers: output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n' for name, value in self.properties: output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: ' if type(value) == dict: output += '\n' for k, v in value.items(): output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n' if type(value) == list: output += '\n' for v in value: output += f'{tabs}\t - {v}\n' elif type(value) == numpy.ndarray: output += f'numpy.array{value.shape}\n' elif type(value) == pandas.DataFrame: output += f'pandas.DataFrame{value.shape}\n' else: try: output += f'{value}' except TypeError as e: output += f'{Fore.RED}{Style.BRIGHT}!!! {type(self).__name__}.{name}: {e}{Style.RESET_ALL}\n\n' if output[-1] != '\n': output += '\n' return output @property def tabulation(self) -> str: """Edit tabulation string according parents number.""" tabs = '' parent = self.__parent while (parent is not None): tabs += '\t' parent = parent.parent return tabs @property def properties(self) -> tuple[name, any]: """Iterate over pipeline step properties values.""" for name, item in self.__class__.__dict__.items(): if isinstance(item, property): yield name, getattr(self, name) for base in self.__class__.__bases__: if base != PipelineStepObject and base != SharedObject: for name, item in base.__dict__.items(): if isinstance(item, property): yield name, getattr(self, name) @property def children(self) -> object: """Iterate over children pipeline step objects.""" for name in dir(self): if not name.startswith('_'): attr = getattr(self, name) if isinstance(attr, PipelineStepObject) and attr != self.parent: yield attr def PipelineStepMethod(method): """Define a decorator use into PipelineStepObject class to declare pipeline method. !!! danger PipelineStepMethod must have a timestamp as first argument. """ def wrapper(self, *args, timestamp: int|float = None, unwrap: bool = False, **kwargs): """Wrap pipeline step method to measure execution time. Parameters: args: any arguments defined by PipelineStepMethod. timestamp: optional method call timestamp (unit does'nt matter) if first args parameter is not a TimestampedObject instance. unwrap: extra arguments used in wrapper function to call wrapped method directly. """ if timestamp is None and len(args) > 0: if isinstance(args[0], TimestampedObject): timestamp = args[0].timestamp if unwrap: return method(self, *args, **kwargs) # Initialize execution time assessment start = time.perf_counter() exception = None result = None try: # Execute wrapped method result = method(self, *args, **kwargs) except Exception as e: exception = e finally: # Measure execution time self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 # Notify observers that method has been called subscription_name = f'on_{method.__name__}' for observer in self.observers: # Does the observer cares about this method? if subscription_name in dir(observer): subscription = getattr(observer, subscription_name) # Call subscription subscription(timestamp, self, exception) # Raise exception if exception is not None: raise exception return result return wrapper class PipelineStepObserver(): """Define abstract class to observe pipeline step object use. !!! note To subscribe to a method call, the inherited class simply needs to define 'on_' functions with timestamp, object and traceback argument. """ def __enter__(self): """ Define abstract __enter__ method to use observer as a context. !!! warning This method is called provided that the observed PipelineStepObject is created as a context using a with statement. """ return self def __exit__(self, type, value, traceback): """ Define abstract __exit__ method to use observer as a context. !!! warning This method is called provided that the observed PipelineStepObject is created as a context using a with statement. """ pass class PipelineInputProvider(PipelineStepObject): """ Define class to ... """ @PipelineStepInit def __init__(self, **kwargs): logging.debug('PipelineInputProvider.__init__') super().__init__() def attach(self, method): logging.debug('PipelineInputProvider.attach', method) def __enter__(self): """ Define abstract __enter__ method to use device as a context. !!! warning This method is called provided that the PipelineInputProvider is created as a context using a with statement. """ return self def __exit__(self, type, value, traceback): """ Define abstract __exit__ method to use device as a context. !!! warning This method is called provided that the PipelineInputProvider is created as a context using a with statement. """ pass