Diffstat (limited to 'src/argaze/DataFeatures.py')
-rw-r--r--  src/argaze/DataFeatures.py | 322
 1 file changed, 155 insertions(+), 167 deletions(-)
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index e24ecf1..f573f1c 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -41,12 +41,6 @@ WORKING_DIRECTORY = [None]
def get_working_directory() -> str:
"""Get global working directory."""
-
- # Check global working directory
- if WORKING_DIRECTORY[0] is None:
-
- raise(ValueError(f'No working directory.'))
-
return WORKING_DIRECTORY[0]
def set_working_directory(working_directory: str):
@@ -62,15 +56,6 @@ def set_working_directory(working_directory: str):
WORKING_DIRECTORY[0] = working_directory
-def module_path(obj) -> str:
- """
- Get object module path.
-
- Returns:
- module path
- """
- return obj.__class__.__module__
-
def get_class(class_path: str) -> object:
"""Get class object from 'path.to.class' string.
@@ -105,6 +90,113 @@ def properties(cls) -> list:
return properties
+def from_json(configuration_filepath: str, patch_filepath: str = None) -> object:
+ """
+ Load object instance from .json file.
+
+ !!! note
+ The directory containing the configuration file becomes the global working directory.
+
+ Parameters:
+ configuration_filepath: path to JSON configuration file
+ patch_filepath: path to JSON patch file used to modify any configuration entries
+ """
+
+ logging.debug('DataFeatures.from_json')
+
+ # Set global working directory once
+ if get_working_directory() is None:
+
+ set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath)))
+
+ logging.debug('\t> set global working directory as %s', get_working_directory())
+
+ # Load configuration from JSON file
+ with open(configuration_filepath) as configuration_file:
+
+ object_data = json.load(configuration_file)
+
+ # Apply patch to configuration if required
+ if patch_filepath is not None:
+
+ with open(patch_filepath) as patch_file:
+
+ patch_data = json.load(patch_file)
+
+ import collections.abc
+
+ def update(d, u):
+
+ for k, v in u.items():
+
+ if isinstance(v, collections.abc.Mapping):
+
+ d[k] = update(d.get(k, {}), v)
+
+ elif v is None:
+
+ del d[k]
+
+ else:
+
+ d[k] = v
+
+ return d
+
+ object_data = update(object_data, patch_data)
+
+ # Load the single root object
+ object_class, object_data = object_data.popitem()
+
+ # Instantiate class
+ logging.debug('\t+ create %s object', object_class)
+
+ return get_class(object_class)(**object_data)
+
+def from_dict(expected_value_type: type, data: dict) -> any:
+ """Load expected type instance(s) from dict values."""
+
+ logging.debug('\t> load %s from dict', expected_value_type.__name__)
+
+ # Check if JSON keys are class paths and store the created objects in a list
+ new_objects_list = []
+
+ for key, value in data.items():
+
+ try:
+
+ new_class = get_class(key)
+
+ except ValueError as e:
+
+ # Keys are not class names
+ if str(e) == 'Empty module name':
+
+ break
+
+ else:
+
+ raise(e)
+
+ logging.debug('\t+ create %s object from key using value as argument', key)
+
+ new_objects_list.append(new_class(**value))
+
+ # Only one object has been loaded: return it if it is a subclass of the expected type
+ if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type):
+
+ return new_objects_list[0]
+
+ # Return non-empty objects list
+ elif len(new_objects_list) > 0:
+
+ return new_objects_list
+
+ # Otherwise, data are parameters of the expected class
+ logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__)
+
+ return expected_value_type(**data)
+
def as_dict(obj, filter: bool=True) -> dict:
"""Export object as dictionary.
@@ -229,7 +321,7 @@ class TimestampedObjectsList(list):
def append(self, ts_object: TimestampedObject|dict):
"""Append timestamped object."""
- # Convert dict into GazePosition
+ # Convert dict into object
if type(ts_object) == dict:
ts_object = self.__object_type.from_dict(ts_object)
@@ -472,6 +564,33 @@ class SharedObject(TimestampedObject):
self._execution_times = {}
self._exceptions = {}
+class TimestampedException(Exception, TimestampedObject):
+ """Wrap exception to keep track of raising timestamp."""
+
+ def __init__(self, exception = Exception, timestamp: int|float = math.nan):
+
+ Exception.__init__(self, exception)
+ TimestampedObject.__init__(self, timestamp)
+
+class TimeStampedExceptions(TimestampedObjectsList):
+ """Handle timestamped exceptions into a list."""
+
+ def __init__(self, exceptions: list = []):
+
+ TimestampedObjectsList.__init__(self, TimestampedException, exceptions)
+
+ def values(self) -> list[str]:
+ """Get all timestamped exception values as list of messages."""
+ return [str(ts_exception) for ts_exception in self]
+
+class PipelineStepLoadingFailed(Exception):
+ """
+ Exception raised when pipeline step object loading fails.
+ """
+ def __init__(self, message):
+
+ super().__init__(message)
+
def PipelineStepInit(method):
"""Define a decorator use into PipelineStepObject class to declare pipeline step init method."""
@@ -512,53 +631,10 @@ def PipelineStepAttributeSetter(method):
except KeyError:
- raise(ValueError(f'Annotations are missing for {method.__name__}: {method.__annotations__}'))
-
- logging.debug('@PipelineStepAttributeSetter %s.%s.setter(%s) with %s', type(self).__name__, method.__name__, expected_value_type.__name__, new_value_type.__name__)
+ raise(PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}'))
- # Define function to load dict values
- def load_dict(data: dict) -> any:
-
- logging.debug('\t> load %s from %s', expected_value_type.__name__, new_value_type.__name__)
-
- # Check if json keys are PipelineStepObject class and store them in a list
- new_objects_list = []
-
- for key, value in data.items():
-
- try:
-
- new_class = get_class(key)
-
- except ValueError as e:
-
- # Keys are not class name
- if str(e) == 'Empty module name':
-
- break
-
- else:
-
- raise(e)
-
- logging.debug('\t+ create %s object from key using value as argument', key)
-
- new_objects_list.append( new_class(**value) )
-
- # Only one object have been loaded: pass the object if it is a subclass of expected type
- if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type):
-
- return new_objects_list[0]
-
- # Pass non empty objects list
- elif len(new_objects_list) > 0:
-
- return new_objects_list
-
- # Otherwise, data are parameters of the expected class
- logging.debug('\t+ create %s object using %s as argument', expected_value_type.__name__, new_value_type.__name__)
-
- return expected_value_type(**data)
+ logging.debug('%s@%s.setter', type(self).__name__, method.__name__)
+ logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__)
# String not expected: load value from file
if new_value_type == str and new_value_type != expected_value_type:
@@ -568,28 +644,28 @@ def PipelineStepAttributeSetter(method):
# String have a dot inside: file path with format
if len(split_point) > 1:
- file_format = split_point[-1]
+ file_format = split_point[-1].upper()
- logging.debug('\t> %s is a path to a %s file', new_value, file_format.upper())
+ logging.debug('\t> %s is a path to a %s file', new_value, file_format)
filepath = os.path.join(get_working_directory(), new_value)
# Load image from JPG and PNG formats
- if file_format == 'jpg' or file_format == 'png':
+ if file_format == 'JPG' or file_format == 'PNG':
return method(self, cv2.imread(filepath))
# Load image from OBJ formats
- elif file_format == 'obj':
+ elif file_format == 'OBJ':
return method(self, expected_value_type.from_obj(filepath))
# Load object from JSON file
- elif file_format == 'json':
+ elif file_format == 'JSON':
with open(filepath) as file:
- return method(self, load_dict(json.load(file)))
+ return method(self, from_dict(expected_value_type, json.load(file)))
# No point inside string: identifier name
else:
@@ -602,7 +678,7 @@ def PipelineStepAttributeSetter(method):
# Dict not expected: load value from dict
if new_value_type == dict and expected_value_type != dict:
- return method(self, load_dict(new_value))
+ return method(self, from_dict(expected_value_type, new_value))
# Otherwise, pass new value to setter method
logging.debug('\t> use %s value as passed', new_value_type.__name__)
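Note: after this hunk, dispatch is on an upper-cased file extension; a sketch of how hypothetical string attribute values would be routed by the setter:

    "background.png"  ->  method(self, cv2.imread(filepath))
    "markers.obj"     ->  method(self, expected_value_type.from_obj(filepath))
    "layout.json"     ->  method(self, from_dict(expected_value_type, json.load(file)))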
@@ -620,7 +696,7 @@ class PipelineStepObject():
def __init__(self, **kwargs):
"""Initialize PipelineStepObject."""
- logging.debug('PipelineStepObject.__init__ %s', type(self).__name__)
+ logging.debug('%s.__init__', type(self).__name__)
# Init private attribute
self.__name = None
@@ -633,6 +709,8 @@ class PipelineStepObject():
def __enter__(self):
"""At with statement start."""
+ logging.debug('%s.__enter__', type(self).__name__)
+
# Start children pipeline step objects
for child in self.children:
@@ -648,6 +726,8 @@ class PipelineStepObject():
def __exit__(self, exception_type, exception_value, exception_traceback):
"""At with statement end."""
+ logging.debug('%s.__exit__', type(self).__name__)
+
# End observers
for observer in self.__observers:
@@ -665,7 +745,7 @@ class PipelineStepObject():
if hasattr(self, key):
- logging.debug('PipelineStepObject.update_attributes %s.%s with %s value', type(self).__name__, key, type(value).__name__)
+ logging.debug('%s.update_attributes > update %s with %s value', type(self).__name__, key, type(value).__name__)
setattr(self, key, value)
@@ -721,65 +801,6 @@ class PipelineStepObject():
"observers": self.__observers
}
- @classmethod
- def from_json(cls, configuration_filepath: str, patch_filepath: str = None) -> object:
- """
- Load instance from .json file.
-
- !!! note
- The directory where configuration file is will be the global working directory.
-
- Parameters:
- configuration_filepath: path to json configuration file
- patch_filepath: path to json patch file to modify any configuration entries
- """
-
- logging.debug('%s.from_json', cls.__name__)
-
- # Edit working directory
- set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath)))
-
- logging.debug('\t> set global working directory as %s', get_working_directory())
-
- # Load configuration from JSON file
- with open(configuration_filepath) as configuration_file:
-
- object_data = json.load(configuration_file)
-
- # Apply patch to configuration if required
- if patch_filepath is not None:
-
- with open(patch_filepath) as patch_file:
-
- patch_data = json.load(patch_file)
-
- import collections.abc
-
- def update(d, u):
-
- for k, v in u.items():
-
- if isinstance(v, collections.abc.Mapping):
-
- d[k] = update(d.get(k, {}), v)
-
- elif v is None:
-
- del d[k]
-
- else:
-
- d[k] = v
-
- return d
-
- object_data = update(object_data, patch_data)
-
- # Instanciate class
- logging.debug('\t+ create %s object from configuration updated by patch', cls.__name__)
-
- return cls(**object_data)
-
def to_json(self, json_filepath: str = None):
"""Save pipeline step object into .json file."""
@@ -791,10 +812,10 @@ class PipelineStepObject():
# Open file
with open(self.__json_filepath, 'w', encoding='utf-8') as object_file:
- json.dump({module_path(self):as_dict(self)}, object_file, ensure_ascii=False, indent=4)
+ json.dump({self.__class__.__module__:as_dict(self)}, object_file, ensure_ascii=False, indent=4)
# QUESTION: maybe we need two saving modes?
- #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder)
+ #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder)
def __str__(self) -> str:
"""
@@ -973,10 +994,10 @@ def PipelineStepMethod(method):
# Call subscription
subscription(timestamp, self, exception)
- # Raise exception
+ # Raise timestamped exception
if exception is not None:
- raise exception
+ raise TimestampedException(exception, timestamp)
return result
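Note: callers that previously caught the raised exception type directly will now see a TimestampedException wrapper instead; a hedged sketch, where frame, look(), timestamp and gaze_position are hypothetical:

    try:
        frame.look(timestamp, gaze_position)
    except DataFeatures.TimestampedException as ts_exception:
        logging.error('step failed at %s: %s', ts_exception.timestamp, ts_exception)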
@@ -1006,36 +1027,3 @@ class PipelineStepObserver():
This method is called provided that the observed PipelineStepObject is created as a context using a with statement.
"""
pass
-
-class PipelineInputProvider(PipelineStepObject):
- """
- Define class to ...
- """
- @PipelineStepInit
- def __init__(self, **kwargs):
-
- logging.debug('PipelineInputProvider.__init__')
-
- PipelineStepObject.__init__(self)
-
- def attach(self, method):
-
- logging.debug('PipelineInputProvider.attach', method)
-
- def __enter__(self):
- """
- Define abstract __enter__ method to use device as a context.
-
- !!! warning
- This method is called provided that the PipelineInputProvider is created as a context using a with statement.
- """
- return self
-
- def __exit__(self, type, value, traceback):
- """
- Define abstract __exit__ method to use device as a context.
-
- !!! warning
- This method is called provided that the PipelineInputProvider is created as a context using a with statement.
- """
- pass
\ No newline at end of file