"""Miscellaneous class and functions used in utils script.""" """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import os import pathlib import time import csv import types import traceback from argaze import DataFeatures import numpy import cv2 def print_progress_bar (iteration:int, total:int, prefix:str = '', suffix:str = '', decimals:int = 1, length:int = 100, fill:str = '█', printEnd:str = "\r"): """ Print iterations progress. Call in a loop to create terminal progress bar. Parameters: iteration: current iteration total: total iterations prefix: string to print before progress bar suffix: string to print after progress bar decimals: positive number of decimals in percent complete length: character length of bar fill: bar fill character printEnd: end character (e.g. \\r, \\r\\n) """ percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) filledLength = int(length * iteration // total) bar = fill * filledLength + '-' * (length - filledLength) print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) # Print New Line on Complete if iteration == total: print() def import_from_test_package(module: str) -> types.ModuleType: """ Import module from ArGaze test package. Parameters: module: module name into ArGaze package Returns: module named Test """ import argaze import importlib.util import sys import os source_directory = os.path.dirname(os.path.dirname(os.path.abspath(argaze.__file__))) module_directory = os.path.join(source_directory, 'argaze.test', f'{module}.py') spec = importlib.util.spec_from_file_location(f'{module}Test', module_directory) TestModule = importlib.util.module_from_spec(spec) sys.modules[f'{module}Test'] = TestModule spec.loader.exec_module(TestModule) return TestModule class TimeProbe(): """ Assess temporal performance. """ def __init__(self): self.start() def start(self): """ Start chronometer. """ # noinspection PyAttributeOutsideInit self.__last_time = time.perf_counter() # noinspection PyAttributeOutsideInit self.__lap_counter = 0 # noinspection PyAttributeOutsideInit self.__elapsed_time = 0 def lap(self) -> tuple[float, int, float]: """ Get lap info. Returns: last lap time (millisecond) number of laps total elapsed time (millisecond) """ lap_time = time.perf_counter() - self.__last_time # noinspection PyAttributeOutsideInit self.__last_time = time.perf_counter() self.__lap_counter += 1 self.__elapsed_time += lap_time return lap_time * 1e3, self.__lap_counter, self.__elapsed_time * 1e3 def end(self) -> tuple[float, int]: """ Stop chronometer Returns: elapsed time (millisecond) """ self.__elapsed_time += time.perf_counter() - self.__last_time return self.__elapsed_time * 1e3, self.__lap_counter def restart(self): """ Restart chronometer. """ self.start() def tuple_to_string(t: tuple, separator: str = ", ") -> str: """Convert tuple elements into quoted strings separated by a separator string.""" return separator.join(f'\"{e}\"' for e in t) def PrintCallStack(method): """Define a decorator to print call stack until the decorated method.""" def wrapper(self, *args, **kwargs): """Wrap method to print call stack before its call. Parameters: self: args: method arguments. kwargs: extra arguments. """ print(f'Call stack until method \'{method.__name__}\':', ) traceback.print_stack() return method(self, *args, **kwargs) return wrapper class FileWriter(DataFeatures.PipelineStepObject): """Write data into a file line by line.""" # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): # Init private attributes self.__path = None self.__separator = ',' self.__header = None @property def path(self) -> str: """File path where to write data.""" return self.__path @path.setter def path(self, path: str): self.__path = pathlib.Path(path) @property def separator(self) -> str: """String used to separate elements during tuple to string conversion.""" return self.__separator @separator.setter def separator(self, separator: str): self.__separator = separator @property def header(self) -> str|tuple: """String or tuple to write first.""" return self.__header @header.setter def header(self, header: str|tuple): self.__header = header @DataFeatures.PipelineStepEnter def __enter__(self): """Check that folder structure exist and open file then, write header line.""" if not os.path.exists(self.__path.parent.absolute()): os.makedirs(self.__path.parent.absolute()) # Open file self.__file = open(self.__path, 'w', encoding='utf-8', buffering=1) # Write header if required if self.__header is not None: # Format list or tuple element into quoted strings if not isinstance(self.__header, str): self.__header = tuple_to_string(self.__header, self.__separator) print(self.__header, file=self.__file, flush=True) @DataFeatures.PipelineStepExit def __exit__(self, exception_type, exception_value, exception_traceback): """Close file.""" self.__file.close() def write(self, data: str|tuple): """Write data as a new line into file. !!! note Tuple elements are converted into quoted strings separated by separator string. """ # Format list or tuple element into quoted strings if not isinstance(data, str): data = tuple_to_string(data, self.__separator) # Write into file print(data, file=self.__file, flush=True) class FileReader(DataFeatures.PipelineStepObject): """Read data from a file line by line.""" # noinspection PyMissingConstructor @DataFeatures.PipelineStepInit def __init__(self, **kwargs): # Init private attributes self.__path = None self.__separator = ',' @property def path(self) -> str: """File path where to read data.""" return self.__path @path.setter def path(self, path: str): self.__path = pathlib.Path(path) @property def separator(self) -> str: """String used to separate elements during string to tuple conversion.""" return self.__separator @separator.setter def separator(self, separator: str): self.__separator = separator @DataFeatures.PipelineStepEnter def __enter__(self): # Open file self.__file = csv.reader(open(self.__path), delimiter= self.__separator) @DataFeatures.PipelineStepExit def __exit__(self, exception_type, exception_value, exception_traceback): pass def read(self) -> str|tuple: """Read next data from file. !!! note Quoted strings separated by separator string are converted into tuple elements. """ try: return next(self.__file) except Exception: raise EOFError class VideoWriter(DataFeatures.PipelineStepObject, DataFeatures.SharedObject): """Open ffmpeg application as sub-process. FFmpeg input PIPE: RAW images in BGR color format FFmpeg output MP4 file encoded with HEVC codec. Arguments list: -y Overwrite output file without asking -s {width}x{height} Input resolution width x height (1344x756) -pixel_format bgr24 Input frame color format is BGR with 8 bits per color component -f rawvideo Input format: raw video -r {fps} Frame rate: fps -i pipe: ffmpeg input is a PIPE -vcodec libx265 Video codec: H.265 (HEVC) -pix_fmt yuv420p Output video color space YUV420 (saving space compared to YUV444) -crf 24 Constant quality encoding (lower value for higher quality and larger output file). {output_filename} Output file name: output_filename (output.mp4) """ @DataFeatures.PipelineStepInit def __init__(self, **kwargs): # Init parent classes DataFeatures.SharedObject.__init__(self) # Init private attributes self.__path = None self.__width = 320 self.__height = 240 self.__fps = 25 @property def path(self) -> str: """File path where to write images.""" return self.__path @path.setter def path(self, path: str): self.__path = pathlib.Path(path) @property def width(self) -> int: """Video horizontal resolution.""" return self.__width @width.setter def width(self, width: int): self.__width = width @property def height(self) -> int: """Video vertical resolution.""" return self.__height @height.setter def height(self, height: int): self.__height = height @property def fps(self) -> int: """frame per second.""" return self.__fps @fps.setter def fps(self, fps: int): self.__fps = fps @DataFeatures.PipelineStepEnter def __enter__(self): """Check that folder structure exist then, open ffmpeg subprocess.""" # Use lock feature with self._lock: import subprocess as sp import shlex if not os.path.exists(self.__path.parent.absolute()): os.makedirs(self.__path.parent.absolute()) self.__process = sp.Popen(shlex.split(f'ffmpeg -hide_banner -loglevel error -y -s {self.__width}x{self.__height} -pixel_format bgr24 -f rawvideo -r {self.__fps} -i pipe: -vcodec libx265 -x265-params log-level=error -pix_fmt yuv420p -crf 24 {self.__path.as_posix()}'), stdin=sp.PIPE) @DataFeatures.PipelineStepExit def __exit__(self, exception_type, exception_value, exception_traceback): # Use lock feature with self._lock: # Close and flush stdin self.__process.stdin.close() # Wait for sub-process to finish self.__process.wait() # Terminate the sub-process # Note: We don't have to terminate the sub-process (after process.wait(), the sub-process is supposed to be closed). self.__process.terminate() def write(self, image: numpy.array): """Write raw video frame to input stream of ffmpeg sub-process.""" # Use lock feature with self._lock: # Check if subprocess still alive if self.__process.poll() is None: # Resize image to adapt to video resolution output = cv2.resize(image, dsize=(self.__width, self.__height), interpolation=cv2.INTER_LINEAR) self.__process.stdin.write(output.tobytes())