"""Miscellaneous class and functions used in utils script.
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see .
"""
__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
import os
import pathlib
import time
import types
import traceback
from argaze import DataFeatures
import numpy
import cv2
def print_progress_bar (iteration:int, total:int, prefix:str = '', suffix:str = '', decimals:int = 1, length:int = 100, fill:str = '█', printEnd:str = "\r"):
"""
Print iterations progress.
Call in a loop to create terminal progress bar.
Parameters:
iteration: current iteration
total: total iterations
prefix: string to print before progress bar
suffix: string to print after progress bar
decimals: positive number of decimals in percent complete
length: character length of bar
fill: bar fill character
printEnd: end character (e.g. \\r, \\r\\n)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd)
# Print New Line on Complete
if iteration == total:
print()
def import_from_test_package(module: str) -> types.ModuleType:
"""
Import module from ArGaze test package.
Parameters:
module: module name into ArGaze package
Returns:
module named Test
"""
import argaze
import importlib.util
import sys
import os
source_directory = os.path.dirname(os.path.dirname(os.path.abspath(argaze.__file__)))
module_directory = os.path.join(source_directory, 'argaze.test', f'{module}.py')
spec = importlib.util.spec_from_file_location(f'{module}Test', module_directory)
TestModule = importlib.util.module_from_spec(spec)
sys.modules[f'{module}Test'] = TestModule
spec.loader.exec_module(TestModule)
return TestModule
class TimeProbe():
"""
Assess temporal performance.
"""
def __init__(self):
self.start()
def start(self):
"""
Start chronometer.
"""
# noinspection PyAttributeOutsideInit
self.__last_time = time.perf_counter()
# noinspection PyAttributeOutsideInit
self.__lap_counter = 0
# noinspection PyAttributeOutsideInit
self.__elapsed_time = 0
def lap(self) -> tuple[float, int, float]:
"""
Get lap info.
Returns:
last lap time (millisecond)
number of laps
total elapsed time (millisecond)
"""
lap_time = time.perf_counter() - self.__last_time
# noinspection PyAttributeOutsideInit
self.__last_time = time.perf_counter()
self.__lap_counter += 1
self.__elapsed_time += lap_time
return lap_time * 1e3, self.__lap_counter, self.__elapsed_time * 1e3
def end(self) -> tuple[float, int]:
"""
Stop chronometer
Returns:
elapsed time (millisecond)
"""
self.__elapsed_time += time.perf_counter() - self.__last_time
return self.__elapsed_time * 1e3, self.__lap_counter
def restart(self):
"""
Restart chronometer.
"""
self.start()
def tuple_to_string(t: tuple, separator: str = ", ") -> str:
"""Convert tuple elements into quoted strings separated by a separator string."""
return separator.join(f'\"{e}\"' for e in t)
def PrintCallStack(method):
"""Define a decorator to print call stack until the decorated method."""
def wrapper(self, *args, **kwargs):
"""Wrap method to print call stack before its call.
Parameters:
self:
args: method arguments.
kwargs: extra arguments.
"""
print(f'Call stack until method \'{method.__name__}\':', )
traceback.print_stack()
return method(self, *args, **kwargs)
return wrapper
class FileWriter(DataFeatures.PipelineStepObject):
"""Write data into a file line by line."""
# noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
# Init private attributes
self.__path = None
self.__separator = ','
self.__header = None
@property
def path(self) -> str:
"""File path where to write data."""
return self.__path
@path.setter
def path(self, path: str):
self.__path = pathlib.Path(path)
@property
def separator(self) -> str:
"""String used to separate elements during tuple to string conversion."""
return self.__separator
@separator.setter
def separator(self, separator: str):
self.__separator = separator
@property
def header(self) -> str|tuple:
"""String or tuple to write first."""
return self.__header
@header.setter
def header(self, header: str|tuple):
self.__header = header
@DataFeatures.PipelineStepEnter
def __enter__(self):
"""Check that folder structure exist and open file then, write header line."""
if not os.path.exists(self.__path.parent.absolute()):
os.makedirs(self.__path.parent.absolute())
# Open file
self.__file = open(self.__path, 'w', encoding='utf-8', buffering=1)
# Write header if required
if self.__header is not None:
# Format list or tuple element into quoted strings
if not isinstance(self.__header, str):
self.__header = tuple_to_string(self.__header, self.__separator)
print(self.__header, file=self.__file, flush=True)
@DataFeatures.PipelineStepExit
def __exit__(self, exception_type, exception_value, exception_traceback):
"""Close file."""
self.__file.close()
def write(self, data: str|tuple):
"""Write data as a new line into file.
!!! note
Tuple elements are converted into quoted strings separated by separator string.
"""
# Format list or tuple element into quoted strings
if not isinstance(data, str):
data = tuple_to_string(data, self.__separator)
# Write into file
print(data, file=self.__file, flush=True)
class VideoWriter(DataFeatures.PipelineStepObject, DataFeatures.SharedObject):
"""Open ffmpeg application as sub-process.
FFmpeg input PIPE: RAW images in BGR color format
FFmpeg output MP4 file encoded with HEVC codec.
Arguments list:
-y Overwrite output file without asking
-s {width}x{height} Input resolution width x height (1344x756)
-pixel_format bgr24 Input frame color format is BGR with 8 bits per color component
-f rawvideo Input format: raw video
-r {fps} Frame rate: fps
-i pipe: ffmpeg input is a PIPE
-vcodec libx265 Video codec: H.265 (HEVC)
-pix_fmt yuv420p Output video color space YUV420 (saving space compared to YUV444)
-crf 24 Constant quality encoding (lower value for higher quality and larger output file).
{output_filename} Output file name: output_filename (output.mp4)
"""
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
# Init parent classes
DataFeatures.SharedObject.__init__(self)
# Init private attributes
self.__path = None
self.__width = 320
self.__height = 240
self.__fps = 25
@property
def path(self) -> str:
"""File path where to write images."""
return self.__path
@path.setter
def path(self, path: str):
self.__path = pathlib.Path(path)
@property
def width(self) -> int:
"""Video horizontal resolution."""
return self.__width
@width.setter
def width(self, width: int):
self.__width = width
@property
def height(self) -> int:
"""Video vertical resolution."""
return self.__height
@height.setter
def height(self, height: int):
self.__height = height
@property
def fps(self) -> int:
"""frame per second."""
return self.__fps
@fps.setter
def fps(self, fps: int):
self.__fps = fps
@DataFeatures.PipelineStepEnter
def __enter__(self):
"""Check that folder structure exist then, open ffmpeg subprocess."""
# Use lock feature
with self._lock:
import subprocess as sp
import shlex
if not os.path.exists(self.__path.parent.absolute()):
os.makedirs(self.__path.parent.absolute())
self.__process = sp.Popen(shlex.split(f'ffmpeg -hide_banner -loglevel error -y -s {self.__width}x{self.__height} -pixel_format bgr24 -f rawvideo -r {self.__fps} -i pipe: -vcodec libx265 -x265-params log-level=error -pix_fmt yuv420p -crf 24 {self.__path}'), stdin=sp.PIPE)
@DataFeatures.PipelineStepExit
def __exit__(self, exception_type, exception_value, exception_traceback):
# Use lock feature
with self._lock:
# Close and flush stdin
self.__process.stdin.close()
# Wait for sub-process to finish
self.__process.wait()
# Terminate the sub-process
# Note: We don't have to terminate the sub-process (after process.wait(), the sub-process is supposed to be closed).
self.__process.terminate()
def write(self, image: numpy.array):
"""Write raw video frame to input stream of ffmpeg sub-process."""
# Use lock feature
with self._lock:
# Check if subprocess still alive
if self.__process.poll() is None:
# Resize image to adapt to video resolution
output = cv2.resize(image, dsize=(self.__width, self.__height), interpolation=cv2.INTER_LINEAR)
self.__process.stdin.write(output.tobytes())