"""Load and execute ArContext configuration.""" """ This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "GPLv3" import argparse import logging import json import contextlib import time import os import stat from . import load from .ArFeatures import ArCamera, ArContext, PostProcessingContext, LiveProcessingContext from .utils.UtilsFeatures import print_progress_bar import cv2 # Manage arguments parser = argparse.ArgumentParser(description=__doc__.split('-')[0]) parser.add_argument('context_file', metavar='CONTEXT_FILE', type=str, help='JSON context filepath') parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') parser.add_argument('-p', '--pipe_path', metavar='PIPE_PATH', type=str, default=None, help='enable pipe communication to execute external commands') parser.add_argument('-x', '--display', metavar='DISPLAY', nargs="+", type=int, default=[1920, 1080], help='adapt windows to display dimension') parser.add_argument('--no-window', action='store_true', default=False, help='disable window mode') args = parser.parse_args() # Manage logging logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG if args.verbose else logging.INFO) # Manage pipe communication if args.pipe_path is not None: # Create FIFO if not os.path.exists(args.pipe_path): os.mkfifo(args.pipe_path) # Open the fifo in non-blocking mode or it will stalls until someone opens it for writting pipe_file = os.open(args.pipe_path, os.O_RDONLY | os.O_NONBLOCK) logging.info('%s pipe opened', args.pipe_path) def display(name, image, factor): """Adapt image to display dimension.""" display_size = tuple(args.display) height, width, _ = image.shape image_ratio = width/height new_image_size = (int(display_size[1] * factor * image_ratio), int(display_size[1] * factor)) cv2.imshow(name, cv2.resize(image, dsize=new_image_size, interpolation=cv2.INTER_LINEAR)) # Load context from JSON file with load(args.context_file) as context: # Loaded object must be a subclass of ArContext if not issubclass(type(context), ArContext): raise TypeError('Loaded object is not a subclass of ArContext') if args.verbose: print(context) if not args.no_window: # Create a window to display context cv2.namedWindow(context.name, cv2.WINDOW_AUTOSIZE) # Assess processing time start_time = time.time() # Waiting for 'ctrl+C' interruption with contextlib.suppress(KeyboardInterrupt), os.fdopen(pipe_file) if args.pipe_path is not None else contextlib.nullcontext() as pipe: # Visualization loop while context.is_running(): # Read message from pipe if required if args.pipe_path is not None: try: message = pipe.read().rstrip('\n') if message: logging.info('%s pipe received: %s', args.pipe_path, message) exec(message) except Exception as e: logging.error('%s', e) # Window mode on if not args.no_window: # Display context 
                display(context.name, context.image(), 0.75)

                # Head-mounted eye tracker case: display environment frames image
                if issubclass(type(context.pipeline), ArCamera):

                    for scene_frame in context.pipeline.scene_frames():
                        display(scene_frame.name, scene_frame.image(), 0.5)

                # Key interaction
                key_pressed = cv2.waitKey(40)

                # Esc: close window
                if key_pressed == 27:
                    raise KeyboardInterrupt()

                # Space bar: pause/resume pipeline processing
                if key_pressed == 32:

                    if context.is_paused():
                        context.resume()

                    else:
                        context.pause()

                # Enter: start calibration
                if key_pressed == 13:

                    if issubclass(type(context), LiveProcessingContext):
                        context.calibrate()

            # Window mode off
            else:

                if issubclass(type(context), PostProcessingContext):

                    prefix = 'Progression'
                    suffix = f'| {int(context.progression * context.duration * 1e-3)}s in {int(time.time() - start_time)}s'

                    look_time, look_freq = context.process_gaze_position_performance()
                    suffix += f' | Look {look_time:.2f}ms at {look_freq}Hz'

                    if issubclass(type(context.pipeline), ArCamera):

                        watch_time, watch_freq = context.process_camera_image_performance()
                        suffix += f' | Watch {int(watch_time)}ms at {watch_freq}Hz'

                    # Clear old longer print
                    suffix += ' '

                    print_progress_bar(context.progression, 1., prefix=prefix, suffix=suffix, length=50)

                # Wait one second
                time.sleep(1)

    # Stop frame display
    cv2.destroyAllWindows()

# Manage pipe communication
if args.pipe_path is not None:

    # Remove pipe
    if os.path.exists(args.pipe_path):
        os.remove(args.pipe_path)
        logging.info('%s pipe closed', args.pipe_path)
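
# Usage sketch: the exact package entry point may vary (check the project documentation),
# but a typical invocation of this script looks like the following, where
# ./demo_context.json and /tmp/argaze_pipe are hypothetical example paths:
#
#   python -m argaze ./demo_context.json --verbose --pipe_path /tmp/argaze_pipe
#
# When --pipe_path is given, each message read from the FIFO is passed to exec() with the
# running `context` in scope, so the pipeline can be driven from another shell, e.g.:
#
#   echo "context.pause()" > /tmp/argaze_pipe
#   echo "context.resume()" > /tmp/argaze_pipe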