From 8f3b769874159b449d197d86476aaac9d2738000 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 18 Jan 2024 10:15:52 +0100 Subject: Renaming DataStructures.py into DataFeatures.py. --- src/argaze.test/DataStructures.py | 86 ++--- src/argaze/ArFeatures.py | 16 +- src/argaze/ArUcoMarkers/ArUcoCamera.py | 4 +- src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py | 4 +- src/argaze/ArUcoMarkers/ArUcoScene.py | 4 +- src/argaze/AreaOfInterest/AOI2DScene.py | 2 +- src/argaze/AreaOfInterest/AOI3DScene.py | 2 +- src/argaze/AreaOfInterest/AOIFeatures.py | 4 +- src/argaze/DataFeatures.py | 420 ++++++++++++++++++++++++ src/argaze/DataStructures.py | 420 ------------------------ src/argaze/GazeFeatures.py | 10 +- src/argaze/PupillFeatures.py | 6 +- src/argaze/__init__.py | 2 +- 13 files changed, 490 insertions(+), 490 deletions(-) create mode 100644 src/argaze/DataFeatures.py delete mode 100644 src/argaze/DataStructures.py (limited to 'src') diff --git a/src/argaze.test/DataStructures.py b/src/argaze.test/DataStructures.py index 8533af3..b30c560 100644 --- a/src/argaze.test/DataStructures.py +++ b/src/argaze.test/DataStructures.py @@ -11,7 +11,7 @@ import unittest from dataclasses import dataclass, field import os -from argaze import DataStructures +from argaze import DataFeatures import pandas import numpy @@ -25,7 +25,7 @@ def random_data_buffer(size, data_keys): import random import time - ts_buffer = DataStructures.TimeStampedBuffer() + ts_buffer = DataFeatures.TimeStampedBuffer() for i in range(0, size): @@ -54,27 +54,27 @@ class TestTimeStampedBufferClass(unittest.TestCase): """Test TimeStampedBuffer creation.""" # Check TimeStampedBuffer length after creation - self.assertEqual(len(DataStructures.TimeStampedBuffer()), 0) - self.assertEqual(len(DataStructures.TimeStampedBuffer({0: ""})), 1) - self.assertEqual(len(DataStructures.TimeStampedBuffer({0.1: ""})), 1) - self.assertEqual(len(DataStructures.TimeStampedBuffer({0: "A", 1: "B"})), 2) + self.assertEqual(len(DataFeatures.TimeStampedBuffer()), 0) + self.assertEqual(len(DataFeatures.TimeStampedBuffer({0: ""})), 1) + self.assertEqual(len(DataFeatures.TimeStampedBuffer({0.1: ""})), 1) + self.assertEqual(len(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})), 2) # Check TimeStampedBuffer keys after creation - self.assertEqual(list(DataStructures.TimeStampedBuffer().keys()), []) - self.assertEqual(list(DataStructures.TimeStampedBuffer({0: ""}).keys()), [0]) - self.assertEqual(list(DataStructures.TimeStampedBuffer({0.1: ""}).keys()), [0.1]) - self.assertEqual(list(DataStructures.TimeStampedBuffer({0: "A", 1: "B"}).keys()), [0, 1]) + self.assertEqual(list(DataFeatures.TimeStampedBuffer().keys()), []) + self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: ""}).keys()), [0]) + self.assertEqual(list(DataFeatures.TimeStampedBuffer({0.1: ""}).keys()), [0.1]) + self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).keys()), [0, 1]) # Check TimeStampedBuffer items after creation - self.assertEqual(list(DataStructures.TimeStampedBuffer().items()), []) - self.assertEqual(list(DataStructures.TimeStampedBuffer({0: ""}).items()), [(0, "")]) - self.assertEqual(list(DataStructures.TimeStampedBuffer({0.1: ""}).items()), [(0.1, "")]) - self.assertEqual(list(DataStructures.TimeStampedBuffer({0: "A", 1: "B"}).items()), [(0, "A"), (1, "B")]) + self.assertEqual(list(DataFeatures.TimeStampedBuffer().items()), []) + self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: ""}).items()), [(0, "")]) + 
self.assertEqual(list(DataFeatures.TimeStampedBuffer({0.1: ""}).items()), [(0.1, "")]) + self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).items()), [(0, "A"), (1, "B")]) # Check that TimeStampedBuffer creation fails when keys are not numbers with self.assertRaises(AssertionError): - DataStructures.TimeStampedBuffer({"first": ""}) + DataFeatures.TimeStampedBuffer({"first": ""}) def test_from_dataframe(self): """Test TimeStampedBuffer creation from pandas dataframe.""" @@ -82,7 +82,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): ts_buffer = random_data_buffer(10, ["data_A", "data_B", "data_C"]) # Check dataframe conversion - ts_buffer_from_df = DataStructures.TimeStampedBuffer.from_dataframe(ts_buffer.as_dataframe()) + ts_buffer_from_df = DataFeatures.TimeStampedBuffer.from_dataframe(ts_buffer.as_dataframe()) self.assertEqual(len(ts_buffer_from_df), 10) @@ -94,49 +94,49 @@ class TestTimeStampedBufferClass(unittest.TestCase): json_filepath = os.path.join(current_directory, 'utils/ts_buffer.json') # Load TimeStampedBuffer from json file - ts_buffer = DataStructures.TimeStampedBuffer.from_json(json_filepath) + ts_buffer = DataFeatures.TimeStampedBuffer.from_json(json_filepath) self.assertEqual(len(ts_buffer), 3) def test___repr__(self): """Test TimeStampedBuffer string representation.""" - self.assertEqual(repr(DataStructures.TimeStampedBuffer()), "{}") - self.assertEqual(repr(DataStructures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}") - self.assertEqual(repr(DataStructures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}") + self.assertEqual(repr(DataFeatures.TimeStampedBuffer()), "{}") + self.assertEqual(repr(DataFeatures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}") + self.assertEqual(repr(DataFeatures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}") data = BasicDataClass((123, 456)) - ts_buffer = DataStructures.TimeStampedBuffer({0: data}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: data}) self.assertEqual(repr(ts_buffer), "{\"0\": {\"value\": [123, 456]}}") array = numpy.zeros(3) - ts_buffer = DataStructures.TimeStampedBuffer({0: array}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: array}) self.assertEqual(repr(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}") def test___str__(self): """Test TimeStampedBuffer string representation.""" - self.assertEqual(str(DataStructures.TimeStampedBuffer()), "{}") - self.assertEqual(str(DataStructures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}") - self.assertEqual(str(DataStructures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}") + self.assertEqual(str(DataFeatures.TimeStampedBuffer()), "{}") + self.assertEqual(str(DataFeatures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}") + self.assertEqual(str(DataFeatures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}") data = BasicDataClass((123, 456)) - ts_buffer = DataStructures.TimeStampedBuffer({0: data}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: data}) self.assertEqual(str(ts_buffer), "{\"0\": {\"value\": [123, 456]}}") array = numpy.zeros(3) - ts_buffer = DataStructures.TimeStampedBuffer({0: array}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: array}) self.assertEqual(str(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}") def test_append(self): """Test TimeStampedBuffer append method.""" - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B"}) - ts_buffer_next = DataStructures.TimeStampedBuffer({2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}) + ts_buffer_next = DataFeatures.TimeStampedBuffer({2: "C", 3: "D"}) 
self.assertEqual(len(ts_buffer.append(ts_buffer_next)), 4) self.assertEqual(list(ts_buffer.append(ts_buffer_next).keys()), [0, 1, 2, 3]) @@ -144,17 +144,17 @@ class TestTimeStampedBufferClass(unittest.TestCase): def test_first(self): """Test TimeStampedBuffer first property.""" - self.assertEqual(DataStructures.TimeStampedBuffer({0: "A", 1: "B"}).first, (0, "A")) + self.assertEqual(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).first, (0, "A")) # Check that accessing to first item of an empty TimeStampedBuffer fails with self.assertRaises(IndexError): - DataStructures.TimeStampedBuffer().first + DataFeatures.TimeStampedBuffer().first def test_pop_first(self): """Test TimeStampedBuffer pop_first method.""" - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}) self.assertEqual(ts_buffer.pop_first(), (0, "A")) self.assertEqual(len(ts_buffer), 1) @@ -163,7 +163,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): def test_pop_last_until(self): """Test TimeStampedBuffer pop_last_until method.""" - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) # Check pop until an existing timestamp pop_last_until_2 = ts_buffer.pop_last_until(2) @@ -173,7 +173,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): self.assertEqual(ts_buffer.first, (2, "C")) # Check first until an none existing timestamp - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) pop_last_until_1dot5 = ts_buffer.pop_last_until(1.5) @@ -184,7 +184,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): def test_pop_last_before(self): """Test TimeStampedBuffer pop_last_before method.""" - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) # Check pop until an existing timestamp last_before_2 = ts_buffer.pop_last_before(2) @@ -194,7 +194,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): self.assertEqual(ts_buffer.first, (2, "C")) # Check pop until an none existing timestamp - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) first_until_1dot5 = ts_buffer.pop_last_before(1.5) @@ -205,17 +205,17 @@ class TestTimeStampedBufferClass(unittest.TestCase): def test_last(self): """Test TimeStampedBuffer last property.""" - self.assertEqual(DataStructures.TimeStampedBuffer({0: "A", 1: "B"}).last, (1, "B")) + self.assertEqual(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).last, (1, "B")) # Check that accessing to last item of an empty TimeStampedBuffer fails with self.assertRaises(IndexError): - DataStructures.TimeStampedBuffer().last + DataFeatures.TimeStampedBuffer().last def test_pop_last(self): """Test TimeStampedBuffer pop_last method.""" - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}) self.assertEqual(ts_buffer.pop_last(), (1, "B")) self.assertEqual(len(ts_buffer), 1) @@ -224,7 +224,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): def test_get_first_from(self): """Test TimeStampedBuffer get_first_from method.""" - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = 
DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) get_first_from_1 = ts_buffer.get_first_from(1) @@ -247,7 +247,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): def test_get_last_before(self): """Test TimeStampedBuffer get_last_before method.""" - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) get_last_before_2 = ts_buffer.get_last_before(2) @@ -271,7 +271,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): def test_get_last_until(self): """Test TimeStampedBuffer get_last_until method.""" - ts_buffer = DataStructures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) get_last_until_2 = ts_buffer.get_last_until(2) @@ -336,7 +336,7 @@ class TestTimeStampedBufferClass(unittest.TestCase): # Check dataframe conversion with dataclass data = BasicDataClass((123, 456)) - ts_buffer_dataframe = DataStructures.TimeStampedBuffer({0: data}).as_dataframe() + ts_buffer_dataframe = DataFeatures.TimeStampedBuffer({0: data}).as_dataframe() self.assertEqual(ts_buffer_dataframe.index.name, "timestamp") self.assertEqual(ts_buffer_dataframe.index.size, 1) diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 7f1d2d6..7263b94 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -16,7 +16,7 @@ from inspect import getmembers import threading import time -from argaze import DataStructures, GazeFeatures +from argaze import DataFeatures, GazeFeatures from argaze.AreaOfInterest import * from argaze.GazeAnalysis import * @@ -94,12 +94,12 @@ DEFAULT_ARLAYER_DRAW_PARAMETERS = { } @dataclass -class ArLayer(DataStructures.SharedObject): +class ArLayer(DataFeatures.SharedObject): """ Defines a space where to make matching of gaze movements and AOI and inside which those matchings need to be analyzed. !!! note - Inherits from DataStructures.SharedObject class to be shared by multiple threads + Inherits from DataFeatures.SharedObject class to be shared by multiple threads Parameters: name: name of the layer @@ -147,7 +147,7 @@ class ArLayer(DataStructures.SharedObject): # Create timestamped buffers to log each aoi scan path analysis for aoi_scan_path_analyzer_module_path in self.aoi_scan_path_analyzers.keys(): - self.__ts_logs[aoi_scan_path_analyzer_module_path] = DataStructures.TimeStampedBuffer() + self.__ts_logs[aoi_scan_path_analyzer_module_path] = DataFeatures.TimeStampedBuffer() @classmethod def from_dict(self, layer_data: dict, working_directory: str = None) -> ArLayerType: @@ -520,12 +520,12 @@ DEFAULT_ARFRAME_IMAGE_PARAMETERS = { } @dataclass -class ArFrame(DataStructures.SharedObject): +class ArFrame(DataFeatures.SharedObject): """ Defines a rectangular area where to project in timestamped gaze positions and inside which they need to be analyzed. !!! 
note - Inherits from DataStructures.SharedObject class to be shared by multiple threads + Inherits from DataFeatures.SharedObject class to be shared by multiple threads Parameters: name: name of the frame @@ -579,7 +579,7 @@ class ArFrame(DataStructures.SharedObject): # Create timestamped buffers to log each aoi scan path analysis for scan_path_analyzer_module_path in self.scan_path_analyzers.keys(): - self.__ts_logs[scan_path_analyzer_module_path] = DataStructures.TimeStampedBuffer() + self.__ts_logs[scan_path_analyzer_module_path] = DataFeatures.TimeStampedBuffer() @classmethod def from_dict(self, frame_data: dict, working_directory: str = None) -> ArFrameType: @@ -1546,4 +1546,4 @@ class ArCamera(ArFrame): with open(json_filepath, 'w', encoding='utf-8') as file: - json.dump(self, file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder) + json.dump(self, file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 6afce03..3ef572b 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -13,7 +13,7 @@ import json import os import time -from argaze import ArFeatures, DataStructures +from argaze import ArFeatures, DataFeatures from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoDetector, ArUcoOpticCalibrator, ArUcoScene from argaze.AreaOfInterest import AOI2DScene @@ -121,7 +121,7 @@ class ArUcoCamera(ArFeatures.ArCamera): aruco_camera_data['image_parameters']['draw_layers'][layer_name] = ArFeatures.DEFAULT_ARLAYER_DRAW_PARAMETERS # Get values of temporary ar frame created from aruco_camera_data - temp_ar_frame_values = DataStructures.as_dict(ArFeatures.ArFrame.from_dict(aruco_camera_data, working_directory)) + temp_ar_frame_values = DataFeatures.as_dict(ArFeatures.ArFrame.from_dict(aruco_camera_data, working_directory)) # Create new aruco camera using temporary ar frame values return ArUcoCamera(aruco_detector=new_aruco_detector, scenes=new_scenes, **temp_ar_frame_values) diff --git a/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py b/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py index f206c11..cdf2c5c 100644 --- a/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py +++ b/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py @@ -9,7 +9,7 @@ __license__ = "BSD" from dataclasses import dataclass, field -from argaze import DataStructures +from argaze import DataFeatures import json import numpy @@ -58,7 +58,7 @@ class OpticParameters(): with open(json_filepath, 'w', encoding='utf-8') as calibration_file: - json.dump(self, calibration_file, ensure_ascii=False, indent=4, cls=DataStructures.JsonEncoder) + json.dump(self, calibration_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) def __str__(self) -> str: """String display""" diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py index 51dd88c..b60b59d 100644 --- a/src/argaze/ArUcoMarkers/ArUcoScene.py +++ b/src/argaze/ArUcoMarkers/ArUcoScene.py @@ -12,7 +12,7 @@ from dataclasses import dataclass, field import json import os -from argaze import ArFeatures, DataStructures +from argaze import ArFeatures, DataFeatures from argaze.ArUcoMarkers import ArUcoMarkersGroup from argaze.AreaOfInterest import AOI2DScene @@ -91,7 +91,7 @@ class ArUcoScene(ArFeatures.ArScene): new_aruco_markers_group = None # Get values of temporary ar scene created from aruco_scene_data - temp_ar_scene_values = 
DataStructures.as_dict(ArFeatures.ArScene.from_dict(aruco_scene_data, working_directory)) + temp_ar_scene_values = DataFeatures.as_dict(ArFeatures.ArScene.from_dict(aruco_scene_data, working_directory)) # Create new aruco scene using temporary ar scene values return ArUcoScene(aruco_markers_group=new_aruco_markers_group, **temp_ar_scene_values) diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py index 062044f..91d9d56 100644 --- a/src/argaze/AreaOfInterest/AOI2DScene.py +++ b/src/argaze/AreaOfInterest/AOI2DScene.py @@ -9,7 +9,7 @@ __license__ = "BSD" from typing import TypeVar, Tuple -from argaze import DataStructures +from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures, AOI3DScene from argaze import GazeFeatures diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py index 33a815c..482437c 100644 --- a/src/argaze/AreaOfInterest/AOI3DScene.py +++ b/src/argaze/AreaOfInterest/AOI3DScene.py @@ -11,7 +11,7 @@ from typing import TypeVar, Tuple import math import re -from argaze import DataStructures +from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures, AOI2DScene import numpy diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index 77a92fd..9f9f4ad 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -13,7 +13,7 @@ import json import os import math -from argaze import DataStructures +from argaze import DataFeatures import cv2 import matplotlib.path as mpath @@ -540,7 +540,7 @@ class AOIScene(): return scene_copy -class TimeStampedAOIScenes(DataStructures.TimeStampedBuffer): +class TimeStampedAOIScenes(DataFeatures.TimeStampedBuffer): """Define timestamped buffer to store AOI scenes in time.""" def __setitem__(self, ts, scene): diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py new file mode 100644 index 0000000..6e058e8 --- /dev/null +++ b/src/argaze/DataFeatures.py @@ -0,0 +1,420 @@ +#!/usr/bin/env python + +"""Timestamped data features.""" + +__author__ = "Théo de la Hogue" +__credits__ = [] +__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" +__license__ = "BSD" + +from typing import TypeVar, Tuple +from inspect import getmembers +import collections +import json +import ast +import bisect +import threading +import math + +import pandas +import numpy +import matplotlib.pyplot as mpyplot +import matplotlib.patches as mpatches + +TimeStampType = TypeVar('TimeStamp', int, float) +"""Type definition for timestamp as integer or float values.""" + +DataType = TypeVar('Data') +"""Type definition for data to store anything in time.""" + +TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") +# Type definition for type annotation convenience + +def as_dict(dataclass_object) -> dict: + """ + Get dataclass object fields's values as a dictionary. + + Returns: + values: dictionary of dataclass fields's values + """ + + # Get data class fields names + fields_names = [] + for member_name, member_value in getmembers(dataclass_object): + if member_name == '__dataclass_fields__': + fields_names = member_value.keys() + + # Copy fields values + return {name: vars(dataclass_object)[name] for name in fields_names} + +def module_path(obj) -> str: + """ + Get object module path. 
+ + Returns: + module path + """ + return obj.__class__.__module__ + +class JsonEncoder(json.JSONEncoder): + """Specific ArGaze JSON Encoder.""" + + def default(self, obj): + """default implementation to serialize object.""" + + # numpy cases + if isinstance(obj, numpy.integer): + return int(obj) + + elif isinstance(obj, numpy.floating): + return float(obj) + + elif isinstance(obj, numpy.ndarray): + return obj.tolist() + + # default case + try: + + return json.JSONEncoder.default(self, obj) + + # class case + except: + + # ignore attribute starting with _ + public_dict = {} + + for k, v in vars(obj).items(): + + if not k.startswith('_'): + + # numpy cases + if isinstance(v, numpy.integer): + v = int(v) + + elif isinstance(v, numpy.floating): + v = float(v) + + elif isinstance(v, numpy.ndarray): + v = v.tolist() + + public_dict[k] = v + + return public_dict + +class SharedObject(): + """Enable multiple threads sharing.""" + + def __init__(self): + self._lock = threading.Lock() + self._timestamp = math.nan + self._token = None + + def acquire(self): + self._lock.acquire() + + def release(self): + self._lock.release() + + def locked(self) -> bool: + return self._lock.locked() + + @property + def timestamp(self) -> int|float: + """Get timestamp""" + + self._lock.acquire() + timestamp = self._timestamp + self._lock.release() + + return timestamp + + @timestamp.setter + def timestamp(self, timestamp: int|float): + """Set timestamp""" + + self._lock.acquire() + self._timestamp = timestamp + self._lock.release() + + def untimestamp(self): + """Reset timestamp""" + + self._lock.acquire() + self._timestamp = math.nan + self._lock.release() + + @property + def timestamped(self) -> bool: + """Is the object timestamped?""" + + self._lock.acquire() + timestamped = not math.isnan(self._timestamp) + self._lock.release() + + return timestamped + + @property + def token(self) -> any: + """Get token""" + + self._lock.acquire() + token = self._token + self._lock.release() + + return token + + @token.setter + def token(self, token: any): + """Set token""" + + self._lock.acquire() + self._token = token + self._lock.release() + +class TimeStampedBuffer(collections.OrderedDict): + """Ordered dictionary to handle timestamped data. + ``` + { + timestamp1: data1, + timestamp2: data2, + ... + } + ``` + + !!! warning + + Timestamps must be numbers. + + !!! warning "Timestamps are not sorted internally" + + Data are considered to be stored according at their coming time. 
+ """ + + def __new__(cls, args = None): + """Inheritance""" + + return super(TimeStampedBuffer, cls).__new__(cls) + + def __setitem__(self, ts: TimeStampType, data: DataType): + """Store data at given timestamp.""" + + assert(type(ts) == int or type(ts) == float) + + super().__setitem__(ts, data) + + def __repr__(self): + """String representation""" + + return json.dumps(self, ensure_ascii=False, cls=JsonEncoder) + + def __str__(self): + """String representation""" + + return json.dumps(self, ensure_ascii=False, cls=JsonEncoder) + + def append(self, timestamped_buffer: TimeStampedBufferType) -> TimeStampedBufferType: + """Append a timestamped buffer.""" + + for ts, value in timestamped_buffer.items(): + self[ts] = value + + return self + + @property + def first(self) -> Tuple[TimeStampType, DataType]: + """Easing access to first item.""" + + return list(self.items())[0] + + def pop_first(self) -> Tuple[TimeStampType, DataType]: + """Easing FIFO access mode.""" + + return self.popitem(last=False) + + def pop_last_until(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]: + """Pop all item until a given timestamped value and return the first after.""" + + # get last item before given timestamp + earliest_ts, earliest_value = self.get_last_until(ts) + + first_ts, first_value = self.first + + while first_ts < earliest_ts: + self.pop_first() + first_ts, first_value = self.first + + return first_ts, first_value + + def pop_last_before(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]: + """Pop all item before a given timestamped value and return the last one.""" + + # get last item before given timestamp + earliest_ts, earliest_value = self.get_last_before(ts) + + popep_ts, poped_value = self.pop_first() + + while popep_ts != earliest_ts: + popep_ts, poped_value = self.pop_first() + + return popep_ts, poped_value + + @property + def last(self) -> Tuple[TimeStampType, DataType]: + """Easing access to last item.""" + + return list(self.items())[-1] + + def pop_last(self) -> Tuple[TimeStampType, DataType]: + """Easing FIFO access mode.""" + + return self.popitem(last=True) + + def get_first_from(self, ts) -> Tuple[TimeStampType, DataType]: + """Retreive first item timestamp from a given timestamp value.""" + + ts_list = list(self.keys()) + first_from_index = bisect.bisect_left(ts_list, ts) + + if first_from_index < len(self): + + first_from_ts = ts_list[first_from_index] + + return first_from_ts, self[first_from_ts] + + else: + + raise KeyError(f'No data stored after {ts} timestamp.') + + def get_last_before(self, ts) -> Tuple[TimeStampType, DataType]: + """Retreive last item timestamp before a given timestamp value.""" + + ts_list = list(self.keys()) + last_before_index = bisect.bisect_left(ts_list, ts) - 1 + + if last_before_index >= 0: + + last_before_ts = ts_list[last_before_index] + + return last_before_ts, self[last_before_ts] + + else: + + raise KeyError(f'No data stored before {ts} timestamp.') + + + def get_last_until(self, ts) -> Tuple[TimeStampType, DataType]: + """Retreive last item timestamp until a given timestamp value.""" + + ts_list = list(self.keys()) + last_until_index = bisect.bisect_right(ts_list, ts) - 1 + + if last_until_index >= 0: + + last_until_ts = ts_list[last_until_index] + + return last_until_ts, self[last_until_ts] + + else: + + raise KeyError(f'No data stored until {ts} timestamp.') + + @classmethod + def from_json(self, json_filepath: str) -> TimeStampedBufferType: + """Create a TimeStampedBuffer from .json file.""" + + with open(json_filepath, 
encoding='utf-8') as ts_buffer_file: + + json_buffer = json.load(ts_buffer_file) + + return TimeStampedBuffer({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) + + def to_json(self, json_filepath: str): + """Save a TimeStampedBuffer to .json file.""" + + with open(json_filepath, 'w', encoding='utf-8') as ts_buffer_file: + + json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder) + + @classmethod + def from_dataframe(self, dataframe: pandas.DataFrame, exclude=[]) -> TimeStampedBufferType: + """Create a TimeStampedBuffer from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" + + dataframe.drop(exclude, inplace=True, axis=True) + + assert(dataframe.index.name == 'timestamp') + + return TimeStampedBuffer(dataframe.to_dict('index')) + + def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: + """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). + + The optional *split* argument allows tuple values to be stored in dedicated columns. + For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} + + !!! warning "Values must be dictionaries" + + Each key is stored as a column name. + + !!! note + + Timestamps are stored as index column called 'timestamp'. + """ + + df = pandas.DataFrame.from_dict(self.values()) + + # Exclude columns + df.drop(exclude, inplace=True, axis=True) + + # Split columns + if len(split) > 0: + + splited_columns = [] + + for column in df.columns: + + if column in split.keys(): + + df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) + df.drop(column, inplace=True, axis=True) + + for new_column in split[column]: + + splited_columns.append(new_column) + + else: + + splited_columns.append(column) + + # Reorder splited columns + df = df[splited_columns] + + # Append timestamps as index column + df['timestamp'] = self.keys() + df.set_index('timestamp', inplace=True) + + return df + + def plot(self, names=[], colors=[], split={}, samples=None) -> list: + """Plot as [matplotlib](https://matplotlib.org/) time chart.""" + + df = self.as_dataframe(split=split) + legend_patches = [] + + # decimate data + if samples != None: + + if samples < len(df): + + step = int(len(df) / samples) + 1 + df = df.iloc[::step, :] + + for name, color in zip(names, colors): + + markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) + mpyplot.setp(markerline, color=color, linewidth=1, markersize = 1) + mpyplot.setp(stemlines, color=color, linewidth=1) + mpyplot.setp(baseline, color=color, linewidth=1) + + legend_patches.append(mpatches.Patch(color=color, label=name.upper())) + + return legend_patches diff --git a/src/argaze/DataStructures.py b/src/argaze/DataStructures.py deleted file mode 100644 index 6e058e8..0000000 --- a/src/argaze/DataStructures.py +++ /dev/null @@ -1,420 +0,0 @@ -#!/usr/bin/env python - -"""Timestamped data features.""" - -__author__ = "Théo de la Hogue" -__credits__ = [] -__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" -__license__ = "BSD" - -from typing import TypeVar, Tuple -from inspect import getmembers -import collections -import json -import ast -import bisect -import threading -import math - -import pandas -import numpy -import matplotlib.pyplot as mpyplot -import matplotlib.patches as mpatches - -TimeStampType = TypeVar('TimeStamp', int, float) -"""Type definition for timestamp as integer or float values.""" - -DataType 
= TypeVar('Data') -"""Type definition for data to store anything in time.""" - -TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") -# Type definition for type annotation convenience - -def as_dict(dataclass_object) -> dict: - """ - Get dataclass object fields's values as a dictionary. - - Returns: - values: dictionary of dataclass fields's values - """ - - # Get data class fields names - fields_names = [] - for member_name, member_value in getmembers(dataclass_object): - if member_name == '__dataclass_fields__': - fields_names = member_value.keys() - - # Copy fields values - return {name: vars(dataclass_object)[name] for name in fields_names} - -def module_path(obj) -> str: - """ - Get object module path. - - Returns: - module path - """ - return obj.__class__.__module__ - -class JsonEncoder(json.JSONEncoder): - """Specific ArGaze JSON Encoder.""" - - def default(self, obj): - """default implementation to serialize object.""" - - # numpy cases - if isinstance(obj, numpy.integer): - return int(obj) - - elif isinstance(obj, numpy.floating): - return float(obj) - - elif isinstance(obj, numpy.ndarray): - return obj.tolist() - - # default case - try: - - return json.JSONEncoder.default(self, obj) - - # class case - except: - - # ignore attribute starting with _ - public_dict = {} - - for k, v in vars(obj).items(): - - if not k.startswith('_'): - - # numpy cases - if isinstance(v, numpy.integer): - v = int(v) - - elif isinstance(v, numpy.floating): - v = float(v) - - elif isinstance(v, numpy.ndarray): - v = v.tolist() - - public_dict[k] = v - - return public_dict - -class SharedObject(): - """Enable multiple threads sharing.""" - - def __init__(self): - self._lock = threading.Lock() - self._timestamp = math.nan - self._token = None - - def acquire(self): - self._lock.acquire() - - def release(self): - self._lock.release() - - def locked(self) -> bool: - return self._lock.locked() - - @property - def timestamp(self) -> int|float: - """Get timestamp""" - - self._lock.acquire() - timestamp = self._timestamp - self._lock.release() - - return timestamp - - @timestamp.setter - def timestamp(self, timestamp: int|float): - """Set timestamp""" - - self._lock.acquire() - self._timestamp = timestamp - self._lock.release() - - def untimestamp(self): - """Reset timestamp""" - - self._lock.acquire() - self._timestamp = math.nan - self._lock.release() - - @property - def timestamped(self) -> bool: - """Is the object timestamped?""" - - self._lock.acquire() - timestamped = not math.isnan(self._timestamp) - self._lock.release() - - return timestamped - - @property - def token(self) -> any: - """Get token""" - - self._lock.acquire() - token = self._token - self._lock.release() - - return token - - @token.setter - def token(self, token: any): - """Set token""" - - self._lock.acquire() - self._token = token - self._lock.release() - -class TimeStampedBuffer(collections.OrderedDict): - """Ordered dictionary to handle timestamped data. - ``` - { - timestamp1: data1, - timestamp2: data2, - ... - } - ``` - - !!! warning - - Timestamps must be numbers. - - !!! warning "Timestamps are not sorted internally" - - Data are considered to be stored according at their coming time. 
- """ - - def __new__(cls, args = None): - """Inheritance""" - - return super(TimeStampedBuffer, cls).__new__(cls) - - def __setitem__(self, ts: TimeStampType, data: DataType): - """Store data at given timestamp.""" - - assert(type(ts) == int or type(ts) == float) - - super().__setitem__(ts, data) - - def __repr__(self): - """String representation""" - - return json.dumps(self, ensure_ascii=False, cls=JsonEncoder) - - def __str__(self): - """String representation""" - - return json.dumps(self, ensure_ascii=False, cls=JsonEncoder) - - def append(self, timestamped_buffer: TimeStampedBufferType) -> TimeStampedBufferType: - """Append a timestamped buffer.""" - - for ts, value in timestamped_buffer.items(): - self[ts] = value - - return self - - @property - def first(self) -> Tuple[TimeStampType, DataType]: - """Easing access to first item.""" - - return list(self.items())[0] - - def pop_first(self) -> Tuple[TimeStampType, DataType]: - """Easing FIFO access mode.""" - - return self.popitem(last=False) - - def pop_last_until(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]: - """Pop all item until a given timestamped value and return the first after.""" - - # get last item before given timestamp - earliest_ts, earliest_value = self.get_last_until(ts) - - first_ts, first_value = self.first - - while first_ts < earliest_ts: - self.pop_first() - first_ts, first_value = self.first - - return first_ts, first_value - - def pop_last_before(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]: - """Pop all item before a given timestamped value and return the last one.""" - - # get last item before given timestamp - earliest_ts, earliest_value = self.get_last_before(ts) - - popep_ts, poped_value = self.pop_first() - - while popep_ts != earliest_ts: - popep_ts, poped_value = self.pop_first() - - return popep_ts, poped_value - - @property - def last(self) -> Tuple[TimeStampType, DataType]: - """Easing access to last item.""" - - return list(self.items())[-1] - - def pop_last(self) -> Tuple[TimeStampType, DataType]: - """Easing FIFO access mode.""" - - return self.popitem(last=True) - - def get_first_from(self, ts) -> Tuple[TimeStampType, DataType]: - """Retreive first item timestamp from a given timestamp value.""" - - ts_list = list(self.keys()) - first_from_index = bisect.bisect_left(ts_list, ts) - - if first_from_index < len(self): - - first_from_ts = ts_list[first_from_index] - - return first_from_ts, self[first_from_ts] - - else: - - raise KeyError(f'No data stored after {ts} timestamp.') - - def get_last_before(self, ts) -> Tuple[TimeStampType, DataType]: - """Retreive last item timestamp before a given timestamp value.""" - - ts_list = list(self.keys()) - last_before_index = bisect.bisect_left(ts_list, ts) - 1 - - if last_before_index >= 0: - - last_before_ts = ts_list[last_before_index] - - return last_before_ts, self[last_before_ts] - - else: - - raise KeyError(f'No data stored before {ts} timestamp.') - - - def get_last_until(self, ts) -> Tuple[TimeStampType, DataType]: - """Retreive last item timestamp until a given timestamp value.""" - - ts_list = list(self.keys()) - last_until_index = bisect.bisect_right(ts_list, ts) - 1 - - if last_until_index >= 0: - - last_until_ts = ts_list[last_until_index] - - return last_until_ts, self[last_until_ts] - - else: - - raise KeyError(f'No data stored until {ts} timestamp.') - - @classmethod - def from_json(self, json_filepath: str) -> TimeStampedBufferType: - """Create a TimeStampedBuffer from .json file.""" - - with open(json_filepath, 
encoding='utf-8') as ts_buffer_file: - - json_buffer = json.load(ts_buffer_file) - - return TimeStampedBuffer({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) - - def to_json(self, json_filepath: str): - """Save a TimeStampedBuffer to .json file.""" - - with open(json_filepath, 'w', encoding='utf-8') as ts_buffer_file: - - json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder) - - @classmethod - def from_dataframe(self, dataframe: pandas.DataFrame, exclude=[]) -> TimeStampedBufferType: - """Create a TimeStampedBuffer from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" - - dataframe.drop(exclude, inplace=True, axis=True) - - assert(dataframe.index.name == 'timestamp') - - return TimeStampedBuffer(dataframe.to_dict('index')) - - def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: - """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). - - The optional *split* argument allows tuple values to be stored in dedicated columns. - For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} - - !!! warning "Values must be dictionaries" - - Each key is stored as a column name. - - !!! note - - Timestamps are stored as index column called 'timestamp'. - """ - - df = pandas.DataFrame.from_dict(self.values()) - - # Exclude columns - df.drop(exclude, inplace=True, axis=True) - - # Split columns - if len(split) > 0: - - splited_columns = [] - - for column in df.columns: - - if column in split.keys(): - - df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) - df.drop(column, inplace=True, axis=True) - - for new_column in split[column]: - - splited_columns.append(new_column) - - else: - - splited_columns.append(column) - - # Reorder splited columns - df = df[splited_columns] - - # Append timestamps as index column - df['timestamp'] = self.keys() - df.set_index('timestamp', inplace=True) - - return df - - def plot(self, names=[], colors=[], split={}, samples=None) -> list: - """Plot as [matplotlib](https://matplotlib.org/) time chart.""" - - df = self.as_dataframe(split=split) - legend_patches = [] - - # decimate data - if samples != None: - - if samples < len(df): - - step = int(len(df) / samples) + 1 - df = df.iloc[::step, :] - - for name, color in zip(names, colors): - - markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) - mpyplot.setp(markerline, color=color, linewidth=1, markersize = 1) - mpyplot.setp(stemlines, color=color, linewidth=1) - mpyplot.setp(baseline, color=color, linewidth=1) - - legend_patches.append(mpatches.Patch(color=color, label=name.upper())) - - return legend_patches diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 12cccbc..af9f943 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -15,7 +15,7 @@ import json import importlib from inspect import getmembers -from argaze import DataStructures +from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures import numpy @@ -119,7 +119,7 @@ class UnvalidGazePosition(GazePosition): TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions") # Type definition for type annotation convenience -class TimeStampedGazePositions(DataStructures.TimeStampedBuffer): +class TimeStampedGazePositions(DataFeatures.TimeStampedBuffer): """Define timestamped buffer to store gaze positions.""" def __setitem__(self, 
key, value: GazePosition|dict): @@ -255,7 +255,7 @@ class GazePositionCalibrator(): # Open file with open(self.__json_filepath, 'w', encoding='utf-8') as calibration_file: - json.dump({DataStructures.module_path(self):DataStructures.JsonEncoder().default(self)}, calibration_file, ensure_ascii=False, indent=4) + json.dump({DataFeatures.module_path(self):DataFeatures.JsonEncoder().default(self)}, calibration_file, ensure_ascii=False, indent=4) def store(self, timestamp: int|float, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition): """Store observed and expected gaze positions. @@ -466,7 +466,7 @@ def is_saccade(gaze_movement): TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeStampedGazeMovements") # Type definition for type annotation convenience -class TimeStampedGazeMovements(DataStructures.TimeStampedBuffer): +class TimeStampedGazeMovements(DataFeatures.TimeStampedBuffer): """Define timestamped buffer to store gaze movements.""" def __setitem__(self, key, value: GazeMovement): @@ -507,7 +507,7 @@ class GazeStatus(GazePosition): TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus") # Type definition for type annotation convenience -class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer): +class TimeStampedGazeStatus(DataFeatures.TimeStampedBuffer): """Define timestamped buffer to store list of gaze statusa. !!! note diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py index 05408a3..d751af6 100644 --- a/src/argaze/PupillFeatures.py +++ b/src/argaze/PupillFeatures.py @@ -11,7 +11,7 @@ from typing import TypeVar from dataclasses import dataclass, field import json -from argaze import DataStructures +from argaze import DataFeatures @dataclass(frozen=True) class PupillDiameter(): @@ -43,7 +43,7 @@ class UnvalidPupillDiameter(PupillDiameter): TimeStampedPupillDiametersType = TypeVar('TimeStampedPupillDiameters', bound="TimeStampedPupillDiameters") # Type definition for type annotation convenience -class TimeStampedPupillDiameters(DataStructures.TimeStampedBuffer): +class TimeStampedPupillDiameters(DataFeatures.TimeStampedBuffer): """Define timestamped buffer to store pupill diameters.""" def __setitem__(self, key, value: PupillDiameter|dict): @@ -92,7 +92,7 @@ class PupillDiameterAnalyzer(): assert(type(ts_pupill_diameters) == TimeStampedPupillDiameters) - ts_analyzis = DataStructures.TimeStampedBuffer() + ts_analyzis = DataFeatures.TimeStampedBuffer() # Iterate on pupill diameters for ts, pupill_diameter in ts_pupill_diameters.items(): diff --git a/src/argaze/__init__.py b/src/argaze/__init__.py index 9f79a17..f29c5d3 100644 --- a/src/argaze/__init__.py +++ b/src/argaze/__init__.py @@ -1,4 +1,4 @@ """ ArGaze is divided in submodules dedicated to various specifics features. """ -__all__ = ['ArUcoMarkers','AreaOfInterest','ArFeatures','GazeFeatures','GazeAnalysis','PupillFeatures','PupillAnalysis','DataStructures','utils'] \ No newline at end of file +__all__ = ['ArUcoMarkers','AreaOfInterest','ArFeatures','GazeFeatures','GazeAnalysis','PupillFeatures','PupillAnalysis','DataFeatures','utils'] \ No newline at end of file -- cgit v1.1
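
The patch only renames the module and updates import statements; the public API is unchanged. Below is a minimal usage sketch of the renamed module (an editor's illustration, assuming the argaze package is installed), exercising only the TimeStampedBuffer behaviour covered by the tests in this patch.

    from argaze import DataFeatures   # formerly: from argaze import DataStructures

    # Timestamps must be numbers (int or float); other key types raise AssertionError.
    ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})

    # append() merges another buffer and returns the merged buffer.
    ts_buffer.append(DataFeatures.TimeStampedBuffer({2: "C", 3: "D"}))

    # first/last give easy access to the oldest and newest (timestamp, data) tuples.
    assert ts_buffer.first == (0, "A")
    assert ts_buffer.last == (3, "D")

    # FIFO-style consumption.
    oldest_ts, oldest_data = ts_buffer.pop_first()   # (0, "A")

    # With dictionary values, the buffer converts to a pandas DataFrame
    # indexed by a 'timestamp' column.
    df = DataFeatures.TimeStampedBuffer({0: {"x": 1.0}, 1: {"x": 2.0}}).as_dataframe()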