1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
Metrics and video recording
===========================
Observers are attached to pipeline steps to be notified when a method is called.
## observers.py
For this use case we need to record gaze analysis metrics on *ArUcoCamera.on_look* call and to record sector screen image on *ArUcoCamera.on_copy_background_into_scenes_frames* signal.
```python
import logging
from argaze.utils import UtilsFeatures
import cv2
class ScanPathAnalysisRecorder(UtilsFeatures.FileWriter):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.header = "Timestamp (ms)", "Path duration (ms)", "Steps number", "Fixation durations average (ms)", "Explore/Exploit ratio", "K coefficient"
def on_look(self, timestamp, frame, exception):
"""Log scan path metrics."""
if frame.is_analysis_available():
analysis = frame.analysis()
data = (
int(timestamp),
analysis['argaze.GazeAnalysis.Basic.ScanPathAnalyzer'].path_duration,
analysis['argaze.GazeAnalysis.Basic.ScanPathAnalyzer'].steps_number,
analysis['argaze.GazeAnalysis.Basic.ScanPathAnalyzer'].step_fixation_durations_average,
analysis['argaze.GazeAnalysis.ExploreExploitRatio.ScanPathAnalyzer'].explore_exploit_ratio,
analysis['argaze.GazeAnalysis.KCoefficient.ScanPathAnalyzer'].K
)
self.write(data)
class AOIScanPathAnalysisRecorder(UtilsFeatures.FileWriter):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.header = "Timestamp (ms)", "Path duration (ms)", "Steps number", "Fixation durations average (ms)", "Transition matrix probabilities", "Transition matrix density", "N-Grams count", "Stationary entropy", "Transition entropy"
def on_look(self, timestamp, layer, exception):
"""Log aoi scan path metrics"""
if layer.is_analysis_available():
analysis = layer.analysis()
data = (
int(timestamp),
analysis['argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer'].path_duration,
analysis['argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer'].steps_number,
analysis['argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer'].step_fixation_durations_average,
analysis['argaze.GazeAnalysis.TransitionMatrix.AOIScanPathAnalyzer'].transition_matrix_probabilities,
analysis['argaze.GazeAnalysis.TransitionMatrix.AOIScanPathAnalyzer'].transition_matrix_density,
analysis['argaze.GazeAnalysis.NGram.AOIScanPathAnalyzer'].ngrams_count,
analysis['argaze.GazeAnalysis.Entropy.AOIScanPathAnalyzer'].stationary_entropy,
analysis['argaze.GazeAnalysis.Entropy.AOIScanPathAnalyzer'].transition_entropy
)
self.write(data)
class VideoRecorder(UtilsFeatures.VideoWriter):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def on_copy_background_into_scenes_frames(self, timestamp, frame, exception):
"""Write frame image."""
logging.debug('VideoRecorder.on_map')
image = frame.image()
# Write video timing
cv2.rectangle(image, (0, 0), (550, 50), (63, 63, 63), -1)
cv2.putText(image, f'Time: {int(timestamp)} ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
self.write(image)
```
|