"""Miscellaneous data features."""
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see https://www.gnu.org/licenses/.
"""
__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
import bisect
import json
import logging
import math
import os
import sys
import threading
import time
from typing import Self
from functools import wraps
import cv2
import matplotlib.patches as mpatches
import matplotlib.pyplot as mpyplot
import numpy
import pandas
from colorama import Style, Fore
# Define global working directory used to load file using relative path.
# The value is kept in a one-element list so that set_working_directory() can
# replace it in place; None means that no working directory has been set yet.
WORKING_DIRECTORY = [None]
def get_working_directory() -> str:
    """Return the global working directory.

    Raises:
        RuntimeError: if no working directory has been set.
    """
    current_directory = WORKING_DIRECTORY[0]

    # Guard clause: nothing has been registered yet
    if current_directory is None:
        raise RuntimeError('No working directory')

    return current_directory
def set_working_directory(working_directory: str):
    """Register a new global working directory.

    The former working directory (if any) is removed from the Python path and
    the new one is appended to it.

    Parameters:
        working_directory: path to use as global working directory.
    """
    former_directory = WORKING_DIRECTORY[0]

    # Drop the previous directory from the Python path
    if former_directory is not None:
        sys.path.remove(former_directory)

    # Make the new directory importable, then remember it
    sys.path.append(working_directory)
    WORKING_DIRECTORY[0] = working_directory
def get_class(class_path: str) -> type:
    """Resolve a 'path.to.class' string into the object it names.

    Parameters:
        class_path: a 'path.to.class' string.

    Returns:
        class: the object found at the end of the dotted path.

    Raises:
        ValueError: with message 'Empty module name' when class_path contains no dot.
    """
    names = class_path.split('.')

    # __import__ returns the top-level package of the dotted module path
    target = __import__(".".join(names[:-1]))

    # Walk down from the top-level package to the last name
    for attribute_name in names[1:]:
        target = getattr(target, attribute_name)

    return target
def get_class_path(o: object) -> str:
    """Build the 'path.to.class' string of an object's class.

    Parameters:
        o: any object instance.

    Returns:
        class_path: module name and qualified class name joined by a dot,
            or the qualified class name alone for builtin types.
    """
    klass = o.__class__
    module_name = klass.__module__
    qualified_name = klass.__qualname__

    # Builtin types are reported bare ('str' rather than 'builtins.str')
    return qualified_name if module_name == 'builtins' else module_name + '.' + qualified_name
def get_class_properties(cls: type) -> dict:
    """Get class properties dictionary.

    Properties are collected from the class itself and, recursively, from its
    bases. Recursion stops at object, PipelineStepObject and SharedObject,
    whose own properties are never reported.

    Parameters:
        cls: class to consider.

    Returns:
        properties: dict of property objects stored by names. An empty dict is
            returned for the core classes where recursion stops, so callers can
            always iterate the result or call .keys() on it.
    """
    # Stop recursion when reaching core objects
    if cls is object or cls is PipelineStepObject or cls is SharedObject:
        return {}

    object_properties = {name: item for name, item in cls.__dict__.items() if isinstance(item, property)}

    for base in cls.__bases__:

        # A property redefined by the class takes precedence over the inherited one
        for name, item in get_class_properties(base).items():
            object_properties.setdefault(name, item)

    return object_properties
def from_json(filepath: str) -> any:
    """
    Load object from json file.

    The file must contain a single {"path.to.class": {...arguments...}} entry:
    the class is resolved with get_class and instantiated with the arguments.

    !!! note
        The directory where json file is will be used as global working directory
        (only if no global working directory is set yet).

    Parameters:
        filepath: path to json file

    Returns:
        the instance created from the file content.
    """
    logging.debug('DataFeatures.from_json')

    # Edit working directory once
    if WORKING_DIRECTORY[0] is None:
        set_working_directory(os.path.dirname(os.path.abspath(filepath)))
        logging.debug('\t> set global working directory as %s', get_working_directory())

    # Open JSON file as UTF-8, the encoding used by the to_json writers of this module,
    # instead of relying on the platform default encoding
    with open(filepath, encoding='utf-8') as configuration_file:

        # Load unique object
        object_class, object_data = json.load(configuration_file).popitem()

    # Instantiate class once the file is closed
    logging.debug('\t+ create %s object', object_class)

    return get_class(object_class)(**object_data)
def from_dict(expected_value_type: type, data: dict) -> any:
    """Create instance(s) from a dictionary.

    When the dictionary keys are 'path.to.class' strings, each class is
    instantiated with its value as keyword arguments. Otherwise the whole
    dictionary is used as keyword arguments of the expected type.

    Parameters:
        expected_value_type: type of the instance to create by default.
        data: dictionary to load.

    Returns:
        a single object, a list of objects, or an expected type instance.
    """
    logging.debug('\t> load %s from dict', expected_value_type.__name__)

    loaded_objects = []

    for class_path, arguments in data.items():

        try:
            loaded_class = get_class(class_path)

        except ValueError as error:

            # Any other ValueError is a real failure
            if str(error) != 'Empty module name':
                raise error

            # Keys are not class name: stop looking for classes
            break

        logging.debug('\t+ create %s object from key using value as argument', class_path)

        # noinspection PyCallingNonCallable
        loaded_objects.append(loaded_class(**arguments))

    if len(loaded_objects) > 0:

        # A unique object of the expected type is passed as is
        if len(loaded_objects) == 1 and issubclass(type(loaded_objects[0]), expected_value_type):
            return loaded_objects[0]

        # Otherwise pass the whole objects list
        return loaded_objects

    # No object loaded: data are parameters of the expected class
    logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__)

    return expected_value_type(**data)
def as_dict(obj: any, filter: bool = True) -> dict:
    """Export the properties of an object as a dictionary.

    Parameters:
        obj: object to export as dictionary
        filter: when True, properties whose value is None are left out.

    Returns:
        dictionary of property values stored by property names.
    """
    exported = {}

    for property_name in get_class_properties(obj.__class__):

        value = getattr(obj, property_name)

        # Skip None values when filtering is requested
        if filter and value is None:
            continue

        exported[property_name] = value

    return exported
class JsonEncoder(json.JSONEncoder):
    """Specific ArGaze JSON Encoder.

    Serializes numpy scalars and arrays as native Python values and any other
    unsupported object as a dictionary of its public instance attributes.
    """

    @staticmethod
    def _from_numpy(value):
        """Convert a numpy scalar or array into its native Python equivalent.

        Any other value is returned unchanged.
        """
        if isinstance(value, numpy.integer):
            return int(value)

        if isinstance(value, numpy.floating):
            return float(value)

        if isinstance(value, numpy.ndarray):
            return value.tolist()

        return value

    def default(self, obj):
        """default implementation to serialize object.

        Parameters:
            obj: object the standard encoder does not know how to serialize.

        Returns:
            a JSON serializable value.
        """
        # numpy cases
        if isinstance(obj, (numpy.integer, numpy.floating, numpy.ndarray)):
            return self._from_numpy(obj)

        # default case
        try:
            return super().default(obj)

        # class case: the base implementation signals unsupported objects with TypeError only,
        # so nothing else (KeyboardInterrupt, SystemExit...) must be caught here
        except TypeError:

            # ignore attribute starting with _
            return {k: self._from_numpy(v) for k, v in vars(obj).items() if not k.startswith('_')}
class DataDictionary(dict):
    """Dictionary whose items are also reachable with dot notation.

    Reading a missing attribute returns None (dict.get semantics) while
    deleting a missing attribute raises KeyError (dict.__delitem__ semantics).
    """

    # obj.key = value  <=>  obj['key'] = value
    __setattr__ = dict.__setitem__

    # del obj.key  <=>  del obj['key']
    __delattr__ = dict.__delitem__

    # obj.key  <=>  obj.get('key'), only used when regular attribute lookup fails
    __getattr__ = dict.get
def timestamp(cls):
    """Decorate a class to enable timestamp management.

    The decorated class gets:

    - an __init__ accepting an optional 'timestamp' keyword argument,
    - a __repr__ prefixed with the timestamp,
    - a 'timestamp' property,
    - an 'is_timestamped' method.

    For TimestampedObjectsList subclasses, the timestamp is the one of the
    first stored object and it can neither be set nor deleted.

    Parameters:
        cls: class to decorate.

    Returns:
        the same class, modified in place.
    """
    class_init = cls.__init__
    class_repr = cls.__repr__

    def __init__(self, *args, **kwargs):
        """Initialize timestamped object."""
        # The timestamp keyword is consumed here: it is never passed to the decorated class __init__
        self._timestamp = kwargs.pop('timestamp', math.nan)

        class_init(self, *args, **kwargs)

    def __repr__(self) -> str:
        """String representation."""
        return str(self._timestamp) + ': ' + class_repr(self)

    if issubclass(cls, TimestampedObjectsList):

        def get_timestamp(self) -> int | float:
            """Get first position timestamp (None when the list is empty)."""
            if self:
                return self[0].timestamp

            return None

        def set_timestamp(self, timestamp: int | float):
            """Block timestamp setting."""
            # Raise a real exception: raising a bare string is itself a TypeError that hides the message
            raise AttributeError('TimestampedObjectsList timestamp is not settable.')

        def del_timestamp(self):
            """Block timestamp resetting."""
            raise AttributeError('TimestampedObjectsList timestamp cannot be deleted.')

        def is_timestamped(self) -> bool:
            """Is the object timestamped?"""
            return bool(self)

    else:

        def get_timestamp(self) -> int | float:
            """Get object timestamp."""
            return self._timestamp

        def set_timestamp(self, timestamp: int | float):
            """Set object timestamp."""
            self._timestamp = timestamp

        def del_timestamp(self):
            """Reset object timestamp."""
            self._timestamp = math.nan

        def is_timestamped(self) -> bool:
            """Is the object timestamped?"""
            return not math.isnan(self._timestamp)

    cls.__init__ = __init__
    cls.__repr__ = __repr__

    setattr(cls, "timestamp", property(get_timestamp, set_timestamp, del_timestamp, """Object timestamp."""))
    setattr(cls, "is_timestamped", is_timestamped)

    return cls
class TimestampedObjectsList(list):
"""Handle timestamped object into a list.
!!! warning "Timestamped objects are not sorted internally"
Timestamped objects are considered to be stored according to their coming time.
"""
# noinspection PyMissingConstructor
def __init__(self, ts_object_type: type, ts_objects=None):
if ts_objects is None:
ts_objects = []
self.__object_type = ts_object_type
self.__object_properties_names = list(get_class_properties(self.__object_type).keys())
for ts_object in ts_objects:
self.append(ts_object)
@property
def object_type(self):
"""Get object type handled by the list."""
return self.__object_type
def append(self, ts_object: object | dict):
"""Append timestamped object."""
# Convert dict into object
if type(ts_object) == dict:
ts_object = from_dict(self.__object_type, ts_object)
# Check object type
if type(ts_object) != self.__object_type:
if not issubclass(ts_object.__class__, self.__object_type):
raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance')
if not ts_object.is_timestamped():
raise ValueError(f'object is not timestamped')
super().append(ts_object)
def look_for(self, timestamp: int | float) -> object:
"""Look for object at given timestamp."""
for ts_object in self:
if ts_object.timestamp == timestamp:
return ts_object
def __add__(self, ts_objects: list = []) -> Self:
"""Append timestamped objects list."""
for ts_object in ts_objects:
self.append(ts_object)
return self
@property
def duration(self):
"""Get inferred duration from first and last timestamps."""
if self:
return self[-1].timestamp - self[0].timestamp
else:
return 0
def timestamps(self):
"""Get all timestamps in list."""
return [ts_object.timestamp for ts_object in self]
def tuples(self) -> list:
"""Get all timestamped objects as list of tuple."""
return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self]
@classmethod
def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=None) -> Self:
"""Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)."""
if exclude is None:
exclude = []
dataframe.drop(exclude, inplace=True, axis=True)
assert (dataframe.index.name == 'timestamp')
object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in
dataframe.to_dict('index').items()]
return TimestampedObjectsList(ts_object_type, object_list)
def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame:
"""Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
The optional *split* argument allows tuple values to be stored in dedicated columns.
For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]}
!!! warning "Values must be dictionaries"
Each key is stored as a column name.
!!! note
Timestamps are stored as index column called 'timestamp'.
"""
df = pandas.DataFrame(self.tuples(), columns=self.__object_properties_names)
# Exclude columns
df.drop(exclude, inplace=True, axis=True)
# Split columns
if len(split) > 0:
split_columns = []
for column in df.columns:
if column in split.keys():
df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index)
df.drop(column, inplace=True, axis=True)
for new_column in split[column]:
split_columns.append(new_column)
else:
split_columns.append(column)
# Reorder split columns
df = df[split_columns]
# Append timestamps as index column
df['timestamp'] = self.timestamps()
df.set_index('timestamp', inplace=True)
return df
@classmethod
def from_json(cls, ts_object_type: type, json_filepath: str) -> Self:
"""Create a TimestampedObjectsList from .json file."""
with open(json_filepath, encoding='utf-8') as ts_objects_file:
json_ts_objects = json.load(ts_objects_file)
return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects])
def to_json(self, json_filepath: str):
"""Save a TimestampedObjectsList to .json file."""
with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file:
json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ')
def __repr__(self):
"""String representation"""
return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, )
def __str__(self):
"""String representation"""
return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, )
def pop_last_until(self, timestamp: int | float) -> object:
"""Pop all item until a given timestamped value and return the first after."""
# get last item before given timestamp
earliest_value = self.get_last_until(timestamp)
while self[0].timestamp < earliest_value.timestamp:
self.pop(0)
return self[0]
def pop_last_before(self, timestamp: int | float) -> object:
"""Pop all item before a given timestamped value and return the last one."""
# get last item before given timestamp
earliest_value = self.get_last_before(timestamp)
popped_value = self.pop(0)
while popped_value.timestamp != earliest_value.timestamp:
popped_value = self.pop(0)
return popped_value
def get_first_from(self, timestamp: int | float) -> object:
"""Retrieve first item timestamp from a given timestamp value."""
ts_list = self.timestamps()
first_from_index = bisect.bisect_left(ts_list, timestamp)
if first_from_index < len(self):
return self[ts_list[first_from_index]]
else:
raise KeyError(f'No data stored after {timestamp} timestamp.')
def get_last_before(self, timestamp: int | float) -> object:
"""Retrieve last item timestamp before a given timestamp value."""
ts_list = self.timestamps()
last_before_index = bisect.bisect_left(ts_list, timestamp) - 1
if last_before_index >= 0:
return self[ts_list[last_before_index]]
else:
raise KeyError(f'No data stored before {timestamp} timestamp.')
def get_last_until(self, timestamp: int | float) -> object:
"""Retrieve last item timestamp until a given timestamp value."""
ts_list = self.timestamps()
last_until_index = bisect.bisect_right(ts_list, timestamp) - 1
if last_until_index >= 0:
return self[ts_list[last_until_index]]
else:
raise KeyError(f'No data stored until {timestamp} timestamp.')
def plot(self, names=[], colors=[], split=None, samples=None) -> list:
"""Plot as [matplotlib](https://matplotlib.org/) time chart."""
if split is None:
split = {}
df = self.as_dataframe(split=split)
legend_patches = []
# decimate data
if samples != None:
if samples < len(df):
step = int(len(df) / samples) + 1
df = df.iloc[::step, :]
for name, color in zip(names, colors):
markerline, stemlines, baseline = mpyplot.stem(df.index, df[name])
mpyplot.setp(markerline, color=color, linewidth=1, markersize=1)
mpyplot.setp(stemlines, color=color, linewidth=1)
mpyplot.setp(baseline, color=color, linewidth=1)
legend_patches.append(mpatches.Patch(color=color, label=name.upper()))
return legend_patches
class SharedObject:
    """Base class of objects shared between multiple threads.

    Each instance owns a lock stored in the _lock attribute.
    """

    def __init__(self):
        # Lock used to tell whether the object is currently in use
        self._lock = threading.Lock()

    def busy(self) -> bool:
        """Tell whether the object lock is currently held."""
        lock_is_held = self._lock.locked()

        return lock_is_held
class SharedObjectBusy(Exception):
    """Exception raised when a shared object is locked."""

    def __init__(self, message):
        # The message is mandatory and forwarded to Exception as is
        Exception.__init__(self, message)
@timestamp
class TimestampedException(Exception):
    """Exception wrapping another exception, with timestamp management."""

    def __init__(self, exception: Exception):
        # The wrapped exception becomes the unique argument of this exception
        super().__init__(exception)
class TimestampedExceptions(TimestampedObjectsList):
    """Handle timestamped exceptions into a list."""

    def __init__(self, exceptions: list = None):
        """Initialize the list.

        Parameters:
            exceptions: optional list of TimestampedException to store.
        """
        # None is handled by TimestampedObjectsList.__init__ as an empty list
        TimestampedObjectsList.__init__(self, TimestampedException, exceptions)

    def values(self) -> list[str]:
        """Get all timestamped exception values as list of messages."""
        # TimestampedException defines no 'message' attribute: fall back on the string form of the
        # exception, which is the string form of the exception it wraps
        return [getattr(ts_exception, 'message', str(ts_exception)) for ts_exception in self]
class PipelineStepLoadingFailed(Exception):
    """Exception raised when pipeline step object loading fails."""

    def __init__(self, message):
        # The message is mandatory and forwarded to Exception as is
        Exception.__init__(self, message)
class PipelineStepEnterFailed(Exception):
    """Exception raised when pipeline step object context fails to enter."""

    def __init__(self, message):
        # The message is mandatory and forwarded to Exception as is
        Exception.__init__(self, message)
@timestamp
class TimestampedImage(numpy.ndarray):
    """Wrap numpy.array to timestamp image.

    The timestamp property and the optional 'timestamp' keyword argument are
    provided by the timestamp decorator applied to this class.
    """

    def __new__(cls, array: numpy.array, **kwargs):
        # Create the instance with the shape and dtype of the given array, using the array as buffer.
        # Keyword arguments are accepted but not used here.
        # NOTE(review): passing buffer= without strides presumably expects a contiguous array - confirm.
        return numpy.ndarray.__new__(cls, array.shape, dtype=array.dtype, buffer=array)

    def __init__(self, array: numpy.array, **kwargs):
        # Nothing to initialize: the instance is fully built by __new__
        pass

    def __array_finalize__(self, obj):
        # Nothing is copied from obj when an instance is created
        pass

    @property
    def size(self) -> list:
        """Return list with width and height.

        The two first dimensions of the array shape are returned in reverse order.
        """
        return list(self.shape[0:2][::-1])
class TimestampedImages(TimestampedObjectsList):
    """List dedicated to TimestampedImage objects."""

    def __init__(self, images: list = []):
        # Delegate to the generic list with TimestampedImage as handled type
        super().__init__(TimestampedImage, images)
def PipelineStepExecutionTime(method):
    """Decorate a pipeline step method to assess its execution time and frequency.

    The object owning the method must provide _execution_times and
    _execution_frequencies dictionaries: both are updated, using the method
    name as key, each time the method returns.
    """

    @wraps(method)
    def timed(self, *args, **kwargs):

        method_name = method.__name__

        started_at = time.perf_counter()
        outcome = method(self, *args, **kwargs)
        ended_at = time.perf_counter()

        # Check earlier call dates to calculate frequency
        previous_times = self._execution_times.get(method_name)

        if previous_times is None:

            # First call: frequency is unknown
            self._execution_frequencies[method_name] = math.nan

        else:

            previous_start, _ = previous_times

            if started_at > previous_start:
                self._execution_frequencies[method_name] = 1 / (started_at - previous_start)

        # Store start and end dates
        self._execution_times[method_name] = (started_at, ended_at)

        return outcome

    return timed
def PipelineStepInit(method):
    """Decorate the __init__ method of a PipelineStepObject subclass.

    The decorated __init__ accepts keyword arguments only.
    """

    @wraps(method)
    def initialize(self, **kwargs):
        """Initialize the object, then update its attributes with the given arguments.

        Parameters:
            self:
            kwargs: any arguments defined by PipelineStepMethodInit.
        """
        # 1. pipeline step object attributes
        PipelineStepObject.__init__(self)

        # 2. class attributes
        method(self, **kwargs)

        # 3. all attributes named by the arguments
        self.update_attributes(kwargs)

    return initialize
def PipelineStepEnter(method):
    """Decorate the __enter__ method of a PipelineStepObject subclass."""

    @wraps(method)
    def enter(self):
        """Enter the default PipelineStepObject context before calling the wrapped method.

        Any exception raised by the wrapped method is logged and stored into
        the _starting_error attribute instead of being propagated.
        """
        logging.debug('%s.__enter__', get_class_path(self))

        PipelineStepObject.__enter__(self)

        try:
            method(self)
            self._starting_error = None

        except Exception as error:
            self._starting_error = error
            logging.error('%s.__enter__: %s', get_class_path(self), error)

        return self

    return enter
def PipelineStepExit(method):
    """Decorate the __exit__ method of a PipelineStepObject subclass."""

    @wraps(method)
    def leave(self, *args):
        """Call the wrapped method, then exit the default PipelineStepObject context."""
        logging.debug('%s.__exit__', get_class_path(self))

        # Wrapped method first, default behavior last
        method(self, *args)
        PipelineStepObject.__exit__(self, *args)

    return leave
def PipelineStepAttributeSetter(method):
    """Define a decorator use into PipelineStepObject class to wrap pipeline step attribute setter.

    The first annotation of the wrapped setter gives the expected value type.
    """

    @wraps(method)
    def wrapper(self, new_value, unwrap: bool = False):
        """Wrap pipeline step attribute setter to load attribute from file.

        Parameters:
            self:
            new_value: value used to set attribute.
            unwrap: call wrapped method directly.

        Raises:
            PipelineStepLoadingFailed: if the wrapped setter has no annotation.
        """
        if unwrap:
            return method(self, new_value)

        # Get new value type
        new_value_type = type(new_value)

        # Check setter annotations to get expected value type
        try:
            expected_value_type = list(method.__annotations__.values())[0]

        # Indexing an empty list raises IndexError (KeyError is kept for backward compatibility)
        except (IndexError, KeyError) as error:
            raise PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}') from error

        logging.debug('%s@%s.setter', get_class_path(self), method.__name__)
        logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__)

        # String not expected: load value from file
        if new_value_type == str and new_value_type != expected_value_type:

            split_point = new_value.split('.')

            # String have a dot inside: file path with format
            if len(split_point) > 1:

                file_format = split_point[-1].upper()

                logging.debug('\t> %s is a path to a %s file', new_value, file_format)

                filepath = os.path.join(get_working_directory(), new_value)

                # Load image from JPG and PNG formats
                if file_format == 'JPG' or file_format == 'PNG':
                    return method(self, TimestampedImage(cv2.imread(filepath)))

                # Load image from OBJ formats
                elif file_format == 'OBJ':
                    return method(self, expected_value_type.from_obj(filepath))

                # Load object from JSON file (read as UTF-8 rather than the platform default encoding)
                elif file_format == 'JSON':
                    with open(filepath, encoding='utf-8') as file:
                        return method(self, from_dict(expected_value_type, json.load(file)))

                # Any other format falls through: the string is handled below as a regular value

            # No point inside string: identifier name
            else:

                logging.debug('\t> %s is an identifier', new_value)
                logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__)

                return method(self, expected_value_type(new_value))

        # Dict not expected: load value from dict
        if new_value_type == dict and expected_value_type != dict:
            return method(self, from_dict(expected_value_type, new_value))

        # Otherwise, pass new value to setter method
        logging.debug('\t> use %s value as passed', new_value_type.__name__)

        method(self, new_value)

    return wrapper
def PipelineStepMethod(method):
"""Define a decorator use into PipelineStepObject class to declare pipeline method."""
@wraps(method)
def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, **kwargs):
"""Wrap pipeline step method to notify observers and timestamped exceptions.
Parameters:
self:
args: any arguments defined by PipelineStepMethod.
timestamp: optional method call timestamp (unit doesn't matter) if first args parameter is not a TimestampedObject instance.
unwrap: extra arguments used in wrapper function to call wrapped method directly.
"""
if timestamp is None and len(args) > 0:
try:
timestamp = args[0].timestamp
except:
logging.error('%s.%s: %s is not a timestamped class. Use @DataFeatures.timestamp decorator.', get_class_path(self), method.__name__, type(args[0]).__name__)
if unwrap:
return method(self, *args, **kwargs)
# Initialize execution outputs
exception = None
result = None
if not self._catch_exceptions:
# Execute wrapped method without catching exceptions
result = method(self, *args, **kwargs)
else:
try:
# Execute wrapped method
result = method(self, *args, **kwargs)
except Exception as e:
exception = e
# Notify observers watching 'on_' signal
self.send_signal(method.__name__, timestamp=timestamp, exception=exception)
# Raise timestamped exception
if exception is not None:
raise TimestampedException(exception, timestamp=timestamp)
return result
return wrapper
def PipelineStepImage(method):
    """Decorate the image method of a PipelineStepObject subclass."""

    @wraps(method)
    def image(self, wait: bool = True, **kwargs) -> numpy.array:
        """Call the wrapped image method with given arguments or with image_parameters.

        Parameters:
            wait: for SharedObject, wait until the object is available else throw a SharedObjectBusy exception.
        """
        # Check shared object instance
        if issubclass(type(self), SharedObject):

            if not wait:

                # Busy shared object can't return image
                if self.busy():
                    raise SharedObjectBusy(f"Can't return image because {self.name} is busy.")

            else:

                # Wait until the shared object is available
                while self.busy():
                    time.sleep(1e-6)

        # Without any argument, image_parameters attribute is used instead
        if not kwargs:
            logging.debug('\t> using image_parameters')
            return method(self, **self.image_parameters)

        logging.debug('\t> using kwargs')

        return method(self, **kwargs)

    return image
def PipelineStepDraw(method):
    """Decorate the draw method of a PipelineStepObject subclass."""

    @wraps(method)
    def draw(self, image: numpy.array, **kwargs):
        """Call the wrapped draw method with given arguments or with draw_parameters."""
        # Without any argument, draw_parameters attribute is used instead
        if not kwargs:
            logging.debug('\t> using draw_parameters')
            method(self, image, **self.draw_parameters)
            return

        logging.debug('\t> using kwargs')
        method(self, image, **kwargs)

    return draw
# noinspection PyAttributeOutsideInit
class PipelineStepObject:
    """Base class of pipeline step objects.

    It stores pipeline step methods execution times and frequencies and
    notifies observers.
    """

    # Class level defaults, shadowed by instance attributes later
    __initialized = False
    _starting_error = None

    def __init__(self):
        """Initialize PipelineStepObject (only the first time it is called on an instance)."""
        if self.__initialized:
            return

        logging.debug('%s.__init__', get_class_path(self))

        # Private attributes
        self.__initialized = True
        self.__name = None
        self.__observers = []
        self.__image_parameters = {}

        # Protected attributes
        self._catch_exceptions = True
        self._image_parameters = {}
        self._draw_parameters = {}

        # Parent attribute will be setup later by parent itself
        self.__parent = None

        # Execution assessment
        self._execution_times = {}
        self._execution_frequencies = {}
        self.__last_execution_assessment_time = None

    def __enter__(self):
        """Enter into the context of children then observers."""
        # Start children pipeline step objects
        for child in self.children():
            child.__enter__()

        # Start observers
        for observer in self.observers:
            observer.__enter__()

        # Context starting error is caught in @PipelineStepEnter decorator wrapper
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        """Exit from the context of observers then children."""
        exit_arguments = (exception_type, exception_value, exception_traceback)

        # Stop observers
        for observer in self.observers:
            observer.__exit__(*exit_arguments)

        # Stop children pipeline step objects
        for child in self.children():
            child.__exit__(*exit_arguments)

    def update_attributes(self, object_data: dict):
        """Set each attribute named in the dictionary with its value.

        Raises:
            AttributeError: if the object has no attribute for a key.
        """
        for key, value in object_data.items():

            if not hasattr(self, key):
                raise (AttributeError(f'{get_class_path(self)} has not {key} attribute.'))

            logging.debug('%s.update_attributes > update %s with %s value', get_class_path(self), key, type(value).__name__)

            setattr(self, key, value)

    @property
    def starting_error(self) -> Exception:
        """Get pipeline step object's context starting error."""
        return self._starting_error

    @property
    def name(self) -> str:
        """Get pipeline step object's name."""
        return self.__name

    @name.setter
    def name(self, name: str):
        """Set pipeline step object's name."""
        self.__name = name

    @property
    def parent(self) -> Self:
        """Get pipeline step object's parent object."""
        return self.__parent

    @parent.setter
    def parent(self, parent: Self):
        """Set pipeline step object's parent object."""
        self.__parent = parent

    @property
    def observers(self) -> list:
        """Get pipeline step object observers list."""
        return self.__observers

    @observers.setter
    @PipelineStepAttributeSetter
    def observers(self, observers: list):
        """Set pipeline step object observers list."""
        self.__observers = observers

        # This object becomes the parent of each observer
        for observer in self.__observers:
            observer.parent = self

    @property
    def catch_exceptions(self) -> bool:
        """Catch pipeline step method exception instead of crashing execution."""
        return self._catch_exceptions

    @catch_exceptions.setter
    def catch_exceptions(self, catch_exceptions: bool):
        self._catch_exceptions = catch_exceptions

        # Propagate to children
        for child in self.children():
            child.catch_exceptions = self._catch_exceptions

    @property
    def image_parameters(self) -> dict:
        """image method parameters dictionary."""
        return self._image_parameters

    @image_parameters.setter
    @PipelineStepAttributeSetter
    def image_parameters(self, image_parameters: dict):
        self._image_parameters = image_parameters

    @property
    def draw_parameters(self) -> dict:
        """draw method parameters dictionary."""
        return self._draw_parameters

    @draw_parameters.setter
    @PipelineStepAttributeSetter
    def draw_parameters(self, draw_parameters: dict):
        self._draw_parameters = draw_parameters

    def as_dict(self) -> dict:
        """Export PipelineStepObject attributes as dictionary.

        Returns:
            object_data: dictionary with name and observers values.
        """
        object_data = {
            "name": self.__name,
            "observers": self.__observers
        }

        return object_data

    # noinspection PyAttributeOutsideInit
    def to_json(self, json_filepath: str = None):
        """Save pipeline step object into .json file.

        Parameters:
            json_filepath: path to the file to write. When None, the path given at a former call is used.
        """
        # Remember file path to ease rewriting
        if json_filepath is not None:
            # noinspection PyAttributeOutsideInit
            self.__json_filepath = json_filepath

        # Open file
        with open(self.__json_filepath, 'w', encoding='utf-8') as object_file:
            json.dump({self.__class__.__module__: as_dict(self)}, object_file, ensure_ascii=False, indent=4)

            # QUESTION: maybe we need two saving mode?
            #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder)

    def __str__(self) -> str:
        """String representation of pipeline step object.

        Returns:
            String representation
        """
        logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '')

        tabs = self.tabulation

        # Header: class path, then name, parent and exception policy
        text = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n'

        if self.__name is not None:
            text += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n'

        if self.__parent is not None:
            text += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n'

        text += f'{tabs}\t{Style.BRIGHT}catch_exceptions{Style.RESET_ALL}: {Fore.MAGENTA}{self._catch_exceptions}{Style.RESET_ALL}\n'

        # Observers class paths
        if len(self.__observers) > 0:

            text += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n'

            for observer in self.__observers:
                text += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n'

        # Properties, formatted according to their exact type
        for name, value in self.properties():

            logging.debug('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__)

            text += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: '

            value_type = type(value)

            if value_type is dict:

                text += '\n'

                for k, v in value.items():
                    text += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n'

            elif value_type is list:

                text += '\n'

                for v in value:
                    text += f'{tabs}\t - {v}\n'

            elif value_type is numpy.ndarray or value_type is TimestampedImage:

                text += f'numpy.array{value.shape}\n'

            elif value_type is pandas.DataFrame:

                text += f'pandas.DataFrame{value.shape}\n'

            else:

                try:
                    text += f'{value}'

                except TypeError as e:
                    logging.error('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__)
                    text += f'{Fore.RED}{Style.BRIGHT}!!! {get_class_path(self)}.{name}: {e}{Style.RESET_ALL}\n\n'

                # Each property ends with a new line
                if text[-1] != '\n':
                    text += '\n'

        def print_dict(d: dict, key_color, tabs) -> str:
            # Format a dictionary, one item per line, nested dictionaries being indented one tabulation more
            tabs = f'{tabs}\t'
            lines = []

            for k, v in d.items():

                if type(v) is dict:
                    lines.append(f'{tabs} - {key_color}{Style.BRIGHT}{k}{Style.RESET_ALL}:\n{print_dict(v, key_color, tabs)}')

                else:
                    lines.append(f'{tabs} - {key_color}{Style.BRIGHT}{k}{Style.RESET_ALL}: {v}\n')

            return ''.join(lines)

        if len(self._image_parameters) > 0:
            text += f'{tabs}\t{Style.BRIGHT}image_parameters{Style.RESET_ALL}:\n'
            text += print_dict(self._image_parameters, Fore.YELLOW, tabs)

        if len(self._draw_parameters) > 0:
            text += f'{tabs}\t{Style.BRIGHT}draw_parameters{Style.RESET_ALL}:\n'
            text += print_dict(self._draw_parameters, Fore.YELLOW, tabs)

        return text

    @property
    def tabulation(self) -> str:
        """Get one tabulation per ancestor of the object."""
        depth = 0
        ancestor = self.__parent

        # Count ancestors up to the root
        while ancestor is not None:
            depth += 1
            ancestor = ancestor.parent

        return '\t' * depth

    def properties(self) -> tuple[str, any]:
        """Iterate over object properties, yielding each name with its value."""
        for property_name in get_class_properties(self.__class__).keys():
            yield property_name, getattr(self, property_name)

    def children(self):
        """Iterate over children pipeline step objects.

        Children are looked for among properties values, including inside
        list and dict values. The parent object is never yielded as a direct child.
        """
        for _, value in self.properties():

            # Pipeline step object attribute
            if issubclass(type(value), PipelineStepObject) and value != self.parent:
                yield value

            # Pipeline step objects list attribute
            elif type(value) is list:
                yield from (item for item in value if issubclass(type(item), PipelineStepObject))

            # Pipeline step objects dict attribute
            elif type(value) is dict:
                yield from (item for item in value.values() if issubclass(type(item), PipelineStepObject))

    def send_signal(self, signal: str, timestamp: int | float = None, exception: Exception = None):
        """Send signal to observers.

        Parameters:
            signal: notify observers with 'on_<signal>' method.
            timestamp: optional timestamp (unit doesn't matter).
            exception: optional exception to share with signal.
        """
        subscription_name = f'on_{signal}'

        logging.debug('%s.send_signal %s', get_class_path(self), subscription_name)

        for observer in self.observers:

            # Skip observers that do not care about this signal
            if subscription_name not in dir(observer):
                continue

            # Call subscription
            getattr(observer, subscription_name)(timestamp, self, exception)

    def execution_info(self, method_name: str) -> tuple[float, float]:
        """Get pipeline step method execution time (s) and frequency (Hz).

        Each value is NaN when nothing is stored for the method.
        """
        # Check execution time
        execution_times = self._execution_times.get(method_name)

        if execution_times is None:
            t = math.nan

        else:
            start, end = execution_times
            t = end - start

        # Check execution frequency
        f = self._execution_frequencies.get(method_name, math.nan)

        return t, f