aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/argaze/utils/contexts/TobiiProGlasses2.py115
1 files changed, 44 insertions, 71 deletions
diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py
index b7db70b..c71a622 100644
--- a/src/argaze/utils/contexts/TobiiProGlasses2.py
+++ b/src/argaze/utils/contexts/TobiiProGlasses2.py
@@ -354,6 +354,9 @@ class LiveStream(ArFeatures.LiveProcessingContext):
self.__configuration = {}
+ self.__battery_level = math.nan
+ self.__battery_remaining_time = math.nan
+
self.__parser = TobiiJsonDataParser()
@property
@@ -577,6 +580,12 @@ class LiveStream(ArFeatures.LiveProcessingContext):
logging.debug('> starting keep alive thread...')
self.__keep_alive_thread.start()
+ # Check battery status
+ self.__check_battery_thread = threading.Thread(target=self.__check_battery)
+
+ logging.debug('> starting battery status thread...')
+ self.__check_battery_thread.start()
+
return self
@DataFeatures.PipelineStepExit
@@ -786,6 +795,20 @@ class LiveStream(ArFeatures.LiveProcessingContext):
time.sleep(1)
+ def __check_battery(self):
+ """Check battery status every 10 seconds."""
+
+ logging.debug('%s.__check_battery', DataFeatures.get_class_path(self))
+
+ while not self._stop_event.is_set():
+
+ battery_status = self.get_system_status()['sys_battery']
+
+ self.__battery_level = battery_status['level']
+ self.__battery_remaining_time = battery_status['remaining_time']
+
+ time.sleep(10)
+
def __get_request(self, api_action) -> any:
"""Send a GET request and get data back."""
@@ -971,7 +994,7 @@ class LiveStream(ArFeatures.LiveProcessingContext):
def is_recording(self) -> bool:
"""Is it recording?"""
- rec_status = self.get_status()['sys_recording']
+ rec_status = self.get_system_status()['sys_recording']
if rec_status != {}:
if rec_status['rec_state'] == "recording":
@@ -998,82 +1021,15 @@ class LiveStream(ArFeatures.LiveProcessingContext):
# MISC
- def eject_sd(self):
- self.__get_request('/api/eject')
-
- def get_battery_info(self):
- return ("Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (
- float(self.get_battery_level()), float(self.get_battery_remaining_time())))
-
- def get_battery_level(self):
- return self.get_battery_status()['level']
-
- def get_battery_remaining_time(self):
- return self.get_battery_status()['remaining_time']
-
- def get_battery_status(self):
- return self.get_status()['sys_battery']
-
- def get_et_freq(self):
- return self.get_configuration()['sys_et_freq']
-
- def get_et_frequencies(self):
- return self.get_status()['sys_et']['frequencies']
-
def identify(self):
self.__get_request('/api/identify')
- def get_configuration(self):
- return self.__get_request('/api/system/conf')
+ def eject_sd(self):
+ self.__get_request('/api/eject')
- def get_status(self):
+ def get_system_status(self):
return self.__get_request('/api/system/status')
- def get_storage_info(self):
- return ("Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()))
-
- def get_storage_remaining_time(self):
- return self.get_storage_status()['remaining_time']
-
- def get_storage_status(self):
- return self.get_status()['sys_storage']
-
- def get_scene_camera_freq(self):
- return self.get_configuration()['sys_sc_fps']
-
- def set_et_freq_50(self):
- data = {'sys_et_freq': 50}
- json_data = self.__post_request('/api/system/conf', data)
-
- def set_et_freq_100(self):
- # May not be available. Check get_et_frequencies() first.
- data = {'sys_et_freq': 100}
- json_data = self.__post_request('/api/system/conf', data)
-
- def set_eye_camera_indoor_preset(self) -> str:
- data = {'sys_ec_preset': 'Indoor'}
- return self.__post_request('/api/system/conf', data)
-
- def set_eye_camera_outdoor_preset(self) -> str:
- data = {'sys_ec_preset': 'ClearWeather'}
- return self.__post_request('/api/system/conf', data)
-
- def set_scene_camera_auto_preset(self):
- data = {'sys_sc_preset': 'Auto'}
- json_data = self.__post_request('/api/system/conf', data)
-
- def set_scene_camera_gaze_preset(self):
- data = {'sys_sc_preset': 'GazeBasedExposure'}
- json_data = self.__post_request('/api/system/conf', data)
-
- def set_scene_camera_freq_25(self):
- data = {'sys_sc_fps': 25}
- json_data = self.__post_request('/api/system/conf/', data)
-
- def set_scene_camera_freq_50(self):
- data = {'sys_sc_fps': 50}
- json_data = self.__post_request('/api/system/conf/', data)
-
@DataFeatures.PipelineStepImage
def image(self, **kwargs):
"""
@@ -1107,6 +1063,23 @@ class LiveStream(ArFeatures.LiveProcessingContext):
cv2.rectangle(image, calibration_panel[0], calibration_panel[1], (0, 127, 0), -1)
cv2.putText(image, f'Calibration succeeded', (calibration_panel[0][0]+20, calibration_panel[0][1]+40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ # Display battery level
+ if self.__battery_level > 50:
+
+ text_color = (0, 255, 0)
+
+ elif self.__battery_level > 10:
+
+ text_color = (0, 255, 255)
+
+ else:
+
+ self.__battery_level = (0, 0, 255)
+
+ if self.__battery_level > 0:
+
+ cv2.putText(image, f'Battery {self.__battery_level}%', (width - 220, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, text_color, 1, cv2.LINE_AA)
+
return image
class PostProcessing(ArFeatures.PostProcessingContext):