#!/usr/bin/env python

from typing import TypeVar, Tuple
import collections
import json
import bisect

import pandas

try:
    import matplotlib.pyplot as mpyplot
    import matplotlib.patches as mpatches
except ImportError:
    # Only TimeStampedBuffer.plot() needs matplotlib: keep the buffer usable
    # without it and report the missing dependency when plot() is called.
    mpyplot = None
    mpatches = None

TimeStampType = TypeVar('TimeStamp', int, float)
"""Type definition for timestamp as integer or float values."""

DataType = TypeVar('Data')
"""Type definition for data to store anything in time."""

TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer")
# Type definition for type annotation convenience


class TimeStampedBuffer(collections.OrderedDict):
    """Ordered dictionary to handle timestamped data.
    ```
        {
            timestamp1: data1,
            timestamp2: data2,
            ...
        }
    ```

    .. warning::
        Timestamps must be numbers.

    .. warning::
        Timestamps are not sorted internally.
        Data are considered to be stored in their arrival order.
    """

    def __new__(cls, args=None):
        """Create the buffer, accepting at most one positional initializer."""

        return super(TimeStampedBuffer, cls).__new__(cls)

    def __setitem__(self, ts: TimeStampType, data: DataType):
        """Store data at given timestamp.

        Raises:
            AssertionError: if the timestamp is not exactly an int or a float
                (bool and other subclasses are rejected).
        """

        # Raised explicitly instead of using an assert statement so that the
        # check is still enforced when Python runs with -O.
        if type(ts) not in (int, float):
            raise AssertionError(f'Timestamp must be int or float, not {type(ts).__name__}.')

        super().__setitem__(ts, data)

    def __repr__(self):
        """Return the buffer as a JSON string.

        Values that json cannot encode natively are dumped through vars().
        """

        return json.dumps(self, ensure_ascii=False, default=vars)

    def append(self, timestamped_buffer: TimeStampedBufferType) -> TimeStampedBufferType:
        """Append every item of another timestamped buffer and return self."""

        # Go through __setitem__ so every timestamp is validated
        for ts, value in timestamped_buffer.items():
            self[ts] = value

        return self

    @property
    def first(self) -> Tuple[TimeStampType, DataType]:
        """Easing access to first item.

        Raises:
            IndexError: if the buffer is empty.
        """

        # Read the first item directly instead of copying the whole buffer
        try:
            return next(iter(self.items()))

        except StopIteration:
            raise IndexError('TimeStampedBuffer is empty.') from None

    def pop_first(self) -> Tuple[TimeStampType, DataType]:
        """Easing FIFO access mode: remove and return the first item.

        Raises:
            KeyError: if the buffer is empty.
        """

        return self.popitem(last=False)

    def pop_first_until(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]:
        """Pop all items until a given timestamp value and return the last popped item.

        The returned item is the last one stored before the given timestamp:
        it is removed from the buffer together with every item stored before it.

        Raises:
            KeyError: if no data is stored before the given timestamp.
        """

        # Get last item before given timestamp
        earliest_ts, _ = self.get_last_before(ts)

        popped_ts, popped_value = self.pop_first()

        while popped_ts != earliest_ts:
            popped_ts, popped_value = self.pop_first()

        return popped_ts, popped_value

    @property
    def last(self) -> Tuple[TimeStampType, DataType]:
        """Easing access to last item.

        Raises:
            IndexError: if the buffer is empty.
        """

        # Read the last item directly instead of copying the whole buffer
        try:
            return next(reversed(self.items()))

        except StopIteration:
            raise IndexError('TimeStampedBuffer is empty.') from None

    def pop_last(self) -> Tuple[TimeStampType, DataType]:
        """Easing LIFO access mode: remove and return the last item.

        Raises:
            KeyError: if the buffer is empty.
        """

        return self.popitem(last=True)

    def get_last_before(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]:
        """Retrieve last item stored before a given timestamp value.

        .. warning::
            The lookup is a bisection over the keys in storage order:
            it is only meaningful when timestamps were stored in ascending order.

        Raises:
            KeyError: if no data is stored before the given timestamp.
        """

        ts_list = list(self.keys())
        last_before_index = bisect.bisect_left(ts_list, ts) - 1

        if last_before_index < 0:
            raise KeyError(f'No data stored before {ts} timestamp.')

        last_before_ts = ts_list[last_before_index]

        return last_before_ts, self[last_before_ts]

    def as_dataframe(self, exclude=None, split=None) -> pandas.DataFrame:
        """Convert as [pandas dataframe](https://pandas.pydata.org/docs/reference/frame.html).

        The optional *exclude* argument lists the columns to drop.

        The optional *split* argument allows tuple values to be stored in dedicated columns.
        For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]}

        .. warning::
            Values must be dictionaries.

            Each key is stored as a column name.

        .. note::
            Timestamps are stored as index column called 'timestamp'.
        """

        exclude = [] if exclude is None else exclude
        split = {} if split is None else split

        # One row per stored value, one column per dictionary key
        df = pandas.DataFrame(list(self.values()))

        # Exclude columns
        df = df.drop(columns=exclude)

        # Split columns, each new column taking the place of the one it comes from
        ordered_columns = []

        for column in list(df.columns):

            if column in split:

                new_columns = list(split[column])

                df[new_columns] = pandas.DataFrame(df[column].tolist(), index=df.index)
                df = df.drop(columns=column)

                ordered_columns.extend(new_columns)

            else:

                ordered_columns.append(column)

        # Reorder split columns
        df = df[ordered_columns]

        # Append timestamps as index column
        df['timestamp'] = list(self.keys())
        df = df.set_index('timestamp')

        return df

    def plot(self, names=None, colors=None, split=None, samples=None) -> list:
        """Plot as [matplotlib](https://matplotlib.org/) time chart.

        Each column listed in *names* is drawn as a stem plot using the color
        at the same position in *colors*. The optional *split* argument is
        passed to as_dataframe. When *samples* is given and lower than the
        number of rows, rows are decimated with a regular step before plotting.

        Returns:
            One legend patch per plotted column.

        Raises:
            ImportError: if matplotlib is not installed.
            ValueError: if samples is not a positive number.
        """

        if mpyplot is None or mpatches is None:
            raise ImportError('matplotlib is required to plot a TimeStampedBuffer.')

        names = [] if names is None else names
        colors = [] if colors is None else colors

        df = self.as_dataframe(split=split)

        # Decimate data
        if samples is not None:

            # A null or negative value would lead to a division by zero
            # or to an invalid slice step
            if samples <= 0:
                raise ValueError(f'samples must be positive, not {samples}.')

            if samples < len(df):

                step = int(len(df) / samples) + 1
                df = df.iloc[::step, :]

        legend_patches = []

        for name, color in zip(names, colors):

            markerline, stemlines, baseline = mpyplot.stem(df.index, df[name])

            mpyplot.setp(markerline, color=color, linewidth=1, markersize=1)
            mpyplot.setp(stemlines, color=color, linewidth=1)
            mpyplot.setp(baseline, color=color, linewidth=1)

            legend_patches.append(mpatches.Patch(color=color, label=name.upper()))

        return legend_patches