diff options
Diffstat (limited to 'src/argaze/utils/tobii_segment_arscene_export.py')
-rw-r--r-- | src/argaze/utils/tobii_segment_arscene_export.py | 306 |
1 files changed, 0 insertions, 306 deletions
diff --git a/src/argaze/utils/tobii_segment_arscene_export.py b/src/argaze/utils/tobii_segment_arscene_export.py deleted file mode 100644 index b2cc0e0..0000000 --- a/src/argaze/utils/tobii_segment_arscene_export.py +++ /dev/null @@ -1,306 +0,0 @@ -#!/usr/bin/env python - -import argparse -import os, json -import math -import threading - -from argaze import * -from argaze.TobiiGlassesPro2 import * -from argaze.ArUcoMarkers import * -from argaze.AreaOfInterest import * -from argaze.utils import MiscFeatures - -import cv2 as cv -import numpy - -def main(): - """ - Detect ArUcoScene into Tobii Glasses Pro 2 camera video record. - """ - - # Manage arguments - parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0]) - parser.add_argument('-s', '--segment_path', metavar='SEGMENT_PATH', type=str, default=None, help='segment path') - parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)') - parser.add_argument('-p', '--env_path', metavar='ENVIRONMENT_PATH', type=str, default=None, help='json argaze environment filepath') - parser.add_argument('-b', '--borders', metavar='BORDERS', type=float, default=16.666, help='define left and right borders mask (%) to not detect aruco out of these borders') - parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)') - parser.add_argument('-d', '--debug', metavar='DEBUG', type=bool, default=False, help='Enable visualisation and console outputs') - args = parser.parse_args() - - if args.segment_path != None: - - # Manage destination path - destination_path = '.' - if args.output != None: - - if not os.path.exists(os.path.dirname(args.output)): - - os.makedirs(os.path.dirname(args.output)) - print(f'{os.path.dirname(args.output)} folder created') - - destination_path = args.output - - else: - - destination_path = args.segment_path - - # Export into a dedicated time range folder - if args.time_range[1] != None: - timerange_path = f'[{int(args.time_range[0])}s - {int(args.time_range[1])}s]' - else: - timerange_path = f'[all]' - - destination_path = f'{destination_path}/{timerange_path}' - - if not os.path.exists(destination_path): - - os.makedirs(destination_path) - print(f'{destination_path} folder created') - - aoi_json_filepath = f'{destination_path}/aoi.json' - aoi_csv_filepath = f'{destination_path}/aoi.csv' - aoi_mp4_filepath = f'{destination_path}/aoi.mp4' - - # Load a tobii segment - tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None) - - # Load a tobii segment video - tobii_segment_video = tobii_segment.load_video() - print(f'\nVideo properties:\n\tduration: {tobii_segment_video.duration/1e6} s\n\twidth: {tobii_segment_video.width} px\n\theight: {tobii_segment_video.height} px') - - # Load a tobii segment data - tobii_segment_data = tobii_segment.load_data() - - print(f'\nLoaded data count:') - for name in tobii_segment_data.keys(): - print(f'\t{name}: {len(tobii_segment_data[name])} data') - - # Access to video timestamp data buffer - tobii_ts_vts = tobii_segment_data['VideoTimeStamp'] - - # Access to timestamped gaze position data buffer - tobii_ts_gaze_positions = tobii_segment_data['GazePosition'] - - # Format tobii gaze position in pixel - ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() - - # Initialise progress bar - MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazePositions projection:', suffix = 'Complete', length = 100) - - for ts, tobii_gaze_position in tobii_ts_gaze_positions.items(): - - # Update Progress Bar - progress = ts - int(args.time_range[0] * 1e6) - MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'GazePositions projection:', suffix = 'Complete', length = 100) - - # Test gaze position validity - if tobii_gaze_position.validity == 0: - - gaze_position_px = (int(tobii_gaze_position.value[0] * tobii_segment_video.width), int(tobii_gaze_position.value[1] * tobii_segment_video.height)) - ts_gaze_positions[ts] = GazeFeatures.GazePosition(gaze_position_px) - - print('\n') - - if args.debug: - - # Prepare video exportation at the same format than segment video - output_video = TobiiVideo.TobiiVideoOutput(aoi_mp4_filepath, tobii_segment_video.stream) - - # Load ArEnvironment - ar_env = ArFeatures.ArEnvironment.from_json(args.env_path) - - if args.debug: - print(ar_env) - - # Work with first scene only - _, ar_scene = next(iter(ar_env.items())) - - # Create timestamped buffer to store AOIs and primary time stamp offset - ts_offset_aois = DataStructures.TimeStampedBuffer() - - # Video and data replay loop - try: - - # Initialise progress bar - MiscFeatures.printProgressBar(0, tobii_segment_video.duration/1e3, prefix = 'ArUco detection & AOI projection:', suffix = 'Complete', length = 100) - - # Iterate on video frames - for video_ts, video_frame in tobii_segment_video.frames(): - - # This video frame is the reference until the next frame - # Here next frame is at + 40ms (25 fps) - # TODO: Get video fps to adapt - next_video_ts = video_ts + 40000 - - # Copy video frame to edit visualisation on it without disrupting aruco detection - visu_frame = video_frame.copy() - - # Prepare to store projected AOI - projected_aois = {} - - # Process video and data frame - try: - - # Get nearest video timestamp - _, nearest_vts = tobii_ts_vts.get_last_before(video_ts) - - projected_aois['offset'] = nearest_vts.offset - - # Hide frame left and right borders before detection to ignore markers outside focus area - cv.rectangle(video_frame.matrix, (0, 0), (int(video_frame.width*args.borders/100), int(video_frame.height)), (0, 0, 0), -1) - cv.rectangle(video_frame.matrix, (int(video_frame.width*(1 - args.borders/100)), 0), (int(video_frame.width), int(video_frame.height)), (0, 0, 0), -1) - - # Detect aruco markers into frame - ar_env.aruco_detector.detect_markers(video_frame.matrix) - - # Estimate markers poses - ar_env.aruco_detector.estimate_markers_pose() - - # Estimate scene pose from ArUco markers into frame. - tvec, rmat, _ = ar_scene.estimate_pose(ar_env.aruco_detector.detected_markers) - - # Project AOI scene into frame according estimated pose - aoi_scene_projection = ar_scene.project(tvec, rmat, visual_hfov=TobiiSpecifications.VISUAL_HFOV) - - # Store all projected aoi - for aoi_name in aoi_scene_projection.keys(): - - projected_aois[aoi_name] = numpy.rint(aoi_scene_projection[aoi_name]).astype(int) - - if args.debug: - - # Draw detected markers - ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix) - - # Draw AOI - aoi_scene_projection.draw(visu_frame.matrix, (0, 0), color=(0, 255, 255)) - - # Catch exceptions raised by estimate_pose and project methods - except (ArFeatures.PoseEstimationFailed, ArFeatures.SceneProjectionFailed) as e: - - if str(e) == 'Unconsistent marker poses': - - projected_aois['error'] = str(e) + ': ' + str(e.unconsistencies) - - else: - - projected_aois['error'] = str(e) - - if args.debug: - - # Draw detected markers - ar_env.aruco_detector.draw_detected_markers(visu_frame.matrix) - - cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Raised when timestamped buffer is empty - except KeyError as e: - - e = 'VideoTimeStamp missing' - - projected_aois['offset'] = 0 - projected_aois['error'] = e - - if args.debug: - - cv.rectangle(visu_frame.matrix, (0, 100), (550, 150), (127, 127, 127), -1) - cv.putText(visu_frame.matrix, str(e), (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv.LINE_AA) - - # Store projected AOI - ts_offset_aois[video_ts] = projected_aois - - if args.debug: - # Draw gaze positions until next frame - try: - - # Get next gaze position - ts_start, start_gaze_position = ts_gaze_positions.first - ts_next, next_gaze_position = ts_gaze_positions.first - - # Check next gaze position is not after next frame time - while ts_next < next_video_ts: - - ts_start, start_gaze_position = ts_gaze_positions.pop_first() - ts_next, next_gaze_position = ts_gaze_positions.first - - # Draw start gaze - start_gaze_position.draw(visu_frame.matrix) - - if start_gaze_position.valid and next_gaze_position.valid: - - # Draw movement from start to next - cv.line(visu_frame.matrix, start_gaze_position, next_gaze_position, (0, 255, 255), 1) - - if start_gaze_position.valid: - - # Write last start gaze position - cv.putText(visu_frame.matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA) - - # Write last start gaze position timing - cv.rectangle(visu_frame.matrix, (0, 50), (550, 100), (31, 31, 31), -1) - cv.putText(visu_frame.matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - # Empty gaze position - except IndexError: - pass - - # Draw focus area - cv.rectangle(visu_frame.matrix, (int(video_frame.width*args.borders/100.), 0), (int(visu_frame.width*(1-args.borders/100)), int(visu_frame.height)), (255, 150, 150), 1) - - # Draw center - cv.line(visu_frame.matrix, (int(visu_frame.width/2) - 50, int(visu_frame.height/2)), (int(visu_frame.width/2) + 50, int(visu_frame.height/2)), (255, 150, 150), 1) - cv.line(visu_frame.matrix, (int(visu_frame.width/2), int(visu_frame.height/2) - 50), (int(visu_frame.width/2), int(visu_frame.height/2) + 50), (255, 150, 150), 1) - - # Write segment timing - cv.rectangle(visu_frame.matrix, (0, 0), (550, 50), (63, 63, 63), -1) - cv.putText(visu_frame.matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA) - - if args.debug: - - # Close window using 'Esc' key - if cv.waitKey(1) == 27: - break - - # Display visualisation - cv.imshow(f'Segment {tobii_segment.id} ArUco AOI', visu_frame.matrix) - - # Write video - output_video.write(visu_frame.matrix) - - # Update Progress Bar - progress = video_ts*1e-3 - int(args.time_range[0] * 1e3) - MiscFeatures.printProgressBar(progress, tobii_segment_video.duration*1e-3, prefix = 'ArUco detection & AOI projection:', suffix = 'Complete', length = 100) - - # Exit on 'ctrl+C' interruption - except KeyboardInterrupt: - pass - - if args.debug: - - # Stop frame display - cv.destroyAllWindows() - - # End output video file - output_video.close() - - # Print aruco detection metrics - print('\n\nAruco marker detection metrics') - try_count, detected_counts = ar_env.aruco_detector.detection_metrics - - for marker_id, detected_count in detected_counts.items(): - print(f'\tMarkers {marker_id} has been detected in {detected_count} / {try_count} frames ({round(100 * detected_count / try_count, 2)} %)') - - # Export aruco aoi data - ts_offset_aois.to_json(aoi_json_filepath) - ts_offset_aois.as_dataframe().to_csv(aoi_csv_filepath) - print(f'Aruco AOI data saved into {aoi_json_filepath} and {aoi_csv_filepath}') - - # Notify when the aruco aoi video has been exported - print(f'Aruco AOI video saved into {aoi_mp4_filepath}') - -if __name__ == '__main__': - - main()
\ No newline at end of file |