From 72af13343bba4b7ccda40bdc4f100f470fd33d99 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 17 Apr 2024 10:18:34 +0200 Subject: Adding pipe communication with ArGaze main script. --- src/argaze/__main__.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/argaze/__main__.py b/src/argaze/__main__.py index 96fda23..127baa8 100644 --- a/src/argaze/__main__.py +++ b/src/argaze/__main__.py @@ -20,6 +20,7 @@ import argparse import logging import json import contextlib +import os from . import load from .ArFeatures import ArCamera, ArContext @@ -30,12 +31,25 @@ import cv2 parser = argparse.ArgumentParser(description=__doc__.split('-')[0]) parser.add_argument('context_file', metavar='CONTEXT_FILE', type=str, help='JSON context filepath') parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console') +parser.add_argument('-p', '--pipe_path', metavar='PIPE_PATH', type=str, default=None, help='enable pipe communication at given path to execute external commands') args = parser.parse_args() # Manage logging logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG if args.verbose else logging.INFO) +# Manage pipe communication +if args.pipe_path is not None: + + if not os.path.exists(args.pipe_path): + + os.mkfifo(args.pipe_path) + + # Open the fifo in non-blocking mode or it will stalls until someone opens it for writting + pipe_file = os.open(args.pipe_path, os.O_RDONLY | os.O_NONBLOCK) + + logging.info('%s pipe opened', args.pipe_path) + # Load context from JSON file with load(args.context_file) as context: @@ -52,11 +66,28 @@ with load(args.context_file) as context: cv2.namedWindow(context.name, cv2.WINDOW_AUTOSIZE) # Waiting for 'ctrl+C' interruption - with contextlib.suppress(KeyboardInterrupt): + with contextlib.suppress(KeyboardInterrupt), os.fdopen(pipe_file) as pipe: # Visualization loop while True: + # Read message from pipe if required + if args.pipe_path is not None: + + message = pipe.read().rstrip('\n') + + if message: + + logging.info('%s: %s', args.pipe_path, message) + + try: + + exec(message) + + except Exception as e: + + logging.error('%s', e) + # Display context cv2.imshow(context.name, context.image()) -- cgit v1.1