#!/usr/bin/env python

from typing import TypeVar, Tuple
import collections
import json
import ast
import bisect

import pandas
import numpy

# The plotting back-end is only needed by TimeStampedBuffer.plot(): keep the
# module importable (buffering, JSON and dataframe features) without it.
try:
    import matplotlib.pyplot as mpyplot
    import matplotlib.patches as mpatches
except ImportError:
    mpyplot = None
    mpatches = None

TimeStampType = TypeVar('TimeStamp', int, float)
"""Type definition for timestamp as integer or float values."""

DataType = TypeVar('Data')
"""Type definition for data to store anything in time."""

TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer")
# Type definition for type annotation convenience


class JsonEncoder(json.JSONEncoder):
    """Specific ArGaze JSON Encoder."""

    def default(self, obj):
        """Serialize objects the standard JSON encoder cannot handle.

        Conversions:

        - numpy integer, floating and boolean scalars become int, float and bool;
        - numpy arrays become (nested) lists;
        - any other object exposing a ``__dict__`` becomes a dictionary of its
          attributes, ignoring the ones whose name starts with ``_``.

        Raises:
            TypeError: if the object matches none of the cases above.
        """

        # numpy cases
        if isinstance(obj, numpy.integer):
            return int(obj)

        if isinstance(obj, numpy.floating):
            return float(obj)

        if isinstance(obj, numpy.bool_):
            return bool(obj)

        if isinstance(obj, numpy.ndarray):
            return obj.tolist()

        # class case
        try:
            attributes = vars(obj)

        except TypeError:
            # No __dict__ to export: let the base class raise its standard
            # "Object of type ... is not JSON serializable" error.
            return super().default(obj)

        # ignore attribute starting with _
        return {k: v for k, v in attributes.items() if not k.startswith('_')}


class TimeStampedBuffer(collections.OrderedDict):
    """Ordered dictionary to handle timestamped data.
    ```
        {
            timestamp1: data1,
            timestamp2: data2,
            ...
        }
    ```

    .. warning::
        Timestamps must be numbers.

    .. warning::
        Timestamps are not sorted internally.
        Data are assumed to be stored in the order they arrive: the lookup
        methods (get_first_from, get_last_before, get_last_until and the
        pop_last_* methods built on them) run a binary search over the keys
        in insertion order, which is only meaningful when timestamps were
        inserted in ascending order.
    """

    def __new__(cls, args=None):
        """Inheritance"""

        return super(TimeStampedBuffer, cls).__new__(cls)

    def __setitem__(self, ts: TimeStampType, data: DataType):
        """Store data at given timestamp.

        The timestamp must be exactly an int or a float.
        """

        # NOTE: kept as an assertion (AssertionError) for backward
        # compatibility; be aware it is skipped when Python runs with -O.
        assert type(ts) in (int, float), 'Timestamps must be int or float.'

        super().__setitem__(ts, data)

    def __repr__(self):
        """String representation (JSON)."""

        return json.dumps(self, ensure_ascii=False, cls=JsonEncoder)

    def __str__(self):
        """String representation (JSON)."""

        return json.dumps(self, ensure_ascii=False, cls=JsonEncoder)

    def append(self, timestamped_buffer: TimeStampedBufferType) -> TimeStampedBufferType:
        """Append a timestamped buffer.

        Items are stored one by one through __setitem__, so timestamps are
        validated and an already existing timestamp gets its data replaced.

        Returns:
            The buffer itself, to allow chaining.
        """

        for ts, value in timestamped_buffer.items():
            self[ts] = value

        return self

    @property
    def first(self) -> Tuple[TimeStampType, DataType]:
        """Easing access to first item.

        Raises:
            IndexError: if the buffer is empty.
        """

        # Peek at the first item without copying the whole buffer.
        try:
            return next(iter(self.items()))

        except StopIteration:
            raise IndexError('TimeStampedBuffer is empty.') from None

    def pop_first(self) -> Tuple[TimeStampType, DataType]:
        """Easing FIFO access mode: remove and return the first item."""

        return self.popitem(last=False)

    def pop_last_until(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]:
        """Pop all items stored before the last item until a given timestamp.

        The last item whose timestamp is lower than or equal to the given one
        is returned but NOT removed: it becomes the first item of the buffer.

        Raises:
            KeyError: if no data is stored until the given timestamp.
        """

        # get last item until given timestamp
        earliest_ts, _ = self.get_last_until(ts)

        first_ts, first_value = self.first

        while first_ts < earliest_ts:
            self.pop_first()
            first_ts, first_value = self.first

        return first_ts, first_value

    def pop_last_before(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]:
        """Pop all items before a given timestamp and return the last one.

        Raises:
            KeyError: if no data is stored before the given timestamp.
        """

        # get last item before given timestamp
        earliest_ts, _ = self.get_last_before(ts)

        popped_ts, popped_value = self.pop_first()

        while popped_ts != earliest_ts:
            popped_ts, popped_value = self.pop_first()

        return popped_ts, popped_value

    @property
    def last(self) -> Tuple[TimeStampType, DataType]:
        """Easing access to last item.

        Raises:
            IndexError: if the buffer is empty.
        """

        # Peek at the last key without copying the whole buffer.
        try:
            last_ts = next(reversed(self))

        except StopIteration:
            raise IndexError('TimeStampedBuffer is empty.') from None

        return last_ts, self[last_ts]

    def pop_last(self) -> Tuple[TimeStampType, DataType]:
        """Easing LIFO access mode: remove and return the last item."""

        return self.popitem(last=True)

    def get_first_from(self, ts) -> Tuple[TimeStampType, DataType]:
        """Retrieve the first item whose timestamp is greater than or equal to a given timestamp.

        Raises:
            KeyError: if there is no such item.
        """

        ts_list = list(self.keys())
        first_from_index = bisect.bisect_left(ts_list, ts)

        if first_from_index < len(self):

            first_from_ts = ts_list[first_from_index]

            return first_from_ts, self[first_from_ts]

        else:

            raise KeyError(f'No data stored after {ts} timestamp.')

    def get_last_before(self, ts) -> Tuple[TimeStampType, DataType]:
        """Retrieve the last item whose timestamp is strictly lower than a given timestamp.

        Raises:
            KeyError: if there is no such item.
        """

        ts_list = list(self.keys())
        last_before_index = bisect.bisect_left(ts_list, ts) - 1

        if last_before_index >= 0:

            last_before_ts = ts_list[last_before_index]

            return last_before_ts, self[last_before_ts]

        else:

            raise KeyError(f'No data stored before {ts} timestamp.')

    def get_last_until(self, ts) -> Tuple[TimeStampType, DataType]:
        """Retrieve the last item whose timestamp is lower than or equal to a given timestamp.

        Raises:
            KeyError: if there is no such item.
        """

        ts_list = list(self.keys())
        last_until_index = bisect.bisect_right(ts_list, ts) - 1

        if last_until_index >= 0:

            last_until_ts = ts_list[last_until_index]

            return last_until_ts, self[last_until_ts]

        else:

            raise KeyError(f'No data stored until {ts} timestamp.')

    @classmethod
    def from_json(cls, json_filepath: str) -> TimeStampedBufferType:
        """Create a TimeStampedBuffer from .json file.

        JSON object keys are strings: each one is parsed back to a Python
        literal to restore the numeric timestamp.

        .. note::
            A plain TimeStampedBuffer is returned, even when this method is
            called through a subclass.
        """

        with open(json_filepath, encoding='utf-8') as ts_buffer_file:

            json_buffer = json.load(ts_buffer_file)

        return TimeStampedBuffer({ast.literal_eval(ts_str): data for ts_str, data in json_buffer.items()})

    def to_json(self, json_filepath: str):
        """Save a TimeStampedBuffer to .json file."""

        with open(json_filepath, 'w', encoding='utf-8') as ts_buffer_file:

            json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder)

    @classmethod
    def from_dataframe(cls, dataframe: pandas.DataFrame, exclude=None) -> TimeStampedBufferType:
        """Create a TimeStampedBuffer from [pandas dataframe](https://pandas.pydata.org/docs/reference/frame.html).

        Each row becomes a {column name: value} dictionary stored at the row
        index value. Columns listed in *exclude* are ignored. The given
        dataframe is left untouched.

        .. warning::
            The dataframe index must be named 'timestamp'.

        .. note::
            A plain TimeStampedBuffer is returned, even when this method is
            called through a subclass.
        """

        assert dataframe.index.name == 'timestamp', "Dataframe index must be named 'timestamp'."

        # Drop on a copy: the caller's dataframe must not be modified.
        if exclude is not None:
            dataframe = dataframe.drop(columns=exclude)

        return TimeStampedBuffer(dataframe.to_dict('index'))

    def as_dataframe(self, exclude=None, split=None) -> pandas.DataFrame:
        """Convert as [pandas dataframe](https://pandas.pydata.org/docs/reference/frame.html).

        The optional *exclude* argument lists the columns to remove.

        The optional *split* argument allows tuple values to be stored in dedicated columns.
        For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]}

        .. warning::
            Values must be dictionaries.

            Each key is stored as a column name.

        .. note::
            Timestamps are stored as index column called 'timestamp'.
        """

        exclude = [] if exclude is None else exclude
        split = {} if split is None else split

        df = pandas.DataFrame(list(self.values()))

        # Exclude columns
        df.drop(columns=exclude, inplace=True)

        # Split columns
        if len(split) > 0:

            ordered_columns = []

            # df.columns is captured once: columns added below are not visited.
            for column in df.columns:

                if column in split:

                    new_columns = split[column]

                    # One new column per tuple element, in place of the original column.
                    df[new_columns] = pandas.DataFrame(df[column].tolist(), index=df.index)
                    df.drop(columns=[column], inplace=True)

                    ordered_columns.extend(new_columns)

                else:

                    ordered_columns.append(column)

            # Reorder so that split columns take the place of the column they come from
            df = df[ordered_columns]

        # Append timestamps as index column
        df['timestamp'] = list(self.keys())
        df.set_index('timestamp', inplace=True)

        return df

    def plot(self, names=None, colors=None, split=None, samples=None) -> list:
        """Plot as [matplotlib](https://matplotlib.org/) time chart.

        Each column listed in *names* is drawn as a stem plot over timestamps,
        using the color at the same position in *colors*.

        The optional *split* argument is forwarded to as_dataframe.

        The optional *samples* argument decimates data when the buffer holds
        more items than the given number.

        Returns:
            One legend patch per plotted column.

        Raises:
            ImportError: if matplotlib is not installed.
        """

        if mpyplot is None or mpatches is None:
            raise ImportError('matplotlib is required to plot a TimeStampedBuffer.')

        names = [] if names is None else names
        colors = [] if colors is None else colors

        df = self.as_dataframe(split=split)
        legend_patches = []

        # decimate data
        if samples is not None and samples < len(df):

            step = int(len(df) / samples) + 1
            df = df.iloc[::step, :]

        for name, color in zip(names, colors):

            markerline, stemlines, baseline = mpyplot.stem(df.index, df[name])
            mpyplot.setp(markerline, color=color, linewidth=1, markersize=1)
            mpyplot.setp(stemlines, color=color, linewidth=1)
            mpyplot.setp(baseline, color=color, linewidth=1)

            legend_patches.append(mpatches.Patch(color=color, label=name.upper()))

        return legend_patches