#!/usr/bin/env python """Timestamped data features.""" __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar, Tuple from inspect import getmembers import collections import json import ast import bisect import threading import math import pandas import numpy import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches TimeStampType = TypeVar('TimeStamp', int, float) """Type definition for timestamp as integer or float values.""" DataType = TypeVar('Data') """Type definition for data to store anything in time.""" TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") # Type definition for type annotation convenience def as_dict(dataclass_object) -> dict: """ Get dataclass object fields's values as a dictionary. Returns: values: dictionary of dataclass fields's values """ # Get data class fields names fields_names = [] for member_name, member_value in getmembers(dataclass_object): if member_name == '__dataclass_fields__': fields_names = member_value.keys() # Copy fields values return {name: vars(dataclass_object)[name] for name in fields_names} def module_path(obj) -> str: """ Get object module path. Returns: module path """ return obj.__class__.__module__ class JsonEncoder(json.JSONEncoder): """Specific ArGaze JSON Encoder.""" def default(self, obj): """default implementation to serialize object.""" # numpy cases if isinstance(obj, numpy.integer): return int(obj) elif isinstance(obj, numpy.floating): return float(obj) elif isinstance(obj, numpy.ndarray): return obj.tolist() # default case try: return json.JSONEncoder.default(self, obj) # class case except: # ignore attribute starting with _ public_dict = {} for k, v in vars(obj).items(): if not k.startswith('_'): # numpy cases if isinstance(v, numpy.integer): v = int(v) elif isinstance(v, numpy.floating): v = float(v) elif isinstance(v, numpy.ndarray): v = v.tolist() public_dict[k] = v return public_dict class SharedObject(): """Enable multiple threads sharing.""" def __init__(self): self._lock = threading.Lock() self._timestamp = math.nan self._token = None def acquire(self): self._lock.acquire() def release(self): self._lock.release() def locked(self) -> bool: return self._lock.locked() @property def timestamp(self) -> int|float: """Get timestamp""" self._lock.acquire() timestamp = self._timestamp self._lock.release() return timestamp @timestamp.setter def timestamp(self, timestamp: int|float): """Set timestamp""" self._lock.acquire() self._timestamp = timestamp self._lock.release() def untimestamp(self): """Reset timestamp""" self._lock.acquire() self._timestamp = math.nan self._lock.release() @property def timestamped(self) -> bool: """Is the object timestamped?""" self._lock.acquire() timestamped = not math.isnan(self._timestamp) self._lock.release() return timestamped @property def token(self) -> any: """Get token""" self._lock.acquire() token = self._token self._lock.release() return token @token.setter def token(self, token: any): """Set token""" self._lock.acquire() self._token = token self._lock.release() class TimeStampedBuffer(collections.OrderedDict): """Ordered dictionary to handle timestamped data. ``` { timestamp1: data1, timestamp2: data2, ... } ``` !!! warning Timestamps must be numbers. !!! warning "Timestamps are not sorted internally" Data are considered to be stored according at their coming time. """ def __new__(cls, args = None): """Inheritance""" return super(TimeStampedBuffer, cls).__new__(cls) def __setitem__(self, ts: TimeStampType, data: DataType): """Store data at given timestamp.""" assert(type(ts) == int or type(ts) == float) super().__setitem__(ts, data) def __repr__(self): """String representation""" return json.dumps(self, ensure_ascii=False, cls=JsonEncoder) def __str__(self): """String representation""" return json.dumps(self, ensure_ascii=False, cls=JsonEncoder) def append(self, timestamped_buffer: TimeStampedBufferType) -> TimeStampedBufferType: """Append a timestamped buffer.""" for ts, value in timestamped_buffer.items(): self[ts] = value return self @property def first(self) -> Tuple[TimeStampType, DataType]: """Easing access to first item.""" return list(self.items())[0] def pop_first(self) -> Tuple[TimeStampType, DataType]: """Easing FIFO access mode.""" return self.popitem(last=False) def pop_last_until(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]: """Pop all item until a given timestamped value and return the first after.""" # get last item before given timestamp earliest_ts, earliest_value = self.get_last_until(ts) first_ts, first_value = self.first while first_ts < earliest_ts: self.pop_first() first_ts, first_value = self.first return first_ts, first_value def pop_last_before(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]: """Pop all item before a given timestamped value and return the last one.""" # get last item before given timestamp earliest_ts, earliest_value = self.get_last_before(ts) popep_ts, poped_value = self.pop_first() while popep_ts != earliest_ts: popep_ts, poped_value = self.pop_first() return popep_ts, poped_value @property def last(self) -> Tuple[TimeStampType, DataType]: """Easing access to last item.""" return list(self.items())[-1] def pop_last(self) -> Tuple[TimeStampType, DataType]: """Easing FIFO access mode.""" return self.popitem(last=True) def get_first_from(self, ts) -> Tuple[TimeStampType, DataType]: """Retreive first item timestamp from a given timestamp value.""" ts_list = list(self.keys()) first_from_index = bisect.bisect_left(ts_list, ts) if first_from_index < len(self): first_from_ts = ts_list[first_from_index] return first_from_ts, self[first_from_ts] else: raise KeyError(f'No data stored after {ts} timestamp.') def get_last_before(self, ts) -> Tuple[TimeStampType, DataType]: """Retreive last item timestamp before a given timestamp value.""" ts_list = list(self.keys()) last_before_index = bisect.bisect_left(ts_list, ts) - 1 if last_before_index >= 0: last_before_ts = ts_list[last_before_index] return last_before_ts, self[last_before_ts] else: raise KeyError(f'No data stored before {ts} timestamp.') def get_last_until(self, ts) -> Tuple[TimeStampType, DataType]: """Retreive last item timestamp until a given timestamp value.""" ts_list = list(self.keys()) last_until_index = bisect.bisect_right(ts_list, ts) - 1 if last_until_index >= 0: last_until_ts = ts_list[last_until_index] return last_until_ts, self[last_until_ts] else: raise KeyError(f'No data stored until {ts} timestamp.') @classmethod def from_json(self, json_filepath: str) -> TimeStampedBufferType: """Create a TimeStampedBuffer from .json file.""" with open(json_filepath, encoding='utf-8') as ts_buffer_file: json_buffer = json.load(ts_buffer_file) return TimeStampedBuffer({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) def to_json(self, json_filepath: str): """Save a TimeStampedBuffer to .json file.""" with open(json_filepath, 'w', encoding='utf-8') as ts_buffer_file: json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder) @classmethod def from_dataframe(self, dataframe: pandas.DataFrame, exclude=[]) -> TimeStampedBufferType: """Create a TimeStampedBuffer from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" dataframe.drop(exclude, inplace=True, axis=True) assert(dataframe.index.name == 'timestamp') return TimeStampedBuffer(dataframe.to_dict('index')) def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). The optional *split* argument allows tuple values to be stored in dedicated columns. For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} !!! warning "Values must be dictionaries" Each key is stored as a column name. !!! note Timestamps are stored as index column called 'timestamp'. """ df = pandas.DataFrame.from_dict(self.values()) # Exclude columns df.drop(exclude, inplace=True, axis=True) # Split columns if len(split) > 0: splited_columns = [] for column in df.columns: if column in split.keys(): df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) df.drop(column, inplace=True, axis=True) for new_column in split[column]: splited_columns.append(new_column) else: splited_columns.append(column) # Reorder splited columns df = df[splited_columns] # Append timestamps as index column df['timestamp'] = self.keys() df.set_index('timestamp', inplace=True) return df def plot(self, names=[], colors=[], split={}, samples=None) -> list: """Plot as [matplotlib](https://matplotlib.org/) time chart.""" df = self.as_dataframe(split=split) legend_patches = [] # decimate data if samples != None: if samples < len(df): step = int(len(df) / samples) + 1 df = df.iloc[::step, :] for name, color in zip(names, colors): markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) mpyplot.setp(markerline, color=color, linewidth=1, markersize = 1) mpyplot.setp(stemlines, color=color, linewidth=1) mpyplot.setp(baseline, color=color, linewidth=1) legend_patches.append(mpatches.Patch(color=color, label=name.upper())) return legend_patches