From 8c07348a64eecb671e6da01e6089bbec32fd94a6 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Tue, 23 Apr 2024 09:09:21 +0200 Subject: Using tabs. --- src/argaze/utils/UtilsFeatures.py | 610 +++++++++++++++++++------------------- 1 file changed, 305 insertions(+), 305 deletions(-) (limited to 'src') diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py index ccee8a2..695f372 100644 --- a/src/argaze/utils/UtilsFeatures.py +++ b/src/argaze/utils/UtilsFeatures.py @@ -30,390 +30,390 @@ import numpy import cv2 def print_progress_bar (iteration:int, total:int, prefix:str = '', suffix:str = '', decimals:int = 1, length:int = 100, fill:str = '█', printEnd:str = "\r"): - """ - Print iterations progress. - Call in a loop to create terminal progress bar. - - Parameters: - iteration: current iteration - total: total iterations - prefix: string to print before progress bar - suffix: string to print after progress bar - decimals: positive number of decimals in percent complete - length: character length of bar - fill: bar fill character - printEnd: end character (e.g. \\r, \\r\\n) - """ - percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) - filledLength = int(length * iteration // total) - bar = fill * filledLength + '-' * (length - filledLength) - print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) - - # Print New Line on Complete - if iteration == total: - print() + """ + Print iterations progress. + Call in a loop to create terminal progress bar. + + Parameters: + iteration: current iteration + total: total iterations + prefix: string to print before progress bar + suffix: string to print after progress bar + decimals: positive number of decimals in percent complete + length: character length of bar + fill: bar fill character + printEnd: end character (e.g. \\r, \\r\\n) + """ + percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) + filledLength = int(length * iteration // total) + bar = fill * filledLength + '-' * (length - filledLength) + print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) + + # Print New Line on Complete + if iteration == total: + print() def import_from_test_package(module: str) -> types.ModuleType: - """ - Import module from ArGaze test package. + """ + Import module from ArGaze test package. - Parameters: - module: module name into ArGaze package + Parameters: + module: module name into ArGaze package - Returns: - module named Test - """ + Returns: + module named Test + """ - import argaze - import importlib.util - import sys - import os + import argaze + import importlib.util + import sys + import os - source_directory = os.path.dirname(os.path.dirname(os.path.abspath(argaze.__file__))) - module_directory = os.path.join(source_directory, 'argaze.test', f'{module}.py') + source_directory = os.path.dirname(os.path.dirname(os.path.abspath(argaze.__file__))) + module_directory = os.path.join(source_directory, 'argaze.test', f'{module}.py') - spec = importlib.util.spec_from_file_location(f'{module}Test', module_directory) - TestModule = importlib.util.module_from_spec(spec) - sys.modules[f'{module}Test'] = TestModule - spec.loader.exec_module(TestModule) + spec = importlib.util.spec_from_file_location(f'{module}Test', module_directory) + TestModule = importlib.util.module_from_spec(spec) + sys.modules[f'{module}Test'] = TestModule + spec.loader.exec_module(TestModule) - return TestModule + return TestModule class TimeProbe(): - """ - Assess temporal performance. - """ + """ + Assess temporal performance. + """ - def __init__(self): + def __init__(self): - self.start() + self.start() - def start(self): - """ - Start chronometer. - """ + def start(self): + """ + Start chronometer. + """ - # noinspection PyAttributeOutsideInit - self.__last_time = time.perf_counter() - # noinspection PyAttributeOutsideInit - self.__lap_counter = 0 - # noinspection PyAttributeOutsideInit - self.__elapsed_time = 0 + # noinspection PyAttributeOutsideInit + self.__last_time = time.perf_counter() + # noinspection PyAttributeOutsideInit + self.__lap_counter = 0 + # noinspection PyAttributeOutsideInit + self.__elapsed_time = 0 - def lap(self) -> tuple[float, int, float]: - """ - Get lap info. + def lap(self) -> tuple[float, int, float]: + """ + Get lap info. - Returns: - last lap time (millisecond) - number of laps - total elapsed time (millisecond) - """ + Returns: + last lap time (millisecond) + number of laps + total elapsed time (millisecond) + """ - lap_time = time.perf_counter() - self.__last_time + lap_time = time.perf_counter() - self.__last_time - # noinspection PyAttributeOutsideInit - self.__last_time = time.perf_counter() - self.__lap_counter += 1 - self.__elapsed_time += lap_time + # noinspection PyAttributeOutsideInit + self.__last_time = time.perf_counter() + self.__lap_counter += 1 + self.__elapsed_time += lap_time - return lap_time * 1e3, self.__lap_counter, self.__elapsed_time * 1e3 + return lap_time * 1e3, self.__lap_counter, self.__elapsed_time * 1e3 - def end(self) -> tuple[float, int]: - """ - Stop chronometer + def end(self) -> tuple[float, int]: + """ + Stop chronometer - Returns: - elapsed time (millisecond) - """ + Returns: + elapsed time (millisecond) + """ - self.__elapsed_time += time.perf_counter() - self.__last_time + self.__elapsed_time += time.perf_counter() - self.__last_time - return self.__elapsed_time * 1e3, self.__lap_counter + return self.__elapsed_time * 1e3, self.__lap_counter - def restart(self): - """ - Restart chronometer. - """ - - self.start() + def restart(self): + """ + Restart chronometer. + """ + + self.start() def tuple_to_string(t: tuple, separator: str = ", ") -> str: - """Convert tuple elements into quoted strings separated by a separator string.""" + """Convert tuple elements into quoted strings separated by a separator string.""" - return separator.join(f'\"{e}\"' for e in t) + return separator.join(f'\"{e}\"' for e in t) def PrintCallStack(method): - """Define a decorator to print call stack until the decorated method.""" + """Define a decorator to print call stack until the decorated method.""" - def wrapper(self, *args, **kwargs): - """Wrap method to print call stack before its call. + def wrapper(self, *args, **kwargs): + """Wrap method to print call stack before its call. - Parameters: - self: - args: method arguments. - kwargs: extra arguments. - """ - print(f'Call stack until method \'{method.__name__}\':', ) - - traceback.print_stack() + Parameters: + self: + args: method arguments. + kwargs: extra arguments. + """ + print(f'Call stack until method \'{method.__name__}\':', ) + + traceback.print_stack() - return method(self, *args, **kwargs) + return method(self, *args, **kwargs) - return wrapper + return wrapper class FileWriter(DataFeatures.PipelineStepObject): - """Write data into a file line by line.""" + """Write data into a file line by line.""" - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): - - # Init private attributes - self.__path = None - self.__separator = ',' - self.__header = None + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + # Init private attributes + self.__path = None + self.__separator = ',' + self.__header = None - @property - def path(self) -> str: - """File path where to write data.""" - return self.__path + @property + def path(self) -> str: + """File path where to write data.""" + return self.__path - @path.setter - def path(self, path: str): + @path.setter + def path(self, path: str): - self.__path = pathlib.Path(path) + self.__path = pathlib.Path(path) - @property - def separator(self) -> str: - """String used to separate elements during tuple to string conversion.""" - return self.__separator + @property + def separator(self) -> str: + """String used to separate elements during tuple to string conversion.""" + return self.__separator - @separator.setter - def separator(self, separator: str): + @separator.setter + def separator(self, separator: str): - self.__separator = separator + self.__separator = separator - @property - def header(self) -> str|tuple: - """String or tuple to write first.""" - return self.__header + @property + def header(self) -> str|tuple: + """String or tuple to write first.""" + return self.__header - @header.setter - def header(self, header: str|tuple): + @header.setter + def header(self, header: str|tuple): - self.__header = header + self.__header = header - @DataFeatures.PipelineStepEnter - def __enter__(self): - """Check that folder structure exist and open file then, write header line.""" + @DataFeatures.PipelineStepEnter + def __enter__(self): + """Check that folder structure exist and open file then, write header line.""" - if not os.path.exists(self.__path.parent.absolute()): + if not os.path.exists(self.__path.parent.absolute()): - os.makedirs(self.__path.parent.absolute()) + os.makedirs(self.__path.parent.absolute()) - # Open file - self.__file = open(self.__path, 'w', encoding='utf-8', buffering=1) + # Open file + self.__file = open(self.__path, 'w', encoding='utf-8', buffering=1) - # Write header if required - if self.__header is not None: + # Write header if required + if self.__header is not None: - # Format list or tuple element into quoted strings - if not isinstance(self.__header, str): + # Format list or tuple element into quoted strings + if not isinstance(self.__header, str): - self.__header = tuple_to_string(self.__header, self.__separator) + self.__header = tuple_to_string(self.__header, self.__separator) - print(self.__header, file=self.__file, flush=True) + print(self.__header, file=self.__file, flush=True) - @DataFeatures.PipelineStepExit - def __exit__(self, exception_type, exception_value, exception_traceback): - """Close file.""" - self.__file.close() + @DataFeatures.PipelineStepExit + def __exit__(self, exception_type, exception_value, exception_traceback): + """Close file.""" + self.__file.close() - def write(self, data: str|tuple): - """Write data as a new line into file. + def write(self, data: str|tuple): + """Write data as a new line into file. - !!! note - Tuple elements are converted into quoted strings separated by separator string. - """ + !!! note + Tuple elements are converted into quoted strings separated by separator string. + """ - # Format list or tuple element into quoted strings - if not isinstance(data, str): + # Format list or tuple element into quoted strings + if not isinstance(data, str): - data = tuple_to_string(data, self.__separator) + data = tuple_to_string(data, self.__separator) - # Write into file - print(data, file=self.__file, flush=True) + # Write into file + print(data, file=self.__file, flush=True) class FileReader(DataFeatures.PipelineStepObject): - """Read data from a file line by line.""" + """Read data from a file line by line.""" - # noinspection PyMissingConstructor - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): - - # Init private attributes - self.__path = None - self.__separator = ',' + # noinspection PyMissingConstructor + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + # Init private attributes + self.__path = None + self.__separator = ',' - @property - def path(self) -> str: - """File path where to read data.""" - return self.__path + @property + def path(self) -> str: + """File path where to read data.""" + return self.__path - @path.setter - def path(self, path: str): + @path.setter + def path(self, path: str): - self.__path = pathlib.Path(path) + self.__path = pathlib.Path(path) - @property - def separator(self) -> str: - """String used to separate elements during string to tuple conversion.""" - return self.__separator + @property + def separator(self) -> str: + """String used to separate elements during string to tuple conversion.""" + return self.__separator - @separator.setter - def separator(self, separator: str): + @separator.setter + def separator(self, separator: str): - self.__separator = separator + self.__separator = separator - @DataFeatures.PipelineStepEnter - def __enter__(self): + @DataFeatures.PipelineStepEnter + def __enter__(self): - # Open file - self.__file = csv.reader(open(self.__path), delimiter= self.__separator) + # Open file + self.__file = csv.reader(open(self.__path), delimiter= self.__separator) - @DataFeatures.PipelineStepExit - def __exit__(self, exception_type, exception_value, exception_traceback): - - pass + @DataFeatures.PipelineStepExit + def __exit__(self, exception_type, exception_value, exception_traceback): + + pass - def read(self) -> str|tuple: - """Read next data from file. + def read(self) -> str|tuple: + """Read next data from file. - !!! note - Quoted strings separated by separator string are converted into tuple elements. - """ + !!! note + Quoted strings separated by separator string are converted into tuple elements. + """ - try: + try: - return next(self.__file) + return next(self.__file) - except Exception: + except Exception: - raise EOFError + raise EOFError class VideoWriter(DataFeatures.PipelineStepObject, DataFeatures.SharedObject): - """Open ffmpeg application as sub-process. - FFmpeg input PIPE: RAW images in BGR color format - FFmpeg output MP4 file encoded with HEVC codec. - - Arguments list: - -y Overwrite output file without asking - -s {width}x{height} Input resolution width x height (1344x756) - -pixel_format bgr24 Input frame color format is BGR with 8 bits per color component - -f rawvideo Input format: raw video - -r {fps} Frame rate: fps - -i pipe: ffmpeg input is a PIPE - -vcodec libx265 Video codec: H.265 (HEVC) - -pix_fmt yuv420p Output video color space YUV420 (saving space compared to YUV444) - -crf 24 Constant quality encoding (lower value for higher quality and larger output file). - {output_filename} Output file name: output_filename (output.mp4) - """ - - @DataFeatures.PipelineStepInit - def __init__(self, **kwargs): - - # Init parent classes - DataFeatures.SharedObject.__init__(self) - - # Init private attributes - self.__path = None - self.__width = 320 - self.__height = 240 - self.__fps = 25 - - @property - def path(self) -> str: - """File path where to write images.""" - return self.__path - - @path.setter - def path(self, path: str): - - self.__path = pathlib.Path(path) - - @property - def width(self) -> int: - """Video horizontal resolution.""" - return self.__width - - @width.setter - def width(self, width: int): - - self.__width = width - - @property - def height(self) -> int: - """Video vertical resolution.""" - return self.__height - - @height.setter - def height(self, height: int): - - self.__height = height - - @property - def fps(self) -> int: - """frame per second.""" - return self.__fps - - @fps.setter - def fps(self, fps: int): - - self.__fps = fps - - @DataFeatures.PipelineStepEnter - def __enter__(self): - """Check that folder structure exist then, open ffmpeg subprocess.""" - - # Use lock feature - with self._lock: - - import subprocess as sp - import shlex - - if not os.path.exists(self.__path.parent.absolute()): - - os.makedirs(self.__path.parent.absolute()) - - self.__process = sp.Popen(shlex.split(f'ffmpeg -hide_banner -loglevel error -y -s {self.__width}x{self.__height} -pixel_format bgr24 -f rawvideo -r {self.__fps} -i pipe: -vcodec libx265 -x265-params log-level=error -pix_fmt yuv420p -crf 24 {self.__path.as_posix()}'), stdin=sp.PIPE) - - @DataFeatures.PipelineStepExit - def __exit__(self, exception_type, exception_value, exception_traceback): - - # Use lock feature - with self._lock: - - # Close and flush stdin - self.__process.stdin.close() - - # Wait for sub-process to finish - self.__process.wait() - - # Terminate the sub-process - # Note: We don't have to terminate the sub-process (after process.wait(), the sub-process is supposed to be closed). - self.__process.terminate() - - def write(self, image: numpy.array): - """Write raw video frame to input stream of ffmpeg sub-process.""" + """Open ffmpeg application as sub-process. + FFmpeg input PIPE: RAW images in BGR color format + FFmpeg output MP4 file encoded with HEVC codec. + + Arguments list: + -y Overwrite output file without asking + -s {width}x{height} Input resolution width x height (1344x756) + -pixel_format bgr24 Input frame color format is BGR with 8 bits per color component + -f rawvideo Input format: raw video + -r {fps} Frame rate: fps + -i pipe: ffmpeg input is a PIPE + -vcodec libx265 Video codec: H.265 (HEVC) + -pix_fmt yuv420p Output video color space YUV420 (saving space compared to YUV444) + -crf 24 Constant quality encoding (lower value for higher quality and larger output file). + {output_filename} Output file name: output_filename (output.mp4) + """ + + @DataFeatures.PipelineStepInit + def __init__(self, **kwargs): + + # Init parent classes + DataFeatures.SharedObject.__init__(self) + + # Init private attributes + self.__path = None + self.__width = 320 + self.__height = 240 + self.__fps = 25 + + @property + def path(self) -> str: + """File path where to write images.""" + return self.__path + + @path.setter + def path(self, path: str): + + self.__path = pathlib.Path(path) + + @property + def width(self) -> int: + """Video horizontal resolution.""" + return self.__width + + @width.setter + def width(self, width: int): + + self.__width = width + + @property + def height(self) -> int: + """Video vertical resolution.""" + return self.__height + + @height.setter + def height(self, height: int): + + self.__height = height + + @property + def fps(self) -> int: + """frame per second.""" + return self.__fps + + @fps.setter + def fps(self, fps: int): + + self.__fps = fps + + @DataFeatures.PipelineStepEnter + def __enter__(self): + """Check that folder structure exist then, open ffmpeg subprocess.""" + + # Use lock feature + with self._lock: + + import subprocess as sp + import shlex + + if not os.path.exists(self.__path.parent.absolute()): + + os.makedirs(self.__path.parent.absolute()) + + self.__process = sp.Popen(shlex.split(f'ffmpeg -hide_banner -loglevel error -y -s {self.__width}x{self.__height} -pixel_format bgr24 -f rawvideo -r {self.__fps} -i pipe: -vcodec libx265 -x265-params log-level=error -pix_fmt yuv420p -crf 24 {self.__path.as_posix()}'), stdin=sp.PIPE) + + @DataFeatures.PipelineStepExit + def __exit__(self, exception_type, exception_value, exception_traceback): + + # Use lock feature + with self._lock: + + # Close and flush stdin + self.__process.stdin.close() + + # Wait for sub-process to finish + self.__process.wait() + + # Terminate the sub-process + # Note: We don't have to terminate the sub-process (after process.wait(), the sub-process is supposed to be closed). + self.__process.terminate() + + def write(self, image: numpy.array): + """Write raw video frame to input stream of ffmpeg sub-process.""" - # Use lock feature - with self._lock: - - # Check if subprocess still alive - if self.__process.poll() is None: + # Use lock feature + with self._lock: + + # Check if subprocess still alive + if self.__process.poll() is None: - # Resize image to adapt to video resolution - output = cv2.resize(image, dsize=(self.__width, self.__height), interpolation=cv2.INTER_LINEAR) + # Resize image to adapt to video resolution + output = cv2.resize(image, dsize=(self.__width, self.__height), interpolation=cv2.INTER_LINEAR) - self.__process.stdin.write(output.tobytes()) + self.__process.stdin.write(output.tobytes()) -- cgit v1.1