aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/__main__.py
blob: 5c16f39949e8fd45552bd06f48d21b38fcbecc75 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Load and execute ArContext configuration."""

"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.
"""

__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"

import argparse
import logging
import json
import contextlib
import os

from . import load
from .ArFeatures import ArCamera, ArContext

import cv2

# Command-line interface: one positional context file plus two optional flags.
# The description is the module docstring, cut at its first '-' character if any.
parser = argparse.ArgumentParser(description=__doc__.split('-')[0])

parser.add_argument(
	'context_file',
	metavar='CONTEXT_FILE',
	type=str,
	help='JSON context filepath',
)
parser.add_argument(
	'-v', '--verbose',
	action='store_true',
	default=False,
	help='enable verbose mode to print information in console',
)
parser.add_argument(
	'-p', '--pipe_path',
	metavar='PIPE_PATH',
	type=str,
	default=None,
	help='enable pipe communication to execute external commands',
)

args = parser.parse_args()

# Logging verbosity follows the --verbose flag: DEBUG when set, INFO otherwise
logging.basicConfig(
	level=logging.DEBUG if args.verbose else logging.INFO,
	format='%(levelname)s: %(message)s',
)

# Manage pipe communication
if args.pipe_path is not None:

	# Create the FIFO unless something already exists at that path.
	# The creation is attempted directly (rather than testing os.path.exists first)
	# so that no other process can create the path between the test and the creation.
	try:

		os.mkfifo(args.pipe_path)

	except FileExistsError:

		pass

	# Open the FIFO in non-blocking mode, otherwise the call would stall until another process opens it for writing.
	# pipe_file is the raw file descriptor returned by os.open.
	pipe_file = os.open(args.pipe_path, os.O_RDONLY | os.O_NONBLOCK)

	logging.info('%s pipe opened', args.pipe_path)

# Load context from JSON file
with load(args.context_file) as context:

	# Loaded object must be an ArContext instance (or an instance of one of its subclasses)
	if not isinstance(context, ArContext):

		raise TypeError('Loaded object is not a subclass of ArContext')

	if args.verbose:

		print(context)

	# Create a window to display context
	cv2.namedWindow(context.name, cv2.WINDOW_AUTOSIZE)

	try:

		# Wrap the pipe file descriptor (when pipe communication is enabled) into a text file object
		# that is closed when leaving the 'with' statement below; otherwise pipe is None.
		pipe_context = os.fdopen(pipe_file) if args.pipe_path is not None else contextlib.nullcontext()

		# Waiting for 'ctrl+C' interruption
		with contextlib.suppress(KeyboardInterrupt), pipe_context as pipe:

			# Visualization loop
			while context.is_running():

				# Read message from pipe if required
				if pipe is not None:

					try:

						message = pipe.read()

					# The pipe is opened in non-blocking mode: when a writer keeps it open without
					# having written anything yet, there is nothing to read and, depending on the
					# Python version, the text layer raises BlockingIOError or TypeError.
					# This is not an error: there is simply no message during this iteration.
					except (BlockingIOError, TypeError):

						message = ''

					message = (message or '').rstrip('\n')

					if message:

						logging.info('%s: %s', args.pipe_path, message)

						try:

							# SECURITY: the message is executed as Python code in this module namespace
							# (this is what lets external commands refer to 'context').
							# Any process able to write into the pipe can therefore run arbitrary code
							# with the privileges of this program: only expose the pipe to trusted writers.
							exec(message)

						except Exception as e:

							# A faulty external command must not stop the visualization loop
							logging.error('%s', e)

				# Display context
				cv2.imshow(context.name, context.image())

				# Head-mounted eye tracker case: display environment frames image
				if isinstance(context.pipeline, ArCamera):

					for scene_frame in context.pipeline.scene_frames():

						cv2.imshow(scene_frame.name, scene_frame.image())

				# Key interaction (waits up to 10 ms for a key press)
				key_pressed = cv2.waitKey(10)

				# Esc: close window
				if key_pressed == 27:

					break

				# Space bar: pause/resume pipeline processing
				if key_pressed == 32:

					try:

						if context.is_paused():

							context.resume()

						else:

							context.pause()

					# Pausing is optional: ignore the key when the context raises NotImplementedError
					except NotImplementedError:

						logging.debug('%s does not implement pause/resume', context.name)

	finally:

		# Stop frame display, even when the loop is left because of an unexpected exception
		cv2.destroyAllWindows()