#!/usr/bin/env python

"""Read-only access to the directory tree of a Tobii Glasses Pro 2 drive.

The classes below mirror the on-disk layout that the constants describe:

    <drive>/projects/<project id>/project.json
    <drive>/projects/<project id>/participants/<participant id>/participant.json
    <drive>/projects/<project id>/recordings/<recording id>/recording.json
    <drive>/projects/<project id>/recordings/<recording id>/segments/<segment id>/segment.json

Each class loads its JSON description file when it is constructed and exposes
the parsed fields through read-only properties.
"""

from typing import Optional, TypeVar
import datetime
import json
import os

from argaze import DataStructures
from argaze.TobiiGlassesPro2 import TobiiData, TobiiVideo

import av
import cv2 as cv

TOBII_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S+%f'

TOBII_PROJECTS_DIRNAME = "projects"
TOBII_PROJECT_FILENAME = "project.json"

TOBII_PARTICIPANTS_DIRNAME = "participants"
TOBII_PARTICIPANT_FILENAME = "participant.json"

TOBII_RECORDINGS_DIRNAME = "recordings"
TOBII_RECORD_FILENAME = "recording.json"

TOBII_SEGMENTS_DIRNAME = "segments"
TOBII_SEGMENT_INFO_FILENAME = "segment.json"
TOBII_SEGMENT_VIDEO_FILENAME = "fullstream.mp4"
TOBII_SEGMENT_DATA_FILENAME = "livedata.json.gz"

DatetimeType = TypeVar('datetime', bound="datetime")
# Type definition for type annotation convenience


def _load_json(directory: str, filename: str) -> dict:
    """Parse the JSON file `filename` located in `directory`.

    Raises:
        RuntimeError: if the file content cannot be decoded as JSON.
        OSError: if the file cannot be opened (propagated from `open`).
    """

    with open(os.path.join(directory, filename)) as file:

        try:
            return json.load(file)

        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        except ValueError as error:
            raise RuntimeError(f'JSON fails to load {directory}/{filename}') from error


def _entity_id(entity_path: str) -> str:
    """Return the last component of `entity_path`, ignoring any trailing separator."""

    # Without normalization, basename('a/b/') would be an empty string
    return os.path.basename(os.path.normpath(entity_path))


def _list_subdirectories(parent_path: str) -> list[str]:
    """Return the paths of the directories directly inside `parent_path`.

    Names are sorted so the result does not depend on the arbitrary order
    in which os.listdir enumerates entries.
    """

    candidates = (os.path.join(parent_path, name) for name in sorted(os.listdir(parent_path)))

    return [path for path in candidates if os.path.isdir(path)]


class TobiiSegment:
    """Handle Tobii Glasses Pro 2 segment info."""

    def __init__(self, segment_path, start_timestamp: int = 0, end_timestamp: Optional[int] = None):
        """Load segment info from segment directory.
        Optionally select a time range in microsecond.

        Args:
            segment_path: directory containing the segment info file.
            start_timestamp: beginning of the selected range.
            end_timestamp: end of the selected range; None selects up to the
                segment length. Values beyond the segment length are clamped.

        Raises:
            RuntimeError: if the segment info file is not valid JSON.
            ValueError: if the selected range is empty or reversed.
        """

        self.__id = _entity_id(segment_path)
        self.__path = segment_path

        item = _load_json(self.__path, TOBII_SEGMENT_INFO_FILENAME)

        # seg_length is scaled by 1e6 to be comparable with the timestamps
        # (presumably seconds to microseconds - confirm against Tobii documentation)
        segment_length = int(item["seg_length"] * 1e6)

        self.__start_timestamp = start_timestamp
        self.__end_timestamp = segment_length if end_timestamp is None else min(end_timestamp, segment_length)

        if self.__start_timestamp >= self.__end_timestamp:
            raise ValueError('start time is equal or greater than end time.')

        self.__calibrated = bool(item["seg_calibrated"])

        self.__start_date = datetime.datetime.strptime(item["seg_t_start"], TOBII_DATETIME_FORMAT)
        self.__stop_date = datetime.datetime.strptime(item["seg_t_stop"], TOBII_DATETIME_FORMAT)

    @property
    def path(self) -> str:
        """Get segment path."""
        return self.__path

    @property
    def id(self) -> str:
        """Get segment id."""
        return self.__id

    @property
    def start_timestamp(self) -> int:
        """Get the timestamp where the segment loading starts."""
        return self.__start_timestamp

    @property
    def end_timestamp(self) -> int:
        """Get the timestamp where the segment loading ends."""
        return self.__end_timestamp

    @property
    def start_date(self) -> DatetimeType:
        """Get the date when the segment has started."""
        return self.__start_date

    @property
    def stop_date(self) -> DatetimeType:
        """Get the date when the segment has stopped."""
        return self.__stop_date

    @property
    def calibrated(self) -> bool:
        """Has the segment been calibrated?"""
        return self.__calibrated

    def load_data(self) -> "TobiiData.TobiiDataSegment":
        """Load recorded data stream."""

        data_path = os.path.join(self.__path, TOBII_SEGMENT_DATA_FILENAME)

        return TobiiData.TobiiDataSegment(data_path, self.__start_timestamp, self.__end_timestamp)

    def load_video(self) -> "TobiiVideo.TobiiVideoSegment":
        """Load recorded video stream."""

        video_path = os.path.join(self.__path, TOBII_SEGMENT_VIDEO_FILENAME)

        return TobiiVideo.TobiiVideoSegment(video_path, self.__start_timestamp, self.__end_timestamp)


class TobiiRecording:
    """Handle Tobii Glasses Pro 2 recording info and segments."""

    def __init__(self, recording_path):
        """Load recording info from recording directory.

        Raises:
            RuntimeError: if the recording info file is not valid JSON.
        """

        self.__id = _entity_id(recording_path)
        self.__path = recording_path

        item = _load_json(self.__path, TOBII_RECORD_FILENAME)

        self.__name = item["rec_info"]["Name"]
        self.__creation_date = datetime.datetime.strptime(item["rec_created"], TOBII_DATETIME_FORMAT)

        self.__length = int(item["rec_length"])
        self.__et_samples = int(item["rec_et_samples"])
        self.__et_samples_valid = int(item["rec_et_valid_samples"])
        self.__participant_id = item["rec_participant"]

    def __project_path(self) -> str:
        """Return the project directory, two levels above the recording directory."""

        return os.path.dirname(os.path.dirname(os.path.abspath(self.__path)))

    @property
    def path(self) -> str:
        """Get recording path."""
        return self.__path

    @property
    def id(self) -> str:
        """Get recording id."""
        return self.__id

    @property
    def name(self) -> str:
        """Get recording name."""
        return self.__name

    @property
    def creation_date(self) -> DatetimeType:
        """Get date when the recording has been done."""
        return self.__creation_date

    @property
    def length(self) -> int:
        """Get record duration in second."""
        return self.__length

    @property
    def eyetracker_samples(self) -> int:
        """Get numbers of recorded eye tracker samples."""
        return self.__et_samples

    @property
    def eyetracker_samples_valid(self) -> int:
        """Get numbers of recorded eye tracker valid samples."""
        return self.__et_samples_valid

    @property
    def project(self) -> "TobiiProject":
        """Get project to which it belongs."""

        return TobiiProject(self.__project_path())

    @property
    def participant(self) -> "TobiiParticipant":
        """Get participant to which it belongs."""

        participant_path = os.path.join(self.__project_path(), TOBII_PARTICIPANTS_DIRNAME, self.__participant_id)

        return TobiiParticipant(participant_path)

    def segments(self) -> list["TobiiSegment"]:
        """Get all recorded segments, ordered by segment directory name."""

        segments_path = os.path.join(self.__path, TOBII_SEGMENTS_DIRNAME)

        return [TobiiSegment(segment_path) for segment_path in _list_subdirectories(segments_path)]


class TobiiParticipant:
    """Handle Tobii Glasses Pro 2 participant data."""

    def __init__(self, participant_path):
        """Load participant data from path.

        Raises:
            RuntimeError: if the participant info file is not valid JSON.
        """

        self.__id = _entity_id(participant_path)
        self.__path = participant_path

        item = _load_json(self.__path, TOBII_PARTICIPANT_FILENAME)

        self.__name = item["pa_info"]["Name"]

    @property
    def path(self) -> str:
        """Get participant path."""
        return self.__path

    @property
    def id(self) -> str:
        """Get participant id."""
        return self.__id

    @property
    def name(self) -> str:
        """Get participant name."""
        return self.__name


class TobiiProject:
    """Handle Tobii Glasses Pro 2 project data."""

    def __init__(self, project_path):
        """Load project data from project directory.

        Raises:
            RuntimeError: if the project info file is not valid JSON.
        """

        self.__id = _entity_id(project_path)
        self.__path = project_path

        item = _load_json(self.__path, TOBII_PROJECT_FILENAME)

        self.__creation_date = datetime.datetime.strptime(item["pr_created"], TOBII_DATETIME_FORMAT)

        # The project name is optional: fall back to None when it is absent
        try:
            self.__name = item["pr_info"]["Name"]

        except (KeyError, TypeError):
            self.__name = None

    @property
    def path(self) -> str:
        """Get project path."""
        return self.__path

    @property
    def id(self) -> str:
        """Get project id."""
        return self.__id

    @property
    def name(self) -> Optional[str]:
        """Get project name (None when the project file holds no name)."""
        return self.__name

    @property
    def creation_date(self) -> DatetimeType:
        """Get date when the project has been created."""
        return self.__creation_date

    def participants(self) -> list["TobiiParticipant"]:
        """Get all participants, ordered by participant directory name."""

        participants_path = os.path.join(self.__path, TOBII_PARTICIPANTS_DIRNAME)

        return [TobiiParticipant(participant_path) for participant_path in _list_subdirectories(participants_path)]

    def recordings(self) -> list["TobiiRecording"]:
        """Get all recordings, ordered by recording directory name."""

        recordings_path = os.path.join(self.__path, TOBII_RECORDINGS_DIRNAME)

        return [TobiiRecording(recording_path) for recording_path in _list_subdirectories(recordings_path)]


class TobiiDrive:
    """Handle Tobii Glasses Pro 2 drive data."""

    def __init__(self, drive_path):
        """Load drive data from drive directory path."""

        self.__path = drive_path

    @property
    def path(self) -> str:
        """Get drive path."""
        return self.__path

    def projects(self) -> list["TobiiProject"]:
        """Get all projects, ordered by project directory name."""

        projects_path = os.path.join(self.__path, TOBII_PROJECTS_DIRNAME)

        return [TobiiProject(project_path) for project_path in _list_subdirectories(projects_path)]