From 8509a152272d3d9fe987b76cc8d22cb196c959f9 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Tue, 28 May 2024 16:13:44 +0200 Subject: Adding send_signal method to PipelineStepObject. Removing commented code. --- src/argaze/DataFeatures.py | 70 +++++++++++++++++----------------------------- 1 file changed, 25 insertions(+), 45 deletions(-) diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 861d7fe..69599de 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -49,6 +49,7 @@ def get_working_directory() -> str: raise RuntimeError('No working directory') + def set_working_directory(working_directory: str): """Set global working directory.""" @@ -100,6 +101,7 @@ def get_class_path(o: object) -> str: return m + '.' + c.__qualname__ + def get_class_properties(cls: type) -> dict: """Get class properties dictionary. @@ -125,6 +127,7 @@ def get_class_properties(cls: type) -> dict: return object_properties + def from_json(filepath: str) -> any: """ Load object from json file. @@ -151,39 +154,6 @@ def from_json(filepath: str) -> any: # Load unique object object_class, object_data = json.load(configuration_file).popitem() - ''' - # patch_filepath: path to json patch file to modify any configuration entries - - # Apply patch to configuration if required - if patch_filepath is not None: - - with open(patch_filepath) as patch_file: - - patch_data = json.load(patch_file) - - import collections.abc - - def update(d, u): - - for k, v in u.items(): - - if isinstance(v, collections.abc.Mapping): - - d[k] = update(d.get(k, {}), v) - - elif v is None: - - del d[k] - - else: - - d[k] = v - - return d - - objects_data = update(object_data, patch_data) - ''' - # Instanciate class logging.debug('\t+ create %s object', object_class) @@ -1191,6 +1161,26 @@ class PipelineStepObject(): if issubclass(type(p), PipelineStepObject): yield p + def send_signal(self, signal: str, timestamp: int | float = None, exception = None): + """Send signal to observers. + + Parameters: + signal: notify observers with 'on_' method. + timestamp: optional timestamp (unit doesn't matter). + exception: optional exception to share with signal. + """ + + subscription_name = f'on_{signal}' + + for observer in self.observers: + + # Does the observer cares about this method? + if subscription_name in dir(observer): + + subscription = getattr(observer, subscription_name) + + # Call subscription + subscription(timestamp, self, exception) def PipelineStepMethod(method): """Define a decorator use into PipelineStepObject class to declare pipeline method. @@ -1250,18 +1240,8 @@ def PipelineStepMethod(method): # Measure execution time self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3 - # Notify observers that method has been called - subscription_name = f'on_{method.__name__}' - - for observer in self.observers: - - # Does the observer cares about this method? - if subscription_name in dir(observer): - - subscription = getattr(observer, subscription_name) - - # Call subscription - subscription(timestamp, self, exception) + # Notify observers watching 'on_' signal + self.send_signal(method.__name__, exception) # Raise timestamped exception if exception is not None: -- cgit v1.1