diff options
author | Théo de la Hogue | 2024-03-21 18:23:41 +0100 |
---|---|---|
committer | Théo de la Hogue | 2024-03-21 18:23:41 +0100 |
commit | 6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3 (patch) | |
tree | ba4f2619bc0cf145e9f1c63c2e8b388d7f173de7 /src/argaze/DataFeatures.py | |
parent | e42cdc26fbd7f44f6b60acc14ac0e60828af9f42 (diff) | |
download | argaze-6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3.zip argaze-6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3.tar.gz argaze-6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3.tar.bz2 argaze-6d834e7630c6104e7b40f0fe2d6cb22ed116e6c3.tar.xz |
Major serialization mechanism rewriting. Still not working.
Diffstat (limited to 'src/argaze/DataFeatures.py')
-rw-r--r-- | src/argaze/DataFeatures.py | 314 |
1 files changed, 233 insertions, 81 deletions
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 1334961..677a179 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -32,6 +32,7 @@ import time import pandas import numpy +import cv2 import matplotlib.pyplot as mpyplot import matplotlib.patches as mpatches from colorama import Style, Fore @@ -54,6 +55,25 @@ def module_path(obj) -> str: """ return obj.__class__.__module__ +def get_class(class_path: str) -> object: + """Get class object from 'path.to.class' string. + + Parameters: + class_path: a 'path.to.class' string. + + Returns: + class: a 'path.to.class' class. + """ + parts = class_path.split('.') + module = ".".join(parts[:-1]) + + m = __import__(module) + + for comp in parts[1:]: + m = getattr(m, comp) + + return m + def properties(cls) -> list: """get class properties name.""" @@ -436,29 +456,149 @@ class SharedObject(TimestampedObject): self._execution_times = {} self._exceptions = {} +def PipelineStepInit(method): + """Define a decorator use into PipelineStepObject class to declare pipeline step init method.""" + + def wrapper(self, **kwargs): + """Wrap pipeline step init method to update PipelineStepObject attributes with arguments after init call. + + Parameters: + kwargs: Any arguments defined by PipelineStepMethodInit. + """ + + # DEBUG + print('@PipelineStepInit', kwargs.keys()) + + method(self, **kwargs) + + self.update(kwargs) + + return wrapper + +def PipelineStepAttributeSetter(method): + """Define a decorator use into PipelineStepObject class to declare pipeline step attribute setter.""" + + def wrapper(self, new_value): + """Wrap pipeline step attribute setter to load attribute from file. + + Parameters: + new_value: value used to set attribute. + """ + + # Get new value type + new_value_type = type(new_value) + + # Check setter annotations to get expected value type + expected_value_type = method.__annotations__.popitem()[1] + + # Define function to load dict values + def load_dict(data: dict) -> any: + + # Check if json keys are PipelineStepObject class and store them in a list + new_objects_list = [] + + for key, value in data.items(): + + try: + + new_class = get_class(key) + + except ValueError as e: + + # Keys are not class name + if str(e) == 'Empty module name': + + break + + else: + + raise(e) + + # DEBUG + print('@PipelineStepAttributeSetter new_class', new_class) + + new_objects_list.append( new_class(**value) ) + + # Only one object have been loaded: pass the object if it is a subclass of expected type + if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): + + return new_objects_list[0] + + # Pass non empty objects list + elif len(new_objects_list) > 0: + + return new_objects_list + + # Otherwise, data are parameters of the expected class + return expected_value_type(**data) + + # DEBUG + print('@PipelineStepAttributeSetter', method.__name__, new_value_type, expected_value_type, type(expected_value_type)) + + # String not expected: load value from file + if new_value_type == str and new_value_type != expected_value_type: + + filepath = os.path.join(self.working_directory, new_value) + file_format = filepath.split('.')[-1] + + # Load image from JPG and PNG formats + if file_format == 'jpg' or file_format == 'png': + + # DEBUG + print('@PipelineStepAttributeSetter IMAGE', filepath) + + return method(self, cv2.imread(filepath)) + + # Load PipelineStepObject from JSON file + elif file_format == 'json': + + # DEBUG + print('@PipelineStepAttributeSetter issubclass', issubclass(expected_value_type, PipelineStepObject)) + + #if issubclass(expected_value_type, PipelineStepObject): + + # DEBUG + print('@PipelineStepAttributeSetter JSON', filepath) + + with open(filepath) as file: + + return method(self, load_dict(json.load(file))) + + # DEBUG + print('@PipelineStepAttributeSetter unknown file format', file_format) + + # Always load value from dict + if new_value_type == dict: + + # DEBUG + print('@PipelineStepAttributeSetter dict', new_value) + + return method(self, load_dict(new_value)) + + # Otherwise, pass new value to setter method + method(self, new_value) + + return wrapper + class PipelineStepObject(): """ Define class to assess pipeline step methods execution time and observe them. """ - def __init__(self, name: str = None, working_directory: str = None, observers: dict = None): - """Initialize PipelineStepObject - - Parameters: - name: object name - working_directory: folder path to use for relative file path. - observers: dictionary with observers objects. + @PipelineStepInit + def __init__(self, **kwargs): + """Initialize PipelineStepObject.""" - """ + # DEBUG + print('PipelineStepObject.__init__') # Init private attribute - self.__name = name - self.__working_directory = working_directory - self.__observers = observers if observers is not None else {} + self.__name = None + self.__working_directory = None + self.__observers = {} self.__execution_times = {} - self.__properties = {} - # parent attribute will be setup later by parent it self + # Parent attribute will be setup later by parent it self self.__parent = None def __enter__(self): @@ -489,16 +629,40 @@ class PipelineStepObject(): child.__exit__(exception_type, exception_value, exception_traceback) + def update(self, object_data: dict): + """Update pipeline step object attributes with dictionary.""" + + for key, value in object_data.items(): + + setattr(self, key, value) + @property def name(self) -> str: """Get pipeline step object's name.""" return self.__name + @name.setter + def name(self, name: str): + """Set pipeline step object's name.""" + self.__name = name + @property def working_directory(self) -> str: - """Get pipeline step object's working_directory.""" + """Get pipeline step object's working directory. + This path will be joined to relative file path.""" return self.__working_directory + @working_directory.setter + def working_directory(self, working_directory: str): + """Set pipeline step object's working directory.""" + + # Append working directory to the Python path + if working_directory is not None: + + sys.path.append(working_directory) + + self.__working_directory = working_directory + @property def parent(self) -> object: """Get pipeline step object's parent object.""" @@ -514,98 +678,82 @@ class PipelineStepObject(): """Get pipeline step object observers dictionary.""" return self.__observers - @property - def execution_times(self): - """Get pipeline step object observers execution times dictionary.""" - return self.__execution_times - - def as_dict(self) -> dict: - """Export PipelineStepObject attributes as dictionary. - - Returns: - object_data: dictionary with pipeline step object attributes values. - """ - return { - "name": self.__name, - "observers": self.__observers - } - - @classmethod - def from_dict(cls, object_data: dict, working_directory: str = None) -> object: - """Load PipelineStepObject attributes from dictionary. - - Returns: - object_data: dictionary with pipeline step object attributes values. - working_directory: folder path where to load files when a dictionary value is a relative filepath. + @observers.setter + def observers(self, observers_value: dict|str): + """Set pipeline step object observers dictionary. + + Parameters: + observers_value: a dictionary or a path to a file where to load dictionary """ - # Append working directory to the Python path - if working_directory is not None: - - sys.path.append(working_directory) - - # Load name - try: - - new_name = object_data.pop('name') - - except KeyError: - - new_name = None - - # Load observers + # Edit new observers dictionary new_observers = {} - try: - - new_observers_value = object_data.pop('observers') + # str: edit new observers dictionary from file + if type(observers_value) == str: - # str: relative path to file - if type(new_observers_value) == str: + filepath = os.path.join(self.working_directory, observers_value) + file_format = filepath.split('.')[-1] - filepath = os.path.join(working_directory, new_observers_value) - file_format = filepath.split('.')[-1] + # py: load __observers__ variable from Python file + if file_format == 'py': - # Load module from working directory - if file_format == 'py': + observer_module_path = observers_value.split('.')[0] - observer_module_path = new_observers_value.split('.')[0] + observer_module = importlib.import_module(observer_module_path) - observer_module = importlib.import_module(observer_module_path) + new_observers = observer_module.__observers__ - new_observers = observer_module.__observers__ + # json: load dictionary from JSON file + elif file_format == 'json': - # dict: instanciate ready-made argaze observers - elif type(new_observers_value) == dict: + with open(filepath) as file: - for observer_type, observer_data in new_observers_value.items(): + new_observers = json.load(file) - new_observers[observer_type] = eval(f'{observer_type}(**observer_data)') + # Instanciate observers from dictionary + for observer_type, observer_data in new_observers.items(): - except KeyError: + self.__observers[observer_type] = get_class(observer_type)(**observer_data) - pass + @property + def execution_times(self): + """Get pipeline step object observers execution times dictionary.""" + return self.__execution_times + + def as_dict(self) -> dict: + """Export PipelineStepObject attributes as dictionary. - # Create pipeline step object - return PipelineStepObject(\ - new_name, \ - working_directory, \ - new_observers \ - ) + Returns: + object_data: dictionary with pipeline step object attributes values. + """ + return { + "name": self.__name, + "observers": self.__observers + } @classmethod def from_json(cls, configuration_filepath: str, patch_filepath: str = None) -> object: """ - Load pipeline step object from .json file. + Load instance from .json file. Parameters: configuration_filepath: path to json configuration file patch_filepath: path to json patch file to modify any configuration entries """ + + # DEBUG + print('PipelineStepObject.from_json', cls) + + # Load configuration from JSON file with open(configuration_filepath) as configuration_file: - object_data = json.load(configuration_file) - working_directory = os.path.dirname(configuration_filepath) + # Edit object_data with working directory as first key + object_data = { + 'working_directory': os.path.dirname(configuration_filepath) + } + + object_data.update(json.load(configuration_file)) # Apply patch to configuration if required if patch_filepath is not None: @@ -636,7 +784,11 @@ class PipelineStepObject(): object_data = update(object_data, patch_data) - return cls.from_dict(object_data, working_directory) + # DEBUG + print('PipelineStepObject.from_json', object_data) + + # Instanciate class + return cls(**object_data) def to_json(self, json_filepath: str = None): """Save pipeline step object into .json file.""" @@ -755,7 +907,7 @@ class PipelineStepObject(): print('-', name, type(attr)) yield attr - + def PipelineStepMethod(method): """Define a decorator use into PipelineStepObject class to declare pipeline method. |