aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2024-05-28 16:13:44 +0200
committerThéo de la Hogue2024-05-28 16:13:44 +0200
commit8509a152272d3d9fe987b76cc8d22cb196c959f9 (patch)
tree11fee42ab9134627e214d030195e99756ac8fbad
parentb4f6dc8ae487c44f0afb2ab667a920c79a5e8748 (diff)
downloadargaze-8509a152272d3d9fe987b76cc8d22cb196c959f9.zip
argaze-8509a152272d3d9fe987b76cc8d22cb196c959f9.tar.gz
argaze-8509a152272d3d9fe987b76cc8d22cb196c959f9.tar.bz2
argaze-8509a152272d3d9fe987b76cc8d22cb196c959f9.tar.xz
Adding send_signal method to PipelineStepObject. Removing commented code.
-rw-r--r--src/argaze/DataFeatures.py70
1 files changed, 25 insertions, 45 deletions
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index 861d7fe..69599de 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -49,6 +49,7 @@ def get_working_directory() -> str:
raise RuntimeError('No working directory')
+
def set_working_directory(working_directory: str):
"""Set global working directory."""
@@ -100,6 +101,7 @@ def get_class_path(o: object) -> str:
return m + '.' + c.__qualname__
+
def get_class_properties(cls: type) -> dict:
"""Get class properties dictionary.
@@ -125,6 +127,7 @@ def get_class_properties(cls: type) -> dict:
return object_properties
+
def from_json(filepath: str) -> any:
"""
Load object from json file.
@@ -151,39 +154,6 @@ def from_json(filepath: str) -> any:
# Load unique object
object_class, object_data = json.load(configuration_file).popitem()
- '''
- # patch_filepath: path to json patch file to modify any configuration entries
-
- # Apply patch to configuration if required
- if patch_filepath is not None:
-
- with open(patch_filepath) as patch_file:
-
- patch_data = json.load(patch_file)
-
- import collections.abc
-
- def update(d, u):
-
- for k, v in u.items():
-
- if isinstance(v, collections.abc.Mapping):
-
- d[k] = update(d.get(k, {}), v)
-
- elif v is None:
-
- del d[k]
-
- else:
-
- d[k] = v
-
- return d
-
- objects_data = update(object_data, patch_data)
- '''
-
# Instanciate class
logging.debug('\t+ create %s object', object_class)
@@ -1191,6 +1161,26 @@ class PipelineStepObject():
if issubclass(type(p), PipelineStepObject):
yield p
+ def send_signal(self, signal: str, timestamp: int | float = None, exception = None):
+ """Send signal to observers.
+
+ Parameters:
+ signal: notify observers with 'on_<signal>' method.
+ timestamp: optional timestamp (unit doesn't matter).
+ exception: optional exception to share with signal.
+ """
+
+ subscription_name = f'on_{signal}'
+
+ for observer in self.observers:
+
+ # Does the observer cares about this method?
+ if subscription_name in dir(observer):
+
+ subscription = getattr(observer, subscription_name)
+
+ # Call subscription
+ subscription(timestamp, self, exception)
def PipelineStepMethod(method):
"""Define a decorator use into PipelineStepObject class to declare pipeline method.
@@ -1250,18 +1240,8 @@ def PipelineStepMethod(method):
# Measure execution time
self.execution_times[method.__name__] = (time.perf_counter() - start) * 1e3
- # Notify observers that method has been called
- subscription_name = f'on_{method.__name__}'
-
- for observer in self.observers:
-
- # Does the observer cares about this method?
- if subscription_name in dir(observer):
-
- subscription = getattr(observer, subscription_name)
-
- # Call subscription
- subscription(timestamp, self, exception)
+ # Notify observers watching 'on_<this method name>' signal
+ self.send_signal(method.__name__, exception)
# Raise timestamped exception
if exception is not None: