1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
#!/usr/bin/env python
import collections
import json
import bisect
import pandas
import matplotlib.pyplot as mpyplot
import matplotlib.patches as mpatches
class TimeStampedBuffer(collections.OrderedDict):
    """Ordered dictionary to handle timestamped data.

    ```
    {
        timestamp1: data1,
        timestamp2: data2,
        ...
    }
    ```

    Keys must be numbers (int or float). The lookup helpers bisect the keys
    in insertion order, so they assume timestamps were inserted in ascending
    order.
    """

    def __new__(cls, args=None):
        """Allocate the buffer.

        `args` is ignored here; Python hands the same argument to the
        inherited `__init__`, which fills the dictionary.
        """
        return super(TimeStampedBuffer, cls).__new__(cls)

    def __setitem__(self, key: float, value):
        """Store `value` at `key`, forcing the key to be a number.

        Raises:
            KeyError: if `key` is not an int or a float. Subclasses of int
                and float are accepted, except bool.
        """
        # isinstance() lets float/int subclasses through; bool is an int
        # subclass but is not a meaningful timestamp, so it stays rejected.
        if isinstance(key, bool) or not isinstance(key, (int, float)):
            raise KeyError('key must be a number')
        super().__setitem__(key, value)

    def __str__(self):
        """Return the buffer serialised as a JSON string.

        Values that json cannot encode natively are converted with `vars`.
        """
        return json.dumps(self, default=vars)

    def append(self, timestamped_buffer):
        """Append a timestamped buffer.

        Every (timestamp, value) pair of `timestamped_buffer` is stored into
        this buffer; a timestamp already present gets its value replaced.
        """
        for ts, value in timestamped_buffer.items():
            self[ts] = value

    def pop_first(self):
        """Remove and return the oldest (timestamp, value) pair (FIFO access).

        Raises:
            KeyError: if the buffer is empty.
        """
        return self.popitem(last=False)

    def pop_first_until(self, ts):
        """Pop every item up to the last one stored strictly before `ts`.

        Returns:
            The (timestamp, value) pair of the last popped item.

        Raises:
            ValueError: if no item is stored before `ts`.
        """
        # Timestamp of the last item stored before the requested one.
        earliest_ts = self.get_last_before(ts)
        if earliest_ts is None:
            raise ValueError(f'no timestamp before {ts}')

        # earliest_ts is a key of the buffer, so the loop always terminates.
        popped_ts, popped_value = self.pop_first()
        while popped_ts != earliest_ts:
            popped_ts, popped_value = self.pop_first()
        return popped_ts, popped_value

    def pop_last(self):
        """Remove and return the newest (timestamp, value) pair (LIFO access).

        Raises:
            KeyError: if the buffer is empty.
        """
        return self.popitem(last=True)

    def get_last_before(self, ts):
        """Retrieve the last item timestamp strictly before a given timestamp.

        Returns:
            The timestamp found, or None when no key is lower than `ts`.
        """
        ts_list = list(self)
        # bisect_left gives the insertion point of ts, so the previous index
        # holds the greatest key strictly lower than ts.
        last_before_index = bisect.bisect_left(ts_list, ts) - 1
        if last_before_index < 0:
            return None
        return ts_list[last_before_index]

    def export_as_json(self, filepath):
        """Write buffer content into a json file.

        Raises:
            RuntimeError: if the file cannot be written or the content cannot
                be serialised; the original error is kept as `__cause__`.
        """
        try:
            with open(filepath, 'w', encoding='utf-8') as jsonfile:
                json.dump(self, jsonfile, ensure_ascii=False, default=vars)
        except Exception as error:
            raise RuntimeError(f"Can't write {filepath}") from error

    def as_dataframe(self, exclude=None, split=None):
        """Convert buffer as pandas dataframe.

        Args:
            exclude: column label(s) to drop from the dataframe.
            split: dictionary mapping a column name to the list of new column
                names its values are spread over; the source column is then
                dropped.

        Returns:
            A dataframe indexed by a 'timestamp' column made of the keys.
        """
        exclude = [] if exclude is None else exclude
        split = {} if split is None else split

        df = pandas.DataFrame.from_dict(self.values())
        df.drop(exclude, inplace=True, axis=1)

        for key, columns in split.items():
            df[columns] = pandas.DataFrame(df[key].tolist(), index=df.index)
            df.drop(key, inplace=True, axis=1)

        # Materialise the keys: a plain list is accepted by every pandas
        # version, including for an empty buffer.
        df['timestamp'] = list(self)
        df.set_index('timestamp', inplace=True)
        return df

    def export_as_csv(self, filepath, exclude=None):
        """Write buffer content into a csv file.

        Args:
            filepath: destination of the csv file.
            exclude: column label(s) to leave out of the file.

        Raises:
            RuntimeError: if the conversion or the writing fails; the original
                error is kept as `__cause__`.
        """
        try:
            self.as_dataframe(exclude=exclude).to_csv(filepath, index=True)
        except Exception as error:
            raise RuntimeError(f"Can't write {filepath}") from error

    def plot(self, names=None, colors=None, split=None, samples=None):
        """Draw stem plots of buffer columns with matplotlib.pyplot.

        Args:
            names: names of the dataframe columns to plot.
            colors: colors to use, paired with `names` in order.
            split: forwarded to `as_dataframe`.
            samples: when given and lower than the number of rows, the data
                is decimated so that at most `samples` rows are plotted;
                non-positive values are ignored.

        Returns:
            The list of legend patches, one per plotted column.
        """
        names = [] if names is None else names
        colors = [] if colors is None else colors

        df = self.as_dataframe(split=split)
        legend_patches = []

        # Decimate data; the positivity guard avoids a division by zero and
        # a null or negative slicing step.
        if samples is not None and 0 < samples < len(df):
            step = int(len(df) / samples) + 1
            df = df.iloc[::step, :]

        for name, color in zip(names, colors):
            markerline, stemlines, baseline = mpyplot.stem(df.index, df[name])
            mpyplot.setp(markerline, color=color, linewidth=1, markersize=1)
            mpyplot.setp(stemlines, color=color, linewidth=1)
            mpyplot.setp(baseline, color=color, linewidth=1)
            legend_patches.append(mpatches.Patch(color=color, label=name.upper()))

        return legend_patches
|