-rw-r--r--  src/argaze/utils/tobii_segment_gaze_movements_export.py | 362
1 file changed, 183 insertions(+), 179 deletions(-)
diff --git a/src/argaze/utils/tobii_segment_gaze_movements_export.py b/src/argaze/utils/tobii_segment_gaze_movements_export.py
index 934f340..f8bdb48 100644
--- a/src/argaze/utils/tobii_segment_gaze_movements_export.py
+++ b/src/argaze/utils/tobii_segment_gaze_movements_export.py
@@ -26,6 +26,7 @@ def main():
parser.add_argument('-t', '--time_range', metavar=('START_TIME', 'END_TIME'), nargs=2, type=float, default=(0., None), help='start and end time (in second)')
parser.add_argument('-dev', '--deviation_max_threshold', metavar='DISPERSION_THRESHOLD', type=int, default=None, help='maximal distance for fixation identification in pixel')
parser.add_argument('-dmin', '--duration_min_threshold', metavar='DURATION_MIN_THRESHOLD', type=int, default=200, help='minimal duration for fixation identification in millisecond')
+ parser.add_argument('-v', '--visu', metavar='VISU', type=bool, default=False, help='enable data visualisation', action=argparse.BooleanOptionalAction)
parser.add_argument('-o', '--output', metavar='OUT', type=str, default=None, help='destination folder path (segment folder by default)')
parser.add_argument('-w', '--window', metavar='DISPLAY', type=bool, default=True, help='enable window display', action=argparse.BooleanOptionalAction)
args = parser.parse_args()
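
Note: `type=bool` on its own is a classic argparse pitfall: every non-empty string is truthy, so `--visu False` would still enable visualisation. That is why boolean options here pair it with `action=argparse.BooleanOptionalAction`, as the existing `--window` option already does. A minimal, self-contained sketch of the difference:

    import argparse

    parser = argparse.ArgumentParser()
    # Buggy: bool('False') is True, so any supplied value enables the flag
    parser.add_argument('--buggy', type=bool, default=False)
    # Robust: generates paired --ok / --no-ok switches (Python 3.9+)
    parser.add_argument('--ok', default=False, action=argparse.BooleanOptionalAction)

    print(parser.parse_args(['--buggy', 'False']).buggy)  # True, not False
    print(parser.parse_args(['--no-ok']).ok)              # False
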
@@ -294,7 +295,7 @@ def main():
ts_status = GazeFeatures.TimeStampedGazeStatus()
# Initialise progress bar
- #MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGazeMovements identification:', suffix = 'Complete', length = 100)
+ MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGaze movements identification:', suffix = 'Complete', length = 100)
for gaze_movement in movement_identifier(ts_gaze_positions):
@@ -330,7 +331,7 @@ def main():
# Update Progress Bar
progress = start_ts - int(args.time_range[0] * 1e6)
- #MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze movements identification:', suffix = 'Complete', length = 100)
+ MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze movements identification:', suffix = 'Complete', length = 100)
print(f'\nGazeMovements identification metrics:')
print(f'\t{len(ts_fixations)} fixations found')
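
Note: `MiscFeatures.printProgressBar` is called as `(iteration, total, prefix, suffix, length)`. Its implementation is not shown in this diff; a minimal console progress bar compatible with these calls might look like the sketch below (an assumption, not the actual ArGaze helper):

    def printProgressBar(iteration, total, prefix='', suffix='', length=100, fill='█'):
        # Rewrite the same console line on each call using '\r'
        ratio = iteration / float(total) if total else 1.
        filled = int(length * min(max(ratio, 0.), 1.))
        bar = fill * filled + '-' * (length - filled)
        print(f'\r{prefix} |{bar}| {100 * ratio:.1f}% {suffix}', end='\r')
        if iteration >= total:
            print()  # move to a fresh line once complete
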
@@ -352,245 +353,248 @@ def main():
# DEBUG
ts_status.as_dataframe().to_csv(f'{destination_path}/gaze_status.csv')
- # Prepare video exportation at the same format than segment video
- output_video = TobiiVideo.TobiiVideoOutput(gaze_status_video_filepath, tobii_segment_video.stream)
+ # Render data visualisation
+ if args.visu:
- # Reload aoi scene projection
- ts_aois_projections = DataStructures.TimeStampedBuffer.from_json(aoi_filepath)
+ # Prepare video export in the same format as the segment video
+ output_video = TobiiVideo.TobiiVideoOutput(gaze_status_video_filepath, tobii_segment_video.stream)
- # Prepare gaze satus image
- gaze_status_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8)
+ # Reload aoi scene projection
+ ts_aois_projections = DataStructures.TimeStampedBuffer.from_json(aoi_filepath)
- # Video loop
- try:
+ # Prepare gaze status image
+ gaze_status_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8)
- # Initialise progress bar
- MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGaze status video processing:', suffix = 'Complete', length = 100)
+ # Video loop
+ try:
- fixations_exist = len(ts_fixations) > 0
- saccades_exist = len(ts_saccades) > 0
- movements_exist = len(ts_movements) > 0
- status_exist = len(ts_status) > 0
+ # Initialise progress bar
+ MiscFeatures.printProgressBar(0, tobii_segment_video.duration, prefix = '\nGaze status video processing:', suffix = 'Complete', length = 100)
- if fixations_exist:
- current_fixation_ts, current_fixation = ts_fixations.pop_first()
- current_fixation_time_counter = 0
+ fixations_exist = len(ts_fixations) > 0
+ saccades_exist = len(ts_saccades) > 0
+ movements_exist = len(ts_movements) > 0
+ status_exist = len(ts_status) > 0
- if saccades_exist:
- current_saccade_ts, current_saccade = ts_saccades.pop_first()
+ if fixations_exist:
+ current_fixation_ts, current_fixation = ts_fixations.pop_first()
+ current_fixation_time_counter = 0
- if movements_exist:
- current_movements_ts, current_movements = ts_movements.pop_first()
+ if saccades_exist:
+ current_saccade_ts, current_saccade = ts_saccades.pop_first()
- # Iterate on video frames
- for video_ts, video_frame in tobii_segment_video.frames():
+ if movements_exist:
+ current_movements_ts, current_movements = ts_movements.pop_first()
- # This video frame is the reference until the next frame
- # Here next frame is at + 40ms (25 fps)
- # TODO: Get video fps to adapt
- next_video_ts = video_ts + 40000
+ # Iterate on video frames
+ for video_ts, video_frame in tobii_segment_video.frames():
- visu_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8)
+ # This video frame is the reference until the next frame
+ # Here next frame is at + 40ms (25 fps)
+ # TODO: Get video fps to adapt
+ next_video_ts = video_ts + 40000
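
Note on the TODO above: the 40000 µs step hard-codes a 25 fps frame period. If the stream exposes its frame rate (PyAV streams carry a Fraction-valued `average_rate`; whether `TobiiVideo` forwards that attribute is an assumption), the period could be derived instead of hard-coded:

    # Sketch, assuming a PyAV-style stream attribute; falls back to 25 fps
    fps = float(getattr(tobii_segment_video.stream, 'average_rate', None) or 25)
    next_video_ts = video_ts + int(1e6 / fps)  # inter-frame period in µs
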
- try:
+ visu_matrix = numpy.zeros((1080, 1920, 3), numpy.uint8)
- # Get current aoi projection at video frame time
- ts_current_aois, current_aois = ts_aois_projections.pop_first()
+ try:
- assert(ts_current_aois == video_ts)
+ # Get current aoi projection at video frame time
+ ts_current_aois, current_aois = ts_aois_projections.pop_first()
- # Catch aoi error to not update current aoi
- if 'error' in current_aois.keys():
+ assert(ts_current_aois == video_ts)
- # Display error (remove extra info after ':')
- current_aoi_error = current_aois.pop('error').split(':')[0]
+ # Catch aoi error to not update current aoi
+ if 'error' in current_aois.keys():
- # Select color error
- if current_aoi_error == 'VideoTimeStamp missing':
- color_error = (0, 0, 255)
- else:
- color_error = (0, 255, 255)
+ # Display error (remove extra info after ':')
+ current_aoi_error = current_aois.pop('error').split(':')[0]
- cv.rectangle(visu_matrix, (0, 100), (550, 150), (127, 127, 127), -1)
- cv.putText(visu_matrix, current_aoi_error, (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA)
+ # Select color error
+ if current_aoi_error == 'VideoTimeStamp missing':
+ color_error = (0, 0, 255)
+ else:
+ color_error = (0, 255, 255)
- # Or update current aoi
- elif args.aoi in current_aois.keys():
+ cv.rectangle(visu_matrix, (0, 100), (550, 150), (127, 127, 127), -1)
+ cv.putText(visu_matrix, current_aoi_error, (20, 130), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA)
- ts_current_aoi = ts_current_aois
- current_aoi = AOIFeatures.AreaOfInterest(current_aois.pop(args.aoi))
+ # Or update current aoi
+ elif args.aoi in current_aois.keys():
- # Apply perspective transform algorithm
- destination = numpy.float32([[0, 1080],[1920, 1080],[1920, 0],[0, 0]])
- aoi_matrix = cv.getPerspectiveTransform(current_aoi.astype(numpy.float32), destination)
- visu_matrix = cv.warpPerspective(video_frame.matrix, aoi_matrix, (1920, 1080))
+ ts_current_aoi = ts_current_aois
+ current_aoi = AOIFeatures.AreaOfInterest(current_aois.pop(args.aoi))
- # Wait for aois projection
- except KeyError:
- pass
+ # Apply perspective transform algorithm
+ destination = numpy.float32([[0, 1080],[1920, 1080],[1920, 0],[0, 0]])
+ aoi_matrix = cv.getPerspectiveTransform(current_aoi.astype(numpy.float32), destination)
+ visu_matrix = cv.warpPerspective(video_frame.matrix, aoi_matrix, (1920, 1080))
- if fixations_exist:
+ # Wait for AOI projections
+ except KeyError:
+ pass
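
Note: the perspective transform above rectifies the projected AOI so it fills the 1080p canvas; `destination` lists the output corners (bottom-left, bottom-right, top-right, top-left) in the same order as the AOI points. The same technique in isolation, with made-up corner coordinates:

    import cv2 as cv
    import numpy

    frame = numpy.zeros((1080, 1920, 3), numpy.uint8)  # stand-in video frame
    # Source quad ordered like `destination`: BL, BR, TR, TL (illustrative values)
    quad = numpy.float32([[300, 800], [1500, 850], [1450, 200], [350, 150]])
    destination = numpy.float32([[0, 1080], [1920, 1080], [1920, 0], [0, 0]])

    homography = cv.getPerspectiveTransform(quad, destination)  # 3x3 matrix
    rectified = cv.warpPerspective(frame, homography, (1920, 1080))
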
- # Check next fixation
- if video_ts >= current_fixation_ts + current_fixation.duration and len(ts_fixations) > 0:
+ if fixations_exist:
- current_fixation_ts, current_fixation = ts_fixations.pop_first()
- current_fixation_time_counter = 0
+ # Check next fixation
+ if video_ts >= current_fixation_ts + current_fixation.duration and len(ts_fixations) > 0:
- # While current time belongs to the current fixation
- if video_ts >= current_fixation_ts and video_ts < current_fixation_ts + current_fixation.duration:
+ current_fixation_ts, current_fixation = ts_fixations.pop_first()
+ current_fixation_time_counter = 0
- current_fixation_time_counter += 1
+ # While current time belongs to the current fixation
+ if video_ts >= current_fixation_ts and video_ts < current_fixation_ts + current_fixation.duration:
- # Draw current fixation
- cv.circle(visu_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 255, 0), current_fixation_time_counter)
- cv.circle(gaze_status_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 155, 0))
+ current_fixation_time_counter += 1
- if saccades_exist:
-
- # Check next saccade
- if video_ts >= current_saccade_ts + current_saccade.duration and len(ts_saccades) > 0:
+ # Draw current fixation
+ cv.circle(visu_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 255, 0), current_fixation_time_counter)
+ cv.circle(gaze_status_matrix, (int(current_fixation.centroid[0]), int(current_fixation.centroid[1])), int(current_fixation.deviation_max), (0, 155, 0))
- current_saccade_ts, current_saccade = ts_saccades.pop_first()
+ if saccades_exist:
+
+ # Check next saccade
+ if video_ts >= current_saccade_ts + current_saccade.duration and len(ts_saccades) > 0:
- # While current time belongs to the current saccade
- if video_ts >= current_saccade_ts and video_ts < current_saccade_ts + current_saccade.duration:
- pass
+ current_saccade_ts, current_saccade = ts_saccades.pop_first()
- if movements_exist:
+ # While current time belongs to the current saccade
+ if video_ts >= current_saccade_ts and video_ts < current_saccade_ts + current_saccade.duration:
+ pass
- # Check next movements movement
- if video_ts >= current_movements_ts + current_movements.duration and len(ts_movements) > 0:
+ if movements_exist:
- current_movements_ts, current_movements = ts_movements.pop_first()
+ # Check next movement
+ if video_ts >= current_movements_ts + current_movements.duration and len(ts_movements) > 0:
- # While current time belongs to the current movements movement
- if video_ts >= current_movements_ts and video_ts < current_movements_ts + current_movements.duration:
- pass
+ current_movements_ts, current_movements = ts_movements.pop_first()
- # Draw gaze status until next frame
- try:
-
- # Get next gaze status
- ts_start, start_gaze_status = ts_status.first
- ts_next, next_gaze_status = ts_status.first
+ # While current time belongs to the current movement
+ if video_ts >= current_movements_ts and video_ts < current_movements_ts + current_movements.duration:
+ pass
- # Check next gaze status is not after next frame time
- while ts_next < next_video_ts:
+ # Draw gaze status until next frame
+ try:
- ts_start, start_gaze_status = ts_status.pop_first()
+ # Get next gaze status
+ ts_start, start_gaze_status = ts_status.first
ts_next, next_gaze_status = ts_status.first
- # Draw movement type
- if start_gaze_status.valid and next_gaze_status.valid \
- and start_gaze_status.movement_index == next_gaze_status.movement_index \
- and start_gaze_status.movement_type == next_gaze_status.movement_type:
+ # Check next gaze status is not after next frame time
+ while ts_next < next_video_ts:
+
+ ts_start, start_gaze_status = ts_status.pop_first()
+ ts_next, next_gaze_status = ts_status.first
+
+ # Draw movement type
+ if start_gaze_status.valid and next_gaze_status.valid \
+ and start_gaze_status.movement_index == next_gaze_status.movement_index \
+ and start_gaze_status.movement_type == next_gaze_status.movement_type:
+
+ if next_gaze_status.movement_type == 'Fixation':
+ movement_color = (0, 255, 0)
+ elif next_gaze_status.movement_type == 'Saccade':
+ movement_color = (0, 0, 255)
+ else:
+ movement_color = (255, 0, 0)
+
+ cv.line(visu_matrix, start_gaze_status, next_gaze_status, movement_color, 3)
+ cv.line(gaze_status_matrix, start_gaze_status, next_gaze_status, movement_color, 3)
+
+ # Empty gaze position
+ except IndexError:
+ pass
+
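
Note: both drawing passes (gaze status above, gaze positions below) use the same consumption pattern: peek at the buffer head with `.first`, then pop entries while their timestamp falls before the next frame time, so each video frame renders exactly the samples from its 40 ms display interval; an `IndexError` on the exhausted buffer ends the pass. The pattern in isolation, with a plain list standing in for the TimeStampedBuffer:

    samples = [(0, 'a'), (15000, 'b'), (35000, 'c'), (52000, 'd')]  # (µs, data)

    for frame_ts in (0, 40000):                   # two frames at 25 fps
        next_frame_ts = frame_ts + 40000
        try:
            while samples[0][0] < next_frame_ts:  # peek, like .first
                ts, data = samples.pop(0)         # consume, like .pop_first()
                print(f'frame {frame_ts}: draw {data} at {ts}')
        except IndexError:                        # buffer exhausted
            pass
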
+ # Draw gaze positions until next frame
+ try:
- if next_gaze_status.movement_type == 'Fixation':
- movement_color = (0, 255, 0)
- elif next_gaze_status.movement_type == 'Saccade':
- movement_color = (0, 0, 255)
- else:
- movement_color = (255, 0, 0)
+ # Get next gaze position
+ ts_start, start_gaze_position = ts_gaze_positions.first
+ ts_next, next_gaze_position = ts_gaze_positions.first
- cv.line(visu_matrix, start_gaze_status, next_gaze_status, movement_color, 3)
- cv.line(gaze_status_matrix, start_gaze_status, next_gaze_status, movement_color, 3)
+ # Gaze position count
+ gaze_position_count = 0
- # Empty gaze position
- except IndexError:
- pass
+ # Check next gaze position is not after next frame time
+ while ts_next < next_video_ts:
- # Draw gaze positions until next frame
- try:
+ ts_start, start_gaze_position = ts_gaze_positions.pop_first()
+ ts_next, next_gaze_position = ts_gaze_positions.first
- # Get next gaze position
- ts_start, start_gaze_position = ts_gaze_positions.first
- ts_next, next_gaze_position = ts_gaze_positions.first
+ if not start_gaze_position.valid:
- # Gaze position count
- gaze_position_count = 0
-
- # Check next gaze position is not after next frame time
- while ts_next < next_video_ts:
+ # Select color error
+ if start_gaze_position.message == 'VideoTimeStamp missing':
+ color_error = (0, 0, 255)
+ else:
+ color_error = (0, 255, 255)
- ts_start, start_gaze_position = ts_gaze_positions.pop_first()
- ts_next, next_gaze_position = ts_gaze_positions.first
+ # Write invalid gaze position error message
+ cv.putText(visu_matrix, f'{ts_start*1e-3:.3f} ms: {start_gaze_position.message}', (20, 1060 - (gaze_position_count)*50), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA)
+
+ # Draw start gaze
+ start_gaze_position.draw(visu_matrix, draw_precision=False)
+ start_gaze_position.draw(gaze_status_matrix, draw_precision=False)
- if not start_gaze_position.valid:
+ if start_gaze_position.valid and next_gaze_position.valid:
- # Select color error
- if start_gaze_position.message == 'VideoTimeStamp missing':
- color_error = (0, 0, 255)
- else:
- color_error = (0, 255, 255)
-
- # Write unvalid error message
- cv.putText(visu_matrix, f'{ts_start*1e-3:.3f} ms: {start_gaze_position.message}', (20, 1060 - (gaze_position_count)*50), cv.FONT_HERSHEY_SIMPLEX, 1, color_error, 1, cv.LINE_AA)
-
- # Draw start gaze
- start_gaze_position.draw(visu_matrix, draw_precision=False)
- start_gaze_position.draw(gaze_status_matrix, draw_precision=False)
+ # Draw movement from start to next
+ cv.line(visu_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1)
+ cv.line(gaze_status_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1)
- if start_gaze_position.valid and next_gaze_position.valid:
+ gaze_position_count += 1
- # Draw movement from start to next
- cv.line(visu_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1)
- cv.line(gaze_status_matrix, start_gaze_position, next_gaze_position, (0, 55, 55), 1)
+ if start_gaze_position.valid:
- gaze_position_count += 1
+ # Write last start gaze position
+ cv.putText(visu_matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
- if start_gaze_position.valid:
-
- # Write last start gaze position
- cv.putText(visu_matrix, str(start_gaze_position.value), start_gaze_position.value, cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv.LINE_AA)
-
- # Write last start gaze position timing
- cv.rectangle(visu_matrix, (0, 50), (550, 100), (31, 31, 31), -1)
- cv.putText(visu_matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Empty gaze position
- except IndexError:
- pass
-
- # Write segment timing
- cv.rectangle(visu_matrix, (0, 0), (550, 50), (63, 63, 63), -1)
- cv.putText(visu_matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
-
- # Write movement identification parameters
- cv.rectangle(visu_matrix, (0, 150), (550, 310), (63, 63, 63), -1)
- cv.putText(visu_matrix, f'Deviation max: {selected_deviation_max_threshold} px', (20, 210), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- cv.putText(visu_matrix, f'Duration min: {args.duration_min_threshold} ms', (20, 270), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ # Write last start gaze position timing
+ cv.rectangle(visu_matrix, (0, 50), (550, 100), (31, 31, 31), -1)
+ cv.putText(visu_matrix, f'Gaze time: {ts_start*1e-3:.3f} ms', (20, 85), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+
+ # Empty gaze position
+ except IndexError:
+ pass
+
+ # Write segment timing
+ cv.rectangle(visu_matrix, (0, 0), (550, 50), (63, 63, 63), -1)
+ cv.putText(visu_matrix, f'Video time: {video_ts*1e-3:.3f} ms', (20, 40), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+
+ # Write movement identification parameters
+ cv.rectangle(visu_matrix, (0, 150), (550, 310), (63, 63, 63), -1)
+ cv.putText(visu_matrix, f'Deviation max: {selected_deviation_max_threshold} px', (20, 210), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
+ cv.putText(visu_matrix, f'Duration min: {args.duration_min_threshold} ms', (20, 270), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv.LINE_AA)
- # Draw dispersion threshold circle
- cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), 2, (0, 255, 255), -1)
- cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), selected_deviation_max_threshold, (255, 150, 150), 1)
+ # Draw dispersion threshold circle
+ cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), 2, (0, 255, 255), -1)
+ cv.circle(visu_matrix, (selected_deviation_max_threshold + 400, 230), selected_deviation_max_threshold, (255, 150, 150), 1)
- if args.window:
+ if args.window:
- # Close window using 'Esc' key
- if cv.waitKey(1) == 27:
- break
+ # Close window using 'Esc' key
+ if cv.waitKey(1) == 27:
+ break
- # Display video
- cv.imshow(f'Segment {tobii_segment.id} movements', visu_matrix)
+ # Display video
+ cv.imshow(f'Segment {tobii_segment.id} movements', visu_matrix)
- # Write video
- output_video.write(visu_matrix)
+ # Write video
+ output_video.write(visu_matrix)
- # Update Progress Bar
- progress = video_ts - int(args.time_range[0] * 1e6)
- MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze status video processing:', suffix = 'Complete', length = 100)
+ # Update Progress Bar
+ progress = video_ts - int(args.time_range[0] * 1e6)
+ MiscFeatures.printProgressBar(progress, tobii_segment_video.duration, prefix = 'Gaze status video processing:', suffix = 'Complete', length = 100)
- # Exit on 'ctrl+C' interruption
- except KeyboardInterrupt:
- pass
+ # Exit on 'ctrl+C' interruption
+ except KeyboardInterrupt:
+ pass
- # Saving gaze status image
- cv.imwrite(gaze_status_image_filepath, gaze_status_matrix)
+ # Save gaze status image
+ cv.imwrite(gaze_status_image_filepath, gaze_status_matrix)
- # End output video file
- output_video.close()
- print(f'\nGaze status video saved into {gaze_status_video_filepath}\n')
+ # End output video file
+ output_video.close()
+ print(f'\nGaze status video saved into {gaze_status_video_filepath}\n')
if __name__ == '__main__':
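
In summary, movement identification and the CSV export still always run, while the whole rendering pass (status video, status image, window display) is now gated behind the opt-in flag. Schematically (the helper names below are editorial stand-ins, not functions from this module):

    def identify_and_export():
        print('gaze movement identification + gaze_status.csv (always runs)')

    def render_visualisation():
        print('gaze status video/image rendering (opt-in)')

    def main(visu=False):
        identify_and_export()
        if visu:                  # gated by the new --visu flag
            render_visualisation()

    main(visu=True)  # e.g. as enabled by passing --visu on the command line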