aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/__main__.py
blob: 80f12a35ec63fb348c455d1ad3be679b762389b3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""Load and execute ArContext configuration."""

"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.
"""

__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"

import argparse
import logging
import json
import contextlib
import time
import os
import stat

from . import load
from .ArFeatures import ArCamera, ArContext, PostProcessingContext, LiveProcessingContext
from .utils.UtilsFeatures import print_progress_bar

import cv2

# Manage arguments
parser = argparse.ArgumentParser(description=__doc__.split('-')[0])
parser.add_argument('context_file', metavar='CONTEXT_FILE', type=str, help='JSON context filepath')
parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console')
parser.add_argument('-p', '--pipe_path', metavar='PIPE_PATH', type=str, default=None, help='enable pipe communication to execute external commands')
parser.add_argument('-x', '--display', metavar='DISPLAY', nargs="+", type=int, default=[1920, 1080], help='adapt windows to display dimension')
parser.add_argument('--no-window', action='store_true', default=False, help='disable window mode')

args = parser.parse_args()

# Manage logging: debug level in verbose mode, info level otherwise
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG if args.verbose else logging.INFO)

# Manage pipe communication
if args.pipe_path is not None:

	# Create the FIFO, or reuse the one already present at this path
	try:

		os.mkfifo(args.pipe_path)

	except FileExistsError:

		# What is read from this path is executed as commands and the path is removed on exit:
		# refuse anything that is not a FIFO rather than running or deleting a regular file
		if not stat.S_ISFIFO(os.stat(args.pipe_path).st_mode):

			parser.error(f'{args.pipe_path} already exists and is not a FIFO')

	# Open the FIFO in non-blocking mode, otherwise opening stalls until someone opens it for writing
	pipe_file = os.open(args.pipe_path, os.O_RDONLY | os.O_NONBLOCK)

	logging.info('%s pipe opened', args.pipe_path)

def display(name, image, factor):
	"""Adapt image to display dimension and show it in a window.

	The image is resized so that its height equals the display height
	(second value of the --display argument) multiplied by factor. Its width
	is derived from the image's own width/height ratio.

	Parameters:
		name: name of the OpenCV window where to show the image.
		image: array whose shape starts with (height, width); a trailing channel axis is optional.
		factor: fraction of the display height the shown image occupies.
	"""

	# Only the display height drives the scaling
	display_height = args.display[1]

	# Slice the shape instead of unpacking three values so single-channel (2D) images are accepted too
	height, width = image.shape[:2]
	image_ratio = width / height

	new_image_size = (int(display_height * factor * image_ratio), int(display_height * factor))

	cv2.imshow(name, cv2.resize(image, dsize=new_image_size, interpolation=cv2.INTER_LINEAR))

# Load context from JSON file
with load(args.context_file) as context:

	# Run everything under try/finally so that windows are destroyed and the pipe is removed even if an error occurs
	try:

		# Loaded object must be an ArContext instance (or an instance of one of its subclasses)
		if not isinstance(context, ArContext):

			raise TypeError('Loaded object is not a subclass of ArContext')

		if args.verbose:

			print(context)

		if not args.no_window:

			# Create a window to display context
			cv2.namedWindow(context.name, cv2.WINDOW_AUTOSIZE)

		# Assess processing time
		start_time = time.time()

		# Waiting for 'ctrl+C' interruption (Esc key raises the same exception below to leave the loop)
		# When pipe communication is enabled, the FIFO descriptor is wrapped into a file object closed on exit
		with contextlib.suppress(KeyboardInterrupt), os.fdopen(pipe_file) if args.pipe_path is not None else contextlib.nullcontext() as pipe:

			# Visualization loop
			while context.is_running():

				# Read message from pipe if required
				if args.pipe_path is not None:

					try:

						message = pipe.read().rstrip('\n')

						if message:

							logging.info('%s pipe received: %s', args.pipe_path, message)

							# SECURITY: the message is executed as Python code with this module's namespace,
							# so anyone able to write into the pipe can run arbitrary code.
							# Keep the pipe writable by trusted users only.
							exec(message)

					# A failing read or command must not stop the loop: report it and carry on
					except Exception as e:

						logging.error('%s', e)

				# Window mode on
				if not args.no_window:

					# Display context
					display(context.name, context.image(), 0.75)

					# Head-mounted eye tracker case: display environment frames image
					if isinstance(context.pipeline, ArCamera):

						for scene_frame in context.pipeline.scene_frames():

							display(scene_frame.name, scene_frame.image(), 0.5)

					# Key interaction (waits up to 40 ms for a key)
					key_pressed = cv2.waitKey(40)

					# Esc: close window
					if key_pressed == 27:

						raise KeyboardInterrupt()

					# Space bar: pause/resume pipeline processing
					if key_pressed == 32:

						if context.is_paused():

							context.resume()

						else:

							context.pause()

					# Enter: start calibration
					if key_pressed == 13:

						if isinstance(context, LiveProcessingContext):

							context.calibrate()

				# Window mode off
				else:

					# Post processing case: print a progress bar with performance figures
					if isinstance(context, PostProcessingContext):

						prefix = 'Progression'
						suffix = f'| {int(context.progression*context.duration * 1e-3)}s in {int(time.time()-start_time)}s'

						look_time, look_freq = context.process_gaze_position_performance()
						suffix += f' | Look {look_time:.2f}ms at {look_freq}Hz'

						if isinstance(context.pipeline, ArCamera):

							watch_time, watch_freq = context.process_camera_image_performance()
							suffix += f' | Watch {int(watch_time)}ms at {watch_freq}Hz'

						# Clear old longer print
						suffix += '   '

						print_progress_bar(context.progression, 1., prefix = prefix, suffix = suffix, length = 50)

					# Wait one second
					time.sleep(1)

	finally:

		# Stop frame display
		cv2.destroyAllWindows()

		# Manage pipe communication
		if args.pipe_path is not None:

			# Remove pipe
			if os.path.exists(args.pipe_path):

				os.remove(args.pipe_path)

				logging.info('%s pipe closed', args.pipe_path)