"""Miscellaneous data features.""" """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import bisect import json import logging import math import os import sys import threading import time from typing import Self import cv2 import matplotlib.patches as mpatches import matplotlib.pyplot as mpyplot import numpy import pandas from colorama import Style, Fore # Define global working directory used to load file using relative path WORKING_DIRECTORY = [None] def get_working_directory() -> str: """Get global working directory.""" if WORKING_DIRECTORY[0] is not None: return WORKING_DIRECTORY[0] else: raise RuntimeError('No working directory') def set_working_directory(working_directory: str): """Set global working directory.""" # Forget former global working directory if WORKING_DIRECTORY[0] is not None: sys.path.remove(WORKING_DIRECTORY[0]) # Append new working directory to Python path sys.path.append(working_directory) WORKING_DIRECTORY[0] = working_directory def get_class(class_path: str) -> type: """Get class object from 'path.to.class' string. Parameters: class_path: a 'path.to.class' string. Returns: class: a 'path.to.class' class. """ parts = class_path.split('.') module = ".".join(parts[:-1]) m = __import__(module) for comp in parts[1:]: m = getattr(m, comp) return m def get_class_path(o: object) -> str: """Get 'path.to.class' class path from object. Parameters: o: any object instance. Returns: class_path: object 'path.to.class' class. """ c = o.__class__ m = c.__module__ # Avoid outputs like 'builtins.str' if m == 'builtins': return c.__qualname__ return m + '.' + c.__qualname__ def get_class_properties(cls: type) -> dict: """Get class properties dictionary. Parameters: cls: class to consider. Returns: properties: dict of properties stored by names """ # Stop recursion when reaching core objects if cls is not object and cls is not PipelineStepObject and cls is not SharedObject: object_properties = {name: item for name, item in cls.__dict__.items() if isinstance(item, property)} for base in cls.__bases__: base_properties = get_class_properties(base) if base_properties is not None: object_properties.update(base_properties) return object_properties def from_json(filepath: str) -> any: """ Load object from json file. !!! note The directory where json file is will be used as global working directory. Parameters: filepath: path to json file """ logging.debug('DataFeatures.from_json') # Edit working directory once if WORKING_DIRECTORY[0] is None: set_working_directory(os.path.dirname(os.path.abspath(filepath))) logging.debug('\t> set global working directory as %s', get_working_directory()) # Open JSON file with open(filepath) as configuration_file: # Load unique object object_class, object_data = json.load(configuration_file).popitem() ''' # patch_filepath: path to json patch file to modify any configuration entries # Apply patch to configuration if required if patch_filepath is not None: with open(patch_filepath) as patch_file: patch_data = json.load(patch_file) import collections.abc def update(d, u): for k, v in u.items(): if isinstance(v, collections.abc.Mapping): d[k] = update(d.get(k, {}), v) elif v is None: del d[k] else: d[k] = v return d objects_data = update(object_data, patch_data) ''' # Instanciate class logging.debug('\t+ create %s object', object_class) return get_class(object_class)(**object_data) def from_dict(expected_value_type: type, data: dict) -> any: """Load expected type instance(s) from dict values.""" logging.debug('\t> load %s from dict', expected_value_type.__name__) # Check if json keys are PipelineStepObject class and store them in a list new_objects_list = [] for key, value in data.items(): try: new_class = get_class(key) except ValueError as e: # Keys are not class name if str(e) == 'Empty module name': break else: raise (e) logging.debug('\t+ create %s object from key using value as argument', key) # noinspection PyCallingNonCallable new_objects_list.append(new_class(**value)) # Only one object have been loaded: pass the object if it is a subclass of expected type if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): return new_objects_list[0] # Pass non-empty objects list elif len(new_objects_list) > 0: return new_objects_list # Otherwise, data are parameters of the expected class logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__) return expected_value_type(**data) def as_dict(obj: any, filter: bool = True) -> dict: """Export object as dictionary. Parameters: obj: object to export as dictionary filter: remove None attribute values. """ _dict = {} for p in get_class_properties(obj.__class__): v = getattr(obj, p) if not filter or v is not None: _dict[p] = v return _dict class JsonEncoder(json.JSONEncoder): """Specific ArGaze JSON Encoder.""" def default(self, obj): """default implementation to serialize object.""" # numpy cases if isinstance(obj, numpy.integer): return int(obj) elif isinstance(obj, numpy.floating): return float(obj) elif isinstance(obj, numpy.ndarray): return obj.tolist() # default case try: return json.JSONEncoder.default(self, obj) # class case except: # ignore attribute starting with _ public_dict = {} for k, v in vars(obj).items(): if not k.startswith('_'): # numpy cases if isinstance(v, numpy.integer): v = int(v) elif isinstance(v, numpy.floating): v = float(v) elif isinstance(v, numpy.ndarray): v = v.tolist() public_dict[k] = v return public_dict class DataDictionary(dict): """Enable dot notation access to dictionary attributes""" __getattr__ = dict.get __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ def timestamp(cls): """Decorate a class to enable timestamp management.""" class_init = cls.__init__ class_repr = cls.__repr__ def __init__(self, *args, **kwargs): """Initialize timestamped object.""" try: self._timestamp = kwargs.pop('timestamp') except KeyError: self._timestamp = math.nan class_init(self, *args, **kwargs) def __repr__(self) -> str: """String representation.""" return str(self._timestamp) + ': ' + class_repr(self) if issubclass(cls, TimestampedObjectsList): def get_timestamp(self) -> int | float: """Get first position timestamp.""" if self: return self[0].timestamp def set_timestamp(self, timestamp: int | float): """Block timestamp setting.""" raise ('TimestampedObjectsList timestamp is not settable.') def del_timestamp(self): """Block timestamp resetting.""" raise ('TimestampedObjectsList timestamp cannot be deleted.') def is_timestamped(self) -> bool: """Is the object timestamped?""" return bool(self) else: def get_timestamp(self) -> int | float: """Get object timestamp.""" return self._timestamp def set_timestamp(self, timestamp: int | float): """Set object timestamp.""" self._timestamp = timestamp def del_timestamp(self): """Reset object timestamp.""" self._timestamp = math.nan def is_timestamped(self) -> bool: """Is the object timestamped?""" return not math.isnan(self._timestamp) cls.__init__ = __init__ cls.__repr__ = __repr__ setattr(cls, "timestamp", property(get_timestamp, set_timestamp, del_timestamp, """Object timestamp.""")) setattr(cls, "is_timestamped", is_timestamped) return cls class TimestampedObjectsList(list): """Handle timestamped object into a list. !!! warning "Timestamped objects are not sorted internally" Timestamped objects are considered to be stored according to their coming time. """ # noinspection PyMissingConstructor def __init__(self, ts_object_type: type, ts_objects=None): if ts_objects is None: ts_objects = [] self.__object_type = ts_object_type self.__object_properties_names = list(get_class_properties(self.__object_type).keys()) for ts_object in ts_objects: self.append(ts_object) @property def object_type(self): """Get object type handled by the list.""" return self.__object_type def append(self, ts_object: object | dict): """Append timestamped object.""" # Convert dict into object if type(ts_object) == dict: ts_object = from_dict(self.__object_type, ts_object) # Check object type if type(ts_object) != self.__object_type: if not issubclass(ts_object.__class__, self.__object_type): raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') if not ts_object.is_timestamped(): raise ValueError(f'object is not timestamped') super().append(ts_object) def look_for(self, timestamp: int | float) -> object: """Look for object at given timestamp.""" for ts_object in self: if ts_object.timestamp == timestamp: return ts_object def __add__(self, ts_objects: list = []) -> Self: """Append timestamped objects list.""" for ts_object in ts_objects: self.append(ts_object) return self @property def duration(self): """Get inferred duration from first and last timestamps.""" if self: return self[-1].timestamp - self[0].timestamp else: return 0 def timestamps(self): """Get all timestamps in list.""" return [ts_object.timestamp for ts_object in self] def tuples(self) -> list: """Get all timestamped objects as list of tuple.""" return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] @classmethod def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=None) -> Self: """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" if exclude is None: exclude = [] dataframe.drop(exclude, inplace=True, axis=True) assert (dataframe.index.name == 'timestamp') object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in dataframe.to_dict('index').items()] return TimestampedObjectsList(ts_object_type, object_list) def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). The optional *split* argument allows tuple values to be stored in dedicated columns. For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} !!! warning "Values must be dictionaries" Each key is stored as a column name. !!! note Timestamps are stored as index column called 'timestamp'. """ df = pandas.DataFrame(self.tuples(), columns=self.__object_properties_names) # Exclude columns df.drop(exclude, inplace=True, axis=True) # Split columns if len(split) > 0: split_columns = [] for column in df.columns: if column in split.keys(): df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) df.drop(column, inplace=True, axis=True) for new_column in split[column]: split_columns.append(new_column) else: split_columns.append(column) # Reorder split columns df = df[split_columns] # Append timestamps as index column df['timestamp'] = self.timestamps() df.set_index('timestamp', inplace=True) return df @classmethod def from_json(cls, ts_object_type: type, json_filepath: str) -> Self: """Create a TimestampedObjectsList from .json file.""" with open(json_filepath, encoding='utf-8') as ts_objects_file: json_ts_objects = json.load(ts_objects_file) return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects]) def to_json(self, json_filepath: str): """Save a TimestampedObjectsList to .json file.""" with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file: json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ') def __repr__(self): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, ) def __str__(self): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, ) def pop_last_until(self, timestamp: int | float) -> object: """Pop all item until a given timestamped value and return the first after.""" # get last item before given timestamp earliest_value = self.get_last_until(timestamp) while self[0].timestamp < earliest_value.timestamp: self.pop(0) return self[0] def pop_last_before(self, timestamp: int | float) -> object: """Pop all item before a given timestamped value and return the last one.""" # get last item before given timestamp earliest_value = self.get_last_before(timestamp) popped_value = self.pop(0) while popped_value.timestamp != earliest_value.timestamp: popped_value = self.pop(0) return popped_value def get_first_from(self, timestamp: int | float) -> object: """Retrieve first item timestamp from a given timestamp value.""" ts_list = self.timestamps() first_from_index = bisect.bisect_left(ts_list, timestamp) if first_from_index < len(self): return self[ts_list[first_from_index]] else: raise KeyError(f'No data stored after {timestamp} timestamp.') def get_last_before(self, timestamp: int | float) -> object: """Retrieve last item timestamp before a given timestamp value.""" ts_list = self.timestamps() last_before_index = bisect.bisect_left(ts_list, timestamp) - 1 if last_before_index >= 0: return self[ts_list[last_before_index]] else: raise KeyError(f'No data stored before {timestamp} timestamp.') def get_last_until(self, timestamp: int | float) -> object: """Retrieve last item timestamp until a given timestamp value.""" ts_list = self.timestamps() last_until_index = bisect.bisect_right(ts_list, timestamp) - 1 if last_until_index >= 0: return self[ts_list[last_until_index]] else: raise KeyError(f'No data stored until {timestamp} timestamp.') def plot(self, names=[], colors=[], split=None, samples=None) -> list: """Plot as [matplotlib](https://matplotlib.org/) time chart.""" if split is None: split = {} df = self.as_dataframe(split=split) legend_patches = [] # decimate data if samples != None: if samples < len(df): step = int(len(df) / samples) + 1 df = df.iloc[::step, :] for name, color in zip(names, colors): markerline, stemlines, baseline = mpyplot.stem(df.index, df[name]) mpyplot.setp(markerline, color=color, linewidth=1, markersize=1) mpyplot.setp(stemlines, color=color, linewidth=1) mpyplot.setp(baseline, color=color, linewidth=1) legend_patches.append(mpatches.Patch(color=color, label=name.upper())) return legend_patches class SharedObject(): """Enable multiple threads sharing.""" def __init__(self): self._lock = threading.Lock() self._execution_times = {} self._exceptions = {} @timestamp class TimestampedException(Exception,): """Enable timestamp management for exception.""" def __init__(self, exception: Exception): Exception.__init__(self, exception) class TimestampedExceptions(TimestampedObjectsList): """Handle timestamped exceptions into a list.""" def __init__(self, exceptions: list = []): TimestampedObjectsList.__init__(self, TimestampedException, exceptions) def values(self) -> list[str]: """Get all timestamped exception values as list of messages.""" return [ts_exception.message for ts_exception in self] class PipelineStepLoadingFailed(Exception): """ Exception raised when pipeline step object loading fails. """ def __init__(self, message): super().__init__(message) @timestamp class TimestampedImage(numpy.ndarray): """Wrap numpy.array to timestamp image.""" def __new__(cls, array: numpy.array, **kwargs): return numpy.ndarray.__new__(cls, array.shape, dtype=array.dtype, buffer=array) def __init__(self, array: numpy.array, **kwargs): pass def __array_finalize__(self, obj): pass @property def size(self) -> list: """Return list with width and height.""" return list(self.shape[0:2][::-1]) class TimestampedImages(TimestampedObjectsList): """Handle timestamped images into a list.""" def __init__(self, images: list = []): TimestampedObjectsList.__init__(self, TimestampedImage, images) def PipelineStepInit(method): """Define a decorator use into PipelineStepObject class to wrap pipeline step __init__ method.""" def wrapper(self, **kwargs): """Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call. Parameters: self: kwargs: any arguments defined by PipelineStepMethodInit. """ # Init pipeline step object attributes PipelineStepObject.__init__(self) # Init class attributes method(self, **kwargs) # Update all attributes self.update_attributes(kwargs) return wrapper def PipelineStepEnter(method): """Define a decorator use into PipelineStepObject class to wrap pipeline step __enter__ method.""" def wrapper(self): """Wrap pipeline step __enter__ method to call super, observers and children __enter__ method.""" logging.debug('%s.__enter__', get_class_path(self)) PipelineStepObject.__enter__(self) method(self) return self return wrapper def PipelineStepExit(method): """Define a decorator use into PipelineStepObject class to wrap pipeline step __exit__ method.""" def wrapper(self, *args): """Wrap pipeline step __exit__ method to call super, observers and children __exit__ method.""" logging.debug('%s.__exit__', get_class_path(self)) method(self, *args) PipelineStepObject.__exit__(self, *args) return wrapper def PipelineStepAttributeSetter(method): """Define a decorator use into PipelineStepObject class to wrap pipeline step attribute setter.""" def wrapper(self, new_value, unwrap: bool = False): """Wrap pipeline step attribute setter to load attribute from file. Parameters: self: new_value: value used to set attribute. unwrap: call wrapped method directly. """ if unwrap: return method(self, new_value) # Get new value type new_value_type = type(new_value) # Check setter annotations to get expected value type try: expected_value_type = list(method.__annotations__.values())[0] except KeyError: raise ( PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}')) logging.debug('%s@%s.setter', get_class_path(self), method.__name__) logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__) # String not expected: load value from file if new_value_type == str and new_value_type != expected_value_type: split_point = new_value.split('.') # String have a dot inside: file path with format if len(split_point) > 1: file_format = split_point[-1].upper() logging.debug('\t> %s is a path to a %s file', new_value, file_format) filepath = os.path.join(get_working_directory(), new_value) # Load image from JPG and PNG formats if file_format == 'JPG' or file_format == 'PNG': return method(self, TimestampedImage(cv2.imread(filepath))) # Load image from OBJ formats elif file_format == 'OBJ': return method(self, expected_value_type.from_obj(filepath)) # Load object from JSON file elif file_format == 'JSON': with open(filepath) as file: return method(self, from_dict(expected_value_type, json.load(file))) # No point inside string: identifier name else: logging.debug('\t> %s is an identifier', new_value) logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__) return method(self, expected_value_type(new_value)) # Dict not expected: load value from dict if new_value_type == dict and expected_value_type != dict: return method(self, from_dict(expected_value_type, new_value)) # Otherwise, pass new value to setter method logging.debug('\t> use %s value as passed', new_value_type.__name__) method(self, new_value) return wrapper def PipelineStepImage(method): """Define a decorator use into PipelineStepObject class to wrap pipeline step image method.""" def wrapper(self, **kwargs) -> numpy.array: """Wrap pipeline step image method.""" if kwargs: logging.debug('\t> using kwargs') return method(self, **kwargs) else: logging.debug('\t> using image_parameters') return method(self, **self.image_parameters) return wrapper def PipelineStepDraw(method): """Define a decorator use into PipelineStepObject class to wrap pipeline step draw method.""" def wrapper(self, image: numpy.array, **kwargs): """Wrap pipeline step draw method.""" if kwargs: logging.debug('\t> using kwargs') method(self, image, **kwargs) else: logging.debug('\t> using draw_parameters') method(self, image, **self.draw_parameters) return wrapper # noinspection PyAttributeOutsideInit class PipelineStepObject(): """Define class to assess pipeline step methods execution time and observe them. """ __initialized = False def __init__(self): """Initialize PipelineStepObject.""" if not self.__initialized: logging.debug('%s.__init__', get_class_path(self)) # Init private attributes self.__initialized = True self.__name = None self.__observers = [] self.__execution_times = {} self.__image_parameters = {} # Init protected attributes self._catch_exceptions = False self._image_parameters = {} self._draw_parameters = {} # Parent attribute will be setup later by parent itself self.__parent = None def __enter__(self): """Define default method to enter into pipeline step object context.""" # Start children pipeline step objects for child in self.children(): child.__enter__() # Start observers for observer in self.observers: observer.__enter__() return self def __exit__(self, exception_type, exception_value, exception_traceback): """Define default method to exit from pipeline step object context.""" # Stop observers for observer in self.observers: observer.__exit__(exception_type, exception_value, exception_traceback) # Stop children pipeline step objects for child in self.children(): child.__exit__(exception_type, exception_value, exception_traceback) def update_attributes(self, object_data: dict): """Update pipeline step object attributes with dictionary.""" for key, value in object_data.items(): if hasattr(self, key): logging.debug('%s.update_attributes > update %s with %s value', get_class_path(self), key, type(value).__name__) setattr(self, key, value) else: raise (AttributeError(f'{get_class_path(self)} has not {key} attribute.')) @property def name(self) -> str: """Get pipeline step object's name.""" return self.__name @name.setter def name(self, name: str): """Set pipeline step object's name.""" self.__name = name @property def parent(self) -> Self: """Get pipeline step object's parent object.""" return self.__parent @parent.setter def parent(self, parent: Self): """Set layer's parent object.""" self.__parent = parent @property def observers(self) -> list: """Pipeline step object observers list.""" return self.__observers @observers.setter @PipelineStepAttributeSetter def observers(self, observers: list): # Edit new observers dictionary self.__observers = observers @property def execution_times(self): """Get pipeline step object observers execution times dictionary.""" return self.__execution_times @property def catch_exceptions(self) -> bool: """Catch pipeline step method exception instead of crashing execution.""" return self._catch_exceptions @catch_exceptions.setter def catch_exceptions(self, catch_exceptions: bool): self._catch_exceptions = catch_exceptions # Propagate to children for child in self.children(): child.catch_exceptions = self._catch_exceptions @property def image_parameters(self) -> dict: """image method parameters dictionary.""" return self._image_parameters @image_parameters.setter @PipelineStepAttributeSetter def image_parameters(self, image_parameters: dict): self._image_parameters = image_parameters @property def draw_parameters(self) -> dict: """draw method parameters dictionary.""" return self._draw_parameters @draw_parameters.setter @PipelineStepAttributeSetter def draw_parameters(self, draw_parameters: dict): self._draw_parameters = draw_parameters def as_dict(self) -> dict: """Export PipelineStepObject attributes as dictionary. Returns: object_data: dictionary with pipeline step object attributes values. """ return { "name": self.__name, "observers": self.__observers } # noinspection PyAttributeOutsideInit def to_json(self, json_filepath: str = None): """Save pipeline step object into .json file.""" # Remember file path to ease rewriting if json_filepath is not None: # noinspection PyAttributeOutsideInit self.__json_filepath = json_filepath # Open file with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: json.dump({self.__class__.__module__: as_dict(self)}, object_file, ensure_ascii=False, indent=4) # QUESTION: maybe we need two saving mode? #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder) def __str__(self) -> str: """String representation of pipeline step object. Returns: String representation """ logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '') tabs = self.tabulation output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n' if self.__name is not None: output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n' if self.__parent is not None: output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n' if len(self.__observers): output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n' for observer in self.__observers: output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n' for name, value in self.properties(): logging.debug('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: ' if type(value) == dict: output += '\n' for k, v in value.items(): output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n' elif type(value) == list: output += '\n' for v in value: output += f'{tabs}\t - {v}\n' elif type(value) == numpy.ndarray or type(value) == TimestampedImage: output += f'numpy.array{value.shape}\n' elif type(value) == pandas.DataFrame: output += f'pandas.DataFrame{value.shape}\n' else: try: output += f'{value}' except TypeError as e: logging.error('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__) output += f'{Fore.RED}{Style.BRIGHT}!!! {get_class_path(self)}.{name}: {e}{Style.RESET_ALL}\n\n' if output[-1] != '\n': output += '\n' def print_dict(d: dict, key_color, tabs) -> str: output = '' tabs = f'{tabs}\t' for k, v in d.items(): if type(v) is dict: output += f'{tabs} - {key_color}{Style.BRIGHT}{k}{Style.RESET_ALL}:\n{print_dict(v, key_color, tabs)}' else: output += f'{tabs} - {key_color}{Style.BRIGHT}{k}{Style.RESET_ALL}: {v}\n' return output if len(self._image_parameters): output += f'{tabs}\t{Style.BRIGHT}image_parameters{Style.RESET_ALL}:\n' output += print_dict(self._image_parameters, Fore.YELLOW, tabs) if len(self._draw_parameters): output += f'{tabs}\t{Style.BRIGHT}draw_parameters{Style.RESET_ALL}:\n' output += print_dict(self._draw_parameters, Fore.YELLOW, tabs) return output @property def tabulation(self) -> str: """Edit tabulation string according parents number.""" tabs = '' parent = self.__parent while (parent is not None): tabs += '\t' parent = parent.parent return tabs def properties(self) -> tuple[str, any]: """Iterate over object properties values.""" for name in get_class_properties(self.__class__).keys(): yield name, getattr(self, name) def children(self): """Iterate over children pipeline step objects.""" for name, value in self.properties(): # Pipeline step object attribute if issubclass(type(value), PipelineStepObject) and value != self.parent: yield value # Pipeline step objects list attribute elif type(value) == list: for p in value: if issubclass(type(p), PipelineStepObject): yield p # Pipeline step objects list attribute elif type(value) == dict: for p in value.values(): if issubclass(type(p), PipelineStepObject): yield p def PipelineStepMethod(method): """Define a decorator use into PipelineStepObject class to declare pipeline method. !!! danger PipelineStepMethod must have a timestamp as first argument. """ def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, **kwargs): """Wrap pipeline step method to measure execution time. Parameters: self: args: any arguments defined by PipelineStepMethod. timestamp: optional method call timestamp (unit doesn't matter) if first args parameter is not a TimestampedObject instance. unwrap: extra arguments used in wrapper function to call wrapped method directly. """ if timestamp is None and len(args) > 0: try: timestamp = args[0].timestamp except: logging.error('%s.%s: %s is not a timestamped class. Use @DataFeatures.timestamp decorator.', get_class_path(self), method.__name__, type(args[0]).__name__) if unwrap: return method(self, *args, **kwargs) # Initialize execution time assessment start = time.perf_counter() exception = None result = None if not self._catch_exceptions: # Execute wrapped method without catching exceptions result = method(self, *args, **kwargs) # Measure execution time self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 else: try: # Execute wrapped method result = method(self, *args, **kwargs) except Exception as e: exception = e finally: # Measure execution time self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 # Notify observers that method has been called subscription_name = f'on_{method.__name__}' for observer in self.observers: # Does the observer cares about this method? if subscription_name in dir(observer): subscription = getattr(observer, subscription_name) # Call subscription subscription(timestamp, self, exception) # Raise timestamped exception if exception is not None: raise TimestampedException(exception, timestamp) return result return wrapper