aboutsummaryrefslogtreecommitdiff
path: root/src/argaze.test/DataFeatures.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/argaze.test/DataFeatures.py')
-rw-r--r--src/argaze.test/DataFeatures.py350
1 files changed, 350 insertions, 0 deletions
diff --git a/src/argaze.test/DataFeatures.py b/src/argaze.test/DataFeatures.py
new file mode 100644
index 0000000..b30c560
--- /dev/null
+++ b/src/argaze.test/DataFeatures.py
@@ -0,0 +1,350 @@
+#!/usr/bin/env python
+
+""" """
+
+__author__ = "Théo de la Hogue"
+__credits__ = []
+__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
+__license__ = "BSD"
+
+import unittest
+from dataclasses import dataclass, field
+import os
+
+from argaze import DataFeatures
+
+import pandas
+import numpy
+
+def random_data_buffer(size, data_keys):
+ """ Generate a random TimeStampedBuffer for testing purpose.
+ Timestamps are current time.
+ Values are tuples containing an expected value and a random value.
+ """
+
+ import random
+ import time
+
+ ts_buffer = DataFeatures.TimeStampedBuffer()
+
+ for i in range(0, size):
+
+ # Edit data
+ random_data = {}
+ for key in data_keys:
+ random_data[key] = (i, random.random())
+
+ # Store data
+ ts_buffer[time.time()] = random_data
+
+ time.sleep(0.0001)
+
+ return ts_buffer
+
+@dataclass()
+class BasicDataClass():
+ """Define a basic dataclass for testing purpose."""
+
+ value: tuple
+
+class TestTimeStampedBufferClass(unittest.TestCase):
+ """Test TimeStampedBuffer class."""
+
+ def test_new(self):
+ """Test TimeStampedBuffer creation."""
+
+ # Check TimeStampedBuffer length after creation
+ self.assertEqual(len(DataFeatures.TimeStampedBuffer()), 0)
+ self.assertEqual(len(DataFeatures.TimeStampedBuffer({0: ""})), 1)
+ self.assertEqual(len(DataFeatures.TimeStampedBuffer({0.1: ""})), 1)
+ self.assertEqual(len(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})), 2)
+
+ # Check TimeStampedBuffer keys after creation
+ self.assertEqual(list(DataFeatures.TimeStampedBuffer().keys()), [])
+ self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: ""}).keys()), [0])
+ self.assertEqual(list(DataFeatures.TimeStampedBuffer({0.1: ""}).keys()), [0.1])
+ self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).keys()), [0, 1])
+
+ # Check TimeStampedBuffer items after creation
+ self.assertEqual(list(DataFeatures.TimeStampedBuffer().items()), [])
+ self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: ""}).items()), [(0, "")])
+ self.assertEqual(list(DataFeatures.TimeStampedBuffer({0.1: ""}).items()), [(0.1, "")])
+ self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).items()), [(0, "A"), (1, "B")])
+
+ # Check that TimeStampedBuffer creation fails when keys are not numbers
+ with self.assertRaises(AssertionError):
+
+ DataFeatures.TimeStampedBuffer({"first": ""})
+
+ def test_from_dataframe(self):
+ """Test TimeStampedBuffer creation from pandas dataframe."""
+
+ ts_buffer = random_data_buffer(10, ["data_A", "data_B", "data_C"])
+
+ # Check dataframe conversion
+ ts_buffer_from_df = DataFeatures.TimeStampedBuffer.from_dataframe(ts_buffer.as_dataframe())
+
+ self.assertEqual(len(ts_buffer_from_df), 10)
+
+ def test_from_json(self):
+ """Test TimeStampedBuffer creation from json file."""
+
+ # Edit dataframe csv file path
+ current_directory = os.path.dirname(os.path.abspath(__file__))
+ json_filepath = os.path.join(current_directory, 'utils/ts_buffer.json')
+
+ # Load TimeStampedBuffer from json file
+ ts_buffer = DataFeatures.TimeStampedBuffer.from_json(json_filepath)
+
+ self.assertEqual(len(ts_buffer), 3)
+
+ def test___repr__(self):
+ """Test TimeStampedBuffer string representation."""
+
+ self.assertEqual(repr(DataFeatures.TimeStampedBuffer()), "{}")
+ self.assertEqual(repr(DataFeatures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}")
+ self.assertEqual(repr(DataFeatures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}")
+
+ data = BasicDataClass((123, 456))
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: data})
+
+ self.assertEqual(repr(ts_buffer), "{\"0\": {\"value\": [123, 456]}}")
+
+ array = numpy.zeros(3)
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: array})
+
+ self.assertEqual(repr(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}")
+
+ def test___str__(self):
+ """Test TimeStampedBuffer string representation."""
+
+ self.assertEqual(str(DataFeatures.TimeStampedBuffer()), "{}")
+ self.assertEqual(str(DataFeatures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}")
+ self.assertEqual(str(DataFeatures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}")
+
+ data = BasicDataClass((123, 456))
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: data})
+
+ self.assertEqual(str(ts_buffer), "{\"0\": {\"value\": [123, 456]}}")
+
+ array = numpy.zeros(3)
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: array})
+
+ self.assertEqual(str(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}")
+
+ def test_append(self):
+ """Test TimeStampedBuffer append method."""
+
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})
+ ts_buffer_next = DataFeatures.TimeStampedBuffer({2: "C", 3: "D"})
+
+ self.assertEqual(len(ts_buffer.append(ts_buffer_next)), 4)
+ self.assertEqual(list(ts_buffer.append(ts_buffer_next).keys()), [0, 1, 2, 3])
+
+ def test_first(self):
+ """Test TimeStampedBuffer first property."""
+
+ self.assertEqual(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).first, (0, "A"))
+
+ # Check that accessing to first item of an empty TimeStampedBuffer fails
+ with self.assertRaises(IndexError):
+
+ DataFeatures.TimeStampedBuffer().first
+
+ def test_pop_first(self):
+ """Test TimeStampedBuffer pop_first method."""
+
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})
+
+ self.assertEqual(ts_buffer.pop_first(), (0, "A"))
+ self.assertEqual(len(ts_buffer), 1)
+ self.assertEqual(ts_buffer.first, (1, "B"))
+
+ def test_pop_last_until(self):
+ """Test TimeStampedBuffer pop_last_until method."""
+
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+
+ # Check pop until an existing timestamp
+ pop_last_until_2 = ts_buffer.pop_last_until(2)
+
+ self.assertEqual(pop_last_until_2, (2, "C"))
+ self.assertEqual(len(ts_buffer), 2)
+ self.assertEqual(ts_buffer.first, (2, "C"))
+
+ # Check first until an none existing timestamp
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+
+ pop_last_until_1dot5 = ts_buffer.pop_last_until(1.5)
+
+ self.assertEqual(pop_last_until_1dot5, (1, "B"))
+ self.assertEqual(len(ts_buffer), 3)
+ self.assertEqual(ts_buffer.first, (1, "B"))
+
+ def test_pop_last_before(self):
+ """Test TimeStampedBuffer pop_last_before method."""
+
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+
+ # Check pop until an existing timestamp
+ last_before_2 = ts_buffer.pop_last_before(2)
+
+ self.assertEqual(last_before_2, (1, "B"))
+ self.assertEqual(len(ts_buffer), 2)
+ self.assertEqual(ts_buffer.first, (2, "C"))
+
+ # Check pop until an none existing timestamp
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+
+ first_until_1dot5 = ts_buffer.pop_last_before(1.5)
+
+ self.assertEqual(first_until_1dot5, (1, "B"))
+ self.assertEqual(len(ts_buffer), 2)
+ self.assertEqual(ts_buffer.first, (2, "C"))
+
+ def test_last(self):
+ """Test TimeStampedBuffer last property."""
+
+ self.assertEqual(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).last, (1, "B"))
+
+ # Check that accessing to last item of an empty TimeStampedBuffer fails
+ with self.assertRaises(IndexError):
+
+ DataFeatures.TimeStampedBuffer().last
+
+ def test_pop_last(self):
+ """Test TimeStampedBuffer pop_last method."""
+
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})
+
+ self.assertEqual(ts_buffer.pop_last(), (1, "B"))
+ self.assertEqual(len(ts_buffer), 1)
+ self.assertEqual(ts_buffer.last, (0, "A"))
+
+ def test_get_first_from(self):
+ """Test TimeStampedBuffer get_first_from method."""
+
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+
+ get_first_from_1 = ts_buffer.get_first_from(1)
+
+ self.assertEqual(get_first_from_1, (1, "B"))
+ self.assertEqual(len(ts_buffer), 4)
+
+ get_first_from_1dot5 = ts_buffer.get_first_from(1.5)
+
+ self.assertEqual(get_first_from_1dot5, (2, "C"))
+
+ get_first_from_0 = ts_buffer.get_first_from(0)
+
+ self.assertEqual(get_first_from_0, (0, "A"))
+
+ # Check that accessing to lately timestamp fails
+ with self.assertRaises(KeyError):
+
+ ts_buffer.get_first_from(4)
+
+ def test_get_last_before(self):
+ """Test TimeStampedBuffer get_last_before method."""
+
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+
+ get_last_before_2 = ts_buffer.get_last_before(2)
+
+ self.assertEqual(get_last_before_2, (1, "B"))
+ self.assertEqual(len(ts_buffer), 4)
+
+ get_last_before_1dot5 = ts_buffer.get_last_before(1.5)
+
+ self.assertEqual(get_last_before_1dot5, (1, "B"))
+
+ get_last_before_4 = ts_buffer.get_last_before(4)
+
+ self.assertEqual(get_last_before_4, (3, "D"))
+
+ # Check that accessing to early timestamp fails
+ with self.assertRaises(KeyError):
+
+ ts_buffer.get_last_before(-1)
+
+
+ def test_get_last_until(self):
+ """Test TimeStampedBuffer get_last_until method."""
+
+ ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"})
+
+ get_last_until_2 = ts_buffer.get_last_until(2)
+
+ self.assertEqual(get_last_until_2, (2, "C"))
+ self.assertEqual(len(ts_buffer), 4)
+
+ get_last_until_1dot5 = ts_buffer.get_last_until(1.5)
+
+ self.assertEqual(get_last_until_1dot5, (1, "B"))
+
+ get_last_until_4 = ts_buffer.get_last_until(4)
+
+ self.assertEqual(get_last_until_4, (3, "D"))
+
+ # Check that accessing to early timestamp fails
+ with self.assertRaises(KeyError):
+
+ ts_buffer.get_last_until(-1)
+
+ def test_as_dataframe(self):
+ """Test TimeStampedBuffer as_dataframe method."""
+
+ ts_buffer = random_data_buffer(10, ["data_A", "data_B", "data_C"])
+
+ # Check dataframe conversion
+ ts_buffer_dataframe = ts_buffer.as_dataframe()
+
+ self.assertEqual(ts_buffer_dataframe.index.name, "timestamp")
+ self.assertEqual(ts_buffer_dataframe.index.size, 10)
+
+ self.assertEqual(ts_buffer_dataframe.columns.size, 3)
+ self.assertEqual(ts_buffer_dataframe.columns[0], "data_A")
+ self.assertEqual(ts_buffer_dataframe.columns[1], "data_B")
+ self.assertEqual(ts_buffer_dataframe.columns[2], "data_C")
+
+ self.assertEqual(ts_buffer_dataframe.index.dtype, 'float64')
+ self.assertEqual(ts_buffer_dataframe["data_A"].dtype, 'object')
+ self.assertEqual(ts_buffer_dataframe["data_B"].dtype, 'object')
+ self.assertEqual(ts_buffer_dataframe["data_C"].dtype, 'object')
+
+ # Check data exclusion option
+ ts_buffer_dataframe = ts_buffer.as_dataframe(exclude=["data_B"])
+
+ self.assertEqual(ts_buffer_dataframe.index.name, "timestamp")
+ self.assertEqual(ts_buffer_dataframe.index.size, 10)
+
+ self.assertEqual(ts_buffer_dataframe.columns.size, 2)
+ self.assertEqual(ts_buffer_dataframe.columns[0], "data_A")
+ self.assertEqual(ts_buffer_dataframe.columns[1], "data_C")
+
+ # Check dataframe split option
+ ts_buffer_dataframe = ts_buffer.as_dataframe(split={"data_B": ["data_B0", "data_B1"]})
+
+ self.assertEqual(ts_buffer_dataframe.index.name, "timestamp")
+ self.assertEqual(ts_buffer_dataframe.index.size, 10)
+
+ self.assertEqual(ts_buffer_dataframe.columns.size, 4)
+ self.assertEqual(ts_buffer_dataframe.columns[0], "data_A")
+ self.assertEqual(ts_buffer_dataframe.columns[1], "data_B0")
+ self.assertEqual(ts_buffer_dataframe.columns[2], "data_B1")
+ self.assertEqual(ts_buffer_dataframe.columns[3], "data_C")
+
+ # Check dataframe conversion with dataclass
+ data = BasicDataClass((123, 456))
+ ts_buffer_dataframe = DataFeatures.TimeStampedBuffer({0: data}).as_dataframe()
+
+ self.assertEqual(ts_buffer_dataframe.index.name, "timestamp")
+ self.assertEqual(ts_buffer_dataframe.index.size, 1)
+
+ self.assertEqual(ts_buffer_dataframe.columns.size, 1)
+ self.assertEqual(ts_buffer_dataframe.columns[0], "value")
+
+
+if __name__ == '__main__':
+
+ unittest.main() \ No newline at end of file