aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
blob: 5cbf6c4f42d7e963c81f2a4465fbffc092d188d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
#!/usr/bin/env python

from dataclasses import dataclass, field
import math

from argaze import GazeFeatures

import numpy

@dataclass(frozen=True)
class Fixation(GazeFeatures.Fixation):
    """Define dispersion based fixation."""

    centroid: tuple = field(init=False)
    """Centroïd of all gaze positions belonging to the fixation."""

    deviation_max: float = field(init=False)
    """Maximal gaze position distance to the centroïd."""

    def __post_init__(self):

        super().__post_init__()

        self.update()

    def point_deviation(self, gaze_position) -> float:
        """Get distance of a point from the fixation's centroïd."""

        return numpy.sqrt((self.centroid[0] - gaze_position.value[0])**2 + (self.centroid[1] - gaze_position.value[1])**2)

    def update(self):
        """Update fixation's centroïd then maximal gaze positions deviation from this centroïd."""

        points = self.positions.values()
        points_x, points_y = [p[0] for p in points], [p[1] for p in points]
        points_array = numpy.column_stack([points_x, points_y])
        centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)])
        deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))

        # Update frozen centroid attribute
        object.__setattr__(self, 'centroid', (centroid_array[0], centroid_array[1]))

        # Update frozen deviation_max attribute
        object.__setattr__(self, 'deviation_max', max(deviations_array))

    def overlap(self, fixation) -> list:
        """Does a gaze position from another fixation have a deviation to this fixation centroïd smaller than maximal deviation?"""

        points = fixation.positions.values()
        points_x, points_y = [p[0] for p in points], [p[1] for p in points]
        points_array = numpy.column_stack([points_x, points_y])
        centroid_array = numpy.array([self.centroid[0], self.centroid[1]])
        deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1))

        return min(deviations_array) <= self.deviation_max

    def merge(self, fixation) -> float:
        """Merge another fixation into this fixation."""

        self.positions.append(fixation.positions)
        self.__post_init__()

@dataclass(frozen=True)
class Saccade(GazeFeatures.Saccade):
    """Define dispersion based saccade."""

    def __post_init__(self):
        super().__post_init__()

@dataclass
class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
    """Implementation of the I-DT algorithm as described in:
        
            Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
            saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
            on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
            71-78. [DOI=http://dx.doi.org/10.1145/355017.355028](DOI=http://dx.doi.org/10.1145/355017.355028)
    """

    deviation_max_threshold: int|float
    """Maximal distance allowed to consider a gaze movement as a fixation."""

    duration_min_threshold: int|float
    """Minimal duration allowed to consider a gaze movement as a fixation.
    It is also used as maximal duration allowed to consider a gaze movement as a saccade."""

    def __iter__(self) -> GazeFeatures.GazeMovementType:
        """GazeMovement identification generator."""

        self.__last_fixation = None
        unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions()

        # While there are 2 gaze positions at least
        while len(self.__ts_gaze_positions) >= 2:

            # Remove all unvalid gaze positions until to find a valid one
            ts_current, gaze_position_current = self.__ts_gaze_positions.pop_first()

            while not gaze_position_current.valid and len(self.__ts_gaze_positions) > 0:
                ts_current, gaze_position_current = self.__ts_gaze_positions.pop_first()

            # Prepare to select current and next valid gaze positions until a duration threshold
            valid_gaze_positions = GazeFeatures.TimeStampedGazePositions()

            # Output last fixation after too much unvalid positons
            if self.__last_fixation != None:

                ts_last, gaze_position_last = self.__last_fixation.positions.last

                if (ts_current - ts_last) > self.duration_min_threshold:

                    # Get last gaze position of the last fixation as it is it's out position
                    last_new_ts, last_new_position = self.__last_fixation.positions.pop_last()

                    # Update last fixation
                    self.__last_fixation.update()

                    # Append this last out position to the valid gaze positions selection
                    valid_gaze_positions[last_new_ts] = last_new_position

                    yield self.__last_fixation
                    self.__last_fixation = None

            # Append current gaze position to valid gaze positions selection
            valid_gaze_positions[ts_current] = gaze_position_current

            # Store unvalid gaze positions to count them
            unvalid_gaze_positions = GazeFeatures.TimeStampedGazePositions()

            # Keep track of last valid timestamp
            ts_last_valid = ts_current

            for ts_next, gaze_position_next in self.__ts_gaze_positions.items():

                if (ts_next - ts_current) < self.duration_min_threshold:

                    # Store valid position
                    if gaze_position_next.valid:

                        valid_gaze_positions[ts_next] = gaze_position_next

                        # Keep track of last valid timestamp
                        ts_last_valid = ts_next

                    # Store non valid position
                    else:

                        unvalid_gaze_positions[ts_next] = gaze_position_next

                else:

                    break

            # If there is at least 3 valid gaze positions selected:
            # - 1 entering position
            # - 1 staying position
            # - 1 outing position (which may become a staying position after extension)
            if len(valid_gaze_positions) >= 3:

                # Consider selected valid gaze positions as part of a maybe new fixation
                new_fixation = Fixation(valid_gaze_positions)

                # Dispersion small enough: it is a fixation ! Try to extend it
                if new_fixation.deviation_max <= self.deviation_max_threshold:

                    # Remove valid and unvalid gaze positions as there as now stored in new fixation
                    # -1 as current gaze position have already been poped
                    for _ in range(len(valid_gaze_positions) + len(unvalid_gaze_positions) - 1):
                        self.__ts_gaze_positions.pop_first()

                    # Copy new fixation positions before to try to extend them
                    extended_gaze_positions = new_fixation.positions.copy()
                    extended_fixation = new_fixation

                    # Are next gaze positions not too dispersed ?
                    while len(self.__ts_gaze_positions) > 0:

                        # Select and remove next gaze position
                        ts_next, gaze_position_next = self.__ts_gaze_positions.pop_first()

                        # Consider only valid next position
                        if gaze_position_next.valid:

                            # Get deviation of the nex gaze position from the new extended fixation
                            deviation_from_extended = extended_fixation.point_deviation(gaze_position_next)

                            # Extend fixation anyway even with last out position: it will be popped later.
                            extended_gaze_positions[ts_next] = gaze_position_next
                            extended_fixation = Fixation(extended_gaze_positions)

                            # Stop fixation extension
                            if deviation_from_extended > self.deviation_max_threshold:

                                break

                        # Check that consecutive unvalid gaze positions do not exceed fixation duration threshold
                        elif ts_next - ts_last_valid >= self.duration_min_threshold:

                            break

                    # Update new fixation
                    new_fixation = extended_fixation

                    # Does a former fixation have been identified ?
                    if self.__last_fixation != None:

                        # Remove last gaze position of the new fixation as it is it's out position 
                        last_new_ts, last_new_position = new_fixation.positions.pop_last()

                        # Edit inter fixations movement gaze positions
                        movement_gaze_positions = GazeFeatures.TimeStampedGazePositions()

                        # If such unmatched positions exist
                        if len(unmatched_gaze_positions) > 0:

                            # Inter fixations movement should:
                            # - starts at last position of last fixation (this position is out so it have to be popped)
                            # - stops at the first position inside new fixation
                            start_movement_ts, start_position = self.__last_fixation.positions.pop_last()
                            stop_movement_ts, stop_position = new_fixation.positions.pop_first()

                            # Edit first movement gaze position
                            movement_gaze_positions[start_movement_ts] = start_position

                            # Edit movement positions with unmatched positions if there are between the 2 fixations
                            start_unmatched_ts, _ = unmatched_gaze_positions.first
                            end_unmatched_ts, _ = unmatched_gaze_positions.last

                            # Does unmatched gaze positions happened between the last and the new fixation ?
                            if start_unmatched_ts > start_movement_ts and end_unmatched_ts < stop_movement_ts: 

                                # Append unmatched gaze positions to saccade
                                movement_gaze_positions.append(unmatched_gaze_positions)

                            # Unmatched gaze positions happened before last fixation
                            else:

                                # Ignore them: GazeMovementIdentifier have to output movements according their time apparition
                                pass

                            # Edit last movement gaze position
                            movement_gaze_positions[stop_movement_ts] = stop_position

                        # When there is no unmatched positions between 2 fixations (*rare case)
                        else:

                            # the last fixation position is the same than the first position of the new fixation
                            stop_movement_ts, stop_position = self.__last_fixation.positions.pop_last()
                            start_movement_ts, start_position = self.__last_fixation.positions.pop_last()

                            # Edit first movement gaze position
                            movement_gaze_positions[start_movement_ts] = start_position

                            # Edit last movement gaze position
                            movement_gaze_positions[stop_movement_ts] = stop_position

                        # Update last and new fixations
                        self.__last_fixation.update()
                        new_fixation.update()

                        # Does new fixation overlap last fixation?
                        if self.__last_fixation.overlap(new_fixation):

                            # (*rare case)
                            if len(unmatched_gaze_positions) == 0:
                                self.__last_fixation.positions[start_movement_ts] = start_position
                                self.__last_fixation.positions[stop_movement_ts] = stop_position

                            # Merge new fixation into last fixation
                            self.__last_fixation.merge(new_fixation)

                            # QUESTION: What to do if the time between the two fixations it very long ?
                            # It would be dangerous to set a timeout value as a fixation duration has no limit.

                            # Forget new fixation
                            new_fixation = None

                            # NOTE: Ignore inter fixations gaze positions: there was probably noisy positions.

                        # Otherwise,
                        else:

                            # Output last fixation
                            yield self.__last_fixation

                            # New fixation becomes the last fixation to allow further merging
                            self.__last_fixation = new_fixation

                            # Short time between fixations : this movement is a saccade
                            if stop_movement_ts - start_movement_ts <= self.duration_min_threshold: 

                                # Output saccade
                                yield Saccade(movement_gaze_positions)

                            # Too much time between fixations: this movement is unknown
                            else:

                                # Output unknown movement
                                yield GazeFeatures.GazeMovement(movement_gaze_positions)

                        # Append out position to last fixation: it will be popped as start_position the next time
                        self.__last_fixation.positions[last_new_ts] = last_new_position

                        # In any case, forget former unmatched gaze positions
                        unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions()

                    # First fixation is stored to allow further merging
                    # The movement before is outputed
                    else:

                        self.__last_fixation = new_fixation

                        # Is there unmatched gaze positions?
                        if len(unmatched_gaze_positions) > 0:

                            start_movement_ts, _ = unmatched_gaze_positions.first
                            stop_movement_ts, _ = unmatched_gaze_positions.last

                            # Short time between fixations : this movement is a saccade
                            if stop_movement_ts - start_movement_ts <= self.duration_min_threshold: 

                                # Output saccade
                                yield Saccade(unmatched_gaze_positions)

                            # Too much time between fixations: this movement is unknown
                            else:

                                # Output unknown movement
                                yield GazeFeatures.GazeMovement(unmatched_gaze_positions)
                        
                        # In any case, forget former unmatched gaze positions
                        unmatched_gaze_positions = GazeFeatures.TimeStampedGazePositions()

                # Dispersion too wide:
                # Current gaze position is not part of a fixation
                else:

                    unmatched_gaze_positions[ts_current] = gaze_position_current

            # Only one valid gaze position selected:
            # Current gaze position is not part of a fixation
            else:

                unmatched_gaze_positions[ts_current] = gaze_position_current

        # Output last fixation
        if self.__last_fixation != None:
            yield self.__last_fixation