aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThéo de la Hogue2022-08-09 17:49:37 +0200
committerThéo de la Hogue2022-08-09 17:49:37 +0200
commit2718252c06e7e23c1500e568769dea70e409a04f (patch)
treedae1c9b60840637d7003ca03221c74ce983b4413
parent7cd90bd18f09dd60f5893b284628c8ff9adf7d23 (diff)
downloadargaze-2718252c06e7e23c1500e568769dea70e409a04f.zip
argaze-2718252c06e7e23c1500e568769dea70e409a04f.tar.gz
argaze-2718252c06e7e23c1500e568769dea70e409a04f.tar.bz2
argaze-2718252c06e7e23c1500e568769dea70e409a04f.tar.xz
rewriting movement identification classes.
-rw-r--r--src/argaze/DataStructures.py8
-rw-r--r--src/argaze/GazeFeatures.py237
-rw-r--r--src/argaze/utils/export_tobii_segment_movements.py46
3 files changed, 128 insertions, 163 deletions
diff --git a/src/argaze/DataStructures.py b/src/argaze/DataStructures.py
index 637a007..15811c2 100644
--- a/src/argaze/DataStructures.py
+++ b/src/argaze/DataStructures.py
@@ -38,6 +38,10 @@ class TimeStampedBuffer(collections.OrderedDict):
for ts, value in timestamped_buffer.items():
self[ts] = value
+ def get_first(self):
+
+ return list(self.items())[0]
+
def pop_first(self):
"""Easing FIFO access mode"""
return self.popitem(last=False)
@@ -59,6 +63,10 @@ class TimeStampedBuffer(collections.OrderedDict):
return popep_ts, poped_value
+ def get_last(self):
+
+ return list(self.items())[-1]
+
def pop_last(self):
"""Easing FIFO access mode"""
return self.popitem(last=True)
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index 8d3d93f..fc07ca2 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
-from dataclasses import dataclass
+from dataclasses import dataclass, field
import math
from argaze import DataStructures
@@ -19,63 +19,43 @@ class TimeStampedGazePositions(DataStructures.TimeStampedBuffer):
super().__setitem__(key, value)
@dataclass
-class Fixation():
- """Define gaze fixation."""
+class Movement():
+ """Define abstract movement class."""
- index: int
- duration: float
- dispersion: float
- centroid: GazePosition
positions: TimeStampedGazePositions
+ duration: float = field(init=False)
-class TimeStampedFixations(DataStructures.TimeStampedBuffer):
- """Define timestamped buffer to store fixations."""
-
- def __setitem__(self, key, value: Fixation):
- """Force value to be a Fixation"""
-
- if not isinstance(value, Fixation):
- raise ValueError('value must be a Fixation')
+ def __post_init__(self):
- super().__setitem__(key, value)
-
-@dataclass
-class Saccade():
- """Define gaze saccade."""
+ start_position_ts, start_position = self.positions.get_first()
+ end_position_ts, end_position = self.positions.get_last()
- index: int
- duration: float
- start_position: GazePosition
- end_position: GazePosition
+ self.duration = round(end_position_ts - start_position_ts)
-class TimeStampedSaccades(DataStructures.TimeStampedBuffer):
- """Define timestamped buffer to store saccades."""
+Fixation = Movement
+"""Define abstract fixation as movement."""
- def __setitem__(self, key, value: Saccade):
- """Force value to be a Saccade"""
+Saccade = Movement
+"""Define abstract saccade as movement."""
- if not isinstance(value, Saccade):
- raise ValueError('value must be a Saccade')
+class TimeStampedMovements(DataStructures.TimeStampedBuffer):
+ """Define timestamped buffer to store movements."""
+ def __setitem__(self, key, value: Movement):
super().__setitem__(key, value)
@dataclass
-class Movement():
- """Define movement."""
+class GazeStatus():
+ """Define gaze status as a position belonging to an identified and indexed movement."""
- index: int
- type: str
position: GazePosition
+ movement_type: str
+ movement_index: int
-class TimeStampedMovements(DataStructures.TimeStampedBuffer):
- """Define timestamped buffer to store movement."""
-
- def __setitem__(self, key, value: Movement):
- """Force value to be a Movement"""
-
- if not isinstance(value, Movement):
- raise ValueError('value must be a Movement')
+class TimeStampedGazeStatus(DataStructures.TimeStampedBuffer):
+ """Define timestamped buffer to store gaze status."""
+ def __setitem__(self, key, value: GazeStatus):
super().__setitem__(key, value)
class MovementIdentifier():
@@ -92,42 +72,58 @@ class MovementIdentifier():
def __next__(self):
raise NotImplementedError('__next__() method not implemented')
- def identify(self):
+class DispersionBasedMovementIdentifier(MovementIdentifier):
+ """Implementation of the I-DT algorithm as described in:
+
+ Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
+ saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
+ on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
+ 71-78. DOI=http://dx.doi.org/10.1145/355017.355028
+ """
+
+ @dataclass
+ class DispersionBasedFixation(Fixation):
+ """Define dispersion based fixation as an algorithm specific fixation."""
- fixations = TimeStampedFixations()
- saccades = TimeStampedSaccades()
- movements = TimeStampedMovement()
+ dispersion: float = field(init=False)
+ euclidian: bool = field(default=True)
- for ts, item in self:
+ centroid: GazePosition = field(init=False)
- if isinstance(item, GazeFeatures.Fixation):
+ def __post_init__(self):
- fixations[ts] = item
+ super().__post_init__()
- for ts, position in item.positions.items():
+ x_list = [gp[0] for (ts, gp) in list(self.positions.items())]
+ y_list = [gp[1] for (ts, gp) in list(self.positions.items())]
- movements[ts] = Movement(item.index, type(item).__name__, position)
+ cx = round(numpy.mean(x_list))
+ cy = round(numpy.mean(y_list))
- elif isinstance(item, GazeFeatures.Saccade):
+ # select dispersion algorithm
+ if self.euclidian:
- saccades[ts] = item
+ c = [cx, cy]
+ points = numpy.column_stack([x_list, y_list])
- movements[ts] = Movement(item.index, type(item).__name__, item.start_position)
- movements[ts + item.duration] = Movement(item.index, type(item).__name__, item.end_position)
+ dist = (points - c)**2
+ dist = numpy.sum(dist, axis=1)
+ dist = numpy.sqrt(dist)
+
+ self.dispersion = round(max(dist))
else:
- continue
- return fixations, saccades, movements
+ self.dispersion = (max(x_list) - min(x_list)) + (max(y_list) - min(y_list))
-class DispersionBasedMovementIdentifier(MovementIdentifier):
- """Implementation of the I-DT algorithm as described in:
-
- Dario D. Salvucci and Joseph H. Goldberg. 2000. Identifying fixations and
- saccades in eye-tracking protocols. In Proceedings of the 2000 symposium
- on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA,
- 71-78. DOI=http://dx.doi.org/10.1145/355017.355028
- """
+ self.centroid = (cx, cy)
+
+ @dataclass
+ class DispersionBasedSaccade(Saccade):
+ """Define dispersion based saccade as an algorithm specific saccade."""
+
+ def __post_init__(self):
+ super().__post_init__()
def __init__(self, ts_gaze_positions, dispersion_threshold = 10, duration_threshold = 100):
@@ -140,37 +136,6 @@ class DispersionBasedMovementIdentifier(MovementIdentifier):
self.__ts_gaze_positions = ts_gaze_positions.copy()
self.__last_fixation = None
- self.__last_fixation_ts = -1
-
- self.__fixations_count = 0
- self.__saccades_count = 0
-
- def __getEuclideanDispersion(self, ts_gaze_positions_list):
- """Euclidian dispersion algorithm"""
-
- x_list = [gp[0] for (ts, gp) in ts_gaze_positions_list]
- y_list = [gp[1] for (ts, gp) in ts_gaze_positions_list]
-
- cx = numpy.mean(x_list)
- cy = numpy.mean(y_list)
- c = [cx, cy]
-
- points = numpy.column_stack([x_list, y_list])
-
- dist = (points - c)**2
- dist = numpy.sum(dist, axis=1)
- dist = numpy.sqrt(dist)
-
- return round(max(dist)), cx, cy
-
- def __getDispersion(self, ts_gaze_positions_list):
- """Basic dispersion algorithm"""
- # TODO : allow to select this algorithm
-
- x_list = [gp.x for (ts, gp) in ts_gaze_positions_list]
- y_list = [gp.y for (ts, gp) in ts_gaze_positions_list]
-
- return (max(x_list) - min(x_list)) + (max(y_list) - min(y_list))
def __iter__(self):
"""Movement identification generator."""
@@ -185,87 +150,71 @@ class DispersionBasedMovementIdentifier(MovementIdentifier):
(ts_start, gaze_position_start) = remaining_ts_gaze_positions.pop_first()
(ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first()
- ts_gaze_positions_list = [(ts_start, gaze_position_start)]
+ ts_gaze_positions = TimeStampedGazePositions()
+ ts_gaze_positions[ts_start] = gaze_position_start
while (ts_current - ts_start) < self.__duration_threshold:
- ts_gaze_positions_list.append( (ts_current, gaze_position_current) )
+ ts_gaze_positions[ts_current] = gaze_position_current
if len(remaining_ts_gaze_positions) > 0:
(ts_current, gaze_position_current) = remaining_ts_gaze_positions.pop_first()
else:
break
- # how much gaze is dispersed ?
- dispersion, cx, cy = self.__getEuclideanDispersion(ts_gaze_positions_list)
+ # is it a new fixation ?
+ new_fixation = DispersionBasedMovementIdentifier.DispersionBasedFixation(ts_gaze_positions)
- # little dispersion
- if dispersion <= self.__dispersion_threshold:
+ # dispersion is small
+ if new_fixation.dispersion <= self.__dispersion_threshold:
# remove selected gaze positions
- for gp in ts_gaze_positions_list:
+ for gp in ts_gaze_positions:
self.__ts_gaze_positions.pop_first()
# are next gaze positions not too dispersed ?
while len(remaining_ts_gaze_positions) > 0:
# select next gaze position
- ts_gaze_positions_list.append(remaining_ts_gaze_positions.pop_first())
-
- new_dispersion, new_cx, new_cy = self.__getEuclideanDispersion(ts_gaze_positions_list)
+ ts_next, position_next = remaining_ts_gaze_positions.pop_first()
+ ts_gaze_positions[ts_next] = position_next
- # dispersion too wide
- if new_dispersion > self.__dispersion_threshold:
+ # how much gaze is dispersed ?
+ updated_fixation = DispersionBasedMovementIdentifier.DispersionBasedFixation(ts_gaze_positions)
- # remove last gaze position
- ts_gaze_positions_list.pop(-1)
+ # dispersion is becomes too wide : ignore updated fixation
+ if updated_fixation.dispersion > self.__dispersion_threshold:
break
- # store new dispersion data
- dispersion = new_dispersion
- cx = new_cx
- cy = new_cy
+ # update new fixation
+ new_fixation = updated_fixation
# remove selected gaze position
self.__ts_gaze_positions.pop_first()
- # we have a new fixation
- ts_list = [ts for (ts, gp) in ts_gaze_positions_list]
- duration = ts_list[-1] - ts_list[0]
-
- if duration > 0:
-
- # store all positions in a timestamped buffer
- ts_gaze_positions = TimeStampedGazePositions()
-
- for (ts, gp) in ts_gaze_positions_list:
- ts_gaze_positions[round(ts)] = gp
-
- self.__fixations_count += 1
-
- new_fixation = Fixation(self.__fixations_count, round(duration), dispersion, (round(cx), round(cy)), ts_gaze_positions)
- new_fixation_ts = ts_list[0]
+ # is the new fixation have a duration ?
+ if new_fixation.duration > 0:
if self.__last_fixation != None:
- new_saccade_ts = self.__last_fixation_ts + self.__last_fixation.duration
- new_saccade_duration = new_fixation_ts - new_saccade_ts
+ # store start and end positions in a timestamped buffer
+ ts_saccade_positions = TimeStampedGazePositions()
+
+ start_position_ts, start_position = self.__last_fixation.positions.pop_last()
+ ts_saccade_positions[start_position_ts] = start_position
- if new_saccade_duration > 0:
+ end_position_ts, end_position = new_fixation.positions.pop_first()
+ ts_saccade_positions[end_position_ts] = end_position
- start_position_ts, start_position = self.__last_fixation.positions.pop_last()
- end_position_ts, end_position = new_fixation.positions.pop_first()
+ if end_position_ts > start_position_ts:
- self.__saccades_count += 1
-
- new_saccade = Saccade(self.__saccades_count, round(new_saccade_duration), start_position, end_position)
+ new_saccade = DispersionBasedMovementIdentifier.DispersionBasedSaccade(ts_saccade_positions)
- yield round(new_saccade_ts), new_saccade
+ yield new_saccade
self.__last_fixation = new_fixation
- self.__last_fixation_ts = new_fixation_ts
- yield round(new_fixation_ts), new_fixation
+ yield new_fixation
# dispersion too wide : consider next gaze position
else:
@@ -393,12 +342,12 @@ class PointerBasedVisualScan(VisualScanGenerator):
class FixationBasedVisualScan(VisualScanGenerator):
"""Build visual scan on the basis of timestamped fixations."""
- def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_fixations: TimeStampedFixations):
+ def __init__(self, ts_aoi_scenes: AOIFeatures.TimeStampedAOIScenes, ts_fixations: TimeStampedMovements):
super().__init__(ts_aoi_scenes)
- if type(ts_fixations) != TimeStampedFixations:
- raise ValueError('second argument must be a GazeFeatures.TimeStampedFixations')
+ if type(ts_fixations) != TimeStampedMovements:
+ raise ValueError('second argument must be a GazeFeatures.TimeStampedMovements')
# process identification on a copy
self.__ts_aoi_scenes = ts_aoi_scenes.copy()
diff --git a/src/argaze/utils/export_tobii_segment_movements.py b/src/argaze/utils/export_tobii_segment_movements.py
index 7222414..ee41a60 100644
--- a/src/argaze/utils/export_tobii_segment_movements.py
+++ b/src/argaze/utils/export_tobii_segment_movements.py
@@ -41,10 +41,11 @@ def main():
destination_path = args.segment_path
- gaze_video_filepath = f'{destination_path}/movements.mp4'
fixations_filepath = f'{destination_path}/movements_fixations.csv'
saccades_filepath = f'{destination_path}/movements_saccades.csv'
- movements_filepath = f'{destination_path}/movements.csv'
+
+ gaze_status_filepath = f'{destination_path}/gaze_status.csv'
+ gaze_status_video_filepath = f'{destination_path}/gaze_status.mp4'
# Load a tobii segment
tobii_segment = TobiiEntities.TobiiSegment(args.segment_path, int(args.time_range[0] * 1e6), int(args.time_range[1] * 1e6) if args.time_range[1] != None else None)
@@ -75,29 +76,34 @@ def main():
# Start movement identification
movement_identifier = GazeFeatures.DispersionBasedMovementIdentifier(ts_gaze_positions, args.dispersion_threshold, args.duration_threshold)
- fixations = GazeFeatures.TimeStampedFixations()
- saccades = GazeFeatures.TimeStampedSaccades()
- movements = GazeFeatures.TimeStampedMovements()
+ fixations = GazeFeatures.TimeStampedMovements()
+ saccades = GazeFeatures.TimeStampedMovements()
+ gaze_status = GazeFeatures.TimeStampedGazeStatus()
# Initialise progress bar
MiscFeatures.printProgressBar(0, int(tobii_segment_video.get_duration()/1000), prefix = 'Movements identification:', suffix = 'Complete', length = 100)
- for ts, item in movement_identifier:
+ for item in movement_identifier:
+
+ if isinstance(item, GazeFeatures.DispersionBasedMovementIdentifier.DispersionBasedFixation):
- if isinstance(item, GazeFeatures.Fixation):
+ start_ts, start_position = item.positions.get_first()
- fixations[ts] = item
+ fixations[start_ts] = item
for ts, position in item.positions.items():
- movements[ts] = GazeFeatures.Movement(item.index, type(item).__name__, position)
+ gaze_status[ts] = GazeFeatures.GazeStatus(position, 'Fixation', len(fixations))
- elif isinstance(item, GazeFeatures.Saccade):
+ elif isinstance(item, GazeFeatures.DispersionBasedMovementIdentifier.DispersionBasedSaccade):
- saccades[ts] = item
+ start_ts, start_position = item.positions.get_first()
+ end_ts, end_position = item.positions.get_last()
+
+ saccades[start_ts] = item
- movements[ts] = GazeFeatures.Movement(item.index, type(item).__name__, item.start_position)
- movements[ts + item.duration] = GazeFeatures.Movement(item.index, type(item).__name__, item.end_position)
+ gaze_status[start_ts] = GazeFeatures.GazeStatus(start_position, 'Saccade', len(saccades))
+ gaze_status[end_ts] = GazeFeatures.GazeStatus(end_position, 'Saccade', len(saccades))
else:
continue
@@ -116,12 +122,12 @@ def main():
saccades.export_as_csv(saccades_filepath)
print(f'Saccades saved into {saccades_filepath}')
- # Export movements analysis
- movements.export_as_csv(movements_filepath)
- print(f'Movements saved into {movements_filepath}')
+ # Export gaze status analysis
+ gaze_status.export_as_csv(gaze_status_filepath)
+ print(f'Gaze status saved into {gaze_status_filepath}')
# Prepare video exportation at the same format than segment video
- output_video = TobiiVideo.TobiiVideoOutput(gaze_video_filepath, tobii_segment_video.get_stream())
+ output_video = TobiiVideo.TobiiVideoOutput(gaze_status_video_filepath, tobii_segment_video.get_stream())
# Video and data loop
try:
@@ -160,8 +166,10 @@ def main():
if video_ts_ms > current_saccade_ts + current_saccade.duration:
current_saccade_ts, current_saccade = saccades.pop_first()
+ start_ts, start_position = current_saccade.positions.pop_first()
+ end_ts, end_position = current_saccade.positions.pop_first()
- cv.line(video_frame.matrix, current_saccade.start_position, current_saccade.end_position, (0, 0, 255), 2)
+ cv.line(video_frame.matrix, start_position, end_position, (0, 0, 255), 2)
else:
@@ -203,7 +211,7 @@ def main():
# End output video file
output_video.close()
- print(f'\nVideo with movements saved into {gaze_video_filepath}')
+ print(f'\nVideo with movements saved into {gaze_status_video_filepath}')
if __name__ == '__main__':