"""Miscellaneous data features.
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.
"""
# Module authorship / licensing metadata
__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
from typing import Self

import bisect
import collections
import functools
import importlib
import json
import logging
import math
import os
import sys
import threading
import time
import traceback

import cv2
import matplotlib.patches as mpatches
import matplotlib.pyplot as mpyplot
import numpy
import pandas
from colorama import Style, Fore
def module_path(obj) -> str:
    """Return the dotted module path of an object's class.

    Returns:
        module path string (e.g. 'package.module')
    """
    return type(obj).__module__
def get_class(class_path: str) -> object:
    """Get class object from a 'path.to.class' string.

    Parameters:
        class_path: a 'path.to.Class' dotted string.

    Returns:
        the attribute found at the end of the dotted path (usually a class).

    Raises:
        ValueError: with message 'Empty module name' when class_path has no
            module part — callers rely on this exact message to detect
            non-class keys (see PipelineStepAttributeSetter.load_dict).
        ModuleNotFoundError: when the module part cannot be imported.
        AttributeError: when the final attribute does not exist on the module.
    """
    module_name, _, attribute_name = class_path.rpartition('.')

    # importlib.import_module('') raises ValueError('Empty module name'),
    # matching the historical __import__ behavior expected by callers.
    module = importlib.import_module(module_name)

    return getattr(module, attribute_name)
def properties(cls) -> list:
    """Collect property names declared on a class, then on its direct bases.

    !!! note
        A property overridden in the class appears twice (own name first),
        matching the historical behavior.
    """
    collected = [n for n, attribute in cls.__dict__.items() if isinstance(attribute, property)]

    collected += [
        n
        for base in cls.__bases__
        for n, attribute in base.__dict__.items()
        if isinstance(attribute, property)
    ]

    return collected
def as_dict(obj, filter: bool=True) -> dict:
    """Export an object's class property values as a dictionary.

    Parameters:
        obj: object whose properties are read.
        filter: when True, properties whose value is None are omitted.
    """
    values = ((p, getattr(obj, p)) for p in properties(obj.__class__))

    return {p: v for p, v in values if not filter or v is not None}
class JsonEncoder(json.JSONEncoder):
    """Specific ArGaze JSON Encoder.

    Serializes numpy scalar/array values natively and falls back to an
    object's public attributes for types the base encoder cannot handle.
    """

    def default(self, obj):
        """Serialize objects unsupported by the standard JSON encoder."""
        # numpy cases
        if isinstance(obj, numpy.integer):
            return int(obj)

        elif isinstance(obj, numpy.floating):
            return float(obj)

        elif isinstance(obj, numpy.ndarray):
            return obj.tolist()

        # default case
        try:
            return json.JSONEncoder.default(self, obj)

        # class case
        # BUGFIX: catch only TypeError (what the base default() raises for
        # unsupported types) instead of a bare except that hid real errors.
        except TypeError:

            # ignore attributes starting with _
            public_dict = {}

            for k, v in vars(obj).items():
                if not k.startswith('_'):

                    # numpy cases
                    if isinstance(v, numpy.integer):
                        v = int(v)
                    elif isinstance(v, numpy.floating):
                        v = float(v)
                    elif isinstance(v, numpy.ndarray):
                        v = v.tolist()

                    public_dict[k] = v

            return public_dict
class DataDictionary(dict):
    """Dictionary with dot.notation access to its entries.

    Missing attributes resolve to None (dict.get semantics); deleting a
    missing attribute raises KeyError (dict.__delitem__ semantics).
    """

    def __getattr__(self, key):
        # dict.get semantics: return None when the key is absent
        return self.get(key)

    def __setattr__(self, key, value):
        self[key] = value

    def __delattr__(self, key):
        del self[key]
class TimestampedObject():
"""Abstract class to enable timestamp management."""
def __init__(self, timestamp: int|float = math.nan):
"""Initialize TimestampedObject."""
self._timestamp = timestamp
def __repr__(self):
"""String representation."""
return json.dumps(as_dict(self))
@property
def timestamp(self) -> int|float:
"""Get object timestamp."""
return self._timestamp
@timestamp.setter
def timestamp(self, timestamp: int|float):
"""Set object timestamp."""
self._timestamp = timestamp
def untimestamp(self):
"""Reset object timestamp."""
self._timestamp = math.nan
def is_timestamped(self) -> bool:
"""Is the object timestamped?"""
return not math.isnan(self._timestamp)
class TimestampedObjectsList(list):
    """Handle timestamped objects into a list.

    !!! warning "Timestamped objects are not sorted internally"
        Timestamped objects are considered to be stored according at their coming time.
    """

    def __init__(self, ts_object_type: type, ts_objects: list = None):
        """Initialize a list dedicated to one timestamped object type.

        Parameters:
            ts_object_type: type of the timestamped objects to store.
            ts_objects: optional initial timestamped objects (each appended with type checking).
        """
        self.__object_type = ts_object_type
        self.__object_properties = properties(self.__object_type)

        # BUGFIX: no shared mutable default argument; None means 'empty'
        for ts_object in (ts_objects or []):
            self.append(ts_object)

    @property
    def object_type(self):
        """Get object type handled by the list."""
        return self.__object_type

    def append(self, ts_object: TimestampedObject|dict):
        """Append a timestamped object (a dict is converted through object_type.from_dict).

        Raises:
            TypeError: when the object is not an object_type instance.
            ValueError: when the object is not timestamped.
        """
        # Convert dict into the handled object type
        if type(ts_object) == dict:
            ts_object = self.__object_type.from_dict(ts_object)

        # Check object type
        if type(ts_object) != self.__object_type:
            if not issubclass(ts_object.__class__, self.__object_type):
                raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance')

        if not ts_object.is_timestamped():
            raise ValueError('object is not timestamped')

        super().append(ts_object)

    def look_for(self, timestamp: int|float) -> TimestampedObject:
        """Look for the first object at given timestamp (None when not found)."""
        for ts_object in self:
            if ts_object.timestamp == timestamp:
                return ts_object

    def __add__(self, ts_objects: list = ()) -> Self:
        """Append timestamped objects list.

        !!! note
            Unlike list.__add__, this mutates the list in place and returns self.
        """
        for ts_object in ts_objects:
            self.append(ts_object)

        return self

    @property
    def duration(self):
        """Get inferred duration from first and last timestamps (0 when empty)."""
        if self:
            return self[-1].timestamp - self[0].timestamp
        else:
            return 0

    def timestamps(self):
        """Get all timestamps in list."""
        return [ts_object.timestamp for ts_object in self]

    def tuples(self) -> list:
        """Get all timestamped objects as list of tuples of their property values."""
        return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self]

    @classmethod
    def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=()) -> Self:
        """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        !!! note
            The dataframe index must be named 'timestamp'.
        """
        # BUGFIX: drop columns on a copy (inplace=True mutated the caller's
        # dataframe) and use axis=1 explicitly instead of the boolean True.
        dataframe = dataframe.drop(exclude, axis=1)

        # BUGFIX: assert statements are stripped under 'python -O'; raise instead
        if dataframe.index.name != 'timestamp':
            raise ValueError("dataframe index must be named 'timestamp'")

        object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in dataframe.to_dict('index').items()]

        return TimestampedObjectsList(ts_object_type, object_list)

    def as_dataframe(self, exclude=(), split=None) -> pandas.DataFrame:
        """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        The optional *split* argument allows tuple values to be stored in dedicated columns.
        For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]}

        !!! warning "Values must be dictionaries"
            Each key is stored as a column name.

        !!! note
            Timestamps are stored as index column called 'timestamp'.
        """
        # BUGFIX: no shared mutable default argument
        split = split if split is not None else {}

        df = pandas.DataFrame(self.tuples(), columns=self.__object_properties)

        # Exclude columns (axis=1, column-wise)
        df.drop(exclude, inplace=True, axis=1)

        # Split columns
        if len(split) > 0:

            splited_columns = []

            for column in df.columns:

                if column in split.keys():

                    df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index)
                    df.drop(column, inplace=True, axis=1)

                    for new_column in split[column]:
                        splited_columns.append(new_column)

                else:
                    splited_columns.append(column)

            # Reorder splited columns
            df = df[splited_columns]

        # Append timestamps as index column
        df['timestamp'] = self.timestamps()
        df.set_index('timestamp', inplace=True)

        return df

    @classmethod
    def from_json(cls, ts_object_type: type, json_filepath: str) -> Self:
        """Create a TimestampedObjectsList from .json file."""
        with open(json_filepath, encoding='utf-8') as ts_objects_file:
            json_ts_objects = json.load(ts_objects_file)

        return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects])

    def to_json(self, json_filepath: str):
        """Save a TimestampedObjectsList to .json file."""
        with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file:
            json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent='  ')

    def __repr__(self):
        """String representation."""
        return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False)

    def __str__(self):
        """String representation."""
        return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False)

    def pop_last_until(self, timestamp: int|float) -> TimestampedObject:
        """Pop all items strictly before the last item at or before the given timestamp, and return that item."""
        # get last item until given timestamp
        earliest_value = self.get_last_until(timestamp)

        while self[0].timestamp < earliest_value.timestamp:
            self.pop(0)

        return self[0]

    def pop_last_before(self, timestamp: int|float) -> TimestampedObject:
        """Pop all items up to the last one strictly before the given timestamp, and return it."""
        # get last item before given timestamp
        earliest_value = self.get_last_before(timestamp)

        poped_value = self.pop(0)

        while poped_value.timestamp != earliest_value.timestamp:
            poped_value = self.pop(0)

        return poped_value

    def get_first_from(self, timestamp: int|float) -> TimestampedObject:
        """Retrieve first item at or after a given timestamp value.

        Raises:
            KeyError: when no data is stored at or after the timestamp.
        """
        ts_list = self.timestamps()
        first_from_index = bisect.bisect_left(ts_list, timestamp)

        if first_from_index < len(self):
            # BUGFIX: index the list with the position, not with the timestamp
            # value (leftover from a dict-keyed implementation).
            return self[first_from_index]
        else:
            raise KeyError(f'No data stored after {timestamp} timestamp.')

    def get_last_before(self, timestamp: int|float) -> TimestampedObject:
        """Retrieve last item strictly before a given timestamp value.

        Raises:
            KeyError: when no data is stored before the timestamp.
        """
        ts_list = self.timestamps()
        last_before_index = bisect.bisect_left(ts_list, timestamp) - 1

        if last_before_index >= 0:
            # BUGFIX: index the list with the position, not the timestamp value
            return self[last_before_index]
        else:
            raise KeyError(f'No data stored before {timestamp} timestamp.')

    def get_last_until(self, timestamp: int|float) -> TimestampedObject:
        """Retrieve last item at or before a given timestamp value.

        Raises:
            KeyError: when no data is stored until the timestamp.
        """
        ts_list = self.timestamps()
        last_until_index = bisect.bisect_right(ts_list, timestamp) - 1

        if last_until_index >= 0:
            # BUGFIX: index the list with the position, not the timestamp value
            return self[last_until_index]
        else:
            raise KeyError(f'No data stored until {timestamp} timestamp.')

    def plot(self, names=(), colors=(), split=None, samples=None) -> list:
        """Plot as [matplotlib](https://matplotlib.org/) time chart.

        Parameters:
            names: column names to plot.
            colors: one color per plotted name.
            split: forwarded to as_dataframe to split tuple columns.
            samples: optional maximum number of plotted samples (data is decimated above it).

        Returns:
            legend patches list.
        """
        df = self.as_dataframe(split=split if split is not None else {})
        legend_patches = []

        # decimate data
        if samples is not None and samples < len(df):
            step = int(len(df) / samples) + 1
            df = df.iloc[::step, :]

        for name, color in zip(names, colors):

            markerline, stemlines, baseline = mpyplot.stem(df.index, df[name])
            mpyplot.setp(markerline, color=color, linewidth=1, markersize=1)
            mpyplot.setp(stemlines, color=color, linewidth=1)
            mpyplot.setp(baseline, color=color, linewidth=1)

            legend_patches.append(mpatches.Patch(color=color, label=name.upper()))

        return legend_patches
class SharedObject(TimestampedObject):
    """Abstract class to enable multiple threads sharing for timestamped object."""

    def __init__(self, timestamp: int|float = math.nan):
        """Initialize the timestamp then the thread-sharing state."""
        super().__init__(timestamp)

        # lock guarding concurrent access to this object
        self._lock = threading.Lock()

        # per-method monitoring data
        self._execution_times = {}
        self._exceptions = {}
def PipelineStepInit(method):
    """Define a decorator used into PipelineStepObject class to declare a pipeline step init method."""

    # functools.wraps keeps the wrapped init's name/doc/annotations visible
    @functools.wraps(method)
    def wrapper(self, **kwargs):
        """Wrap pipeline step init method to update PipelineStepObject attributes with arguments after init call.

        Parameters:
            kwargs: any arguments defined by the wrapped init method.
        """
        method(self, **kwargs)
        self.update_attributes(kwargs)

    return wrapper
def PipelineStepAttributeSetter(method):
    """Define a decorator used into PipelineStepObject class to declare a pipeline step attribute setter."""

    # functools.wraps keeps the wrapped setter's name/doc/annotations visible
    @functools.wraps(method)
    def wrapper(self, new_value, unwrap: bool = False):
        """Wrap pipeline step attribute setter to load attribute from file.

        Parameters:
            new_value: value used to set attribute.
            unwrap: call wrapped method directly.
        """
        if unwrap:
            return method(self, new_value)

        # Get new value type
        new_value_type = type(new_value)

        # Check setter annotations to get expected value type
        # BUGFIX: indexing an empty annotations list raises IndexError, not
        # KeyError, so the original handler never fired.
        try:
            expected_value_type = list(method.__annotations__.values())[0]

        except (KeyError, IndexError):
            raise ValueError(f'Missing annotations in {method.__name__}: {method.__annotations__}')

        logging.debug('@PipelineStepAttributeSetter %s.%s.setter(%s) with %s', type(self).__name__, method.__name__, expected_value_type.__name__, new_value_type.__name__)

        # Define function to load dict values
        def load_dict(data: dict) -> any:
            """Build the expected value from a dict: either objects keyed by class path, or constructor parameters."""
            logging.debug('\t> load %s from %s', expected_value_type.__name__, new_value_type.__name__)

            # Check if json keys are PipelineStepObject class and store them in a list
            new_objects_list = []

            for key, value in data.items():

                try:
                    new_class = get_class(key)

                except ValueError as e:
                    # Keys are not class names
                    if str(e) == 'Empty module name':
                        break
                    else:
                        raise e

                logging.debug('\t+ create %s object from key using value as argument', key)
                new_objects_list.append(new_class(**value))

            # Only one object have been loaded: pass the object if it is a subclass of expected type
            if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type):
                return new_objects_list[0]

            # Pass non empty objects list
            elif len(new_objects_list) > 0:
                return new_objects_list

            # Otherwise, data are parameters of the expected class
            logging.debug('\t+ create %s object using %s as argument', expected_value_type.__name__, new_value_type.__name__)
            return expected_value_type(**data)

        # String not expected: load value from file
        if new_value_type == str and new_value_type != expected_value_type:

            split_point = new_value.split('.')

            # String have a dot inside: file path with format
            if len(split_point) > 1:

                file_format = split_point[-1]
                logging.debug('\t> %s is a path to a %s file', new_value, file_format.upper())

                filepath = os.path.join(self.working_directory, new_value)

                # Load image from JPG and PNG formats
                if file_format == 'jpg' or file_format == 'png':
                    return method(self, cv2.imread(filepath))

                # Load image from OBJ formats
                elif file_format == 'obj':
                    return method(self, expected_value_type.from_obj(filepath))

                # Load object from JSON file
                elif file_format == 'json':
                    with open(filepath) as file:
                        return method(self, load_dict(json.load(file)))

            # No point inside string: identifier name
            else:
                logging.debug('\t> %s is an identifier', new_value)
                logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__)
                return method(self, expected_value_type(new_value))

        # Dict not expected: load value from dict
        if new_value_type == dict and expected_value_type != dict:
            return method(self, load_dict(new_value))

        # Otherwise, pass new value to setter method
        logging.debug('\t> use %s value as passed', new_value_type.__name__)
        return method(self, new_value)

    return wrapper
class PipelineStepObject():
    """
    Define class to assess pipeline step methods execution time and observe them.
    """

    @PipelineStepInit
    def __init__(self, **kwargs):
        """Initialize PipelineStepObject."""
        logging.debug('PipelineStepObject.__init__ %s %s', type(self).__name__, kwargs.get('name', ''))

        # Init private attributes
        self.__name = None
        self.__working_directory = None
        self.__observers = []
        self.__execution_times = {}

        # Remembered by to_json(); None until a file path is provided
        self.__json_filepath = None

        # Parent attribute will be setup later by parent it self
        self.__parent = None

    def __enter__(self):
        """At with statement start."""
        # Start children pipeline step objects
        for child in self.children:
            child.__enter__()

        # Start observers
        for observer in self.__observers:
            observer.__enter__()

        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        """At with statement end."""
        # End observers
        for observer in self.__observers:
            observer.__exit__(exception_type, exception_value, exception_traceback)

        # End children pipeline step objects
        for child in self.children:
            child.__exit__(exception_type, exception_value, exception_traceback)

    def update_attributes(self, object_data: dict):
        """Update pipeline step object attributes with dictionary."""
        for key, value in object_data.items():
            logging.debug('PipelineStepObject.update_attributes %s.%s with %s value', type(self).__name__, key, type(value).__name__)
            setattr(self, key, value)

    @property
    def name(self) -> str:
        """Get pipeline step object's name."""
        return self.__name

    @name.setter
    def name(self, name: str):
        """Set pipeline step object's name."""
        self.__name = name

    @property
    def working_directory(self) -> str:
        """Get pipeline step object's working directory.
        This path will be joined to relative file path."""
        return self.__working_directory

    @working_directory.setter
    def working_directory(self, working_directory: str):
        """Set pipeline step object's working directory."""
        # Append working directory to the Python path so local modules can be imported
        if working_directory is not None:
            sys.path.append(working_directory)

        self.__working_directory = working_directory

    @property
    def parent(self) -> object:
        """Get pipeline step object's parent object."""
        return self.__parent

    @parent.setter
    def parent(self, parent: object):
        """Set pipeline step object's parent object."""
        self.__parent = parent

    @property
    def observers(self) -> list:
        """Pipeline step object observers list."""
        return self.__observers

    @observers.setter
    @PipelineStepAttributeSetter
    def observers(self, observers: list):
        # Store new observers list (loaded from dict/file by the decorator when needed)
        self.__observers = observers

    @property
    def execution_times(self):
        """Get pipeline step object observers execution times dictionary."""
        return self.__execution_times

    def as_dict(self) -> dict:
        """Export PipelineStepObject attributes as dictionary.

        Returns:
            object_data: dictionary with pipeline step object attributes values.
        """
        return {
            "name": self.__name,
            "observers": self.__observers
        }

    @classmethod
    def from_json(cls, configuration_filepath: str, patch_filepath: str = None) -> object:
        """
        Load instance from .json file.

        Parameters:
            configuration_filepath: path to json configuration file
            patch_filepath: path to json patch file to modify any configuration entries
        """
        logging.debug('%s.from_json', cls.__name__)

        # Load configuration from JSON file
        with open(configuration_filepath) as configuration_file:

            # Edit object_data with working directory as first key
            object_data = {
                'working_directory': os.path.dirname(configuration_filepath)
            }
            object_data.update(json.load(configuration_file))

            # Apply patch to configuration if required
            if patch_filepath is not None:

                with open(patch_filepath) as patch_file:

                    patch_data = json.load(patch_file)

                    import collections.abc

                    def update(d, u):
                        """Recursively merge patch mapping u into d; a None value deletes the key."""
                        for k, v in u.items():

                            if isinstance(v, collections.abc.Mapping):
                                d[k] = update(d.get(k, {}), v)

                            elif v is None:
                                del d[k]

                            else:
                                d[k] = v

                        return d

                    object_data = update(object_data, patch_data)

            # Instanciate class
            return cls(**object_data)

    def to_json(self, json_filepath: str = None):
        """Save pipeline step object into .json file.

        Parameters:
            json_filepath: target file path; when None, the path given at a
                previous call is reused.

        Raises:
            ValueError: when no file path has ever been provided.
        """
        # Remember file path to ease rewriting
        if json_filepath is not None:
            self.__json_filepath = json_filepath

        # BUGFIX: fail with an explicit message instead of an AttributeError
        # when to_json(None) is called before any path was provided.
        if self.__json_filepath is None:
            raise ValueError('No json file path provided to save pipeline step object.')

        # Open file
        with open(self.__json_filepath, 'w', encoding='utf-8') as object_file:
            json.dump({module_path(self): as_dict(self)}, object_file, ensure_ascii=False, indent=4)

        # QUESTION: maybe we need two saving mode?
        #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder)

    def __str__(self) -> str:
        """
        String representation of pipeline step object.

        Returns:
            String representation
        """
        tabs = self.tabulation
        output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n'

        if self.__name is not None:
            output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n'

        if self.__parent is not None:
            output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n'

        if self.__observers:
            output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n'
            for observer in self.__observers:
                output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n'

        for name, value in self.properties:

            output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: '

            if type(value) == dict:
                output += '\n'
                for k, v in value.items():
                    output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n'

            # BUGFIX: elif, so dict values don't also fall through to the
            # final else branch and get printed a second time as raw repr.
            elif type(value) == list:
                output += '\n'
                for v in value:
                    output += f'{tabs}\t - {v}\n'

            elif type(value) == numpy.ndarray:
                output += f'numpy.array{value.shape}\n'

            elif type(value) == pandas.DataFrame:
                output += f'pandas.DataFrame{value.shape}\n'

            else:
                try:
                    output += f'{value}'
                except TypeError as e:
                    output += f'{Fore.RED}{Style.BRIGHT}!!! {type(self).__name__}.{name}: {e}{Style.RESET_ALL}\n\n'

            if output[-1] != '\n':
                output += '\n'

        return output

    @property
    def tabulation(self) -> str:
        """Edit tabulation string according to parents number."""
        tabs = ''
        parent = self.__parent

        while parent is not None:
            tabs += '\t'
            parent = parent.parent

        return tabs

    @property
    def properties(self) -> tuple[str, any]:
        """Iterate over pipeline step (property name, value) pairs."""
        # own class properties
        # BUGFIX: annotation was tuple[name, any] which silently captured the
        # 'name' property object defined above in the class body.
        for name, item in self.__class__.__dict__.items():
            if isinstance(item, property):
                yield name, getattr(self, name)

        # direct base classes properties (PipelineStepObject and SharedObject excluded)
        for base in self.__class__.__bases__:
            if base != PipelineStepObject and base != SharedObject:
                for name, item in base.__dict__.items():
                    if isinstance(item, property):
                        yield name, getattr(self, name)

    @property
    def children(self) -> object:
        """Iterate over children pipeline step objects."""
        for name in dir(self):
            if not name.startswith('_'):
                attr = getattr(self, name)
                if isinstance(attr, PipelineStepObject) and attr != self.parent:
                    yield attr
def PipelineStepMethod(method):
"""Define a decorator use into PipelineStepObject class to declare pipeline method.
!!! danger
PipelineStepMethod must have a timestamp as first argument.
"""
def wrapper(self, *args, timestamp: int|float = None, unwrap: bool = False, **kwargs):
"""Wrap pipeline step method to measure execution time.
Parameters:
args: any arguments defined by PipelineStepMethod.
timestamp: optional method call timestamp (unit does'nt matter) if first args parameter is not a TimestampedObject instance.
unwrap: extra arguments used in wrapper function to call wrapped method directly.
"""
if timestamp is None and len(args) > 0:
if isinstance(args[0], TimestampedObject):
timestamp = args[0].timestamp
if unwrap:
return method(self, *args, **kwargs)
# Initialize execution time assessment
start = time.perf_counter()
exception = None
result = None
try:
# Execute wrapped method
result = method(self, *args, **kwargs)
except Exception as e:
exception = e
finally:
# Measure execution time
self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3
# Notify observers that method has been called
subscription_name = f'on_{method.__name__}'
for observer in self.observers:
# Does the observer cares about this method?
if subscription_name in dir(observer):
subscription = getattr(observer, subscription_name)
# Call subscription
subscription(timestamp, self, exception)
# Raise exception
if exception is not None:
raise exception
return result
return wrapper
class PipelineStepObserver():
    """Define abstract class to observe pipeline step object use.

    !!! note
        To subscribe to a method call, the inherited class simply needs to define 'on_<method>' functions taking timestamp, object and exception arguments.
    """

    def __enter__(self):
        """
        Abstract __enter__ method so an observer can be used as a context.

        !!! warning
            Only called when the observed PipelineStepObject is itself created as a context using a with statement.
        """
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """
        Abstract __exit__ method so an observer can be used as a context.

        !!! warning
            Only called when the observed PipelineStepObject is itself created as a context using a with statement.
        """
        pass
class PipelineInputProvider(PipelineStepObject):
    """
    Define class to provide input data to a pipeline.
    (NOTE(review): original docstring was left unfinished — confirm intended purpose.)
    """

    @PipelineStepInit
    def __init__(self, **kwargs):
        """Initialize PipelineInputProvider."""
        logging.debug('PipelineInputProvider.__init__')
        super().__init__()

    def attach(self, method):
        """Attach a method to this input provider."""
        # BUGFIX: use a lazy %s placeholder; passing an extra positional
        # argument without one makes the logging module raise a formatting
        # error when debug output is enabled.
        logging.debug('PipelineInputProvider.attach %s', method)

    def __enter__(self):
        """
        Define abstract __enter__ method to use device as a context.

        !!! warning
            This method is called provided that the PipelineInputProvider is created as a context using a with statement.
        """
        return self

    def __exit__(self, type, value, traceback):
        """
        Define abstract __exit__ method to use device as a context.

        !!! warning
            This method is called provided that the PipelineInputProvider is created as a context using a with statement.
        """
        pass