#!/usr/bin/env python

"""Miscellaneous data features."""

__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "BSD"

from typing import TypeVar, Tuple, Any
from inspect import getmembers, getmodule
import os
import importlib
import collections
import json
import ast
import bisect
import threading
import math
import time

import pandas
import numpy
import matplotlib.pyplot as mpyplot
import matplotlib.patches as mpatches
from colorama import Style, Fore

TimeStampType = TypeVar('TimeStamp', int, float)
"""Type definition for timestamp as integer or float values."""

DataType = TypeVar('Data')
"""Type definition for data to store anything in time."""

TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer")
# Type definition for type annotation convenience


def module_path(obj) -> str:
    """Get object module path.

    Returns:
        module path
    """
    return obj.__class__.__module__


class JsonEncoder(json.JSONEncoder):
    """Specific ArGaze JSON Encoder."""

    def default(self, obj):
        """Serialize numpy scalars/arrays, JSON-native values and plain objects.

        Falls back to the object's public attributes when the standard encoder
        cannot handle it.
        """

        # numpy cases
        if isinstance(obj, numpy.integer):
            return int(obj)
        elif isinstance(obj, numpy.floating):
            return float(obj)
        elif isinstance(obj, numpy.ndarray):
            return obj.tolist()

        # default case
        try:
            return json.JSONEncoder.default(self, obj)

        # class case: JSONEncoder.default raises TypeError for unsupported types,
        # so only that exception is caught (a bare except would also swallow
        # KeyboardInterrupt/SystemExit)
        except TypeError:

            # ignore attributes starting with _
            public_dict = {}

            for k, v in vars(obj).items():

                if not k.startswith('_'):

                    # numpy cases
                    if isinstance(v, numpy.integer):
                        v = int(v)
                    elif isinstance(v, numpy.floating):
                        v = float(v)
                    elif isinstance(v, numpy.ndarray):
                        v = v.tolist()

                    public_dict[k] = v

            return public_dict


class TimeStampedBuffer(collections.OrderedDict):
    """Ordered dictionary to handle timestamped data.
    ```
    {
        timestamp1: data1,
        timestamp2: data2,
        ...
    }
    ```

    !!! warning
        Timestamps must be numbers.

    !!! warning "Timestamps are not sorted internally"
        Data are considered to be stored according to their coming time.
    """

    def __new__(cls, args=None):
        """Inheritance"""
        # NOTE(review): args is accepted but unused here; OrderedDict.__init__
        # consumes the constructor argument instead — confirm before removing.
        return super(TimeStampedBuffer, cls).__new__(cls)

    def __setitem__(self, ts: TimeStampType, data: DataType):
        """Store data at given timestamp."""
        # isinstance instead of exact type comparison: accepts int/float subclasses
        assert isinstance(ts, (int, float))
        super().__setitem__(ts, data)

    def __repr__(self):
        """String representation"""
        return json.dumps(self, ensure_ascii=False, cls=JsonEncoder)

    def __str__(self):
        """String representation"""
        return json.dumps(self, ensure_ascii=False, cls=JsonEncoder)

    def append(self, timestamped_buffer: TimeStampedBufferType) -> TimeStampedBufferType:
        """Append a timestamped buffer."""
        for ts, value in timestamped_buffer.items():
            self[ts] = value

        return self

    @property
    def first(self) -> Tuple[TimeStampType, DataType]:
        """Easing access to first item."""
        return list(self.items())[0]

    def pop_first(self) -> Tuple[TimeStampType, DataType]:
        """Easing FIFO access mode."""
        return self.popitem(last=False)

    def pop_last_until(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]:
        """Pop all item until a given timestamped value and return the first after."""

        # get last item before given timestamp
        earliest_ts, earliest_value = self.get_last_until(ts)

        first_ts, first_value = self.first

        while first_ts < earliest_ts:
            self.pop_first()
            first_ts, first_value = self.first

        return first_ts, first_value

    def pop_last_before(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]:
        """Pop all item before a given timestamped value and return the last one."""

        # get last item before given timestamp
        earliest_ts, earliest_value = self.get_last_before(ts)

        popped_ts, popped_value = self.pop_first()

        while popped_ts != earliest_ts:
            popped_ts, popped_value = self.pop_first()

        return popped_ts, popped_value

    @property
    def last(self) -> Tuple[TimeStampType, DataType]:
        """Easing access to last item."""
        return list(self.items())[-1]

    def pop_last(self) -> Tuple[TimeStampType, DataType]:
        """Easing FIFO access mode."""
        return self.popitem(last=True)

    def get_first_from(self, ts) -> Tuple[TimeStampType, DataType]:
        """Retrieve first item timestamp from a given timestamp value."""

        ts_list = list(self.keys())
        first_from_index = bisect.bisect_left(ts_list, ts)

        if first_from_index < len(self):

            first_from_ts = ts_list[first_from_index]

            return first_from_ts, self[first_from_ts]

        else:

            raise KeyError(f'No data stored after {ts} timestamp.')

    def get_last_before(self, ts) -> Tuple[TimeStampType, DataType]:
        """Retrieve last item timestamp before a given timestamp value."""

        ts_list = list(self.keys())
        last_before_index = bisect.bisect_left(ts_list, ts) - 1

        if last_before_index >= 0:

            last_before_ts = ts_list[last_before_index]

            return last_before_ts, self[last_before_ts]

        else:

            raise KeyError(f'No data stored before {ts} timestamp.')

    def get_last_until(self, ts) -> Tuple[TimeStampType, DataType]:
        """Retrieve last item timestamp until a given timestamp value."""

        ts_list = list(self.keys())
        last_until_index = bisect.bisect_right(ts_list, ts) - 1

        if last_until_index >= 0:

            last_until_ts = ts_list[last_until_index]

            return last_until_ts, self[last_until_ts]

        else:

            raise KeyError(f'No data stored until {ts} timestamp.')

    @classmethod
    def from_json(cls, json_filepath: str) -> TimeStampedBufferType:
        """Create a TimeStampedBuffer from .json file."""

        with open(json_filepath, encoding='utf-8') as ts_buffer_file:

            json_buffer = json.load(ts_buffer_file)

            # cls instead of hard-coded class name: subclasses get an instance of themselves
            return cls({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer})

    def to_json(self, json_filepath: str):
        """Save a TimeStampedBuffer to .json file."""

        with open(json_filepath, 'w', encoding='utf-8') as ts_buffer_file:

            json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder)

    @classmethod
    def from_dataframe(cls, dataframe: pandas.DataFrame, exclude=None) -> TimeStampedBufferType:
        """Create a TimeStampedBuffer from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        Parameters:
            dataframe: source dataframe, indexed by a column named 'timestamp'.
            exclude: optional list of column names to drop before conversion.
        """

        # avoid a mutable default argument
        exclude = [] if exclude is None else exclude

        # NOTE(review): inplace=True mutates the caller's dataframe — kept as-is
        # because callers may rely on it; axis=1 (columns) replaces the
        # accidental axis=True (bool coerced to 1)
        dataframe.drop(exclude, inplace=True, axis=1)

        assert dataframe.index.name == 'timestamp'

        return cls(dataframe.to_dict('index'))

    def as_dataframe(self, exclude=None, split=None) -> pandas.DataFrame:
        """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        The optional *split* argument allows tuple values to be stored in dedicated columns.
        For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns,
        use split={"point": ["x", "y"]}

        !!! warning "Values must be dictionaries"
            Each key is stored as a column name.

        !!! note
            Timestamps are stored as index column called 'timestamp'.
        """

        # avoid mutable default arguments
        exclude = [] if exclude is None else exclude
        split = {} if split is None else split

        df = pandas.DataFrame.from_dict(self.values())

        # Exclude columns
        df.drop(exclude, inplace=True, axis=1)

        # Split columns
        if len(split) > 0:

            splited_columns = []

            for column in df.columns:

                if column in split.keys():

                    df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index)
                    df.drop(column, inplace=True, axis=1)

                    for new_column in split[column]:
                        splited_columns.append(new_column)

                else:

                    splited_columns.append(column)

            # Reorder splited columns
            df = df[splited_columns]

        # Append timestamps as index column
        df['timestamp'] = list(self.keys())
        df.set_index('timestamp', inplace=True)

        return df

    def plot(self, names=None, colors=None, split=None, samples=None) -> list:
        """Plot as [matplotlib](https://matplotlib.org/) time chart.

        Parameters:
            names: column names to plot, paired with colors.
            colors: one color per plotted column.
            split: forwarded to as_dataframe to expand tuple values.
            samples: optional decimation target (keep at most about this many rows).
        """

        # avoid mutable default arguments
        names = [] if names is None else names
        colors = [] if colors is None else colors
        split = {} if split is None else split

        df = self.as_dataframe(split=split)
        legend_patches = []

        # decimate data
        if samples is not None:

            if samples < len(df):

                step = int(len(df) / samples) + 1
                df = df.iloc[::step, :]

        for name, color in zip(names, colors):

            markerline, stemlines, baseline = mpyplot.stem(df.index, df[name])
            mpyplot.setp(markerline, color=color, linewidth=1, markersize=1)
            mpyplot.setp(stemlines, color=color, linewidth=1)
            mpyplot.setp(baseline, color=color, linewidth=1)

            legend_patches.append(mpatches.Patch(color=color, label=name.upper()))

        return legend_patches
return legend_patches class DataDictionary(dict): """Enable dot.notation access to dictionary attributes""" __getattr__ = dict.get __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ class SharedObject(): """Abstract class to enable multiple threads sharing and timestamp management.""" def __init__(self): self._lock = threading.Lock() self._timestamp = math.nan self._execution_times = {} self._exceptions = {} @property def lock(self) -> threading.Lock: """Get shared object lock object.""" return self._lock @property def timestamp(self) -> int|float: """Get shared object timestamp.""" self._lock.acquire() timestamp = self._timestamp self._lock.release() return timestamp @timestamp.setter def timestamp(self, timestamp: int|float): """Set shared object timestamp.""" self._lock.acquire() self._timestamp = timestamp self._lock.release() def untimestamp(self): """Reset shared object timestamp.""" self._lock.acquire() self._timestamp = math.nan self._lock.release() @property def timestamped(self) -> bool: """Is the object timestamped?""" self._lock.acquire() timestamped = not math.isnan(self._timestamp) self._lock.release() return timestamped class PipelineStepObject(): """ Define class to assess pipeline step methods execution time and observe them. """ def __init__(self, name: str = None, observers: dict = None): """Initialize PipelineStepObject Parameters: observers: dictionary with observers objects. 
""" # Init private attribute self.__name = name self.__observers = observers if observers is not None else {} self.__execution_times = {} self.__properties = {} # parent attribute will be setup later by parent it self self.__parent = None def __enter__(self): """At with statement start.""" # Start observers for observer_name, observer in self.__observers.items(): observer.__enter__() return self def __exit__(self, type, value, traceback): """At with statement end.""" # End observers for observer_name, observer in self.__observers.items(): observer.__exit__(type, value, traceback) @property def name(self) -> str: """Get layer's name.""" return self.__name @property def parent(self) -> object: """Get layer's parent object.""" return self.__parent @parent.setter def parent(self, parent: object): """Set layer's parent object.""" self.__parent = parent @property def observers(self) -> dict: """Get pipeline step object observers dictionary.""" return self.__observers @property def execution_times(self): """Get pipeline step object observers execution times dictionary.""" return self.__execution_times def as_dict(self) -> dict: """Export PipelineStepObject attributes as dictionary. Returns: object_data: dictionary with pipeline step object attributes values. """ return { "name": self.__name, "observers": self.__observers } @classmethod def from_dict(cls, object_data: dict, working_directory: str = None) -> object: """Load PipelineStepObject attributes from dictionary. Returns: object_data: dictionary with pipeline step object attributes values. working_directory: folder path where to load files when a dictionary value is a relative filepath. 
""" # Load name try: new_name = object_data.pop('name') except KeyError: new_name = None # Load observers new_observers = {} try: new_observers_value = object_data.pop('observers') # str: relative path to file if type(new_observers_value) == str: filepath = os.path.join(working_directory, new_observers_value) file_format = filepath.split('.')[-1] # Python file format if file_format == 'py': observer_module_path = new_observers_value.split('.')[0] observer_module = importlib.import_module(observer_module_path) new_observers = observer_module.__observers__ # dict: instanciate ready-made argaze observers elif type(new_observers_value) == dict: for observer_type, observer_data in new_observers_value.items(): new_observers[observer_type] = eval(f'{observer_type}(**observer_data)') except KeyError: pass # Create pipeline step object return PipelineStepObject(\ new_name, \ new_observers \ ) @classmethod def from_json(self, json_filepath: str) -> object: """ Define abstract method to load pipeline step object from .json file. Parameters: json_filepath: path to json file """ raise NotImplementedError('from_json() method not implemented') def to_json(self, json_filepath: str = None): """Save pipeline step object into .json file.""" # Remember file path to ease rewriting if json_filepath is not None: self.__json_filepath = json_filepath # Open file with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: json.dump({DataFeatures.module_path(self):DataFeatures.JsonEncoder().default(self)}, object_file, ensure_ascii=False, indent=4) # QUESTION: maybe we need two saving mode? #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) def __str__(self) -> str: """ String representation of pipeline step object. 
Returns: String representation """ tabs = self.tabulation output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n' if self.__name is not None: output += f'{tabs}\t{Style.BRIGHT}name{Style.RESET_ALL}: {self.__name}\n' if self.__parent is not None: output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {self.__parent.name}\n' if len(self.__observers): output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n' for name, observer in self.__observers.items(): output += f'{tabs}\t - {Fore.RED}{name}{Style.RESET_ALL}: {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n' for name, value in self.properties: output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: ' if type(value) == dict: output += '\n' for k, v in value.items(): output += f'{tabs}\t - {Fore.RED}{k}{Style.RESET_ALL}: {v}\n' elif type(value) == numpy.ndarray: output += f'numpy.array{value.shape}\n' elif type(value) == pandas.DataFrame: output += f'pandas.DataFrame{value.shape}\n' else: output += f'{value}' if output[-1] != '\n': output += '\n' return output @property def tabulation(self) -> str: """Edit tabulation string according parents number.""" tabs = '' parent = self.__parent while (parent is not None): tabs += '\t' parent = parent.parent return tabs @property def properties(self) -> list: """Iterate over pipeline step properties values.""" for name, item in self.__class__.__dict__.items(): if isinstance(item, property): yield name, getattr(self, name) for base in self.__class__.__bases__: if base != PipelineStepObject and base != SharedObject: for name, item in base.__dict__.items(): if isinstance(item, property): yield name, getattr(self, name) def PipelineStepMethod(method): """Define a decorator use into PipelineStepObject class to declare pipeline method. !!! danger PipelineStepMethod must have a timestamp as first argument. 
""" def wrapper(self, timestamp, *args, **kw): """Wrap pipeline step method to measure execution time.""" # Initialize execution time assessment start = time.perf_counter() try: # Execute wrapped method result = method(self, timestamp, *args, **kw) finally: # Measure execution time self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 # Notify observers that method has been called subscription_name = f'on_{method.__name__}' for observer_name, observer in self.observers.items(): # Does the observer cares about this method? if subscription_name in dir(observer): subscription = getattr(observer, subscription_name) # Call subscription subscription(timestamp, self) return result return wrapper class PipelineStepObserver(): """Define abstract class to observe pipeline step object use. !!! note To subscribe to a method call, the inherited class simply needs to define 'on_' functions with timestamp and object argument. """ def __enter__(self): """ Define abstract __enter__ method to use observer as a context. !!! warning This method is called provided that the observed PipelineStepObject is created as a context using a with statement. """ return self def __exit__(self, type, value, traceback): """ Define abstract __exit__ method to use observer as a context. !!! warning This method is called provided that the observed PipelineStepObject is created as a context using a with statement. """ pass