#!/usr/bin/env python """ """ __author__ = "Théo de la Hogue" __credits__ = [] __copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)" __license__ = "BSD" from typing import TypeVar from dataclasses import dataclass, field import json from argaze import DataFeatures @dataclass(frozen=True) class PupillDiameter(): """Define pupill diameter as ...""" value: float = field(default=0.) """Pupill diameter value.""" @property def valid(self) -> bool: """Is the value not 0""" return self.value != 0. def __repr__(self): """String representation""" return json.dumps(self, ensure_ascii = False, default=vars) class UnvalidPupillDiameter(PupillDiameter): """Unvalid pupill diameter.""" def __init__(self, message=None): self.message = message super().__init__(0.) TimeStampedPupillDiametersType = TypeVar('TimeStampedPupillDiameters', bound="TimeStampedPupillDiameters") # Type definition for type annotation convenience class TimeStampedPupillDiameters(DataFeatures.TimeStampedBuffer): """Define timestamped buffer to store pupill diameters.""" def __setitem__(self, key, value: PupillDiameter|dict): """Force PupillDiameter storage.""" # Convert dict into PupillDiameter if type(value) == dict: assert(set(['value']).issubset(value.keys())) if 'message' in value.keys(): value = UnvalidPupillDiameter(value['message']) else: value = PupillDiameter(value['value']) assert(type(value) == PupillDiameter or type(value) == UnvalidPupillDiameter) super().__setitem__(key, value) @classmethod def from_json(self, json_filepath: str) -> TimeStampedPupillDiametersType: """Create a TimeStampedPupillDiametersType from .json file.""" with open(json_filepath, encoding='utf-8') as ts_buffer_file: json_buffer = json.load(ts_buffer_file) return TimeStampedPupillDiameters({ast.literal_eval(ts_str): json_buffer[ts_str] for ts_str in json_buffer}) TimeStampedBufferType = TypeVar('TimeStampedBuffer', bound="TimeStampedBuffer") # Type definition for type annotation convenience class PupillDiameterAnalyzer(DataFeatures.PipelineStepObject): """Abstract class to define what should provide a pupill diameter analyser.""" @DataFeatures.PipelineStepMethod def analyze(self, timestamp: int|float, pupill_diameter, float) -> float: """Analyze pupill diameter from successive timestamped pupill diameters.""" raise NotImplementedError('analyze() method not implemented') def browse(self, ts_pupill_diameters: TimeStampedPupillDiameters) -> TimeStampedBufferType: """Analyze by browsing timestamped pupill diameters.""" assert(type(ts_pupill_diameters) == TimeStampedPupillDiameters) ts_analyzis = DataFeatures.TimeStampedBuffer() # Iterate on pupill diameters for ts, pupill_diameter in ts_pupill_diameters.items(): analysis = self.analyze(ts, pupill_diameter) if analysis is not None: ts_analyzis[ts] = analysis return ts_analyzis