aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/utils/UtilsFeatures.py
blob: 695f3727ded018c011f90194a8ecd0899599cbcc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
"""Miscellaneous class and functions used in utils script."""

"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.
"""

__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"

import os
import pathlib
import time
import csv
import types
import traceback

from argaze import DataFeatures

import numpy
import cv2

def print_progress_bar(iteration: int, total: int, prefix: str = '', suffix: str = '', decimals: int = 1, length: int = 100, fill: str = '█', printEnd: str = "\r"):
	"""
	Print iterations progress.
	Call in a loop to create terminal progress bar.

	A new line is printed once iteration equals total.
	When total is 0, a complete bar is printed instead of raising ZeroDivisionError.

	Parameters:
		iteration: current iteration
		total: total iterations
		prefix: string to print before progress bar
		suffix: string to print after progress bar
		decimals: positive number of decimals in percent complete
		length: character length of bar
		fill: bar fill character
		printEnd: end character (e.g. \\r, \\r\\n)
	"""

	if total:

		progress = iteration / float(total)
		filled_length = int(length * iteration // total)

	else:

		# Nothing to iterate over: show a complete bar rather than dividing by zero
		progress = 1.0
		filled_length = length

	percent = f'{100 * progress:.{decimals}f}'
	bar = fill * filled_length + '-' * (length - filled_length)

	# Leading carriage return makes successive calls overwrite the same terminal line
	print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd)

	# Print new line on complete
	if iteration == total:
		print()

def import_from_test_package(module: str) -> types.ModuleType:
	"""
	Load a module from the ArGaze test package source file.

	The file is looked up as 'argaze.test/<Module>.py', two folder levels
	above the location of argaze.__file__.

	Parameters:
		module: module name <Module> into ArGaze package

	Returns:
		module named <Module>Test, also registered in sys.modules under that name
	"""

	import sys
	import os
	import importlib.util
	import argaze

	test_module_name = f'{module}Test'

	# Locate the test file relatively to the installed argaze package
	package_file = os.path.abspath(argaze.__file__)
	package_parent = os.path.dirname(os.path.dirname(package_file))
	test_file_path = os.path.join(package_parent, 'argaze.test', f'{module}.py')

	# Build the module from its file then execute it
	module_spec = importlib.util.spec_from_file_location(test_module_name, test_file_path)
	loaded_module = importlib.util.module_from_spec(module_spec)

	# The module is registered before its code runs
	sys.modules[test_module_name] = loaded_module
	module_spec.loader.exec_module(loaded_module)

	return loaded_module

class TimeProbe():
	"""
	Assess temporal performance.

	The chronometer is started at creation. Times are measured with
	time.perf_counter and returned in milliseconds.
	"""

	def __init__(self):

		self.start()

	def start(self):
		"""
		Start chronometer: reset reference time, lap counter and elapsed time.
		"""

		# noinspection PyAttributeOutsideInit
		self.__last_time = time.perf_counter()
		# noinspection PyAttributeOutsideInit
		self.__lap_counter = 0
		# noinspection PyAttributeOutsideInit
		self.__elapsed_time = 0.

	def lap(self) -> tuple[float, int, float]:
		"""
		Get lap info.

		Returns:
			last lap time (millisecond)
			number of laps
			total elapsed time (millisecond)
		"""

		# Read the clock once so no time is lost between lap measure and reference update
		now = time.perf_counter()
		lap_time = now - self.__last_time

		# noinspection PyAttributeOutsideInit
		self.__last_time = now
		self.__lap_counter += 1
		self.__elapsed_time += lap_time

		return lap_time * 1e3, self.__lap_counter, self.__elapsed_time * 1e3

	def end(self) -> tuple[float, int]:
		"""
		Stop chronometer

		Returns:
			elapsed time (millisecond)
			number of laps
		"""

		now = time.perf_counter()
		self.__elapsed_time += now - self.__last_time

		# Move reference time forward so a later call doesn't count this interval twice
		# noinspection PyAttributeOutsideInit
		self.__last_time = now

		return self.__elapsed_time * 1e3, self.__lap_counter

	def restart(self):
		"""
		Restart chronometer.
		"""

		self.start()

def tuple_to_string(t: tuple, separator: str = ", ") -> str:
	"""Wrap each element of t into double quotes then join them with the separator string."""

	quoted_elements = [f'"{element}"' for element in t]

	return separator.join(quoted_elements)

def PrintCallStack(method):
	"""Define a decorator to print call stack until the decorated method.

	Parameters:
		method: method to decorate (called with self as first argument).

	Returns:
		wrapper keeping name and docstring of the decorated method.
	"""

	import functools

	# functools.wraps copies method metadata (__name__, __doc__, ...) onto the wrapper
	@functools.wraps(method)
	def wrapper(self, *args, **kwargs):
		"""Wrap method to print call stack before its call.

		Parameters:
			self: object the method is bound to.
			args: method arguments.
			kwargs: extra arguments.
		"""
		print(f'Call stack until method \'{method.__name__}\':')

		traceback.print_stack()

		return method(self, *args, **kwargs)

	return wrapper

class FileWriter(DataFeatures.PipelineStepObject):
	"""Write data into a file line by line.

	The file is opened when entering the object context and closed when exiting it.
	"""

	# noinspection PyMissingConstructor
	@DataFeatures.PipelineStepInit
	def __init__(self, **kwargs):
		
		# Init private attributes
		self.__path = None
		self.__separator = ','
		self.__header = None

	@property
	def path(self) -> str:
		"""File path where to write data (stored as a pathlib.Path)."""
		return self.__path

	@path.setter
	def path(self, path: str):

		self.__path = pathlib.Path(path)

	@property
	def separator(self) -> str:
		"""String used to separate elements during tuple to string conversion."""
		return self.__separator

	@separator.setter
	def separator(self, separator: str):

		self.__separator = separator

	@property
	def header(self) -> str|tuple:
		"""String or tuple to write first."""
		return self.__header

	@header.setter
	def header(self, header: str|tuple):

		self.__header = header

	@DataFeatures.PipelineStepEnter
	def __enter__(self):
		"""Check that folder structure exist and open file then, write header line."""

		# exist_ok avoids a failure if the folder appears between an existence check and its creation
		os.makedirs(self.__path.parent.absolute(), exist_ok=True)

		# Open file (line buffered)
		self.__file = open(self.__path, 'w', encoding='utf-8', buffering=1)

		# Write header if required
		if self.__header is not None:

			# Format list or tuple element into quoted strings
			# A local copy is formatted so the header attribute keeps the value it was given
			header_line = self.__header

			if not isinstance(header_line, str):

				header_line = tuple_to_string(header_line, self.__separator)

			print(header_line, file=self.__file, flush=True)

	@DataFeatures.PipelineStepExit
	def __exit__(self, exception_type, exception_value, exception_traceback):
		"""Close file."""
		self.__file.close()

	def write(self, data: str|tuple):
		"""Write data as a new line into file. 

		!!! note
			Tuple elements are converted into quoted strings separated by separator string.

		Parameters:
			data: string written as is, or tuple (or any non string iterable) to convert first.
		"""

		# Format list or tuple element into quoted strings
		if not isinstance(data, str):

			data = tuple_to_string(data, self.__separator)

		# Write into file
		print(data, file=self.__file, flush=True)

class FileReader(DataFeatures.PipelineStepObject):
	"""Read data from a file line by line.

	The file is opened when entering the object context and closed when exiting it.
	"""

	# noinspection PyMissingConstructor
	@DataFeatures.PipelineStepInit
	def __init__(self, **kwargs):
		
		# Init private attributes
		self.__path = None
		self.__separator = ','

		# File handle and csv reader only exist between enter and exit
		self.__file = None
		self.__reader = None

	@property
	def path(self) -> str:
		"""File path where to read data (stored as a pathlib.Path)."""
		return self.__path

	@path.setter
	def path(self, path: str):

		self.__path = pathlib.Path(path)

	@property
	def separator(self) -> str:
		"""String used to separate elements during string to tuple conversion."""
		return self.__separator

	@separator.setter
	def separator(self, separator: str):

		self.__separator = separator

	@DataFeatures.PipelineStepEnter
	def __enter__(self):
		"""Open file and bind a csv reader to it."""

		# Keep the file handle to be able to close it on exit
		# newline='' lets the csv module deal with line endings by itself
		self.__file = open(self.__path, newline='')
		self.__reader = csv.reader(self.__file, delimiter=self.__separator)

	@DataFeatures.PipelineStepExit
	def __exit__(self, exception_type, exception_value, exception_traceback):
		"""Close file if it has been opened."""

		if self.__file is not None:

			self.__file.close()

			self.__file = None
			self.__reader = None

	def read(self) -> str|tuple:
		"""Read next data from file. 

		!!! note
			Quoted strings separated by separator string are converted into tuple elements.

		Returns:
			next row elements as given by csv.reader.

		Raises:
			EOFError: when there is no more row or when next row can't be read for any other reason.
		"""

		try:

			return next(self.__reader)

		except StopIteration:

			# Regular end of file
			raise EOFError from None

		except Exception as error:

			# Any other failure is still reported as EOFError, with its cause attached
			raise EOFError(f'Unable to read next data from {self.__path}: {error}') from error

class VideoWriter(DataFeatures.PipelineStepObject, DataFeatures.SharedObject):
	"""Open ffmpeg application as sub-process.
		FFmpeg input PIPE: RAW images in BGR color format
		FFmpeg output MP4 file encoded with HEVC codec.

		Arguments list:
			-y                   Overwrite output file without asking
			-s {width}x{height}  Input resolution width x height (1344x756)
			-pixel_format bgr24  Input frame color format is BGR with 8 bits per color component
			-f rawvideo          Input format: raw video
			-r {fps}             Frame rate: fps
			-i pipe:             ffmpeg input is a PIPE
			-vcodec libx265      Video codec: H.265 (HEVC)
			-pix_fmt yuv420p     Output video color space YUV420 (saving space compared to YUV444)
			-crf 24              Constant quality encoding (lower value for higher quality and larger output file).
			{output_filename}    Output file name: output_filename (output.mp4)
	"""

	@DataFeatures.PipelineStepInit
	def __init__(self, **kwargs):

		# Init parent classes
		DataFeatures.SharedObject.__init__(self)
		
		# Init private attributes
		self.__path = None
		self.__width = 320
		self.__height = 240
		self.__fps = 25

	@property
	def path(self) -> str:
		"""File path where to write images (stored as a pathlib.Path)."""
		return self.__path

	@path.setter
	def path(self, path: str):

		self.__path = pathlib.Path(path)

	@property
	def width(self) -> int:
		"""Video horizontal resolution."""
		return self.__width

	@width.setter
	def width(self, width: int):

		self.__width = width

	@property
	def height(self) -> int:
		"""Video vertical resolution."""
		return self.__height

	@height.setter
	def height(self, height: int):

		self.__height = height

	@property
	def fps(self) -> int:
		"""frame per second."""
		return self.__fps
	
	@fps.setter
	def fps(self, fps: int):

		self.__fps = fps

	@DataFeatures.PipelineStepEnter
	def __enter__(self):
		"""Check that folder structure exist then, open ffmpeg subprocess."""

		# Use lock feature
		with self._lock:
		
			import subprocess as sp

			# exist_ok avoids a failure if the folder appears between an existence check and its creation
			os.makedirs(self.__path.parent.absolute(), exist_ok=True)

			# Give arguments as a list: an output path with spaces or quotes is then passed untouched to ffmpeg
			command = [
				'ffmpeg', '-hide_banner', '-loglevel', 'error',
				'-y',
				'-s', f'{self.__width}x{self.__height}',
				'-pixel_format', 'bgr24',
				'-f', 'rawvideo',
				'-r', f'{self.__fps}',
				'-i', 'pipe:',
				'-vcodec', 'libx265',
				'-x265-params', 'log-level=error',
				'-pix_fmt', 'yuv420p',
				'-crf', '24',
				self.__path.as_posix()
			]

			self.__process = sp.Popen(command, stdin=sp.PIPE)
		
	@DataFeatures.PipelineStepExit
	def __exit__(self, exception_type, exception_value, exception_traceback):
		"""Close ffmpeg input stream then wait for the sub-process to finish."""

		# Use lock feature
		with self._lock:

			# Close and flush stdin
			self.__process.stdin.close()

			# Wait for sub-process to finish
			# Note: no need to terminate the sub-process afterward as it has exited once wait() returns
			self.__process.wait()

	def write(self, image: numpy.ndarray):
		"""Write raw video frame to input stream of ffmpeg sub-process.

		The image is resized to the video resolution first.
		Nothing is written if the sub-process is not running anymore.

		Parameters:
			image: frame to write. The ffmpeg command declares its input as bgr24.
		"""

		# Use lock feature
		with self._lock:

			# Check if subprocess still alive
			if self.__process.poll() is None:

				# Resize image to adapt to video resolution
				output = cv2.resize(image, dsize=(self.__width, self.__height), interpolation=cv2.INTER_LINEAR)

				self.__process.stdin.write(output.tobytes())