From f50b0961f2feec0b7a220a451f9e05e080536147 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 6 Apr 2022 16:53:04 +0200 Subject: Adding functions to TimeStampedBuffer class --- src/argaze/DataStructures.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) (limited to 'src') diff --git a/src/argaze/DataStructures.py b/src/argaze/DataStructures.py index 4dab036..5fe7ce6 100644 --- a/src/argaze/DataStructures.py +++ b/src/argaze/DataStructures.py @@ -2,6 +2,7 @@ import collections import json +import bisect class DictObject(): """Convert dictionnary into object""" @@ -63,10 +64,44 @@ class TimeStampedBuffer(collections.OrderedDict): """Easing FIFO access mode""" return self.popitem(last=False) + def pop_first_until(self, ts): + """Pop all item until a given timestamped value and returning the last poped item""" + + popep_ts, poped_value = self.pop_first() + + while popep_ts != ts: + popep_ts, poped_value = self.pop_first() + + return popep_ts, poped_value + def pop_last(self): """Easing FIFO access mode""" return self.popitem(last=True) + def pop_last_until(self, ts): + """Pop all item until a given timestamped value and returning the last poped item""" + + popep_ts, poped_value = self.pop_last() + + while popep_ts != ts: + popep_ts, poped_value = self.pop_last() + + return popep_ts, poped_value + + def get_last_before(self, ts): + """Retreive last item timestamp before a given timestamp value.""" + + ts_list = list(self.keys()) + last_before_index = bisect.bisect_left(ts_list, ts) - 1 + + if last_before_index > 0: + + return ts_list[last_before_index] + + else: + + return None + def export_as_json(self, filepath): """Write buffer content into a json file""" try: -- cgit v1.1