"""ArGaze module commands."""

"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see https://www.gnu.org/licenses/.
"""

__author__ = "Théo de la Hogue"
__credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"

import argparse
import collections.abc
import logging
import json
import contextlib
import time
import os
import stat

from . import load
from .DataFeatures import SharedObjectBusy
from .ArFeatures import ArCamera, ArContext, PostProcessingContext, LiveProcessingContext
from .utils.UtilsFeatures import print_progress_bar

import cv2


def _summary(doc):
    """Return the first non-blank line of a docstring, stripped (or '' if none).

    The sub-command help texts are taken from the function docstrings; using
    only the summary line keeps the command line help short whatever the
    length of the docstring.
    """

    for line in (doc or '').splitlines():

        if line.strip():
            return line.strip()

    return ''


def load_context(args: argparse.Namespace) -> None:
    """Load and execute ArContext configuration.

    The context described by ``args.context_file`` is loaded and, while it is
    running, either displayed in OpenCV windows with keyboard interaction or,
    with ``args.no_window``, reported as a progress bar in the console (post
    processing contexts only).

    Parameters:
        args: parsed command line arguments. The attributes read here are
            ``context_file``, ``verbose``, ``pipe_path``, ``display`` and
            ``no_window``.

    Raises:
        SystemExit: if the context reports a starting error or if the loaded
            object is not an ArContext.
    """

    # Manage logging
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG if args.verbose else logging.INFO)

    # Manage pipe communication
    if args.pipe_path is not None:

        # Create FIFO
        if not os.path.exists(args.pipe_path):
            os.mkfifo(args.pipe_path)

        # Open the FIFO in non-blocking mode, otherwise it would stall until someone opens it for writing
        pipe_file = os.open(args.pipe_path, os.O_RDONLY | os.O_NONBLOCK)
        logging.info('%s pipe opened', args.pipe_path)

    def display(name, image, factor = 0.75, draw_help = False):
        """Draw the help overlay on image, adapt it to display dimension and show it.

        Parameters:
            name: name of the window where to show the image.
            image: image to show (its shape is read as height, width, channels).
            factor: fraction of the display height (second value of ``args.display``) used as image height.
            draw_help: draw the full help panel instead of the '(H)elp' reminder.
        """

        height, width, _ = image.shape

        if draw_help:

            # Grey panel with a white border in the middle of the image
            panel_top_left = (int(width/4), int(height/3))
            panel_bottom_right = (int(width*3/4), int(height*2/3))

            cv2.rectangle(image, panel_top_left, panel_bottom_right, (127, 127, 127), -1)
            cv2.rectangle(image, panel_top_left, panel_bottom_right, (255, 255, 255), 1)

            # One entry per text row: None keeps a blank row
            help_lines = ['(H)elp', None]

            if issubclass(type(context), LiveProcessingContext):

                help_lines += ['Press Enter to start calibration', 'Press r to start/stop recording']

            if issubclass(type(context), PostProcessingContext):

                help_lines.append('Press Space bar to pause/resume processing')

            help_lines += ['Press f to pause/resume visualisation', 'Press Escape to quit']

            for row, text in enumerate(help_lines, start=1):

                if text is not None:

                    cv2.putText(image, text, (int(width/4)+20, int(height/3)+(row*40)), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)

        else:

            # Reminder in the bottom right corner
            cv2.putText(image, '(H)elp', (width-105, height - 15), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)

        if args.display is not None:

            display_size = tuple(args.display)

            # Resize to a fraction of the display height while keeping the image ratio
            image_ratio = width/height
            new_image_size = (int(display_size[1] * factor * image_ratio), int(display_size[1] * factor))

            image = cv2.resize(image, dsize=new_image_size, interpolation=cv2.INTER_LINEAR)

        cv2.imshow(name, image)

    try:

        # Load context from JSON file
        with load(args.context_file) as context:

            if context.starting_error is not None:

                # SystemExit prints the exception message on stderr and sets exit status to 1
                raise SystemExit( RuntimeError(f'Context fails to start: {context.starting_error}') )

            # Loaded object must be a subclass of ArContext
            if not issubclass(type(context), ArContext):

                raise SystemExit( TypeError('Loaded object is not a subclass of ArContext') )

            if args.verbose:

                print(context)

            if not args.no_window:

                # Create a window to display context
                cv2.namedWindow(context.name, cv2.WINDOW_AUTOSIZE)

            # Assess processing time
            start_time = time.time()

            # Draw parameters
            draw_pipeline = True
            draw_help = False

            # Waiting for 'ctrl+C' interruption (the Escape key raises the same exception)
            with contextlib.suppress(KeyboardInterrupt), (os.fdopen(pipe_file) if args.pipe_path is not None else contextlib.nullcontext()) as pipe:

                # Visualization loop
                while context.is_running():

                    # Read message from pipe if required
                    if args.pipe_path is not None:

                        try:

                            message = pipe.read().rstrip('\n')

                            if message:

                                logging.info('%s pipe received: %s', args.pipe_path, message)

                                # NOTE(security): the message is executed as Python code, with
                                # the variables of this function readable from it: any process
                                # allowed to write into the FIFO can run arbitrary code.
                                # Restrict the FIFO permissions to trusted users.
                                exec(message)

                        except Exception as e:

                            # A faulty message must not stop the loop
                            logging.error('%s', e)

                    # Window mode on
                    if not args.no_window:

                        try:

                            # Display context if the pipeline is available
                            display(context.name, context.image(wait = False, draw_pipeline=draw_pipeline), 0.75, draw_help=draw_help)

                        except SharedObjectBusy:

                            pass

                        # Head-mounted eye tracker case: display environment frames image
                        if issubclass(type(context.pipeline), ArCamera):

                            for scene_frame in context.pipeline.scene_frames():

                                try:

                                    # Display scene's frame if available
                                    display(scene_frame.name, scene_frame.image(wait = False), 0.5)

                                except SharedObjectBusy:

                                    pass

                        # Key interaction
                        key_pressed = cv2.waitKey(40)

                        # f: disable/enable pipeline drawing
                        if key_pressed == 102:

                            draw_pipeline = not draw_pipeline

                        # h: disable/enable help drawing
                        if key_pressed == 104:

                            draw_help = not draw_help

                        # Esc: close window
                        if key_pressed == 27:

                            raise KeyboardInterrupt()

                        # Keys specific to live processing contexts
                        if issubclass(type(context), LiveProcessingContext):

                            # Enter: start calibration
                            if key_pressed == 13:

                                context.calibrate()

                            # r: start/stop recording
                            if key_pressed == 114:

                                # FIXME: the following commands only work with TobiiGlassesPro2.LiveStream context.
                                recording_status = context.get_recording_status()

                                if recording_status == 'recording':

                                    context.stop_recording()

                                else:

                                    context.create_recording()
                                    context.start_recording()

                        # Keys specific to post processing contexts
                        if issubclass(type(context), PostProcessingContext):

                            # Space bar: pause/resume pipeline processing
                            if key_pressed == 32:

                                if context.is_paused():

                                    context.resume()

                                else:

                                    context.pause()

                            # Select previous image with left arrow
                            if key_pressed == 2:

                                context.previous()

                            # Select next image with right arrow
                            if key_pressed == 3:

                                context.next()

                    # Window mode off
                    else:

                        if issubclass(type(context), PostProcessingContext):

                            prefix = 'Progression'
                            suffix = f'| {int(context.progression*context.duration * 1e-3)}s in {int(time.time()-start_time)}s'

                            look_time, look_freq = context.pipeline.execution_info('look')
                            suffix += f' | Look {look_time:.2f}ms at {look_freq}Hz'

                            if issubclass(type(context.pipeline), ArCamera):

                                watch_time, watch_freq = context.pipeline.execution_info('watch')
                                suffix += f' | Watch {int(watch_time)}ms at {watch_freq}Hz'

                            # Clear old longer print
                            suffix += ' '

                            print_progress_bar(context.progression, 1., prefix = prefix, suffix = suffix, length = 50)

                        # Wait one second
                        time.sleep(1)

            # Stop frame display
            cv2.destroyAllWindows()

    finally:

        # Manage pipe communication: the FIFO is removed whatever the way the context ended
        if args.pipe_path is not None:

            # Remove pipe
            if os.path.exists(args.pipe_path):
                os.remove(args.pipe_path)

            logging.info('%s pipe closed', args.pipe_path)


def edit_file(args: argparse.Namespace) -> None:
    """
    Edit a JSON file according a JSON changes file into a JSON output file.

    The changes are merged recursively into the file content: a mapping value
    is merged into the value stored at the same key, a null value removes the
    key (nothing happens if the key is missing) and any other value replaces
    the stored one.

    Parameters:
        args: parsed command line arguments. The attributes read here are
            ``file``, ``changes`` and ``output`` (``output`` may be the same
            path as ``file``).
    """

    # Load both inputs before opening the output: opening it for writing
    # truncates it, which would destroy the input when both paths are the same
    # and would leave an empty output if one input is not valid JSON.
    with open(args.file, encoding='utf-8') as file:
        file_data = json.load(file)

    with open(args.changes, encoding='utf-8') as changes:
        changes_data = json.load(changes)

    def update(d, u):
        """Merge mapping u into mapping d in place and return d."""

        for k, v in u.items():

            if isinstance(v, collections.abc.Mapping):

                d[k] = update(d.get(k, {}), v)

            elif v is None:

                # Removing a key which doesn't exist is not an error
                d.pop(k, None)

            else:

                d[k] = v

        return d

    new_data = update(file_data, changes_data)

    # Write new data
    with open(args.output, 'w', encoding='utf-8') as output:
        json.dump(new_data, output, ensure_ascii=False, indent=' ')


# Manage arguments
parser = argparse.ArgumentParser(description=__doc__.split('-')[0])

# A sub-command is required: without it there would be no function to call
subparsers = parser.add_subparsers(help='sub-command help', dest='command', required=True)

parser_load = subparsers.add_parser('load', help=_summary(load_context.__doc__))
parser_load.add_argument('context_file', metavar='CONTEXT_FILE', type=str, help='JSON context filepath')
parser_load.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console')
parser_load.add_argument('-p', '--pipe_path', metavar='PIPE_PATH', type=str, default=None, help='enable pipe communication to execute external commands')
parser_load.add_argument('-x', '--display', metavar='DISPLAY', nargs="+", type=int, default=None, help='adapt windows to display dimension')
parser_load.add_argument('--no-window', action='store_true', default=False, help='disable window mode')
parser_load.set_defaults(func=load_context)

parser_patch = subparsers.add_parser('edit', help=_summary(edit_file.__doc__))
parser_patch.add_argument('file', metavar='FILE', type=str, default=None, help='json file path')
parser_patch.add_argument('changes', metavar='CHANGES', type=str, default=None, help='json changes path')
parser_patch.add_argument('output', metavar='OUTPUT', type=str, default=None, help='json output path')
parser_patch.set_defaults(func=edit_file)

args = parser.parse_args()
args.func(args)