diff options
Diffstat (limited to 'src/argaze/DataFeatures.py')
-rw-r--r-- | src/argaze/DataFeatures.py | 322 |
1 files changed, 155 insertions, 167 deletions
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index e24ecf1..f573f1c 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -41,12 +41,6 @@ WORKING_DIRECTORY = [None] def get_working_directory() -> str: """Get global working directory.""" - - # Check global working directory - if WORKING_DIRECTORY[0] is None: - - raise(ValueError(f'No working directory.')) - return WORKING_DIRECTORY[0] def set_working_directory(working_directory: str): @@ -62,15 +56,6 @@ def set_working_directory(working_directory: str): WORKING_DIRECTORY[0] = working_directory -def module_path(obj) -> str: - """ - Get object module path. - - Returns: - module path - """ - return obj.__class__.__module__ - def get_class(class_path: str) -> object: """Get class object from 'path.to.class' string. @@ -105,6 +90,113 @@ def properties(cls) -> list: return properties +def from_json(configuration_filepath: str, patch_filepath: str = None) -> object: + """ + Load object instance from .json file. + + !!! note + The directory where configuration file is will be the global working directory. + + Parameters: + configuration_filepath: path to json configuration file + patch_filepath: path to json patch file to modify any configuration entries + """ + + logging.debug('DataFeatures.from_json') + + # Edit working directory once + if get_working_directory() is None: + + set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath))) + + logging.debug('\t> set global working directory as %s', get_working_directory()) + + # Load configuration from JSON file + with open(configuration_filepath) as configuration_file: + + object_data = json.load(configuration_file) + + # Apply patch to configuration if required + if patch_filepath is not None: + + with open(patch_filepath) as patch_file: + + patch_data = json.load(patch_file) + + import collections.abc + + def update(d, u): + + for k, v in u.items(): + + if isinstance(v, collections.abc.Mapping): + + d[k] = update(d.get(k, {}), v) + + elif v is None: + + del d[k] + + else: + + d[k] = v + + return d + + objects_data = update(object_data, patch_data) + + # Load unique object + object_class, object_data = object_data.popitem() + + # Instanciate class + logging.debug('\t+ create %s object', object_class) + + return get_class(object_class)(**object_data) + +def from_dict(expected_value_type: type, data: dict) -> any: + """Load expected type instance(s) from dict values.""" + + logging.debug('\t> load %s from dict', expected_value_type.__name__) + + # Check if json keys are PipelineStepObject class and store them in a list + new_objects_list = [] + + for key, value in data.items(): + + try: + + new_class = get_class(key) + + except ValueError as e: + + # Keys are not class name + if str(e) == 'Empty module name': + + break + + else: + + raise(e) + + logging.debug('\t+ create %s object from key using value as argument', key) + + new_objects_list.append( new_class(**value) ) + + # Only one object have been loaded: pass the object if it is a subclass of expected type + if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): + + return new_objects_list[0] + + # Pass non empty objects list + elif len(new_objects_list) > 0: + + return new_objects_list + + # Otherwise, data are parameters of the expected class + logging.debug('\t+ create %s object using dict as argument', expected_value_type.__name__) + + return expected_value_type(**data) + def as_dict(obj, filter: bool=True) -> dict: """Export object as dictionary. @@ -229,7 +321,7 @@ class TimestampedObjectsList(list): def append(self, ts_object: TimestampedObject|dict): """Append timestamped object.""" - # Convert dict into GazePosition + # Convert dict into object if type(ts_object) == dict: ts_object = self.__object_type.from_dict(ts_object) @@ -472,6 +564,33 @@ class SharedObject(TimestampedObject): self._execution_times = {} self._exceptions = {} +class TimestampedException(Exception, TimestampedObject): + """Wrap exception to keep track of raising timestamp.""" + + def __init__(self, exception = Exception, timestamp: int|float = math.nan): + + Exception.__init__(self, exception) + TimestampedObject.__init__(self, timestamp) + +class TimeStampedExceptions(TimestampedObjectsList): + """Handle timestamped exceptions into a list.""" + + def __init__(self, exceptions: list = []): + + TimestampedObjectsList.__init__(self, TimestampedException, exceptions) + + def values(self) -> list[str]: + """Get all timestamped exception values as list of messages.""" + return [ts_exception.message for ts_exception in self] + +class PipelineStepLoadingFailed(Exception): + """ + Exception raised when pipeline step object loading fails. + """ + def __init__(self, message): + + super().__init__(message) + def PipelineStepInit(method): """Define a decorator use into PipelineStepObject class to declare pipeline step init method.""" @@ -512,53 +631,10 @@ def PipelineStepAttributeSetter(method): except KeyError: - raise(ValueError(f'Annotations are missing for {method.__name__}: {method.__annotations__}')) - - logging.debug('@PipelineStepAttributeSetter %s.%s.setter(%s) with %s', type(self).__name__, method.__name__, expected_value_type.__name__, new_value_type.__name__) + raise(PipelineStepLoadingFailed(f'Annotations are missing for {method.__name__}: {method.__annotations__}')) - # Define function to load dict values - def load_dict(data: dict) -> any: - - logging.debug('\t> load %s from %s', expected_value_type.__name__, new_value_type.__name__) - - # Check if json keys are PipelineStepObject class and store them in a list - new_objects_list = [] - - for key, value in data.items(): - - try: - - new_class = get_class(key) - - except ValueError as e: - - # Keys are not class name - if str(e) == 'Empty module name': - - break - - else: - - raise(e) - - logging.debug('\t+ create %s object from key using value as argument', key) - - new_objects_list.append( new_class(**value) ) - - # Only one object have been loaded: pass the object if it is a subclass of expected type - if len(new_objects_list) == 1 and issubclass(type(new_objects_list[0]), expected_value_type): - - return new_objects_list[0] - - # Pass non empty objects list - elif len(new_objects_list) > 0: - - return new_objects_list - - # Otherwise, data are parameters of the expected class - logging.debug('\t+ create %s object using %s as argument', expected_value_type.__name__, new_value_type.__name__) - - return expected_value_type(**data) + logging.debug('%s@%s.setter', type(self).__name__, method.__name__) + logging.debug('\t> set %s with %s', expected_value_type.__name__, new_value_type.__name__) # String not expected: load value from file if new_value_type == str and new_value_type != expected_value_type: @@ -568,28 +644,28 @@ def PipelineStepAttributeSetter(method): # String have a dot inside: file path with format if len(split_point) > 1: - file_format = split_point[-1] + file_format = split_point[-1].upper() - logging.debug('\t> %s is a path to a %s file', new_value, file_format.upper()) + logging.debug('\t> %s is a path to a %s file', new_value, file_format) filepath = os.path.join(get_working_directory(), new_value) # Load image from JPG and PNG formats - if file_format == 'jpg' or file_format == 'png': + if file_format == 'JPG' or file_format == 'PNG': return method(self, cv2.imread(filepath)) # Load image from OBJ formats - elif file_format == 'obj': + elif file_format == 'OBJ': return method(self, expected_value_type.from_obj(filepath)) # Load object from JSON file - elif file_format == 'json': + elif file_format == 'JSON': with open(filepath) as file: - return method(self, load_dict(json.load(file))) + return method(self, from_dict(expected_value_type, json.load(file))) # No point inside string: identifier name else: @@ -602,7 +678,7 @@ def PipelineStepAttributeSetter(method): # Dict not expected: load value from dict if new_value_type == dict and expected_value_type != dict: - return method(self, load_dict(new_value)) + return method(self, from_dict(expected_value_type, new_value)) # Otherwise, pass new value to setter method logging.debug('\t> use %s value as passed', new_value_type.__name__) @@ -620,7 +696,7 @@ class PipelineStepObject(): def __init__(self, **kwargs): """Initialize PipelineStepObject.""" - logging.debug('PipelineStepObject.__init__ %s', type(self).__name__) + logging.debug('%s.__init__', type(self).__name__) # Init private attribute self.__name = None @@ -633,6 +709,8 @@ class PipelineStepObject(): def __enter__(self): """At with statement start.""" + logging.debug('%s.__enter__', type(self).__name__) + # Start children pipeline step objects for child in self.children: @@ -648,6 +726,8 @@ class PipelineStepObject(): def __exit__(self, exception_type, exception_value, exception_traceback): """At with statement end.""" + logging.debug('%s.__exit__', type(self).__name__) + # End observers for observer in self.__observers: @@ -665,7 +745,7 @@ class PipelineStepObject(): if hasattr(self, key): - logging.debug('PipelineStepObject.update_attributes %s.%s with %s value', type(self).__name__, key, type(value).__name__) + logging.debug('%s.update_attributes > update %s with %s value', type(self).__name__, key, type(value).__name__) setattr(self, key, value) @@ -721,65 +801,6 @@ class PipelineStepObject(): "observers": self.__observers } - @classmethod - def from_json(cls, configuration_filepath: str, patch_filepath: str = None) -> object: - """ - Load instance from .json file. - - !!! note - The directory where configuration file is will be the global working directory. - - Parameters: - configuration_filepath: path to json configuration file - patch_filepath: path to json patch file to modify any configuration entries - """ - - logging.debug('%s.from_json', cls.__name__) - - # Edit working directory - set_working_directory(os.path.dirname(os.path.abspath(configuration_filepath))) - - logging.debug('\t> set global working directory as %s', get_working_directory()) - - # Load configuration from JSON file - with open(configuration_filepath) as configuration_file: - - object_data = json.load(configuration_file) - - # Apply patch to configuration if required - if patch_filepath is not None: - - with open(patch_filepath) as patch_file: - - patch_data = json.load(patch_file) - - import collections.abc - - def update(d, u): - - for k, v in u.items(): - - if isinstance(v, collections.abc.Mapping): - - d[k] = update(d.get(k, {}), v) - - elif v is None: - - del d[k] - - else: - - d[k] = v - - return d - - object_data = update(object_data, patch_data) - - # Instanciate class - logging.debug('\t+ create %s object from configuration updated by patch', cls.__name__) - - return cls(**object_data) - def to_json(self, json_filepath: str = None): """Save pipeline step object into .json file.""" @@ -791,10 +812,10 @@ class PipelineStepObject(): # Open file with open(self.__json_filepath, 'w', encoding='utf-8') as object_file: - json.dump({module_path(self):as_dict(self)}, object_file, ensure_ascii=False, indent=4) + json.dump({self.__class__.__module__:as_dict(self)}, object_file, ensure_ascii=False, indent=4) # QUESTION: maybe we need two saving mode? - #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=DataFeatures.JsonEncoder) + #json.dump(self, object_file, ensure_ascii=False, indent=4, cls=JsonEncoder) def __str__(self) -> str: """ @@ -973,10 +994,10 @@ def PipelineStepMethod(method): # Call subscription subscription(timestamp, self, exception) - # Raise exception + # Raise timestamped exception if exception is not None: - raise exception + raise TimestampedException(exception, timestamp) return result @@ -1006,36 +1027,3 @@ class PipelineStepObserver(): This method is called provided that the observed PipelineStepObject is created as a context using a with statement. """ pass - -class PipelineInputProvider(PipelineStepObject): - """ - Define class to ... - """ - @PipelineStepInit - def __init__(self, **kwargs): - - logging.debug('PipelineInputProvider.__init__') - - PipelineStepObject.__init__(self) - - def attach(self, method): - - logging.debug('PipelineInputProvider.attach', method) - - def __enter__(self): - """ - Define abstract __enter__ method to use device as a context. - - !!! warning - This method is called provided that the PipelineInputProvider is created as a context using a with statement. - """ - return self - - def __exit__(self, type, value, traceback): - """ - Define abstract __exit__ method to use device as a context. - - !!! warning - This method is called provided that the PipelineInputProvider is created as a context using a with statement. - """ - pass
\ No newline at end of file |