From b0479080cdde48351abc3e536eebfb0f87e0e915 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 09:54:48 +0100 Subject: Major rewrite of GazePosition and timestampGazePositions class --- src/argaze/GazeFeatures.py | 267 ++++++++++++++++++++++++++------------------- 1 file changed, 156 insertions(+), 111 deletions(-) diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index cfd7419..1c27540 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -25,57 +25,124 @@ import cv2 GazePositionType = TypeVar('GazePosition', bound="GazePosition") # Type definition for type annotation convenience -@dataclass(frozen=True) -class GazePosition(): - """Define gaze position as a tuple of coordinates with precision.""" +class GazePosition(tuple, DataFeatures.TimestampedObject): + """Define gaze position as a tuple of coordinates with precision. - value: tuple[int | float] = field(default=(0, 0)) - """Position's value.""" + Parameters: + precision: the radius of a circle around value where other same gaze position measurements could be. + message: a string to describe why the the position is what it is. + """ - precision: float = field(default=0., kw_only=True) - """Position's precision represents the radius of a circle around \ - this gaze position value where other same gaze position measurements could be.""" + def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, **kwargs): - def __getitem__(self, axis: int) -> int | float: - """Get position value along a particular axis.""" + return tuple.__new__(cls, position) - return self.value[axis] + def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, **kwargs): - def __iter__(self) -> iter: - """Iterate over each position value axis.""" + DataFeatures.TimestampedObject.__init__(self, **kwargs) + self.__precision = precision + self.__message = message - return iter(self.value) - - def __len__(self) -> int: - """Number of axis in position value.""" + @property + def value(self): + """Get position's tuple value.""" + return tuple(self) + + @property + def precision(self): + """Get position's precision.""" + return self.__precision + + @property + def message(self): + """Get position's message.""" + return self.__message + + @classmethod + def from_dict(self, position_data: dict) -> GazePositionType: + + if 'value' in position_data.keys(): + + value = position_data.pop('value') + return GazePosition(value, **position_data) + + else: - return len(self.value) + return GazePosition(**position_data) + + def __bool__(self) -> bool: + """Is the position value valid?""" + return len(self) > 0 def __repr__(self): """String representation""" - return json.dumps(self, ensure_ascii = False, default=vars) + return json.dumps(DataFeatures.as_dict(self)) - def __mul__(self, value) -> GazePositionType: - """Multiply gaze position.""" + def __add__(self, position: GazePositionType) -> GazePositionType: + """Add position. - return GazePosition(numpy.array(self.value) * value, precision= self.precision * numpy.linalg.norm(value)) + !!! note + The returned position precision is the maximal precision. + """ + if self.__precision is not None and position.precision is not None: - def __array__(self): - """Cast as numpy array.""" + return GazePosition(numpy.array(self) + numpy.array(position), precision = max(self.__precision, position.precision)) - return numpy.array(self.value) + else: - @property - def valid(self) -> bool: - """Is the precision not None?""" + return GazePosition(numpy.array(self) + numpy.array(position)) + + __radd__ = __add__ + + def __sub__(self, position: GazePositionType) -> GazePositionType: + """Substract position. + + !!! note + The returned position precision is the maximal precision. + """ + if self.__precision is not None and position.precision is not None: + + return GazePosition(numpy.array(self) - numpy.array(position), precision = max(self.__precision, position.precision)) + + else: + + return GazePosition(numpy.array(self) - numpy.array(position)) + + def __rsub__(self, position: GazePositionType) -> GazePositionType: + """Reversed substract position. + + !!! note + The returned position precision is the maximal precision. + """ + if self.__precision is not None and position.precision is not None: + + return GazePosition(numpy.array(position) - numpy.array(self), precision = max(self.__precision, position.precision)) + + else: + + return GazePosition(numpy.array(position) - numpy.array(self)) + + def __mul__(self, factor: int|float) -> GazePositionType: + """Multiply position by a factor. + + !!! note + The returned position precision is also multiplied by the factor. + """ + return GazePosition(numpy.array(self) * factor, precision = self.__precision * factor if self.__precision is not None else None) + + def __pow__(self, factor: int|float) -> GazePositionType: + """Power position by a factor. - return self.precision is not None + !!! note + The returned position precision is also powered by the factor. + """ + return GazePosition(numpy.array(self) ** factor, precision = self.__precision ** factor if self.__precision is not None else None) def distance(self, gaze_position) -> float: """Distance to another gaze positions.""" - distance = (self.value[0] - gaze_position.value[0])**2 + (self.value[1] - gaze_position.value[1])**2 + distance = (self[0] - gaze_position[0])**2 + (self[1] - gaze_position[1])**2 distance = numpy.sqrt(distance) return distance @@ -84,82 +151,50 @@ class GazePosition(): """Does this gaze position overlap another gaze position considering its precision? Set both to True to test if the other gaze position overlaps this one too.""" - distance = (self.value[0] - gaze_position.value[0])**2 + (self.value[1] - gaze_position.value[1])**2 - distance = numpy.sqrt(distance) + distance = numpy.sqrt(numpy.sum((self - gaze_position)**2)) if both: - return distance < min(self.precision, gaze_position.precision) + return distance < min(self.__precision, gaze_position.precision) else: - return distance < self.precision + return distance < self.__precision def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True): """Draw gaze position point and precision circle.""" if self.valid: - int_value = (int(self.value[0]), int(self.value[1])) + int_value = (int(self[0]), int(self[1])) # Draw point at position if required if color is not None: cv2.circle(image, int_value, size, color, -1) # Draw precision circle - if self.precision > 0 and draw_precision: - cv2.circle(image, int_value, round(self.precision), color, 1) - -class UnvalidGazePosition(GazePosition): - """Unvalid gaze position.""" - - def __init__(self, message=None): - - self.message = message - - super().__init__((None, None), precision=None) + if self.__precision > 0 and draw_precision: + cv2.circle(image, int_value, round(self.__precision), color, 1) TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions") # Type definition for type annotation convenience -class TimeStampedGazePositions(DataFeatures.TimeStampedBuffer): - """Define timestamped buffer to store gaze positions.""" - - def __setitem__(self, key, value: GazePosition|dict): - """Force GazePosition storage.""" - - # Convert dict into GazePosition - if type(value) == dict: - - assert(set(['value', 'precision']).issubset(value.keys())) - - if math.isnan(value['precision']): - - if 'message' in value.keys(): - - value = UnvalidGazePosition(value['message']) - - else : - - value = UnvalidGazePosition() - - else: - - value = GazePosition(value['value'], precision=value['precision']) - - assert(type(value) == GazePosition or type(value) == UnvalidGazePosition) +class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList): + """Handle timestamped gaze positions into a list""" + + def __init__(self, gaze_positions: list = []): - super().__setitem__(key, value) + super().__init__(GazePosition, gaze_positions) @classmethod def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType: """Create a TimeStampedGazePositionsType from .json file.""" - with open(json_filepath, encoding='utf-8') as ts_buffer_file: + with open(json_filepath, encoding='utf-8') as ts_positions_file: - json_buffer = json.load(ts_buffer_file) + json_positions = json.load(ts_positions_file) - return TimeStampedGazePositions({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) + return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions}) @classmethod - def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None) -> TimeStampedGazePositionsType: + def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> TimeStampedGazePositionsType: """Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). Parameters: @@ -167,6 +202,7 @@ class TimeStampedGazePositions(DataFeatures.TimeStampedBuffer): x: specific x column label. y: specific y column label. precision: specific precision column label if exist. + message: specific message column label if exist. """ # Copy columns @@ -178,29 +214,52 @@ class TimeStampedGazePositions(DataFeatures.TimeStampedBuffer): df = dataframe.loc[:, (timestamp, x, y)] + # DEBUG + print(df) + # Merge x and y columns into one 'value' column df['value'] = tuple(zip(df[x], df[y])) - df.drop(columns= [x, y], inplace=True, axis=1) + df.drop(columns=[x, y], inplace=True, axis=1) + + # DEBUG + print(df) + + # Replace (NaN, NaN) values by () + df['value'] = df.apply(lambda row: print(row.values[1], pandas.isnull(row.values[1])), axis=True) + + # DEBUG + print(df) # Handle precision data if precision: - # Rename precision column into 'precision' column + # Rename precision column into 'precision' column df.rename(columns={precision: 'precision'}, inplace=True) else: - # Append a precision column where precision is NaN if value is a tuple of NaN else 0 - df['precision'] = df.apply(lambda row: numpy.nan if math.isnan(row.value[0]) or math.isnan(row.value[1]) else 0, axis=True) + # Append a None precision column + df['precision'] = df.apply(lambda row: None, axis=True) + + # Handle message data + if message: + + # Rename message column into 'message' column + df.rename(columns={precision: 'message'}, inplace=True) + + else: + + # Append a None message column + df['message'] = df.apply(lambda row: None, axis=True) - # Rename timestamp column into 'timestamp' column then use it as index + # Rename timestamp column into 'timestamp' column df.rename(columns={timestamp: 'timestamp'}, inplace=True) - df.set_index('timestamp', inplace=True) # Filter duplicate timestamps - df = df[df.index.duplicated() == False] + df = df[df.timestamp.duplicated() == False] - return TimeStampedGazePositions(df.to_dict('index')) + # Create timestamped gaze positions + return TimeStampedGazePositions(df.apply(lambda row: GazePosition(row.value, precision=row.precision, message=row.message, timestamp=row.timestamp), axis=True)) class GazePositionCalibrationFailed(Exception): """Exception raised by GazePositionCalibrator.""" @@ -300,7 +359,7 @@ GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement") # Type definition for type annotation convenience @dataclass(frozen=True) -class GazeMovement(): +class GazeMovement(DataFeatures.TimestampedObject): """Define abstract gaze movement class as a buffer of timestamped positions.""" positions: TimeStampedGazePositions @@ -328,7 +387,7 @@ class GazeMovement(): _, start_position = self.positions.first _, end_position = self.positions.last - amplitude = numpy.linalg.norm( numpy.array(start_position.value) - numpy.array(end_position.value)) + amplitude = numpy.linalg.norm(start_position - end_position) # Update frozen amplitude attribute object.__setattr__(self, 'amplitude', amplitude) @@ -350,7 +409,7 @@ class GazeMovement(): for ts, position in self.positions.items(): - output += f'\n\t{ts}:\n\t\tvalue={position.value},\n\t\tprecision={position.precision}' + output += f'\n\t{ts}:\n\t\tvalue={position},\n\t\tprecision={position.precision}' else: @@ -453,24 +512,12 @@ def is_saccade(gaze_movement): TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeStampedGazeMovements") # Type definition for type annotation convenience -class TimeStampedGazeMovements(DataFeatures.TimeStampedBuffer): - """Define timestamped buffer to store gaze movements.""" +class TimeStampedGazeMovements(DataFeatures.TimeStampedObjectsList): + """Handle timestamped gaze movements into a list""" - def __setitem__(self, key, value: GazeMovement): - """Force value to be or inherit from GazeMovement.""" - - assert(isinstance(value, GazeMovement) or type(value).__bases__[0] == Fixation or type(value).__bases__[0] == Saccade) - - super().__setitem__(key, value) - - def __str__(self): - - output = '' - for ts, item in self.items(): - - output += f'\n{item}' + def __init__(self): - return output + super().__init__(GazeMovement) GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus") # Type definition for type annotation convenience @@ -489,23 +536,21 @@ class GazeStatus(GazePosition): def from_position(cls, gaze_position: GazePosition, movement_type: str, movement_index: int) -> GazeStatusType: """Initialize from a gaze position instance.""" - return cls(gaze_position.value, precision=gaze_position.precision, movement_type=movement_type, movement_index=movement_index) + return cls(gaze_position, precision=gaze_position.precision, movement_type=movement_type, movement_index=movement_index) TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus") # Type definition for type annotation convenience -class TimeStampedGazeStatus(DataFeatures.TimeStampedBuffer): - """Define timestamped buffer to store list of gaze statusa. +class TimeStampedGazeStatus(DataFeatures.TimeStampedObjectsList): + """Handle timestamped gaze movements into a list !!! note List of gaze status are required as a gaze position can belongs to two consecutive gaze movements as last and first position. """ - def __setitem__(self, key, value: list): - - assert(isinstance(value, list)) + def __init__(self): - super().__setitem__(key, value) + super().__init__(GazeStatus) class GazeMovementIdentifier(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a gaze movement identifier.""" -- cgit v1.1 From 76f36daab7067176c55927a7c62907d24215accf Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 09:55:50 +0100 Subject: Renaming Timestampbuffer into TimestampObjectsList. Defining TimestampedObject class. --- src/argaze/DataFeatures.py | 217 ++++++++++++++++++++++++++------------------- 1 file changed, 128 insertions(+), 89 deletions(-) diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index edbf8e9..931c21d 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -30,10 +30,10 @@ from colorama import Style, Fore TimeStampType = TypeVar('TimeStamp', int, float) """Type definition for timestamp as integer or float values.""" -DataType = TypeVar('Data') -"""Type definition for data to store anything in time.""" +TimeStampedObjectType = TypeVar('TimeStampedObject', bound="TimeStampedObject") +# Type definition for type annotation convenience -TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") +TimeStampedObjectsListType = TypeVar('TimeStampedObjectsList', bound="TimeStampedObjectsList") # Type definition for type annotation convenience def module_path(obj) -> str: @@ -45,6 +45,39 @@ def module_path(obj) -> str: """ return obj.__class__.__module__ +def properties(cls) -> list: + """get class properties name.""" + + properties = [name for name, item in cls.__dict__.items() if isinstance(item, property)] + + for base in cls.__bases__: + + for name, item in base.__dict__.items(): + + if isinstance(item, property): + + properties.append(name) + + return properties + +def as_dict(obj, filter: bool=True) -> dict: + """Export object as dictionary. + + Parameters: + filter: remove None attribute values. + """ + _dict = {} + + for p in properties(obj.__class__): + + v = getattr(obj, p) + + if not filter or v is not None: + + _dict[p] = v + + return _dict + class JsonEncoder(json.JSONEncoder): """Specific ArGaze JSON Encoder.""" @@ -90,67 +123,63 @@ class JsonEncoder(json.JSONEncoder): return public_dict -class TimeStampedBuffer(collections.OrderedDict): - """Ordered dictionary to handle timestamped data. - ``` - { - timestamp1: data1, - timestamp2: data2, - ... - } - ``` - - !!! warning - - Timestamps must be numbers. +class TimeStampedObjectsList(list): + """Handle timestamped object into a list. - !!! warning "Timestamps are not sorted internally" + !!! warning "Timestamped objects are not sorted internally" - Data are considered to be stored according at their coming time. + Timestamped objects are considered to be stored according at their coming time. """ - def __new__(cls, args = None): - """Inheritance""" + def __init__(self, ts_object_type: type, ts_objects: list = []): - return super(TimeStampedBuffer, cls).__new__(cls) + super().__init__() + self.__object_type = ts_object_type + self.__object_properties = properties(self.__object_type) - def __setitem__(self, ts: TimeStampType, data: DataType): - """Store data at given timestamp.""" + for ts_object in ts_objects: - assert(type(ts) == int or type(ts) == float) + self.append(ts_object) - super().__setitem__(ts, data) + @property + def object_type(self): + """Get object type handled by the list.""" + return self.__object_type - def __repr__(self): - """String representation""" + def append(self, ts_object: TimeStampedObjectType|dict): + """Append timestamped object.""" - return json.dumps(self, ensure_ascii=False, cls=JsonEncoder) + # Convert dict into GazePosition + if type(ts_object) == dict: - def __str__(self): - """String representation""" + ts_object = self.__object_type.from_dict(ts_object) - return json.dumps(self, ensure_ascii=False, cls=JsonEncoder) + # Check object type + if type(ts_object) != self.__object_type: - def append(self, timestamped_buffer: TimeStampedBufferType) -> TimeStampedBufferType: - """Append a timestamped buffer.""" + raise TypeError(f'object type have to be {self.__object_type} not {type(ts_object)}') - for ts, value in timestamped_buffer.items(): - self[ts] = value + assert(ts_object.is_timestamped()) - return self + super().append(ts_object) - @property - def first(self) -> Tuple[TimeStampType, DataType]: - """Easing access to first item.""" + def timestamps(self): + """Get all timestamps in list.""" + return [ts_object.timestamp for ts_object in self] - return list(self.items())[0] + def tuples(self) -> list: + """Get all timestamped objects as list of tuple.""" + return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] - def pop_first(self) -> Tuple[TimeStampType, DataType]: - """Easing FIFO access mode.""" + def __repr__(self): + """String representation""" + return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) - return self.popitem(last=False) + def __str__(self): + """String representation""" + return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) - def pop_last_until(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]: + def pop_last_until(self, ts: TimeStampType) -> TimeStampedObjectType: """Pop all item until a given timestamped value and return the first after.""" # get last item before given timestamp @@ -164,7 +193,7 @@ class TimeStampedBuffer(collections.OrderedDict): return first_ts, first_value - def pop_last_before(self, ts: TimeStampType) -> Tuple[TimeStampType, DataType]: + def pop_last_before(self, ts: TimeStampType) -> TimeStampedObjectType: """Pop all item before a given timestamped value and return the last one.""" # get last item before given timestamp @@ -177,18 +206,7 @@ class TimeStampedBuffer(collections.OrderedDict): return popep_ts, poped_value - @property - def last(self) -> Tuple[TimeStampType, DataType]: - """Easing access to last item.""" - - return list(self.items())[-1] - - def pop_last(self) -> Tuple[TimeStampType, DataType]: - """Easing FIFO access mode.""" - - return self.popitem(last=True) - - def get_first_from(self, ts) -> Tuple[TimeStampType, DataType]: + def get_first_from(self, ts) -> TimeStampedObjectType: """Retreive first item timestamp from a given timestamp value.""" ts_list = list(self.keys()) @@ -204,7 +222,7 @@ class TimeStampedBuffer(collections.OrderedDict): raise KeyError(f'No data stored after {ts} timestamp.') - def get_last_before(self, ts) -> Tuple[TimeStampType, DataType]: + def get_last_before(self, ts) -> TimeStampedObjectType: """Retreive last item timestamp before a given timestamp value.""" ts_list = list(self.keys()) @@ -220,7 +238,7 @@ class TimeStampedBuffer(collections.OrderedDict): raise KeyError(f'No data stored before {ts} timestamp.') - def get_last_until(self, ts) -> Tuple[TimeStampType, DataType]: + def get_last_until(self, ts) -> TimeStampedObjectType: """Retreive last item timestamp until a given timestamp value.""" ts_list = list(self.keys()) @@ -237,14 +255,14 @@ class TimeStampedBuffer(collections.OrderedDict): raise KeyError(f'No data stored until {ts} timestamp.') @classmethod - def from_json(self, json_filepath: str) -> TimeStampedBufferType: - """Create a TimeStampedBuffer from .json file.""" + def from_json(self, json_filepath: str) -> TimeStampedObjectsListType: + """Create a TimeStampedObjectsList from .json file.""" - with open(json_filepath, encoding='utf-8') as ts_buffer_file: + with open(json_filepath, encoding='utf-8') as ts_objects_file: - json_buffer = json.load(ts_buffer_file) + json_ts_objects = json.load(ts_objects_file) - return TimeStampedBuffer({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) + return TimeStampedObjectsList([ast.literal_eval(ts_object) for ts_object in json_ts_objects]) def to_json(self, json_filepath: str): """Save a TimeStampedBuffer to .json file.""" @@ -254,14 +272,14 @@ class TimeStampedBuffer(collections.OrderedDict): json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder) @classmethod - def from_dataframe(self, dataframe: pandas.DataFrame, exclude=[]) -> TimeStampedBufferType: - """Create a TimeStampedBuffer from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" + def from_dataframe(self, dataframe: pandas.DataFrame, exclude=[]) -> TimeStampedObjectsListType: + """Create a TimeStampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" dataframe.drop(exclude, inplace=True, axis=True) assert(dataframe.index.name == 'timestamp') - return TimeStampedBuffer(dataframe.to_dict('index')) + return TimeStampedObjectsList(dataframe.to_dict('index')) def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). @@ -278,7 +296,7 @@ class TimeStampedBuffer(collections.OrderedDict): Timestamps are stored as index column called 'timestamp'. """ - df = pandas.DataFrame.from_dict(self.values()) + df = pandas.DataFrame(self.tuples(), columns=self.__object_properties) # Exclude columns df.drop(exclude, inplace=True, axis=True) @@ -307,7 +325,7 @@ class TimeStampedBuffer(collections.OrderedDict): df = df[splited_columns] # Append timestamps as index column - df['timestamp'] = self.keys() + df['timestamp'] = self.timestamps() df.set_index('timestamp', inplace=True) return df @@ -344,13 +362,43 @@ class DataDictionary(dict): __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ -class SharedObject(): +class TimestampedObject(): + """Abstract class to enable timestamp management.""" + + def __init__(self, timestamp: int|float = math.nan): + + self._timestamp = timestamp + + @property + def timestamp(self) -> int|float: + """Get object timestamp.""" + return self._timestamp + + @timestamp.setter + def timestamp(self, timestamp: int|float): + """Set object timestamp.""" + + assert(type(timestamp) == int or type(timestamp) == float) + + self._timestamp = timestamp + + def untimestamp(self): + """Reset object timestamp.""" + self._timestamp = math.nan + + def is_timestamped(self) -> bool: + """Is the object timestamped?""" + timestamped = not math.isnan(self._timestamp) + + return timestamped + +class SharedObject(TimestampedObject): """Abstract class to enable multiple threads sharing and timestamp management.""" def __init__(self): + super().__init__() self._lock = threading.Lock() - self._timestamp = math.nan self._execution_times = {} self._exceptions = {} @@ -362,33 +410,24 @@ class SharedObject(): @property def timestamp(self) -> int|float: """Get shared object timestamp.""" - self._lock.acquire() - timestamp = self._timestamp - self._lock.release() - - return timestamp + with self._lock: + return super().timestamp @timestamp.setter def timestamp(self, timestamp: int|float): """Set shared object timestamp.""" - self._lock.acquire() - self._timestamp = timestamp - self._lock.release() + with self._lock: + super().timestamp = timestamp def untimestamp(self): """Reset shared object timestamp.""" - self._lock.acquire() - self._timestamp = math.nan - self._lock.release() + with self._lock: + self.timestamp = math.nan - @property - def timestamped(self) -> bool: + def is_timestamped(self) -> bool: """Is the object timestamped?""" - self._lock.acquire() - timestamped = not math.isnan(self._timestamp) - self._lock.release() - - return timestamped + with self._lock: + return super().is_timestamped() class PipelineStepObject(): """ -- cgit v1.1 From 0c334b93c3ffab25eec1ced03782babf8ab8b6c9 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 09:56:19 +0100 Subject: Removing useless TimeStampedAOIScenes class. --- src/argaze/AreaOfInterest/AOIFeatures.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py index 6149da4..dbecfc1 100644 --- a/src/argaze/AreaOfInterest/AOIFeatures.py +++ b/src/argaze/AreaOfInterest/AOIFeatures.py @@ -553,16 +553,6 @@ class AOIScene(): return output -class TimeStampedAOIScenes(DataFeatures.TimeStampedBuffer): - """Define timestamped buffer to store AOI scenes in time.""" - - def __setitem__(self, ts, scene): - """Force value to inherit from AOIScene.""" - - assert(type(scene).__bases__[0] == AOIScene) - - super().__setitem__(ts, scene) - HeatmapType = TypeVar('Heatmap', bound="Heatmap") # Type definition for type annotation convenience -- cgit v1.1 From d6754148e3866cd8051ff01d1dbbd5534664bc2a Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 09:57:46 +0100 Subject: Updating GazeFeatures test. --- src/argaze.test/GazeFeatures.py | 218 ++++++++++++++++++++++------------------ 1 file changed, 119 insertions(+), 99 deletions(-) diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index b41c7c7..fdc140d 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -29,10 +29,13 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)): for i in range(0, size): # Edit gaze position - random_gaze_position = GazeFeatures.GazePosition((random.random() * frame_dimension[0], random.random() * frame_dimension[1])) + random_gaze_position = GazeFeatures.GazePosition((random.random() * frame_dimension[0], random.random() * frame_dimension[1]), precision=5) - # Store gaze position - ts_gaze_positions[time.time()] = random_gaze_position + # Timestamp gaze position + random_gaze_position.timestamp = time.time() + + # Store timestamped gaze position + ts_gaze_positions.append(random_gaze_position) return ts_gaze_positions @@ -61,47 +64,45 @@ class TestSaccade(GazeFeatures.Saccade): class TestGazePositionClass(unittest.TestCase): """Test GazePosition class.""" - + def test_new(self): """Test GazePosition creation.""" # Check empty GazePosition - empty_gaze_position = GazeFeatures.GazePosition() + empty_gaze_position = GazeFeatures.GazePosition(message="empty for test") - self.assertEqual(empty_gaze_position.value, (0, 0)) - self.assertEqual(empty_gaze_position[0], 0) - self.assertEqual(empty_gaze_position[1], 0) - - for v in empty_gaze_position: - self.assertEqual(v, 0) - - self.assertEqual(empty_gaze_position.precision, 0.) - self.assertEqual(len(empty_gaze_position), 2) - self.assertEqual(empty_gaze_position.valid, True) - self.assertEqual(numpy.array(empty_gaze_position).shape, (2,)) + self.assertEqual(empty_gaze_position, ()) + self.assertEqual(empty_gaze_position.value, ()) + self.assertEqual(empty_gaze_position.precision, None) + self.assertEqual(empty_gaze_position.message, "empty for test") + self.assertEqual(len(empty_gaze_position), 0) + self.assertEqual(bool(empty_gaze_position), False) + self.assertEqual(numpy.array(empty_gaze_position).shape, (0,)) # Check integer GazePosition int_gaze_position = GazeFeatures.GazePosition((123, 456), precision=55) - self.assertEqual(int_gaze_position.value, (123, 456)) + self.assertEqual(int_gaze_position, (123, 456)) self.assertEqual(int_gaze_position[0], 123) self.assertEqual(int_gaze_position[1], 456) + self.assertEqual(int_gaze_position.value, (123, 456)) self.assertEqual(int_gaze_position.precision, 55) self.assertEqual(len(int_gaze_position), 2) - self.assertEqual(int_gaze_position.valid, True) - self.assertEqual(numpy.array(empty_gaze_position).shape, (2,)) + self.assertEqual(bool(int_gaze_position), True) + self.assertEqual(numpy.array(int_gaze_position).shape, (2,)) # Check float GazePosition float_gaze_position = GazeFeatures.GazePosition((1.23, 4.56), precision=5.5) - self.assertEqual(float_gaze_position.value, (1.23, 4.56)) + self.assertEqual(float_gaze_position, (1.23, 4.56)) self.assertEqual(float_gaze_position[0], 1.23) self.assertEqual(float_gaze_position[1], 4.56) + self.assertEqual(float_gaze_position.value, (1.23, 4.56)) self.assertEqual(float_gaze_position.precision, 5.5) self.assertEqual(len(float_gaze_position), 2) - self.assertEqual(float_gaze_position.valid, True) - self.assertEqual(numpy.array(empty_gaze_position).shape, (2,)) - + self.assertEqual(bool(float_gaze_position), True) + self.assertEqual(numpy.array(float_gaze_position).shape, (2,)) + def test_properties(self): """Test GazePosition properties cannot be modified after creation.""" @@ -110,11 +111,27 @@ class TestGazePositionClass(unittest.TestCase): # Check that gaze position value setting fails with self.assertRaises(AttributeError): - gaze_position.value = (123, 456) + gaze_position.value = (12, 34) + + self.assertNotEqual(gaze_position.value, (12, 34)) + self.assertEqual(gaze_position.value, ()) + + # Check that gaze position precision setting fails + with self.assertRaises(AttributeError): + + gaze_position.precision = 5 - self.assertNotEqual(gaze_position.value, (123, 456)) - self.assertEqual(gaze_position.value, (0, 0)) + self.assertNotEqual(gaze_position.precision, 5) + self.assertEqual(gaze_position.precision, None) + # Check that gaze position message setting fails + with self.assertRaises(AttributeError): + + gaze_position.message = "later setting" + + self.assertNotEqual(gaze_position.message, "later setting") + self.assertEqual(gaze_position.message, None) + def test_overlapping(self): """Test GazePosition overlap method.""" @@ -133,83 +150,80 @@ class TestGazePositionClass(unittest.TestCase): self.assertFalse(gaze_position_C.overlap(gaze_position_A)) self.assertFalse(gaze_position_C.overlap(gaze_position_B)) - + def test___repr__(self): """Test GazePosition string representation.""" # Check empty GazePosition representation - self.assertEqual(repr(GazeFeatures.GazePosition()), "{\"value\": [0, 0], \"precision\": 0.0}") + self.assertEqual(repr(GazeFeatures.GazePosition()), "{\"value\": [], \"timestamp\": NaN}") -class TestUnvalidGazePositionClass(unittest.TestCase): - """Test UnvalidGazePosition class.""" + # Check GazePosition representation + self.assertEqual(repr(GazeFeatures.GazePosition((12, 345), precision=50, message="ok")), \ + "{\"value\": [12, 345], \"precision\": 50, \"message\": \"ok\", \"timestamp\": NaN}") +class TestTimeStampedGazePositionsClass(unittest.TestCase): + """Test TimeStampedGazePositions class.""" + def test_new(self): - """Test UnvalidGazePosition creation.""" - - import math - - unvalid_gaze_position = GazeFeatures.UnvalidGazePosition() - - self.assertEqual(unvalid_gaze_position.value, (None, None)) - self.assertEqual(unvalid_gaze_position.precision, None) - self.assertEqual(unvalid_gaze_position.valid, False) + """Test TimeStampedGazePositions creation.""" - def test___repr__(self): - """Test UnvalidGazePosition string representation.""" - - self.assertEqual(repr(GazeFeatures.UnvalidGazePosition()), "{\"message\": null, \"value\": [null, null], \"precision\": null}") + empty_gaze_position = GazeFeatures.GazePosition() + empty_gaze_position.timestamp = 100 -class TestTimeStampedGazePositionsClass(unittest.TestCase): - """Test TimeStampedGazePositions class.""" + gaze_position_with_message = GazeFeatures.GazePosition((12, 345), message="second position") + gaze_position_with_message.timestamp = 200 - def test___setitem__(self): - """Test __setitem__ method.""" + dict_gaze_position = {"timestamp": 300, "value": (0, 0), "precision": 0.} - ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() - ts_gaze_positions[0] = GazeFeatures.GazePosition() - ts_gaze_positions[1] = GazeFeatures.UnvalidGazePosition() - ts_gaze_positions[2] = {"value": (0, 0), "precision": 0.} + ts_gaze_positions = GazeFeatures.TimeStampedGazePositions([empty_gaze_position, gaze_position_with_message, dict_gaze_position]) - # Check GazePosition is correctly stored and accessible as a GazePosition + # Check first GazePosition is correctly stored and accessible as a GazePosition self.assertIsInstance(ts_gaze_positions[0], GazeFeatures.GazePosition) - self.assertEqual(ts_gaze_positions[0].valid, True) + self.assertEqual(bool(ts_gaze_positions[0]), False) + self.assertEqual(ts_gaze_positions[0].timestamp, 100) - # Check UnvalidGazePosition is correctly stored and accessible as a UnvalidGazePosition - self.assertIsInstance(ts_gaze_positions[1], GazeFeatures.UnvalidGazePosition) - self.assertEqual(ts_gaze_positions[1].valid, False) + # Check second GazePosition is correctly stored and accessible as a GazePosition + self.assertIsInstance(ts_gaze_positions[1], GazeFeatures.GazePosition) + self.assertEqual(bool(ts_gaze_positions[1]), True) + self.assertEqual(ts_gaze_positions[1].timestamp, 200) - # Check dict with "value" and "precision" keys is correctly stored and accessible as a GazePosition + # Check third GazePosition from dict is correctly stored and accessible as a GazePosition self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition) - self.assertEqual(ts_gaze_positions[2].valid, True) + self.assertEqual(bool(ts_gaze_positions[2]), True) + self.assertEqual(ts_gaze_positions[2].timestamp, 300) # Check that bad data type insertion fails - with self.assertRaises(AssertionError): + with self.assertRaises(TypeError): - ts_gaze_positions[3] = "This string is not a gaze position value." + ts_gaze_positions.append("This string is not a gaze position value.") # Check that dict with bad keys insertion fails - with self.assertRaises(AssertionError): + with self.assertRaises(TypeError): - ts_gaze_positions[4] = {"bad_key": (0, 0), "precision": 0.} + ts_gaze_positions.append({"bad_key": (0, 0), "precision": 0.}) # Check final lenght self.assertEqual(len(ts_gaze_positions), 3) - + def test___repr__(self): """Test inherited string representation.""" ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() - self.assertEqual(repr(GazeFeatures.TimeStampedGazePositions()), "{}") + self.assertEqual(repr(GazeFeatures.TimeStampedGazePositions()), "[]") - ts_gaze_positions[0] = GazeFeatures.GazePosition() - - self.assertEqual(repr(ts_gaze_positions), "{\"0\": {\"value\": [0, 0], \"precision\": 0.0}}") + empty_gaze_position = GazeFeatures.GazePosition() + empty_gaze_position.timestamp = 100 + ts_gaze_positions.append(empty_gaze_position) - ts_gaze_positions[0] = GazeFeatures.UnvalidGazePosition() + self.assertEqual(repr(ts_gaze_positions), "[{\"value\": [], \"timestamp\": 100}]") - self.assertEqual(repr(ts_gaze_positions), "{\"0\": {\"message\": null, \"value\": [null, null], \"precision\": null}}") + full_gaze_position = GazeFeatures.GazePosition((12, 345), precision=50, message="ok") + full_gaze_position.timestamp = 200 + ts_gaze_positions[0] = full_gaze_position + self.assertEqual(repr(ts_gaze_positions), "[{\"value\": [12, 345], \"precision\": 50, \"message\": \"ok\", \"timestamp\": 200}]") + def test_from_dataframe(self): """Test from_dataframe classmethod.""" @@ -226,13 +240,13 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase): # Check first gaze position is correctly stored and accessible as a GazePosition self.assertIsInstance(ts_gaze_positions[0], GazeFeatures.GazePosition) - self.assertEqual(ts_gaze_positions[0].precision, 0) - self.assertEqual(ts_gaze_positions[0].valid, True) + self.assertEqual(ts_gaze_positions[0].precision, None) + self.assertEqual(bool(ts_gaze_positions[0]), True) - # Check third gaze position is correctly stored and accessible as a UnvalidGazePosition - self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.UnvalidGazePosition) + # Check third gaze position is correctly stored and accessible as a GazePosition + self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition) self.assertEqual(ts_gaze_positions[2].precision, None) - self.assertEqual(ts_gaze_positions[2].valid, False) + self.assertEqual(bool(ts_gaze_positions[2]), False) data = {'Specific timestamp label': [0, 1, 2, 3, 4], 'Specific gaze position x label': [0, 10, numpy.nan, 30, 40], @@ -249,13 +263,13 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase): # Check first gaze position is correctly stored and accessible as a GazePosition self.assertIsInstance(ts_gaze_positions[0], GazeFeatures.GazePosition) self.assertEqual(ts_gaze_positions[0].precision, 15) - self.assertEqual(ts_gaze_positions[0].valid, True) + self.assertEqual(bool(ts_gaze_positions[0]), True) # Check third gaze position is correctly stored and accessible as a UnvalidGazePosition - self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.UnvalidGazePosition) - self.assertEqual(ts_gaze_positions[2].precision, None) - self.assertEqual(ts_gaze_positions[2].valid, False) - + self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition) + self.assertEqual(numpy.isnan(ts_gaze_positions[2].precision), True) + self.assertEqual(bool(ts_gaze_positions[2]), False) + def test_as_dataframe(self): """Test inherited as_dataframe method.""" @@ -265,31 +279,37 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase): self.assertEqual(ts_gaze_positions_dataframe.index.name, "timestamp") self.assertEqual(ts_gaze_positions_dataframe.index.size, 10) - self.assertEqual(ts_gaze_positions_dataframe.columns.size, 2) + self.assertEqual(ts_gaze_positions_dataframe.columns.size, 3) self.assertEqual(ts_gaze_positions_dataframe.columns[0], "value") self.assertEqual(ts_gaze_positions_dataframe.columns[1], "precision") + self.assertEqual(ts_gaze_positions_dataframe.columns[2], "message") self.assertEqual(ts_gaze_positions_dataframe["value"].dtype, 'object') - self.assertEqual(ts_gaze_positions_dataframe["precision"].dtype, 'float64') + self.assertEqual(ts_gaze_positions_dataframe["precision"].dtype, 'int64') + self.assertEqual(ts_gaze_positions_dataframe["message"].dtype, 'O') # Python object type - # Check unvalid position conversion - ts_gaze_positions = GazeFeatures.TimeStampedGazePositions() - ts_gaze_positions[0] = GazeFeatures.UnvalidGazePosition() + # Check empty position conversion + empty_gaze_position = GazeFeatures.GazePosition() + empty_gaze_position.timestamp = 100 + + ts_gaze_positions = GazeFeatures.TimeStampedGazePositions([empty_gaze_position]) ts_gaze_positions_dataframe = ts_gaze_positions.as_dataframe() self.assertEqual(ts_gaze_positions_dataframe.index.name, "timestamp") self.assertEqual(ts_gaze_positions_dataframe.index.size, 1) - self.assertEqual(ts_gaze_positions_dataframe.columns.size, 2) + self.assertEqual(ts_gaze_positions_dataframe.columns.size, 3) self.assertEqual(ts_gaze_positions_dataframe.columns[0], "value") self.assertEqual(ts_gaze_positions_dataframe.columns[1], "precision") + self.assertEqual(ts_gaze_positions_dataframe.columns[2], "message") self.assertEqual(ts_gaze_positions_dataframe["value"].dtype, 'object') self.assertEqual(ts_gaze_positions_dataframe["precision"].dtype, 'O') # Python object type + self.assertEqual(ts_gaze_positions_dataframe["message"].dtype, 'O') # Python object type class TestGazeMovementClass(unittest.TestCase): """Test GazeMovement class.""" - + @unittest.skip("DEBUG") def test_new(self): """Test GazeMovement creation.""" @@ -301,7 +321,7 @@ class TestGazeMovementClass(unittest.TestCase): self.assertEqual(abstract_gaze_movement.amplitude, -1) self.assertEqual(abstract_gaze_movement.valid, False) self.assertEqual(abstract_gaze_movement.finished, False) - + @unittest.skip("DEBUG") def test_finish(self): """Test GazeMovement finishing.""" @@ -322,7 +342,7 @@ class TestGazeMovementClass(unittest.TestCase): class TestUnvalidGazeMovementClass(unittest.TestCase): """Test UnvalidGazeMovement class.""" - + @unittest.skip("DEBUG") def test_new(self): """Test UnvalidGazeMovement creation.""" @@ -338,7 +358,7 @@ class TestUnvalidGazeMovementClass(unittest.TestCase): class TestScanStepClass(unittest.TestCase): """Test ScanStep class.""" - + @unittest.skip("DEBUG") def test_new(self): """Test ScanStep creation.""" @@ -371,7 +391,7 @@ def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)): class TestScanPathClass(unittest.TestCase): """Test ScanPath class.""" - + @unittest.skip("DEBUG") def test_new(self): """Test ScanPath creation.""" @@ -380,7 +400,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(len(scan_path), 0) self.assertEqual(scan_path.duration, 0) - + @unittest.skip("DEBUG") def test_append(self): """Test ScanPath append methods.""" @@ -456,7 +476,7 @@ class TestScanPathClass(unittest.TestCase): class TestAOIScanStepClass(unittest.TestCase): """Test AOIScanStep class.""" - + @unittest.skip("DEBUG") def test_new(self): """Test AOIScanStep creation.""" @@ -478,7 +498,7 @@ class TestAOIScanStepClass(unittest.TestCase): self.assertEqual(aoi_scan_step.first_fixation, fixation) self.assertEqual(aoi_scan_step.last_saccade, saccade) self.assertGreater(aoi_scan_step.duration, 0) - + @unittest.skip("DEBUG") def test_error(self): """Test AOIScanStep creation error.""" @@ -519,7 +539,7 @@ def build_aoi_scan_path(expected_aoi, aoi_path): class TestAOIScanPathClass(unittest.TestCase): """Test AOIScanPath class.""" - + @unittest.skip("DEBUG") def test_new(self): """Test AOIScanPath creation.""" @@ -527,7 +547,7 @@ class TestAOIScanPathClass(unittest.TestCase): aoi_scan_path = GazeFeatures.AOIScanPath() self.assertEqual(len(aoi_scan_path), 0) - + @unittest.skip("DEBUG") def test_append(self): """Test AOIScanPath append methods.""" @@ -575,7 +595,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Check letter affectation self.assertEqual(aoi_scan_path.get_letter_aoi('A'), 'Foo') - + @unittest.skip("DEBUG") def test_append_error(self): """Test AOIScanPath append error.""" @@ -604,7 +624,7 @@ class TestAOIScanPathClass(unittest.TestCase): with self.assertRaises(GazeFeatures.AOIScanStepError): new_step = aoi_scan_path.append_fixation(ts, fixation, 'Shu') - + @unittest.skip("DEBUG") def test_letter_index_and_string_reprentation(self): """Test AOIScanPath letter index and string representation feature.""" @@ -633,7 +653,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Check letter sequence representation self.assertEqual(aoi_scan_path.letter_sequence, 'ABCA') - + @unittest.skip("DEBUG") def test_transition_matrix(self): """Test AOIScanPath transition matrix feature.""" @@ -652,7 +672,7 @@ class TestAOIScanPathClass(unittest.TestCase): self.assertEqual(aoi_scan_path.transition_matrix['Shu']['Foo'], 0) self.assertEqual(aoi_scan_path.transition_matrix['Shu']['Bar'], 1) - + @unittest.skip("DEBUG") def test_transition_matrix(self): """Test AOIScanPath fixations count feature.""" -- cgit v1.1 From 5f915a84f32405dc8bddae4ecbf95f4745af6fbc Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 13:57:31 +0100 Subject: More work on TimestampedGazePositions and GazeMovements. --- .../DispersionThresholdIdentification.py | 4 +- .../VelocityThresholdIdentification.py | 4 +- src/argaze.test/GazeFeatures.py | 122 +++++++------- src/argaze/ArFeatures.py | 6 +- src/argaze/DataFeatures.py | 5 +- .../DispersionThresholdIdentification.py | 48 +++--- src/argaze/GazeFeatures.py | 178 +++++++++++---------- 7 files changed, 184 insertions(+), 183 deletions(-) diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index b7475b5..f0d286a 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -47,7 +47,7 @@ def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time else: - gaze_position = GazeFeatures.UnvalidGazePosition() + gaze_position = GazeFeatures.GazePosition() # Store gaze position ts = time.time() - start_time + start_ts @@ -85,7 +85,7 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl else: - gaze_position = GazeFeatures.UnvalidGazePosition() + gaze_position = GazeFeatures.GazePosition() # Store gaze position ts = time.time() - start_time + start_ts diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py index 425d592..24f2e3c 100644 --- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py @@ -53,7 +53,7 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, else: - gaze_position = GazeFeatures.UnvalidGazePosition() + gaze_position = GazeFeatures.GazePosition() # Store gaze position ts = time.time() - start_time + start_ts @@ -91,7 +91,7 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl else: - gaze_position = GazeFeatures.UnvalidGazePosition() + gaze_position = GazeFeatures.GazePosition() # Store gaze position ts = time.time() - start_time + start_ts diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index fdc140d..7d18976 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -39,28 +39,27 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)): return ts_gaze_positions -@dataclass(frozen=True) class TestFixation(GazeFeatures.Fixation): """Define basic fixation class for test.""" - def __post_init__(self): + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): - super().__post_init__() + super().__init__(positions, finished, message, **kwargs) - points = self.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)]) + if positions: - # Update frozen focus attribute using centroid - object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1])) + positions_array = numpy.asarray(self.values()) + centroid = numpy.mean(positions_array, axis=0) + + # Update focus attribute using centroid + self.focus = (centroid[0], centroid[1]) -@dataclass(frozen=True) class TestSaccade(GazeFeatures.Saccade): """Define basic saccade for test.""" - def __post_init__(self): - super().__post_init__() + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): + + super().__init__(positions, finished, message, **kwargs) class TestGazePositionClass(unittest.TestCase): """Test GazePosition class.""" @@ -265,11 +264,11 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase): self.assertEqual(ts_gaze_positions[0].precision, 15) self.assertEqual(bool(ts_gaze_positions[0]), True) - # Check third gaze position is correctly stored and accessible as a UnvalidGazePosition + # Check third gaze position is correctly stored and accessible as a GazePosition self.assertIsInstance(ts_gaze_positions[2], GazeFeatures.GazePosition) self.assertEqual(numpy.isnan(ts_gaze_positions[2].precision), True) self.assertEqual(bool(ts_gaze_positions[2]), False) - + def test_as_dataframe(self): """Test inherited as_dataframe method.""" @@ -309,19 +308,19 @@ class TestTimeStampedGazePositionsClass(unittest.TestCase): class TestGazeMovementClass(unittest.TestCase): """Test GazeMovement class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test GazeMovement creation.""" abstract_gaze_movement = GazeFeatures.GazeMovement(random_gaze_positions(0)) # Check abstract GazeMovement - self.assertEqual(len(abstract_gaze_movement.positions), 0) - self.assertEqual(abstract_gaze_movement.duration, -1) - self.assertEqual(abstract_gaze_movement.amplitude, -1) - self.assertEqual(abstract_gaze_movement.valid, False) + self.assertEqual(len(abstract_gaze_movement), 0) + self.assertEqual(abstract_gaze_movement.duration, 0) + self.assertEqual(abstract_gaze_movement.amplitude, 0) + self.assertEqual(bool(abstract_gaze_movement), False) self.assertEqual(abstract_gaze_movement.finished, False) - @unittest.skip("DEBUG") + def test_finish(self): """Test GazeMovement finishing.""" @@ -340,25 +339,22 @@ class TestGazeMovementClass(unittest.TestCase): self.assertEqual(abstract_gaze_movement.finished, True) self.assertEqual(abstract_gaze_movement_ref.finished, True) -class TestUnvalidGazeMovementClass(unittest.TestCase): - """Test UnvalidGazeMovement class.""" - @unittest.skip("DEBUG") - def test_new(self): - """Test UnvalidGazeMovement creation.""" + def test_message(self): + """Test GazeMovement creation with message only.""" - unvalid_gaze_movement = GazeFeatures.UnvalidGazeMovement('test') + gaze_movement = GazeFeatures.GazeMovement(message='test') - # Check UnvalidGazeMovement - self.assertEqual(len(unvalid_gaze_movement.positions), 0) - self.assertEqual(unvalid_gaze_movement.duration, -1) - self.assertEqual(unvalid_gaze_movement.amplitude, -1) - self.assertEqual(unvalid_gaze_movement.valid, False) - self.assertEqual(unvalid_gaze_movement.finished, False) - self.assertEqual(unvalid_gaze_movement.message, 'test') + # Check GazeMovement + self.assertEqual(len(gaze_movement), 0) + self.assertEqual(gaze_movement.duration, 0) + self.assertEqual(gaze_movement.amplitude, 0) + self.assertEqual(bool(gaze_movement), False) + self.assertEqual(gaze_movement.finished, False) + self.assertEqual(gaze_movement.message, 'test') class TestScanStepClass(unittest.TestCase): """Test ScanStep class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test ScanStep creation.""" @@ -371,7 +367,7 @@ class TestScanStepClass(unittest.TestCase): self.assertEqual(scan_step.first_fixation, fixation) self.assertEqual(scan_step.last_saccade, saccade) self.assertGreater(scan_step.duration, 0) - + def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)): """Build scan path""" @@ -380,18 +376,18 @@ def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)): for i in range(size): fixation = TestFixation(random_gaze_positions(10, frame_dimension)) - ts, _ = fixation.positions.first + ts, _ = fixation.first scan_path.append_fixation(ts, fixation) saccade = TestSaccade(random_gaze_positions(2, frame_dimension)) - ts, _ = saccade.positions.first + ts, _ = saccade.first scan_path.append_saccade(ts, saccade) return scan_path class TestScanPathClass(unittest.TestCase): """Test ScanPath class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test ScanPath creation.""" @@ -400,7 +396,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(len(scan_path), 0) self.assertEqual(scan_path.duration, 0) - @unittest.skip("DEBUG") + def test_append(self): """Test ScanPath append methods.""" @@ -408,7 +404,7 @@ class TestScanPathClass(unittest.TestCase): # Append a saccade that should be ignored saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.positions.first + ts = saccade[0].timestamp new_step = scan_path.append_saccade(ts, saccade) @@ -419,7 +415,7 @@ class TestScanPathClass(unittest.TestCase): # Append first fixation fixation_A = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_A.positions.first + ts = fixation_A[0].timestamp new_step = scan_path.append_fixation(ts, fixation_A) @@ -430,7 +426,7 @@ class TestScanPathClass(unittest.TestCase): # Append consecutive saccade saccade_A = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade_A.positions.first + ts = saccade_A[0].timestamp new_step_A = scan_path.append_saccade(ts, saccade_A) @@ -443,7 +439,7 @@ class TestScanPathClass(unittest.TestCase): # Append 2 consecutive fixations then a saccade fixation_B1 = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_B1.positions.first + ts = fixation_B1[0].timestamp new_step = scan_path.append_fixation(ts, fixation_B1) @@ -453,7 +449,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(new_step, None) fixation_B2 = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_B2.positions.first + ts = fixation_B2[0].timestamp new_step = scan_path.append_fixation(ts, fixation_B2) @@ -463,7 +459,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(new_step, None) saccade_B = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade_B.positions.first + ts = saccade_B[0].timestamp new_step_B = scan_path.append_saccade(ts, saccade_B) @@ -476,19 +472,19 @@ class TestScanPathClass(unittest.TestCase): class TestAOIScanStepClass(unittest.TestCase): """Test AOIScanStep class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test AOIScanStep creation.""" movements = GazeFeatures.TimeStampedGazeMovements() fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first - movements[ts] = fixation + ts = fixation[0].timestamp + movements.append(fixation) saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.positions.first - movements[ts] = saccade + ts = saccade[0].timestamp + movements.append(saccade) aoi_scan_step = GazeFeatures.AOIScanStep(movements, 'Test') @@ -505,12 +501,12 @@ class TestAOIScanStepClass(unittest.TestCase): movements = GazeFeatures.TimeStampedGazeMovements() saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.positions.first - movements[ts] = saccade + ts = saccade[0].timestamp + movements.append(saccade) fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first - movements[ts] = fixation + ts = fixation[0].timestamp + movements.append(fixation) # Check that aoi scan step creation fail with self.assertRaises(GazeFeatures.AOIScanStepError): @@ -528,11 +524,11 @@ def build_aoi_scan_path(expected_aoi, aoi_path): for aoi in aoi_path: fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first + ts, _ = fixation.first aoi_scan_path.append_fixation(ts, fixation, aoi) saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.positions.first + ts, _ = saccade.first aoi_scan_path.append_saccade(ts, saccade) return aoi_scan_path @@ -555,7 +551,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on A aoi fixation_A = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_A.positions.first + ts, _ = fixation_A.first new_step = aoi_scan_path.append_fixation(ts, fixation_A, 'Foo') @@ -566,7 +562,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append saccade saccade_A = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade_A.positions.first + ts, _ = saccade_A.first new_step = aoi_scan_path.append_saccade(ts, saccade_A) @@ -577,7 +573,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on B aoi fixation_B = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_B.positions.first + ts, _ = fixation_B.first new_step_A = aoi_scan_path.append_fixation(ts, fixation_B, 'Bar') @@ -588,8 +584,8 @@ class TestAOIScanPathClass(unittest.TestCase): self.assertEqual(new_step_A.aoi, 'Foo') self.assertEqual(new_step_A.letter, 'A') - first_ts, _ = fixation_A.positions.first - last_ts, _ = saccade_A.positions.last + first_ts, _ = fixation_A.first + last_ts, _ = saccade_A.last self.assertEqual(new_step_A.duration, last_ts - first_ts) @@ -603,7 +599,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on A aoi fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first + ts, _ = fixation.first new_step = aoi_scan_path.append_fixation(ts, fixation, 'Foo') @@ -613,7 +609,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on B aoi fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.positions.first + ts, _ = fixation.first # Check that aoi scan step creation fail when fixation is appened after another fixation with self.assertRaises(GazeFeatures.AOIScanStepError): diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 5fcc990..47a91e9 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -384,7 +384,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): ) @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()): + def look(self, timestamp: int|float, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()): """ Project timestamped gaze movement into layer. @@ -531,7 +531,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__layers = layers self.__image_parameters = image_parameters - self.__calibrated_gaze_position = GazeFeatures.UnvalidGazePosition() + self.__calibrated_gaze_position = GazeFeatures.GazePosition() self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() self.__scan_path_analyzed = False @@ -875,7 +875,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): ) @DataFeatures.PipelineStepMethod - def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.UnvalidGazePosition()) -> Iterator[Union[object, type, dict]]: + def look(self, timestamp: int|float, gaze_position: GazeFeatures.GazePosition = GazeFeatures.GazePosition()) -> Iterator[Union[object, type, dict]]: """ Project timestamped gaze position into frame. diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 931c21d..7c53c2a 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -133,7 +133,6 @@ class TimeStampedObjectsList(list): def __init__(self, ts_object_type: type, ts_objects: list = []): - super().__init__() self.__object_type = ts_object_type self.__object_properties = properties(self.__object_type) @@ -384,11 +383,11 @@ class TimestampedObject(): def untimestamp(self): """Reset object timestamp.""" - self._timestamp = math.nan + self.timestamp = math.nan def is_timestamped(self) -> bool: """Is the object timestamped?""" - timestamped = not math.isnan(self._timestamp) + timestamped = not math.isnan(self.timestamp) return timestamped diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 6f8c554..f8e519f 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -37,17 +37,15 @@ class Fixation(GazeFeatures.Fixation): super().__post_init__() - points = self.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)]) - deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) + positions_array = numpy.asarray(self.positions.values()) + centroid = numpy.mean(positions_array, axis=0) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) # Update frozen focus attribute using centroid - object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1])) + object.__setattr__(self, 'focus', (centroid[0], centroid[1])) # Update frozen deviation_max attribute - object.__setattr__(self, 'deviation_max', max(deviations_array)) + object.__setattr__(self, 'deviation_max', deviations_array.max()) def point_deviation(self, gaze_position) -> float: """Get distance of a point from the fixation's centroïd.""" @@ -57,13 +55,11 @@ class Fixation(GazeFeatures.Fixation): def overlap(self, fixation) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" - points = fixation.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([self.focus[0], self.focus[1]]) - deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) + positions_array = numpy.asarray(self.positions.values()) + centroid = numpy.array(list(self.focus)) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) - return min(deviations_array) <= self.deviation_max + return deviations_array.min() <= self.deviation_max def merge(self, fixation) -> FixationType: """Merge another fixation into this fixation.""" @@ -114,8 +110,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - _, start_position = self.positions.first - _, last_position = self.positions.last + _, start_position = self.positions[0] + _, last_position = self.positions[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -148,14 +144,14 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType: # Ignore non valid gaze position - if not gaze_position.valid: + if not gaze_position: return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish() # Check if too much time elapsed since last valid gaze position if len(self.__valid_positions) > 0: - ts_last, _ = self.__valid_positions.last + ts_last, _ = self.__valid_positions[-1] if (ts - ts_last) > self.duration_min_threshold: @@ -168,16 +164,16 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Store valid gaze position - self.__valid_positions[ts] = gaze_position + self.__valid_positions.append(gaze_position) # Return last valid movement if exist return last_movement # Store gaze positions until a minimal duration - self.__valid_positions[ts] = gaze_position + self.__valid_positions.append(gaze_position) - first_ts, _ = self.__valid_positions.first - last_ts, _ = self.__valid_positions.last + first_ts, _ = self.__valid_positions[0] + last_ts, _ = self.__valid_positions[-1] # Once the minimal duration is reached if last_ts - first_ts >= self.duration_min_threshold: @@ -194,8 +190,8 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): if len(self.__saccade_positions) > 0: # Copy oldest valid position into saccade positions - first_ts, first_position = self.__valid_positions.first - self.__saccade_positions[first_ts] = first_position + first_ts, first_position = self.__valid_positions[0] + self.__saccade_positions.append(first_position) # Finish last saccade last_saccade = self.current_saccade.finish() @@ -218,8 +214,8 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): if len(self.__fixation_positions) > 0: # Copy most recent fixation position into saccade positions - last_ts, last_position = self.__fixation_positions.last - self.__saccade_positions[last_ts] = last_position + last_ts, last_position = self.__fixation_positions[-1] + self.__saccade_positions.append(last_position) # Finish last fixation last_fixation = self.current_fixation.finish() @@ -238,7 +234,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Move oldest valid position into saccade positions first_ts, first_position = self.__valid_positions.pop_first() - self.__saccade_positions[first_ts] = first_position + self.__saccade_positions.append(first_position) # Always return unvalid gaze movement at least return GazeFeatures.UnvalidGazeMovement() diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 1c27540..54784ac 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -17,6 +17,7 @@ from inspect import getmembers from argaze import DataFeatures from argaze.AreaOfInterest import AOIFeatures +from argaze.utils import UtilsFeatures # DEBUG import numpy import pandas @@ -161,7 +162,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True): """Draw gaze position point and precision circle.""" - if self.valid: + if self: int_value = (int(self[0]), int(self[1])) @@ -170,7 +171,7 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): cv2.circle(image, int_value, size, color, -1) # Draw precision circle - if self.__precision > 0 and draw_precision: + if self.__precision is not None and draw_precision: cv2.circle(image, int_value, round(self.__precision), color, 1) TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions") @@ -179,9 +180,14 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList): """Handle timestamped gaze positions into a list""" + #@UtilsFeatures.PrintCallStack def __init__(self, gaze_positions: list = []): - super().__init__(GazePosition, gaze_positions) + DataFeatures.TimeStampedObjectsList.__init__(self, GazePosition, gaze_positions) + + def values(self) -> list: + """Get all timestamped position values as list of tuple.""" + return [tuple(ts_position) for ts_position in self] @classmethod def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType: @@ -206,29 +212,24 @@ class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList): """ # Copy columns - if precision: + columns = (timestamp, x, y) - df = dataframe.loc[:, (timestamp, x, y, precision)] + if precision is not None: - else: + columns += (precision,) + + if message is not None: - df = dataframe.loc[:, (timestamp, x, y)] + columns += (message,) - # DEBUG - print(df) + df = dataframe.loc[:, columns] # Merge x and y columns into one 'value' column df['value'] = tuple(zip(df[x], df[y])) df.drop(columns=[x, y], inplace=True, axis=1) - # DEBUG - print(df) - - # Replace (NaN, NaN) values by () - df['value'] = df.apply(lambda row: print(row.values[1], pandas.isnull(row.values[1])), axis=True) - - # DEBUG - print(df) + # Replace tuple values containing NaN values by () + df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=True) # Handle precision data if precision: @@ -358,58 +359,85 @@ class GazePositionCalibrator(): GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement") # Type definition for type annotation convenience -@dataclass(frozen=True) -class GazeMovement(DataFeatures.TimestampedObject): - """Define abstract gaze movement class as a buffer of timestamped positions.""" +class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): + """Define abstract gaze movement class as timestamped gaze positions list. - positions: TimeStampedGazePositions - """All timestamp gaze positions.""" + Parameters: + positions: timestamp gaze positions. + finished: is the movement finished? + message: a string to describe why the movement is what it is. + """ - duration: float = field(init=False) - """Inferred duration from first and last timestamps.""" + def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): - amplitude: float = field(init=False) - """Inferred amplitude from first and last positions.""" + return TimeStampedGazePositions.__new__(cls, positions) - finished: bool = field(init=False, default=False) - """Is the movement finished?""" + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + """Initialize GazeMovement""" - def __post_init__(self): + TimeStampedGazePositions.__init__(self, positions) + DataFeatures.TimestampedObject.__init__(self, **kwargs) - if self.valid: + self.__finished = finished + self.__message = message - start_position_ts, start_position = self.positions.first - end_position_ts, end_position = self.positions.last + @property + def timestamp(self) -> int|float: + """Get first position timestamp.""" + return self[0].timestamp - # Update frozen duration attribute - object.__setattr__(self, 'duration', end_position_ts - start_position_ts) + @timestamp.setter + def timestamp(self, timestamp: int|float): + """Block gaze movment timestamp setting.""" + raise('GazeMovement timestamp is first positon timestamp.') - _, start_position = self.positions.first - _, end_position = self.positions.last + @property + def finished(self) -> bool: + """Is the movement finished?""" + return self.__finished + + def finish(self) -> GazeMovementType: + """Set gaze movement as finished""" + self.__finished = True + return self + + @property + def message(self): + """Get movement's message.""" + return self.__message - amplitude = numpy.linalg.norm(start_position - end_position) + @property + def duration(self): + """Get inferred duration from first and last timestamps.""" + if self: - # Update frozen amplitude attribute - object.__setattr__(self, 'amplitude', amplitude) + return self[-1].timestamp - self[0].timestamp else: - # Update frozen duration attribute - object.__setattr__(self, 'duration', -1) + return 0 + + @property + def amplitude(self): + """Get inferred amplitude from first and last positions.""" + if self: - # Update frozen amplitude attribute - object.__setattr__(self, 'amplitude', -1) + return numpy.linalg.norm(self[0] - self[-1]) + + else: + + return 0 def __str__(self) -> str: """String display""" - if self.valid: + if self: - output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self.positions)}\n\tfinished={self.finished}' + output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.finished}' - for ts, position in self.positions.items(): + for position in self: - output += f'\n\t{ts}:\n\t\tvalue={position},\n\t\tprecision={position.precision}' + output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}' else: @@ -417,20 +445,6 @@ class GazeMovement(DataFeatures.TimestampedObject): return output - @property - def valid(self) -> bool: - """Is there positions?""" - - return len(self.positions) > 0 - - def finish(self) -> GazeMovementType: - """Set gaze movement as finished""" - - # Update frozen finished attribute - object.__setattr__(self, 'finished', True) - - return self - def draw_positions(self, image: numpy.array, position_color: tuple = None, line_color: tuple = None): """Draw gaze movement positions with line between each position. @@ -439,12 +453,12 @@ class GazeMovement(DataFeatures.TimestampedObject): line_color: color of line between each position """ - gaze_positions = self.positions.copy() + positions = self.copy() - while len(gaze_positions) >= 2: + while len(positions) >= 2: - ts_start, start_gaze_position = gaze_positions.pop_first() - ts_next, next_gaze_position = gaze_positions.first + start_gaze_position = positions.pop(0) + next_gaze_position = positions[0] # Draw line between positions if required if line_color is not None: @@ -461,31 +475,27 @@ class GazeMovement(DataFeatures.TimestampedObject): raise NotImplementedError('draw() method not implemented') -class UnvalidGazeMovement(GazeMovement): - """Unvalid gaze movement.""" - - def __init__(self, message=None): - - self.message = message - - super().__init__(TimeStampedGazePositions()) - - def draw(self, image: numpy.array, **kwargs): - - pass - FixationType = TypeVar('Fixation', bound="Fixation") # Type definition for type annotation convenience class Fixation(GazeMovement): """Define abstract fixation as gaze movement.""" - focus: tuple = field(init=False) - """Representative position of the fixation.""" + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): - def __post_init__(self): + super().__init__(positions, finished, message, **kwargs) + + self._focus = () - super().__post_init__() + @property + def focus(self) -> tuple: + """Get representative position of the fixation.""" + return self._focus + + @focus.setter + def focus(self, focus: tuple): + """Set representative position of the fixation.""" + self._focus = focus def merge(self, fixation) -> FixationType: """Merge another fixation into this fixation.""" @@ -500,9 +510,9 @@ def is_fixation(gaze_movement): class Saccade(GazeMovement): """Define abstract saccade as gaze movement.""" - def __post_init__(self): + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): - super().__post_init__() + super().__init__(positions, finished, message, **kwargs) def is_saccade(gaze_movement): """Is a gaze movement a saccade?""" -- cgit v1.1 From 96007cbe6a42d26c4dece35ad7ecee2ddd8bdade Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 22:30:14 +0100 Subject: Making all GazeFeatures test working again. --- .../advanced_topics/scripting.md | 2 +- src/argaze.test/GazeFeatures.py | 46 ++---- src/argaze/ArFeatures.py | 14 +- src/argaze/DataFeatures.py | 69 ++++---- .../DispersionThresholdIdentification.py | 14 +- .../VelocityThresholdIdentification.py | 16 +- src/argaze/GazeFeatures.py | 174 ++++++++++----------- 7 files changed, 156 insertions(+), 179 deletions(-) diff --git a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md index 7952e9f..8c21dec 100644 --- a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md +++ b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md @@ -125,7 +125,7 @@ This is the last calibrated [GazePosition](../../../argaze.md/#argaze.GazeFeatur ### *ar_frame.last_gaze_movement* -Last [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement) identified by [ArFrame.gaze_movement_identifier](../../../argaze.md/#argaze.ArFeatures.ArFrame) object from incoming consecutive timestamped gaze positions. If no gaze movement have been identified, it returns an [UnvalidGazeMovement](../../../argaze.md/#argaze.GazeFeatures.UnvalidGazeMovement). +Last [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement) identified by [ArFrame.gaze_movement_identifier](../../../argaze.md/#argaze.ArFeatures.ArFrame) object from incoming consecutive timestamped gaze positions. If no gaze movement have been identified, it returns an empty [GazeMovement](../../../argaze.md/#argaze.GazeFeatures.GazeMovement). This could also be the current gaze movement if [ArFrame.filter_in_progress_identification](../../../argaze.md/#argaze.ArFeatures.ArFrame) attribute is false. In that case, the last gaze movement *finished* flag is false. diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index 7d18976..c6ccfca 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -494,7 +494,7 @@ class TestAOIScanStepClass(unittest.TestCase): self.assertEqual(aoi_scan_step.first_fixation, fixation) self.assertEqual(aoi_scan_step.last_saccade, saccade) self.assertGreater(aoi_scan_step.duration, 0) - @unittest.skip("DEBUG") + def test_error(self): """Test AOIScanStep creation error.""" @@ -524,18 +524,16 @@ def build_aoi_scan_path(expected_aoi, aoi_path): for aoi in aoi_path: fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.first - aoi_scan_path.append_fixation(ts, fixation, aoi) + aoi_scan_path.append_fixation(fixation, aoi) saccade = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade.first - aoi_scan_path.append_saccade(ts, saccade) + aoi_scan_path.append_saccade(saccade) return aoi_scan_path class TestAOIScanPathClass(unittest.TestCase): """Test AOIScanPath class.""" - @unittest.skip("DEBUG") + def test_new(self): """Test AOIScanPath creation.""" @@ -543,7 +541,7 @@ class TestAOIScanPathClass(unittest.TestCase): aoi_scan_path = GazeFeatures.AOIScanPath() self.assertEqual(len(aoi_scan_path), 0) - @unittest.skip("DEBUG") + def test_append(self): """Test AOIScanPath append methods.""" @@ -551,9 +549,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on A aoi fixation_A = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_A.first - - new_step = aoi_scan_path.append_fixation(ts, fixation_A, 'Foo') + new_step = aoi_scan_path.append_fixation(fixation_A, 'Foo') # Check that no aoi scan step have been created yet self.assertEqual(len(aoi_scan_path), 0) @@ -562,9 +558,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append saccade saccade_A = TestSaccade(random_gaze_positions(2)) - ts, _ = saccade_A.first - - new_step = aoi_scan_path.append_saccade(ts, saccade_A) + new_step = aoi_scan_path.append_saccade(saccade_A) # Check that no aoi scan step have been created yet self.assertEqual(len(aoi_scan_path), 0) @@ -573,9 +567,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on B aoi fixation_B = TestFixation(random_gaze_positions(10)) - ts, _ = fixation_B.first - - new_step_A = aoi_scan_path.append_fixation(ts, fixation_B, 'Bar') + new_step_A = aoi_scan_path.append_fixation(fixation_B, 'Bar') # Check a first aoi scan step have been created once a new fixation is appened self.assertEqual(len(aoi_scan_path), 1) @@ -584,14 +576,11 @@ class TestAOIScanPathClass(unittest.TestCase): self.assertEqual(new_step_A.aoi, 'Foo') self.assertEqual(new_step_A.letter, 'A') - first_ts, _ = fixation_A.first - last_ts, _ = saccade_A.last - - self.assertEqual(new_step_A.duration, last_ts - first_ts) + self.assertEqual(new_step_A.duration, saccade_A[-1].timestamp - fixation_A[0].timestamp) # Check letter affectation self.assertEqual(aoi_scan_path.get_letter_aoi('A'), 'Foo') - @unittest.skip("DEBUG") + def test_append_error(self): """Test AOIScanPath append error.""" @@ -599,9 +588,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on A aoi fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.first - - new_step = aoi_scan_path.append_fixation(ts, fixation, 'Foo') + new_step = aoi_scan_path.append_fixation(fixation, 'Foo') # Check that no aoi scan step have been created yet self.assertEqual(len(aoi_scan_path), 0) @@ -609,18 +596,17 @@ class TestAOIScanPathClass(unittest.TestCase): # Append fixation on B aoi fixation = TestFixation(random_gaze_positions(10)) - ts, _ = fixation.first # Check that aoi scan step creation fail when fixation is appened after another fixation with self.assertRaises(GazeFeatures.AOIScanStepError): - new_step = aoi_scan_path.append_fixation(ts, fixation, 'Bar') + new_step = aoi_scan_path.append_fixation(fixation, 'Bar') # Check that unexpected aoi scan step creation fail with self.assertRaises(GazeFeatures.AOIScanStepError): - new_step = aoi_scan_path.append_fixation(ts, fixation, 'Shu') - @unittest.skip("DEBUG") + new_step = aoi_scan_path.append_fixation(fixation, 'Shu') + def test_letter_index_and_string_reprentation(self): """Test AOIScanPath letter index and string representation feature.""" @@ -649,7 +635,7 @@ class TestAOIScanPathClass(unittest.TestCase): # Check letter sequence representation self.assertEqual(aoi_scan_path.letter_sequence, 'ABCA') - @unittest.skip("DEBUG") + def test_transition_matrix(self): """Test AOIScanPath transition matrix feature.""" @@ -668,7 +654,7 @@ class TestAOIScanPathClass(unittest.TestCase): self.assertEqual(aoi_scan_path.transition_matrix['Shu']['Foo'], 0) self.assertEqual(aoi_scan_path.transition_matrix['Shu']['Bar'], 1) - @unittest.skip("DEBUG") + def test_transition_matrix(self): """Test AOIScanPath fixations count feature.""" diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 47a91e9..b3ecad6 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -123,7 +123,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__aoi_scan_path_analyzers = aoi_scan_path_analyzers self.__draw_parameters = draw_parameters - self.__gaze_movement = GazeFeatures.UnvalidGazeMovement() + self.__gaze_movement = GazeFeatures.GazeMovement() self.__looked_aoi_name = None self.__aoi_scan_path_analyzed = False @@ -423,7 +423,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # TODO: add an option to filter None looked_aoi_name or not if self.__aoi_scan_path is not None: - aoi_scan_step = self.__aoi_scan_path.append_fixation(timestamp, gaze_movement, self.__looked_aoi_name) + aoi_scan_step = self.__aoi_scan_path.append_fixation(gaze_movement, self.__looked_aoi_name) # Is there a new step? if aoi_scan_step is not None and len(self.__aoi_scan_path) > 1: @@ -441,7 +441,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Append saccade to aoi scan path if self.__aoi_scan_path is not None: - self.__aoi_scan_path.append_saccade(timestamp, gaze_movement) + self.__aoi_scan_path.append_saccade(gaze_movement) def draw(self, image: numpy.array, draw_aoi_scene: dict = None, draw_aoi_matching: dict = None): """ @@ -532,7 +532,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__image_parameters = image_parameters self.__calibrated_gaze_position = GazeFeatures.GazePosition() - self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + self.__identified_gaze_movement = GazeFeatures.GazeMovement() self.__scan_path_analyzed = False # Edit pipeline step objects parent @@ -891,7 +891,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): with self._lock: # No gaze movement identified by default - self.__identified_gaze_movement = GazeFeatures.UnvalidGazeMovement() + self.__identified_gaze_movement = GazeFeatures.GazeMovement() # Reset scan path analyzed state self.__scan_path_analyzed = False @@ -920,14 +920,14 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Append fixation to scan path if self.__scan_path is not None: - self.__scan_path.append_fixation(timestamp, self.__identified_gaze_movement) + self.__scan_path.append_fixation(self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): # Append saccade to scan path if self.__scan_path is not None: - scan_step = self.__scan_path.append_saccade(timestamp, self.__identified_gaze_movement) + scan_step = self.__scan_path.append_saccade(self.__identified_gaze_movement) # Is there a new step? if scan_step and len(self.__scan_path) > 1: diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 7c53c2a..ce3ce52 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -156,12 +156,25 @@ class TimeStampedObjectsList(list): # Check object type if type(ts_object) != self.__object_type: - raise TypeError(f'object type have to be {self.__object_type} not {type(ts_object)}') + if not issubclass(ts_object.__class__, self.__object_type): + + raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') assert(ts_object.is_timestamped()) super().append(ts_object) + @property + def duration(self): + """Get inferred duration from first and last timestamps.""" + if self: + + return self[-1].timestamp - self[0].timestamp + + else: + + return 0 + def timestamps(self): """Get all timestamps in list.""" return [ts_object.timestamp for ts_object in self] @@ -178,76 +191,66 @@ class TimeStampedObjectsList(list): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) - def pop_last_until(self, ts: TimeStampType) -> TimeStampedObjectType: + def pop_last_until(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Pop all item until a given timestamped value and return the first after.""" # get last item before given timestamp - earliest_ts, earliest_value = self.get_last_until(ts) + earliest_value = self.get_last_until(timestamp) - first_ts, first_value = self.first + while self[0].timestamp < earliest_value.timestamp: - while first_ts < earliest_ts: - self.pop_first() - first_ts, first_value = self.first + self.pop(0) - return first_ts, first_value + return self[0] - def pop_last_before(self, ts: TimeStampType) -> TimeStampedObjectType: + def pop_last_before(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Pop all item before a given timestamped value and return the last one.""" # get last item before given timestamp - earliest_ts, earliest_value = self.get_last_before(ts) + earliest_value = self.get_last_before(timestamp) + + poped_value = self.pop(0) - popep_ts, poped_value = self.pop_first() + while poped_value.timestamp != earliest_value.timestamp: - while popep_ts != earliest_ts: - popep_ts, poped_value = self.pop_first() + poped_value = self.pop(0) - return popep_ts, poped_value + return poped_value - def get_first_from(self, ts) -> TimeStampedObjectType: + def get_first_from(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Retreive first item timestamp from a given timestamp value.""" - ts_list = list(self.keys()) - first_from_index = bisect.bisect_left(ts_list, ts) + first_from_index = bisect.bisect_left(self.timestamps(), timestamp) if first_from_index < len(self): - first_from_ts = ts_list[first_from_index] - - return first_from_ts, self[first_from_ts] + return self[ts_list[first_from_index]] else: - raise KeyError(f'No data stored after {ts} timestamp.') + raise KeyError(f'No data stored after {timestamp} timestamp.') - def get_last_before(self, ts) -> TimeStampedObjectType: + def get_last_before(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Retreive last item timestamp before a given timestamp value.""" - ts_list = list(self.keys()) - last_before_index = bisect.bisect_left(ts_list, ts) - 1 + last_before_index = bisect.bisect_left(self.timestamps(), timestamp) - 1 if last_before_index >= 0: - last_before_ts = ts_list[last_before_index] - - return last_before_ts, self[last_before_ts] + return self[ts_list[last_before_index]] else: raise KeyError(f'No data stored before {ts} timestamp.') - def get_last_until(self, ts) -> TimeStampedObjectType: + def get_last_until(self, timestamp: TimeStampType) -> TimeStampedObjectType: """Retreive last item timestamp until a given timestamp value.""" - ts_list = list(self.keys()) - last_until_index = bisect.bisect_right(ts_list, ts) - 1 + last_until_index = bisect.bisect_right(self.timestamps(), timestamp) - 1 if last_until_index >= 0: - last_until_ts = ts_list[last_until_index] - - return last_until_ts, self[last_until_ts] + return self[ts_list[last_until_index]] else: diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index f8e519f..f928c5a 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -146,7 +146,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Ignore non valid gaze position if not gaze_position: - return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish() + return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Check if too much time elapsed since last valid gaze position if len(self.__valid_positions) > 0: @@ -184,7 +184,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Valid gaze positions deviation small enough if deviation <= self.deviation_max_threshold: - last_saccade = GazeFeatures.UnvalidGazeMovement() + last_saccade = GazeFeatures.GazeMovement() # Is there saccade positions? if len(self.__saccade_positions) > 0: @@ -208,7 +208,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Valid gaze positions deviation too wide else: - last_fixation = GazeFeatures.UnvalidGazeMovement() + last_fixation = GazeFeatures.GazeMovement() # Is there fixation positions? if len(self.__fixation_positions) > 0: @@ -237,7 +237,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__saccade_positions.append(first_position) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_gaze_movement(self) -> GazeMovementType: @@ -254,7 +254,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_fixation(self) -> FixationType: @@ -263,7 +263,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Fixation(self.__fixation_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_saccade(self) -> SaccadeType: @@ -273,4 +273,4 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index d246db4..971ba9b 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -150,7 +150,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Ignore non valid gaze position if not gaze_position.valid: - return GazeFeatures.UnvalidGazeMovement() if not terminate else self.current_fixation.finish() + return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Store first valid position if self.__last_ts < 0: @@ -158,7 +158,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__last_ts = ts self.__last_position = gaze_position - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() # Check if too much time elapsed since last gaze position if (ts - self.__last_ts) > self.duration_min_threshold: @@ -187,7 +187,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Velocity is greater than threshold if velocity > self.velocity_max_threshold: - last_fixation = GazeFeatures.UnvalidGazeMovement() + last_fixation = GazeFeatures.GazeMovement() # Does last fixation exist? if len(self.__fixation_positions) > 0: @@ -211,7 +211,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Velocity is less or equals to threshold else: - last_saccade = GazeFeatures.UnvalidGazeMovement() + last_saccade = GazeFeatures.GazeMovement() # Does last saccade exist? if len(self.__saccade_positions) > 0: @@ -233,7 +233,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return last_saccade if not terminate else self.current_fixation.finish() # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_gaze_movement(self) -> GazeMovementType: @@ -250,7 +250,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_fixation(self) -> FixationType: @@ -260,7 +260,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Fixation(self.__fixation_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() @property def current_saccade(self) -> SaccadeType: @@ -270,4 +270,4 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return Saccade(self.__saccade_positions) # Always return unvalid gaze movement at least - return GazeFeatures.UnvalidGazeMovement() + return GazeFeatures.GazeMovement() diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 54784ac..2f83703 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -180,7 +180,6 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList): """Handle timestamped gaze positions into a list""" - #@UtilsFeatures.PrintCallStack def __init__(self, gaze_positions: list = []): DataFeatures.TimeStampedObjectsList.__init__(self, GazePosition, gaze_positions) @@ -407,17 +406,6 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): return self.__message @property - def duration(self): - """Get inferred duration from first and last timestamps.""" - if self: - - return self[-1].timestamp - self[0].timestamp - - else: - - return 0 - - @property def amplitude(self): """Get inferred amplitude from first and last positions.""" if self: @@ -525,9 +513,9 @@ TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeSt class TimeStampedGazeMovements(DataFeatures.TimeStampedObjectsList): """Handle timestamped gaze movements into a list""" - def __init__(self): + def __init__(self, gaze_movements: list = []): - super().__init__(GazeMovement) + DataFeatures.TimeStampedObjectsList.__init__(self, GazeMovement, gaze_movements) GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus") # Type definition for type annotation convenience @@ -619,62 +607,58 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): ts_status = TimeStampedGazeStatus() # Get last ts to terminate identification on last gaze position - last_ts, _ = ts_gaze_positions.last + last_ts = ts_gaze_positions[-1].timestamp # Iterate on gaze positions - for ts, gaze_position in ts_gaze_positions.items(): + for gaze_position in ts_gaze_positions: - finished_gaze_movement = self.identify(ts, gaze_position, terminate=(ts == last_ts)) + finished_gaze_movement = self.identify(ts, gaze_position, terminate=(gaze_position.timestamp == last_ts)) if is_fixation(finished_gaze_movement): - start_ts, start_position = finished_gaze_movement.positions.first - - ts_fixations[start_ts] = finished_gaze_movement + ts_fixations.append(finished_gaze_movement) # First gaze movement position is always shared with previous gaze movement - for ts, position in finished_gaze_movement.positions.items(): + for movement_position in finished_gaze_movement: - gaze_status = GazeStatus.from_position(position, 'Fixation', len(ts_fixations)) + gaze_status = GazeStatus.from_position(movement_position, 'Fixation', len(ts_fixations)) - if ts != start_ts: + if movement_position.timestamp != finished_gaze_movement.timestamp: - ts_status[ts] = [gaze_status] + ts_status.append([gaze_status]) else: try: - ts_status[start_ts].append(gaze_status) + ts_status[finished_gaze_movement.timestamp].append(gaze_status) except KeyError: - ts_status[start_ts] = [gaze_status] + ts_status[finished_gaze_movement.timestamp] = [gaze_status] elif is_saccade(finished_gaze_movement): - start_ts, start_position = finished_gaze_movement.positions.first - - ts_saccades[start_ts] = finished_gaze_movement + ts_saccades.append(finished_gaze_movement) # First gaze movement position is always shared with previous gaze movement - for ts, position in finished_gaze_movement.positions.items(): + for movement_position in finished_gaze_movement: gaze_status = GazeStatus.from_position(position, 'Saccade', len(ts_saccades)) - if ts != start_ts: + if movement_position.timestamp != finished_gaze_movement.timestamp: - ts_status[ts] = [gaze_status] + ts_status.append([gaze_status]) else: try: - ts_status[start_ts].append(gaze_status) + ts_status[finished_gaze_movement.timestamp].append(gaze_status) except KeyError: - ts_status[start_ts] = [gaze_status] + ts_status[finished_gaze_movement.timestamp] = [gaze_status] else: @@ -696,18 +680,16 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): assert(type(ts_gaze_positions) == TimeStampedGazePositions) # Get last ts to terminate identification on last gaze position - last_ts, _ = ts_gaze_positions.last + last_ts = ts_gaze_positions[-1] # Iterate on gaze positions - for ts, gaze_position in ts_gaze_positions.items(): - - finished_gaze_movement = self.identify(ts, gaze_position, terminate=(ts == last_ts)) + for gaze_position in ts_gaze_positions: - if finished_gaze_movement.valid: + finished_gaze_movement = self.identify(ts, gaze_position, terminate=(gaze_position.timestamp == last_ts)) - start_ts, start_position = finished_gaze_movement.positions.first + if finished_gaze_movement: - yield start_ts, finished_gaze_movement + yield finished_gaze_movement ScanStepType = TypeVar('ScanStep', bound="ScanStep") # Type definition for type annotation convenience @@ -719,34 +701,43 @@ class ScanStepError(Exception): super().__init__(message) -@dataclass(frozen=True) class ScanStep(): """Define a scan step as a fixation and a consecutive saccade. + + Parameters: + first_fixation: a fixation that comes before the next saccade. + last_saccade: a saccade that comes after the previous fixation. !!! warning - Scan step have to start by a fixation and then end by a saccade. """ - first_fixation: Fixation - """A fixation that comes before the next saccade.""" - - last_saccade: Saccade - """A saccade that comes after the previous fixation.""" + def __init__(self, first_fixation: Fixation, last_saccade: Saccade): - def __post_init__(self): + self.__first_fixation = first_fixation + self.__last_saccade = last_saccade # First movement have to be a fixation - if not is_fixation(self.first_fixation): + if not is_fixation(self.__first_fixation): raise ScanStepError('First step movement is not a fixation') # Last movement have to be a saccade - if not is_saccade(self.last_saccade): + if not is_saccade(self.__last_saccade): raise ScanStepError('Last step movement is not a saccade') @property + def first_fixation(self): + """Get scan step first fixation.""" + return self.__first_fixation + + @property + def last_saccade(self): + """Get scan step last saccade.""" + return self.__last_saccade + + @property def fixation_duration(self) -> int|float: """Time spent on AOI @@ -754,7 +745,7 @@ class ScanStep(): fixation duration """ - return self.first_fixation.duration + return self.__first_fixation.duration @property def duration(self) -> int|float: @@ -764,7 +755,7 @@ class ScanStep(): duration """ - return self.first_fixation.duration + self.last_saccade.duration + return self.__first_fixation.duration + self.__last_saccade.duration ScanPathType = TypeVar('ScanPathType', bound="ScanPathType") # Type definition for type annotation convenience @@ -937,24 +928,23 @@ class AOIScanStepError(Exception): self.aoi = aoi -@dataclass(frozen=True) class AOIScanStep(): """Define an aoi scan step as a set of successive gaze movements onto a same AOI. - !!! warning - - Aoi scan step have to start by a fixation and then end by a saccade.""" - - movements: TimeStampedGazeMovements - """All movements over an AOI and the last saccade that comes out.""" + Parameters: + movements: all movements over an AOI and the last saccade that comes out. + aoi: AOI name + letter: AOI unique letter to ease sequence analysis. - aoi: str = field(default='') - """AOI name.""" + !!! warning + Aoi scan step have to start by a fixation and then end by a saccade. + """ - letter: str = field(default='') - """AOI unique letter to ease sequence analysis.""" + def __init__(self, movements: TimeStampedGazeMovements, aoi: str = '', letter: str = ''): - def __post_init__(self): + self.__movements = movements + self.__aoi = aoi + self.__letter = letter # First movement have to be a fixation if not is_fixation(self.first_fixation): @@ -967,18 +957,29 @@ class AOIScanStep(): raise AOIScanStepError('Last step movement is not a saccade', self.aoi) @property + def movements(self): + """Get AOI scan step movements.""" + return self.__movements + + @property + def aoi(self): + """Get AOI scan step aoi.""" + return self.__aoi + + @property + def letter(self): + """Get AOI scan step letter.""" + return self.__letter + + @property def first_fixation(self): """First fixation on AOI.""" - - _, first_movement = self.movements.first - return first_movement + return self.movements[0] @property def last_saccade(self): """Last saccade that comes out AOI.""" - - _, last_movement = self.movements.last - return last_movement + return self.movements[-1] @property def fixation_duration(self) -> int|float: @@ -987,14 +988,7 @@ class AOIScanStep(): Returns: fixation duration """ - - # Timestamp of first position of first fixation - first_ts, _ = self.first_fixation.positions.first - - # Timestamp of first position of last saccade - last_ts, _ = self.last_saccade.positions.first - - return last_ts - first_ts + return self.last_saccade[0].timestamp - self.first_fixation[0].timestamp @property def duration(self) -> int|float: @@ -1003,14 +997,7 @@ class AOIScanStep(): Returns: duration """ - - # Timestamp of first position of first fixation - first_ts, _ = self.first_fixation.positions.first - - # Timestamp of last position of last saccade - last_ts, _ = self.last_saccade.positions.last - - return last_ts - first_ts + return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp AOIScanPathType = TypeVar('AOIScanPathType', bound="AOIScanPathType") # Type definition for type annotation convenience @@ -1130,19 +1117,20 @@ class AOIScanPath(list): return self.__transition_matrix - def append_saccade(self, ts, saccade): + def append_saccade(self, saccade): """Append new saccade to aoi scan path.""" # Ignore saccade if no fixation have been stored before if len(self.__movements) > 0: - self.__movements[ts] = saccade + self.__movements.append(saccade) - def append_fixation(self, ts, fixation, looked_aoi: str) -> bool: + def append_fixation(self, fixation, looked_aoi: str) -> bool: """Append new fixation to aoi scan path and return last new aoi scan step if one have been created. !!! warning - It could raise AOIScanStepError""" + It could raise AOIScanStepError + """ # Replace None aoi by generic OutsideAOI name if looked_aoi is None: @@ -1192,14 +1180,14 @@ class AOIScanPath(list): self.__movements = TimeStampedGazeMovements() # Append new fixation - self.__movements[ts] = fixation + self.__movements.append(fixation) # Remember new aoi self.__current_aoi = looked_aoi else: # Append new fixation - self.__movements[ts] = fixation + self.__movements.append(fixation) # Remember aoi self.__current_aoi = looked_aoi -- cgit v1.1 From 2b4811601c7eb1debec4c25fb0b0896aa15f9596 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Wed, 28 Feb 2024 23:23:38 +0100 Subject: More work on TimestampedObject and TimestampedObjectsList. --- src/argaze.test/DataFeatures.py | 278 ++++++++++++++++++---------------------- src/argaze/DataFeatures.py | 222 ++++++++++++++++---------------- src/argaze/GazeFeatures.py | 10 +- 3 files changed, 244 insertions(+), 266 deletions(-) diff --git a/src/argaze.test/DataFeatures.py b/src/argaze.test/DataFeatures.py index b30c560..4d9c909 100644 --- a/src/argaze.test/DataFeatures.py +++ b/src/argaze.test/DataFeatures.py @@ -16,154 +16,181 @@ from argaze import DataFeatures import pandas import numpy -def random_data_buffer(size, data_keys): - """ Generate a random TimeStampedBuffer for testing purpose. +class BasicDataClass(DataFeatures.TimestampedObject): + """Define a basic dataclass for testing purpose.""" + + def __init__(self, value: tuple = (), **kwargs): + + DataFeatures.TimestampedObject.__init__(self, **kwargs) + + self.__value = value + + @property + def value(self): + return self.__value + +def random_data_list(size): + """ Generate a random TimestampedObjectsList for testing purpose. Timestamps are current time. Values are tuples containing an expected value and a random value. """ - import random import time - ts_buffer = DataFeatures.TimeStampedBuffer() + data_list = [] for i in range(0, size): # Edit data - random_data = {} - for key in data_keys: - random_data[key] = (i, random.random()) + random_data = BasicDataClass((i, random.random())) + + # Timestamp data + random_data.timestamp = time.time() # Store data - ts_buffer[time.time()] = random_data + data_list.append(random_data) time.sleep(0.0001) - return ts_buffer + return DataFeatures.TimestampedObjectsList(BasicDataClass, data_list) -@dataclass() -class BasicDataClass(): - """Define a basic dataclass for testing purpose.""" - - value: tuple - -class TestTimeStampedBufferClass(unittest.TestCase): - """Test TimeStampedBuffer class.""" +class TestTimestampedObjectsListClass(unittest.TestCase): + """Test TimestampedObjectsList class.""" def test_new(self): - """Test TimeStampedBuffer creation.""" + """Test TimestampedObjectsList creation.""" - # Check TimeStampedBuffer length after creation - self.assertEqual(len(DataFeatures.TimeStampedBuffer()), 0) - self.assertEqual(len(DataFeatures.TimeStampedBuffer({0: ""})), 1) - self.assertEqual(len(DataFeatures.TimeStampedBuffer({0.1: ""})), 1) - self.assertEqual(len(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"})), 2) - - # Check TimeStampedBuffer keys after creation - self.assertEqual(list(DataFeatures.TimeStampedBuffer().keys()), []) - self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: ""}).keys()), [0]) - self.assertEqual(list(DataFeatures.TimeStampedBuffer({0.1: ""}).keys()), [0.1]) - self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).keys()), [0, 1]) - - # Check TimeStampedBuffer items after creation - self.assertEqual(list(DataFeatures.TimeStampedBuffer().items()), []) - self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: ""}).items()), [(0, "")]) - self.assertEqual(list(DataFeatures.TimeStampedBuffer({0.1: ""}).items()), [(0.1, "")]) - self.assertEqual(list(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).items()), [(0, "A"), (1, "B")]) - - # Check that TimeStampedBuffer creation fails when keys are not numbers + # Check TimestampedObjectsList length after creation + self.assertEqual(len(DataFeatures.TimestampedObjectsList(BasicDataClass)), 0) + + # Check TimestampedObjectsList timestamps after creation + self.assertEqual(DataFeatures.TimestampedObjectsList(BasicDataClass).timestamps(), []) + + # Check TimestampedObjectsList items after creation + self.assertEqual(DataFeatures.TimestampedObjectsList(BasicDataClass), []) + + # Check that TimestampedObjectsList creation fails when data are not timestamped with self.assertRaises(AssertionError): - DataFeatures.TimeStampedBuffer({"first": ""}) + data_list = [BasicDataClass((0, 0))] + DataFeatures.TimestampedObjectsList(BasicDataClass, data_list) + + def test_as_dataframe(self): + """Test TimestampedObjectsList as_dataframe method.""" + + data_frame = random_data_list(10).as_dataframe() + + # Check dataframe conversion + self.assertEqual(data_frame.index.name, "timestamp") + self.assertEqual(data_frame.index.size, 10) + self.assertEqual(data_frame.columns.size, 1) + self.assertEqual(data_frame.index.dtype, 'float64') + self.assertEqual(data_frame["value"].dtype, 'object') + + # Check data exclusion option + data_frame = random_data_list(10).as_dataframe(exclude=["value"]) + + self.assertEqual(data_frame.index.name, "timestamp") + self.assertEqual(data_frame.index.size, 10) + self.assertEqual(data_frame.columns.size, 0) + # Check dataframe split option + data_frame = random_data_list(10).as_dataframe(split={"value": ["value_0", "value_1"]}) + + self.assertEqual(data_frame.index.name, "timestamp") + self.assertEqual(data_frame.index.size, 10) + self.assertEqual(data_frame.columns.size, 2) + self.assertEqual(data_frame["value_0"].dtype, 'int64') + self.assertEqual(data_frame["value_1"].dtype, 'float64') + def test_from_dataframe(self): - """Test TimeStampedBuffer creation from pandas dataframe.""" + """Test TimestampedObjectsList creation from pandas dataframe.""" - ts_buffer = random_data_buffer(10, ["data_A", "data_B", "data_C"]) + data_frame = random_data_list(10).as_dataframe() # Check dataframe conversion - ts_buffer_from_df = DataFeatures.TimeStampedBuffer.from_dataframe(ts_buffer.as_dataframe()) - - self.assertEqual(len(ts_buffer_from_df), 10) + data_list = DataFeatures.TimestampedObjectsList.from_dataframe(BasicDataClass, data_frame) + self.assertEqual(len(data_list), 10) + @unittest.skip("DEBUG") def test_from_json(self): - """Test TimeStampedBuffer creation from json file.""" + """Test TimestampedObjectsList creation from json file.""" # Edit dataframe csv file path current_directory = os.path.dirname(os.path.abspath(__file__)) json_filepath = os.path.join(current_directory, 'utils/ts_buffer.json') - # Load TimeStampedBuffer from json file - ts_buffer = DataFeatures.TimeStampedBuffer.from_json(json_filepath) + # Load TimestampedObjectsList from json file + ts_buffer = DataFeatures.TimestampedObjectsList.from_json(json_filepath) self.assertEqual(len(ts_buffer), 3) - + @unittest.skip("DEBUG") def test___repr__(self): - """Test TimeStampedBuffer string representation.""" + """Test TimestampedObjectsList string representation.""" - self.assertEqual(repr(DataFeatures.TimeStampedBuffer()), "{}") - self.assertEqual(repr(DataFeatures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}") - self.assertEqual(repr(DataFeatures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}") + self.assertEqual(repr(DataFeatures.TimestampedObjectsList()), "{}") + self.assertEqual(repr(DataFeatures.TimestampedObjectsList({0: ""})), "{\"0\": \"\"}") + self.assertEqual(repr(DataFeatures.TimestampedObjectsList({0.1: ""})), "{\"0.1\": \"\"}") data = BasicDataClass((123, 456)) - ts_buffer = DataFeatures.TimeStampedBuffer({0: data}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: data}) self.assertEqual(repr(ts_buffer), "{\"0\": {\"value\": [123, 456]}}") array = numpy.zeros(3) - ts_buffer = DataFeatures.TimeStampedBuffer({0: array}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: array}) self.assertEqual(repr(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}") - + @unittest.skip("DEBUG") def test___str__(self): - """Test TimeStampedBuffer string representation.""" + """Test TimestampedObjectsList string representation.""" - self.assertEqual(str(DataFeatures.TimeStampedBuffer()), "{}") - self.assertEqual(str(DataFeatures.TimeStampedBuffer({0: ""})), "{\"0\": \"\"}") - self.assertEqual(str(DataFeatures.TimeStampedBuffer({0.1: ""})), "{\"0.1\": \"\"}") + self.assertEqual(str(DataFeatures.TimestampedObjectsList()), "{}") + self.assertEqual(str(DataFeatures.TimestampedObjectsList({0: ""})), "{\"0\": \"\"}") + self.assertEqual(str(DataFeatures.TimestampedObjectsList({0.1: ""})), "{\"0.1\": \"\"}") data = BasicDataClass((123, 456)) - ts_buffer = DataFeatures.TimeStampedBuffer({0: data}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: data}) self.assertEqual(str(ts_buffer), "{\"0\": {\"value\": [123, 456]}}") array = numpy.zeros(3) - ts_buffer = DataFeatures.TimeStampedBuffer({0: array}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: array}) self.assertEqual(str(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}") - + @unittest.skip("DEBUG") def test_append(self): - """Test TimeStampedBuffer append method.""" + """Test TimestampedObjectsList append method.""" - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}) - ts_buffer_next = DataFeatures.TimeStampedBuffer({2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}) + ts_buffer_next = DataFeatures.TimestampedObjectsList({2: "C", 3: "D"}) self.assertEqual(len(ts_buffer.append(ts_buffer_next)), 4) self.assertEqual(list(ts_buffer.append(ts_buffer_next).keys()), [0, 1, 2, 3]) - + @unittest.skip("DEBUG") def test_first(self): - """Test TimeStampedBuffer first property.""" + """Test TimestampedObjectsList first property.""" - self.assertEqual(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).first, (0, "A")) + self.assertEqual(DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}).first, (0, "A")) - # Check that accessing to first item of an empty TimeStampedBuffer fails + # Check that accessing to first item of an empty TimestampedObjectsList fails with self.assertRaises(IndexError): - DataFeatures.TimeStampedBuffer().first - + DataFeatures.TimestampedObjectsList().first + @unittest.skip("DEBUG") def test_pop_first(self): - """Test TimeStampedBuffer pop_first method.""" + """Test TimestampedObjectsList pop_first method.""" - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}) self.assertEqual(ts_buffer.pop_first(), (0, "A")) self.assertEqual(len(ts_buffer), 1) self.assertEqual(ts_buffer.first, (1, "B")) - + @unittest.skip("DEBUG") def test_pop_last_until(self): - """Test TimeStampedBuffer pop_last_until method.""" + """Test TimestampedObjectsList pop_last_until method.""" - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) # Check pop until an existing timestamp pop_last_until_2 = ts_buffer.pop_last_until(2) @@ -173,18 +200,18 @@ class TestTimeStampedBufferClass(unittest.TestCase): self.assertEqual(ts_buffer.first, (2, "C")) # Check first until an none existing timestamp - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) pop_last_until_1dot5 = ts_buffer.pop_last_until(1.5) self.assertEqual(pop_last_until_1dot5, (1, "B")) self.assertEqual(len(ts_buffer), 3) self.assertEqual(ts_buffer.first, (1, "B")) - + @unittest.skip("DEBUG") def test_pop_last_before(self): - """Test TimeStampedBuffer pop_last_before method.""" + """Test TimestampedObjectsList pop_last_before method.""" - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) # Check pop until an existing timestamp last_before_2 = ts_buffer.pop_last_before(2) @@ -194,37 +221,37 @@ class TestTimeStampedBufferClass(unittest.TestCase): self.assertEqual(ts_buffer.first, (2, "C")) # Check pop until an none existing timestamp - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) first_until_1dot5 = ts_buffer.pop_last_before(1.5) self.assertEqual(first_until_1dot5, (1, "B")) self.assertEqual(len(ts_buffer), 2) self.assertEqual(ts_buffer.first, (2, "C")) - + @unittest.skip("DEBUG") def test_last(self): - """Test TimeStampedBuffer last property.""" + """Test TimestampedObjectsList last property.""" - self.assertEqual(DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}).last, (1, "B")) + self.assertEqual(DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}).last, (1, "B")) - # Check that accessing to last item of an empty TimeStampedBuffer fails + # Check that accessing to last item of an empty TimestampedObjectsList fails with self.assertRaises(IndexError): - DataFeatures.TimeStampedBuffer().last - + DataFeatures.TimestampedObjectsList().last + @unittest.skip("DEBUG") def test_pop_last(self): - """Test TimeStampedBuffer pop_last method.""" + """Test TimestampedObjectsList pop_last method.""" - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}) self.assertEqual(ts_buffer.pop_last(), (1, "B")) self.assertEqual(len(ts_buffer), 1) self.assertEqual(ts_buffer.last, (0, "A")) - + @unittest.skip("DEBUG") def test_get_first_from(self): - """Test TimeStampedBuffer get_first_from method.""" + """Test TimestampedObjectsList get_first_from method.""" - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) get_first_from_1 = ts_buffer.get_first_from(1) @@ -243,11 +270,11 @@ class TestTimeStampedBufferClass(unittest.TestCase): with self.assertRaises(KeyError): ts_buffer.get_first_from(4) - + @unittest.skip("DEBUG") def test_get_last_before(self): - """Test TimeStampedBuffer get_last_before method.""" + """Test TimestampedObjectsList get_last_before method.""" - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) get_last_before_2 = ts_buffer.get_last_before(2) @@ -265,13 +292,12 @@ class TestTimeStampedBufferClass(unittest.TestCase): # Check that accessing to early timestamp fails with self.assertRaises(KeyError): - ts_buffer.get_last_before(-1) - - + ts_buffer.get_last_before(-1) + @unittest.skip("DEBUG") def test_get_last_until(self): - """Test TimeStampedBuffer get_last_until method.""" + """Test TimestampedObjectsList get_last_until method.""" - ts_buffer = DataFeatures.TimeStampedBuffer({0: "A", 1: "B", 2: "C", 3: "D"}) + ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) get_last_until_2 = ts_buffer.get_last_until(2) @@ -291,60 +317,6 @@ class TestTimeStampedBufferClass(unittest.TestCase): ts_buffer.get_last_until(-1) - def test_as_dataframe(self): - """Test TimeStampedBuffer as_dataframe method.""" - - ts_buffer = random_data_buffer(10, ["data_A", "data_B", "data_C"]) - - # Check dataframe conversion - ts_buffer_dataframe = ts_buffer.as_dataframe() - - self.assertEqual(ts_buffer_dataframe.index.name, "timestamp") - self.assertEqual(ts_buffer_dataframe.index.size, 10) - - self.assertEqual(ts_buffer_dataframe.columns.size, 3) - self.assertEqual(ts_buffer_dataframe.columns[0], "data_A") - self.assertEqual(ts_buffer_dataframe.columns[1], "data_B") - self.assertEqual(ts_buffer_dataframe.columns[2], "data_C") - - self.assertEqual(ts_buffer_dataframe.index.dtype, 'float64') - self.assertEqual(ts_buffer_dataframe["data_A"].dtype, 'object') - self.assertEqual(ts_buffer_dataframe["data_B"].dtype, 'object') - self.assertEqual(ts_buffer_dataframe["data_C"].dtype, 'object') - - # Check data exclusion option - ts_buffer_dataframe = ts_buffer.as_dataframe(exclude=["data_B"]) - - self.assertEqual(ts_buffer_dataframe.index.name, "timestamp") - self.assertEqual(ts_buffer_dataframe.index.size, 10) - - self.assertEqual(ts_buffer_dataframe.columns.size, 2) - self.assertEqual(ts_buffer_dataframe.columns[0], "data_A") - self.assertEqual(ts_buffer_dataframe.columns[1], "data_C") - - # Check dataframe split option - ts_buffer_dataframe = ts_buffer.as_dataframe(split={"data_B": ["data_B0", "data_B1"]}) - - self.assertEqual(ts_buffer_dataframe.index.name, "timestamp") - self.assertEqual(ts_buffer_dataframe.index.size, 10) - - self.assertEqual(ts_buffer_dataframe.columns.size, 4) - self.assertEqual(ts_buffer_dataframe.columns[0], "data_A") - self.assertEqual(ts_buffer_dataframe.columns[1], "data_B0") - self.assertEqual(ts_buffer_dataframe.columns[2], "data_B1") - self.assertEqual(ts_buffer_dataframe.columns[3], "data_C") - - # Check dataframe conversion with dataclass - data = BasicDataClass((123, 456)) - ts_buffer_dataframe = DataFeatures.TimeStampedBuffer({0: data}).as_dataframe() - - self.assertEqual(ts_buffer_dataframe.index.name, "timestamp") - self.assertEqual(ts_buffer_dataframe.index.size, 1) - - self.assertEqual(ts_buffer_dataframe.columns.size, 1) - self.assertEqual(ts_buffer_dataframe.columns[0], "value") - - if __name__ == '__main__': unittest.main() \ No newline at end of file diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index ce3ce52..b8acb2e 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -30,10 +30,10 @@ from colorama import Style, Fore TimeStampType = TypeVar('TimeStamp', int, float) """Type definition for timestamp as integer or float values.""" -TimeStampedObjectType = TypeVar('TimeStampedObject', bound="TimeStampedObject") +TimestampedObjectType = TypeVar('TimestampedObject', bound="TimestampedObject") # Type definition for type annotation convenience -TimeStampedObjectsListType = TypeVar('TimeStampedObjectsList', bound="TimeStampedObjectsList") +TimestampedObjectsListType = TypeVar('TimestampedObjectsList', bound="TimestampedObjectsList") # Type definition for type annotation convenience def module_path(obj) -> str: @@ -123,7 +123,48 @@ class JsonEncoder(json.JSONEncoder): return public_dict -class TimeStampedObjectsList(list): +class DataDictionary(dict): + """Enable dot.notation access to dictionary attributes""" + + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + +class TimestampedObject(): + """Abstract class to enable timestamp management.""" + + def __init__(self, timestamp: int|float = math.nan): + """Initialize TimestampedObject.""" + self._timestamp = timestamp + + def __repr__(self): + """String representation.""" + return json.dumps(as_dict(self)) + + @property + def timestamp(self) -> int|float: + """Get object timestamp.""" + return self._timestamp + + @timestamp.setter + def timestamp(self, timestamp: int|float): + """Set object timestamp.""" + + assert(type(timestamp) == int or type(timestamp) == float) + + self._timestamp = timestamp + + def untimestamp(self): + """Reset object timestamp.""" + self.timestamp = math.nan + + def is_timestamped(self) -> bool: + """Is the object timestamped?""" + timestamped = not math.isnan(self.timestamp) + + return timestamped + +class TimestampedObjectsList(list): """Handle timestamped object into a list. !!! warning "Timestamped objects are not sorted internally" @@ -145,7 +186,7 @@ class TimeStampedObjectsList(list): """Get object type handled by the list.""" return self.__object_type - def append(self, ts_object: TimeStampedObjectType|dict): + def append(self, ts_object: TimestampedObjectType|dict): """Append timestamped object.""" # Convert dict into GazePosition @@ -183,6 +224,67 @@ class TimeStampedObjectsList(list): """Get all timestamped objects as list of tuple.""" return [tuple(as_dict(ts_object, filter=False).values()) for ts_object in self] + @classmethod + def from_dataframe(self, ts_object_type: type, dataframe: pandas.DataFrame, exclude=[]) -> TimestampedObjectsListType: + """Create a TimestampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" + + dataframe.drop(exclude, inplace=True, axis=True) + + assert(dataframe.index.name == 'timestamp') + + object_list = [ts_object_type(timestamp=timestamp, **object_dict) for timestamp, object_dict in dataframe.to_dict('index').items()] + + return TimestampedObjectsList(ts_object_type, object_list) + + def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: + """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). + + The optional *split* argument allows tuple values to be stored in dedicated columns. + For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} + + !!! warning "Values must be dictionaries" + + Each key is stored as a column name. + + !!! note + + Timestamps are stored as index column called 'timestamp'. + """ + + df = pandas.DataFrame(self.tuples(), columns=self.__object_properties) + + # Exclude columns + df.drop(exclude, inplace=True, axis=True) + + # Split columns + if len(split) > 0: + + splited_columns = [] + + for column in df.columns: + + if column in split.keys(): + + df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) + df.drop(column, inplace=True, axis=True) + + for new_column in split[column]: + + splited_columns.append(new_column) + + else: + + splited_columns.append(column) + + # Reorder splited columns + df = df[splited_columns] + + # Append timestamps as index column + df['timestamp'] = self.timestamps() + df.set_index('timestamp', inplace=True) + + return df + def __repr__(self): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) @@ -191,7 +293,7 @@ class TimeStampedObjectsList(list): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) - def pop_last_until(self, timestamp: TimeStampType) -> TimeStampedObjectType: + def pop_last_until(self, timestamp: TimeStampType) -> TimestampedObjectType: """Pop all item until a given timestamped value and return the first after.""" # get last item before given timestamp @@ -203,7 +305,7 @@ class TimeStampedObjectsList(list): return self[0] - def pop_last_before(self, timestamp: TimeStampType) -> TimeStampedObjectType: + def pop_last_before(self, timestamp: TimeStampType) -> TimestampedObjectType: """Pop all item before a given timestamped value and return the last one.""" # get last item before given timestamp @@ -217,7 +319,7 @@ class TimeStampedObjectsList(list): return poped_value - def get_first_from(self, timestamp: TimeStampType) -> TimeStampedObjectType: + def get_first_from(self, timestamp: TimeStampType) -> TimestampedObjectType: """Retreive first item timestamp from a given timestamp value.""" first_from_index = bisect.bisect_left(self.timestamps(), timestamp) @@ -230,7 +332,7 @@ class TimeStampedObjectsList(list): raise KeyError(f'No data stored after {timestamp} timestamp.') - def get_last_before(self, timestamp: TimeStampType) -> TimeStampedObjectType: + def get_last_before(self, timestamp: TimeStampType) -> TimestampedObjectType: """Retreive last item timestamp before a given timestamp value.""" last_before_index = bisect.bisect_left(self.timestamps(), timestamp) - 1 @@ -243,7 +345,7 @@ class TimeStampedObjectsList(list): raise KeyError(f'No data stored before {ts} timestamp.') - def get_last_until(self, timestamp: TimeStampType) -> TimeStampedObjectType: + def get_last_until(self, timestamp: TimeStampType) -> TimestampedObjectType: """Retreive last item timestamp until a given timestamp value.""" last_until_index = bisect.bisect_right(self.timestamps(), timestamp) - 1 @@ -257,14 +359,14 @@ class TimeStampedObjectsList(list): raise KeyError(f'No data stored until {ts} timestamp.') @classmethod - def from_json(self, json_filepath: str) -> TimeStampedObjectsListType: - """Create a TimeStampedObjectsList from .json file.""" + def from_json(self, json_filepath: str) -> TimestampedObjectsListType: + """Create a TimestampedObjectsList from .json file.""" with open(json_filepath, encoding='utf-8') as ts_objects_file: json_ts_objects = json.load(ts_objects_file) - return TimeStampedObjectsList([ast.literal_eval(ts_object) for ts_object in json_ts_objects]) + return TimestampedObjectsList([ast.literal_eval(ts_object) for ts_object in json_ts_objects]) def to_json(self, json_filepath: str): """Save a TimeStampedBuffer to .json file.""" @@ -273,65 +375,6 @@ class TimeStampedObjectsList(list): json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder) - @classmethod - def from_dataframe(self, dataframe: pandas.DataFrame, exclude=[]) -> TimeStampedObjectsListType: - """Create a TimeStampedObjectsList from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).""" - - dataframe.drop(exclude, inplace=True, axis=True) - - assert(dataframe.index.name == 'timestamp') - - return TimeStampedObjectsList(dataframe.to_dict('index')) - - def as_dataframe(self, exclude=[], split={}) -> pandas.DataFrame: - """Convert as [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). - - The optional *split* argument allows tuple values to be stored in dedicated columns. - For example: to convert {"point": (0, 0)} data as two separated "x" and "y" columns, use split={"point": ["x", "y"]} - - !!! warning "Values must be dictionaries" - - Each key is stored as a column name. - - !!! note - - Timestamps are stored as index column called 'timestamp'. - """ - - df = pandas.DataFrame(self.tuples(), columns=self.__object_properties) - - # Exclude columns - df.drop(exclude, inplace=True, axis=True) - - # Split columns - if len(split) > 0: - - splited_columns = [] - - for column in df.columns: - - if column in split.keys(): - - df[split[column]] = pandas.DataFrame(df[column].tolist(), index=df.index) - df.drop(column, inplace=True, axis=True) - - for new_column in split[column]: - - splited_columns.append(new_column) - - else: - - splited_columns.append(column) - - # Reorder splited columns - df = df[splited_columns] - - # Append timestamps as index column - df['timestamp'] = self.timestamps() - df.set_index('timestamp', inplace=True) - - return df - def plot(self, names=[], colors=[], split={}, samples=None) -> list: """Plot as [matplotlib](https://matplotlib.org/) time chart.""" @@ -357,43 +400,6 @@ class TimeStampedObjectsList(list): return legend_patches -class DataDictionary(dict): - """Enable dot.notation access to dictionary attributes""" - - __getattr__ = dict.get - __setattr__ = dict.__setitem__ - __delattr__ = dict.__delitem__ - -class TimestampedObject(): - """Abstract class to enable timestamp management.""" - - def __init__(self, timestamp: int|float = math.nan): - - self._timestamp = timestamp - - @property - def timestamp(self) -> int|float: - """Get object timestamp.""" - return self._timestamp - - @timestamp.setter - def timestamp(self, timestamp: int|float): - """Set object timestamp.""" - - assert(type(timestamp) == int or type(timestamp) == float) - - self._timestamp = timestamp - - def untimestamp(self): - """Reset object timestamp.""" - self.timestamp = math.nan - - def is_timestamped(self) -> bool: - """Is the object timestamped?""" - timestamped = not math.isnan(self.timestamp) - - return timestamped - class SharedObject(TimestampedObject): """Abstract class to enable multiple threads sharing and timestamp management.""" diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 2f83703..fb3dceb 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -177,12 +177,12 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeStampedGazePositions") # Type definition for type annotation convenience -class TimeStampedGazePositions(DataFeatures.TimeStampedObjectsList): +class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): """Handle timestamped gaze positions into a list""" def __init__(self, gaze_positions: list = []): - DataFeatures.TimeStampedObjectsList.__init__(self, GazePosition, gaze_positions) + DataFeatures.TimestampedObjectsList.__init__(self, GazePosition, gaze_positions) def values(self) -> list: """Get all timestamped position values as list of tuple.""" @@ -510,12 +510,12 @@ def is_saccade(gaze_movement): TimeStampedGazeMovementsType = TypeVar('TimeStampedGazeMovements', bound="TimeStampedGazeMovements") # Type definition for type annotation convenience -class TimeStampedGazeMovements(DataFeatures.TimeStampedObjectsList): +class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList): """Handle timestamped gaze movements into a list""" def __init__(self, gaze_movements: list = []): - DataFeatures.TimeStampedObjectsList.__init__(self, GazeMovement, gaze_movements) + DataFeatures.TimestampedObjectsList.__init__(self, GazeMovement, gaze_movements) GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus") # Type definition for type annotation convenience @@ -539,7 +539,7 @@ class GazeStatus(GazePosition): TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus") # Type definition for type annotation convenience -class TimeStampedGazeStatus(DataFeatures.TimeStampedObjectsList): +class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList): """Handle timestamped gaze movements into a list !!! note -- cgit v1.1 From e9788440bfea28bec77ef43abc407885b1c50ae8 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 00:33:27 +0100 Subject: Making DataFeatures test to work again. --- src/argaze.test/DataFeatures.py | 296 +++++++++++++++++++------------- src/argaze.test/DataLog/FileWriter.py | 37 ---- src/argaze.test/DataLog/__init__.py | 0 src/argaze.test/utils/ts_buffer.json | 1 - src/argaze.test/utils/ts_data_file.json | 82 +++++++++ src/argaze/DataFeatures.py | 47 ++--- 6 files changed, 282 insertions(+), 181 deletions(-) delete mode 100644 src/argaze.test/DataLog/FileWriter.py delete mode 100644 src/argaze.test/DataLog/__init__.py delete mode 100644 src/argaze.test/utils/ts_buffer.json create mode 100644 src/argaze.test/utils/ts_data_file.json diff --git a/src/argaze.test/DataFeatures.py b/src/argaze.test/DataFeatures.py index 4d9c909..9543915 100644 --- a/src/argaze.test/DataFeatures.py +++ b/src/argaze.test/DataFeatures.py @@ -19,16 +19,21 @@ import numpy class BasicDataClass(DataFeatures.TimestampedObject): """Define a basic dataclass for testing purpose.""" - def __init__(self, value: tuple = (), **kwargs): + def __init__(self, value: tuple = (), message: str = None, **kwargs): DataFeatures.TimestampedObject.__init__(self, **kwargs) self.__value = value + self.__message = message @property def value(self): return self.__value + @property + def message(self): + return self.__message + def random_data_list(size): """ Generate a random TimestampedObjectsList for testing purpose. Timestamps are current time. @@ -42,7 +47,7 @@ def random_data_list(size): for i in range(0, size): # Edit data - random_data = BasicDataClass((i, random.random())) + random_data = BasicDataClass((i, random.random()), f'test_{i}') # Timestamp data random_data.timestamp = time.time() @@ -54,6 +59,14 @@ def random_data_list(size): return DataFeatures.TimestampedObjectsList(BasicDataClass, data_list) +# DEBUG +''' +print('test_as_dataframe: export ts_data_file.json') +current_directory = os.path.dirname(os.path.abspath(__file__)) +json_filepath = os.path.join(current_directory, 'utils/ts_data_file.json') +random_data_list(10).to_json(json_filepath) +''' + class TestTimestampedObjectsListClass(unittest.TestCase): """Test TimestampedObjectsList class.""" @@ -83,23 +96,24 @@ class TestTimestampedObjectsListClass(unittest.TestCase): # Check dataframe conversion self.assertEqual(data_frame.index.name, "timestamp") self.assertEqual(data_frame.index.size, 10) - self.assertEqual(data_frame.columns.size, 1) + self.assertEqual(data_frame.columns.size, 2) self.assertEqual(data_frame.index.dtype, 'float64') self.assertEqual(data_frame["value"].dtype, 'object') + self.assertEqual(data_frame["message"].dtype, 'object') # Check data exclusion option data_frame = random_data_list(10).as_dataframe(exclude=["value"]) self.assertEqual(data_frame.index.name, "timestamp") self.assertEqual(data_frame.index.size, 10) - self.assertEqual(data_frame.columns.size, 0) + self.assertEqual(data_frame.columns.size, 1) # Check dataframe split option data_frame = random_data_list(10).as_dataframe(split={"value": ["value_0", "value_1"]}) self.assertEqual(data_frame.index.name, "timestamp") self.assertEqual(data_frame.index.size, 10) - self.assertEqual(data_frame.columns.size, 2) + self.assertEqual(data_frame.columns.size, 3) self.assertEqual(data_frame["value_0"].dtype, 'int64') self.assertEqual(data_frame["value_1"].dtype, 'float64') @@ -112,210 +126,250 @@ class TestTimestampedObjectsListClass(unittest.TestCase): data_list = DataFeatures.TimestampedObjectsList.from_dataframe(BasicDataClass, data_frame) self.assertEqual(len(data_list), 10) - @unittest.skip("DEBUG") + def test_from_json(self): """Test TimestampedObjectsList creation from json file.""" # Edit dataframe csv file path current_directory = os.path.dirname(os.path.abspath(__file__)) - json_filepath = os.path.join(current_directory, 'utils/ts_buffer.json') + json_filepath = os.path.join(current_directory, 'utils/ts_data_file.json') # Load TimestampedObjectsList from json file - ts_buffer = DataFeatures.TimestampedObjectsList.from_json(json_filepath) + data_list = DataFeatures.TimestampedObjectsList.from_json(BasicDataClass, json_filepath) - self.assertEqual(len(ts_buffer), 3) - @unittest.skip("DEBUG") + self.assertEqual(len(data_list), 10) + self.assertEqual(type(data_list[0]), BasicDataClass) + def test___repr__(self): """Test TimestampedObjectsList string representation.""" - self.assertEqual(repr(DataFeatures.TimestampedObjectsList()), "{}") - self.assertEqual(repr(DataFeatures.TimestampedObjectsList({0: ""})), "{\"0\": \"\"}") - self.assertEqual(repr(DataFeatures.TimestampedObjectsList({0.1: ""})), "{\"0.1\": \"\"}") - - data = BasicDataClass((123, 456)) - ts_buffer = DataFeatures.TimestampedObjectsList({0: data}) + self.assertEqual(repr(DataFeatures.TimestampedObjectsList(BasicDataClass)), "[]") - self.assertEqual(repr(ts_buffer), "{\"0\": {\"value\": [123, 456]}}") + data_list = [BasicDataClass((0, 0), 'test', timestamp=0)] + self.assertEqual(repr(DataFeatures.TimestampedObjectsList(BasicDataClass, data_list)), "[{\"value\": [0, 0], \"message\": \"test\", \"timestamp\": 0}]") - array = numpy.zeros(3) - ts_buffer = DataFeatures.TimestampedObjectsList({0: array}) - - self.assertEqual(repr(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}") - @unittest.skip("DEBUG") def test___str__(self): """Test TimestampedObjectsList string representation.""" - self.assertEqual(str(DataFeatures.TimestampedObjectsList()), "{}") - self.assertEqual(str(DataFeatures.TimestampedObjectsList({0: ""})), "{\"0\": \"\"}") - self.assertEqual(str(DataFeatures.TimestampedObjectsList({0.1: ""})), "{\"0.1\": \"\"}") - - data = BasicDataClass((123, 456)) - ts_buffer = DataFeatures.TimestampedObjectsList({0: data}) - - self.assertEqual(str(ts_buffer), "{\"0\": {\"value\": [123, 456]}}") + self.assertEqual(str(DataFeatures.TimestampedObjectsList(BasicDataClass)), "[]") - array = numpy.zeros(3) - ts_buffer = DataFeatures.TimestampedObjectsList({0: array}) + data_list = [BasicDataClass((0, 0), 'test', timestamp=0)] + self.assertEqual(str(DataFeatures.TimestampedObjectsList(BasicDataClass, data_list)), "[{\"value\": [0, 0], \"message\": \"test\", \"timestamp\": 0}]") - self.assertEqual(str(ts_buffer), "{\"0\": [0.0, 0.0, 0.0]}") - @unittest.skip("DEBUG") def test_append(self): """Test TimestampedObjectsList append method.""" - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}) - ts_buffer_next = DataFeatures.TimestampedObjectsList({2: "C", 3: "D"}) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass) + next_data = BasicDataClass((0, 0), 'test', timestamp=0) - self.assertEqual(len(ts_buffer.append(ts_buffer_next)), 4) - self.assertEqual(list(ts_buffer.append(ts_buffer_next).keys()), [0, 1, 2, 3]) - @unittest.skip("DEBUG") - def test_first(self): - """Test TimestampedObjectsList first property.""" + data_list.append(next_data) - self.assertEqual(DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}).first, (0, "A")) - - # Check that accessing to first item of an empty TimestampedObjectsList fails - with self.assertRaises(IndexError): - - DataFeatures.TimestampedObjectsList().first - @unittest.skip("DEBUG") + self.assertEqual(len(data_list), 1) + self.assertEqual(data_list.timestamps(), [0]) + def test_pop_first(self): """Test TimestampedObjectsList pop_first method.""" - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1) + ]) + + first = data_list.pop(0) + + self.assertEqual(len(data_list), 1) + self.assertEqual(first.message, "A") + self.assertEqual(first.timestamp, 0) + self.assertEqual(data_list[0].message, "B") + self.assertEqual(data_list[0].timestamp, 1) - self.assertEqual(ts_buffer.pop_first(), (0, "A")) - self.assertEqual(len(ts_buffer), 1) - self.assertEqual(ts_buffer.first, (1, "B")) - @unittest.skip("DEBUG") def test_pop_last_until(self): """Test TimestampedObjectsList pop_last_until method.""" - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1), + BasicDataClass(message="C", timestamp=2), + BasicDataClass(message="D", timestamp=3) + ]) # Check pop until an existing timestamp - pop_last_until_2 = ts_buffer.pop_last_until(2) + pop_last_until_2 = data_list.pop_last_until(2) - self.assertEqual(pop_last_until_2, (2, "C")) - self.assertEqual(len(ts_buffer), 2) - self.assertEqual(ts_buffer.first, (2, "C")) + self.assertEqual(pop_last_until_2.message, "C") + self.assertEqual(pop_last_until_2.timestamp, 2) + self.assertEqual(len(data_list), 2) + self.assertEqual(data_list[0].message, "C") + self.assertEqual(data_list[0].timestamp, 2) # Check first until an none existing timestamp - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) - - pop_last_until_1dot5 = ts_buffer.pop_last_until(1.5) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1), + BasicDataClass(message="C", timestamp=2), + BasicDataClass(message="D", timestamp=3) + ]) + + pop_last_until_1dot5 = data_list.pop_last_until(1.5) + + self.assertEqual(pop_last_until_1dot5.message, "B") + self.assertEqual(pop_last_until_1dot5.timestamp, 1) + self.assertEqual(len(data_list), 3) + self.assertEqual(data_list[0].message, "B") + self.assertEqual(data_list[0].timestamp, 1) - self.assertEqual(pop_last_until_1dot5, (1, "B")) - self.assertEqual(len(ts_buffer), 3) - self.assertEqual(ts_buffer.first, (1, "B")) - @unittest.skip("DEBUG") def test_pop_last_before(self): """Test TimestampedObjectsList pop_last_before method.""" - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1), + BasicDataClass(message="C", timestamp=2), + BasicDataClass(message="D", timestamp=3) + ]) # Check pop until an existing timestamp - last_before_2 = ts_buffer.pop_last_before(2) + last_before_2 = data_list.pop_last_before(2) - self.assertEqual(last_before_2, (1, "B")) - self.assertEqual(len(ts_buffer), 2) - self.assertEqual(ts_buffer.first, (2, "C")) + self.assertEqual(last_before_2.message, "B") + self.assertEqual(last_before_2.timestamp, 1) + self.assertEqual(len(data_list), 2) + self.assertEqual(data_list[0].message, "C") + self.assertEqual(data_list[0].timestamp, 2) # Check pop until an none existing timestamp - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) - - first_until_1dot5 = ts_buffer.pop_last_before(1.5) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1), + BasicDataClass(message="C", timestamp=2), + BasicDataClass(message="D", timestamp=3) + ]) + + first_until_1dot5 = data_list.pop_last_before(1.5) + + self.assertEqual(first_until_1dot5.message, "B") + self.assertEqual(first_until_1dot5.timestamp, 1) + self.assertEqual(len(data_list), 2) + self.assertEqual(data_list[0].message, "C") + self.assertEqual(data_list[0].timestamp, 2) - self.assertEqual(first_until_1dot5, (1, "B")) - self.assertEqual(len(ts_buffer), 2) - self.assertEqual(ts_buffer.first, (2, "C")) - @unittest.skip("DEBUG") - def test_last(self): - """Test TimestampedObjectsList last property.""" - - self.assertEqual(DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}).last, (1, "B")) - - # Check that accessing to last item of an empty TimestampedObjectsList fails - with self.assertRaises(IndexError): - - DataFeatures.TimestampedObjectsList().last - @unittest.skip("DEBUG") def test_pop_last(self): """Test TimestampedObjectsList pop_last method.""" - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B"}) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1) + ]) + + last = data_list.pop(-1) + + self.assertEqual(len(data_list), 1) + self.assertEqual(last.message, "B") + self.assertEqual(last.timestamp, 1) + self.assertEqual(data_list[0].message, "A") + self.assertEqual(data_list[0].timestamp, 0) - self.assertEqual(ts_buffer.pop_last(), (1, "B")) - self.assertEqual(len(ts_buffer), 1) - self.assertEqual(ts_buffer.last, (0, "A")) - @unittest.skip("DEBUG") def test_get_first_from(self): """Test TimestampedObjectsList get_first_from method.""" - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1), + BasicDataClass(message="C", timestamp=2), + BasicDataClass(message="D", timestamp=3) + ]) - get_first_from_1 = ts_buffer.get_first_from(1) + get_first_from_1 = data_list.get_first_from(1) - self.assertEqual(get_first_from_1, (1, "B")) - self.assertEqual(len(ts_buffer), 4) + self.assertEqual(get_first_from_1.message, "B") + self.assertEqual(get_first_from_1.timestamp, 1) + self.assertEqual(len(data_list), 4) - get_first_from_1dot5 = ts_buffer.get_first_from(1.5) + get_first_from_1dot5 = data_list.get_first_from(1.5) - self.assertEqual(get_first_from_1dot5, (2, "C")) + self.assertEqual(get_first_from_1dot5.message, "C") + self.assertEqual(get_first_from_1dot5.timestamp, 2) - get_first_from_0 = ts_buffer.get_first_from(0) + get_first_from_0 = data_list.get_first_from(0) - self.assertEqual(get_first_from_0, (0, "A")) + self.assertEqual(get_first_from_0.message, "A") + self.assertEqual(get_first_from_0.timestamp, 0) # Check that accessing to lately timestamp fails with self.assertRaises(KeyError): - ts_buffer.get_first_from(4) - @unittest.skip("DEBUG") + data_list.get_first_from(4) + def test_get_last_before(self): """Test TimestampedObjectsList get_last_before method.""" - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1), + BasicDataClass(message="C", timestamp=2), + BasicDataClass(message="D", timestamp=3) + ]) - get_last_before_2 = ts_buffer.get_last_before(2) + get_last_before_2 = data_list.get_last_before(2) - self.assertEqual(get_last_before_2, (1, "B")) - self.assertEqual(len(ts_buffer), 4) + self.assertEqual(get_last_before_2.message, "B") + self.assertEqual(get_last_before_2.timestamp, 1) + self.assertEqual(len(data_list), 4) - get_last_before_1dot5 = ts_buffer.get_last_before(1.5) + get_last_before_1dot5 = data_list.get_last_before(1.5) - self.assertEqual(get_last_before_1dot5, (1, "B")) + self.assertEqual(get_last_before_1dot5.message,"B") + self.assertEqual(get_last_before_1dot5.timestamp, 1) - get_last_before_4 = ts_buffer.get_last_before(4) + get_last_before_4 = data_list.get_last_before(4) - self.assertEqual(get_last_before_4, (3, "D")) + self.assertEqual(get_last_before_4.message, "D") + self.assertEqual(get_last_before_4.timestamp, 3) # Check that accessing to early timestamp fails with self.assertRaises(KeyError): - ts_buffer.get_last_before(-1) - @unittest.skip("DEBUG") + data_list.get_last_before(-1) + def test_get_last_until(self): """Test TimestampedObjectsList get_last_until method.""" - ts_buffer = DataFeatures.TimestampedObjectsList({0: "A", 1: "B", 2: "C", 3: "D"}) + data_list = DataFeatures.TimestampedObjectsList(BasicDataClass, \ + [ + BasicDataClass(message="A", timestamp=0), + BasicDataClass(message="B", timestamp=1), + BasicDataClass(message="C", timestamp=2), + BasicDataClass(message="D", timestamp=3) + ]) - get_last_until_2 = ts_buffer.get_last_until(2) + get_last_until_2 = data_list.get_last_until(2) - self.assertEqual(get_last_until_2, (2, "C")) - self.assertEqual(len(ts_buffer), 4) + self.assertEqual(get_last_until_2.message, "C") + self.assertEqual(get_last_until_2.timestamp, 2) + self.assertEqual(len(data_list), 4) - get_last_until_1dot5 = ts_buffer.get_last_until(1.5) + get_last_until_1dot5 = data_list.get_last_until(1.5) - self.assertEqual(get_last_until_1dot5, (1, "B")) + self.assertEqual(get_last_until_1dot5.message, "B") + self.assertEqual(get_last_until_1dot5.timestamp, 1) - get_last_until_4 = ts_buffer.get_last_until(4) + get_last_until_4 = data_list.get_last_until(4) - self.assertEqual(get_last_until_4, (3, "D")) + self.assertEqual(get_last_until_4.message, "D") + self.assertEqual(get_last_until_4.timestamp, 3) # Check that accessing to early timestamp fails with self.assertRaises(KeyError): - ts_buffer.get_last_until(-1) + data_list.get_last_until(-1) if __name__ == '__main__': diff --git a/src/argaze.test/DataLog/FileWriter.py b/src/argaze.test/DataLog/FileWriter.py deleted file mode 100644 index 648385c..0000000 --- a/src/argaze.test/DataLog/FileWriter.py +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python - -""" """ - -__author__ = "Théo de la Hogue" -__credits__ = [] -__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" -__license__ = "BSD" - -import unittest -import os - -from argaze import DataFeatures -from argaze.DataLog import FileWriter -from argaze.utils import UtilsFeatures - -DataFeaturesTest = UtilsFeatures.importFromTestPackage('DataFeatures') - -class TestTimeStampedDataLogger(unittest.TestCase): - """Test DataLogger class.""" - - def test_creation(self): - """Test logger creation.""" - - file_writer = FileWriter.TimeStampedDataLogger(path='./_export/logs/data.txt', separator=',') - - # Check file creation - self.assertEqual(os.path.exists('./_export/logs/data.txt'), True) - - # Write into file - file_writer.emit(0, 'A') - file_writer.emit(1, 'B') - file_writer.emit(2, 'C') - -if __name__ == '__main__': - - unittest.main() \ No newline at end of file diff --git a/src/argaze.test/DataLog/__init__.py b/src/argaze.test/DataLog/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/argaze.test/utils/ts_buffer.json b/src/argaze.test/utils/ts_buffer.json deleted file mode 100644 index b1433de..0000000 --- a/src/argaze.test/utils/ts_buffer.json +++ /dev/null @@ -1 +0,0 @@ -{"0":{"timestamp":0,"int_value":1,"float_value":0.000001,"string_value":"A","list_value":[[0,0],[0,1],[1,1],[1,0]],"tuple_value":[0,0],"nan_value":1.0,"json_value":{"0":"A","1":"B","2":"C","3":"D"}},"1":{"timestamp":1,"int_value":2,"float_value":0.000002,"string_value":"B","list_value":[[0,0],[0,2],[2,2],[2,0]],"tuple_value":[1,1],"nan_value":null,"json_value":{"0":"A","1":"B","2":"C","3":"D"}},"2":{"timestamp":2,"int_value":3,"float_value":0.000003,"string_value":"C","list_value":[[0,0],[0,3],[3,3],[3,0]],"tuple_value":[2,2],"nan_value":1.0,"json_value":{"0":"A","1":"B","2":"C","3":"D"}}} \ No newline at end of file diff --git a/src/argaze.test/utils/ts_data_file.json b/src/argaze.test/utils/ts_data_file.json new file mode 100644 index 0000000..d69dd77 --- /dev/null +++ b/src/argaze.test/utils/ts_data_file.json @@ -0,0 +1,82 @@ +[ + { + "value": [ + 0, + 0.04245991513702008 + ], + "message": "test_0", + "timestamp": 1709160858.387703 + }, + { + "value": [ + 1, + 0.6761490271896192 + ], + "message": "test_1", + "timestamp": 1709160858.38784 + }, + { + "value": [ + 2, + 0.046407850274610474 + ], + "message": "test_2", + "timestamp": 1709160858.387973 + }, + { + "value": [ + 3, + 0.9378067398496651 + ], + "message": "test_3", + "timestamp": 1709160858.388105 + }, + { + "value": [ + 4, + 0.4197936347606107 + ], + "message": "test_4", + "timestamp": 1709160858.388236 + }, + { + "value": [ + 5, + 0.26937423401632943 + ], + "message": "test_5", + "timestamp": 1709160858.3883672 + }, + { + "value": [ + 6, + 0.9478731386524537 + ], + "message": "test_6", + "timestamp": 1709160858.3884978 + }, + { + "value": [ + 7, + 0.39010865778889914 + ], + "message": "test_7", + "timestamp": 1709160858.388629 + }, + { + "value": [ + 8, + 0.4100480317631575 + ], + "message": "test_8", + "timestamp": 1709160858.388763 + }, + { + "value": [ + 9, + 0.5900791904864906 + ], + "message": "test_9", + "timestamp": 1709160858.388895 + } +] \ No newline at end of file diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index b8acb2e..6909222 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -285,6 +285,23 @@ class TimestampedObjectsList(list): return df + @classmethod + def from_json(self, ts_object_type: type, json_filepath: str) -> TimestampedObjectsListType: + """Create a TimestampedObjectsList from .json file.""" + + with open(json_filepath, encoding='utf-8') as ts_objects_file: + + json_ts_objects = json.load(ts_objects_file) + + return TimestampedObjectsList(ts_object_type, [ts_object_type(**ts_object_dict) for ts_object_dict in json_ts_objects]) + + def to_json(self, json_filepath: str): + """Save a TimestampedObjectsList to .json file.""" + + with open(json_filepath, 'w', encoding='utf-8') as ts_objects_file: + + json.dump(self, ts_objects_file, ensure_ascii=False, default=(lambda obj: as_dict(obj)), indent=' ') + def __repr__(self): """String representation""" return json.dumps([as_dict(ts_object) for ts_object in self], ensure_ascii=False,) @@ -322,7 +339,8 @@ class TimestampedObjectsList(list): def get_first_from(self, timestamp: TimeStampType) -> TimestampedObjectType: """Retreive first item timestamp from a given timestamp value.""" - first_from_index = bisect.bisect_left(self.timestamps(), timestamp) + ts_list = self.timestamps() + first_from_index = bisect.bisect_left(ts_list, timestamp) if first_from_index < len(self): @@ -335,7 +353,8 @@ class TimestampedObjectsList(list): def get_last_before(self, timestamp: TimeStampType) -> TimestampedObjectType: """Retreive last item timestamp before a given timestamp value.""" - last_before_index = bisect.bisect_left(self.timestamps(), timestamp) - 1 + ts_list = self.timestamps() + last_before_index = bisect.bisect_left(ts_list, timestamp) - 1 if last_before_index >= 0: @@ -343,12 +362,13 @@ class TimestampedObjectsList(list): else: - raise KeyError(f'No data stored before {ts} timestamp.') + raise KeyError(f'No data stored before {timestamp} timestamp.') def get_last_until(self, timestamp: TimeStampType) -> TimestampedObjectType: """Retreive last item timestamp until a given timestamp value.""" - last_until_index = bisect.bisect_right(self.timestamps(), timestamp) - 1 + ts_list = self.timestamps() + last_until_index = bisect.bisect_right(ts_list, timestamp) - 1 if last_until_index >= 0: @@ -356,24 +376,7 @@ class TimestampedObjectsList(list): else: - raise KeyError(f'No data stored until {ts} timestamp.') - - @classmethod - def from_json(self, json_filepath: str) -> TimestampedObjectsListType: - """Create a TimestampedObjectsList from .json file.""" - - with open(json_filepath, encoding='utf-8') as ts_objects_file: - - json_ts_objects = json.load(ts_objects_file) - - return TimestampedObjectsList([ast.literal_eval(ts_object) for ts_object in json_ts_objects]) - - def to_json(self, json_filepath: str): - """Save a TimeStampedBuffer to .json file.""" - - with open(json_filepath, 'w', encoding='utf-8') as ts_buffer_file: - - json.dump(self, ts_buffer_file, ensure_ascii=False, cls=JsonEncoder) + raise KeyError(f'No data stored until {timestamp} timestamp.') def plot(self, names=[], colors=[], split={}, samples=None) -> list: """Plot as [matplotlib](https://matplotlib.org/) time chart.""" -- cgit v1.1 From 10969554e3126c65d19e408b06b3169f35b81e41 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 01:00:40 +0100 Subject: First work to update DispersionThresholdIdentification. --- .../DispersionThresholdIdentification.py | 94 ++++++++++------------ src/argaze/DataFeatures.py | 9 +++ .../DispersionThresholdIdentification.py | 2 +- src/argaze/GazeFeatures.py | 4 +- 4 files changed, 55 insertions(+), 54 deletions(-) diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index f0d286a..7e74c1d 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -49,9 +49,11 @@ def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time gaze_position = GazeFeatures.GazePosition() + # Timestamp gaze position + gaze_position.timestamp = time.time() - start_time + start_ts + # Store gaze position - ts = time.time() - start_time + start_ts - ts_gaze_positions[ts] = gaze_position + ts_gaze_positions.append(gaze_position) return ts_gaze_positions @@ -87,9 +89,11 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl gaze_position = GazeFeatures.GazePosition() + # Timestamp gaze position + gaze_position.timestamp = time.time() - start_time + start_ts + # Store gaze position - ts = time.time() - start_time + start_ts - ts_gaze_positions[ts] = gaze_position + ts_gaze_positions.append(gaze_position) return ts_gaze_positions @@ -115,7 +119,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size) # Check fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) self.assertEqual(len(fixation.positions.keys()), size) self.assertLessEqual(fixation.deviation_max, deviation_max) @@ -134,9 +138,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): max_time = 0.1 ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) @@ -147,7 +151,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size * 2) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) self.assertEqual(len(fixation.positions.keys()), size) self.assertLessEqual(fixation.deviation_max, deviation_max) @@ -156,7 +160,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.finished, True) # Check first saccade - ts, saccade = ts_saccades.pop_first() + saccade = ts_saccades.pop(0) self.assertEqual(len(saccade.positions.keys()), 2) self.assertGreaterEqual(saccade.duration, min_time) @@ -164,14 +168,11 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(saccade.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = fixation.positions.last - first_ts, first_position = saccade.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) + self.assertEqual(fixation[-1].value, saccade[0].value) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) self.assertEqual(len(fixation.positions.keys()), size) self.assertLessEqual(fixation.deviation_max, deviation_max) @@ -180,11 +181,8 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = saccade.positions.last - first_ts, first_position = fixation.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) + self.assertEqual(saccade[-1].value, fixation[0].value) def test_fixation_and_short_saccade_identification(self): """Test DispersionThresholdIdentification fixation and saccade identification.""" @@ -199,10 +197,10 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): max_time = 0.1 ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A.last[0]) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions.last[0]) + ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A[-1].timestamp) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_move_positions).append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_move_positions + ts_gaze_positions_B gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) @@ -213,7 +211,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size * 2 + move) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) self.assertEqual(len(fixation.positions.keys()), size) self.assertLessEqual(fixation.deviation_max, deviation_max) @@ -222,7 +220,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.finished, True) # Check first saccade - ts, saccade = ts_saccades.pop_first() + saccade = ts_saccades.pop(0) self.assertEqual(len(saccade.positions.keys()), move + 2) self.assertGreaterEqual(saccade.duration, (move + 1) * min_time) @@ -230,14 +228,11 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(saccade.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = fixation.positions.last - first_ts, first_position = saccade.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) + self.assertEqual(fixation[-1].value, saccade[0].value) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) self.assertEqual(len(fixation.positions.keys()), size) self.assertLessEqual(fixation.deviation_max, deviation_max) @@ -246,11 +241,8 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = saccade.positions.last - first_ts, first_position = fixation.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) + self.assertEqual(saccade[-1].value, fixation[0].value) def test_invalid_gaze_position(self): """Test DispersionThresholdIdentification fixation and saccade identification with invalid gaze position.""" @@ -273,7 +265,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size-3) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) self.assertEqual(len(fixation.positions.keys()), 7) self.assertLessEqual(fixation.deviation_max, deviation_max) @@ -282,7 +274,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(fixation.finished, True) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) self.assertEqual(len(fixation.positions.keys()), 5) self.assertLessEqual(fixation.deviation_max, deviation_max) @@ -302,8 +294,8 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): max_time = 0.1 ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) - ts_gaze_positions_C = build_gaze_fixation(size, center_C, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_B.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) + ts_gaze_positions_C = build_gaze_fixation(size, center_C, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_B[-1].timestamp) fixation_A = DispersionThresholdIdentification.Fixation(ts_gaze_positions_A) fixation_B = DispersionThresholdIdentification.Fixation(ts_gaze_positions_B) @@ -329,7 +321,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): max_time = 0.1 ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) @@ -342,7 +334,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size*2) # Check unique fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) self.assertEqual(len(fixation.positions.keys()), size * 2) #self.assertGreaterEqual(fixation.deviation_max, deviation_max) @@ -361,19 +353,19 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): max_time = 0.1 ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) # Get last ts to terminate identification on last gaze position - last_ts, _ = ts_gaze_positions.last + last_ts = ts_gaze_positions[-1].timestamp # Iterate on gaze positions - for ts, gaze_position in ts_gaze_positions.items(): + for gaze_position in ts_gaze_positions: - finished_gaze_movement = gaze_movement_identifier.identify(ts, gaze_position, terminate=(ts == last_ts)) + finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) if GazeFeatures.is_fixation(finished_gaze_movement): @@ -393,9 +385,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check that last gaze position date is not equal to given gaze position date if finished_gaze_movement.valid: - last_ts, _ = finished_gaze_movement.positions.last + last_ts = finished_gaze_movement[-1].timestamp - self.assertNotEqual(last_ts, ts) + self.assertNotEqual(last_ts, gaze_position.timestamp) # Check that last gaze position date of current fixation is equal to given gaze position date # NOTE: This is not true for saccade as, for I-DT, there is a minimal time window while the gaze movement is unknown @@ -404,9 +396,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): if GazeFeatures.is_fixation(current_gaze_movement): - last_ts, _ = current_gaze_movement.positions.last + last_ts = current_gaze_movement[-1].timestamp - self.assertEqual(last_ts, ts) + self.assertEqual(last_ts, gaze_position.timestamp) def test_identification_generator(self): """Test DispersionThresholdIdentification identification using generator.""" @@ -419,7 +411,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): max_time = 0.1 ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 6909222..6be946f 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -205,6 +205,15 @@ class TimestampedObjectsList(list): super().append(ts_object) + def __add__(self, ts_objects: list = []) -> TimestampedObjectsListType: + """Append timestamped objects list.""" + + for ts_object in ts_objects: + + self.append(ts_object) + + return self + @property def duration(self): """Get inferred duration from first and last timestamps.""" diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index f928c5a..a452e98 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -227,7 +227,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__valid_positions = GazeFeatures.TimeStampedGazePositions() # Store current gaze position - self.__valid_positions[ts] = gaze_position + self.__valid_positions.append(gaze_position) # Output last fixation return last_fixation if not terminate else self.current_saccade.finish() diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index fb3dceb..71c643e 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -612,7 +612,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): # Iterate on gaze positions for gaze_position in ts_gaze_positions: - finished_gaze_movement = self.identify(ts, gaze_position, terminate=(gaze_position.timestamp == last_ts)) + finished_gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) if is_fixation(finished_gaze_movement): @@ -685,7 +685,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): # Iterate on gaze positions for gaze_position in ts_gaze_positions: - finished_gaze_movement = self.identify(ts, gaze_position, terminate=(gaze_position.timestamp == last_ts)) + finished_gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) if finished_gaze_movement: -- cgit v1.1 From faa6d8acf3c9e4d11a3ee84df2d5a48501befd68 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 10:36:47 +0100 Subject: Fixing DispersionThresholdIdentification test. --- .../DispersionThresholdIdentification.py | 54 +++++------ src/argaze/DataFeatures.py | 16 +++- .../DispersionThresholdIdentification.py | 106 ++++++++++----------- src/argaze/GazeFeatures.py | 101 ++++++++------------ 4 files changed, 128 insertions(+), 149 deletions(-) diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index 7e74c1d..07496c3 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -121,12 +121,12 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check fixation fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) self.assertLessEqual(fixation.finished, True) - + def test_fixation_and_direct_saccade_identification(self): """Test DispersionThresholdIdentification fixation and saccade identification.""" @@ -153,7 +153,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check first fixation fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) @@ -162,7 +162,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check first saccade saccade = ts_saccades.pop(0) - self.assertEqual(len(saccade.positions.keys()), 2) + self.assertEqual(len(saccade), 2) self.assertGreaterEqual(saccade.duration, min_time) self.assertLessEqual(saccade.duration, max_time) self.assertLessEqual(saccade.finished, True) @@ -174,7 +174,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check second fixation fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) @@ -183,7 +183,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check that last position of a movement is equal to first position of next movement self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) self.assertEqual(saccade[-1].value, fixation[0].value) - + def test_fixation_and_short_saccade_identification(self): """Test DispersionThresholdIdentification fixation and saccade identification.""" @@ -213,7 +213,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check first fixation fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) @@ -222,7 +222,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check first saccade saccade = ts_saccades.pop(0) - self.assertEqual(len(saccade.positions.keys()), move + 2) + self.assertEqual(len(saccade), move + 2) self.assertGreaterEqual(saccade.duration, (move + 1) * min_time) self.assertLessEqual(saccade.duration, (move + 1) * max_time) self.assertLessEqual(saccade.finished, True) @@ -234,7 +234,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check second fixation fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) @@ -243,9 +243,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check that last position of a movement is equal to first position of next movement self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) self.assertEqual(saccade[-1].value, fixation[0].value) - - def test_invalid_gaze_position(self): - """Test DispersionThresholdIdentification fixation and saccade identification with invalid gaze position.""" + + def test_empty_gaze_position(self): + """Test DispersionThresholdIdentification fixation and saccade identification with empty gaze position.""" size = 15 center = (0, 0) @@ -267,7 +267,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check first fixation fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), 7) + self.assertEqual(len(fixation), 7) self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, 6 * min_time) self.assertLessEqual(fixation.duration, 6 * max_time) @@ -276,12 +276,12 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check second fixation fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), 5) + self.assertEqual(len(fixation), 5) self.assertLessEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, 4 * min_time) self.assertLessEqual(fixation.duration, 4 * max_time) self.assertLessEqual(fixation.finished, True) - + def test_fixation_overlapping(self): """Test Fixation overlap function.""" @@ -336,12 +336,12 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check unique fixation fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size * 2) + self.assertEqual(len(fixation), size * 2) #self.assertGreaterEqual(fixation.deviation_max, deviation_max) self.assertGreaterEqual(fixation.duration, (2 * size - 1) * min_time) self.assertLessEqual(fixation.duration, (2 * size - 1) * max_time) self.assertLessEqual(fixation.finished, True) - + def test_identification_browsing(self): """Test DispersionThresholdIdentification identification browsing.""" @@ -355,7 +355,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) @@ -369,7 +369,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): if GazeFeatures.is_fixation(finished_gaze_movement): - self.assertEqual(len(finished_gaze_movement.positions.keys()), size) + self.assertEqual(len(finished_gaze_movement), size) self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max) self.assertGreaterEqual(finished_gaze_movement.duration, (size-1) * min_time) self.assertLessEqual(finished_gaze_movement.duration, (size-1) * max_time) @@ -377,13 +377,13 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): elif GazeFeatures.is_saccade(finished_gaze_movement): - self.assertEqual(len(finished_gaze_movement.positions.keys()), 2) + self.assertEqual(len(finished_gaze_movement), 2) self.assertGreaterEqual(finished_gaze_movement.duration, min_time) self.assertLessEqual(finished_gaze_movement.duration, max_time) self.assertLessEqual(finished_gaze_movement.finished, True) # Check that last gaze position date is not equal to given gaze position date - if finished_gaze_movement.valid: + if finished_gaze_movement: last_ts = finished_gaze_movement[-1].timestamp @@ -392,14 +392,14 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): # Check that last gaze position date of current fixation is equal to given gaze position date # NOTE: This is not true for saccade as, for I-DT, there is a minimal time window while the gaze movement is unknown current_gaze_movement = gaze_movement_identifier.current_gaze_movement - if current_gaze_movement.valid: + if current_gaze_movement: if GazeFeatures.is_fixation(current_gaze_movement): last_ts = current_gaze_movement[-1].timestamp self.assertEqual(last_ts, gaze_position.timestamp) - + def test_identification_generator(self): """Test DispersionThresholdIdentification identification using generator.""" @@ -413,15 +413,15 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) - for ts, finished_gaze_movement in gaze_movement_identifier(ts_gaze_positions): + for finished_gaze_movement in gaze_movement_identifier(ts_gaze_positions): if GazeFeatures.is_fixation(finished_gaze_movement): - self.assertEqual(len(finished_gaze_movement.positions.keys()), size) + self.assertEqual(len(finished_gaze_movement), size) self.assertLessEqual(finished_gaze_movement.deviation_max, deviation_max) self.assertGreaterEqual(finished_gaze_movement.duration, size * min_time) self.assertLessEqual(finished_gaze_movement.duration, size * max_time) @@ -429,7 +429,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): elif GazeFeatures.is_saccade(finished_gaze_movement): - self.assertEqual(len(finished_gaze_movement.positions.keys()), 2) + self.assertEqual(len(finished_gaze_movement), 2) self.assertGreaterEqual(finished_gaze_movement.duration, 2 * min_time) self.assertLessEqual(finished_gaze_movement.duration, 2 * max_time) self.assertLessEqual(finished_gaze_movement.finished, True) diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 6be946f..ff9baec 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -205,9 +205,17 @@ class TimestampedObjectsList(list): super().append(ts_object) + def look_for(self, timestamp: TimeStampType) -> TimestampedObjectType: + """Look for object at given timestamp.""" + for ts_object in self: + + if ts_object.timestamp == timestamp: + + return ts_object + def __add__(self, ts_objects: list = []) -> TimestampedObjectsListType: """Append timestamped objects list.""" - + for ts_object in ts_objects: self.append(ts_object) @@ -774,7 +782,7 @@ def PipelineStepMethod(method): PipelineStepMethod must have a timestamp as first argument. """ - def wrapper(self, timestamp, *args, unwrap: bool = False): + def wrapper(self, timestamp, *args, unwrap: bool = False, **kwargs): """Wrap pipeline step method to measure execution time. Parameters: @@ -784,7 +792,7 @@ def PipelineStepMethod(method): """ if unwrap: - return method(self, timestamp, *args) + return method(self, timestamp, *args, **kwargs) # Initialize execution time assessment start = time.perf_counter() @@ -794,7 +802,7 @@ def PipelineStepMethod(method): try: # Execute wrapped method - result = method(self, timestamp, *args) + result = method(self, timestamp, *args, **kwargs) except Exception as e: diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index a452e98..c85e576 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -26,49 +26,39 @@ FixationType = TypeVar('Fixation', bound="Fixation") SaccadeType = TypeVar('Saccade', bound="Saccade") # Type definition for type annotation convenience -@dataclass(frozen=True) class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" - deviation_max: float = field(init=False) - """Maximal gaze position distance to the centroïd.""" + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): - def __post_init__(self): + super().__init__(positions, finished, message, **kwargs) - super().__post_init__() + if positions: - positions_array = numpy.asarray(self.positions.values()) - centroid = numpy.mean(positions_array, axis=0) - deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) - - # Update frozen focus attribute using centroid - object.__setattr__(self, 'focus', (centroid[0], centroid[1])) + positions_array = numpy.asarray(self.values()) + centroid = numpy.mean(positions_array, axis=0) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) - # Update frozen deviation_max attribute - object.__setattr__(self, 'deviation_max', deviations_array.max()) + # Set focus as positions centroid + self.focus = (centroid[0], centroid[1]) - def point_deviation(self, gaze_position) -> float: - """Get distance of a point from the fixation's centroïd.""" + # Set deviation_max attribute + self.__deviation_max = deviations_array.max() - return numpy.sqrt((self.focus[0] - gaze_position.value[0])**2 + (self.focus[1] - gaze_position.value[1])**2) + @property + def deviation_max(self): + """Get fixation's maximal deviation.""" + return self.__deviation_max - def overlap(self, fixation) -> bool: + def overlap(self, fixation: FixationType) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" - - positions_array = numpy.asarray(self.positions.values()) - centroid = numpy.array(list(self.focus)) + + positions_array = numpy.asarray(fixation.values()) + centroid = numpy.mean(self.focus, axis=0) deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) - return deviations_array.min() <= self.deviation_max - - def merge(self, fixation) -> FixationType: - """Merge another fixation into this fixation.""" - - self.positions.append(fixation.positions) - self.__post_init__() - - return self - + return min(deviations_array) <= self.deviation_max + def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None): """Draw fixation into image. @@ -93,12 +83,12 @@ class Fixation(GazeFeatures.Fixation): self.draw_positions(image, **draw_positions) -@dataclass(frozen=True) class Saccade(GazeFeatures.Saccade): """Define dispersion based saccade.""" - def __post_init__(self): - super().__post_init__() + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): + + super().__init__(positions, finished, message, **kwargs) def draw(self, image: numpy.array, line_color: tuple = None): """Draw saccade into image. @@ -115,7 +105,6 @@ class Saccade(GazeFeatures.Saccade): cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) -@dataclass class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): """Implementation of the I-DT algorithm as described in: @@ -123,25 +112,36 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): *Identifying fixations and saccades in eye-tracking protocols.* Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA'00, 71-78). [https://doi.org/10.1145/355017.355028](https://doi.org/10.1145/355017.355028) + + Parameters: + deviation_max_threshold: Maximal distance allowed to consider a gaze movement as a fixation. + duration_min_threshold: Minimal duration allowed to consider a gaze movement as a fixation. \ + It is also used as maximal duration allowed to wait valid gaze positions. """ - deviation_max_threshold: int|float - """Maximal distance allowed to consider a gaze movement as a fixation.""" - - duration_min_threshold: int|float - """Minimal duration allowed to consider a gaze movement as a fixation. - It is also used as maximal duration allowed to wait valid gaze positions.""" - - def __post_init__(self): + def __init__(self, deviation_max_threshold: int|float, duration_min_threshold: int|float): super().__init__() + self.__deviation_max_threshold = deviation_max_threshold + self.__duration_min_threshold = duration_min_threshold + self.__valid_positions = GazeFeatures.TimeStampedGazePositions() self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() + @property + def deviation_max_threshold(self): + """Get identifier's deviation max threshold.""" + return self.__deviation_max_threshold + + @property + def duration_min_threshold(self): + """Get identifier duration min threshold.""" + return self.__duration_min_threshold + @DataFeatures.PipelineStepMethod - def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType: + def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: # Ignore non valid gaze position if not gaze_position: @@ -151,9 +151,9 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Check if too much time elapsed since last valid gaze position if len(self.__valid_positions) > 0: - ts_last, _ = self.__valid_positions[-1] + ts_last = self.__valid_positions[-1].timestamp - if (ts - ts_last) > self.duration_min_threshold: + if (timestamp - ts_last) > self.__duration_min_threshold: # Get last movement last_movement = self.current_gaze_movement.finish() @@ -172,17 +172,14 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Store gaze positions until a minimal duration self.__valid_positions.append(gaze_position) - first_ts, _ = self.__valid_positions[0] - last_ts, _ = self.__valid_positions[-1] - # Once the minimal duration is reached - if last_ts - first_ts >= self.duration_min_threshold: + if self.__valid_positions.duration >= self.__duration_min_threshold: # Calculate the deviation of valid gaze positions deviation = Fixation(self.__valid_positions).deviation_max # Valid gaze positions deviation small enough - if deviation <= self.deviation_max_threshold: + if deviation <= self.__deviation_max_threshold: last_saccade = GazeFeatures.GazeMovement() @@ -190,8 +187,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): if len(self.__saccade_positions) > 0: # Copy oldest valid position into saccade positions - first_ts, first_position = self.__valid_positions[0] - self.__saccade_positions.append(first_position) + self.__saccade_positions.append(self.__valid_positions[0]) # Finish last saccade last_saccade = self.current_saccade.finish() @@ -214,8 +210,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): if len(self.__fixation_positions) > 0: # Copy most recent fixation position into saccade positions - last_ts, last_position = self.__fixation_positions[-1] - self.__saccade_positions.append(last_position) + self.__saccade_positions.append(self.__fixation_positions[-1]) # Finish last fixation last_fixation = self.current_fixation.finish() @@ -233,8 +228,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return last_fixation if not terminate else self.current_saccade.finish() # Move oldest valid position into saccade positions - first_ts, first_position = self.__valid_positions.pop_first() - self.__saccade_positions.append(first_position) + self.__saccade_positions.append(self.__valid_positions.pop(0)) # Always return unvalid gaze movement at least return GazeFeatures.GazeMovement() diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 71c643e..eac9e5c 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -520,31 +520,34 @@ class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList): GazeStatusType = TypeVar('GazeStatus', bound="GazeStatus") # Type definition for type annotation convenience -@dataclass(frozen=True) -class GazeStatus(GazePosition): - """Define gaze status as a gaze position belonging to an identified and indexed gaze movement.""" +class GazeStatus(list, DataFeatures.TimestampedObject): + """Define gaze status as a list of 1 or 2 (index, GazeMovementType) tuples. - movement_type: str = field(kw_only=True) - """GazeMovement type to which gaze position belongs.""" + Parameters: + position: the position that the status represents. + """ - movement_index: int = field(kw_only=True) - """GazeMovement index to which gaze positon belongs.""" + def __init__(self, position: GazePosition): - @classmethod - def from_position(cls, gaze_position: GazePosition, movement_type: str, movement_index: int) -> GazeStatusType: - """Initialize from a gaze position instance.""" + DataFeatures.TimestampedObject.__init__(self, timestamp=position.timestamp) + + self.__position = position + + @property + def position(self) -> GazePosition: + """Get gaze status position.""" + return self.__position + + def append(self, movement_index: int, movement_type:type): + """Append movement index and type.""" - return cls(gaze_position, precision=gaze_position.precision, movement_type=movement_type, movement_index=movement_index) + super().append((movement_index, movement_type)) TimeStampedGazeStatusType = TypeVar('TimeStampedGazeStatus', bound="TimeStampedGazeStatus") # Type definition for type annotation convenience class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList): - """Handle timestamped gaze movements into a list - - !!! note - List of gaze status are required as a gaze position can belongs to two consecutive gaze movements as last and first position. - """ + """Handle timestamped gaze status into a list.""" def __init__(self): @@ -568,7 +571,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): terminate: allows to notify identification algorithm that given gaze position will be the last one. Returns: - finished_gaze_movement: identified gaze movement once it is finished otherwise it returns unvalid gaze movement. + gaze_movement: identified gaze movement once it is finished otherwise it returns empty gaze movement. """ raise NotImplementedError('identify() method not implemented') @@ -612,57 +615,31 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): # Iterate on gaze positions for gaze_position in ts_gaze_positions: - finished_gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) - - if is_fixation(finished_gaze_movement): + gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) - ts_fixations.append(finished_gaze_movement) + if gaze_movement: # First gaze movement position is always shared with previous gaze movement - for movement_position in finished_gaze_movement: - - gaze_status = GazeStatus.from_position(movement_position, 'Fixation', len(ts_fixations)) - - if movement_position.timestamp != finished_gaze_movement.timestamp: - - ts_status.append([gaze_status]) - - else: - - try: - - ts_status[finished_gaze_movement.timestamp].append(gaze_status) - - except KeyError: - - ts_status[finished_gaze_movement.timestamp] = [gaze_status] - - elif is_saccade(finished_gaze_movement): - - ts_saccades.append(finished_gaze_movement) - - # First gaze movement position is always shared with previous gaze movement - for movement_position in finished_gaze_movement: - - gaze_status = GazeStatus.from_position(position, 'Saccade', len(ts_saccades)) - - if movement_position.timestamp != finished_gaze_movement.timestamp: - - ts_status.append([gaze_status]) + for movement_position in gaze_movement: - else: + # Is a status already exist for this position? + gaze_status = ts_status.look_for(movement_position.timestamp) - try: + if not gaze_status: + + gaze_status = GazeStatus(movement_position) + ts_status.append(gaze_status) - ts_status[finished_gaze_movement.timestamp].append(gaze_status) + gaze_status.append(len(ts_fixations), type(gaze_movement)) - except KeyError: + # Store gaze movment into the appropriate list + if is_fixation(gaze_movement): - ts_status[finished_gaze_movement.timestamp] = [gaze_status] + ts_fixations.append(gaze_movement) - else: + elif is_saccade(gaze_movement): - continue + ts_saccades.append(gaze_movement) return ts_fixations, ts_saccades, ts_status @@ -674,7 +651,7 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): Returns: timestamp: first gaze position date of identified gaze movement - finished_gaze_movement: identified gaze movement once it is finished + gaze_movement: identified gaze movement once it is finished """ assert(type(ts_gaze_positions) == TimeStampedGazePositions) @@ -685,11 +662,11 @@ class GazeMovementIdentifier(DataFeatures.PipelineStepObject): # Iterate on gaze positions for gaze_position in ts_gaze_positions: - finished_gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) + gaze_movement = self.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) - if finished_gaze_movement: + if gaze_movement: - yield finished_gaze_movement + yield gaze_movement ScanStepType = TypeVar('ScanStep', bound="ScanStep") # Type definition for type annotation convenience -- cgit v1.1 From 1a3aac125980019ae86493782795569327bc8eaa Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 11:09:32 +0100 Subject: Fixing VelocityThresholdIdentification tests. --- .../DispersionThresholdIdentification.py | 16 +-- .../VelocityThresholdIdentification.py | 123 ++++++++------------ .../DispersionThresholdIdentification.py | 18 +-- .../VelocityThresholdIdentification.py | 129 ++++++++++----------- 4 files changed, 124 insertions(+), 162 deletions(-) diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index 07496c3..156f6f1 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -359,13 +359,10 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) - # Get last ts to terminate identification on last gaze position - last_ts = ts_gaze_positions[-1].timestamp - # Iterate on gaze positions for gaze_position in ts_gaze_positions: - finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == last_ts)) + finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == ts_gaze_positions[-1].timestamp)) if GazeFeatures.is_fixation(finished_gaze_movement): @@ -382,13 +379,6 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): self.assertLessEqual(finished_gaze_movement.duration, max_time) self.assertLessEqual(finished_gaze_movement.finished, True) - # Check that last gaze position date is not equal to given gaze position date - if finished_gaze_movement: - - last_ts = finished_gaze_movement[-1].timestamp - - self.assertNotEqual(last_ts, gaze_position.timestamp) - # Check that last gaze position date of current fixation is equal to given gaze position date # NOTE: This is not true for saccade as, for I-DT, there is a minimal time window while the gaze movement is unknown current_gaze_movement = gaze_movement_identifier.current_gaze_movement @@ -396,9 +386,7 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): if GazeFeatures.is_fixation(current_gaze_movement): - last_ts = current_gaze_movement[-1].timestamp - - self.assertEqual(last_ts, gaze_position.timestamp) + self.assertEqual(current_gaze_movement[-1].timestamp, gaze_position.timestamp) def test_identification_generator(self): """Test DispersionThresholdIdentification identification using generator.""" diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py index 24f2e3c..262cfc0 100644 --- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py @@ -17,8 +17,8 @@ from argaze.GazeAnalysis import VelocityThresholdIdentification import numpy -def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, min_time: float, max_time: float, start_ts: float = 0., validity: list = []): - """ Generate N TimeStampedGazePositions strating from a starting position for testing purpose. +def build_gaze_fixation(size: int, center: tuple, deviation_max: float, min_time: float, max_time: float, start_ts: float = 0., validity: list = []): + """ Generate N TimeStampedGazePsoitions dispersed around a center point for testing purpose. Timestamps are current time after random sleep (second). GazePositions are random values. """ @@ -26,8 +26,6 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, start_time = time.time() - last_valid_position = start_position - for i in range(0, size): # Sleep a random time @@ -43,21 +41,19 @@ def build_gaze_fixation(size: int, start_position: tuple, deviation_max: float, if valid: # Edit gaze position - random_x = last_valid_position[0] + deviation_max * (random.random() - 0.5) / math.sqrt(2) - random_y = last_valid_position[1] + deviation_max * (random.random() - 0.5) / math.sqrt(2) - + random_x = center[0] + deviation_max * (random.random() - 0.5) / math.sqrt(2) + random_y = center[1] + deviation_max * (random.random() - 0.5) / math.sqrt(2) gaze_position = GazeFeatures.GazePosition((random_x, random_y)) - # Remember last valid gaze position - last_valid_position = gaze_position.value - else: gaze_position = GazeFeatures.GazePosition() + # Timestamp gaze position + gaze_position.timestamp = time.time() - start_time + start_ts + # Store gaze position - ts = time.time() - start_time + start_ts - ts_gaze_positions[ts] = gaze_position + ts_gaze_positions.append(gaze_position) return ts_gaze_positions @@ -93,9 +89,11 @@ def build_gaze_saccade(size: int, center_A: tuple, center_B: tuple, min_time: fl gaze_position = GazeFeatures.GazePosition() + # Timestamp gaze position + gaze_position.timestamp = time.time() - start_time + start_ts + # Store gaze position - ts = time.time() - start_time + start_ts - ts_gaze_positions[ts] = gaze_position + ts_gaze_positions.append(gaze_position) return ts_gaze_positions @@ -122,9 +120,9 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size - 1) # Check fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size - 1) + self.assertEqual(len(fixation), size - 1) self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) self.assertLessEqual(fixation.finished, True) @@ -141,9 +139,9 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): velocity_max = deviation_max / min_time ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) @@ -154,42 +152,36 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), size * 2 - 1) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size - 1) + self.assertEqual(len(fixation), size - 1) self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) self.assertLessEqual(fixation.finished, True) # Check first saccade - ts, saccade = ts_saccades.pop_first() + saccade = ts_saccades.pop(0) - self.assertEqual(len(saccade.positions.keys()), 2) + self.assertEqual(len(saccade), 2) self.assertGreaterEqual(saccade.duration, min_time) self.assertLessEqual(saccade.duration, max_time) self.assertLessEqual(saccade.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = fixation.positions.last - first_ts, first_position = saccade.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) + self.assertEqual(fixation[-1].value, saccade[0].value) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) self.assertLessEqual(fixation.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = saccade.positions.last - first_ts, first_position = fixation.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(saccade[-1].timestamp, fixation[0].timestamp) + self.assertEqual(saccade[-1].value, fixation[0].value) def test_fixation_and_short_saccade_identification(self): """Test VelocityThresholdIdentification fixation and saccade identification.""" @@ -205,10 +197,10 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): velocity_max = deviation_max / min_time ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A.last[0]) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions.last[0]) + ts_move_positions = build_gaze_saccade(move, out_A, center_B, min_time, min_time, start_ts=ts_gaze_positions_A[-1].timestamp) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_move_positions[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_move_positions).append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_move_positions + ts_gaze_positions_B gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) @@ -219,42 +211,36 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), 2 * size + move - 1) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size - 1) # BUG: NOT ALWAYS TRUE !!! + self.assertEqual(len(fixation), size - 1) # BUG: NOT ALWAYS TRUE !!! self.assertGreaterEqual(fixation.duration, (size - 2) * min_time) self.assertLessEqual(fixation.duration, (size - 2) * max_time) self.assertLessEqual(fixation.finished, True) # Check first saccade - ts, saccade = ts_saccades.pop_first() + saccade = ts_saccades.pop(0) - self.assertEqual(len(saccade.positions.keys()), move + 2) + self.assertEqual(len(saccade), move + 2) self.assertGreaterEqual(saccade.duration, (move + 1) * min_time) self.assertLessEqual(saccade.duration, (move + 1) * max_time) self.assertLessEqual(saccade.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = fixation.positions.last - first_ts, first_position = saccade.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(fixation[-1].timestamp, saccade[0].timestamp) + self.assertEqual(fixation[-1].value, saccade[0].value) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), size) + self.assertEqual(len(fixation), size) self.assertGreaterEqual(fixation.duration, (size - 1) * min_time) self.assertLessEqual(fixation.duration, (size - 1) * max_time) self.assertLessEqual(fixation.finished, True) # Check that last position of a movement is equal to first position of next movement - last_ts, last_position = saccade.positions.last - first_ts, first_position = fixation.positions.first - - self.assertEqual(last_ts, first_ts) - self.assertEqual(last_position.value, first_position.value) + self.assertEqual(saccade[-1], fixation[0]) + self.assertEqual(saccade[-1].value, fixation[0].value) def test_invalid_gaze_position(self): """Test VelocityThresholdIdentification fixation and saccade identification with invalid gaze position.""" @@ -278,17 +264,17 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): self.assertEqual(len(ts_status), len(validity)-5) # Check first fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), 6) + self.assertEqual(len(fixation), 6) self.assertGreaterEqual(fixation.duration, 5 * min_time) self.assertLessEqual(fixation.duration, 5 * max_time) self.assertLessEqual(fixation.finished, True) # Check second fixation - ts, fixation = ts_fixations.pop_first() + fixation = ts_fixations.pop(0) - self.assertEqual(len(fixation.positions.keys()), 4) + self.assertEqual(len(fixation), 4) self.assertGreaterEqual(fixation.duration, 3 * min_time) self.assertLessEqual(fixation.duration, 3 * max_time) self.assertLessEqual(fixation.finished, True) @@ -305,34 +291,27 @@ class TestVelocityThresholdIdentificationClass(unittest.TestCase): velocity_max = deviation_max / min_time ts_gaze_positions_A = build_gaze_fixation(size, center_A, deviation_max, min_time, max_time) - ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A.last[0]) + ts_gaze_positions_B = build_gaze_fixation(size, center_B, deviation_max, min_time, max_time, start_ts=ts_gaze_positions_A[-1].timestamp) - ts_gaze_positions = ts_gaze_positions_A.append(ts_gaze_positions_B) + ts_gaze_positions = ts_gaze_positions_A + ts_gaze_positions_B gaze_movement_identifier = VelocityThresholdIdentification.GazeMovementIdentifier(velocity_max_threshold=velocity_max, duration_min_threshold=max_time*2) - - # Get last ts to terminate identification on last gaze position - last_ts, _ = ts_gaze_positions.last # Iterate on gaze positions - for ts, gaze_position in ts_gaze_positions.items(): + for gaze_position in ts_gaze_positions: - finished_gaze_movement = gaze_movement_identifier.identify(ts, gaze_position, terminate=(ts == last_ts)) + finished_gaze_movement = gaze_movement_identifier.identify(gaze_position.timestamp, gaze_position, terminate=(gaze_position.timestamp == ts_gaze_positions[-1])) # Check that last gaze position date is not equal to given gaze position date - if finished_gaze_movement.valid: + if finished_gaze_movement: - last_ts, _ = finished_gaze_movement.positions.last - - self.assertNotEqual(last_ts, ts) + self.assertNotEqual(finished_gaze_movement[-1].timestamp, gaze_position.timestamp) # Check that last gaze position date of current movement is equal to given gaze position date current_gaze_movement = gaze_movement_identifier.current_gaze_movement - if current_gaze_movement.valid: - - last_ts, _ = current_gaze_movement.positions.last + if current_gaze_movement: - self.assertEqual(last_ts, ts) + self.assertEqual(current_gaze_movement[-1].timestamp, gaze_position.timestamp) if __name__ == '__main__': diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index c85e576..13529e7 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -100,8 +100,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - _, start_position = self.positions[0] - _, last_position = self.positions[-1] + start_position = self.positions[0] + last_position = self.positions[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -143,13 +143,13 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @DataFeatures.PipelineStepMethod def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: - # Ignore non valid gaze position + # Ignore empty gaze position if not gaze_position: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Check if too much time elapsed since last valid gaze position - if len(self.__valid_positions) > 0: + if self.__valid_positions: ts_last = self.__valid_positions[-1].timestamp @@ -184,7 +184,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): last_saccade = GazeFeatures.GazeMovement() # Is there saccade positions? - if len(self.__saccade_positions) > 0: + if self.__saccade_positions: # Copy oldest valid position into saccade positions self.__saccade_positions.append(self.__valid_positions[0]) @@ -207,7 +207,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): last_fixation = GazeFeatures.GazeMovement() # Is there fixation positions? - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: # Copy most recent fixation position into saccade positions self.__saccade_positions.append(self.__fixation_positions[-1]) @@ -237,9 +237,9 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): def current_gaze_movement(self) -> GazeMovementType: # It shouldn't have a current fixation and a current saccade at the same time - assert(not (len(self.__fixation_positions) > 0 and len(self.__saccade_positions) > 0)) + assert(not (self.__fixation_positions and len(self.__saccade_positions) > 1)) - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: return Fixation(self.__fixation_positions) @@ -252,7 +252,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @property def current_fixation(self) -> FixationType: - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: return Fixation(self.__fixation_positions) diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index 971ba9b..c1d448a 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -25,53 +25,39 @@ FixationType = TypeVar('Fixation', bound="Fixation") SaccadeType = TypeVar('Saccade', bound="Saccade") # Type definition for type annotation convenience -@dataclass(frozen=True) class Fixation(GazeFeatures.Fixation): """Define dispersion based fixation.""" - deviation_max: float = field(init=False) - """Maximal gaze position distance to the centroïd.""" + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): - def __post_init__(self): + super().__init__(positions, finished, message, **kwargs) - super().__post_init__() + if positions: - points = self.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([numpy.mean(points_x), numpy.mean(points_y)]) - deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) + positions_array = numpy.asarray(self.values()) + centroid = numpy.mean(positions_array, axis=0) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) - # Update frozen focus attribute using centroid - object.__setattr__(self, 'focus', (centroid_array[0], centroid_array[1])) + # Set focus as positions centroid + self.focus = (centroid[0], centroid[1]) - # Update frozen deviation_max attribute - object.__setattr__(self, 'deviation_max', max(deviations_array)) + # Set deviation_max attribute + self.__deviation_max = deviations_array.max() - def point_deviation(self, gaze_position) -> float: - """Get distance of a point from the fixation's centroïd.""" - - return numpy.sqrt((self.centroid[0] - gaze_position.value[0])**2 + (self.centroid[1] - gaze_position.value[1])**2) + @property + def deviation_max(self): + """Get fixation's maximal deviation.""" + return self.__deviation_max - def overlap(self, fixation) -> bool: + def overlap(self, fixation: FixationType) -> bool: """Does a gaze position from another fixation having a deviation to this fixation centroïd smaller than maximal deviation?""" - - points = fixation.positions.values() - points_x, points_y = [p[0] for p in points], [p[1] for p in points] - points_array = numpy.column_stack([points_x, points_y]) - centroid_array = numpy.array([self.centroid[0], self.centroid[1]]) - deviations_array = numpy.sqrt(numpy.sum((points_array - centroid_array)**2, axis=1)) + + positions_array = numpy.asarray(fixation.values()) + centroid = numpy.mean(self.focus, axis=0) + deviations_array = numpy.sqrt(numpy.sum((positions_array - centroid)**2, axis=1)) return min(deviations_array) <= self.deviation_max - - def merge(self, fixation) -> FixationType: - """Merge another fixation into this fixation.""" - - self.positions.append(fixation.positions) - self.__post_init__() - - return self - + def draw(self, image: numpy.array, deviation_circle_color: tuple = None, duration_border_color: tuple = None, duration_factor: float = 1., draw_positions: dict = None): """Draw fixation into image. @@ -85,7 +71,7 @@ class Fixation(GazeFeatures.Fixation): if duration_border_color is not None: cv2.circle(image, (int(self.focus[0]), int(self.focus[1])), int(self.deviation_max), duration_border_color, int(self.duration * duration_factor)) - + # Draw deviation circle if required if deviation_circle_color is not None: @@ -96,12 +82,12 @@ class Fixation(GazeFeatures.Fixation): self.draw_positions(image, **draw_positions) -@dataclass(frozen=True) class Saccade(GazeFeatures.Saccade): """Define dispersion based saccade.""" - def __post_init__(self): - super().__post_init__() + def __init__(self, positions: GazeFeatures.TimeStampedGazePositions = (), finished: bool = False, message: str = None, **kwargs): + + super().__init__(positions, finished, message, **kwargs) def draw(self, image: numpy.array, line_color: tuple = None): """Draw saccade into image. @@ -113,8 +99,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - _, start_position = self.positions.first - _, last_position = self.positions.last + start_position = self.positions[0] + last_position = self.positions[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -126,45 +112,56 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): saccades in eye-tracking protocols. In Proceedings of the 2000 symposium on Eye tracking research & applications (ETRA '00). ACM, New York, NY, USA, 71-78. [http://dx.doi.org/10.1145/355017.355028](http://dx.doi.org/10.1145/355017.355028) - """ - - velocity_max_threshold: int|float - """Maximal velocity allowed to consider a gaze movement as a fixation.""" - duration_min_threshold: int|float - """Minimal duration allowed to wait valid gaze positions.""" + Parameters: + velocity_max_threshold: Maximal velocity allowed to consider a gaze movement as a fixation. + duration_min_threshold: Minimal duration allowed to wait valid gaze positions. + """ - def __post_init__(self): + def __init__(self, velocity_max_threshold: int|float, duration_min_threshold: int|float): super().__init__() + self.__velocity_max_threshold = velocity_max_threshold + self.__duration_min_threshold = duration_min_threshold + self.__last_ts = -1 self.__last_position = None self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() + @property + def velocity_max_threshold(self): + """Get identifier's velocity max threshold.""" + return self.__velocity_max_threshold + + @property + def duration_min_threshold(self): + """Get identifier duration min threshold.""" + return self.__duration_min_threshold + @DataFeatures.PipelineStepMethod - def identify(self, ts: int|float, gaze_position, terminate=False) -> GazeMovementType: + def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: - # Ignore non valid gaze position - if not gaze_position.valid: + # Ignore empty gaze position + if not gaze_position: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() # Store first valid position if self.__last_ts < 0: - self.__last_ts = ts + self.__last_ts = timestamp self.__last_position = gaze_position return GazeFeatures.GazeMovement() # Check if too much time elapsed since last gaze position - if (ts - self.__last_ts) > self.duration_min_threshold: + if (timestamp - self.__last_ts) > self.duration_min_threshold: # Remember last position - self.__last_ts = ts + self.__last_ts = timestamp self.__last_position = gaze_position # Get last movement @@ -178,10 +175,10 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): return last_movement # Velocity - velocity = abs(gaze_position.distance(self.__last_position) / (ts - self.__last_ts)) + velocity = abs(gaze_position.distance(self.__last_position) / (timestamp - self.__last_ts)) # Remember last position - self.__last_ts = ts + self.__last_ts = timestamp self.__last_position = gaze_position # Velocity is greater than threshold @@ -193,8 +190,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): if len(self.__fixation_positions) > 0: # Copy most recent fixation position into saccade positions - last_ts, last_position = self.__fixation_positions.last - self.__saccade_positions[last_ts] = last_position + self.__saccade_positions.append(self.__fixation_positions[-1]) # Create last fixation last_fixation = self.current_fixation.finish() @@ -203,7 +199,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__fixation_positions = GazeFeatures.TimeStampedGazePositions() # Append to saccade positions - self.__saccade_positions[ts] = gaze_position + self.__saccade_positions.append(gaze_position) # Output last fixation return last_fixation if not terminate else self.current_saccade.finish() @@ -214,11 +210,10 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): last_saccade = GazeFeatures.GazeMovement() # Does last saccade exist? - if len(self.__saccade_positions) > 0: + if self.__saccade_positions: # Copy most recent saccade position into fixation positions - last_ts, last_position = self.__saccade_positions.last - self.__fixation_positions[last_ts] = last_position + self.__fixation_positions.append(self.__saccade_positions[-1]) # Create last saccade last_saccade = self.current_saccade.finish() @@ -227,7 +222,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): self.__saccade_positions = GazeFeatures.TimeStampedGazePositions() # Append to fixation positions - self.__fixation_positions[ts] = gaze_position + self.__fixation_positions.append(gaze_position) # Output last saccade return last_saccade if not terminate else self.current_fixation.finish() @@ -239,13 +234,13 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): def current_gaze_movement(self) -> GazeMovementType: # It shouldn't have a current fixation and a current saccade at the same time - assert(not (len(self.__fixation_positions) > 0 and len(self.__saccade_positions) > 0)) + assert(not (self.__fixation_positions and self.__saccade_positions)) - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: return Fixation(self.__fixation_positions) - if len(self.__saccade_positions) > 0: + if len(self.__saccade_positions) > 1: return Saccade(self.__saccade_positions) @@ -255,7 +250,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @property def current_fixation(self) -> FixationType: - if len(self.__fixation_positions) > 0: + if self.__fixation_positions: return Fixation(self.__fixation_positions) @@ -265,7 +260,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @property def current_saccade(self) -> SaccadeType: - if len(self.__saccade_positions) > 0: + if len(self.__saccade_positions) > 1: return Saccade(self.__saccade_positions) -- cgit v1.1 From cd601be0b9366a9bd1554523319e57801440ed64 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 14:18:50 +0100 Subject: More work on time management. --- .../advanced_topics/scripting.md | 2 +- .../DispersionThresholdIdentification.py | 3 + src/argaze.test/GazeFeatures.py | 43 +++++------- src/argaze.test/PupillFeatures.py | 60 ++++++----------- src/argaze/ArFeatures.py | 26 +++++--- src/argaze/DataFeatures.py | 46 ++----------- src/argaze/GazeAnalysis/DeviationCircleCoverage.py | 2 +- .../DispersionThresholdIdentification.py | 10 +-- .../VelocityThresholdIdentification.py | 4 +- src/argaze/GazeFeatures.py | 26 +++++--- src/argaze/PupillAnalysis/WorkloadIndex.py | 44 +++++++----- src/argaze/PupillFeatures.py | 78 +++++++--------------- 12 files changed, 139 insertions(+), 205 deletions(-) diff --git a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md index 8c21dec..5999cbc 100644 --- a/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md +++ b/docs/user_guide/gaze_analysis_pipeline/advanced_topics/scripting.md @@ -80,7 +80,7 @@ Calling [ArFrame.look](../../../argaze.md/#argaze.ArFeatures.ArFrame.look) metho ... ar_frame.last_gaze_position # Check if a gaze movement has been identified - if ar_frame.last_gaze_movement.valid and ar_frame.last_gaze_movement.finished: + if ar_frame.last_gaze_movement and ar_frame.last_gaze_movement.finished: # Do something with identified fixation if GazeFeatures.is_fixation(ar_frame.last_gaze_movement): diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index 156f6f1..311f31b 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -113,6 +113,9 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) + # DEBUG + print(gaze_movement_identifier) + # Check result size self.assertEqual(len(ts_fixations), 1) self.assertEqual(len(ts_saccades), 0) diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index c6ccfca..e678093 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -31,6 +31,9 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)): # Edit gaze position random_gaze_position = GazeFeatures.GazePosition((random.random() * frame_dimension[0], random.random() * frame_dimension[1]), precision=5) + # DEBUG + print('random_gaze_position', type(random_gaze_position), random_gaze_position.__class__.__bases__) + # Timestamp gaze position random_gaze_position.timestamp = time.time() @@ -375,13 +378,11 @@ def build_scan_path(size, frame_dimension: tuple[float, float] = (1, 1)): for i in range(size): - fixation = TestFixation(random_gaze_positions(10, frame_dimension)) - ts, _ = fixation.first - scan_path.append_fixation(ts, fixation) + fixation = TestFixation(random_gaze_positions(10, frame_dimension), timestamp=i) + scan_path.append_fixation(fixation) - saccade = TestSaccade(random_gaze_positions(2, frame_dimension)) - ts, _ = saccade.first - scan_path.append_saccade(ts, saccade) + saccade = TestSaccade(random_gaze_positions(2, frame_dimension), timestamp=i+1) + scan_path.append_saccade(saccade) return scan_path @@ -404,9 +405,7 @@ class TestScanPathClass(unittest.TestCase): # Append a saccade that should be ignored saccade = TestSaccade(random_gaze_positions(2)) - ts = saccade[0].timestamp - - new_step = scan_path.append_saccade(ts, saccade) + new_step = scan_path.append_saccade(saccade) # Check that no scan step have been created yet self.assertEqual(len(scan_path), 0) @@ -415,9 +414,7 @@ class TestScanPathClass(unittest.TestCase): # Append first fixation fixation_A = TestFixation(random_gaze_positions(10)) - ts = fixation_A[0].timestamp - - new_step = scan_path.append_fixation(ts, fixation_A) + new_step = scan_path.append_fixation(fixation_A) # Check that no scan step have been created yet self.assertEqual(len(scan_path), 0) @@ -426,9 +423,7 @@ class TestScanPathClass(unittest.TestCase): # Append consecutive saccade saccade_A = TestSaccade(random_gaze_positions(2)) - ts = saccade_A[0].timestamp - - new_step_A = scan_path.append_saccade(ts, saccade_A) + new_step_A = scan_path.append_saccade(saccade_A) # Check that new scan step have been created self.assertEqual(len(scan_path), 1) @@ -439,9 +434,7 @@ class TestScanPathClass(unittest.TestCase): # Append 2 consecutive fixations then a saccade fixation_B1 = TestFixation(random_gaze_positions(10)) - ts = fixation_B1[0].timestamp - - new_step = scan_path.append_fixation(ts, fixation_B1) + new_step = scan_path.append_fixation(fixation_B1) # Check that no scan step have been created yet self.assertEqual(len(scan_path), 1) @@ -449,9 +442,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(new_step, None) fixation_B2 = TestFixation(random_gaze_positions(10)) - ts = fixation_B2[0].timestamp - - new_step = scan_path.append_fixation(ts, fixation_B2) + new_step = scan_path.append_fixation(fixation_B2) # Check that no scan step have been created yet self.assertEqual(len(scan_path), 1) @@ -459,9 +450,7 @@ class TestScanPathClass(unittest.TestCase): self.assertEqual(new_step, None) saccade_B = TestSaccade(random_gaze_positions(2)) - ts = saccade_B[0].timestamp - - new_step_B = scan_path.append_saccade(ts, saccade_B) + new_step_B = scan_path.append_saccade(saccade_B) # Check that new scan step have been created self.assertEqual(len(scan_path), 2) @@ -521,12 +510,12 @@ def build_aoi_scan_path(expected_aoi, aoi_path): # Append a hidden last step to allow last given step creation aoi_path.append(aoi_path[-2]) - for aoi in aoi_path: + for i, aoi in enumerate(aoi_path): - fixation = TestFixation(random_gaze_positions(10)) + fixation = TestFixation(random_gaze_positions(10), timestamp=i) aoi_scan_path.append_fixation(fixation, aoi) - saccade = TestSaccade(random_gaze_positions(2)) + saccade = TestSaccade(random_gaze_positions(2), timestamp=i+1) aoi_scan_path.append_saccade(saccade) return aoi_scan_path diff --git a/src/argaze.test/PupillFeatures.py b/src/argaze.test/PupillFeatures.py index f0e8e1b..9cf26eb 100644 --- a/src/argaze.test/PupillFeatures.py +++ b/src/argaze.test/PupillFeatures.py @@ -8,6 +8,7 @@ __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" import unittest +import math from argaze import PupillFeatures @@ -43,14 +44,12 @@ class TestPupillDiameterClass(unittest.TestCase): # Check empty PupillDiameter empty_pupill_diameter = PupillFeatures.PupillDiameter() - self.assertEqual(empty_pupill_diameter.value, 0.) - self.assertEqual(empty_pupill_diameter.valid, False) + self.assertEqual(empty_pupill_diameter, math.nan) # Check float PupillDiameter float_pupill_diameter = PupillFeatures.PupillDiameter(1.23) - self.assertEqual(float_pupill_diameter.value, 1.23) - self.assertEqual(float_pupill_diameter.valid, True) + self.assertEqual(float_pupill_diameter, 1.23) def test_properties(self): """Test PupillDiameter properties cannot be modified after creation.""" @@ -60,32 +59,16 @@ class TestPupillDiameterClass(unittest.TestCase): # Check that pupill diameter value setting fails with self.assertRaises(AttributeError): - pupill_diameter.value = 123 + pupill_diameter = 123 - self.assertNotEqual(pupill_diameter.value, 123) - self.assertEqual(pupill_diameter.value, 0.) + self.assertNotEqual(pupill_diameter, 123) + self.assertEqual(pupill_diameter, math.nan) def test___repr__(self): """Test PupillDiameter string representation.""" # Check empty PupillDiameter representation - self.assertEqual(repr(PupillFeatures.PupillDiameter()), "{\"value\": 0.0}") - -class TestUnvalidPupillDiameterClass(unittest.TestCase): - """Test UnvalidPupillDiameter class.""" - - def test_new(self): - """Test UnvalidPupillDiameter creation.""" - - unvalid_pupill_diameter = PupillFeatures.UnvalidPupillDiameter() - - self.assertEqual(unvalid_pupill_diameter.value, 0.) - self.assertEqual(unvalid_pupill_diameter.valid, False) - - def test___repr__(self): - """Test UnvalidPupillDiameter string representation.""" - - self.assertEqual(repr(PupillFeatures.UnvalidPupillDiameter()), "{\"message\": null, \"value\": 0.0}") + self.assertEqual(repr(PupillFeatures.PupillDiameter()), "{\"value\": NaN}") class TestTimeStampedPupillDiametersClass(unittest.TestCase): """Test TimeStampedPupillDiameters class.""" @@ -93,22 +76,23 @@ class TestTimeStampedPupillDiametersClass(unittest.TestCase): def test___setitem__(self): """Test __setitem__ method.""" - ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters() - ts_pupill_diameters[0] = PupillFeatures.PupillDiameter() - ts_pupill_diameters[1] = PupillFeatures.UnvalidPupillDiameter() - ts_pupill_diameters[2] = {"value": 1.23} + ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters([ + PupillFeatures.PupillDiameter(), + PupillFeatures.PupillDiameter(0.63), + {"value": 1.23} + ]) - # Check PupillDiameter is correctly stored and accessible as a PupillDiameter + # Check empty PupillDiameter is correctly stored and accessible as a PupillDiameter self.assertIsInstance(ts_pupill_diameters[0], PupillFeatures.PupillDiameter) - self.assertEqual(ts_pupill_diameters[0].valid, False) + self.assertEqual(ts_pupill_diameters[0], math.nan) - # Check UnvalidPupillDiameter is correctly stored and accessible as a UnvalidPupillDiameter - self.assertIsInstance(ts_pupill_diameters[1], PupillFeatures.UnvalidPupillDiameter) - self.assertEqual(ts_pupill_diameters[1].valid, False) + # Check PupillDiameter is correctly stored and accessible as a PupillDiameter + self.assertIsInstance(ts_pupill_diameters[1], PupillFeatures.PupillDiameter) + self.assertEqual(ts_pupill_diameters[0], 0.63) - # Check dict with "value" and "precision" keys is correctly stored and accessible as a PupillDiameter + # Check dict with "value" key is correctly stored and accessible as a PupillDiameter self.assertIsInstance(ts_pupill_diameters[2], PupillFeatures.PupillDiameter) - self.assertEqual(ts_pupill_diameters[2].valid, True) + self.assertEqual(ts_pupill_diameters[0], 1.23) # Check that bad data type insertion fails with self.assertRaises(AssertionError): @@ -125,11 +109,11 @@ class TestTimeStampedPupillDiametersClass(unittest.TestCase): ts_pupill_diameters = PupillFeatures.TimeStampedPupillDiameters() - self.assertEqual(repr(PupillFeatures.TimeStampedPupillDiameters()), "{}") + self.assertEqual(repr(PupillFeatures.TimeStampedPupillDiameters()), "[]") - ts_pupill_diameters[0] = PupillFeatures.PupillDiameter() + ts_pupill_diameters.append(PupillFeatures.PupillDiameter()) - self.assertEqual(repr(ts_pupill_diameters), "{\"0\": {\"value\": 0.0}}") + self.assertEqual(repr(ts_pupill_diameters), "[{\"value\": NaN, \"timestamp\": 0}]") ts_pupill_diameters[0] = PupillFeatures.UnvalidPupillDiameter() diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index b3ecad6..8005d48 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -415,7 +415,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): self.__looked_aoi_name, _ = self.__aoi_matcher.match(timestamp, self.__aoi_scene, gaze_movement) # Valid and finished gaze movement has been identified - if gaze_movement.valid and gaze_movement.finished: + if gaze_movement and gaze_movement.finished: if GazeFeatures.is_fixation(gaze_movement): @@ -908,22 +908,22 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Identify gaze movement if self.__gaze_movement_identifier is not None: - + # Identify finished gaze movement self.__identified_gaze_movement = self.__gaze_movement_identifier.identify(timestamp, self.__calibrated_gaze_position) # Valid and finished gaze movement has been identified - if self.__identified_gaze_movement.valid and self.__identified_gaze_movement.finished: - + if self.__identified_gaze_movement and self.__identified_gaze_movement.finished: + if GazeFeatures.is_fixation(self.__identified_gaze_movement): - + # Append fixation to scan path if self.__scan_path is not None: - + self.__scan_path.append_fixation(self.__identified_gaze_movement) elif GazeFeatures.is_saccade(self.__identified_gaze_movement): - + # Append saccade to scan path if self.__scan_path is not None: @@ -931,10 +931,10 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Is there a new step? if scan_step and len(self.__scan_path) > 1: - + # Analyze aoi scan path for scan_path_analyzer_module_path, scan_path_analyzer in self.__scan_path_analyzers.items(): - + scan_path_analyzer.analyze(timestamp, self.__scan_path) # Update scan path analyzed state @@ -1013,12 +1013,16 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): # Draw current fixation if required if draw_fixations is not None and self.__gaze_movement_identifier is not None: - self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) + if self.__gaze_movement_identifier.current_fixation: + + self.__gaze_movement_identifier.current_fixation.draw(image, **draw_fixations) # Draw current saccade if required if draw_saccades is not None and self.__gaze_movement_identifier is not None: - self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) + if self.__gaze_movement_identifier.current_saccade: + + self.__gaze_movement_identifier.current_saccade.draw(image, **draw_saccades) # Draw layers if required if draw_layers is not None: diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index ff9baec..8df991b 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -149,20 +149,15 @@ class TimestampedObject(): @timestamp.setter def timestamp(self, timestamp: int|float): """Set object timestamp.""" - - assert(type(timestamp) == int or type(timestamp) == float) - self._timestamp = timestamp def untimestamp(self): """Reset object timestamp.""" - self.timestamp = math.nan + self._timestamp = math.nan def is_timestamped(self) -> bool: """Is the object timestamped?""" - timestamped = not math.isnan(self.timestamp) - - return timestamped + return not math.isnan(self._timestamp) class TimestampedObjectsList(list): """Handle timestamped object into a list. @@ -188,7 +183,7 @@ class TimestampedObjectsList(list): def append(self, ts_object: TimestampedObjectType|dict): """Append timestamped object.""" - + # Convert dict into GazePosition if type(ts_object) == dict: @@ -200,7 +195,7 @@ class TimestampedObjectsList(list): if not issubclass(ts_object.__class__, self.__object_type): raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') - + assert(ts_object.is_timestamped()) super().append(ts_object) @@ -421,42 +416,15 @@ class TimestampedObjectsList(list): return legend_patches class SharedObject(TimestampedObject): - """Abstract class to enable multiple threads sharing and timestamp management.""" + """Abstract class to enable multiple threads sharing for timestamped object.""" - def __init__(self): + def __init__(self, timestamp: int|float = math.nan): - super().__init__() + TimestampedObject.__init__(self, timestamp) self._lock = threading.Lock() self._execution_times = {} self._exceptions = {} - @property - def lock(self) -> threading.Lock: - """Get shared object lock object.""" - return self._lock - - @property - def timestamp(self) -> int|float: - """Get shared object timestamp.""" - with self._lock: - return super().timestamp - - @timestamp.setter - def timestamp(self, timestamp: int|float): - """Set shared object timestamp.""" - with self._lock: - super().timestamp = timestamp - - def untimestamp(self): - """Reset shared object timestamp.""" - with self._lock: - self.timestamp = math.nan - - def is_timestamped(self) -> bool: - """Is the object timestamped?""" - with self._lock: - return super().is_timestamped() - class PipelineStepObject(): """ Define class to assess pipeline step methods execution time and observe them. diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py index 62b5e9a..3849d59 100644 --- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py +++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py @@ -107,7 +107,7 @@ class AOIMatcher(GazeFeatures.AOIMatcher): self.__reset() - elif not gaze_movement.valid: + elif not gaze_movement: self.__reset() diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py index 13529e7..2b89cf6 100644 --- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py @@ -100,8 +100,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - start_position = self.positions[0] - last_position = self.positions[-1] + start_position = self[0] + last_position = self[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) @@ -142,12 +142,12 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): @DataFeatures.PipelineStepMethod def identify(self, timestamp: int|float, gaze_position, terminate=False) -> GazeMovementType: - + # Ignore empty gaze position if not gaze_position: return GazeFeatures.GazeMovement() if not terminate else self.current_fixation.finish() - + # Check if too much time elapsed since last valid gaze position if self.__valid_positions: @@ -171,7 +171,7 @@ class GazeMovementIdentifier(GazeFeatures.GazeMovementIdentifier): # Store gaze positions until a minimal duration self.__valid_positions.append(gaze_position) - + # Once the minimal duration is reached if self.__valid_positions.duration >= self.__duration_min_threshold: diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py index c1d448a..a95905f 100644 --- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py +++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py @@ -99,8 +99,8 @@ class Saccade(GazeFeatures.Saccade): # Draw line if required if line_color is not None: - start_position = self.positions[0] - last_position = self.positions[-1] + start_position = self[0] + last_position = self[-1] cv2.line(image, (int(start_position[0]), int(start_position[1])), (int(last_position[0]), int(last_position[1])), line_color, 2) diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index eac9e5c..6a02142 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -34,13 +34,13 @@ class GazePosition(tuple, DataFeatures.TimestampedObject): message: a string to describe why the the position is what it is. """ - def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, **kwargs): + def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): return tuple.__new__(cls, position) - def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, **kwargs): + def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan): - DataFeatures.TimestampedObject.__init__(self, **kwargs) + DataFeatures.TimestampedObject.__init__(self, timestamp) self.__precision = precision self.__message = message @@ -178,7 +178,7 @@ TimeStampedGazePositionsType = TypeVar('TimeStampedGazePositions', bound="TimeSt # Type definition for type annotation convenience class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): - """Handle timestamped gaze positions into a list""" + """Handle timestamped gaze positions into a list.""" def __init__(self, gaze_positions: list = []): @@ -188,6 +188,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): """Get all timestamped position values as list of tuple.""" return [tuple(ts_position) for ts_position in self] + ''' Is it still needed as there is a TimestampedObjectsList.from_json method? @classmethod def from_json(self, json_filepath: str) -> TimeStampedGazePositionsType: """Create a TimeStampedGazePositionsType from .json file.""" @@ -197,6 +198,7 @@ class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList): json_positions = json.load(ts_positions_file) return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions}) + ''' @classmethod def from_dataframe(self, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> TimeStampedGazePositionsType: @@ -367,15 +369,15 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): message: a string to describe why the movement is what it is. """ - def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + def __new__(cls, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan): return TimeStampedGazePositions.__new__(cls, positions) - def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs): + def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, timestamp: int|float = math.nan): """Initialize GazeMovement""" TimeStampedGazePositions.__init__(self, positions) - DataFeatures.TimestampedObject.__init__(self, **kwargs) + DataFeatures.TimestampedObject.__init__(self, timestamp) self.__finished = finished self.__message = message @@ -385,9 +387,13 @@ class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): """Get first position timestamp.""" return self[0].timestamp + def is_timestamped(self) -> bool: + """If first position exist, the movement is timestamped.""" + return bool(self) + @timestamp.setter def timestamp(self, timestamp: int|float): - """Block gaze movment timestamp setting.""" + """Block gaze movement timestamp setting.""" raise('GazeMovement timestamp is first positon timestamp.') @property @@ -774,7 +780,7 @@ class ScanPath(list): self.__duration -= oldest_step.duration - def append_saccade(self, ts, saccade) -> ScanStepType: + def append_saccade(self, saccade) -> ScanStepType: """Append new saccade to scan path and return last new scan step if one have been created.""" # Ignore saccade if no fixation came before @@ -802,7 +808,7 @@ class ScanPath(list): # Clear last fixation self.__last_fixation = None - def append_fixation(self, ts, fixation): + def append_fixation(self, fixation): """Append new fixation to scan path. !!! warning Consecutives fixations are ignored keeping the last fixation""" diff --git a/src/argaze/PupillAnalysis/WorkloadIndex.py b/src/argaze/PupillAnalysis/WorkloadIndex.py index 1f3c586..f97dce3 100644 --- a/src/argaze/PupillAnalysis/WorkloadIndex.py +++ b/src/argaze/PupillAnalysis/WorkloadIndex.py @@ -15,51 +15,61 @@ from argaze import PupillFeatures import numpy -@dataclass class PupillDiameterAnalyzer(PupillFeatures.PupillDiameterAnalyzer): - """Periodic average of pupill diameter variations to pupill diameter reference value.""" + """Periodic average of pupill diameter variations to pupill diameter reference value. - reference: PupillFeatures.PupillDiameter - """ """ + Parameters: + reference: base line value. + period: identification period length. + """ + def __init__(self, reference: PupillFeatures.PupillDiameter, period: int|float = 1): - period: int | float = field(default=1) - """Identification period length.""" + assert(not math.isnan(self.__reference)) - def __post_init__(self): - - assert(self.reference.valid) + self.__reference = reference + self.__period = period self.__variations_sum = 0. self.__variations_number = 0 self.__last_ts = 0 + @property + def reference(self) -> PupillFeatures.PupillDiameter: + """Get workload index reference.""" + return self.__reference + + @property + def period(self) -> int|float: + """Get workload index period.""" + return self.__period + @DataFeatures.PipelineStepMethod - def analyze(self, ts: int|float, pupill_diameter) -> float: + def analyze(self, pupill_diameter: PupillFeatures.PupillDiameter) -> float: """Analyze workload index from successive timestamped pupill diameters.""" # Ignore non valid pupill diameter - if not pupill_diameter.valid: + if not math.isnan(pupill_diameter): return None - if ts - self.__last_ts >= self.period: + if pupill_diameter.timestamp - self.__last_ts >= self.__period: - if self.__variations_number > 0 and self.reference.value > 0.: + if self.__variations_number > 0 and self.__reference.value > 0.: - workload_index = (self.__variations_sum / self.__variations_number) / self.reference.value + workload_index = (self.__variations_sum / self.__variations_number) / self.__reference.value else: workload_index = 0. - self.__variations_sum = pupill_diameter.value - self.reference.value + self.__variations_sum = pupill_diameter.value - self.__reference.value self.__variations_number = 1 - self.__last_ts = ts + self.__last_ts = pupill_diameter.timestamp return workload_index else: - self.__variations_sum += pupill_diameter.value - self.reference.value + self.__variations_sum += pupill_diameter.value - self.__reference.value self.__variations_number += 1 \ No newline at end of file diff --git a/src/argaze/PupillFeatures.py b/src/argaze/PupillFeatures.py index d8f9331..492e7ca 100644 --- a/src/argaze/PupillFeatures.py +++ b/src/argaze/PupillFeatures.py @@ -10,71 +10,41 @@ __license__ = "BSD" from typing import TypeVar from dataclasses import dataclass, field import json +import math from argaze import DataFeatures -@dataclass(frozen=True) -class PupillDiameter(): - """Define pupill diameter as ...""" - - value: float = field(default=0.) - """Pupill diameter value.""" - - @property - def valid(self) -> bool: - """Is the value not 0""" - - return self.value != 0. - - def __repr__(self): - """String representation""" +PupillDiameterType = TypeVar('PupillDiameter', bound="PupillDiameter") +# Type definition for type annotation convenience - return json.dumps(self, ensure_ascii = False, default=vars) +class PupillDiameter(float, DataFeatures.TimestampedObject): + """Define pupill diameter as a single float value. -class UnvalidPupillDiameter(PupillDiameter): - """Unvalid pupill diameter.""" + Parameters: + value: pupill diameter value. + """ + def __new__(cls, value: float = math.nan, **kwargs): - def __init__(self, message=None): + return float.__new__(cls, value) - self.message = message + def __init__(self, value: float = math.nan, **kwargs): - super().__init__(0.) + super().__init__(**kwargs) + @property + def value(self): + """Get pupill diameter value.""" + return float(self) + TimeStampedPupillDiametersType = TypeVar('TimeStampedPupillDiameters', bound="TimeStampedPupillDiameters") # Type definition for type annotation convenience -class TimeStampedPupillDiameters(DataFeatures.TimeStampedBuffer): - """Define timestamped buffer to store pupill diameters.""" - - def __setitem__(self, key, value: PupillDiameter|dict): - """Force PupillDiameter storage.""" - - # Convert dict into PupillDiameter - if type(value) == dict: - - assert(set(['value']).issubset(value.keys())) - - if 'message' in value.keys(): - - value = UnvalidPupillDiameter(value['message']) - - else: - - value = PupillDiameter(value['value']) - - assert(type(value) == PupillDiameter or type(value) == UnvalidPupillDiameter) - - super().__setitem__(key, value) - - @classmethod - def from_json(self, json_filepath: str) -> TimeStampedPupillDiametersType: - """Create a TimeStampedPupillDiametersType from .json file.""" - - with open(json_filepath, encoding='utf-8') as ts_buffer_file: +class TimeStampedPupillDiameters(DataFeatures.TimestampedObjectsList): + """Handle timestamped pupill diamters into a list.""" - json_buffer = json.load(ts_buffer_file) + def __init__(self, pupill_diameters: list = []): - return TimeStampedPupillDiameters({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) + DataFeatures.TimestampedObjectsList.__init__(self, PupillDiameter, pupill_diameters) TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") # Type definition for type annotation convenience @@ -83,7 +53,7 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a pupill diameter analyser.""" @DataFeatures.PipelineStepMethod - def analyze(self, timestamp: int|float, pupill_diameter, float) -> float: + def analyze(self, pupill_diameter: PupillDiameterType) -> any: """Analyze pupill diameter from successive timestamped pupill diameters.""" raise NotImplementedError('analyze() method not implemented') @@ -96,9 +66,9 @@ class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): ts_analyzis = DataFeatures.TimeStampedBuffer() # Iterate on pupill diameters - for ts, pupill_diameter in ts_pupill_diameters.items(): + for pupill_diameter in ts_pupill_diameters: - analysis = self.analyze(ts, pupill_diameter) + analysis = self.analyze(pupill_diameter) if analysis is not None: -- cgit v1.1 From 74fb292fd3fd8012dcf82f1d62a03ac738424b58 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 16:09:50 +0100 Subject: Updating more test. --- src/argaze.test/ArUcoMarkers/ArUcoCamera.py | 83 +++++++++++----------- .../ArUcoMarkers/utils/aruco_camera.json | 39 ++++++---- .../DispersionThresholdIdentification.py | 3 - src/argaze.test/GazeFeatures.py | 3 - src/argaze/ArFeatures.py | 6 ++ src/argaze/ArUcoMarkers/ArUcoCamera.py | 4 +- src/argaze/DataFeatures.py | 4 +- src/argaze/GazeFeatures.py | 3 + 8 files changed, 78 insertions(+), 67 deletions(-) diff --git a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py index 6145f40..a3c5943 100644 --- a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py @@ -25,49 +25,46 @@ class TestArUcoCameraClass(unittest.TestCase): json_filepath = os.path.join(current_directory, 'utils/aruco_camera.json') # Load test aruco camera - aruco_camera = ArUcoCamera.ArUcoCamera.from_json(json_filepath) - - # Check aruco camera meta data - self.assertEqual(aruco_camera.name, "TestArUcoCamera") - - # Check ArUco detector - self.assertEqual(aruco_camera.aruco_detector.dictionary.name, "DICT_ARUCO_ORIGINAL") - self.assertEqual(aruco_camera.aruco_detector.marker_size, 3.0) - self.assertEqual(aruco_camera.aruco_detector.parameters.cornerRefinementMethod, 3) - self.assertEqual(aruco_camera.aruco_detector.parameters.aprilTagQuadSigma, 2) - self.assertEqual(aruco_camera.aruco_detector.parameters.aprilTagDeglitch, 1) - - # Check ArUco detector optic parameters - self.assertEqual(aruco_camera.aruco_detector.optic_parameters.rms, 1.0) - self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.dimensions, [1920, 1080])) - self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.K, [[1.0, 0.0, 1.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]])) - self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.D, [-1.0, -0.5, 0.0, 0.5, 1.0])) - - # Check camera scenes - self.assertEqual(len(aruco_camera.scenes), 2) - self.assertIsNone(numpy.testing.assert_array_equal(list(aruco_camera.scenes.keys()), ["TestSceneA", "TestSceneB"])) - - # Load test scene - ar_scene = aruco_camera.scenes["TestSceneA"] - - # Check Aruco scene - self.assertEqual(len(ar_scene.aruco_markers_group.places), 2) - self.assertIsNone(numpy.testing.assert_array_equal(ar_scene.aruco_markers_group.places[0].translation, [1, 0, 0])) - self.assertIsNone(numpy.testing.assert_array_equal(ar_scene.aruco_markers_group.places[0].rotation, [[1.,0.,0.],[0.,1.,0.],[0.,0.,1.]])) - self.assertEqual(ar_scene.aruco_markers_group.places[0].marker.identifier, 0) - - self.assertIsNone(numpy.testing.assert_array_equal(ar_scene.aruco_markers_group.places[1].translation, [0, 1, 0])) - self.assertIsNone(numpy.testing.assert_array_almost_equal(ar_scene.aruco_markers_group.places[1].rotation, [[0.,0.,1.],[0., 1.,0.],[-1.,0.,0.]])) - self.assertEqual(ar_scene.aruco_markers_group.places[1].marker.identifier, 1) - - # Check AOI scene - self.assertEqual(len(ar_scene.aoi_scene.items()), 1) - self.assertEqual(ar_scene.aoi_scene['Test'].points_number, 4) - self.assertIsNone(numpy.testing.assert_array_equal(ar_scene.aoi_scene['Test'].size, [1., 1., 0.])) - - # Check ArScene - self.assertEqual(ar_scene.angle_tolerance, 1.0) - self.assertEqual(ar_scene.distance_tolerance, 2.0) + with ArUcoCamera.ArUcoCamera.from_json(json_filepath) as aruco_camera: + + # Check aruco camera meta data + self.assertEqual(aruco_camera.name, "TestArUcoCamera") + + # Check ArUco detector + self.assertEqual(aruco_camera.aruco_detector.dictionary.name, "DICT_ARUCO_ORIGINAL") + self.assertEqual(aruco_camera.aruco_detector.parameters.cornerRefinementMethod, 3) + self.assertEqual(aruco_camera.aruco_detector.parameters.aprilTagQuadSigma, 2) + self.assertEqual(aruco_camera.aruco_detector.parameters.aprilTagDeglitch, 1) + + # Check ArUco detector optic parameters + self.assertEqual(aruco_camera.aruco_detector.optic_parameters.rms, 1.0) + self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.dimensions, [1920, 1080])) + self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.K, [[1.0, 0.0, 1.0], [0.0, 1.0, 1.0], [0.0, 0.0, 1.0]])) + self.assertIsNone(numpy.testing.assert_array_equal(aruco_camera.aruco_detector.optic_parameters.D, [-1.0, -0.5, 0.0, 0.5, 1.0])) + + # Check camera scenes + self.assertEqual(len(aruco_camera.scenes), 2) + self.assertIsNone(numpy.testing.assert_array_equal(list(aruco_camera.scenes.keys()), ["TestSceneA", "TestSceneB"])) + + # Load test scene + ar_scene = aruco_camera.scenes["TestSceneA"] + + # Check Aruco scene + self.assertEqual(len(ar_scene.aruco_markers_group.places), 2) + self.assertIsNone(numpy.testing.assert_allclose(ar_scene.aruco_markers_group.places[0].corners[0], [-0.5, 1.5, 0.], rtol=0, atol=1e-3)) + self.assertEqual(ar_scene.aruco_markers_group.places[0].marker.identifier, 0) + + self.assertIsNone(numpy.testing.assert_allclose(ar_scene.aruco_markers_group.places[1].corners[0], [0., 2.5, -1.5], rtol=0, atol=1e-3)) + self.assertEqual(ar_scene.aruco_markers_group.places[1].marker.identifier, 1) + + # Check layers and AOI scene + self.assertEqual(len(ar_scene.layers.items()), 1) + self.assertEqual(len(ar_scene.layers["Main"].aoi_scene), 1) + self.assertEqual(ar_scene.layers["Main"].aoi_scene['Test'].points_number, 4) + + # Check ArScene + self.assertEqual(ar_scene.angle_tolerance, 1.0) + self.assertEqual(ar_scene.distance_tolerance, 2.0) if __name__ == '__main__': diff --git a/src/argaze.test/ArUcoMarkers/utils/aruco_camera.json b/src/argaze.test/ArUcoMarkers/utils/aruco_camera.json index 7648916..980dc9f 100644 --- a/src/argaze.test/ArUcoMarkers/utils/aruco_camera.json +++ b/src/argaze.test/ArUcoMarkers/utils/aruco_camera.json @@ -1,10 +1,10 @@ { "name": "TestArUcoCamera", + "size": [1920, 1080], "aruco_detector": { "dictionary": { "name": "DICT_ARUCO_ORIGINAL" }, - "marker_size": 3.0, "optic_parameters": { "rms": 1.0, "dimensions": [ @@ -45,45 +45,54 @@ "scenes": { "TestSceneA" : { "aruco_markers_group": { - "marker_size": 3.0, - "dictionary": { - "name": "DICT_ARUCO_ORIGINAL" - }, + "dictionary": "DICT_ARUCO_ORIGINAL", "places": { "0": { "translation": [1, 0, 0], - "rotation": [0, 0, 0] + "rotation": [0, 0, 0], + "size": 3.0 }, "1": { "translation": [0, 1, 0], - "rotation": [0, 90, 0] + "rotation": [0, 90, 0], + "size": 3.0 } } }, - "aoi_scene": "aoi.obj", + "layers": { + "Main" : { + "aoi_scene": "aoi_3d.obj" + } + }, "angle_tolerance": 1.0, "distance_tolerance": 2.0 }, "TestSceneB" : { "aruco_markers_group": { - "marker_size": 3.0, - "dictionary": { - "name": "DICT_ARUCO_ORIGINAL" - }, + "dictionary": "DICT_ARUCO_ORIGINAL", "places": { "0": { "translation": [1, 0, 0], - "rotation": [0, 0, 0] + "rotation": [0, 0, 0], + "size": 3.0 }, "1": { "translation": [0, 1, 0], - "rotation": [0, 90, 0] + "rotation": [0, 90, 0], + "size": 3.0 } } }, - "aoi_scene": "aoi.obj", + "layers": { + "Main" : { + "aoi_scene": "aoi_3d.obj" + } + }, "angle_tolerance": 1.0, "distance_tolerance": 2.0 } + }, + "layers": { + "Main": {} } } \ No newline at end of file diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py index 311f31b..156f6f1 100644 --- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py +++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py @@ -113,9 +113,6 @@ class TestDispersionThresholdIdentificationClass(unittest.TestCase): gaze_movement_identifier = DispersionThresholdIdentification.GazeMovementIdentifier(deviation_max_threshold=deviation_max, duration_min_threshold=max_time*2) ts_fixations, ts_saccades, ts_status = gaze_movement_identifier.browse(ts_gaze_positions) - # DEBUG - print(gaze_movement_identifier) - # Check result size self.assertEqual(len(ts_fixations), 1) self.assertEqual(len(ts_saccades), 0) diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index e678093..c0c014c 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -31,9 +31,6 @@ def random_gaze_positions(size, frame_dimension: tuple[float, float] = (1, 1)): # Edit gaze position random_gaze_position = GazeFeatures.GazePosition((random.random() * frame_dimension[0], random.random() * frame_dimension[1]), precision=5) - # DEBUG - print('random_gaze_position', type(random_gaze_position), random_gaze_position.__class__.__bases__) - # Timestamp gaze position random_gaze_position.timestamp = time.time() diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 8005d48..95d89d9 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -882,11 +882,17 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): !!! warning Be aware that gaze positions are in the same range of value than size attribute. + !!! note + This method timestamps incoming gaze position. + Parameters: timestamp: method call timestamp (unit does'nt matter) gaze_position: gaze position to project """ + # Timestamp gaze position + gaze_position.timestamp = timestamp + # Use frame lock feature with self._lock: diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py index 7f30252..f6297a8 100644 --- a/src/argaze/ArUcoMarkers/ArUcoCamera.py +++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py @@ -187,8 +187,8 @@ class ArUcoCamera(ArFeatures.ArCamera): pass - # Timestamp camera frame - self.timestamp = timestamp + # Timestamp camera frame + self.timestamp = timestamp def __image(self, draw_detected_markers: dict = None, draw_scenes: dict = None, draw_optic_parameters_grid: dict = None, **kwargs: dict) -> numpy.array: """Get frame image with ArUco detection visualisation. diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py index 8df991b..849601f 100644 --- a/src/argaze/DataFeatures.py +++ b/src/argaze/DataFeatures.py @@ -196,8 +196,10 @@ class TimestampedObjectsList(list): raise TypeError(f'{type(ts_object)} object is not {self.__object_type} instance') - assert(ts_object.is_timestamped()) + if not ts_object.is_timestamped(): + raise ValueError(f'object is not timestamped') + super().append(ts_object) def look_for(self, timestamp: TimeStampType) -> TimestampedObjectType: diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py index 6a02142..263f793 100644 --- a/src/argaze/GazeFeatures.py +++ b/src/argaze/GazeFeatures.py @@ -363,6 +363,9 @@ GazeMovementType = TypeVar('GazeMovement', bound="GazeMovement") class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject): """Define abstract gaze movement class as timestamped gaze positions list. + !!! note + Gaze movement timestamp is always equal to its first position timestamp. + Parameters: positions: timestamp gaze positions. finished: is the movement finished? -- cgit v1.1 From e82fd7d761beed64b85c1b9da83dd9c0e81d26de Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 16:10:10 +0100 Subject: Adding file for test. --- src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj diff --git a/src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj b/src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj new file mode 100644 index 0000000..92e85bd --- /dev/null +++ b/src/argaze.test/ArUcoMarkers/utils/aoi_3d.obj @@ -0,0 +1,7 @@ +o Test +v 0.000000 0.000000 0.000000 +v 25.000000 0.000000 0.000000 +v 0.000000 14.960000 0.000000 +v 25.000000 14.960000 0.000000 +s off +f 1 2 4 3 -- cgit v1.1 From e18530884d302331b3a7cabd6f7e0d2d1ef155ca Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 16:12:17 +0100 Subject: Catching ValueError instead of AssertionError. --- src/argaze.test/DataFeatures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/argaze.test/DataFeatures.py b/src/argaze.test/DataFeatures.py index 9543915..c64ad4c 100644 --- a/src/argaze.test/DataFeatures.py +++ b/src/argaze.test/DataFeatures.py @@ -83,7 +83,7 @@ class TestTimestampedObjectsListClass(unittest.TestCase): self.assertEqual(DataFeatures.TimestampedObjectsList(BasicDataClass), []) # Check that TimestampedObjectsList creation fails when data are not timestamped - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): data_list = [BasicDataClass((0, 0))] DataFeatures.TimestampedObjectsList(BasicDataClass, data_list) -- cgit v1.1 From ac8cc60d27a57c892354214b04327878b511cc44 Mon Sep 17 00:00:00 2001 From: Théo de la Hogue Date: Thu, 29 Feb 2024 16:40:28 +0100 Subject: Checking gaze position. --- src/argaze.test/GazeFeatures.py | 12 +++++++----- src/argaze/ArFeatures.py | 18 ++++++++++-------- src/argaze/GazeAnalysis/LinearRegression.py | 6 +++--- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py index c0c014c..035c76a 100644 --- a/src/argaze.test/GazeFeatures.py +++ b/src/argaze.test/GazeFeatures.py @@ -71,7 +71,7 @@ class TestGazePositionClass(unittest.TestCase): empty_gaze_position = GazeFeatures.GazePosition(message="empty for test") self.assertEqual(empty_gaze_position, ()) - self.assertEqual(empty_gaze_position.value, ()) + self.assertEqual(empty_gaze_position, ()) self.assertEqual(empty_gaze_position.precision, None) self.assertEqual(empty_gaze_position.message, "empty for test") self.assertEqual(len(empty_gaze_position), 0) @@ -84,7 +84,7 @@ class TestGazePositionClass(unittest.TestCase): self.assertEqual(int_gaze_position, (123, 456)) self.assertEqual(int_gaze_position[0], 123) self.assertEqual(int_gaze_position[1], 456) - self.assertEqual(int_gaze_position.value, (123, 456)) + self.assertEqual(int_gaze_position, (123, 456)) self.assertEqual(int_gaze_position.precision, 55) self.assertEqual(len(int_gaze_position), 2) self.assertEqual(bool(int_gaze_position), True) @@ -96,7 +96,7 @@ class TestGazePositionClass(unittest.TestCase): self.assertEqual(float_gaze_position, (1.23, 4.56)) self.assertEqual(float_gaze_position[0], 1.23) self.assertEqual(float_gaze_position[1], 4.56) - self.assertEqual(float_gaze_position.value, (1.23, 4.56)) + self.assertEqual(float_gaze_position, (1.23, 4.56)) self.assertEqual(float_gaze_position.precision, 5.5) self.assertEqual(len(float_gaze_position), 2) self.assertEqual(bool(float_gaze_position), True) @@ -111,9 +111,11 @@ class TestGazePositionClass(unittest.TestCase): with self.assertRaises(AttributeError): gaze_position.value = (12, 34) + # WARNING: gaze_position = (12, 34) is possible !!!! + # How to prevent this? - self.assertNotEqual(gaze_position.value, (12, 34)) - self.assertEqual(gaze_position.value, ()) + self.assertNotEqual(gaze_position, (12, 34)) + self.assertEqual(gaze_position, ()) # Check that gaze position precision setting fails with self.assertRaises(AttributeError): diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py index 95d89d9..abe4ed7 100644 --- a/src/argaze/ArFeatures.py +++ b/src/argaze/ArFeatures.py @@ -958,7 +958,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject): scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]]) # Update heatmap image - self.__heatmap.update(timestamp, self.__calibrated_gaze_position.value * scale) + self.__heatmap.update(timestamp, self.__calibrated_gaze_position * scale) # Look layers with valid identified gaze movement # Note: don't filter valid/unvalid finished/unfished gaze movement to allow layers to reset internally @@ -1470,16 +1470,18 @@ class ArCamera(ArFrame): aoi_2d = camera_layer.aoi_scene[scene_frame.name] - # TODO?: Should we prefer to use camera frame AOIMatcher object? - if aoi_2d.contains_point(gaze_position.value): + if gaze_position: - inner_x, inner_y = aoi_2d.clockwise().inner_axis(*gaze_position.value) + # TODO?: Should we prefer to use camera frame AOIMatcher object? + if aoi_2d.contains_point(gaze_position): - # QUESTION: How to project gaze precision? - inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) + inner_x, inner_y = aoi_2d.clockwise().inner_axis(*gaze_position) - # Project inner gaze position into scene frame - scene_frame.look(timestamp, inner_gaze_position * scene_frame.size) + # QUESTION: How to project gaze precision? + inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y)) + + # Project inner gaze position into scene frame + scene_frame.look(timestamp, inner_gaze_position * scene_frame.size) # Ignore missing aoi in camera frame layer projection except KeyError as e: diff --git a/src/argaze/GazeAnalysis/LinearRegression.py b/src/argaze/GazeAnalysis/LinearRegression.py index 414832a..717e8a3 100644 --- a/src/argaze/GazeAnalysis/LinearRegression.py +++ b/src/argaze/GazeAnalysis/LinearRegression.py @@ -45,8 +45,8 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator): def store(self, timestamp: int|float, observed_gaze_position: GazeFeatures.GazePosition, expected_gaze_position: GazeFeatures.GazePosition): """Store observed and expected gaze positions.""" - self.__observed_positions.append(observed_gaze_position.value) - self.__expected_positions.append(expected_gaze_position.value) + self.__observed_positions.append(observed_gaze_position) + self.__expected_positions.append(expected_gaze_position) def reset(self): """Reset observed and expected gaze positions.""" @@ -78,7 +78,7 @@ class GazePositionCalibrator(GazeFeatures.GazePositionCalibrator): if not self.calibrating: - return GazeFeatures.GazePosition(self.__linear_regression.predict(numpy.array([gaze_position.value]))[0], precision=gaze_position.precision) + return GazeFeatures.GazePosition(self.__linear_regression.predict(numpy.array([gaze_position]))[0], precision=gaze_position.precision) else: -- cgit v1.1