1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
|
"""Miscellaneous class and functions used in utils script."""
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
import os
import pathlib
import time
import csv
import types
import traceback
from argaze import DataFeatures
import numpy
import cv2
def print_progress_bar (iteration:int, total:int, prefix:str = '', suffix:str = '', decimals:int = 1, length:int = 100, fill:str = '█', printEnd:str = "\r"):
"""
Print iterations progress.
Call in a loop to create terminal progress bar.
Parameters:
iteration: current iteration
total: total iterations
prefix: string to print before progress bar
suffix: string to print after progress bar
decimals: positive number of decimals in percent complete
length: character length of bar
fill: bar fill character
printEnd: end character (e.g. \\r, \\r\\n)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd)
# Print New Line on Complete
if iteration == total:
print()
def import_from_test_package(module: str) -> types.ModuleType:
"""
Import module from ArGaze test package.
Parameters:
module: module name <Module> into ArGaze package
Returns:
module named <Module>Test
"""
import argaze
import importlib.util
import sys
import os
source_directory = os.path.dirname(os.path.dirname(os.path.abspath(argaze.__file__)))
module_directory = os.path.join(source_directory, 'argaze.test', f'{module}.py')
spec = importlib.util.spec_from_file_location(f'{module}Test', module_directory)
TestModule = importlib.util.module_from_spec(spec)
sys.modules[f'{module}Test'] = TestModule
spec.loader.exec_module(TestModule)
return TestModule
class TimeProbe():
"""
Assess temporal performance.
"""
def __init__(self):
self.start()
def start(self):
"""
Start chronometer.
"""
# noinspection PyAttributeOutsideInit
self.__last_time = time.perf_counter()
# noinspection PyAttributeOutsideInit
self.__lap_counter = 0
# noinspection PyAttributeOutsideInit
self.__elapsed_time = 0
def lap(self) -> tuple[float, int, float]:
"""
Get lap info.
Returns:
last lap time (millisecond)
number of laps
total elapsed time (millisecond)
"""
lap_time = time.perf_counter() - self.__last_time
# noinspection PyAttributeOutsideInit
self.__last_time = time.perf_counter()
self.__lap_counter += 1
self.__elapsed_time += lap_time
return lap_time * 1e3, self.__lap_counter, self.__elapsed_time * 1e3
def end(self) -> tuple[float, int]:
"""
Stop chronometer
Returns:
elapsed time (millisecond)
"""
self.__elapsed_time += time.perf_counter() - self.__last_time
return self.__elapsed_time * 1e3, self.__lap_counter
def restart(self):
"""
Restart chronometer.
"""
self.start()
def tuple_to_string(t: tuple, separator: str = ", ") -> str:
"""Convert tuple elements into quoted strings separated by a separator string."""
return separator.join(f'\"{e}\"' for e in t)
def PrintCallStack(method):
"""Define a decorator to print call stack until the decorated method."""
def wrapper(self, *args, **kwargs):
"""Wrap method to print call stack before its call.
Parameters:
self:
args: method arguments.
kwargs: extra arguments.
"""
print(f'Call stack until method \'{method.__name__}\':', )
traceback.print_stack()
return method(self, *args, **kwargs)
return wrapper
class FileWriter(DataFeatures.PipelineStepObject):
"""Write data into a file line by line."""
# noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
# Init private attributes
self.__path = None
self.__separator = ','
self.__header = None
@property
def path(self) -> str:
"""File path where to write data."""
return self.__path
@path.setter
def path(self, path: str):
self.__path = pathlib.Path(path)
@property
def separator(self) -> str:
"""String used to separate elements during tuple to string conversion."""
return self.__separator
@separator.setter
def separator(self, separator: str):
self.__separator = separator
@property
def header(self) -> str|tuple:
"""String or tuple to write first."""
return self.__header
@header.setter
def header(self, header: str|tuple):
self.__header = header
@DataFeatures.PipelineStepEnter
def __enter__(self):
"""Check that folder structure exist and open file then, write header line."""
if not os.path.exists(self.__path.parent.absolute()):
os.makedirs(self.__path.parent.absolute())
# Open file
self.__file = open(self.__path, 'w', encoding='utf-8', buffering=1)
# Write header if required
if self.__header is not None:
# Format list or tuple element into quoted strings
if not isinstance(self.__header, str):
self.__header = tuple_to_string(self.__header, self.__separator)
print(self.__header, file=self.__file, flush=True)
@DataFeatures.PipelineStepExit
def __exit__(self, exception_type, exception_value, exception_traceback):
"""Close file."""
self.__file.close()
def write(self, data: str|tuple):
"""Write data as a new line into file.
!!! note
Tuple elements are converted into quoted strings separated by separator string.
"""
# Format list or tuple element into quoted strings
if not isinstance(data, str):
data = tuple_to_string(data, self.__separator)
# Write into file
print(data, file=self.__file, flush=True)
class FileReader(DataFeatures.PipelineStepObject):
"""Read data from a file line by line."""
# noinspection PyMissingConstructor
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
# Init private attributes
self.__path = None
self.__separator = ','
@property
def path(self) -> str:
"""File path where to read data."""
return self.__path
@path.setter
def path(self, path: str):
self.__path = pathlib.Path(path)
@property
def separator(self) -> str:
"""String used to separate elements during string to tuple conversion."""
return self.__separator
@separator.setter
def separator(self, separator: str):
self.__separator = separator
@DataFeatures.PipelineStepEnter
def __enter__(self):
# Open file
self.__file = csv.reader(open(self.__path), delimiter= self.__separator)
@DataFeatures.PipelineStepExit
def __exit__(self, exception_type, exception_value, exception_traceback):
pass
def read(self) -> str|tuple:
"""Read next data from file.
!!! note
Quoted strings separated by separator string are converted into tuple elements.
"""
try:
return next(self.__file)
except Exception:
raise EOFError
class VideoWriter(DataFeatures.PipelineStepObject, DataFeatures.SharedObject):
"""Open ffmpeg application as sub-process.
FFmpeg input PIPE: RAW images in BGR color format
FFmpeg output MP4 file encoded with HEVC codec.
Arguments list:
-y Overwrite output file without asking
-s {width}x{height} Input resolution width x height (1344x756)
-pixel_format bgr24 Input frame color format is BGR with 8 bits per color component
-f rawvideo Input format: raw video
-r {fps} Frame rate: fps
-i pipe: ffmpeg input is a PIPE
-vcodec libx265 Video codec: H.265 (HEVC)
-pix_fmt yuv420p Output video color space YUV420 (saving space compared to YUV444)
-crf 24 Constant quality encoding (lower value for higher quality and larger output file).
{output_filename} Output file name: output_filename (output.mp4)
"""
@DataFeatures.PipelineStepInit
def __init__(self, **kwargs):
# Init parent classes
DataFeatures.SharedObject.__init__(self)
# Init private attributes
self.__path = None
self.__width = 320
self.__height = 240
self.__fps = 25
@property
def path(self) -> str:
"""File path where to write images."""
return self.__path
@path.setter
def path(self, path: str):
self.__path = pathlib.Path(path)
@property
def width(self) -> int:
"""Video horizontal resolution."""
return self.__width
@width.setter
def width(self, width: int):
self.__width = width
@property
def height(self) -> int:
"""Video vertical resolution."""
return self.__height
@height.setter
def height(self, height: int):
self.__height = height
@property
def fps(self) -> int:
"""frame per second."""
return self.__fps
@fps.setter
def fps(self, fps: int):
self.__fps = fps
@DataFeatures.PipelineStepEnter
def __enter__(self):
"""Check that folder structure exist then, open ffmpeg subprocess."""
# Use lock feature
with self._lock:
import subprocess as sp
import shlex
if not os.path.exists(self.__path.parent.absolute()):
os.makedirs(self.__path.parent.absolute())
self.__process = sp.Popen(shlex.split(f'ffmpeg -hide_banner -loglevel error -y -s {self.__width}x{self.__height} -pixel_format bgr24 -f rawvideo -r {self.__fps} -i pipe: -vcodec libx265 -x265-params log-level=error -pix_fmt yuv420p -crf 24 {self.__path.as_posix()}'), stdin=sp.PIPE)
@DataFeatures.PipelineStepExit
def __exit__(self, exception_type, exception_value, exception_traceback):
# Use lock feature
with self._lock:
# Close and flush stdin
self.__process.stdin.close()
# Wait for sub-process to finish
self.__process.wait()
# Terminate the sub-process
# Note: We don't have to terminate the sub-process (after process.wait(), the sub-process is supposed to be closed).
self.__process.terminate()
def write(self, image: numpy.array):
"""Write raw video frame to input stream of ffmpeg sub-process."""
# Use lock feature
with self._lock:
# Check if subprocess still alive
if self.__process.poll() is None:
# Resize image to adapt to video resolution
output = cv2.resize(image, dsize=(self.__width, self.__height), interpolation=cv2.INTER_LINEAR)
self.__process.stdin.write(output.tobytes())
|