"""Load and execute ArContext configuration.""" """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import argparse import logging import json import contextlib import os from . import load from .ArFeatures import ArCamera, ArContext import cv2 # Manage arguments parser = argparse.ArgumentParser(description=__doc__.split('-')[0]) parser.add_argument('context_file', metavar='CONTEXT_FILE', type=str, help='JSON context filepath') parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') parser.add_argument('-p', '--pipe_path', metavar='PIPE_PATH', type=str, default=None, help='enable pipe communication to execute external commands') args = parser.parse_args() # Manage logging logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG if args.verbose else logging.INFO) # Manage pipe communication if args.pipe_path is not None: if not os.path.exists(args.pipe_path): os.mkfifo(args.pipe_path) # Open the fifo in non-blocking mode or it will stalls until someone opens it for writting pipe_file = os.open(args.pipe_path, os.O_RDONLY | os.O_NONBLOCK) logging.info('%s pipe opened', args.pipe_path) # Load context from JSON file with load(args.context_file) as context: # Loaded object must be a subclass of ArContext if not issubclass(type(context), ArContext): raise TypeError('Loaded object is not a subclass of ArContext') if args.verbose: print(context) # Create a window to display context cv2.namedWindow(context.name, cv2.WINDOW_AUTOSIZE) # Waiting for 'ctrl+C' interruption with contextlib.suppress(KeyboardInterrupt), os.fdopen(pipe_file) if args.pipe_path is not None else contextlib.nullcontext() as pipe: # Visualization loop while context.is_running(): # Read message from pipe if required if args.pipe_path is not None: message = pipe.read().rstrip('\n') if message: logging.info('%s: %s', args.pipe_path, message) try: exec(message) except Exception as e: logging.error('%s', e) # Display context cv2.imshow(context.name, context.image()) # Head-mounted eye tracker case: display environment frames image if issubclass(type(context.pipeline), ArCamera): for scene_frame in context.pipeline.scene_frames(): cv2.imshow(scene_frame.name, scene_frame.image()) # Key interaction key_pressed = cv2.waitKey(10) # Esc: close window if key_pressed == 27: raise KeyboardInterrupt() # Space bar: pause/resume pipeline processing if key_pressed == 32: try: if context.is_paused(): context.resume() else: context.pause() except NotImplementedError: pass # Stop frame display cv2.destroyAllWindows()