aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/DataFeatures.py
diff options
context:
space:
mode:
authorThéo de la Hogue2024-03-21 18:23:41 +0100
committerThéo de la Hogue2024-03-21 18:23:41 +0100
commit6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3 (patch)
treeba4f2619bc0cf145e9f1c63c2e8b388d7f173de7 /src/argaze/DataFeatures.py
parente42cdc26fbd7f44f6b60acc14ac0e60828af9f42 (diff)
downloadargaze-6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3.zip
argaze-6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3.tar.gz
argaze-6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3.tar.bz2
argaze-6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3.tar.xz
Major serialization mechanism rewriting. Still not working.
Diffstat (limited to 'src/argaze/DataFeatures.py')
-rw-r--r--src/argaze/DataFeatures.py314
1 files changed, 233 insertions, 81 deletions
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index 1334961..677a179 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -32,6 +32,7 @@ import time
import pandas
import numpy
+import cv2
import matplotlib.pyplot as mpyplot
import matplotlib.patches as mpatches
from colorama import Style, Fore
@@ -54,6 +55,25 @@ def module_path(obj) -> str:
"""
return obj.__class__.__module__
+def get_class(class_path: str) -> object:
+ """Get class object from 'path.to.class' string.
+
+ Parameters:
+ class_path: a 'path.to.class' string.
+
+ Returns:
+ class: a 'path.to.class' class.
+ """
+ parts = class_path.split('.')
+ module = ".".join(parts[:-1])
+
+ m = __import__(module)
+
+ for comp in parts[1:]:
+ m = getattr(m, comp)
+
+ return m
+
def properties(cls) -> list:
"""get class properties name."""
@@ -436,29 +456,149 @@ class SharedObject(TimestampedObject):
self._execution_times = {}
self._exceptions = {}
+def PipelineStepInit(method):
+ """Define a decorator used in the PipelineStepObject class to declare a pipeline step init method."""
+
+ def wrapper(self, **kwargs):
+ """Wrap pipeline step init method to update PipelineStepObject attributes with arguments after init call.
+
+ Parameters:
+ kwargs: Any arguments defined by PipelineStepMethodInit.
+ """
+
+ # DEBUG
+ print('@PipelineStepInit', kwargs.keys())
+
+ method(self, **kwargs)
+
+ self.update(kwargs)
+
+ return wrapper
+
+def PipelineStepAttributeSetter(method):
+ """Define a decorator used in the PipelineStepObject class to declare a pipeline step attribute setter."""
+
+ def wrapper(self, new_value):
+ """Wrap pipeline step attribute setter to load attribute from file.
+
+ Parameters:
+ new_value: value used to set attribute.
+ """
+
+ # Get new value type
+ new_value_type = type(new_value)
+
+ # Check setter annotations to get expected value type
+ expected_value_type = method.__annotations__.popitem()[1]
+
+ # Define function to load dict values
+ def load_dict(data: dict) -> any:
+
+ # Check if json keys are PipelineStepObject class and store them in a list
+ new_objects_list = []
+
+ for key, value in data.items():
+
+ try:
+
+ new_class = get_class(key)
+
+ except ValueError as e:
+
+ # Keys are not class name
+ if str(e) == 'Empty module name':
+
+ break
+
+ else:
+
+ raise(e)
+
+ # DEBUG
+ print('@PipelineStepAttributeSetter new_class', new_class)
+
+ new_objects_list.append( new_class(**value) )
+
+ # Only one object has been loaded: pass the object if it is a subclass of the expected type
+ if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type):
+
+ return new_objects_list[0]
+
+ # Pass non empty objects list
+ elif len(new_objects_list) > 0:
+
+ return new_objects_list
+
+ # Otherwise, data are parameters of the expected class
+ return expected_value_type(**data)
+
+ # DEBUG
+ print('@PipelineStepAttributeSetter', method.__name__, new_value_type, expected_value_type, type(expected_value_type))
+
+ # String not expected: load value from file
+ if new_value_type == str and new_value_type != expected_value_type:
+
+ filepath = os.path.join(self.working_directory, new_value)
+ file_format = filepath.split('.')[-1]
+
+ # Load image from JPG and PNG formats
+ if file_format == 'jpg' or file_format == 'png':
+
+ # DEBUG
+ print('@PipelineStepAttributeSetter IMAGE', filepath)
+
+ return method(self, cv2.imread(filepath))
+
+ # Load PipelineStepObject from JSON file
+ elif file_format == 'json':
+
+ # DEBUG
+ print('@PipelineStepAttributeSetter issubclass', issubclass(expected_value_type, PipelineStepObject))
+
+ #if issubclass(expected_value_type, PipelineStepObject):
+
+ # DEBUG
+ print('@PipelineStepAttributeSetter JSON', filepath)
+
+ with open(filepath) as file:
+
+ return method(self, load_dict(json.load(file)))
+
+ # DEBUG
+ print('@PipelineStepAttributeSetter unknown file format', file_format)
+
+ # Always load value from dict
+ if new_value_type == dict:
+
+ # DEBUG
+ print('@PipelineStepAttributeSetter dict', new_value)
+
+ return method(self, load_dict(new_value))
+
+ # Otherwise, pass new value to setter method
+ method(self, new_value)
+
+ return wrapper
+
class PipelineStepObject():
"""
Define class to assess pipeline step methods execution time and observe them.
"""
- def __init__(self, name: str = None, working_directory: str = None, observers: dict = None):
- """Initialize PipelineStepObject
-
- Parameters:
- name: object name
- working_directory: folder path to use for relative file path.
- observers: dictionary with observers objects.
+ @PipelineStepInit
+ def __init__(self, **kwargs):
+ """Initialize PipelineStepObject."""
- """
+ # DEBUG
+ print('PipelineStepObject.__init__')
# Init private attribute
- self.__name = name
- self.__working_directory = working_directory
- self.__observers = observers if observers is not None else {}
+ self.__name = None
+ self.__working_directory = None
+ self.__observers = {}
self.__execution_times = {}
- self.__properties = {}
- # parent attribute will be setup later by parent it self
+ # Parent attribute will be set up later by the parent itself
self.__parent = None
def __enter__(self):
@@ -489,16 +629,40 @@ class PipelineStepObject():
child.__exit__(exception_type, exception_value, exception_traceback)
+ def update(self, object_data: dict):
+ """Update pipeline step object attributes with dictionary."""
+
+ for key, value in object_data.items():
+
+ setattr(self, key, value)
+
@property
def name(self) -> str:
"""Get pipeline step object's name."""
return self.__name
+ @name.setter
+ def name(self, name: str):
+ """Set pipeline step object's name."""
+ self.__name = name
+
@property
def working_directory(self) -> str:
- """Get pipeline step object's working_directory."""
+ """Get pipeline step object's working directory.
+ This path will be joined to relative file path."""
return self.__working_directory
+ @working_directory.setter
+ def working_directory(self, working_directory: str):
+ """Set pipeline step object's working directory."""
+
+ # Append working directory to the Python path
+ if working_directory is not None:
+
+ sys.path.append(working_directory)
+
+ self.__working_directory = working_directory
+
@property
def parent(self) -> object:
"""Get pipeline step object's parent object."""
@@ -514,98 +678,82 @@ class PipelineStepObject():
"""Get pipeline step object observers dictionary."""
return self.__observers
- @property
- def execution_times(self):
- """Get pipeline step object observers execution times dictionary."""
- return self.__execution_times
-
- def as_dict(self) -> dict:
- """Export PipelineStepObject attributes as dictionary.
-
- Returns:
- object_data: dictionary with pipeline step object attributes values.
- """
- return {
- "name": self.__name,
- "observers": self.__observers
- }
-
- @classmethod
- def from_dict(cls, object_data: dict, working_directory: str = None) -> object:
- """Load PipelineStepObject attributes from dictionary.
-
- Returns:
- object_data: dictionary with pipeline step object attributes values.
- working_directory: folder path where to load files when a dictionary value is a relative filepath.
+ @observers.setter
+ def observers(self, observers_value: dict|str):
+ """Set pipeline step object observers dictionary.
+
+ Parameters:
+ observers_value: a dictionary or a path to a file where to load dictionary
"""
- # Append working directory to the Python path
- if working_directory is not None:
-
- sys.path.append(working_directory)
-
- # Load name
- try:
-
- new_name = object_data.pop('name')
-
- except KeyError:
-
- new_name = None
-
- # Load observers
+ # Edit new observers dictionary
new_observers = {}
- try:
-
- new_observers_value = object_data.pop('observers')
+ # str: edit new observers dictionary from file
+ if type(observers_value) == str:
- # str: relative path to file
- if type(new_observers_value) == str:
+ filepath = os.path.join(self.working_directory, observers_value)
+ file_format = filepath.split('.')[-1]
- filepath = os.path.join(working_directory, new_observers_value)
- file_format = filepath.split('.')[-1]
+ # py: load __observers__ variable from Python file
+ if file_format == 'py':
- # Load module from working directory
- if file_format == 'py':
+ observer_module_path = observers_value.split('.')[0]
- observer_module_path = new_observers_value.split('.')[0]
+ observer_module = importlib.import_module(observer_module_path)
- observer_module = importlib.import_module(observer_module_path)
+ new_observers = observer_module.__observers__
- new_observers = observer_module.__observers__
+ # json: load dictionary from JSON file
+ elif file_format == 'json':
- # dict: instanciate ready-made argaze observers
- elif type(new_observers_value) == dict:
+ with open(filepath) as file:
- for observer_type, observer_data in new_observers_value.items():
+ new_observers = json.load(file)
- new_observers[observer_type] = eval(f'{observer_type}(**observer_data)')
+ # Instantiate observers from dictionary
+ for observer_type, observer_data in new_observers.items():
- except KeyError:
+ self.__observers[observer_type] = get_class(observer_type)(**observer_data)
- pass
+ @property
+ def execution_times(self):
+ """Get pipeline step object observers execution times dictionary."""
+ return self.__execution_times
+
+ def as_dict(self) -> dict:
+ """Export PipelineStepObject attributes as dictionary.
- # Create pipeline step object
- return PipelineStepObject(\
- new_name, \
- working_directory, \
- new_observers \
- )
+ Returns:
+ object_data: dictionary with pipeline step object attributes values.
+ """
+ return {
+ "name": self.__name,
+ "observers": self.__observers
+ }
@classmethod
def from_json(cls, configuration_filepath: str, patch_filepath: str = None) -> object:
"""
- Load pipeline step object from .json file.
+ Load instance from .json file.
Parameters:
configuration_filepath: path to json configuration file
patch_filepath: path to json patch file to modify any configuration entries
"""
+
+ # DEBUG
+ print('PipelineStepObject.from_json', cls)
+
+ # Load configuration from JSON file
with open(configuration_filepath) as configuration_file:
- object_data = json.load(configuration_file)
- working_directory = os.path.dirname(configuration_filepath)
+ # Edit object_data with working directory as first key
+ object_data = {
+ 'working_directory': os.path.dirname(configuration_filepath)
+ }
+
+ object_data.update(json.load(configuration_file))
# Apply patch to configuration if required
if patch_filepath is not None:
@@ -636,7 +784,11 @@ class PipelineStepObject():
object_data = update(object_data, patch_data)
- return cls.from_dict(object_data, working_directory)
+ # DEBUG
+ print('PipelineStepObject.from_json', object_data)
+
+ # Instantiate class
+ return cls(**object_data)
def to_json(self, json_filepath: str = None):
"""Save pipeline step object into .json file."""
@@ -755,7 +907,7 @@ class PipelineStepObject():
print('-', name, type(attr))
yield attr
-
+
def PipelineStepMethod(method):
"""Define a decorator used in the PipelineStepObject class to declare a pipeline method.