aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/GazeAnalysis/DispersionBasedGazeMovementIdentifier.py
blob: 4fd5aabcb9695f658a6d8b21090b578d3b5bca92 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python

from dataclasses import dataclass, field
import math

from argaze import GazeFeatures

import numpy

@dataclass(frozen=True)
class Fixation(GazeFeatures.Fixation):
    """Define dispersion based fixation."""

    dispersion: float = field(init=False)
    """Dispersion of all gaze positions belonging to the fixation."""

    euclidian: bool = field(default=True)
    """Does the distance is calculated in euclidian way."""

    centroid: tuple = field(init=False)
    """Centroïd of all gaze positions belonging to the fixation."""

    def __post_init__(self):

        super().__post_init__()

        x_list = [gp[0] for (ts, gp) in list(self.positions.items())]
        y_list = [gp[1] for (ts, gp) in list(self.positions.items())]

        cx = numpy.mean(x_list)
        cy = numpy.mean(y_list)

        # Select dispersion algorithm
        if self.euclidian:

            c = [cx, cy]
            points = numpy.column_stack([x_list, y_list])

            dist = (points - c)**2
            dist = numpy.sum(dist, axis=1)
            dist = numpy.sqrt(dist)

            __dispersion = max(dist)

        else:

            __dispersion = (max(x_list) - min(x_list)) + (max(y_list) - min(y_list))

        # Update frozen dispersion attribute
        object.__setattr__(self, 'dispersion', __dispersion)

        # Update frozen centroid attribute
        object.__setattr__(self, 'centroid', (cx, cy))

@dataclass(frozen=True)
class Saccade(GazeFeatures.Saccade):
    """Define dispersion based saccade."""

    def __post_init__(self):
        super().__post_init__()

@dataclass
class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier):
    """Implementation of the I-DT algorithm as described in:
        
            Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
            saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
            on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
            71-78. [DOI=http://dx.doi.org/10.1145/355017.355028](DOI=http://dx.doi.org/10.1145/355017.355028)
    """

    dispersion_threshold: int|float
    """Maximal distance allowed to consider several gaze positions as a fixation."""

    duration_threshold: int|float
    """Minimal duration allowed to consider several gaze positions as a fixation."""

    def __iter__(self) -> GazeFeatures.GazeMovementType:
        """GazeMovement identification generator."""

        self.__last_fixation = None

        # while there are 2 gaze positions at least
        while len(self.__ts_gaze_positions) >= 2:

            # copy remaining timestamped gaze positions
            remaining_ts_gaze_positions = self.__ts_gaze_positions.copy()

            # select timestamped gaze position until a duration threshold
            ts_start, gaze_position_start = remaining_ts_gaze_positions.pop_first()

            # Ignore non valid start position
            if not gaze_position_start.valid:
                self.__ts_gaze_positions.pop_first()
                continue

            ts_gaze_positions = GazeFeatures.TimeStampedGazePositions()
            ts_gaze_positions[ts_start] = gaze_position_start

            # Select next position
            ts_next, gaze_position_next = remaining_ts_gaze_positions.first

            while (ts_next - ts_start) < self.duration_threshold:

                # Ignore non valid position
                # Should we consider invalid position to not break fixation ?
                if gaze_position_next.valid:

                    # Store selected position
                    ts, gaze_position = remaining_ts_gaze_positions.pop_first()
                    ts_gaze_positions[ts] = gaze_position

                else:

                    remaining_ts_gaze_positions.pop_first()

                try:
                    # Read next position
                    ts_next, gaze_position_next = remaining_ts_gaze_positions.first

                except:
                    break

            # is it a new fixation ?
            new_fixation = Fixation(ts_gaze_positions)

            # dispersion is small : extending fixation
            if new_fixation.dispersion <= self.dispersion_threshold:

                # remove selected gaze positions
                for gp in ts_gaze_positions:
                    self.__ts_gaze_positions.pop_first()

                # extend fixation position from a copy
                ts_gaze_positions_extension = ts_gaze_positions.copy()

                # are next gaze positions not too dispersed ?
                while len(remaining_ts_gaze_positions) > 0:

                    # Select next gaze position
                    ts_next, gaze_position_next = remaining_ts_gaze_positions.first

                    # Ignore non valid position
                    # Should we consider invalid position to not break fixation ?
                    if not gaze_position_next.valid:
                        remaining_ts_gaze_positions.pop_first()
                        continue

                    ts_gaze_positions_extension[ts_next] = gaze_position_next

                    # how much gaze is dispersed ?
                    extended_fixation = Fixation(ts_gaze_positions_extension)

                    # dispersion becomes too wide : ignore extended fixation
                    if extended_fixation.dispersion > self.dispersion_threshold:
                        break

                    # update new fixation
                    new_fixation = Fixation(ts_gaze_positions_extension.copy())

                    # remove selected gaze position
                    remaining_ts_gaze_positions.pop_first()
                    self.__ts_gaze_positions.pop_first()

                # is the new fixation have a duration ?
                if new_fixation.duration > 0:

                    if self.__last_fixation != None:

                        # store start and end positions in a timestamped buffer
                        ts_saccade_positions = GazeFeatures.TimeStampedGazePositions()

                        start_position_ts, start_position = self.__last_fixation.positions.last
                        ts_saccade_positions[start_position_ts] = start_position

                        end_position_ts, end_position = new_fixation.positions.first
                        ts_saccade_positions[end_position_ts] = end_position

                        if end_position_ts > start_position_ts:

                            new_saccade = Saccade(ts_saccade_positions)
                        
                            yield new_saccade

                    self.__last_fixation = new_fixation

                    yield new_fixation

            # dispersion too wide : consider next gaze position
            else:
                self.__ts_gaze_positions.pop_first()