aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/DataFeatures.py
diff options
context:
space:
mode:
authorThéo de la Hogue2024-04-08 21:06:40 +0200
committerThéo de la Hogue2024-04-08 21:06:40 +0200
commit007feec2a8d8de80f40b021710c6e6b382a0e886 (patch)
tree9963a1d9b240128e6a631cffbedf25529b1b49ff /src/argaze/DataFeatures.py
parent3cc3dd1c3ba0f4010f45e68615b342127cba6f6e (diff)
downloadargaze-007feec2a8d8de80f40b021710c6e6b382a0e886.zip
argaze-007feec2a8d8de80f40b021710c6e6b382a0e886.tar.gz
argaze-007feec2a8d8de80f40b021710c6e6b382a0e886.tar.bz2
argaze-007feec2a8d8de80f40b021710c6e6b382a0e886.tar.xz
Fixing typo and docstrings.
Diffstat (limited to 'src/argaze/DataFeatures.py')
-rw-r--r--src/argaze/DataFeatures.py1386
1 files changed, 689 insertions, 697 deletions
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index 7189001..a7b0a48 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -16,48 +16,47 @@ __credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
-from typing import Self
+import bisect
+import json
+import logging
+import math
import os
import sys
-import logging
-import traceback
-import importlib
-import collections
-import json
-import bisect
import threading
-import math
import time
+from typing import Self
-import pandas
-import numpy
import cv2
-import matplotlib.pyplot as mpyplot
import matplotlib.patches as mpatches
+import matplotlib.pyplot as mpyplot
+import numpy
+import pandas
from colorama import Style, Fore
# Define global working directory used to load file using relative path
WORKING_DIRECTORY = [None]
+
def get_working_directory() -> str:
- """Get global working directory."""
- return WORKING_DIRECTORY[0]
+ """Get global working directory."""
+ return WORKING_DIRECTORY[0]
+
def set_working_directory(working_directory: str):
- """Set global working directory."""
+ """Set global working directory."""
- # Forget former global working directory
- if WORKING_DIRECTORY[0] is not None:
+ # Forget former global working directory
+ if WORKING_DIRECTORY[0] is not None:
+ sys.path.remove(WORKING_DIRECTORY[0])
- sys.path.remove(WORKING_DIRECTORY[0])
+ # Append new working directory to Python path
+ sys.path.append(working_directory)
- # Append new working directory to Python path
- sys.path.append(working_directory)
+ WORKING_DIRECTORY[0] = working_directory
- WORKING_DIRECTORY[0] = working_directory
def get_class(class_path: str) -> object:
- """Get class object from 'path.to.class' string.
+ """Get class object from 'path.to.class' string.
Parameters:
class_path: a 'path.to.class' string.
@@ -65,18 +64,19 @@ def get_class(class_path: str) -> object:
Returns:
class: a 'path.to.class' class.
"""
- parts = class_path.split('.')
- module = ".".join(parts[:-1])
+ parts = class_path.split('.')
+ module = ".".join(parts[:-1])
- m = __import__(module)
+ m = __import__(module)
- for comp in parts[1:]:
- m = getattr(m, comp)
+ for comp in parts[1:]:
+ m = getattr(m, comp)
+
+ return m
- return m
def get_class_path(o: object) -> str:
- """Get 'path.to.class' class path from object.
+ """Get 'path.to.class' class path from object.
Parameters:
o: any object instance.
@@ -84,33 +84,33 @@ def get_class_path(o: object) -> str:
Returns:
class_path: object 'path.to.class' class.
"""
- c = o.__class__
- m = c.__module__
+ c = o.__class__
+ m = c.__module__
- # Avoid outputs like 'builtins.str'
- if m == 'builtins':
+ # Avoid outputs like 'builtins.str'
+ if m == 'builtins':
+ return c.__qualname__
- return c.__qualname__
+ return m + '.' + c.__qualname__
- return m + '.' + c.__qualname__
def properties(cls) -> list:
- """get class properties name."""
+ """get class properties name."""
- properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)]
+ properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)]
- for base in cls.__bases__:
+ for base in cls.__bases__:
- for name, item in base.__dict__.items():
+ for name, item in base.__dict__.items():
- if isinstance(item, property):
+ if isinstance(item, property):
+ properties.append(name)
- properties.append(name)
+ return properties
- return properties
-def from_json(configuration_filepath: str, patch_filepath: str = None) -> object:
- """
+def from_json(configuration_filepath: str, patch_filepath: str = None) -> any:
+ """
Load object instance from .json file.
!!! note
@@ -121,293 +121,295 @@ def from_json(configuration_filepath: str, patch_filepath: str = None) -> object
patch_filepath: path to json patch file to modify any configuration entries
"""
- logging.debug('DataFeatures.from_json')
+ logging.debug('DataFeatures.from_json')
- # Edit working directory once
- if get_working_directory() is None:
+ # Edit working directory once
+ if get_working_directory() is None:
+ set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath)))
- set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath)))
+ logging.debug('\t> set global working directory as %s', get_working_directory())
- logging.debug('\t> set global working directory as %s', get_working_directory())
+ # Load configuration from JSON file
+ with open(configuration_filepath) as configuration_file:
- # Load configuration from JSON file
- with open(configuration_filepath) as configuration_file:
+ object_data = json.load(configuration_file)
- object_data = json.load(configuration_file)
+ # Apply patch to configuration if required
+ if patch_filepath is not None:
- # Apply patch to configuration if required
- if patch_filepath is not None:
+ with open(patch_filepath) as patch_file:
- with open(patch_filepath) as patch_file:
+ patch_data = json.load(patch_file)
- patch_data = json.load(patch_file)
+ import collections.abc
- import collections.abc
+ def update(d, u):
- def update(d, u):
+ for k, v in u.items():
- for k, v in u.items():
+ if isinstance(v, collections.abc.Mapping):
- if isinstance(v, collections.abc.Mapping):
+ d[k] = update(d.get(k, {}), v)
- d[k] = update(d.get(k, {}), v)
+ elif v is None:
- elif v is None:
+ del d[k]
- del d[k]
+ else:
- else:
+ d[k] = v
- d[k] = v
+ return d
- return d
+ objects_data = update(object_data, patch_data)
- objects_data = update(object_data, patch_data)
+ # Load unique object
+ object_class, object_data = object_data.popitem()
- # Load unique object
- object_class, object_data = object_data.popitem()
+ # Instanciate class
+ logging.debug('\t+ create %s object', object_class)
- # Instanciate class
- logging.debug('\t+ create %s object', object_class)
+ # noinspection PyCallingNonCallable
+ return get_class(object_class)(**object_data)
- return get_class(object_class)(**object_data)
def from_dict(expected_value_type: type, data: dict) -> any:
- """Load expected type instance(s) from dict values."""
+ """Load expected type instance(s) from dict values."""
- logging.debug('\t> load %s from dict', expected_value_type.__name__)
+ logging.debug('\t> load %s from dict', expected_value_type.__name__)
- # Check if json keys are PipelineStepObject class and store them in a list
- new_objects_list = []
+ # Check if json keys are PipelineStepObject class and store them in a list
+ new_objects_list = []
- for key, value in data.items():
+ for key, value in data.items():
- try:
+ try:
- new_class = get_class(key)
+ new_class = get_class(key)
- except ValueError as e:
+ except ValueError as e:
- # Keys are not class name
- if str(e) == 'Empty module name':
+ # Keys are not class name
+ if str(e) == 'Empty module name':
- break
+ break
- else:
+ else:
- raise(e)
+ raise (e)
- logging.debug('\t+ create %s object from key using value as argument', key)
+ logging.debug('\t+ create %s object from key using value as argument', key)
- new_objects_list.append( new_class(**value) )
+ # noinspection PyCallingNonCallable
+ new_objects_list.append(new_class(**value))
- # Only one object have been loaded: pass the object if it is a subclass of expected type
- if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type):
+ # Only one object have been loaded: pass the object if it is a subclass of expected type
+ if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type):
- return new_objects_list[0]
+ return new_objects_list[0]
- # Pass non empty objects list
- elif len(new_objects_list) > 0:
+ # Pass non-empty objects list
+ elif len(new_objects_list) > 0:
- return new_objects_list
+ return new_objects_list
- # Otherwise, data are parameters of the expected class
- logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__)
+ # Otherwise, data are parameters of the expected class
+ logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__)
- return expected_value_type(**data)
+ return expected_value_type(**data)
-def as_dict(obj, filter: bool=True) -> dict:
- """Export object as dictionary.
- Parameters:
- filter: remove None attribute values.
- """
- _dict = {}
+def as_dict(obj, filter: bool = True) -> dict:
+ """Export object as dictionary.
+
+ Parameters:
+ obj:
+ filter: remove None attribute values.
+ """
+ _dict = {}
- for p in properties(obj.__class__):
+ for p in properties(obj.__class__):
- v = getattr(obj, p)
+ v = getattr(obj, p)
- if not filter or v is not None:
+ if not filter or v is not None:
+ _dict[p] = v
- _dict[p] = v
+ return _dict
- return _dict
class JsonEncoder(json.JSONEncoder):
- """Specific ArGaze JSON Encoder."""
+ """Specific ArGaze JSON Encoder."""
- def default(self, obj):
- """default implementation to serialize object."""
+ def default(self, obj):
+ """default implementation to serialize object."""
- # numpy cases
- if isinstance(obj, numpy.integer):
- return int(obj)
+ # numpy cases
+ if isinstance(obj, numpy.integer):
+ return int(obj)
- elif isinstance(obj, numpy.floating):
- return float(obj)
+ elif isinstance(obj, numpy.floating):
+ return float(obj)
- elif isinstance(obj, numpy.ndarray):
- return obj.tolist()
+ elif isinstance(obj, numpy.ndarray):
+ return obj.tolist()
- # default case
- try:
+ # default case
+ try:
- return json.JSONEncoder.default(self, obj)
+ return json.JSONEncoder.default(self, obj)
- # class case
- except:
+ # class case
+ except:
- # ignore attribute starting with _
- public_dict = {}
+ # ignore attribute starting with _
+ public_dict = {}
- for k, v in vars(obj).items():
-
- if not k.startswith('_'):
-
- # numpy cases
- if isinstance(v, numpy.integer):
- v = int(v)
+ for k, v in vars(obj).items():
- elif isinstance(v, numpy.floating):
- v = float(v)
+ if not k.startswith('_'):
- elif isinstance(v, numpy.ndarray):
- v = v.tolist()
+ # numpy cases
+ if isinstance(v, numpy.integer):
+ v = int(v)
- public_dict[k] = v
+ elif isinstance(v, numpy.floating):
+ v = float(v)
+
+ elif isinstance(v, numpy.ndarray):
+ v = v.tolist()
+
+ public_dict[k] = v
+
+ return public_dict
- return public_dict
class DataDictionary(dict):
- """Enable dot.notation access to dictionary attributes"""
+ """Enable dot notation access to dictionary attributes"""
+
+ __getattr__ = dict.get
+ __setattr__ = dict.__setitem__
+ __delattr__ = dict.__delitem__
- __getattr__ = dict.get
- __setattr__ = dict.__setitem__
- __delattr__ = dict.__delitem__
class TimestampedObject():
- """Abstract class to enable timestamp management."""
+ """Abstract class to enable timestamp management."""
- def __init__(self, timestamp: int|float = math.nan):
- """Initialize TimestampedObject."""
- self._timestamp = timestamp
+ def __init__(self, timestamp: int | float = math.nan):
+ """Initialize TimestampedObject."""
+ self._timestamp = timestamp
- def __repr__(self):
- """String representation."""
- return json.dumps(as_dict(self))
+ def __repr__(self):
+ """String representation."""
+ return json.dumps(as_dict(self))
- @property
- def timestamp(self) -> int|float:
- """Get object timestamp."""
- return self._timestamp
+ @property
+ def timestamp(self) -> int | float:
+ """Get object timestamp."""
+ return self._timestamp
- @timestamp.setter
- def timestamp(self, timestamp: int|float):
- """Set object timestamp."""
- self._timestamp = timestamp
+ @timestamp.setter
+ def timestamp(self, timestamp: int | float):
+ """Set object timestamp."""
+ self._timestamp = timestamp
- def untimestamp(self):
- """Reset object timestamp."""
- self._timestamp = math.nan
+ def untimestamp(self):
+ """Reset object timestamp."""
+ self._timestamp = math.nan
+
+ def is_timestamped(self) -> bool:
+ """Is the object timestamped?"""
+ return not math.isnan(self._timestamp)
- def is_timestamped(self) -> bool:
- """Is the object timestamped?"""
- return not math.isnan(self._timestamp)
class TimestampedObjectsList(list):
- """Handle timestamped object into a list.
+ """Handle timestamped object into a list.
!!! warning "Timestamped objects are not sorted internally"
- Timestamped objects are considered to be stored according at their coming time.
+ Timestamped objects are considered to be stored according to their coming time.
"""
- def __init__(self, ts_object_type: type, ts_objects: list = []):
+ def __init__(self, ts_object_type: type, ts_objects: list = []):
- self.__object_type = ts_object_type
- self.__object_properties = properties(self.__object_type)
+ self.__object_type = ts_object_type
+ self.__object_properties = properties(self.__object_type)
- for ts_object in ts_objects:
+ for ts_object in ts_objects:
+ self.append(ts_object)
- self.append(ts_object)
+ @property
+ def object_type(self):
+ """Get object type handled by the list."""
+ return self.__object_type
- @property
- def object_type(self):
- """Get object type handled by the list."""
- return self.__object_type
+ def append(self, ts_object: TimestampedObject | dict):
+ """Append timestamped object."""
- def append(self, ts_object: TimestampedObject|dict):
- """Append timestamped object."""
-
- # Convert dict into object
- if type(ts_object) == dict:
+ # Convert dict into object
+ if type(ts_object) == dict:
+ ts_object = from_dict(self.__object_type, ts_object)
- ts_object = self.__object_type.from_dict(ts_object)
+ # Check object type
+ if type(ts_object) != self.__object_type:
- # Check object type
- if type(ts_object) != self.__object_type:
+ if not issubclass(ts_object.__class__, self.__object_type):
+ raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance')
- if not issubclass(ts_object.__class__, self.__object_type):
+ if not ts_object.is_timestamped():
+ raise ValueError(f'object is not timestamped')
- raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance')
-
- if not ts_object.is_timestamped():
-
- raise ValueError(f'object is not timestamped')
-
- super().append(ts_object)
+ super().append(ts_object)
- def look_for(self, timestamp: int|float) -> TimestampedObject:
- """Look for object at given timestamp."""
- for ts_object in self:
-
- if ts_object.timestamp == timestamp:
+ def look_for(self, timestamp: int | float) -> TimestampedObject:
+ """Look for object at given timestamp."""
+ for ts_object in self:
- return ts_object
+ if ts_object.timestamp == timestamp:
+ return ts_object
- def __add__(self, ts_objects: list = []) -> Self:
- """Append timestamped objects list."""
+ def __add__(self, ts_objects: list = []) -> Self:
+ """Append timestamped objects list."""
- for ts_object in ts_objects:
+ for ts_object in ts_objects:
+ self.append(ts_object)
- self.append(ts_object)
+ return self
- return self
+ @property
+ def duration(self):
+ """Get inferred duration from first and last timestamps."""
+ if self:
- @property
- def duration(self):
- """Get inferred duration from first and last timestamps."""
- if self:
+ return self[-1].timestamp - self[0].timestamp
- return self[-1].timestamp - self[0].timestamp
+ else:
- else:
+ return 0
- return 0
+ def timestamps(self):
+ """Get all timestamps in list."""
+ return [ts_object.timestamp for ts_object in self]
- def timestamps(self):
- """Get all timestamps in list."""
- return [ts_object.timestamp for ts_object in self]
+ def tuples(self) -> list:
+ """Get all timestamped objects as list of tuple."""
+ return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self]
- def tuples(self) -> list:
- """Get all timestamped objects as list of tuple."""
- return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self]
+ @classmethod
+ def from_dataframe(cls, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self:
+ """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)."""
- @classmethod
- def from_dataframe(self, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> Self:
- """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)."""
+ dataframe.drop(exclude, inplace=True, axis=True)
- dataframe.drop(exclude, inplace=True, axis=True)
+ assert (dataframe.index.name == 'timestamp')
- assert(dataframe.index.name == 'timestamp')
+ object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in
+ dataframe.to_dict('index').items()]
- object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in dataframe.to_dict('index').items()]
+ return TimestampedObjectsList(ts_object_type, object_list)
- return TimestampedObjectsList(ts_object_type, object_list)
-
- def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame:
- """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
+ def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame:
+ """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
The optional *split* argument allows tuple values to be stored in dedicated columns.
For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]}
@@ -421,760 +423,750 @@ class TimestampedObjectsList(list):
Timestamps are stored as index column called 'timestamp'.
"""
- df = pandas.DataFrame(self.tuples(), columns=self.__object_properties)
+ df = pandas.DataFrame(self.tuples(), columns=self.__object_properties)
- # Exclude columns
- df.drop(exclude, inplace=True, axis=True)
+ # Exclude columns
+ df.drop(exclude, inplace=True, axis=True)
- # Split columns
- if len(split) > 0:
+ # Split columns
+ if len(split) > 0:
- splited_columns = []
-
- for column in df.columns:
+ split_columns = []
- if column in split.keys():
+ for column in df.columns:
- df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index)
- df.drop(column, inplace=True, axis=True)
+ if column in split.keys():
- for new_column in split[column]:
+ df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index)
+ df.drop(column, inplace=True, axis=True)
- splited_columns.append(new_column)
+ for new_column in split[column]:
+ split_columns.append(new_column)
- else:
+ else:
- splited_columns.append(column)
+ split_columns.append(column)
- # Reorder splited columns
- df = df[splited_columns]
+ # Reorder split columns
+ df = df[split_columns]
- # Append timestamps as index column
- df['timestamp'] = self.timestamps()
- df.set_index('timestamp', inplace=True)
+ # Append timestamps as index column
+ df['timestamp'] = self.timestamps()
+ df.set_index('timestamp', inplace=True)
- return df
+ return df
- @classmethod
- def from_json(self, ts_object_type: type, json_filepath: str) -> Self:
- """Create a TimestampedObjectsList from .json file."""
+ @classmethod
+ def from_json(cls, ts_object_type: type, json_filepath: str) -> Self:
+ """Create a TimestampedObjectsList from .json file."""
- with open(json_filepath, encoding='utf-8') as ts_objects_file:
+ with open(json_filepath, encoding='utf-8') as ts_objects_file:
+ json_ts_objects = json.load(ts_objects_file)
- json_ts_objects = json.load(ts_objects_file)
+ return TimestampedObjectsList(ts_object_type,
+ [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects])
- return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects])
+ def to_json(self, json_filepath: str):
+ """Save a TimestampedObjectsList to .json file."""
- def to_json(self, json_filepath: str):
- """Save a TimestampedObjectsList to .json file."""
+ with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file:
+ json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ')
- with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file:
+ def __repr__(self):
+ """String representation"""
+ return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, )
- json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ')
+ def __str__(self):
+ """String representation"""
+ return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False, )
- def __repr__(self):
- """String representation"""
- return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,)
+ def pop_last_until(self, timestamp: int | float) -> TimestampedObject:
+ """Pop all item until a given timestamped value and return the first after."""
- def __str__(self):
- """String representation"""
- return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,)
+ # get last item before given timestamp
+ earliest_value = self.get_last_until(timestamp)
- def pop_last_until(self, timestamp: int|float) -> TimestampedObject:
- """Pop all item until a given timestamped value and return the first after."""
+ while self[0].timestamp < earliest_value.timestamp:
+ self.pop(0)
- # get last item before given timestamp
- earliest_value = self.get_last_until(timestamp)
+ return self[0]
- while self[0].timestamp < earliest_value.timestamp:
+ def pop_last_before(self, timestamp: int | float) -> TimestampedObject:
+ """Pop all item before a given timestamped value and return the last one."""
- self.pop(0)
-
- return self[0]
+ # get last item before given timestamp
+ earliest_value = self.get_last_before(timestamp)
- def pop_last_before(self, timestamp: int|float) -> TimestampedObject:
- """Pop all item before a given timestamped value and return the last one."""
+ popped_value = self.pop(0)
- # get last item before given timestamp
- earliest_value = self.get_last_before(timestamp)
+ while popped_value.timestamp != earliest_value.timestamp:
+ popped_value = self.pop(0)
- poped_value = self.pop(0)
+ return popped_value
- while poped_value.timestamp != earliest_value.timestamp:
+ def get_first_from(self, timestamp: int | float) -> TimestampedObject:
+ """Retrieve first item timestamp from a given timestamp value."""
- poped_value = self.pop(0)
+ ts_list = self.timestamps()
+ first_from_index = bisect.bisect_left(ts_list, timestamp)
- return poped_value
+ if first_from_index < len(self):
- def get_first_from(self, timestamp: int|float) -> TimestampedObject:
- """Retreive first item timestamp from a given timestamp value."""
+ return self[ts_list[first_from_index]]
- ts_list = self.timestamps()
- first_from_index = bisect.bisect_left(ts_list, timestamp)
+ else:
- if first_from_index < len(self):
+ raise KeyError(f'No data stored after {timestamp} timestamp.')
- return self[ts_list[first_from_index]]
-
- else:
-
- raise KeyError(f'No data stored after {timestamp} timestamp.')
+ def get_last_before(self, timestamp: int | float) -> TimestampedObject:
+ """Retrieve last item timestamp before a given timestamp value."""
- def get_last_before(self, timestamp: int|float) -> TimestampedObject:
- """Retreive last item timestamp before a given timestamp value."""
+ ts_list = self.timestamps()
+ last_before_index = bisect.bisect_left(ts_list, timestamp) - 1
- ts_list = self.timestamps()
- last_before_index = bisect.bisect_left(ts_list, timestamp) - 1
+ if last_before_index >= 0:
- if last_before_index >= 0:
+ return self[ts_list[last_before_index]]
- return self[ts_list[last_before_index]]
-
- else:
-
- raise KeyError(f'No data stored before {timestamp} timestamp.')
-
- def get_last_until(self, timestamp: int|float) -> TimestampedObject:
- """Retreive last item timestamp until a given timestamp value."""
+ else:
+
+ raise KeyError(f'No data stored before {timestamp} timestamp.')
+
+ def get_last_until(self, timestamp: int | float) -> TimestampedObject:
+ """Retrieve last item timestamp until a given timestamp value."""
- ts_list = self.timestamps()
- last_until_index = bisect.bisect_right(ts_list, timestamp) - 1
+ ts_list = self.timestamps()
+ last_until_index = bisect.bisect_right(ts_list, timestamp) - 1
- if last_until_index >= 0:
+ if last_until_index >= 0:
- return self[ts_list[last_until_index]]
-
- else:
-
- raise KeyError(f'No data stored until {timestamp} timestamp.')
+ return self[ts_list[last_until_index]]
- def plot(self, names=[], colors=[], split={}, samples=None) -> list:
- """Plot as [matplotlib](https://matplotlib.org/) time chart."""
+ else:
- df = self.as_dataframe(split=split)
- legend_patches = []
+ raise KeyError(f'No data stored until {timestamp} timestamp.')
- # decimate data
- if samples != None:
+ def plot(self, names=[], colors=[], split={}, samples=None) -> list:
+ """Plot as [matplotlib](https://matplotlib.org/) time chart."""
- if samples < len(df):
+ df = self.as_dataframe(split=split)
+ legend_patches = []
- step = int(len(df) / samples) + 1
- df = df.iloc[::step, :]
+ # decimate data
+ if samples != None:
- for name, color in zip(names, colors):
+ if samples < len(df):
+ step = int(len(df) / samples) + 1
+ df = df.iloc[::step, :]
- markerline, stemlines, baseline = mpyplot.stem(df.index, df[name])
- mpyplot.setp(markerline, color=color, linewidth=1, markersize = 1)
- mpyplot.setp(stemlines, color=color, linewidth=1)
- mpyplot.setp(baseline, color=color, linewidth=1)
+ for name, color in zip(names, colors):
+ markerline, stemlines, baseline = mpyplot.stem(df.index, df[name])
+ mpyplot.setp(markerline, color=color, linewidth=1, markersize=1)
+ mpyplot.setp(stemlines, color=color, linewidth=1)
+ mpyplot.setp(baseline, color=color, linewidth=1)
- legend_patches.append(mpatches.Patch(color=color, label=name.upper()))
+ legend_patches.append(mpatches.Patch(color=color, label=name.upper()))
+
+ return legend_patches
- return legend_patches
class SharedObject(TimestampedObject):
- """Abstract class to enable multiple threads sharing for timestamped object."""
+ """Abstract class to enable multiple threads sharing for timestamped object."""
- def __init__(self, timestamp: int|float = math.nan):
+ def __init__(self, timestamp: int | float = math.nan):
+ TimestampedObject.__init__(self, timestamp)
+ self._lock = threading.Lock()
+ self._execution_times = {}
+ self._exceptions = {}
- TimestampedObject.__init__(self, timestamp)
- self._lock = threading.Lock()
- self._execution_times = {}
- self._exceptions = {}
class TimestampedException(Exception, TimestampedObject):
- """Wrap exception to keep track of raising timestamp."""
+ """Wrap exception to keep track of raising timestamp."""
- def __init__(self, exception = Exception, timestamp: int|float = math.nan):
+ def __init__(self, exception=Exception, timestamp: int | float = math.nan):
+ Exception.__init__(self, exception)
+ TimestampedObject.__init__(self, timestamp)
- Exception.__init__(self, exception)
- TimestampedObject.__init__(self, timestamp)
class TimestampedExceptions(TimestampedObjectsList):
- """Handle timestamped exceptions into a list."""
-
- def __init__(self, exceptions: list = []):
+ """Handle timestamped exceptions into a list."""
+
+ def __init__(self, exceptions: list = []):
+ TimestampedObjectsList.__init__(self, TimestampedException, exceptions)
- TimestampedObjectsList.__init__(self, TimestampedException, exceptions)
+ def values(self) -> list[str]:
+ """Get all timestamped exception values as list of messages."""
+ return [ts_exception.message for ts_exception in self]
- def values(self) -> list[str]:
- """Get all timestamped exception values as list of messages."""
- return [ts_exception.message for ts_exception in self]
class PipelineStepLoadingFailed(Exception):
- """
+ """
Exception raised when pipeline step object loading fails.
"""
- def __init__(self, message):
- super().__init__(message)
-
-class TimestampedImage(numpy.ndarray, TimestampedObject):
- """Wrap numpy.array to timestamp image."""
+ def __init__(self, message):
+ super().__init__(message)
- def __new__(cls, array: numpy.array, timestamp: int|float = math.nan):
- return numpy.ndarray.__new__(cls, array.shape, dtype = array.dtype, buffer = array)
+class TimestampedImage(numpy.ndarray, TimestampedObject):
+ """Wrap numpy.array to timestamp image."""
- def __init__(self, array: numpy.array, timestamp: int|float = math.nan):
+ def __new__(cls, array: numpy.array, timestamp: int | float = math.nan):
+ return numpy.ndarray.__new__(cls, array.shape, dtype=array.dtype, buffer=array)
- TimestampedObject.__init__(self, timestamp)
+ def __init__(self, array: numpy.array, timestamp: int | float = math.nan):
+ TimestampedObject.__init__(self, timestamp)
- def __array_finalize__(self, obj):
+ def __array_finalize__(self, obj):
+ pass
- pass
+ @property
+ def size(self) -> list:
+ """Return list with width and height."""
+ return list(self.shape[0:2][::-1])
- @property
- def size(self) -> list:
- """Return list with width and heigth."""
- return list(self.shape[0:2][::-1])
class TimestampedImages(TimestampedObjectsList):
- """Handle timestamped images into a list."""
-
- def __init__(self, images: list = []):
+ """Handle timestamped images into a list."""
+
+ def __init__(self, images: list = []):
+ TimestampedObjectsList.__init__(self, TimestampedImage, images)
- TimestampedObjectsList.__init__(self, TimestampedImage, images)
def PipelineStepInit(method):
- """Define a decorator use into PipelineStepObject class to wrap pipeline step __init__ method."""
+ """Define a decorator use into PipelineStepObject class to wrap pipeline step __init__ method."""
- def wrapper(self, **kwargs):
- """Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call.
+ def wrapper(self, **kwargs):
+ """Wrap pipeline __init__ init method to update PipelineStepObject attributes with arguments after init call.
Parameters:
+ self:
kwargs: any arguments defined by PipelineStepMethodInit.
"""
- # Init pipeline step object attributes
- PipelineStepObject.__init__(self)
+ # Init pipeline step object attributes
+ PipelineStepObject.__init__(self)
- # Init class attributes
- method(self, **kwargs)
+ # Init class attributes
+ method(self, **kwargs)
- # Update all attributes
- self.update_attributes(kwargs)
+ # Update all attributes
+ self.update_attributes(kwargs)
+
+ return wrapper
- return wrapper
def PipelineStepEnter(method):
- """Define a decorator use into PipelineStepObject class to wrap pipeline step __enter__ method."""
+ """Define a decorator use into PipelineStepObject class to wrap pipeline step __enter__ method."""
+
+ def wrapper(self):
+ """Wrap pipeline step __enter__ method to call super, observers and children __enter__ method."""
- def wrapper(self):
- """Wrap pipeline step __enter__ method to call super, observers and children __enter__ method."""
+ logging.debug('%s.__enter__', get_class_path(self))
- logging.debug('%s.__enter__', get_class_path(self))
+ method(self)
- method(self)
+ return PipelineStepObject.__enter__(self)
- return PipelineStepObject.__enter__(self)
+ return wrapper
- return wrapper
def PipelineStepExit(method):
- """Define a decorator use into PipelineStepObject class to wrap pipeline step __exit__ method."""
+ """Define a decorator use into PipelineStepObject class to wrap pipeline step __exit__ method."""
- def wrapper(self, *args):
- """Wrap pipeline step __exit__ method to call super, observers and children __exit__ method."""
+ def wrapper(self, *args):
+ """Wrap pipeline step __exit__ method to call super, observers and children __exit__ method."""
- logging.debug('%s.__exit__', get_class_path(self))
+ logging.debug('%s.__exit__', get_class_path(self))
- PipelineStepObject.__exit__(self, *args)
+ PipelineStepObject.__exit__(self, *args)
- method(self, *args)
+ method(self, *args)
+
+ return wrapper
- return wrapper
def PipelineStepAttributeSetter(method):
- """Define a decorator use into PipelineStepObject class to wrap pipeline step attribute setter."""
+ """Define a decorator use into PipelineStepObject class to wrap pipeline step attribute setter."""
- def wrapper(self, new_value, unwrap: bool = False):
- """Wrap pipeline step attribute setter to load attribute from file.
+ def wrapper(self, new_value, unwrap: bool = False):
+ """Wrap pipeline step attribute setter to load attribute from file.
- Parameters:
- new_value: value used to set attribute.
- unwrap: call wrapped method directly.
- """
- if unwrap:
+ Parameters:
+ self:
+ new_value: value used to set attribute.
+ unwrap: call wrapped method directly.
+ """
+ if unwrap:
+ return method(self, new_value)
- return method(self, new_value)
+ # Get new value type
+ new_value_type = type(new_value)
- # Get new value type
- new_value_type = type(new_value)
+ # Check setter annotations to get expected value type
+ try:
- # Check setter annotations to get expected value type
- try:
+ expected_value_type = list(method.__annotations__.values())[0]
- expected_value_type = list(method.__annotations__.values())[0]
+ except KeyError:
- except KeyError:
+ raise (
+ PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}'))
- raise(PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}'))
+ logging.debug('%s@%s.setter', get_class_path(self), method.__name__)
+ logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__)
- logging.debug('%s@%s.setter', get_class_path(self), method.__name__)
- logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__)
+ # String not expected: load value from file
+ if new_value_type == str and new_value_type != expected_value_type:
- # String not expected: load value from file
- if new_value_type == str and new_value_type != expected_value_type:
+ split_point = new_value.split('.')
- split_point = new_value.split('.')
+ # String have a dot inside: file path with format
+ if len(split_point) > 1:
- # String have a dot inside: file path with format
- if len(split_point) > 1:
+ file_format = split_point[-1].upper()
- file_format = split_point[-1].upper()
+ logging.debug('\t> %s is a path to a %s file', new_value, file_format)
- logging.debug('\t> %s is a path to a %s file', new_value, file_format)
-
- filepath = os.path.join(get_working_directory(), new_value)
+ filepath = os.path.join(get_working_directory(), new_value)
- # Load image from JPG and PNG formats
- if file_format == 'JPG' or file_format == 'PNG':
+ # Load image from JPG and PNG formats
+ if file_format == 'JPG' or file_format == 'PNG':
- return method(self, TimestampedImage(cv2.imread(filepath)))
+ return method(self, TimestampedImage(cv2.imread(filepath)))
- # Load image from OBJ formats
- elif file_format == 'OBJ':
+ # Load image from OBJ formats
+ elif file_format == 'OBJ':
- return method(self, expected_value_type.from_obj(filepath))
+ return method(self, expected_value_type.from_obj(filepath))
- # Load object from JSON file
- elif file_format == 'JSON':
+ # Load object from JSON file
+ elif file_format == 'JSON':
- with open(filepath) as file:
+ with open(filepath) as file:
- return method(self, from_dict(expected_value_type, json.load(file)))
+ return method(self, from_dict(expected_value_type, json.load(file)))
- # No point inside string: identifier name
- else:
+ # No point inside string: identifier name
+ else:
- logging.debug('\t> %s is an identifier', new_value)
- logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__)
-
- return method(self, expected_value_type(new_value))
+ logging.debug('\t> %s is an identifier', new_value)
+ logging.debug('\t+ create %s object using string as argument', expected_value_type.__name__)
- # Dict not expected: load value from dict
- if new_value_type == dict and expected_value_type != dict:
+ return method(self, expected_value_type(new_value))
- return method(self, from_dict(expected_value_type, new_value))
+ # Dict not expected: load value from dict
+ if new_value_type == dict and expected_value_type != dict:
+ return method(self, from_dict(expected_value_type, new_value))
- # Otherwise, pass new value to setter method
- logging.debug('\t> use %s value as passed', new_value_type.__name__)
+ # Otherwise, pass new value to setter method
+ logging.debug('\t> use %s value as passed', new_value_type.__name__)
- method(self, new_value)
+ method(self, new_value)
+
+ return wrapper
- return wrapper
def PipelineStepImage(method):
- """Define a decorator use into PipelineStepObject class to wrap pipeline step image method."""
+ """Define a decorator use into PipelineStepObject class to wrap pipeline step image method."""
+
+ def wrapper(self, **kwargs) -> numpy.array:
+ """Wrap pipeline step image method."""
- def wrapper(self, **kwargs) -> numpy.array:
- """Wrap pipeline step image method."""
+ if kwargs:
- if kwargs:
+ logging.debug('\t> using kwargs')
- logging.debug('\t> using kwargs')
+ return method(self, **kwargs)
- return method(self, **kwargs)
+ else:
- else:
+ logging.debug('\t> using image_parameters')
- logging.debug('\t> using image_parameters')
+ return method(self, **self.image_parameters)
- return method(self, **self.image_parameters)
+ return wrapper
- return wrapper
def PipelineStepDraw(method):
- """Define a decorator use into PipelineStepObject class to wrap pipeline step draw method."""
+ """Define a decorator use into PipelineStepObject class to wrap pipeline step draw method."""
- def wrapper(self, image: numpy.array, **kwargs):
- """Wrap pipeline step draw method."""
+ def wrapper(self, image: numpy.array, **kwargs):
+ """Wrap pipeline step draw method."""
- if kwargs:
+ if kwargs:
- logging.debug('\t> using kwargs')
+ logging.debug('\t> using kwargs')
- method(self, image, **kwargs)
+ method(self, image, **kwargs)
- else:
+ else:
- logging.debug('\t> using draw_parameters')
+ logging.debug('\t> using draw_parameters')
- method(self, image, **self.draw_parameters)
+ method(self, image, **self.draw_parameters)
- return wrapper
+ return wrapper
+
+# noinspection PyAttributeOutsideInit
class PipelineStepObject():
- """
+ """
Define class to assess pipeline step methods execution time and observe them.
"""
- __initialized = False
-
- def __init__(self):
- """Initialize PipelineStepObject."""
-
- if not self.__initialized:
+ __initialized = False
- logging.debug('%s.__init__', get_class_path(self))
+ def __init__(self):
+ """Initialize PipelineStepObject."""
- # Init private attributes
- self.__initialized = True
- self.__name = None
- self.__observers = []
- self.__execution_times = {}
- self.__image_parameters = {}
+ if not self.__initialized:
+ logging.debug('%s.__init__', get_class_path(self))
- # Init protected attributes
- self._image_parameters = {}
- self._draw_parameters = {}
-
- # Parent attribute will be setup later by parent it self
- self.__parent = None
+ # Init private attributes
+ self.__initialized = True
+ self.__name = None
+ self.__observers = []
+ self.__execution_times = {}
+ self.__image_parameters = {}
- def __enter__(self):
- """Define default method to enter into pipeline step object context."""
+ # Init protected attributes
+ self._image_parameters = {}
+ self._draw_parameters = {}
- # Start children pipeline step objects
- for child in self.children:
+ # Parent attribute will be setup later by parent itself
+ self.__parent = None
- child.__enter__()
+ def __enter__(self):
+ """Define default method to enter into pipeline step object context."""
- # Start observers
- for observer in self.observers:
+ # Start children pipeline step objects
+ for child in self.children:
+ child.__enter__()
- observer.__enter__()
+ # Start observers
+ for observer in self.observers:
+ observer.__enter__()
- return self
+ return self
- def __exit__(self, exception_type, exception_value, exception_traceback):
- """Define default method to exit from pipeline step object context."""
+ def __exit__(self, exception_type, exception_value, exception_traceback):
+ """Define default method to exit from pipeline step object context."""
- # Stop observers
- for observer in self.observers:
+ # Stop observers
+ for observer in self.observers:
+ observer.__exit__(exception_type, exception_value, exception_traceback)
- observer.__exit__(exception_type, exception_value, exception_traceback)
+ # Stop children pipeline step objects
+ for child in self.children:
+ child.__exit__(exception_type, exception_value, exception_traceback)
- # Stop children pipeline step objects
- for child in self.children:
+ def update_attributes(self, object_data: dict):
+ """Update pipeline step object attributes with dictionary."""
- child.__exit__(exception_type, exception_value, exception_traceback)
+ for key, value in object_data.items():
- def update_attributes(self, object_data: dict):
- """Update pipeline step object attributes with dictionary."""
+ if hasattr(self, key):
- for key, value in object_data.items():
+ logging.debug('%s.update_attributes > update %s with %s value', get_class_path(self), key,
+ type(value).__name__)
- if hasattr(self, key):
+ setattr(self, key, value)
- logging.debug('%s.update_attributes > update %s with %s value', get_class_path(self), key, type(value).__name__)
+ else:
- setattr(self, key, value)
+ raise (AttributeError(f'{get_class_path(self)} has not {key} attribute.'))
- else:
+ @property
+ def name(self) -> str:
+ """Get pipeline step object's name."""
+ return self.__name
- raise(AttributeError(f'{get_class_path(self)} has not {key} attribute.'))
+ @name.setter
+ def name(self, name: str):
+ """Set pipeline step object's name."""
+ self.__name = name
- @property
- def name(self) -> str:
- """Get pipeline step object's name."""
- return self.__name
+ @property
+ def parent(self) -> object:
+ """Get pipeline step object's parent object."""
+ return self.__parent
- @name.setter
- def name(self, name: str):
- """Set pipeline step object's name."""
- self.__name = name
+ @parent.setter
+ def parent(self, parent: object):
+ """Set layer's parent object."""
+ self.__parent = parent
- @property
- def parent(self) -> object:
- """Get pipeline step object's parent object."""
- return self.__parent
+ @property
+ def observers(self) -> list:
+ """Pipeline step object observers list."""
+ return self.__observers
- @parent.setter
- def parent(self, parent: object):
- """Set layer's parent object."""
- self.__parent = parent
+ @observers.setter
+ @PipelineStepAttributeSetter
+ def observers(self, observers: list):
- @property
- def observers(self) -> list:
- """Pipeline step object observers list."""
- return self.__observers
+ # Edit new observers dictionary
+ self.__observers = observers
- @observers.setter
- @PipelineStepAttributeSetter
- def observers(self, observers: list):
+ @property
+ def execution_times(self):
+ """Get pipeline step object observers execution times dictionary."""
+ return self.__execution_times
- # Edit new observers dictionary
- self.__observers = observers
+ @property
+ def image_parameters(self) -> dict:
+ """image method parameters dictionary."""
+ return self._image_parameters
- @property
- def execution_times(self):
- """Get pipeline step object observers execution times dictionary."""
- return self.__execution_times
-
- @property
- def image_parameters(self) -> dict:
- """image method parameters dictionary."""
- return self._image_parameters
+ @image_parameters.setter
+ @PipelineStepAttributeSetter
+ def image_parameters(self, image_parameters: dict):
- @image_parameters.setter
- @PipelineStepAttributeSetter
- def image_parameters(self, image_parameters: dict):
+ self._image_parameters = image_parameters
- self._image_parameters = image_parameters
+ @property
+ def draw_parameters(self) -> dict:
+ """draw method parameters dictionary."""
+ return self._draw_parameters
- @property
- def draw_parameters(self) -> dict:
- """draw method parameters dictionary."""
- return self._draw_parameters
+ @draw_parameters.setter
+ @PipelineStepAttributeSetter
+ def draw_parameters(self, draw_parameters: dict):
- @draw_parameters.setter
- @PipelineStepAttributeSetter
- def draw_parameters(self, draw_parameters: dict):
+ self._draw_parameters = draw_parameters
- self._draw_parameters = draw_parameters
-
- def as_dict(self) -> dict:
- """Export PipelineStepObject attributes as dictionary.
+ def as_dict(self) -> dict:
+ """Export PipelineStepObject attributes as dictionary.
Returns:
object_data: dictionary with pipeline step object attributes values.
"""
- return {
- "name": self.__name,
- "observers": self.__observers
- }
-
- def to_json(self, json_filepath: str = None):
- """Save pipeline step object into .json file."""
-
- # Remember file path to ease rewriting
- if json_filepath is not None:
+ return {
+ "name": self.__name,
+ "observers": self.__observers
+ }
- self.__json_filepath = json_filepath
+ # noinspection PyAttributeOutsideInit
+ def to_json(self, json_filepath: str = None):
+ """Save pipeline step object into .json file."""
- # Open file
- with open(self.__json_filepath, 'w', encoding='utf-8') as object_file:
+ # Remember file path to ease rewriting
+ if json_filepath is not None:
+ # noinspection PyAttributeOutsideInit
+ self.__json_filepath = json_filepath
- json.dump({self.__class__.__module__:as_dict(self)}, object_file, ensure_ascii=False, indent=4)
+ # Open file
+ with open(self.__json_filepath, 'w', encoding='utf-8') as object_file:
+ json.dump({self.__class__.__module__: as_dict(self)}, object_file, ensure_ascii=False, indent=4)
- # QUESTION: maybe we need two saving mode?
- #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder)
+ # QUESTION: maybe we need two saving mode?
+ #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder)
- def __str__(self) -> str:
- """
+ def __str__(self) -> str:
+ """
String representation of pipeline step object.
Returns:
String representation
"""
- logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '')
-
- tabs = self.tabulation
- output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n'
-
- if self.__name is not None:
- output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n'
-
- if self.__parent is not None:
- output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n'
-
- if len(self.__observers):
- output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n'
- for observer in self.__observers:
- output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n'
-
- for name, value in self.properties:
+ logging.debug('%s.__str__ %s', get_class_path(self), self.name if self.name is not None else '')
- logging.debug('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__)
+ tabs = self.tabulation
+ output = f'{Fore.GREEN}{Style.BRIGHT}{self.__class__.__module__}.{self.__class__.__name__}{Style.RESET_ALL}\n'
- output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: '
+ if self.__name is not None:
+ output += f'{tabs}\t{Style.BRIGHT}name: {Fore.MAGENTA}{self.__name}{Style.RESET_ALL}\n'
- if type(value) == dict:
+ if self.__parent is not None:
+ output += f'{tabs}\t{Style.BRIGHT}parent{Style.RESET_ALL}: {Fore.MAGENTA}{self.__parent.name}{Style.RESET_ALL}\n'
- output += '\n'
+ if len(self.__observers):
+ output += f'{tabs}\t{Style.BRIGHT}observers{Style.RESET_ALL}:\n'
+ for observer in self.__observers:
+ output += f'{tabs}\t - {Fore.GREEN}{Style.BRIGHT}{observer.__class__.__module__}.{observer.__class__.__name__}{Style.RESET_ALL}\n'
- for k, v in value.items():
+ for name, value in self.properties:
- output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n'
+ logging.debug('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__)
- elif type(value) == list:
+ output += f'{tabs}\t{Style.BRIGHT}{name}{Style.RESET_ALL}: '
- output += '\n'
+ if type(value) == dict:
- for v in value:
+ output += '\n'
- output += f'{tabs}\t - {v}\n'
+ for k, v in value.items():
+ output += f'{tabs}\t - {Fore.MAGENTA}{k}{Style.RESET_ALL}: {v}\n'
- elif type(value) == numpy.ndarray or type(value) == TimestampedImage:
+ elif type(value) == list:
- output += f'numpy.array{value.shape}\n'
+ output += '\n'
- elif type(value) == pandas.DataFrame:
+ for v in value:
+ output += f'{tabs}\t - {v}\n'
- output += f'pandas.DataFrame{value.shape}\n'
+ elif type(value) == numpy.ndarray or type(value) == TimestampedImage:
- else:
+ output += f'numpy.array{value.shape}\n'
- try:
+ elif type(value) == pandas.DataFrame:
- output += f'{value}'
+ output += f'pandas.DataFrame{value.shape}\n'
- except TypeError as e:
+ else:
- logging.error('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__)
+ try:
- output += f'{Fore.RED}{Style.BRIGHT}!!! {get_class_path(self)}.{name}: {e}{Style.RESET_ALL}\n\n'
+ output += f'{value}'
- if output[-1] != '\n':
+ except TypeError as e:
- output += '\n'
+ logging.error('%s.__str__ @property %s (%s)', get_class_path(self), name, type(value).__name__)
- return output
+ output += f'{Fore.RED}{Style.BRIGHT}!!! {get_class_path(self)}.{name}: {e}{Style.RESET_ALL}\n\n'
- @property
- def tabulation(self) -> str:
- """Edit tabulation string according parents number."""
+ if output[-1] != '\n':
+ output += '\n'
- tabs = ''
- parent = self.__parent
+ return output
- while (parent is not None):
+ @property
+ def tabulation(self) -> str:
+ """Edit tabulation string according parents number."""
- tabs += '\t'
- parent = parent.parent
+ tabs = ''
+ parent = self.__parent
- return tabs
+ while (parent is not None):
+ tabs += '\t'
+ parent = parent.parent
- @property
- def properties(self) -> tuple[name, any]:
- """Iterate over pipeline step properties values."""
+ return tabs
- properties = [name for name, item in self.__class__.__dict__.items() if isinstance(item, property)]
+ @property
+ def properties(self) -> tuple[name, any]:
+ """Iterate over pipeline step properties values."""
- for base in self.__class__.__bases__:
+ properties = [name for name, item in self.__class__.__dict__.items() if isinstance(item, property)]
- if base != PipelineStepObject and base != SharedObject:
+ for base in self.__class__.__bases__:
- for name, item in base.__dict__.items():
+ if base != PipelineStepObject and base != SharedObject:
- if isinstance(item, property) and not name in properties:
+ for name, item in base.__dict__.items():
- properties.append(name)
+ if isinstance(item, property) and not name in properties:
+ properties.append(name)
- for name in properties:
+ for name in properties:
+ yield name, getattr(self, name)
- yield name, getattr(self, name)
+ @property
+ def children(self) -> object:
+ """Iterate over children pipeline step objects."""
- @property
- def children(self) -> object:
- """Iterate over children pipeline step objects."""
+ for name, value in self.properties:
- for name, value in self.properties:
+ # Pipeline step object attribute
+ if issubclass(type(value), PipelineStepObject) and value != self.parent:
- # Pipeline step object attribute
- if issubclass(type(value), PipelineStepObject) and value != self.parent:
+ yield value
- yield value
+ # Pipeline step objects list attribute
+ elif type(value) == list:
- # Pipeline step objects list attribute
- elif type(value) == list:
+ for p in value:
- for p in value:
+ if issubclass(type(p), PipelineStepObject):
+ yield p
- if issubclass(type(p), PipelineStepObject):
+ # Pipeline step objects list attribute
+ elif type(value) == dict:
- yield p
+ for p in value.values():
- # Pipeline step objects list attribute
- elif type(value) == dict:
+ if issubclass(type(p), PipelineStepObject):
+ yield p
- for p in value.values():
-
- if issubclass(type(p), PipelineStepObject):
-
- yield p
def PipelineStepMethod(method):
- """Define a decorator use into PipelineStepObject class to declare pipeline method.
-
- !!! danger
- PipelineStepMethod must have a timestamp as first argument.
- """
+ """Define a decorator use into PipelineStepObject class to declare pipeline method.
- def wrapper(self, *args, timestamp: int|float = None, unwrap: bool = False, catch_exceptions: bool = True, **kwargs):
- """Wrap pipeline step method to measure execution time.
+ !!! danger
+ PipelineStepMethod must have a timestamp as first argument.
+ """
- Parameters:
- args: any arguments defined by PipelineStepMethod.
- timestamp: optional method call timestamp (unit does'nt matter) if first args parameter is not a TimestampedObject instance.
- unwrap: extra arguments used in wrapper function to call wrapped method directly.
- catch_exceptions: extra arguments used in wrapper function to catch exception.
- """
- if timestamp is None and len(args) > 0:
+ def wrapper(self, *args, timestamp: int | float = None, unwrap: bool = False, catch_exceptions: bool = True, **kwargs):
+ """Wrap pipeline step method to measure execution time.
- if issubclass(type(args[0]), TimestampedObject):
+ Parameters:
+ self:
+ args: any arguments defined by PipelineStepMethod.
+ timestamp: optional method call timestamp (unit doesn't matter) if first args parameter is not a
+ TimestampedObject instance.
+ unwrap: extra arguments used in wrapper function to call wrapped method directly.
+ catch_exceptions: extra arguments used in wrapper function to catch exception.
+ """
+ if timestamp is None and len(args) > 0:
- timestamp = args[0].timestamp
+ if issubclass(type(args[0]), TimestampedObject):
- else:
+ timestamp = args[0].timestamp
- logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', get_class_path(self), method.__name__, type(args[0]).__name__)
+ else:
- if unwrap:
+ logging.error('%s.%s: %s is not a TimestampedObject subclass. You must pass a timestamp argument.', get_class_path(self), method.__name__, type(args[0]).__name__)
- return method(self, *args, **kwargs)
+ if unwrap:
+ return method(self, *args, **kwargs)
- # Initialize execution time assessment
- start = time.perf_counter()
- exception = None
- result = None
+ # Initialize execution time assessment
+ start = time.perf_counter()
+ exception = None
+ result = None
- if not catch_exceptions:
+ if not catch_exceptions:
- # Execute wrapped method without catching exceptions
- result = method(self, *args, **kwargs)
+ # Execute wrapped method without catching exceptions
+ result = method(self, *args, **kwargs)
- # Measure execution time
- self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3
-
- else:
-
- try:
+ # Measure execution time
+ self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3
- # Execute wrapped method
- result = method(self, *args, **kwargs)
+ else:
- except Exception as e:
+ try:
- exception = e
+ # Execute wrapped method
+ result = method(self, *args, **kwargs)
- finally:
+ except Exception as e:
- # Measure execution time
- self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3
+ exception = e
- # Notify observers that method has been called
- subscription_name = f'on_{method.__name__}'
+ finally:
- for observer in self.observers:
+ # Measure execution time
+ self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3
- # Does the observer cares about this method?
- if subscription_name in dir(observer):
+ # Notify observers that method has been called
+ subscription_name = f'on_{method.__name__}'
- subscription = getattr(observer, subscription_name)
+ for observer in self.observers:
- # Call subscription
- subscription(timestamp, self, exception)
+ # Does the observer cares about this method?
+ if subscription_name in dir(observer):
+ subscription = getattr(observer, subscription_name)
- # Raise timestamped exception
- if exception is not None:
+ # Call subscription
+ subscription(timestamp, self, exception)
- raise TimestampedException(exception, timestamp)
+ # Raise timestamped exception
+ if exception is not None:
+ raise TimestampedException(exception, timestamp)
- return result
+ return result
- return wrapper
+ return wrapper