#!/usr/bin/env python import collections import json import bisect import pandas import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches class TimeStampedBuffer(collections.OrderedDict): """Ordered dictionary to handle timestamped data. ``` { timestamp1: data1, timestamp2: data2, ... } ``` """ def __new__(cls, args = None): return super(TimeStampedBuffer, cls).__new__(cls) def __setitem__(self, key: float, value): """Force key to be a number""" if type(key) != int and type(key) != float: raise KeyError('key must be a number') super().__setitem__(key, value) def __str__(self): return json.dumps(self, default=vars) def append(self, timestamped_buffer): """Append a timestamped buffer.""" for ts, value in timestamped_buffer.items(): self[ts] = value def get_first(self): """Easing access to first item""" return list(self.items())[0] def pop_first(self): """Easing FIFO access mode""" return self.popitem(last=False) def pop_first_until(self, ts): """Pop all item until a given timestamped value and return the last poped item""" # get last timestamp before given timestamp earliest_ts = self.get_last_before(ts) # when no timestamped have been found if earliest_ts == None: raise KeyError popep_ts, poped_value = self.pop_first() while popep_ts != earliest_ts: popep_ts, poped_value = self.pop_first() return popep_ts, poped_value def get_last(self): """Easing access to last item""" return list(self.items())[-1] def pop_last(self): """Easing FIFO access mode""" return self.popitem(last=True) def get_last_before(self, ts): """Retreive last item timestamp before a given timestamp value.""" ts_list = list(self.keys()) last_before_index = bisect.bisect_left(ts_list, ts) - 1 if last_before_index >= 0: return ts_list[last_before_index] else: return None def export_as_json(self, filepath): """Write buffer content into a json file.""" try: with open(filepath, 'w', encoding='utf-8') as jsonfile: json.dump(self, jsonfile, ensure_ascii = False, default=vars) except: raise RuntimeError(f'Can\' write {filepath}') def as_dataframe(self, exclude=[], split={}): """Convert buffer as pandas dataframe. Timestamped values must be stored as dictionary where each keys will be related to a column.""" df = pandas.DataFrame.from_dict(self.values()) df.drop(exclude, inplace=True, axis=True) for key, columns in split.items(): df[columns] = pandas.DataFrame(df[key].tolist(), index=df.index) df.drop(key, inplace=True, axis=True) df['timestamp'] = self.keys() df.set_index('timestamp', inplace=True) return df def export_as_csv(self, filepath, exclude=[]): """Write buffer content into a csv file.""" try: self.as_dataframe(exclude=exclude).to_csv(filepath, index=True) except: raise RuntimeError(f'Can\' write {filepath}') def plot(self, names=[], colors=[], split={}, samples=None): df = self.as_dataframe(split=split) legend_patches = [] # decimate data if samples != None: if samples < len(df): step = int(len(df) / samples) + 1 df = df.iloc[::step, :] for name, color in zip(names, colors): markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) mpyplot.setp(markerline, color=color, linewidth=1, markersize = 1) mpyplot.setp(stemlines, color=color, linewidth=1) mpyplot.setp(baseline, color=color, linewidth=1) legend_patches.append(mpatches.Patch(color=color, label=name.upper())) return legend_patches