diff options
author | Théo de la Hogue | 2024-04-08 21:06:40 +0200 |
---|---|---|
committer | Théo de la Hogue | 2024-04-08 21:06:40 +0200 |
commit | 007feec2a8d8de80f40b021710c6e6b382a0e886 (patch) | |
tree | 9963a1d9b240128e6a631cffbedf25529b1b49ff /src/argaze/DataFeatures.py | |
parent | 3cc3dd1c3ba0f4010f45e68615b342127cba6f6e (diff) | |
download | argaze-007feec2a8d8de80f40b021710c6e6b382a0e886.zip argaze-007feec2a8d8de80f40b021710c6e6b382a0e886.tar.gz argaze-007feec2a8d8de80f40b021710c6e6b382a0e886.tar.bz2 argaze-007feec2a8d8de80f40b021710c6e6b382a0e886.tar.xz |
Fixing typo and docstrings.
Diffstat (limited to 'src/argaze/DataFeatures.py')
-rw-r--r-- | src/argaze/DataFeatures.py | 1386 |
1 files changed, 689 insertions, 697 deletions
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 7189001..a7b0a48 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -16,48 +16,47 @@ __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" -from typing import Self +import bisect +import json +import logging +import math import os import sys -import logging -import traceback -import importlib -import collections -import json -import bisect import threading -import math import time +from typing import Self -import pandas -import numpy import cv2 -import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches +import matplotlib.pyplot as mpyplot +import numpy +import pandas from colorama import Style, Fore # Define global working directory used to load file using relative path WORKING_DIRECTORY = [None] + def get_working_directory() -> str: - """Get global working directory.""" - return WORKING_DIRECTORY[0] + """Get global working directory.""" + return WORKING_DIRECTORY[0] + def set_working_directory(working_directory: str): - """Set global working directory.""" + """Set global working directory.""" - # Forget former global working directory - if WORKING_DIRECTORY[0] is not None: + # Forget former global working directory + if WORKING_DIRECTORY[0] is not None: + sys.path.remove(WORKING_DIRECTORY[0]) - sys.path.remove(WORKING_DIRECTORY[0]) + # Append new working directory to Python path + sys.path.append(working_directory) - # Append new working directory to Python path - sys.path.append(working_directory) + WORKING_DIRECTORY[0] = working_directory - WORKING_DIRECTORY[0] = working_directory def get_class(class_path: str) -> object: - """Get class object from 'path.to.class' string. + """Get class object from 'path.to.class' string. Parameters: class_path: a 'path.to.class' string. @@ -65,18 +64,19 @@ def get_class(class_path: str) -> object: Returns: class: a 'path.to.class' class. """ - parts = class_path.split('.') - module = ".".join(parts[:-1]) + parts = class_path.split('.') + module = ".".join(parts[:-1]) - m = __import__(module) + m = __import__(module) - for comp in parts[1:]: - m = getattr(m, comp) + for comp in parts[1:]: + m = getattr(m, comp) + + return m - return m def get_class_path(o: object) -> str: - """Get 'path.to.class' class path from object. + """Get 'path.to.class' class path from object. Parameters: o: any object instance. @@ -84,33 +84,33 @@ def get_class_path(o: object) -> str: Returns: class_path: object 'path.to.class' class. """ - c = o.__class__ - m = c.__module__ + c = o.__class__ + m = c.__module__ - # Avoid outputs like 'builtins.str' - if m == 'builtins': + # Avoid outputs like 'builtins.str' + if m == 'builtins': + return c.__qualname__ - return c.__qualname__ + return m + '.' + c.__qualname__ - return m + '.' + c.__qualname__ def properties(cls) -> list: - """get class properties name.""" + """get class properties name.""" - properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)] + properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)] - for base in cls.__bases__: + for base in cls.__bases__: - for name, item in base.__dict__.items(): + for name, item in base.__dict__.items(): - if isinstance(item, property): + if isinstance(item, property): + properties.append(name) - properties.append(name) + return properties - return properties -def from_json(configuration_filepath: str, patch_filepath: str = None) -> object: - """ +def from_json(configuration_filepath: str, patch_filepath: str = None) -> any: + """ Load object instance from .json file. !!! note @@ -121,293 +121,295 @@ def from_json(configuration_filepath: str, patch_filepath: str = None) -> object patch_filepath: path to json patch file to modify any configuration entries """ - logging.debug('DataFeatures.from_json') + logging.debug('DataFeatures.from_json') - # Edit working directory once - if get_working_directory() is None: + # Edit working directory once + if get_working_directory() is None: + set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath))) - set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath))) + logging.debug('\t> set global working directory as %s', get_working_directory()) - logging.debug('\t> set global working directory as %s', get_working_directory()) + # Load configuration from JSON file + with open(configuration_filepath) as configuration_file: - # Load configuration from JSON file - with open(configuration_filepath) as configuration_file: + object_data = json.load(configuration_file) - object_data = json.load(configuration_file) + # Apply patch to configuration if required + if patch_filepath is not None: - # Apply patch to configuration if required - if patch_filepath is not None: + with open(patch_filepath) as patch_file: - with open(patch_filepath) as patch_file: + patch_data = json.load(patch_file) - patch_data = json.load(patch_file) + import collections.abc - import collections.abc + def update(d, u): - def update(d, u): + for k, v in u.items(): - for k, v in u.items(): + if isinstance(v, collections.abc.Mapping): - if isinstance(v, collections.abc.Mapping): + d[k] = update(d.get(k, {}), v) - d[k] = update(d.get(k, {}), v) + elif v is None: - elif v is None: + del d[k] - del d[k] + else: - else: + d[k] = v - d[k] = v + return d - return d + objects_data = update(object_data, patch_data) - objects_data = update(object_data, patch_data) + # Load unique object + object_class, object_data = object_data.popitem() - # Load unique object - object_class, object_data = object_data.popitem() + # Instanciate class + logging.debug('\t+ create %s object', object_class) - # Instanciate class - logging.debug('\t+ create %s object', object_class) + # noinspection PyCallingNonCallable + return get_class(object_class)(**object_data) - return get_class(object_class)(**object_data) def from_dict(expected_value_type: type, data: dict) -> any: - """Load expected type instance(s) from dict values.""" + """Load expected type instance(s) from dict values.""" - logging.debug('\t> load %s from dict', expected_value_type.__name__) + logging.debug('\t> load %s from dict', expected_value_type.__name__) - # Check if json keys are PipelineStepObject class and store them in a list - new_objects_list = [] + # Check if json keys are PipelineStepObject class and store them in a list + new_objects_list = [] - for key, value in data.items(): + for key, value in data.items(): - try: + try: - new_class = get_class(key) + new_class = get_class(key) - except ValueError as e: + except ValueError as e: - # Keys are not class name - if str(e) == 'Empty module name': + # Keys are not class name + if str(e) == 'Empty module name': - break + break - else: + else: - raise(e) + raise (e) - logging.debug('\t+ create %s object from key using value as argument', key) + logging.debug('\t+ create %s object from key using value as argument', key) - new_objects_list.append( new_class(**value) ) + # noinspection PyCallingNonCallable + new_objects_list.append(new_class(**value)) - # Only one object have been loaded: pass the object if it is a subclass of expected type - if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): + # Only one object have been loaded: pass the object if it is a subclass of expected type + if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): - return new_objects_list[0] + return new_objects_list[0] - # Pass non empty objects list - elif len(new_objects_list) > 0: + # Pass non-empty objects list + elif len(new_objects_list) > 0: - return new_objects_list + return new_objects_list - # Otherwise, data are parameters of the expected class - logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__) + # Otherwise, data are parameters of the expected class + logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__) - return expected_value_type(**data) + return expected_value_type(**data) -def as_dict(obj, filter: bool=True) -> dict: - """Export object as dictionary. - Parameters: - filter: remove None attribute values. - """ - _dict = {} +def as_dict(obj, filter: bool = True) -> dict: + """Export object as dictionary. + + Parameters: + obj: + filter: remove None attribute values. + """ + _dict = {} - for p in properties(obj.__class__): + for p in properties(obj.__class__): - v = getattr(obj, p) + v = getattr(obj, p) - if not filter or v is not None: + if not filter or v is not None: + _dict[p] = v - _dict[p] = v + return _dict - return _dict class JsonEncoder(json.JSONEncoder): - """Specific ArGaze JSON Encoder.""" + """Specific ArGaze JSON Encoder.""" - def default(self, obj): - """default implementation to serialize object.""" + def default(self, obj): + """default implementation to serialize object.""" - # numpy cases - if isinstance(obj, numpy.integer): - return int(obj) + # numpy cases + if isinstance(obj, numpy.integer): + return int(obj) - elif isinstance(obj, numpy.floating): - return float(obj) + elif isinstance(obj, numpy.floating): + return float(obj) - elif isinstance(obj, numpy.ndarray): - return obj.tolist() + elif isinstance(obj, numpy.ndarray): + return obj.tolist() - # default case - try: + # default case + try: - return json.JSONEncoder.default(self, obj) + return json.JSONEncoder.default(self, obj) - # class case - except: + # class case + except: - # ignore attribute starting with _ - public_dict = {} + # ignore attribute starting with _ + public_dict = {} - for k, v in vars(obj).items(): - - if not k.startswith('_'): - - # numpy cases - if isinstance(v, numpy.integer): - v = int(v) + for k, v in vars(obj).items(): - elif isinstance(v, numpy.floating): - v = float(v) + if not k.startswith('_'): - elif isinstance(v, numpy.ndarray): - v = v.tolist() + # numpy cases + if isinstance(v, numpy.integer): + v = int(v) - public_dict[k] = v + elif isinstance(v, numpy.floating): + v = float(v) + + elif isinstance(v, numpy.ndarray): + v = v.tolist() + + public_dict[k] = v + + return public_dict - return public_dict class DataDictionary(dict): - """Enable dot.notation access to dictionary attributes""" + """Enable dot notation access to dictionary attributes""" + + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ - __getattr__ = dict.get - __setattr__ = dict.__setitem__ - __delattr__ = dict.__delitem__ class TimestampedObject(): - """Abstract class to enable timestamp management.""" + """Abstract class to enable timestamp management.""" - def __init__(self, timestamp: int|float = math.nan): - """Initialize TimestampedObject.""" - self._timestamp = timestamp + def __init__(self, timestamp: int | float = math.nan): + """Initialize TimestampedObject.""" + self._timestamp = timestamp - def __repr__(self): - """String representation.""" - return json.dumps(as_dict(self)) + def __repr__(self): + """String representation.""" + return json.dumps(as_dict(self)) - @property - def timestamp(self) -> int|float: - """Get object timestamp.""" - return self._timestamp + @property + def timestamp(self) -> int | float: + """Get object timestamp.""" + return self._timestamp - @timestamp.setter - def timestamp(self, timestamp: int|float): - """Set object timestamp.""" - self._timestamp = timestamp + @timestamp.setter + def timestamp(self, timestamp: int | float): + """Set object timestamp.""" + self._timestamp = timestamp - def untimestamp(self): - """Reset object timestamp.""" - self._timestamp = math.nan + def untimestamp(self): + """Reset object timestamp.""" + self._timestamp = math.nan + + def is_timestamped(self) -> bool: + """Is the object timestamped?""" + return not math.isnan(self._timestamp) - def is_timestamped(self) -> bool: - """Is the object timestamped?""" - return not math.isnan(self._timestamp) class TimestampedObjectsList(list): - """Handle timestamped object into a list. + """Handle timestamped object into a list. !!! warning "Timestamped objects are not sorted internally" - Timestamped objects are considered to be stored according at their coming time. + Timestamped objects are considered to be stored according to their coming time. """ - def __init__(self, ts_object_type: type, ts_objects: list = []): + def __init__(self, ts_object_type: type, ts_objects: list = []): - self.__object_type = ts_object_type - self.__object_properties = properties(self.__object_type) + self.__object_type = ts_object_type + self.__object_properties = properties(self.__object_type) - for ts_object in ts_objects: + for ts_object in ts_objects: + self.append(ts_object) - self.append(ts_object) + @property + def object_type(self): + """Get object type handled by the list.""" + return self.__object_type - @property - def object_type(self): - """Get object type handled by the list.""" - return self.__object_type + def append(self, ts_object: TimestampedObject | dict): + """Append timestamped object.""" - def append(self, ts_object: TimestampedObject|dict): - """Append timestamped object.""" - - # Convert dict into object - if type(ts_object) == dict: + # Convert dict into object + if type(ts_object) == dict: + ts_object = from_dict(self.__object_type, ts_object) - ts_object = self.__object_type.from_dict(ts_object) + # Check object type + if type(ts_object) != self.__object_type: - # Check object type - if type(ts_object) != self.__object_type: + if not issubclass(ts_object.__class__, self.__object_type): + raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') - if not issubclass(ts_object.__class__, self.__object_type): + if not ts_object.is_timestamped(): + raise ValueError(f'object is not timestamped') - raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') - - if not ts_object.is_timestamped(): - - raise ValueError(f'object is not timestamped') - - super().append(ts_object) + super().append(ts_object) - def look_for(self, timestamp: int|float) -> TimestampedObject: - """Look for object at given timestamp.""" - for ts_object in self: - - if ts_object.timestamp == timestamp: + def look_for(self, timestamp: int | float) -> TimestampedObject: + """Look for object at given timestamp.""" + for ts_object in self: - return ts_object + if ts_object.timestamp == timestamp: + return ts_object - def __add__(self, ts_objects: list = []) -> Self: - """Append timestamped objects list.""" + def __add__(self, ts_objects: list = []) -> Self: + """Append timestamped objects list.""" - for ts_object in ts_objects: + for ts_object in ts_objects: + self.append(ts_object) - self.append(ts_object) + return self - return self + @property + def duration(self): + """Get inferred duration from first and last timestamps.""" + if self: - @property - def duration(self): - """Get inferred duration from first and last timestamps.""" - if self: + return self[-1].timestamp - self[0].timestamp - return self[-1].timestamp - self[0].timestamp + else: - else: + return 0 - return 0 + def timestamps(self): + """Get all timestamps in list.""" + return [ts_object.timestamp for ts_object in self] - def timestamps(self): - """Get all timestamps in list.""" - return [ts_object.timestamp for ts_object in self] + def tuples(self) -> list: + """Get all timestamped objects as list of tuple.""" + return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] - def tuples(self) -> list: - """Get all timestamped objects as list of tuple.""" - return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] + @classmethod + def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self: + """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" - @classmethod - def from_dataframe(self, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self: - """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" + dataframe.drop(exclude, inplace=True, axis=True) - dataframe.drop(exclude, inplace=True, axis=True) + assert (dataframe.index.name == 'timestamp') - assert(dataframe.index.name == 'timestamp') + object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in + dataframe.to_dict('index').items()] - object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in dataframe.to_dict('index').items()] + return TimestampedObjectsList(ts_object_type, object_list) - return TimestampedObjectsList(ts_object_type, object_list) - - def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: - """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). + def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: + """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). The optional *split* argument allows tuple values to be stored in dedicated columns. For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} @@ -421,760 +423,750 @@ class TimestampedObjectsList(list): Timestamps are stored as index column called 'timestamp'. """ - df = pandas.DataFrame(self.tuples(), columns=self.__object_properties) + df = pandas.DataFrame(self.tuples(), columns=self.__object_properties) - # Exclude columns - df.drop(exclude, inplace=True, axis=True) + # Exclude columns + df.drop(exclude, inplace=True, axis=True) - # Split columns - if len(split) > 0: + # Split columns + if len(split) > 0: - splited_columns = [] - - for column in df.columns: + split_columns = [] - if column in split.keys(): + for column in df.columns: - df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) - df.drop(column, inplace=True, axis=True) + if column in split.keys(): - for new_column in split[column]: + df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) + df.drop(column, inplace=True, axis=True) - splited_columns.append(new_column) + for new_column in split[column]: + split_columns.append(new_column) - else: + else: - splited_columns.append(column) + split_columns.append(column) - # Reorder splited columns - df = df[splited_columns] + # Reorder split columns + df = df[split_columns] - # Append timestamps as index column - df['timestamp'] = self.timestamps() - df.set_index('timestamp', inplace=True) + # Append timestamps as index column + df['timestamp'] = self.timestamps() + df.set_index('timestamp', inplace=True) - return df + return df - @classmethod - def from_json(self, ts_object_type: type, json_filepath: str) -> Self: - """Create a TimestampedObjectsList from .json file.""" + @classmethod + def from_json(cls, ts_object_type: type, json_filepath: str) -> Self: + """Create a TimestampedObjectsList from .json file.""" - with open(json_filepath, encoding='utf-8') as ts_objects_file: + with open(json_filepath, encoding='utf-8') as ts_objects_file: + json_ts_objects = json.load(ts_objects_file) - json_ts_objects = json.load(ts_objects_file) + return TimestampedObjectsList(ts_object_type, + [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects]) - return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects]) + def to_json(self, json_filepath: str): + """Save a TimestampedObjectsList to .json file.""" - def to_json(self, json_filepath: str): - """Save a TimestampedObjectsList to .json file.""" + with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file: + json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ') - with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file: + def __repr__(self): + """String representation""" + return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, ) - json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ') + def __str__(self): + """String representation""" + return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, ) - def __repr__(self): - """String representation""" - return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) + def pop_last_until(self, timestamp: int | float) -> TimestampedObject: + """Pop all item until a given timestamped value and return the first after.""" - def __str__(self): - """String representation""" - return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) + # get last item before given timestamp + earliest_value = self.get_last_until(timestamp) - def pop_last_until(self, timestamp: int|float) -> TimestampedObject: - """Pop all item until a given timestamped value and return the first after.""" + while self[0].timestamp < earliest_value.timestamp: + self.pop(0) - # get last item before given timestamp - earliest_value = self.get_last_until(timestamp) + return self[0] - while self[0].timestamp < earliest_value.timestamp: + def pop_last_before(self, timestamp: int | float) -> TimestampedObject: + """Pop all item before a given timestamped value and return the last one.""" - self.pop(0) - - return self[0] + # get last item before given timestamp + earliest_value = self.get_last_before(timestamp) - def pop_last_before(self, timestamp: int|float) -> TimestampedObject: - """Pop all item before a given timestamped value and return the last one.""" + popped_value = self.pop(0) - # get last item before given timestamp - earliest_value = self.get_last_before(timestamp) + while popped_value.timestamp != earliest_value.timestamp: + popped_value = self.pop(0) - poped_value = self.pop(0) + return popped_value - while poped_value.timestamp != earliest_value.timestamp: + def get_first_from(self, timestamp: int | float) -> TimestampedObject: + """Retrieve first item timestamp from a given timestamp value.""" - poped_value = self.pop(0) + ts_list = self.timestamps() + first_from_index = bisect.bisect_left(ts_list, timestamp) - return poped_value + if first_from_index < len(self): - def get_first_from(self, timestamp: int|float) -> TimestampedObject: - """Retreive first item timestamp from a given timestamp value.""" + return self[ts_list[first_from_index]] - ts_list = self.timestamps() - first_from_index = bisect.bisect_left(ts_list, timestamp) + else: - if first_from_index < len(self): + raise KeyError(f'No data stored after {timestamp} timestamp.') - return self[ts_list[first_from_index]] - - else: - - raise KeyError(f'No data stored after {timestamp} timestamp.') + def get_last_before(self, timestamp: int | float) -> TimestampedObject: + """Retrieve last item timestamp before a given timestamp value.""" - def get_last_before(self, timestamp: int|float) -> TimestampedObject: - """Retreive last item timestamp before a given timestamp value.""" + ts_list = self.timestamps() + last_before_index = bisect.bisect_left(ts_list, timestamp) - 1 - ts_list = self.timestamps() - last_before_index = bisect.bisect_left(ts_list, timestamp) - 1 + if last_before_index >= 0: - if last_before_index >= 0: + return self[ts_list[last_before_index]] - return self[ts_list[last_before_index]] - - else: - - raise KeyError(f'No data stored before {timestamp} timestamp.') - - def get_last_until(self, timestamp: int|float) -> TimestampedObject: - """Retreive last item timestamp until a given timestamp value.""" + else: + + raise KeyError(f'No data stored before {timestamp} timestamp.') + + def get_last_until(self, timestamp: int | float) -> TimestampedObject: + """Retrieve last item timestamp until a given timestamp value.""" - ts_list = self.timestamps() - last_until_index = bisect.bisect_right(ts_list, timestamp) - 1 + ts_list = self.timestamps() + last_until_index = bisect.bisect_right(ts_list, timestamp) - 1 - if last_until_index >= 0: + if last_until_index >= 0: - return self[ts_list[last_until_index]] - - else: - - raise KeyError(f'No data stored until {timestamp} timestamp.') + return self[ts_list[last_until_index]] - def plot(self, names=[], colors=[], split={}, samples=None) -> list: - """Plot as [matplotlib](https://matplotlib.org/) time chart.""" + else: - df = self.as_dataframe(split=split) - legend_patches = [] + raise KeyError(f'No data stored until {timestamp} timestamp.') - # decimate data - if samples != None: + def plot(self, names=[], colors=[], split={}, samples=None) -> list: + """Plot as [matplotlib](https://matplotlib.org/) time chart.""" - if samples < len(df): + df = self.as_dataframe(split=split) + legend_patches = [] - step = int(len(df) / samples) + 1 - df = df.iloc[::step, :] + # decimate data + if samples != None: - for name, color in zip(names, colors): + if samples < len(df): + step = int(len(df) / samples) + 1 + df = df.iloc[::step, :] - markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) - mpyplot.setp(markerline, color=color, linewidth=1, markersize = 1) - mpyplot.setp(stemlines, color=color, linewidth=1) - mpyplot.setp(baseline, color=color, linewidth=1) + for name, color in zip(names, colors): + markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) + mpyplot.setp(markerline, color=color, linewidth=1, markersize=1) + mpyplot.setp(stemlines, color=color, linewidth=1) + mpyplot.setp(baseline, color=color, linewidth=1) - legend_patches.append(mpatches.Patch(color=color, label=name.upper())) + legend_patches.append(mpatches.Patch(color=color, label=name.upper())) + + return legend_patches - return legend_patches class SharedObject(TimestampedObject): - """Abstract class to enable multiple threads sharing for timestamped object.""" + """Abstract class to enable multiple threads sharing for timestamped object.""" - def __init__(self, timestamp: int|float = math.nan): + def __init__(self, timestamp: int | float = math.nan): + TimestampedObject.__init__(self, timestamp) + self._lock = threading.Lock() + self._execution_times = {} + self._exceptions = {} - TimestampedObject.__init__(self, timestamp) - self._lock = threading.Lock() - self._execution_times = {} - self._exceptions = {} class TimestampedException(Exception, TimestampedObject): - """Wrap exception to keep track of raising timestamp.""" + """Wrap exception to keep track of raising timestamp.""" - def __init__(self, exception = Exception, timestamp: int|float = math.nan): + def __init__(self, exception=Exception, timestamp: int | float = math.nan): + Exception.__init__(self, exception) + TimestampedObject.__init__(self, timestamp) - Exception.__init__(self, exception) - TimestampedObject.__init__(self, timestamp) class TimestampedExceptions(TimestampedObjectsList): - """Handle timestamped exceptions into a list.""" - - def __init__(self, exceptions: list = []): + """Handle timestamped exceptions into a list.""" + + def __init__(self, exceptions: list = []): + TimestampedObjectsList.__init__(self, TimestampedException, exceptions) - TimestampedObjectsList.__init__(self, TimestampedException, exceptions) + def values(self) -> list[str]: + """Get all timestamped exception values as list of messages.""" + return [ts_exception.message for ts_exception in self] - def values(self) -> list[str]: - """Get all timestamped exception values as list of messages.""" - return [ts_exception.message for ts_exception in self] class PipelineStepLoadingFailed(Exception): - """ + """ Exception raised when pipeline step object loading fails. """ - def __init__(self, message): - super().__init__(message) - -class TimestampedImage(numpy.ndarray, TimestampedObject): - """Wrap numpy.array to timestamp image.""" + def __init__(self, message): + super().__init__(message) - def __new__(cls, array: numpy.array, timestamp: int|float = math.nan): - return numpy.ndarray.__new__(cls, array.shape, dtype = array.dtype, buffer = array) +class TimestampedImage(numpy.ndarray, TimestampedObject): + """Wrap numpy.array to timestamp image.""" - def __init__(self, array: numpy.array, timestamp: int|float = math.nan): + def __new__(cls, array: numpy.array, timestamp: int | float = math.nan): + return numpy.ndarray.__new__(cls, array.shape, dtype=array.dtype, buffer=array) - TimestampedObject.__init__(self, timestamp) + def __init__(self, array: numpy.array, timestamp: int | float = math.nan): + TimestampedObject.__init__(self, timestamp) - def __array_finalize__(self, obj): + def __array_finalize__(self, obj): + pass - pass + @property + def size(self) -> list: + """Return list with width and height.""" + return list(self.shape[0:2][::-1]) - @property - def size(self) -> list: - """Return list with width and heigth.""" - return list(self.shape[0:2][::-1]) class TimestampedImages(TimestampedObjectsList): - """Handle timestamped images into a list.""" - - def __init__(self, images: list = []): + """Handle timestamped images into a list.""" + + def __init__(self, images: list = []): + TimestampedObjectsList.__init__(self, TimestampedImage, images) - TimestampedObjectsList.__init__(self, TimestampedImage, images) def PipelineStepInit(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step __init__ method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step __init__ method.""" - def wrapper(self, **kwargs): - """Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call. + def wrapper(self, **kwargs): + """Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call. Parameters: + self: kwargs: any arguments defined by PipelineStepMethodInit. """ - # Init pipeline step object attributes - PipelineStepObject.__init__(self) + # Init pipeline step object attributes + PipelineStepObject.__init__(self) - # Init class attributes - method(self, **kwargs) + # Init class attributes + method(self, **kwargs) - # Update all attributes - self.update_attributes(kwargs) + # Update all attributes + self.update_attributes(kwargs) + + return wrapper - return wrapper def PipelineStepEnter(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step __enter__ method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step __enter__ method.""" + + def wrapper(self): + """Wrap pipeline step __enter__ method to call super, observers and children __enter__ method.""" - def wrapper(self): - """Wrap pipeline step __enter__ method to call super, observers and children __enter__ method.""" + logging.debug('%s.__enter__', get_class_path(self)) - logging.debug('%s.__enter__', get_class_path(self)) + method(self) - method(self) + return PipelineStepObject.__enter__(self) - return PipelineStepObject.__enter__(self) + return wrapper - return wrapper def PipelineStepExit(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step __exit__ method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step __exit__ method.""" - def wrapper(self, *args): - """Wrap pipeline step __exit__ method to call super, observers and children __exit__ method.""" + def wrapper(self, *args): + """Wrap pipeline step __exit__ method to call super, observers and children __exit__ method.""" - logging.debug('%s.__exit__', get_class_path(self)) + logging.debug('%s.__exit__', get_class_path(self)) - PipelineStepObject.__exit__(self, *args) + PipelineStepObject.__exit__(self, *args) - method(self, *args) + method(self, *args) + + return wrapper - return wrapper def PipelineStepAttributeSetter(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step attribute setter.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step attribute setter.""" - def wrapper(self, new_value, unwrap: bool = False): - """Wrap pipeline step attribute setter to load attribute from file. + def wrapper(self, new_value, unwrap: bool = False): + """Wrap pipeline step attribute setter to load attribute from file. - Parameters: - new_value: value used to set attribute. - unwrap: call wrapped method directly. - """ - if unwrap: + Parameters: + self: + new_value: value used to set attribute. + unwrap: call wrapped method directly. + """ + if unwrap: + return method(self, new_value) - return method(self, new_value) + # Get new value type + new_value_type = type(new_value) - # Get new value type - new_value_type = type(new_value) + # Check setter annotations to get expected value type + try: - # Check setter annotations to get expected value type - try: + expected_value_type = list(method.__annotations__.values())[0] - expected_value_type = list(method.__annotations__.values())[0] + except KeyError: - except KeyError: + raise ( + PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}')) - raise(PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}')) + logging.debug('%s@%s.setter', get_class_path(self), method.__name__) + logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__) - logging.debug('%s@%s.setter', get_class_path(self), method.__name__) - logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__) + # String not expected: load value from file + if new_value_type == str and new_value_type != expected_value_type: - # String not expected: load value from file - if new_value_type == str and new_value_type != expected_value_type: + split_point = new_value.split('.') - split_point = new_value.split('.') + # String have a dot inside: file path with format + if len(split_point) > 1: - # String have a dot inside: file path with format - if len(split_point) > 1: + file_format = split_point[-1].upper() - file_format = split_point[-1].upper() + logging.debug('\t> %s is a path to a %s file', new_value, file_format) - logging.debug('\t> %s is a path to a %s file', new_value, file_format) - - filepath = os.path.join(get_working_directory(), new_value) + filepath = os.path.join(get_working_directory(), new_value) - # Load image from JPG and PNG formats - if file_format == 'JPG' or file_format == 'PNG': + # Load image from JPG and PNG formats + if file_format == 'JPG' or file_format == 'PNG': - return method(self, TimestampedImage(cv2.imread(filepath))) + return method(self, TimestampedImage(cv2.imread(filepath))) - # Load image from OBJ formats - elif file_format == 'OBJ': + # Load image from OBJ formats + elif file_format == 'OBJ': - return method(self, expected_value_type.from_obj(filepath)) + return method(self, expected_value_type.from_obj(filepath)) - # Load object from JSON file - elif file_format == 'JSON': + # Load object from JSON file + elif file_format == 'JSON': - with open(filepath) as file: + with open(filepath) as file: - return method(self, from_dict(expected_value_type, json.load(file))) + return method(self, from_dict(expected_value_type, json.load(file))) - # No point inside string: identifier name - else: + # No point inside string: identifier name + else: - logging.debug('\t> %s is an identifier', new_value) - logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__) - - return method(self, expected_value_type(new_value)) + logging.debug('\t> %s is an identifier', new_value) + logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__) - # Dict not expected: load value from dict - if new_value_type == dict and expected_value_type != dict: + return method(self, expected_value_type(new_value)) - return method(self, from_dict(expected_value_type, new_value)) + # Dict not expected: load value from dict + if new_value_type == dict and expected_value_type != dict: + return method(self, from_dict(expected_value_type, new_value)) - # Otherwise, pass new value to setter method - logging.debug('\t> use %s value as passed', new_value_type.__name__) + # Otherwise, pass new value to setter method + logging.debug('\t> use %s value as passed', new_value_type.__name__) - method(self, new_value) + method(self, new_value) + + return wrapper - return wrapper def PipelineStepImage(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step image method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step image method.""" + + def wrapper(self, **kwargs) -> numpy.array: + """Wrap pipeline step image method.""" - def wrapper(self, **kwargs) -> numpy.array: - """Wrap pipeline step image method.""" + if kwargs: - if kwargs: + logging.debug('\t> using kwargs') - logging.debug('\t> using kwargs') + return method(self, **kwargs) - return method(self, **kwargs) + else: - else: + logging.debug('\t> using image_parameters') - logging.debug('\t> using image_parameters') + return method(self, **self.image_parameters) - return method(self, **self.image_parameters) + return wrapper - return wrapper def PipelineStepDraw(method): - """Define a decorator use into PipelineStepObject class to wrap pipeline step draw method.""" + """Define a decorator use into PipelineStepObject class to wrap pipeline step draw method.""" - def wrapper(self, image: numpy.array, **kwargs): - """Wrap pipeline step draw method.""" + def wrapper(self, image: numpy.array, **kwargs): + """Wrap pipeline step draw method.""" - if kwargs: + if kwargs: - logging.debug('\t> using kwargs') + logging.debug('\t> using kwargs') - method(self, image, **kwargs) + method(self, image, **kwargs) - else: + else: - logging.debug('\t> using draw_parameters') + logging.debug('\t> using draw_parameters') - method(self, image, **self.draw_parameters) + method(self, image, **self.draw_parameters) - return wrapper + return wrapper + +# noinspection PyAttributeOutsideInit class PipelineStepObject(): - """ + """ Define class to assess pipeline step methods execution time and observe them. """ - __initialized = False - - def __init__(self): - """Initialize PipelineStepObject.""" - - if not self.__initialized: + __initialized = False - logging.debug('%s.__init__', get_class_path(self)) + def __init__(self): + """Initialize PipelineStepObject.""" - # Init private attributes - self.__initialized = True - self.__name = None - self.__observers = [] - self.__execution_times = {} - self.__image_parameters = {} + if not self.__initialized: + logging.debug('%s.__init__', get_class_path(self)) - # Init protected attributes - self._image_parameters = {} - self._draw_parameters = {} - - # Parent attribute will be setup later by parent it self - self.__parent = None + # Init private attributes + self.__initialized = True + self.__name = None + self.__observers = [] + self.__execution_times = {} + self.__image_parameters = {} - def __enter__(self): - """Define default method to enter into pipeline step object context.""" + # Init protected attributes + self._image_parameters = {} + self._draw_parameters = {} - # Start children pipeline step objects - for child in self.children: + # Parent attribute will be setup later by parent itself + self.__parent = None - child.__enter__() + def __enter__(self): + """Define default method to enter into pipeline step object context.""" - # Start observers - for observer in self.observers: + # Start children pipeline step objects + for child in self.children: + child.__enter__() - observer.__enter__() + # Start observers + for observer in self.observers: + observer.__enter__() - return self + return self - def __exit__(self, exception_type, exception_value, exception_traceback): - """Define default method to exit from pipeline step object context.""" + def __exit__(self, exception_type, exception_value, exception_traceback): + """Define default method to exit from pipeline step object context.""" - # Stop observers - for observer in self.observers: + # Stop observers + for observer in self.observers: + observer.__exit__(exception_type, exception_value, exception_traceback) - observer.__exit__(exception_type, exception_value, exception_traceback) + # Stop children pipeline step objects + for child in self.children: + child.__exit__(exception_type, exception_value, exception_traceback) - # Stop children pipeline step objects - for child in self.children: + def update_attributes(self, object_data: dict): + """Update pipeline step object attributes with dictionary.""" - child.__exit__(exception_type, exception_value, exception_traceback) + for key, value in object_data.items(): - def update_attributes(self, object_data: dict): - """Update pipeline step object attributes with dictionary.""" + if hasattr(self, key): - for key, value in object_data.items(): + logging.debug('%s.update_attributes > update %s with %s value', get_class_path(self), key, + type(value).__name__) - if hasattr(self, key): + setattr(self, key, value) - logging.debug('%s.update_attributes > update %s with %s value', get_class_path(self), key, type(value).__name__) + else: - setattr(self, key, value) + raise (AttributeError(f'{get_class_path(self)} has not {key} attribute.')) - else: + @property + def name(self) -> str: + """Get pipeline step object's name.""" + return self.__name - raise(AttributeError(f'{get_class_path(self)} has not {key} attribute.')) + @name.setter + def name(self, name: str): + """Set pipeline step object's name.""" + self.__name = name - @property - def name(self) -> str: - """Get pipeline step object's name.""" - return self.__name + @property + def parent(self) -> object: + """Get pipeline step object's parent object.""" + return self.__parent - @name.setter - def name(self, name: str): - """Set pipeline step object's name.""" - self.__name = name + @parent.setter + def parent(self, parent: object): + """Set layer's parent object.""" + self.__parent = parent - @property - def parent(self) -> object: - """Get pipeline step object's parent object.""" - return self.__parent + @property + def observers(self) -> list: + """Pipeline step object observers list.""" + return self.__observers - @parent.setter - def parent(self, parent: object): - """Set layer's parent object.""" - self.__parent = parent + @observers.setter + @PipelineStepAttributeSetter + def observers(self, observers: list): - @property - def observers(self) -> list: - """Pipeline step object observers list.""" - return self.__observers + # Edit new observers dictionary + self.__observers = observers - @observers.setter - @PipelineStepAttributeSetter - def observers(self, observers: list): + @property + def execution_times(self): + """Get pipeline step object observers execution times dictionary.""" + return self.__execution_times - # Edit new observers dictionary - self.__observers = observers + @property + def image_parameters(self) -> dict: + """image method parameters dictionary.""" + return self._image_parameters - @property - def execution_times(self): - """Get pipeline step object observers execution times dictionary.""" - return self.__execution_times - - @property - def image_parameters(self) -> dict: - """image method parameters dictionary.""" - return self._image_parameters + @image_parameters.setter + @PipelineStepAttributeSetter + def image_parameters(self, image_parameters: dict): - @image_parameters.setter - @PipelineStepAttributeSetter - def image_parameters(self, image_parameters: dict): + self._image_parameters = image_parameters - self._image_parameters = image_parameters + @property + def draw_parameters(self) -> dict: + """draw method parameters dictionary.""" + return self._draw_parameters - @property - def draw_parameters(self) -> dict: - """draw method parameters dictionary.""" - return self._draw_parameters + @draw_parameters.setter + @PipelineStepAttributeSetter + def draw_parameters(self, draw_parameters: dict): - @draw_parameters.setter - @PipelineStepAttributeSetter - def draw_parameters(self, draw_parameters: dict): + self._draw_parameters = draw_parameters - self._draw_parameters = draw_parameters - - def as_dict(self) -> dict: - """Export PipelineStepObject attributes as dictionary. + def as_dict(self) -> dict: + """Export PipelineStepObject attributes as dictionary. Returns: object_data: dictionary with pipeline step object attributes values. """ - return { - "name": self.__name, - "observers": self.__observers - } - - def to_json(self, json_filepath: str = None): - """Save pipeline step object into .json file.""" - - # Remember file path to ease rewriting - if json_filepath is not None: + return { + "name": self.__name, + "observers": self.__observers + } - self.__json_filepath = json_filepath + # noinspection PyAttributeOutsideInit + def to_json(self, json_filepath: str = None): + """Save pipeline step object into .json file.""" - # Open file - with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: + # Remember file path to ease rewriting + if json_filepath is not None: + # noinspection PyAttributeOutsideInit + self.__json_filepath = json_filepath - json.dump({self.__class__.__module__:as_dict(self)}, object_file, ensure_ascii=False, indent=4) + # Open file + with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: + json.dump({self.__class__.__module__: as_dict(self)}, object_file, ensure_ascii=False, indent=4) - # QUESTION: maybe we need two saving mode? - #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder) + # QUESTION: maybe we need two saving mode? + #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder) - def __str__(self) -> str: - """ + def __str__(self) -> str: + """ String representation of pipeline step object. Returns: String representation """ - logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '') - - tabs = self.tabulation - output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n' - - if self.__name is not None: - output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n' - - if self.__parent is not None: - output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n' - - if len(self.__observers): - output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n' - for observer in self.__observers: - output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n' - - for name, value in self.properties: + logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '') - logging.debug('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) + tabs = self.tabulation + output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n' - output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: ' + if self.__name is not None: + output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n' - if type(value) == dict: + if self.__parent is not None: + output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n' - output += '\n' + if len(self.__observers): + output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n' + for observer in self.__observers: + output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n' - for k, v in value.items(): + for name, value in self.properties: - output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n' + logging.debug('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) - elif type(value) == list: + output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: ' - output += '\n' + if type(value) == dict: - for v in value: + output += '\n' - output += f'{tabs}\t - {v}\n' + for k, v in value.items(): + output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n' - elif type(value) == numpy.ndarray or type(value) == TimestampedImage: + elif type(value) == list: - output += f'numpy.array{value.shape}\n' + output += '\n' - elif type(value) == pandas.DataFrame: + for v in value: + output += f'{tabs}\t - {v}\n' - output += f'pandas.DataFrame{value.shape}\n' + elif type(value) == numpy.ndarray or type(value) == TimestampedImage: - else: + output += f'numpy.array{value.shape}\n' - try: + elif type(value) == pandas.DataFrame: - output += f'{value}' + output += f'pandas.DataFrame{value.shape}\n' - except TypeError as e: + else: - logging.error('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) + try: - output += f'{Fore.RED}{Style.BRIGHT}!!! {get_class_path(self)}.{name}: {e}{Style.RESET_ALL}\n\n' + output += f'{value}' - if output[-1] != '\n': + except TypeError as e: - output += '\n' + logging.error('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) - return output + output += f'{Fore.RED}{Style.BRIGHT}!!! {get_class_path(self)}.{name}: {e}{Style.RESET_ALL}\n\n' - @property - def tabulation(self) -> str: - """Edit tabulation string according parents number.""" + if output[-1] != '\n': + output += '\n' - tabs = '' - parent = self.__parent + return output - while (parent is not None): + @property + def tabulation(self) -> str: + """Edit tabulation string according parents number.""" - tabs += '\t' - parent = parent.parent + tabs = '' + parent = self.__parent - return tabs + while (parent is not None): + tabs += '\t' + parent = parent.parent - @property - def properties(self) -> tuple[name, any]: - """Iterate over pipeline step properties values.""" + return tabs - properties = [name for name, item in self.__class__.__dict__.items() if isinstance(item, property)] + @property + def properties(self) -> tuple[name, any]: + """Iterate over pipeline step properties values.""" - for base in self.__class__.__bases__: + properties = [name for name, item in self.__class__.__dict__.items() if isinstance(item, property)] - if base != PipelineStepObject and base != SharedObject: + for base in self.__class__.__bases__: - for name, item in base.__dict__.items(): + if base != PipelineStepObject and base != SharedObject: - if isinstance(item, property) and not name in properties: + for name, item in base.__dict__.items(): - properties.append(name) + if isinstance(item, property) and not name in properties: + properties.append(name) - for name in properties: + for name in properties: + yield name, getattr(self, name) - yield name, getattr(self, name) + @property + def children(self) -> object: + """Iterate over children pipeline step objects.""" - @property - def children(self) -> object: - """Iterate over children pipeline step objects.""" + for name, value in self.properties: - for name, value in self.properties: + # Pipeline step object attribute + if issubclass(type(value), PipelineStepObject) and value != self.parent: - # Pipeline step object attribute - if issubclass(type(value), PipelineStepObject) and value != self.parent: + yield value - yield value + # Pipeline step objects list attribute + elif type(value) == list: - # Pipeline step objects list attribute - elif type(value) == list: + for p in value: - for p in value: + if issubclass(type(p), PipelineStepObject): + yield p - if issubclass(type(p), PipelineStepObject): + # Pipeline step objects list attribute + elif type(value) == dict: - yield p + for p in value.values(): - # Pipeline step objects list attribute - elif type(value) == dict: + if issubclass(type(p), PipelineStepObject): + yield p - for p in value.values(): - - if issubclass(type(p), PipelineStepObject): - - yield p def PipelineStepMethod(method): - """Define a decorator use into PipelineStepObject class to declare pipeline method. - - !!! danger - PipelineStepMethod must have a timestamp as first argument. - """ + """Define a decorator use into PipelineStepObject class to declare pipeline method. - def wrapper(self, *args, timestamp: int|float = None, unwrap: bool = False, catch_exceptions: bool = True, **kwargs): - """Wrap pipeline step method to measure execution time. + !!! danger + PipelineStepMethod must have a timestamp as first argument. + """ - Parameters: - args: any arguments defined by PipelineStepMethod. - timestamp: optional method call timestamp (unit does'nt matter) if first args parameter is not a TimestampedObject instance. - unwrap: extra arguments used in wrapper function to call wrapped method directly. - catch_exceptions: extra arguments used in wrapper function to catch exception. - """ - if timestamp is None and len(args) > 0: + def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, catch_exceptions: bool = True, **kwargs): + """Wrap pipeline step method to measure execution time. - if issubclass(type(args[0]), TimestampedObject): + Parameters: + self: + args: any arguments defined by PipelineStepMethod. + timestamp: optional method call timestamp (unit doesn't matter) if first args parameter is not a + TimestampedObject instance. + unwrap: extra arguments used in wrapper function to call wrapped method directly. + catch_exceptions: extra arguments used in wrapper function to catch exception. + """ + if timestamp is None and len(args) > 0: - timestamp = args[0].timestamp + if issubclass(type(args[0]), TimestampedObject): - else: + timestamp = args[0].timestamp - logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', get_class_path(self), method.__name__, type(args[0]).__name__) + else: - if unwrap: + logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', get_class_path(self), method.__name__, type(args[0]).__name__) - return method(self, *args, **kwargs) + if unwrap: + return method(self, *args, **kwargs) - # Initialize execution time assessment - start = time.perf_counter() - exception = None - result = None + # Initialize execution time assessment + start = time.perf_counter() + exception = None + result = None - if not catch_exceptions: + if not catch_exceptions: - # Execute wrapped method without catching exceptions - result = method(self, *args, **kwargs) + # Execute wrapped method without catching exceptions + result = method(self, *args, **kwargs) - # Measure execution time - self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 - - else: - - try: + # Measure execution time + self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 - # Execute wrapped method - result = method(self, *args, **kwargs) + else: - except Exception as e: + try: - exception = e + # Execute wrapped method + result = method(self, *args, **kwargs) - finally: + except Exception as e: - # Measure execution time - self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 + exception = e - # Notify observers that method has been called - subscription_name = f'on_{method.__name__}' + finally: - for observer in self.observers: + # Measure execution time + self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 - # Does the observer cares about this method? - if subscription_name in dir(observer): + # Notify observers that method has been called + subscription_name = f'on_{method.__name__}' - subscription = getattr(observer, subscription_name) + for observer in self.observers: - # Call subscription - subscription(timestamp, self, exception) + # Does the observer cares about this method? + if subscription_name in dir(observer): + subscription = getattr(observer, subscription_name) - # Raise timestamped exception - if exception is not None: + # Call subscription + subscription(timestamp, self, exception) - raise TimestampedException(exception, timestamp) + # Raise timestamped exception + if exception is not None: + raise TimestampedException(exception, timestamp) - return result + return result - return wrapper + return wrapper |