aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--LICENSE4
-rw-r--r--src/argaze.test/ArUcoMarkers/ArUcoBoard.py2
-rw-r--r--src/argaze.test/ArUcoMarkers/ArUcoCamera.py2
-rw-r--r--src/argaze.test/ArUcoMarkers/ArUcoDetector.py2
-rw-r--r--src/argaze.test/ArUcoMarkers/ArUcoMarker.py2
-rw-r--r--src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py2
-rw-r--r--src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py2
-rw-r--r--src/argaze.test/ArUcoMarkers/ArUcoScene.py2
-rw-r--r--src/argaze.test/AreaOfInterest/AOI2DScene.py2
-rw-r--r--src/argaze.test/AreaOfInterest/AOI3DScene.py2
-rw-r--r--src/argaze.test/AreaOfInterest/AOIFeatures.py2
-rw-r--r--src/argaze.test/DataFeatures.py2
-rw-r--r--src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py2
-rw-r--r--src/argaze.test/GazeAnalysis/Entropy.py2
-rw-r--r--src/argaze.test/GazeAnalysis/ExploreExploitRatio.py2
-rw-r--r--src/argaze.test/GazeAnalysis/KCoefficient.py2
-rw-r--r--src/argaze.test/GazeAnalysis/LempelZivComplexity.py2
-rw-r--r--src/argaze.test/GazeAnalysis/NGram.py2
-rw-r--r--src/argaze.test/GazeAnalysis/NearestNeighborIndex.py2
-rw-r--r--src/argaze.test/GazeAnalysis/TransitionMatrix.py2
-rw-r--r--src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py2
-rw-r--r--src/argaze.test/GazeFeatures.py2
-rw-r--r--src/argaze.test/OpenCVCuda.py2
-rw-r--r--src/argaze.test/PupillAnalysis/WorkloadIndex.py2
-rw-r--r--src/argaze.test/PupillFeatures.py2
-rw-r--r--src/argaze/ArFeatures.py104
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoBoard.py2
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoCamera.py2
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoDetector.py476
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoMarker.py2
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py2
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py617
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py2
-rw-r--r--src/argaze/ArUcoMarkers/ArUcoScene.py2
-rw-r--r--src/argaze/AreaOfInterest/AOI2DScene.py2
-rw-r--r--src/argaze/AreaOfInterest/AOI3DScene.py2
-rw-r--r--src/argaze/AreaOfInterest/AOIFeatures.py2
-rw-r--r--src/argaze/DataFeatures.py6
-rw-r--r--src/argaze/GazeAnalysis/Basic.py2
-rw-r--r--src/argaze/GazeAnalysis/DeviationCircleCoverage.py2
-rw-r--r--src/argaze/GazeAnalysis/DispersionThresholdIdentification.py2
-rw-r--r--src/argaze/GazeAnalysis/Entropy.py2
-rw-r--r--src/argaze/GazeAnalysis/ExploreExploitRatio.py2
-rw-r--r--src/argaze/GazeAnalysis/FocusPointInside.py2
-rw-r--r--src/argaze/GazeAnalysis/KCoefficient.py2
-rw-r--r--src/argaze/GazeAnalysis/LempelZivComplexity.py2
-rw-r--r--src/argaze/GazeAnalysis/LinearRegression.py2
-rw-r--r--src/argaze/GazeAnalysis/NGram.py2
-rw-r--r--src/argaze/GazeAnalysis/NearestNeighborIndex.py2
-rw-r--r--src/argaze/GazeAnalysis/TransitionMatrix.py2
-rw-r--r--src/argaze/GazeAnalysis/VelocityThresholdIdentification.py2
-rw-r--r--src/argaze/GazeFeatures.py1626
-rw-r--r--src/argaze/PupilAnalysis/WorkloadIndex.py2
-rw-r--r--src/argaze/PupilFeatures.py2
-rw-r--r--src/argaze/__main__.py2
-rw-r--r--src/argaze/utils/UtilsFeatures.py2
-rw-r--r--src/argaze/utils/aruco_markers_group_export.py283
-rw-r--r--src/argaze/utils/contexts/OpenCV.py2
-rw-r--r--src/argaze/utils/contexts/PupilLabs.py55
-rw-r--r--src/argaze/utils/contexts/TobiiProGlasses2.py187
-rw-r--r--src/argaze/utils/demo/recorders.py2
61 files changed, 1728 insertions, 1734 deletions
diff --git a/LICENSE b/LICENSE
index 94a9ed0..76525e7 100644
--- a/LICENSE
+++ b/LICENSE
@@ -645,7 +645,7 @@ the "copyright" line and a pointer to where the full notice is found.
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
Also add information on how to contact you by electronic and paper mail.
@@ -664,7 +664,7 @@ might be different; for a GUI interface, you would use an "about box".
You should also get your employer (if you work as a programmer) or school,
if any, to sign a "copyright disclaimer" for the program, if necessary.
For more information on this, and how to apply and follow the GNU GPL, see
-<http://www.gnu.org/licenses/>.
+<https://www.gnu.org/licenses/>.
The GNU General Public License does not permit incorporating your program
into proprietary programs. If your program is a subroutine library, you
diff --git a/src/argaze.test/ArUcoMarkers/ArUcoBoard.py b/src/argaze.test/ArUcoMarkers/ArUcoBoard.py
index f3ce194..0bfa568 100644
--- a/src/argaze.test/ArUcoMarkers/ArUcoBoard.py
+++ b/src/argaze.test/ArUcoMarkers/ArUcoBoard.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py
index 5864465..091383e 100644
--- a/src/argaze.test/ArUcoMarkers/ArUcoCamera.py
+++ b/src/argaze.test/ArUcoMarkers/ArUcoCamera.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/ArUcoMarkers/ArUcoDetector.py b/src/argaze.test/ArUcoMarkers/ArUcoDetector.py
index 7babb94..62e8a09 100644
--- a/src/argaze.test/ArUcoMarkers/ArUcoDetector.py
+++ b/src/argaze.test/ArUcoMarkers/ArUcoDetector.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/ArUcoMarkers/ArUcoMarker.py b/src/argaze.test/ArUcoMarkers/ArUcoMarker.py
index bb83a51..de88623 100644
--- a/src/argaze.test/ArUcoMarkers/ArUcoMarker.py
+++ b/src/argaze.test/ArUcoMarkers/ArUcoMarker.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py
index b8156ba..7a5e9e8 100644
--- a/src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py
+++ b/src/argaze.test/ArUcoMarkers/ArUcoMarkersDictionary.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py b/src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py
index b51647e..79d2ead 100644
--- a/src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py
+++ b/src/argaze.test/ArUcoMarkers/ArUcoOpticCalibrator.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/ArUcoMarkers/ArUcoScene.py b/src/argaze.test/ArUcoMarkers/ArUcoScene.py
index 68c8d4d..f29b1d3 100644
--- a/src/argaze.test/ArUcoMarkers/ArUcoScene.py
+++ b/src/argaze.test/ArUcoMarkers/ArUcoScene.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/AreaOfInterest/AOI2DScene.py b/src/argaze.test/AreaOfInterest/AOI2DScene.py
index daa6431..ae9e6b2 100644
--- a/src/argaze.test/AreaOfInterest/AOI2DScene.py
+++ b/src/argaze.test/AreaOfInterest/AOI2DScene.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/AreaOfInterest/AOI3DScene.py b/src/argaze.test/AreaOfInterest/AOI3DScene.py
index f16b1f5..591b5df 100644
--- a/src/argaze.test/AreaOfInterest/AOI3DScene.py
+++ b/src/argaze.test/AreaOfInterest/AOI3DScene.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/AreaOfInterest/AOIFeatures.py b/src/argaze.test/AreaOfInterest/AOIFeatures.py
index 9b93a9f..25fdca9 100644
--- a/src/argaze.test/AreaOfInterest/AOIFeatures.py
+++ b/src/argaze.test/AreaOfInterest/AOIFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/DataFeatures.py b/src/argaze.test/DataFeatures.py
index 04163be..a0031bc 100644
--- a/src/argaze.test/DataFeatures.py
+++ b/src/argaze.test/DataFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
index 87f439f..9326060 100644
--- a/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze.test/GazeAnalysis/DispersionThresholdIdentification.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/Entropy.py b/src/argaze.test/GazeAnalysis/Entropy.py
index 2b09b27..0bfd7b8 100644
--- a/src/argaze.test/GazeAnalysis/Entropy.py
+++ b/src/argaze.test/GazeAnalysis/Entropy.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/ExploreExploitRatio.py b/src/argaze.test/GazeAnalysis/ExploreExploitRatio.py
index ceef343..0c2eb96 100644
--- a/src/argaze.test/GazeAnalysis/ExploreExploitRatio.py
+++ b/src/argaze.test/GazeAnalysis/ExploreExploitRatio.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/KCoefficient.py b/src/argaze.test/GazeAnalysis/KCoefficient.py
index d6af6b6..3c79cc3 100644
--- a/src/argaze.test/GazeAnalysis/KCoefficient.py
+++ b/src/argaze.test/GazeAnalysis/KCoefficient.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/LempelZivComplexity.py b/src/argaze.test/GazeAnalysis/LempelZivComplexity.py
index 13e44c0..91d7df1 100644
--- a/src/argaze.test/GazeAnalysis/LempelZivComplexity.py
+++ b/src/argaze.test/GazeAnalysis/LempelZivComplexity.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/NGram.py b/src/argaze.test/GazeAnalysis/NGram.py
index 54986fb..69c06cd 100644
--- a/src/argaze.test/GazeAnalysis/NGram.py
+++ b/src/argaze.test/GazeAnalysis/NGram.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/NearestNeighborIndex.py b/src/argaze.test/GazeAnalysis/NearestNeighborIndex.py
index 080a28c..3325a2a 100644
--- a/src/argaze.test/GazeAnalysis/NearestNeighborIndex.py
+++ b/src/argaze.test/GazeAnalysis/NearestNeighborIndex.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/TransitionMatrix.py b/src/argaze.test/GazeAnalysis/TransitionMatrix.py
index c1d3ee2..5aeef94 100644
--- a/src/argaze.test/GazeAnalysis/TransitionMatrix.py
+++ b/src/argaze.test/GazeAnalysis/TransitionMatrix.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
index 55a1932..39a8834 100644
--- a/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
+++ b/src/argaze.test/GazeAnalysis/VelocityThresholdIdentification.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/GazeFeatures.py b/src/argaze.test/GazeFeatures.py
index 35574bd..74cbc12 100644
--- a/src/argaze.test/GazeFeatures.py
+++ b/src/argaze.test/GazeFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/OpenCVCuda.py b/src/argaze.test/OpenCVCuda.py
index 2689db8..e552436 100644
--- a/src/argaze.test/OpenCVCuda.py
+++ b/src/argaze.test/OpenCVCuda.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/PupillAnalysis/WorkloadIndex.py b/src/argaze.test/PupillAnalysis/WorkloadIndex.py
index da9b72d..ebb043a 100644
--- a/src/argaze.test/PupillAnalysis/WorkloadIndex.py
+++ b/src/argaze.test/PupillAnalysis/WorkloadIndex.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze.test/PupillFeatures.py b/src/argaze.test/PupillFeatures.py
index b0cf65d..085973b 100644
--- a/src/argaze.test/PupillFeatures.py
+++ b/src/argaze.test/PupillFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/ArFeatures.py b/src/argaze/ArFeatures.py
index 56a941d..32d1542 100644
--- a/src/argaze/ArFeatures.py
+++ b/src/argaze/ArFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -16,24 +16,18 @@ __credits__ = []
__copyright__ = "Copyright 2023, Ecole Nationale de l'Aviation Civile (ENAC)"
__license__ = "GPLv3"
-from typing import Iterator, Union
import logging
-import json
-import os
-import sys
-import importlib
-import threading
-import time
import math
+import os
+from typing import Iterator, Union
+
+import cv2
+import numpy
from argaze import DataFeatures, GazeFeatures
from argaze.AreaOfInterest import *
-from argaze.GazeAnalysis import *
from argaze.utils import UtilsFeatures
-import numpy
-import cv2
-
class PoseEstimationFailed(Exception):
"""
@@ -129,12 +123,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
@aoi_scene.setter
def aoi_scene(self, aoi_scene_value: AOIFeatures.AOIScene | str | dict):
+ new_aoi_scene = None
+
if issubclass(type(aoi_scene_value), AOIFeatures.AOIScene):
new_aoi_scene = aoi_scene_value
# str: relative path to file
- elif type(aoi_scene_value) == str:
+ elif type(aoi_scene_value) is str:
filepath = os.path.join(DataFeatures.get_working_directory(), aoi_scene_value)
file_format = filepath.split('.')[-1]
@@ -154,10 +150,14 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
new_aoi_scene = AOI3DScene.AOI3DScene.from_obj(filepath)
# dict:
- elif type(aoi_scene_value) == dict:
+ elif type(aoi_scene_value) is dict:
new_aoi_scene = AOIFeatures.AOIScene.from_dict(aoi_scene_value)
+ else:
+
+ raise ValueError("Bad aoi scene value")
+
# Cast aoi scene to its effective dimension
if new_aoi_scene.dimension == 2:
@@ -213,6 +213,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""AOI scan path analyzers list."""
return self.__aoi_scan_path_analyzers
+ # noinspection PyUnresolvedReferences
@aoi_scan_path_analyzers.setter
@DataFeatures.PipelineStepAttributeSetter
def aoi_scan_path_analyzers(self, aoi_scan_path_analyzers: list):
@@ -245,7 +246,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
for a in self.__aoi_scan_path_analyzers:
- if type(a) == property_type:
+ if type(a) is property_type:
setattr(analyzer, name, a)
found = True
@@ -254,7 +255,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
- if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path == None:
+ if len(self.__aoi_scan_path_analyzers) > 0 and self.aoi_scan_path is None:
self.__aoi_scan_path = GazeFeatures.ScanPath()
# Edit parent
@@ -311,7 +312,7 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
self.__aoi_scan_path.expected_aoi = expected_aoi
@DataFeatures.PipelineStepMethod
- def look(self, gaze_movement: GazeFeatures.GazePosition = GazeFeatures.GazePosition()):
+ def look(self, gaze_movement: GazeFeatures.GazeMovement = None):
"""
Project timestamped gaze movement into layer.
@@ -321,6 +322,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
Parameters:
gaze_movement: gaze movement to project
"""
+ if gaze_movement is None:
+ gaze_movement = GazeFeatures.GazeMovement()
# Use layer lock feature
with self._lock:
@@ -383,8 +386,8 @@ class ArLayer(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
Parameters:
image: image where to draw.
- draw_aoi_scene: AreaOfInterest.AOI2DScene.draw parameters (if None, no aoi scene is drawn)
- draw_aoi_matching: AOIMatcher.draw parameters (which depends on the loaded aoi matcher module,
+ draw_aoi_scene: [AOI2DScene.draw][argaze.AreaOfInterest.AOI2DScene.draw] parameters (if None, no aoi scene is drawn)
+ draw_aoi_matching: [AOIMatcher.draw][argaze.GazeFeatures.AOIMatcher.draw] parameters (which depends on the loaded aoi matcher module,
if None, no aoi matching is drawn)
"""
@@ -530,6 +533,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
"""Scan path analyzers list."""
return self.__scan_path_analyzers
+ # noinspection PyUnresolvedReferences
@scan_path_analyzers.setter
@DataFeatures.PipelineStepAttributeSetter
def scan_path_analyzers(self, scan_path_analyzers: list):
@@ -562,7 +566,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
for a in self.__scan_path_analyzers:
- if type(a) == property_type:
+ if type(a) is property_type:
setattr(analyzer, name, a)
found = True
@@ -571,7 +575,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
f'{type(analyzer)} analyzer loading fails because {property_type} analyzer is missing.')
# Force scan path creation
- if len(self.__scan_path_analyzers) > 0 and self.__scan_path == None:
+ if len(self.__scan_path_analyzers) > 0 and self.__scan_path is None:
self.__scan_path = GazeFeatures.ScanPath()
# Edit parent
@@ -739,8 +743,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
# Analyze aoi scan path
for scan_path_analyzer in self.__scan_path_analyzers:
- scan_path_analyzer.analyze(self.__scan_path,
- timestamp=self.__identified_gaze_movement.timestamp)
+ scan_path_analyzer.analyze(self.__scan_path, timestamp=self.__identified_gaze_movement.timestamp)
# Update scan path analyzed state
self.__scan_path_analyzed = True
@@ -756,8 +759,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
scale = numpy.array([self.__heatmap.size[0] / self.__size[0], self.__heatmap.size[1] / self.__size[1]])
# Update heatmap image
- self.__heatmap.update(self.__calibrated_gaze_position * scale,
- timestamp=self.__calibrated_gaze_position.timestamp)
+ self.__heatmap.update(self.__calibrated_gaze_position * scale, timestamp=self.__calibrated_gaze_position.timestamp)
# Look layers with valid identified gaze movement
# Note: don't filter valid/invalid finished/unfinished gaze movement to allow layers to reset internally
@@ -765,9 +767,7 @@ class ArFrame(DataFeatures.SharedObject, DataFeatures.PipelineStepObject):
layer.look(self.__identified_gaze_movement)
@DataFeatures.PipelineStepImage
- def image(self, background_weight: float = None, heatmap_weight: float = None,
- draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None,
- draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array:
+ def image(self, background_weight: float = None, heatmap_weight: float = None, draw_gaze_position_calibrator: dict = None, draw_scan_path: dict = None, draw_layers: dict = None, draw_gaze_positions: dict = None, draw_fixations: dict = None, draw_saccades: dict = None) -> numpy.array:
"""
Get background image with overlaid visualisations.
@@ -901,12 +901,12 @@ class ArScene(DataFeatures.PipelineStepObject):
for layer_name, layer_data in layers.items():
- if type(layer_data) == dict:
+ if type(layer_data) is dict:
self._layers[layer_name] = ArLayer(name=layer_name, **layer_data)
# str: relative path to JSON file
- elif type(layer_data) == str:
+ elif type(layer_data) is str:
self._layers[layer_name] = DataFeatures.from_json(
os.path.join(DataFeatures.get_working_directory(), layer_data))
@@ -929,18 +929,22 @@ class ArScene(DataFeatures.PipelineStepObject):
for frame_name, frame_data in frames.items():
- if type(frame_data) == dict:
+ if type(frame_data) is dict:
new_frame = ArFrame(name=frame_name, **frame_data)
# str: relative path to JSON file
- elif type(frame_data) == str:
+ elif type(frame_data) is str:
new_frame = DataFeatures.from_json(os.path.join(DataFeatures.get_working_directory(), frame_data))
# Loaded frame name have to be equals to dictionary key
assert (new_frame.name == frame_name)
+ else:
+
+ raise ValueError("Bad frame data.")
+
# Look for a scene layer with an AOI named like the frame
for scene_layer_name, scene_layer in self.layers.items():
@@ -1013,8 +1017,7 @@ class ArScene(DataFeatures.PipelineStepObject):
raise NotImplementedError('estimate_pose() method not implemented')
@DataFeatures.PipelineStepMethod
- def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> \
- Iterator[Union[str, AOI2DScene.AOI2DScene]]:
+ def project(self, tvec: numpy.array, rvec: numpy.array, visual_hfov: float = 0., visual_vfov: float = 0.) -> Iterator[Union[str, AOI2DScene.AOI2DScene]]:
"""Project layers according estimated pose and optional field of view clipping angles.
Parameters:
@@ -1050,6 +1053,7 @@ class ArScene(DataFeatures.PipelineStepObject):
aoi_scene_copy = layer.aoi_scene.copy()
# Project layer aoi scene
+ # noinspection PyUnresolvedReferences
yield name, aoi_scene_copy.project(tvec, rvec, self.parent.aruco_detector.optic_parameters.K)
@@ -1246,15 +1250,13 @@ class ArCamera(ArFrame):
inner_x, inner_y = aoi_2d.clockwise().inner_axis(*timestamped_gaze_position)
# QUESTION: How to project gaze precision?
- inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y),
- timestamp=timestamped_gaze_position.timestamp)
+ inner_gaze_position = GazeFeatures.GazePosition((inner_x, inner_y), timestamp=timestamped_gaze_position.timestamp)
# Project inner gaze position into scene frame
scene_frame.look(inner_gaze_position * scene_frame.size)
# Ignore missing aoi in camera frame layer projection
- except KeyError as e:
-
+ except KeyError:
pass
@DataFeatures.PipelineStepMethod
@@ -1375,8 +1377,7 @@ class ArContext(DataFeatures.PipelineStepObject):
"""Exit from ArContext."""
pass
- def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None,
- precision: int | float = None):
+ def _process_gaze_position(self, timestamp: int | float, x: int | float = None, y: int | float = None, precision: int | float = None):
"""Request pipeline to process new gaze position at a timestamp."""
logging.debug('ArContext._process_gaze_position %s', self.name)
@@ -1395,14 +1396,12 @@ class ArContext(DataFeatures.PipelineStepObject):
if x is None and y is None:
# Edit empty gaze position
- self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp),
- catch_exceptions=self.__catch_exceptions)
+ self.__pipeline.look(GazeFeatures.GazePosition(timestamp=timestamp), catch_exceptions=self.__catch_exceptions)
else:
# Edit gaze position
- self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp),
- catch_exceptions=self.__catch_exceptions)
+ self.__pipeline.look(GazeFeatures.GazePosition((x, y), precision=precision, timestamp=timestamp), catch_exceptions=self.__catch_exceptions)
except DataFeatures.TimestampedException as e:
@@ -1439,8 +1438,7 @@ class ArContext(DataFeatures.PipelineStepObject):
logging.debug('\t> watch image (%i x %i)', width, height)
- self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp),
- catch_exceptions=self.__catch_exceptions)
+ self.__pipeline.watch(DataFeatures.TimestampedImage(image, timestamp=timestamp), catch_exceptions=self.__catch_exceptions)
# TODO: make this step optional
self.__pipeline.map(timestamp=timestamp, catch_exceptions=self.__catch_exceptions)
@@ -1477,8 +1475,7 @@ class ArContext(DataFeatures.PipelineStepObject):
if image.is_timestamped():
info_stack += 1
- cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1,
- (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Frame at {image.timestamp}ms', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
if issubclass(type(self.__pipeline), ArCamera):
@@ -1491,8 +1488,7 @@ class ArContext(DataFeatures.PipelineStepObject):
watch_time = math.nan
info_stack += 1
- cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz',
- (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Watch {watch_time}ms at {self.__process_camera_image_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
if issubclass(type(self.__pipeline), ArFrame):
@@ -1505,8 +1501,7 @@ class ArContext(DataFeatures.PipelineStepObject):
look_time = math.nan
info_stack += 1
- cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz',
- (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(image, f'Look {look_time:.2f}ms at {self.__process_gaze_position_frequency}Hz', (20, info_stack * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
if draw_exceptions:
@@ -1515,8 +1510,7 @@ class ArContext(DataFeatures.PipelineStepObject):
e = self.__exceptions.pop()
i = len(self.__exceptions)
- cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - (i) * 50), (0, 0, 127), -1)
- cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1,
- (255, 255, 255), 1, cv2.LINE_AA)
+ cv2.rectangle(image, (0, height - (i + 1) * 50), (width, height - i * 50), (0, 0, 127), -1)
+ cv2.putText(image, f'error: {e}', (20, height - (i + 1) * 50 + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
return image
diff --git a/src/argaze/ArUcoMarkers/ArUcoBoard.py b/src/argaze/ArUcoMarkers/ArUcoBoard.py
index a6d8b02..be475d5 100644
--- a/src/argaze/ArUcoMarkers/ArUcoBoard.py
+++ b/src/argaze/ArUcoMarkers/ArUcoBoard.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/ArUcoMarkers/ArUcoCamera.py b/src/argaze/ArUcoMarkers/ArUcoCamera.py
index bf4e5d3..e61bddc 100644
--- a/src/argaze/ArUcoMarkers/ArUcoCamera.py
+++ b/src/argaze/ArUcoMarkers/ArUcoCamera.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/ArUcoMarkers/ArUcoDetector.py b/src/argaze/ArUcoMarkers/ArUcoDetector.py
index cd8ff20..f675c8f 100644
--- a/src/argaze/ArUcoMarkers/ArUcoDetector.py
+++ b/src/argaze/ArUcoMarkers/ArUcoDetector.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -29,336 +29,336 @@ from argaze.ArUcoMarkers import ArUcoMarkersDictionary, ArUcoMarker, ArUcoOpticC
class DetectorParameters():
- """Wrapper class around ArUco marker detector parameters.
+ """Wrapper class around ArUco marker detector parameters.
- !!! note
- More details on [opencv page](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html)
- """
+ !!! note
+ More details on [opencv page](https://docs.opencv.org/4.x/d1/dcd/structcv_1_1aruco_1_1DetectorParameters.html)
+ """
- __parameters = aruco.DetectorParameters()
- __parameters_names = [
- 'adaptiveThreshConstant',
- 'adaptiveThreshWinSizeMax',
- 'adaptiveThreshWinSizeMin',
- 'adaptiveThreshWinSizeStep',
- 'aprilTagCriticalRad',
- 'aprilTagDeglitch',
- 'aprilTagMaxLineFitMse',
- 'aprilTagMaxNmaxima',
- 'aprilTagMinClusterPixels',
- 'aprilTagMinWhiteBlackDiff',
- 'aprilTagQuadDecimate',
- 'aprilTagQuadSigma',
- 'cornerRefinementMaxIterations',
- 'cornerRefinementMethod',
- 'cornerRefinementMinAccuracy',
- 'cornerRefinementWinSize',
- 'markerBorderBits',
- 'minMarkerPerimeterRate',
- 'maxMarkerPerimeterRate',
- 'minMarkerDistanceRate',
- 'detectInvertedMarker',
- 'errorCorrectionRate',
- 'maxErroneousBitsInBorderRate',
- 'minCornerDistanceRate',
- 'minDistanceToBorder',
- 'minOtsuStdDev',
- 'perspectiveRemoveIgnoredMarginPerCell',
- 'perspectiveRemovePixelPerCell',
- 'polygonalApproxAccuracyRate',
- 'useAruco3Detection'
- ]
+ __parameters = aruco.DetectorParameters()
+ __parameters_names = [
+ 'adaptiveThreshConstant',
+ 'adaptiveThreshWinSizeMax',
+ 'adaptiveThreshWinSizeMin',
+ 'adaptiveThreshWinSizeStep',
+ 'aprilTagCriticalRad',
+ 'aprilTagDeglitch',
+ 'aprilTagMaxLineFitMse',
+ 'aprilTagMaxNmaxima',
+ 'aprilTagMinClusterPixels',
+ 'aprilTagMinWhiteBlackDiff',
+ 'aprilTagQuadDecimate',
+ 'aprilTagQuadSigma',
+ 'cornerRefinementMaxIterations',
+ 'cornerRefinementMethod',
+ 'cornerRefinementMinAccuracy',
+ 'cornerRefinementWinSize',
+ 'markerBorderBits',
+ 'minMarkerPerimeterRate',
+ 'maxMarkerPerimeterRate',
+ 'minMarkerDistanceRate',
+ 'detectInvertedMarker',
+ 'errorCorrectionRate',
+ 'maxErroneousBitsInBorderRate',
+ 'minCornerDistanceRate',
+ 'minDistanceToBorder',
+ 'minOtsuStdDev',
+ 'perspectiveRemoveIgnoredMarginPerCell',
+ 'perspectiveRemovePixelPerCell',
+ 'polygonalApproxAccuracyRate',
+ 'useAruco3Detection'
+ ]
- def __init__(self, **kwargs):
+ def __init__(self, **kwargs):
- for parameter, value in kwargs.items():
+ for parameter, value in kwargs.items():
+ setattr(self.__parameters, parameter, value)
- setattr(self.__parameters, parameter, value)
+ self.__dict__.update(kwargs)
- self.__dict__.update(kwargs)
+ def __setattr__(self, parameter, value):
- def __setattr__(self, parameter, value):
+ setattr(self.__parameters, parameter, value)
- setattr(self.__parameters, parameter, value)
+ def __getattr__(self, parameter):
- def __getattr__(self, parameter):
+ return getattr(self.__parameters, parameter)
- return getattr(self.__parameters, parameter)
+ @classmethod
+ def from_json(cls, json_filepath) -> Self:
+ """Load detector parameters from .json file."""
- @classmethod
- def from_json(cls, json_filepath) -> Self:
- """Load detector parameters from .json file."""
+ with open(json_filepath) as configuration_file:
+ return DetectorParameters(**json.load(configuration_file))
- with open(json_filepath) as configuration_file:
+ def __str__(self) -> str:
+ """Detector parameters string representation."""
- return DetectorParameters(**json.load(configuration_file))
+ return f'{self}'
- def __str__(self) -> str:
- """Detector parameters string representation."""
+ def __format__(self, spec: str) -> str:
+ """Formated detector parameters string representation.
- return f'{self}'
+ Parameters:
+ spec: 'modified' to get only modified parameters.
+ """
- def __format__(self, spec: str) -> str:
- """Formated detector parameters string representation.
+ output = ''
- Parameters:
- spec: 'modified' to get only modified parameters.
- """
+ for parameter in self.__parameters_names:
- output = ''
+ if parameter in self.__dict__.keys():
- for parameter in self.__parameters_names:
+ output += f'\t*{parameter}: {getattr(self.__parameters, parameter)}\n'
- if parameter in self.__dict__.keys():
+ elif spec == "":
- output += f'\t*{parameter}: {getattr(self.__parameters, parameter)}\n'
+ output += f'\t{parameter}: {getattr(self.__parameters, parameter)}\n'
- elif spec == "":
+ return output
- output += f'\t{parameter}: {getattr(self.__parameters, parameter)}\n'
+ @property
+ def internal(self):
+ return self.__parameters
- return output
-
- @property
- def internal(self):
- return self.__parameters
class ArUcoDetector(DataFeatures.PipelineStepObject):
- """OpenCV ArUco library wrapper."""
+ """OpenCV ArUco library wrapper."""
- # noinspection PyMissingConstructor
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
- """Initialize ArUcoDetector."""
+ # noinspection PyMissingConstructor
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+ """Initialize ArUcoDetector."""
- # Init private attributes
- self.__dictionary = None
- self.__optic_parameters = None
- self.__parameters = None
+ # Init private attributes
+ self.__dictionary = None
+ self.__optic_parameters = None
+ self.__parameters = None
- # Init detected markers data
- self.__detected_markers = {}
+ # Init detected markers data
+ self.__detected_markers = {}
- # Init detected board data
- self.__board = None
- self.__board_corners_number = 0
- self.__board_corners = []
- self.__board_corners_ids = []
+ # Init detected board data
+ self.__board = None
+ self.__board_corners_number = 0
+ self.__board_corners = []
+ self.__board_corners_ids = []
- @property
- def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary:
- """ArUco markers dictionary to detect."""
- return self.__dictionary
+ @property
+ def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary:
+ """ArUco markers dictionary to detect."""
+ return self.__dictionary
- @dictionary.setter
- @DataFeatures.PipelineStepAttributeSetter
- def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary):
+ @dictionary.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary):
- self.__dictionary = dictionary
-
- @property
- def optic_parameters(self) -> ArUcoOpticCalibrator.OpticParameters:
- """Optic parameters to use for ArUco detection into image."""
- return self.__optic_parameters
+ self.__dictionary = dictionary
- @optic_parameters.setter
- @DataFeatures.PipelineStepAttributeSetter
- def optic_parameters(self, optic_parameters: ArUcoOpticCalibrator.OpticParameters):
+ @property
+ def optic_parameters(self) -> ArUcoOpticCalibrator.OpticParameters:
+ """Optic parameters to use for ArUco detection into image."""
+ return self.__optic_parameters
- self.__optic_parameters = optic_parameters
-
- @property
- def parameters(self) -> DetectorParameters:
- """ArUco detector parameters."""
- return self.__parameters
+ @optic_parameters.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def optic_parameters(self, optic_parameters: ArUcoOpticCalibrator.OpticParameters):
- @parameters.setter
- @DataFeatures.PipelineStepAttributeSetter
- def parameters(self, parameters: DetectorParameters):
+ self.__optic_parameters = optic_parameters
- self.__parameters = parameters
+ @property
+ def parameters(self) -> DetectorParameters:
+ """ArUco detector parameters."""
+ return self.__parameters
- @DataFeatures.PipelineStepMethod
- def detect_markers(self, image: numpy.array):
- """Detect all ArUco markers into an image.
+ @parameters.setter
+ @DataFeatures.PipelineStepAttributeSetter
+ def parameters(self, parameters: DetectorParameters):
- !!! danger "DON'T MIRROR IMAGE"
- It makes the markers detection to fail.
+ self.__parameters = parameters
- !!! danger "DON'T UNDISTORTED IMAGE"
- Camera intrinsic parameters and distortion coefficients are used later during pose estimation.
- """
+ @DataFeatures.PipelineStepMethod
+ def detect_markers(self, image: numpy.array):
+ """Detect all ArUco markers into an image.
- # Reset detected markers data
- self.__detected_markers, detected_markers_corners, detected_markers_ids = {}, [], []
+ !!! danger "DON'T MIRROR IMAGE"
+ It makes the markers detection to fail.
- # Detect markers into gray picture
- detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY), self.__dictionary.markers, parameters = self.__parameters.internal)
-
- # Is there detected markers ?
- if len(detected_markers_corners) > 0:
+ !!! danger "DON'T UNDISTORTED IMAGE"
+ Camera intrinsic parameters and distortion coefficients are used later during pose estimation.
+ """
- # Transform markers ids array into list
- detected_markers_ids = detected_markers_ids.T[0]
+ # Reset detected markers data
+ self.__detected_markers, detected_markers_corners, detected_markers_ids = {}, [], []
- for i, marker_id in enumerate(detected_markers_ids):
+ # Detect markers into gray picture
+ detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(cv.cvtColor(image, cv.COLOR_BGR2GRAY),
+ self.__dictionary.markers,
+ parameters=self.__parameters.internal)
- marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id)
- marker.corners = detected_markers_corners[i][0]
+ # Is there detected markers ?
+ if len(detected_markers_corners) > 0:
- # No pose estimation: call estimate_markers_pose to get one
- marker.translation = numpy.empty([0])
- marker.rotation = numpy.empty([0])
- marker.points = numpy.empty([0])
+ # Transform markers ids array into list
+ detected_markers_ids = detected_markers_ids.T[0]
- self.__detected_markers[marker_id] = marker
+ for i, marker_id in enumerate(detected_markers_ids):
+ marker = ArUcoMarker.ArUcoMarker(self.__dictionary, marker_id)
+ marker.corners = detected_markers_corners[i][0]
- def estimate_markers_pose(self, size: float, ids: list = []):
- """Estimate pose detected markers pose considering a marker size.
+ # No pose estimation: call estimate_markers_pose to get one
+ marker.translation = numpy.empty([0])
+ marker.rotation = numpy.empty([0])
+ marker.points = numpy.empty([0])
- Parameters:
- size: size of markers in centimeters.
- ids: markers id list to select detected markers.
- """
+ self.__detected_markers[marker_id] = marker
- # Is there detected markers ?
- if len(self.__detected_markers) > 0:
+ def estimate_markers_pose(self, size: float, ids: list = []):
+ """Estimate pose detected markers pose considering a marker size.
- # Select all markers by default
- if len(ids) == 0:
+ Parameters:
+ size: size of markers in centimeters.
+ ids: markers id list to select detected markers.
+ """
- ids = self.__detected_markers.keys()
+ # Is there detected markers ?
+ if len(self.__detected_markers) > 0:
- # Prepare data for aruco.estimatePoseSingleMarkers function
- selected_markers_corners = tuple()
- selected_markers_ids = []
+ # Select all markers by default
+ if len(ids) == 0:
+ ids = self.__detected_markers.keys()
- for marker_id, marker in self.__detected_markers.items():
+ # Prepare data for aruco.estimatePoseSingleMarkers function
+ selected_markers_corners = tuple()
+ selected_markers_ids = []
- if marker_id in ids:
+ for marker_id, marker in self.__detected_markers.items():
- selected_markers_corners += (marker.corners,)
- selected_markers_ids.append(marker_id)
+ if marker_id in ids:
+ selected_markers_corners += (marker.corners,)
+ selected_markers_ids.append(marker_id)
- # Estimate pose of selected markers
- if len(selected_markers_corners) > 0:
+ # Estimate pose of selected markers
+ if len(selected_markers_corners) > 0:
- markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners, size, numpy.array(self.__optic_parameters.K), numpy.array(self.__optic_parameters.D))
+ markers_rvecs, markers_tvecs, markers_points = aruco.estimatePoseSingleMarkers(selected_markers_corners,
+ size, numpy.array(
+ self.__optic_parameters.K), numpy.array(self.__optic_parameters.D))
- for i, marker_id in enumerate(selected_markers_ids):
+ for i, marker_id in enumerate(selected_markers_ids):
+ marker = self.__detected_markers[marker_id]
- marker = self.__detected_markers[marker_id]
+ marker.translation = markers_tvecs[i][0]
+ marker.rotation, _ = cv.Rodrigues(markers_rvecs[i][0])
+ marker.size = size
+ marker.points = markers_points.reshape(4, 3).dot(marker.rotation) - marker.translation
- marker.translation = markers_tvecs[i][0]
- marker.rotation, _ = cv.Rodrigues(markers_rvecs[i][0])
- marker.size = size
- marker.points = markers_points.reshape(4, 3).dot(marker.rotation) - marker.translation
+ def detected_markers(self) -> dict[int, ArUcoMarker.ArUcoMarker]:
+ """Access to detected markers' dictionary."""
- def detected_markers(self) -> dict[int, ArUcoMarker.ArUcoMarker]:
- """Access to detected markers' dictionary."""
+ return self.__detected_markers
- return self.__detected_markers
+ def detected_markers_number(self) -> int:
+ """Return detected markers number."""
- def detected_markers_number(self) -> int:
- """Return detected markers number."""
+ return len(list(self.__detected_markers.keys()))
- return len(list(self.__detected_markers.keys()))
+ def draw_detected_markers(self, image: numpy.array, draw_marker: dict = None):
+ """Draw detected markers.
- def draw_detected_markers(self, image: numpy.array, draw_marker: dict = None):
- """Draw detected markers.
+ Parameters:
+ image: image where to draw
+ draw_marker: ArucoMarker.draw parameters (if None, no marker drawn)
+ """
- Parameters:
- image: image where to draw
- draw_marker: ArucoMarker.draw parameters (if None, no marker drawn)
- """
+ if draw_marker is not None:
- if draw_marker is not None:
+ for marker_id, marker in self.__detected_markers.items():
+ marker.draw(image, self.__optic_parameters.K, self.__optic_parameters.D, **draw_marker)
- for marker_id, marker in self.__detected_markers.items():
+ def detect_board(self, image: numpy.array, board, expected_markers_number):
+ """Detect ArUco markers board in image setting up the number of detected markers needed to agree detection.
- marker.draw(image, self.__optic_parameters.K, self.__optic_parameters.D, **draw_marker)
-
- def detect_board(self, image: numpy.array, board, expected_markers_number):
- """Detect ArUco markers board in image setting up the number of detected markers needed to agree detection.
+ !!! danger "DON'T MIRROR IMAGE"
+ It makes the markers detection to fail.
+ """
- !!! danger "DON'T MIRROR IMAGE"
- It makes the markers detection to fail.
- """
-
- # detect markers from gray picture
- gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
- detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers, parameters = self.__parameters.internal)
+ # detect markers from gray picture
+ gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
+ detected_markers_corners, detected_markers_ids, _ = aruco.detectMarkers(gray, self.__dictionary.markers,
+ parameters=self.__parameters.internal)
- # if all board markers are detected
- if len(detected_markers_corners) == expected_markers_number:
+ # if all board markers are detected
+ if len(detected_markers_corners) == expected_markers_number:
- self.__board = board
- self.__board_corners_number, self.__board_corners, self.__board_corners_ids = aruco.interpolateCornersCharuco(detected_markers_corners, detected_markers_ids, gray, self.__board.model)
+ self.__board = board
+ self.__board_corners_number, self.__board_corners, self.__board_corners_ids = aruco.interpolateCornersCharuco(
+ detected_markers_corners, detected_markers_ids, gray, self.__board.model)
- else:
+ else:
- self.__board = None
- self.__board_corners_number = 0
- self.__board_corners = []
- self.__board_corners_ids = []
+ self.__board = None
+ self.__board_corners_number = 0
+ self.__board_corners = []
+ self.__board_corners_ids = []
- def draw_board(self, image: numpy.array):
- """Draw detected board corners in image."""
+ def draw_board(self, image: numpy.array):
+ """Draw detected board corners in image."""
- if self.__board != None:
+ if self.__board is not None:
+ cv.drawChessboardCorners(image, ((self.__board.size[0] - 1), (self.__board.size[1] - 1)),
+ self.__board_corners, True)
- cv.drawChessboardCorners(image, ((self.__board.size[0] - 1 ), (self.__board.size[1] - 1)), self.__board_corners, True)
+ def board_corners_number(self) -> int:
+ """Get detected board corners number."""
- def board_corners_number(self) -> int:
- """Get detected board corners number."""
+ return self.__board_corners_number
- return self.__board_corners_number
+ def board_corners_identifier(self) -> list[int]:
+ """Get detected board corners identifier."""
- def board_corners_identifier(self) -> list[int]:
- """Get detected board corners identifier."""
+ return self.__board_corners_ids
- return self.__board_corners_ids
+ def board_corners(self) -> list:
+ """Get detected board corners."""
- def board_corners(self) -> list:
- """Get detected board corners."""
+ return self.__board_corners
- return self.__board_corners
class Observer():
- """Define ArUcoDetector observer to count how many times detection succeeded and how many times markers are detected."""
-
- def __init__(self):
- """Initialize marker detection metrics."""
+ """Define ArUcoDetector observer to count how many times detection succeeded and how many times markers are detected."""
- self.__try_count = 0
- self.__success_count = 0
- self.__detected_ids = []
+ def __init__(self):
+ """Initialize marker detection metrics."""
- @property
- def metrics(self) -> tuple[int, int, dict]:
- """Get marker detection metrics.
+ self.__try_count = 0
+ self.__success_count = 0
+ self.__detected_ids = []
- Returns:
- number of detect function call
- dict with number of detection for each marker identifier
- """
+ @property
+ def metrics(self) -> tuple[int, int, dict]:
+ """Get marker detection metrics.
- return self.__try_count, self.__success_count, Counter(self.__detected_ids)
+ Returns:
+ number of detect function call
+ dict with number of detection for each marker identifier
+ """
- def reset(self):
- """Reset marker detection metrics."""
+ return self.__try_count, self.__success_count, Counter(self.__detected_ids)
- self.__try_count = 0
- self.__success_count = 0
- self.__detected_ids = []
+ def reset(self):
+ """Reset marker detection metrics."""
- def on_detect_markers(self, timestamp, aruco_detector, exception):
- """Update ArUco markers detection metrics."""
+ self.__try_count = 0
+ self.__success_count = 0
+ self.__detected_ids = []
- self.__try_count += 1
- detected_markers_list = list(aruco_detector.detected_markers().keys())
+ def on_detect_markers(self, timestamp, aruco_detector, exception):
+ """Update ArUco markers detection metrics."""
- if len(detected_markers_list):
+ self.__try_count += 1
+ detected_markers_list = list(aruco_detector.detected_markers().keys())
- self.__success_count += 1
- self.__detected_ids.extend(detected_markers_list)
+ if len(detected_markers_list):
+ self.__success_count += 1
+ self.__detected_ids.extend(detected_markers_list)
diff --git a/src/argaze/ArUcoMarkers/ArUcoMarker.py b/src/argaze/ArUcoMarkers/ArUcoMarker.py
index 42cb174..cf573dc 100644
--- a/src/argaze/ArUcoMarkers/ArUcoMarker.py
+++ b/src/argaze/ArUcoMarkers/ArUcoMarker.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py
index 72fc688..613a3c5 100644
--- a/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py
+++ b/src/argaze/ArUcoMarkers/ArUcoMarkersDictionary.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py
index a6f7b43..fd33664 100644
--- a/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py
+++ b/src/argaze/ArUcoMarkers/ArUcoMarkersGroup.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -33,447 +33,444 @@ T0 = numpy.array([0., 0., 0.])
R0 = numpy.array([[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]])
"""Define no rotation matrix."""
+
def make_rotation_matrix(x, y, z):
+ # Create rotation matrix around x-axis
+ c = numpy.cos(numpy.deg2rad(x))
+ s = numpy.sin(numpy.deg2rad(x))
+ rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]])
- # Create rotation matrix around x-axis
- c = numpy.cos(numpy.deg2rad(x))
- s = numpy.sin(numpy.deg2rad(x))
- Rx = numpy.array([[1, 0, 0], [0, c, -s], [0, s, c]])
+ # Create rotation matrix around y-axis
+ c = numpy.cos(numpy.deg2rad(y))
+ s = numpy.sin(numpy.deg2rad(y))
+ ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
- # Create rotation matrix around y-axis
- c = numpy.cos(numpy.deg2rad(y))
- s = numpy.sin(numpy.deg2rad(y))
- Ry = numpy.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
+ # Create rotation matrix around z axis
+ c = numpy.cos(numpy.deg2rad(z))
+ s = numpy.sin(numpy.deg2rad(z))
+ rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
- # Create rotation matrix around z axis
- c = numpy.cos(numpy.deg2rad(z))
- s = numpy.sin(numpy.deg2rad(z))
- Rz = numpy.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
+ # Return intrinsic rotation matrix
+ return rx.dot(ry.dot(rz))
- # Return intrinsic rotation matrix
- return Rx.dot(Ry.dot(Rz))
-def is_rotation_matrix(R):
+def is_rotation_matrix(mat):
+ rt = numpy.transpose(mat)
+ should_be_identity = numpy.dot(rt, mat)
+ i = numpy.identity(3, dtype=mat.dtype)
+ n = numpy.linalg.norm(i - should_be_identity)
- Rt = numpy.transpose(R)
- shouldBeIdentity = numpy.dot(Rt, R)
- I = numpy.identity(3, dtype = R.dtype)
- n = numpy.linalg.norm(I - shouldBeIdentity)
+ return n < 1e-3
- return n < 1e-3
@dataclass(frozen=True)
-class Place():
- """Define a place as list of corners position and a marker.
+class Place:
+ """Define a place as list of corners position and a marker.
+
+ Parameters:
+ corners: 3D corners position in group referential.
+ marker: ArUco marker linked to the place.
+ """
- Parameters:
- corners: 3D corners position in group referential.
- marker: ArUco marker linked to the place.
- """
+ corners: numpy.array
+ marker: ArUcoMarker.ArUcoMarker
- corners: numpy.array
- marker: ArUcoMarker.ArUcoMarker
class ArUcoMarkersGroup(DataFeatures.PipelineStepObject):
- """
- Handle group of ArUco markers as one unique spatial entity and estimate its pose.
- """
+ """
+ Handle group of ArUco markers as one unique spatial entity and estimate its pose.
+ """
+
+ # noinspection PyMissingConstructor
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+ """Initialize ArUcoMarkersGroup"""
+
+ # Init private attributes
+ self.marker_size = None
+ self.__dictionary = None
+ self.__places = {}
+ self.__translation = numpy.zeros(3)
+ self.__rotation = numpy.zeros(3)
- # noinspection PyMissingConstructor
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
- """Initialize ArUcoMarkersGroup"""
+ @property
+ def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary:
+ """Expected dictionary of all markers in the group."""
+ return self.__dictionary
- # Init private attributes
- self.marker_size = None
- self.__dictionary = None
- self.__places = {}
- self.__translation = numpy.zeros(3)
- self.__rotation = numpy.zeros(3)
+ @dictionary.setter
+ def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary):
- @property
- def dictionary(self) -> ArUcoMarkersDictionary.ArUcoMarkersDictionary:
- """Expected dictionary of all markers in the group."""
- return self.__dictionary
+ self.__dictionary = dictionary
- @dictionary.setter
- def dictionary(self, dictionary: ArUcoMarkersDictionary.ArUcoMarkersDictionary):
+ @property
+ def places(self) -> dict:
+ """Expected markers place."""
+ return self.__places
- self.__dictionary = dictionary
-
- @property
- def places(self) -> dict:
- """Expected markers place."""
- return self.__places
+ @places.setter
+ def places(self, places: dict):
- @places.setter
- def places(self, places: dict):
+ # Normalize places data
+ new_places = {}
- # Normalize places data
- new_places = {}
+ for identifier, data in places.items():
- for identifier, data in places.items():
+ # Convert string identifier to int value
+ if type(identifier) is str:
- # Convert string identifier to int value
- if type(identifier) == str:
+ identifier = int(identifier)
- identifier = int(identifier)
+ # Get translation vector
+ tvec = numpy.array(data.pop('translation')).astype(numpy.float32)
- # Get translation vector
- tvec = numpy.array(data.pop('translation')).astype(numpy.float32)
+ # Check rotation value shape
+ rvalue = numpy.array(data.pop('rotation')).astype(numpy.float32)
- # Check rotation value shape
- rvalue = numpy.array(data.pop('rotation')).astype(numpy.float32)
+ # Rotation matrix
+ if rvalue.shape == (3, 3):
- # Rotation matrix
- if rvalue.shape == (3, 3):
+ rmat = rvalue
- rmat = rvalue
+ # Rotation vector (expected in degree)
+ elif rvalue.shape == (3,):
- # Rotation vector (expected in degree)
- elif rvalue.shape == (3,):
+ rmat = make_rotation_matrix(rvalue[0], rvalue[1], rvalue[2]).astype(numpy.float32)
- rmat = make_rotation_matrix(rvalue[0], rvalue[1], rvalue[2]).astype(numpy.float32)
+ else:
- else:
+ raise ValueError(f'Bad rotation value: {rvalue}')
- raise ValueError(f'Bad rotation value: {rvalue}')
+ assert (is_rotation_matrix(rmat))
- assert(is_rotation_matrix(rmat))
+ # Get marker size
+ size = float(numpy.array(data.pop('size')).astype(numpy.float32))
- # Get marker size
- size = float(numpy.array(data.pop('size')).astype(numpy.float32))
+ new_marker = ArUcoMarker.ArUcoMarker(self.__dictionary, identifier, size)
- new_marker = ArUcoMarker.ArUcoMarker(self.__dictionary, identifier, size)
+ # Build marker corners thanks to translation vector and rotation matrix
+ place_corners = numpy.array([[-size / 2, size / 2, 0], [size / 2, size / 2, 0], [size / 2, -size / 2, 0], [-size / 2, -size / 2, 0]])
+ place_corners = place_corners.dot(rmat) + tvec
- # Build marker corners thanks to translation vector and rotation matrix
- place_corners = numpy.array([[-size/2, size/2, 0], [size/2, size/2, 0], [size/2, -size/2, 0], [-size/2, -size/2, 0]])
- place_corners = place_corners.dot(rmat) + tvec
+ new_places[identifier] = Place(place_corners, new_marker)
- new_places[identifier] = Place(place_corners, new_marker)
+ # else places are configured using detected markers estimated points
+ elif isinstance(data, ArUcoMarker.ArUcoMarker):
- # else places are configured using detected markers estimated points
- elif isinstance(data, ArUcoMarker.ArUcoMarker):
+ new_places[identifier] = Place(data.points, data)
- new_places[identifier] = Place(data.points, data)
+ # else places are already at expected format
+ elif (type(identifier) is int) and isinstance(data, Place):
- # else places are already at expected format
- elif (type(identifier) == int) and isinstance(data, Place):
+ new_places[identifier] = data
- new_places[identifier] = data
+ self.__places = new_places
- self.__places = new_places
+ @property
+ def identifiers(self) -> list:
+ """List place marker identifiers belonging to the group."""
+ return list(self.__places.keys())
- @property
- def identifiers(self) -> list:
- """List place marker identifiers belonging to the group."""
- return list(self.__places.keys())
+ @property
+ def translation(self) -> numpy.array:
+ """Get ArUco marker group translation vector."""
+ return self.__translation
- @property
- def translation(self) -> numpy.array:
- """Get ArUco marker group translation vector."""
- return self.__translation
+ @translation.setter
+ def translation(self, tvec):
+ """Set ArUco marker group translation vector."""
+ self.__translation = tvec
- @translation.setter
- def translation(self, tvec):
- """Set ArUco marker group translation vector."""
- self.__translation = tvec
+ @property
+ def rotation(self) -> numpy.array:
+ """Get ArUco marker group rotation matrix."""
+ return self.__translation
- @property
- def rotation(self) -> numpy.array:
- """Get ArUco marker group rotation matrix."""
- return self.__translation
+ @rotation.setter
+ def rotation(self, rmat):
+ """Set ArUco marker group rotation matrix."""
+ self.__rotation = rmat
- @rotation.setter
- def rotation(self, rmat):
- """Set ArUco marker group rotation matrix."""
- self.__rotation = rmat
+ def as_dict(self) -> dict:
+ """Export ArUco marker group properties as dictionary."""
- def as_dict(self) -> dict:
- """Export ArUco marker group properties as dictionary."""
+ return {
+ **DataFeatures.PipelineStepObject.as_dict(self),
+ "dictionary": self.__dictionary,
+ "places": self.__places
+ }
- return {
- **DataFeatures.PipelineStepObject.as_dict(self),
- "dictionary": self.__dictionary,
- "places": self.__places
- }
-
- @classmethod
- def from_obj(cls, obj_filepath: str) -> Self:
- """Load ArUco markers group from .obj file.
+ @classmethod
+ def from_obj(cls, obj_filepath: str) -> Self:
+ """Load ArUco markers group from .obj file.
- !!! note
- Expected object (o) name format: <DICTIONARY>#<IDENTIFIER>_Marker
+ !!! note
+ Expected object (o) name format: <DICTIONARY>#<IDENTIFIER>_Marker
- !!! note
- All markers have to belong to the same dictionary.
+ !!! note
+ All markers have to belong to the same dictionary.
- """
+ """
- new_dictionary = None
- new_places = {}
-
- # Regex rules for .obj file parsing
- OBJ_RX_DICT = {
- 'object': re.compile(r'o (.*)#([0-9]+)_(.*)\n'),
- 'vertices': re.compile(r'v ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+)\n'),
- 'face': re.compile(r'f ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)\n'),
- 'comment': re.compile(r'#(.*)\n') # keep comment regex after object regex because the # is used in object string too
- }
+ new_dictionary = None
+ new_places = {}
- # Regex .obj line parser
- def __parse_obj_line(line):
+ # Regex rules for .obj file parsing
+ obj_rx_dict = {
+ 'object': re.compile(r'o (.*)#([0-9]+)_(.*)\n'),
+ 'vertices': re.compile(r'v ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+) ([+-]?[0-9]*[.]?[0-9]+)\n'),
+ 'face': re.compile(r'f ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)\n'),
+ 'comment': re.compile(r'#(.*)\n')
+ # keep comment regex after object regex because the # is used in object string too
+ }
- for key, rx in OBJ_RX_DICT.items():
- match = rx.search(line)
- if match:
- return key, match
+ # Regex .obj line parser
+ def __parse_obj_line(ln):
- # If there are no matches
- return None, None
-
- # Start parsing
- try:
+ for k, rx in obj_rx_dict.items():
+ m = rx.search(ln)
+ if m:
+ return k, m
- identifier = None
- vertices = []
- faces = {}
+ # If there are no matches
+ return None, None
- # Open the file and read through it line by line
- with open(obj_filepath, 'r') as file:
+ # Start parsing
+ try:
- line = file.readline()
+ identifier = None
+ vertices = []
+ faces = {}
- while line:
+ # Open the file and read through it line by line
+ with open(obj_filepath, 'r') as file:
- # At each line check for a match with a regex
- key, match = __parse_obj_line(line)
+ line = file.readline()
- # Extract comment
- if key == 'comment':
- pass
+ while line:
- # Extract marker dictionary and identifier
- elif key == 'object':
+ # At each line check for a match with a regex
+ key, match = __parse_obj_line(line)
- dictionary = str(match.group(1))
- identifier = int(match.group(2))
- last = str(match.group(3))
+ # Extract comment
+ if key == 'comment':
+ pass
- # Init new group dictionary with first dictionary name
- if new_dictionary == None:
+ # Extract marker dictionary and identifier
+ elif key == 'object':
- new_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(dictionary)
+ dictionary = str(match.group(1))
+ identifier = int(match.group(2))
- # Check all others marker dictionary are equal to new group dictionary
- elif dictionary != new_dictionary.name:
+ # Init new group dictionary with first dictionary name
+ if new_dictionary is None:
- raise NameError(f'Marker {identifier} dictionary is not {new_dictionary.name}')
+ new_dictionary = ArUcoMarkersDictionary.ArUcoMarkersDictionary(dictionary)
- # Fill vertices array
- elif key == 'vertices':
+ # Check all others marker dictionary are equal to new group dictionary
+ elif dictionary != new_dictionary.name:
- vertices.append(tuple([float(match.group(1)), float(match.group(2)), float(match.group(3))]))
+ raise NameError(f'Marker {identifier} dictionary is not {new_dictionary.name}')
- # Extract vertices ids
- elif key == 'face':
+ # Fill vertices array
+ elif key == 'vertices':
- faces[identifier] = [int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4))]
+ vertices.append(tuple([float(match.group(1)), float(match.group(2)), float(match.group(3))]))
- # Go to next line
- line = file.readline()
+ # Extract vertices ids
+ elif key == 'face':
- file.close()
+ faces[identifier] = [int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4))]
- # Retrieve marker vertices thanks to face vertices ids
- for identifier, face in faces.items():
+ # Go to next line
+ line = file.readline()
- # Gather place corners in clockwise order
- cw_corners = numpy.array([ vertices[i-1] for i in reversed(face) ])
+ file.close()
- # Edit place axis from corners positions
- place_x_axis = cw_corners[2] - cw_corners[3]
- place_x_axis_norm = numpy.linalg.norm(place_x_axis)
-
- place_y_axis = cw_corners[0] - cw_corners[3]
- place_y_axis_norm = numpy.linalg.norm(place_y_axis)
+ # Retrieve marker vertices thanks to face vertices ids
+ for identifier, face in faces.items():
- # Check axis size: they should be almost equal
- if math.isclose(place_x_axis_norm, place_y_axis_norm, rel_tol=1e-3):
+ # Gather place corners in clockwise order
+ cw_corners = numpy.array([vertices[i - 1] for i in reversed(face)])
- new_marker_size = place_x_axis_norm
+ # Edit place axis from corners positions
+ place_x_axis = cw_corners[2] - cw_corners[3]
+ place_x_axis_norm = numpy.linalg.norm(place_x_axis)
- else:
+ place_y_axis = cw_corners[0] - cw_corners[3]
+ place_y_axis_norm = numpy.linalg.norm(place_y_axis)
- raise ValueError(f'{new_dictionary}#{identifier}_Marker is not a square.')
+ # Check axis size: they should be almost equal
+ if math.isclose(place_x_axis_norm, place_y_axis_norm, rel_tol=1e-3):
- # Create a new place related to a new marker
- new_marker = ArUcoMarker.ArUcoMarker(new_dictionary, identifier, new_marker_size)
- new_places[identifier] = Place(cw_corners, new_marker)
+ new_marker_size = place_x_axis_norm
- except IOError:
- raise IOError(f'File not found: {obj_filepath}')
+ else:
- # Instantiate ArUco markers group
- data = {
- 'dictionary': new_dictionary,
- 'places': new_places
- }
+ raise ValueError(f'{new_dictionary}#{identifier}_Marker is not a square.')
- return ArUcoMarkersGroup(**data)
+ # Create a new place related to a new marker
+ new_marker = ArUcoMarker.ArUcoMarker(new_dictionary, identifier, new_marker_size)
+ new_places[identifier] = Place(cw_corners, new_marker)
- def filter_markers(self, detected_markers: dict) -> tuple[dict, dict]:
- """Sort markers belonging to the group from given detected markers dict (cf ArUcoDetector.detect_markers()).
+ except IOError:
+ raise IOError(f'File not found: {obj_filepath}')
- Returns:
- dict of markers belonging to this group
- dict of remaining markers not belonging to this group
- """
+ # Instantiate ArUco markers group
+ data = {
+ 'dictionary': new_dictionary,
+ 'places': new_places
+ }
- group_markers = {}
- remaining_markers = {}
+ return ArUcoMarkersGroup(**data)
- for (marker_id, marker) in detected_markers.items():
+ def filter_markers(self, detected_markers: dict) -> tuple[dict, dict]:
+ """Sort markers belonging to the group from given detected markers dict (cf ArUcoDetector.detect_markers()).
- if marker_id in self.__places.keys():
+ Returns:
+ dict of markers belonging to this group
+ dict of remaining markers not belonging to this group
+ """
- group_markers[marker_id] = marker
+ group_markers = {}
+ remaining_markers = {}
- else:
-
- remaining_markers[marker_id] = marker
+ for (marker_id, marker) in detected_markers.items():
- return group_markers, remaining_markers
+ if marker_id in self.__places.keys():
- def estimate_pose_from_markers_corners(self, markers: dict, K: numpy.array, D: numpy.array) -> tuple[bool, numpy.array, numpy.array]:
- """Estimate pose from markers corners and places corners.
+ group_markers[marker_id] = marker
- Parameters:
- markers: detected markers to use for pose estimation.
- K: intrinsic camera parameters
- D: camera distorsion matrix
+ else:
- Returns:
- success: True if the pose estimation succeeded
- tvec: scene translation vector
- rvec: scene rotation vector
- """
+ remaining_markers[marker_id] = marker
- markers_corners_2d = []
- places_corners_3d = []
+ return group_markers, remaining_markers
- for identifier, marker in markers.items():
+ def estimate_pose_from_markers_corners(self, markers: dict, k: numpy.array, d: numpy.array) -> tuple[
+ bool, numpy.array, numpy.array]:
+ """Estimate pose from markers corners and places corners.
- try:
+ Parameters:
+ markers: detected markers to use for pose estimation.
+ k: intrinsic camera parameters
+ d: camera distortion matrix
- place = self.__places[identifier]
+ Returns:
+ success: True if the pose estimation succeeded
+ tvec: scene translation vector
+ rvec: scene rotation vector
+ """
- for marker_corner in marker.corners:
- markers_corners_2d.append(list(marker_corner))
+ markers_corners_2d = []
+ places_corners_3d = []
- for place_corner in place.corners:
- places_corners_3d.append(list(place_corner))
+ for identifier, marker in markers.items():
- except KeyError:
+ try:
- raise ValueError(f'Marker {marker.identifier} doesn\'t belong to the group.')
+ place = self.__places[identifier]
- # SolvPnP using cv2.SOLVEPNP_SQPNP flag
- # TODO: it works also with cv2.SOLVEPNP_EPNP flag so we need to test which is the faster.
- # About SolvPnP flags: https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html
- success, rvec, tvec = cv2.solvePnP(numpy.array(places_corners_3d), numpy.array(markers_corners_2d), numpy.array(K), numpy.array(D), flags=cv2.SOLVEPNP_SQPNP)
+ for marker_corner in marker.corners:
+ markers_corners_2d.append(list(marker_corner))
- # Refine pose estimation using Gauss-Newton optimisation
- if success :
+ for place_corner in place.corners:
+ places_corners_3d.append(list(place_corner))
- rvec, tvec = cv2.solvePnPRefineVVS(numpy.array(places_corners_3d), numpy.array(markers_corners_2d), numpy.array(K), numpy.array(D), rvec, tvec)
+ except KeyError:
- self.__translation = tvec.T
- self.__rotation = rvec.T
+ raise ValueError(f'Marker {marker.identifier} doesn\'t belong to the group.')
- return success, self.__translation, self.__rotation
+ # SolvPnP using cv2.SOLVEPNP_SQPNP flag
+ # TODO: it works also with cv2.SOLVEPNP_EPNP flag so we need to test which is the faster.
+ # About SolvPnP flags: https://docs.opencv.org/4.x/d5/d1f/calib3d_solvePnP.html
+ success, rvec, tvec = cv2.solvePnP(numpy.array(places_corners_3d), numpy.array(markers_corners_2d), numpy.array(k), numpy.array(d), flags=cv2.SOLVEPNP_SQPNP)
- def draw_axes(self, image: numpy.array, K, D, thickness: int = 0, length: float = 0):
- """Draw group axes."""
+ # Refine pose estimation using Gauss-Newton optimisation
+ if success:
+ rvec, tvec = cv2.solvePnPRefineVVS(numpy.array(places_corners_3d), numpy.array(markers_corners_2d), numpy.array(k), numpy.array(d), rvec, tvec)
- try:
- axisPoints = numpy.float32([[length, 0, 0], [0, length, 0], [0, 0, length], [0, 0, 0]]).reshape(-1, 3)
- axisPoints, _ = cv2.projectPoints(axisPoints, self.__rotation, self.__translation, numpy.array(K), numpy.array(D))
- axisPoints = axisPoints.astype(int)
+ self.__translation = tvec.T
+ self.__rotation = rvec.T
- cv2.line(image, tuple(axisPoints[3].ravel()), tuple(axisPoints[0].ravel()), (0, 0, 255), thickness) # X (red)
- cv2.line(image, tuple(axisPoints[3].ravel()), tuple(axisPoints[1].ravel()), (0, 255, 0), thickness) # Y (green)
- cv2.line(image, tuple(axisPoints[3].ravel()), tuple(axisPoints[2].ravel()), (255, 0, 0), thickness) # Z (blue)
+ return success, self.__translation, self.__rotation
- # Ignore errors due to out of field axis: their coordinate are larger than int32 limitations.
- except cv2.error:
- pass
+ def draw_axes(self, image: numpy.array, k: numpy.array, d: numpy.array, thickness: int = 0, length: float = 0):
+ """Draw group axes."""
- def draw_places(self, image: numpy.array, K, D, color: tuple = None, border_size: int = 0):
- """Draw group places."""
+ try:
+ axis_points = numpy.float32([[length, 0, 0], [0, length, 0], [0, 0, length], [0, 0, 0]]).reshape(-1, 3)
+ axis_points, _ = cv2.projectPoints(axis_points, self.__rotation, self.__translation, numpy.array(k), numpy.array(d))
+ axis_points = axis_points.astype(int)
- l = self.marker_size / 2
+ cv2.line(image, tuple(axis_points[3].ravel()), tuple(axis_points[0].ravel()), (0, 0, 255), thickness) # X (red)
+ cv2.line(image, tuple(axis_points[3].ravel()), tuple(axis_points[1].ravel()), (0, 255, 0), thickness) # Y (green)
+ cv2.line(image, tuple(axis_points[3].ravel()), tuple(axis_points[2].ravel()), (255, 0, 0), thickness) # Z (blue)
- for identifier, place in self.__places.items():
+ # Ignore errors due to out of field axis: their coordinate are larger than int32 limitations.
+ except cv2.error:
+ pass
- try:
+ def draw_places(self, image: numpy.array, k: numpy.array, d: numpy.array, color: tuple = None, border_size: int = 0):
+ """Draw group places."""
- placePoints, _ = cv2.projectPoints(place.corners, self.__rotation, self.__translation, numpy.array(K), numpy.array(D))
- placePoints = placePoints.astype(int)
-
- cv2.line(image, tuple(placePoints[0].ravel()), tuple(placePoints[1].ravel()), color, border_size)
- cv2.line(image, tuple(placePoints[1].ravel()), tuple(placePoints[2].ravel()), color, border_size)
- cv2.line(image, tuple(placePoints[2].ravel()), tuple(placePoints[3].ravel()), color, border_size)
- cv2.line(image, tuple(placePoints[3].ravel()), tuple(placePoints[0].ravel()), color, border_size)
+ for identifier, place in self.__places.items():
- # Ignore errors due to out of field places: their coordinate are larger than int32 limitations.
- except cv2.error:
- pass
+ try:
- def draw(self, image: numpy.array, K: numpy.array, D: numpy.array, draw_axes: dict = None, draw_places: dict = None):
- """Draw group axes and places.
-
- Parameters:
- image: where to draw.
- K:
- D:
- draw_axes: draw_axes parameters (if None, no axes drawn)
- draw_places: draw_places parameters (if None, no places drawn)
- """
+ place_points, _ = cv2.projectPoints(place.corners, self.__rotation, self.__translation, numpy.array(k), numpy.array(d))
+ place_points = place_points.astype(int)
- # Draw axes if required
- if draw_axes is not None:
+ cv2.line(image, tuple(place_points[0].ravel()), tuple(place_points[1].ravel()), color, border_size)
+ cv2.line(image, tuple(place_points[1].ravel()), tuple(place_points[2].ravel()), color, border_size)
+ cv2.line(image, tuple(place_points[2].ravel()), tuple(place_points[3].ravel()), color, border_size)
+ cv2.line(image, tuple(place_points[3].ravel()), tuple(place_points[0].ravel()), color, border_size)
- self.draw_axes(image, K, D, **draw_axes)
+ # Ignore errors due to out of field places: their coordinate are larger than int32 limitations.
+ except cv2.error:
+ pass
- # Draw places if required
- if draw_places is not None:
+ def draw(self, image: numpy.array, k: numpy.array, d: numpy.array, draw_axes: dict = None, draw_places: dict = None):
+ """Draw group axes and places.
+
+ Parameters:
+ image: where to draw.
+ k: intrinsic camera parameters
+ d: camera distortion matrix
+ draw_axes: draw_axes parameters (if None, no axes drawn)
+ draw_places: draw_places parameters (if None, no places drawn)
+ """
- self.draw_places(image, K, D, **draw_places)
+ # Draw axes if required
+ if draw_axes is not None:
+ self.draw_axes(image, k, d, **draw_axes)
- def to_obj(self, obj_filepath):
- """Save group to .obj file."""
+ # Draw places if required
+ if draw_places is not None:
+ self.draw_places(image, k, d, **draw_places)
- with open(obj_filepath, 'w', encoding='utf-8') as file:
+ def to_obj(self, obj_filepath):
+ """Save group to .obj file."""
- file.write('# ArGaze OBJ File\n')
- file.write('# http://achil.recherche.enac.fr/features/eye/argaze/\n')
+ with open(obj_filepath, 'w', encoding='utf-8') as file:
- v_count = 0
+ file.write('# ArGaze OBJ File\n')
+ file.write('# https://achil.recherche.enac.fr/features/eye/argaze/\n')
- for p, (identifier, place) in enumerate(self.__places.items()):
+ v_count = 0
- file.write(f'o {self.__dictionary.name}#{identifier}_Marker\n')
+ for p, (identifier, place) in enumerate(self.__places.items()):
- vertices = ''
+ file.write(f'o {self.__dictionary.name}#{identifier}_Marker\n')
- # Write vertices in reverse order
- for v in [3, 2, 1, 0]:
+ vertices = ''
- file.write(f'v {" ".join(map(str, place.corners[v]))}\n')
- v_count += 1
+ # Write vertices in reverse order
+ for v in [3, 2, 1, 0]:
+ file.write(f'v {" ".join(map(str, place.corners[v]))}\n')
+ v_count += 1
- vertices += f' {v_count}'
+ vertices += f' {v_count}'
- #file.write('s off\n')
- file.write(f'f{vertices}\n')
+ # file.write('s off\n')
+ file.write(f'f{vertices}\n')
diff --git a/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py b/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py
index 12cbc54..459a03e 100644
--- a/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py
+++ b/src/argaze/ArUcoMarkers/ArUcoOpticCalibrator.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/ArUcoMarkers/ArUcoScene.py b/src/argaze/ArUcoMarkers/ArUcoScene.py
index b818dff..bb7bdbf 100644
--- a/src/argaze/ArUcoMarkers/ArUcoScene.py
+++ b/src/argaze/ArUcoMarkers/ArUcoScene.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/AreaOfInterest/AOI2DScene.py b/src/argaze/AreaOfInterest/AOI2DScene.py
index 2c8f003..0affe35 100644
--- a/src/argaze/AreaOfInterest/AOI2DScene.py
+++ b/src/argaze/AreaOfInterest/AOI2DScene.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/AreaOfInterest/AOI3DScene.py b/src/argaze/AreaOfInterest/AOI3DScene.py
index 1964d23..955b910 100644
--- a/src/argaze/AreaOfInterest/AOI3DScene.py
+++ b/src/argaze/AreaOfInterest/AOI3DScene.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/AreaOfInterest/AOIFeatures.py b/src/argaze/AreaOfInterest/AOIFeatures.py
index 7da5bb5..fe4af2c 100644
--- a/src/argaze/AreaOfInterest/AOIFeatures.py
+++ b/src/argaze/AreaOfInterest/AOIFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/DataFeatures.py b/src/argaze/DataFeatures.py
index 4e85aaf..fe7a5ac 100644
--- a/src/argaze/DataFeatures.py
+++ b/src/argaze/DataFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -894,12 +894,12 @@ class PipelineStepObject():
self.__name = name
@property
- def parent(self) -> object:
+ def parent(self) -> Self:
"""Get pipeline step object's parent object."""
return self.__parent
@parent.setter
- def parent(self, parent: object):
+ def parent(self, parent: Self):
"""Set layer's parent object."""
self.__parent = parent
diff --git a/src/argaze/GazeAnalysis/Basic.py b/src/argaze/GazeAnalysis/Basic.py
index ec98b30..cfe3eeb 100644
--- a/src/argaze/GazeAnalysis/Basic.py
+++ b/src/argaze/GazeAnalysis/Basic.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
index 6847f44..5b51bbd 100644
--- a/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
+++ b/src/argaze/GazeAnalysis/DeviationCircleCoverage.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
index a860e47..eb90a2a 100644
--- a/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/DispersionThresholdIdentification.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/Entropy.py b/src/argaze/GazeAnalysis/Entropy.py
index 5bac43e..2f98d2c 100644
--- a/src/argaze/GazeAnalysis/Entropy.py
+++ b/src/argaze/GazeAnalysis/Entropy.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/ExploreExploitRatio.py b/src/argaze/GazeAnalysis/ExploreExploitRatio.py
index 3b2d53b..b907a5c 100644
--- a/src/argaze/GazeAnalysis/ExploreExploitRatio.py
+++ b/src/argaze/GazeAnalysis/ExploreExploitRatio.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/FocusPointInside.py b/src/argaze/GazeAnalysis/FocusPointInside.py
index 5d26650..361ea75 100644
--- a/src/argaze/GazeAnalysis/FocusPointInside.py
+++ b/src/argaze/GazeAnalysis/FocusPointInside.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/KCoefficient.py b/src/argaze/GazeAnalysis/KCoefficient.py
index 9980dfe..9e2f317 100644
--- a/src/argaze/GazeAnalysis/KCoefficient.py
+++ b/src/argaze/GazeAnalysis/KCoefficient.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/LempelZivComplexity.py b/src/argaze/GazeAnalysis/LempelZivComplexity.py
index 810dbba..696d343 100644
--- a/src/argaze/GazeAnalysis/LempelZivComplexity.py
+++ b/src/argaze/GazeAnalysis/LempelZivComplexity.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/LinearRegression.py b/src/argaze/GazeAnalysis/LinearRegression.py
index 5a823a1..df3fab2 100644
--- a/src/argaze/GazeAnalysis/LinearRegression.py
+++ b/src/argaze/GazeAnalysis/LinearRegression.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/NGram.py b/src/argaze/GazeAnalysis/NGram.py
index ca60734..fc7f2e4 100644
--- a/src/argaze/GazeAnalysis/NGram.py
+++ b/src/argaze/GazeAnalysis/NGram.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/NearestNeighborIndex.py b/src/argaze/GazeAnalysis/NearestNeighborIndex.py
index 81bab22..98a95a1 100644
--- a/src/argaze/GazeAnalysis/NearestNeighborIndex.py
+++ b/src/argaze/GazeAnalysis/NearestNeighborIndex.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/TransitionMatrix.py b/src/argaze/GazeAnalysis/TransitionMatrix.py
index 8012f5e..567bd39 100644
--- a/src/argaze/GazeAnalysis/TransitionMatrix.py
+++ b/src/argaze/GazeAnalysis/TransitionMatrix.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
index 78cc170..f5fb069 100644
--- a/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
+++ b/src/argaze/GazeAnalysis/VelocityThresholdIdentification.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/GazeFeatures.py b/src/argaze/GazeFeatures.py
index 5777a8d..43c6cc7 100644
--- a/src/argaze/GazeFeatures.py
+++ b/src/argaze/GazeFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -29,1191 +29,1199 @@ from argaze.AreaOfInterest import AOIFeatures
class GazePosition(tuple, DataFeatures.TimestampedObject):
- """Define gaze position as a tuple of coordinates with precision.
+ """Define gaze position as a tuple of coordinates with precision.
- Parameters:
- precision: the radius of a circle around value where other same gaze position measurements could be.
- message: a string to describe why the position is what it is.
- """
+ Parameters:
+ precision: the radius of a circle around value where other same gaze position measurements could be.
+ message: a string to describe why the position is what it is.
+ """
- def __new__(cls, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan):
+ def __new__(cls, position: tuple = (), precision: int | float = None, message: str = None,
+ timestamp: int | float = math.nan):
- return tuple.__new__(cls, position)
+ return tuple.__new__(cls, position)
- def __init__(self, position: tuple = (), precision: int|float = None, message: str = None, timestamp: int|float = math.nan):
+ def __init__(self, position: tuple = (), precision: int | float = None, message: str = None,
+ timestamp: int | float = math.nan):
- DataFeatures.TimestampedObject.__init__(self, timestamp)
- self.__precision = precision
- self.__message = message
+ DataFeatures.TimestampedObject.__init__(self, timestamp)
+ self.__precision = precision
+ self.__message = message
- @property
- def value(self):
- """Get position's tuple value."""
- return tuple(self)
+ @property
+ def value(self):
+ """Get position's tuple value."""
+ return tuple(self)
- @property
- def precision(self):
- """Get position's precision."""
- return self.__precision
+ @property
+ def precision(self):
+ """Get position's precision."""
+ return self.__precision
- @property
- def message(self):
- """Get position's message."""
- return self.__message
+ @property
+ def message(self):
+ """Get position's message."""
+ return self.__message
- @classmethod
- def from_dict(cls, position_data: dict) -> Self:
+ @classmethod
+ def from_dict(cls, position_data: dict) -> Self:
- if 'value' in position_data.keys():
+ if 'value' in position_data.keys():
- value = position_data.pop('value')
- return GazePosition(value, **position_data)
+ value = position_data.pop('value')
+ return GazePosition(value, **position_data)
- else:
+ else:
- return GazePosition(**position_data)
+ return GazePosition(**position_data)
- def __bool__(self) -> bool:
- """Is the position value valid?"""
- return len(self) > 0
+ def __bool__(self) -> bool:
+ """Is the position value valid?"""
+ return len(self) > 0
- def __repr__(self):
- """String representation"""
+ def __repr__(self):
+ """String representation"""
- return json.dumps(DataFeatures.as_dict(self))
+ return json.dumps(DataFeatures.as_dict(self))
- def __add__(self, position: Self) -> Self:
- """Add position.
+ def __add__(self, position: Self) -> Self:
+ """Add position.
- !!! note
- The returned position precision is the maximal precision.
+ !!! note
+ The returned position precision is the maximal precision.
- !!! note
- The returned position timestamp is the self object timestamp.
- """
- if self.__precision is not None and position.precision is not None:
+ !!! note
+ The returned position timestamp is the self object timestamp.
+ """
+ if self.__precision is not None and position.precision is not None:
- return GazePosition(tuple(numpy.array(self) + numpy.array(position)), precision = max(self.__precision, position.precision), timestamp = self.timestamp)
+ return GazePosition(tuple(numpy.array(self) + numpy.array(position)),
+ precision=max(self.__precision, position.precision), timestamp=self.timestamp)
- else:
+ else:
- return GazePosition(tuple(numpy.array(self) + numpy.array(position)), timestamp = self.timestamp)
+ return GazePosition(tuple(numpy.array(self) + numpy.array(position)), timestamp=self.timestamp)
- __radd__ = __add__
+ __radd__ = __add__
- def __sub__(self, position: Self) -> Self:
- """Subtract position.
+ def __sub__(self, position: Self) -> Self:
+ """Subtract position.
- !!! note
- The returned position precision is the maximal precision.
+ !!! note
+ The returned position precision is the maximal precision.
- !!! note
- The returned position timestamp is the self object timestamp.
- """
- if self.__precision is not None and position.precision is not None:
+ !!! note
+ The returned position timestamp is the self object timestamp.
+ """
+ if self.__precision is not None and position.precision is not None:
- return GazePosition(tuple(numpy.array(self) - numpy.array(position)), precision = max(self.__precision, position.precision), timestamp = self.timestamp)
+ return GazePosition(tuple(numpy.array(self) - numpy.array(position)),
+ precision=max(self.__precision, position.precision), timestamp=self.timestamp)
- else:
+ else:
- return GazePosition(tuple(numpy.array(self) - numpy.array(position)), timestamp = self.timestamp)
+ return GazePosition(tuple(numpy.array(self) - numpy.array(position)), timestamp=self.timestamp)
- def __rsub__(self, position: Self) -> Self:
- """Reversed subtract position.
+ def __rsub__(self, position: Self) -> Self:
+ """Reversed subtract position.
- !!! note
- The returned position precision is the maximal precision.
+ !!! note
+ The returned position precision is the maximal precision.
- !!! note
- The returned position timestamp is the self object timestamp.
- """
- if self.__precision is not None and position.precision is not None:
-
- return GazePosition(tuple(numpy.array(position) - numpy.array(self)), precision = max(self.__precision, position.precision), timestamp = self.timestamp)
+ !!! note
+ The returned position timestamp is the self object timestamp.
+ """
+ if self.__precision is not None and position.precision is not None:
- else:
+ return GazePosition(tuple(numpy.array(position) - numpy.array(self)),
+ precision=max(self.__precision, position.precision), timestamp=self.timestamp)
- return GazePosition(tuple(numpy.array(position) - numpy.array(self)), timestamp = self.timestamp)
+ else:
- def __mul__(self, factor: int|float) -> Self:
- """Multiply position by a factor.
+ return GazePosition(tuple(numpy.array(position) - numpy.array(self)), timestamp=self.timestamp)
- !!! note
- The returned position precision is also multiplied by the factor.
+ def __mul__(self, factor: int | float | tuple) -> Self:
+ """Multiply position by a factor.
- !!! note
- The returned position timestamp is the self object timestamp.
- """
- return GazePosition(tuple(numpy.array(self) * factor), precision = self.__precision * factor if self.__precision is not None else None, timestamp = self.timestamp)
+ !!! note
+ The returned position precision is also multiplied by the factor.
- def __pow__(self, factor: int|float) -> Self:
- """Power position by a factor.
+ !!! note
+ The returned position timestamp is the self object timestamp.
+ """
+ return GazePosition(tuple(numpy.array(self) * factor), precision=self.__precision * factor if self.__precision is not None else None, timestamp=self.timestamp)
- !!! note
- The returned position precision is also powered by the factor.
+ def __pow__(self, factor: int | float) -> Self:
+ """Power position by a factor.
- !!! note
- The returned position timestamp is the self object timestamp.
- """
- return GazePosition(tuple(numpy.array(self) ** factor), precision = self.__precision ** factor if self.__precision is not None else None, timestamp = self.timestamp)
+ !!! note
+ The returned position precision is also powered by the factor.
- def distance(self, gaze_position) -> float:
- """Distance to another gaze positions."""
+ !!! note
+ The returned position timestamp is the self object timestamp.
+ """
+ return GazePosition(tuple(numpy.array(self) ** factor),
+ precision=self.__precision ** factor if self.__precision is not None else None,
+ timestamp=self.timestamp)
- distance = (self[0] - gaze_position[0])**2 + (self[1] - gaze_position[1])**2
- distance = numpy.sqrt(distance)
+ def distance(self, gaze_position) -> float:
+ """Distance to another gaze positions."""
- return distance
+ distance = (self[0] - gaze_position[0]) ** 2 + (self[1] - gaze_position[1]) ** 2
+ distance = numpy.sqrt(distance)
- def overlap(self, gaze_position, both=False) -> float:
- """Does this gaze position overlap another gaze position considering its precision?
- Set both to True to test if the other gaze position overlaps this one too."""
+ return distance
- distance = numpy.sqrt(numpy.sum((self - gaze_position)**2))
+ def overlap(self, gaze_position, both=False) -> float:
+ """Does this gaze position overlap another gaze position considering its precision?
+ Set both to True to test if the other gaze position overlaps this one too."""
- if both:
- return distance < min(self.__precision, gaze_position.precision)
- else:
- return distance < self.__precision
+ distance = numpy.sqrt(numpy.sum((self - gaze_position) ** 2))
- def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True):
- """Draw gaze position point and precision circle."""
+ if both:
+ return distance < min(self.__precision, gaze_position.precision)
+ else:
+ return distance < self.__precision
- if self:
+ def draw(self, image: numpy.array, color: tuple = None, size: int = None, draw_precision=True):
+ """Draw gaze position point and precision circle."""
- int_value = (int(self[0]), int(self[1]))
+ if self:
- # Draw point at position if required
- if color is not None:
- cv2.circle(image, int_value, size, color, -1)
+ int_value = (int(self[0]), int(self[1]))
+
+ # Draw point at position if required
+ if color is not None:
+ cv2.circle(image, int_value, size, color, -1)
+
+ # Draw precision circle
+ if self.__precision is not None and draw_precision:
+ cv2.circle(image, int_value, round(self.__precision), color, 1)
- # Draw precision circle
- if self.__precision is not None and draw_precision:
- cv2.circle(image, int_value, round(self.__precision), color, 1)
class TimeStampedGazePositions(DataFeatures.TimestampedObjectsList):
- """Handle timestamped gaze positions into a list."""
-
- def __init__(self, gaze_positions=None):
+ """Handle timestamped gaze positions into a list."""
- if gaze_positions is None:
- gaze_positions = []
+ def __init__(self, gaze_positions=None):
- DataFeatures.TimestampedObjectsList.__init__(self, GazePosition, gaze_positions)
+ if gaze_positions is None:
+ gaze_positions = []
- def values(self) -> list:
- """Get all timestamped position values as list of tuple."""
- return [tuple(ts_position) for ts_position in self]
+ DataFeatures.TimestampedObjectsList.__init__(self, GazePosition, gaze_positions)
- ''' Is it still needed as there is a TimestampedObjectsList.from_json method?
- @classmethod
- def from_json(self, json_filepath: str) -> TimeStampedGazePositions:
- """Create a TimeStampedGazePositions from .json file."""
+ def values(self) -> list:
+ """Get all timestamped position values as list of tuple."""
+ return [tuple(ts_position) for ts_position in self]
- with open(json_filepath, encoding='utf-8') as ts_positions_file:
+ ''' Is it still needed as there is a TimestampedObjectsList.from_json method?
+ @classmethod
+ def from_json(self, json_filepath: str) -> TimeStampedGazePositions:
+ """Create a TimeStampedGazePositions from .json file."""
- json_positions = json.load(ts_positions_file)
+ with open(json_filepath, encoding='utf-8') as ts_positions_file:
- return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions})
- '''
+ json_positions = json.load(ts_positions_file)
- @classmethod
- def from_dataframe(cls, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None, message: str = None) -> Self:
- """Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
+ return TimeStampedGazePositions({ast.literal_eval(ts_str): json_positions[ts_str] for ts_str in json_positions})
+ '''
- Parameters:
- dataframe:
- timestamp: specific timestamp column label.
- x: specific x column label.
- y: specific y column label.
- precision: specific precision column label if exist.
- message: specific message column label if exist.
- """
+ @classmethod
+ def from_dataframe(cls, dataframe: pandas.DataFrame, timestamp: str, x: str, y: str, precision: str = None,
+ message: str = None) -> Self:
+ """Create a TimeStampedGazePositions from [Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
- # Copy columns
- columns = (timestamp, x, y)
+ Parameters:
+ dataframe:
+ timestamp: specific timestamp column label.
+ x: specific x column label.
+ y: specific y column label.
+ precision: specific precision column label if exist.
+ message: specific message column label if exist.
+ """
- if precision is not None:
+ # Copy columns
+ columns = (timestamp, x, y)
- columns += (precision,)
+ if precision is not None:
+ columns += (precision,)
- if message is not None:
+ if message is not None:
+ columns += (message,)
- columns += (message,)
+ df = dataframe.loc[:, columns]
- df = dataframe.loc[:, columns]
+ # Merge x and y columns into one 'value' column
+ df['value'] = tuple(zip(df[x], df[y]))
+ df.drop(columns=[x, y], inplace=True, axis=1)
- # Merge x and y columns into one 'value' column
- df['value'] = tuple(zip(df[x], df[y]))
- df.drop(columns=[x, y], inplace=True, axis=1)
+ # Replace tuple values containing NaN values by ()
+ df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=True)
- # Replace tuple values containing NaN values by ()
- df['value'] = df.apply(lambda row: () if pandas.isnull(list(row.value)).any() else row.value, axis=True)
+ # Handle precision data
+ if precision:
- # Handle precision data
- if precision:
+ # Rename precision column into 'precision' column
+ df.rename(columns={precision: 'precision'}, inplace=True)
- # Rename precision column into 'precision' column
- df.rename(columns={precision: 'precision'}, inplace=True)
+ else:
- else:
+ # Append a None precision column
+ df['precision'] = df.apply(lambda row: None, axis=True)
- # Append a None precision column
- df['precision'] = df.apply(lambda row: None, axis=True)
+ # Handle message data
+ if message:
- # Handle message data
- if message:
+ # Rename message column into 'message' column
+ df.rename(columns={precision: 'message'}, inplace=True)
- # Rename message column into 'message' column
- df.rename(columns={precision: 'message'}, inplace=True)
+ else:
- else:
+ # Append a None message column
+ df['message'] = df.apply(lambda row: None, axis=True)
- # Append a None message column
- df['message'] = df.apply(lambda row: None, axis=True)
+ # Rename timestamp column into 'timestamp' column
+ df.rename(columns={timestamp: 'timestamp'}, inplace=True)
- # Rename timestamp column into 'timestamp' column
- df.rename(columns={timestamp: 'timestamp'}, inplace=True)
-
- # Filter duplicate timestamps
- df = df[df.timestamp.duplicated() == False]
+ # Filter duplicate timestamps
+ df = df[df.timestamp.duplicated() == False]
+
+ # Create timestamped gaze positions
+ return TimeStampedGazePositions(df.apply(
+ lambda row: GazePosition(row.value, precision=row.precision, message=row.message, timestamp=row.timestamp),
+ axis=True))
- # Create timestamped gaze positions
- return TimeStampedGazePositions(df.apply(lambda row: GazePosition(row.value, precision=row.precision, message=row.message, timestamp=row.timestamp), axis=True))
class GazePositionCalibrationFailed(Exception):
- """Exception raised by GazePositionCalibrator."""
+ """Exception raised by GazePositionCalibrator."""
- def __init__(self, message):
+ def __init__(self, message):
+ super().__init__(message)
- super().__init__(message)
class GazePositionCalibrator(DataFeatures.PipelineStepObject):
- """Abstract class to define what should provide a gaze position calibrator algorithm."""
+ """Abstract class to define what should provide a gaze position calibrator algorithm."""
- # noinspection PyMissingConstructor
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
+ # noinspection PyMissingConstructor
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+ pass
- pass
+ def store(self, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition):
+ """Store observed and expected gaze positions.
- def store(self, observed_gaze_position: GazePosition, expected_gaze_position: GazePosition):
- """Store observed and expected gaze positions.
+ Parameters:
+ observed_gaze_position: where gaze position actually is
+ expected_gaze_position: where gaze position should be
+ """
- Parameters:
- observed_gaze_position: where gaze position actually is
- expected_gaze_position: where gaze position should be
- """
+ raise NotImplementedError('calibrate() method not implemented')
- raise NotImplementedError('calibrate() method not implemented')
+ def reset(self):
+ """Reset observed and expected gaze positions."""
- def reset(self):
- """Reset observed and expected gaze positions."""
+ raise NotImplementedError('reset() method not implemented')
- raise NotImplementedError('reset() method not implemented')
+ def calibrate(self) -> any:
+ """Process calibration from observed and expected gaze positions.
- def calibrate(self) -> any:
- """Process calibration from observed and expected gaze positions.
+ Returns:
+ calibration outputs: any data returned to assess calibration
+ """
- Returns:
- calibration outputs: any data returned to assess calibration
- """
+ raise NotImplementedError('terminate() method not implemented')
- raise NotImplementedError('terminate() method not implemented')
+ def apply(self, observed_gaze_position: GazePosition) -> GazePosition:
+ """Apply calibration onto observed gaze position.
- def apply(self, observed_gaze_position: GazePosition) -> GazePosition:
- """Apply calibration onto observed gaze position.
+ Parameters:
+ observed_gaze_position: where gaze position actually is
- Parameters:
- observed_gaze_position: where gaze position actually is
+ Returns:
+ expected_gaze_position: where gaze position should be if the calibrator is ready else, observed gaze position
+ """
- Returns:
- expected_gaze_position: where gaze position should be if the calibrator is ready else, observed gaze position
- """
+ raise NotImplementedError('apply() method not implemented')
- raise NotImplementedError('apply() method not implemented')
+ def draw(self, image: numpy.array, **kwargs):
+ """Draw calibration into image.
+
+ Parameters:
+ image: where to draw
+ """
- def draw(self, image: numpy.array, **kwargs):
- """Draw calibration into image.
-
- Parameters:
- image: where to draw
- """
+ raise NotImplementedError('draw() method not implemented')
- raise NotImplementedError('draw() method not implemented')
+ def is_calibrating(self) -> bool:
+ """Is the calibration running?"""
- def is_calibrating(self) -> bool:
- """Is the calibration running?"""
+ raise NotImplementedError('ready getter not implemented')
- raise NotImplementedError('ready getter not implemented')
class GazeMovement(TimeStampedGazePositions, DataFeatures.TimestampedObject):
- """Define abstract gaze movement class as timestamped gaze positions list.
-
- !!! note
- Gaze movement timestamp is always equal to its first position timestamp.
+ """Define abstract gaze movement class as timestamped gaze positions list.
- Parameters:
- positions: timestamp gaze positions.
- finished: is the movement finished?
- message: a string to describe why the movement is what it is.
- """
+ !!! note
+ Gaze movement timestamp is always equal to its first position timestamp.
- def __new__(cls, positions: TimeStampedGazePositions = None, finished: bool = False,
- message: str = None, timestamp: int|float = math.nan):
+ Parameters:
+ positions: timestamp gaze positions.
+ finished: is the movement finished?
+ message: a string to describe why the movement is what it is.
+ """
- # noinspection PyArgumentList
- return TimeStampedGazePositions.__new__(cls, positions)
+ def __new__(cls, positions: TimeStampedGazePositions = None, finished: bool = False,
+ message: str = None, timestamp: int | float = math.nan):
- def __init__(self, positions: TimeStampedGazePositions = None, finished: bool = False,
- message: str = None, timestamp: int|float = math.nan):
- """Initialize GazeMovement"""
+ # noinspection PyArgumentList
+ return TimeStampedGazePositions.__new__(cls, positions)
- TimeStampedGazePositions.__init__(self, positions)
- DataFeatures.TimestampedObject.__init__(self, timestamp)
+ def __init__(self, positions: TimeStampedGazePositions = None, finished: bool = False,
+ message: str = None, timestamp: int | float = math.nan):
+ """Initialize GazeMovement"""
- self.__finished = finished
- self.__message = message
+ TimeStampedGazePositions.__init__(self, positions)
+ DataFeatures.TimestampedObject.__init__(self, timestamp)
- @property
- def timestamp(self) -> int|float:
- """Get first position timestamp."""
- if self:
- return self[0].timestamp
+ self.__finished = finished
+ self.__message = message
- def is_timestamped(self) -> bool:
- """If first position exist, the movement is timestamped."""
- return bool(self)
+ @property
+ def timestamp(self) -> int | float:
+ """Get first position timestamp."""
+ if self:
+ return self[0].timestamp
- @timestamp.setter
- def timestamp(self, timestamp: int|float):
- """Block gaze movement timestamp setting."""
- raise('GazeMovement timestamp is first position timestamp.')
+ def is_timestamped(self) -> bool:
+ """If first position exist, the movement is timestamped."""
+ return bool(self)
- def is_finished(self) -> bool:
- """Is the movement finished?"""
- return self.__finished
+ @timestamp.setter
+ def timestamp(self, timestamp: int | float):
+ """Block gaze movement timestamp setting."""
+ raise ('GazeMovement timestamp is first position timestamp.')
- def finish(self) -> Self:
- """Set gaze movement as finished"""
- self.__finished = True
- return self
+ def is_finished(self) -> bool:
+ """Is the movement finished?"""
+ return self.__finished
- @property
- def message(self):
- """Get movement's message."""
- return self.__message
+ def finish(self) -> Self:
+ """Set gaze movement as finished"""
+ self.__finished = True
+ return self
- @property
- def amplitude(self):
- """Get inferred amplitude from first and last positions."""
- if self:
+ @property
+ def message(self):
+ """Get movement's message."""
+ return self.__message
- return numpy.linalg.norm(self[0] - self[-1])
+ @property
+ def amplitude(self):
+ """Get inferred amplitude from first and last positions."""
+ if self:
- else:
+ return numpy.linalg.norm(self[0] - self[-1])
- return 0
+ else:
- def __str__(self) -> str:
- """String display"""
+ return 0
- if self:
+ def __str__(self) -> str:
+ """String display"""
- output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.is_finished()}'
+ if self:
- for position in self:
+ output = f'{type(self)}:\n\tduration={self.duration}\n\tsize={len(self)}\n\tfinished={self.is_finished()}'
- output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}'
+ for position in self:
+ output += f'\n\t{position.timestamp}:\n\t\tvalue={position},\n\t\tprecision={position.precision}'
- else:
+ else:
- output = f'{type(self)}'
+ output = f'{type(self)}'
- return output
+ return output
- def draw_positions(self, image: numpy.array, position_color: tuple = None, line_color: tuple = None):
- """Draw gaze movement positions with line between each position.
-
- Parameters:
- image: where to draw
- position_color: color of position point
- line_color: color of line between each position
- """
+ def draw_positions(self, image: numpy.array, position_color: tuple = None, line_color: tuple = None):
+ """Draw gaze movement positions with line between each position.
+
+ Parameters:
+ image: where to draw
+ position_color: color of position point
+ line_color: color of line between each position
+ """
- positions = self.copy()
+ positions = self.copy()
- while len(positions) >= 2:
+ while len(positions) >= 2:
- start_gaze_position = positions.pop(0)
- next_gaze_position = positions[0]
+ start_gaze_position = positions.pop(0)
+ next_gaze_position = positions[0]
- # Draw line between positions if required
- if line_color is not None:
+ # Draw line between positions if required
+ if line_color is not None:
+ cv2.line(image, (int(start_gaze_position[0]), int(start_gaze_position[1])),
+ (int(next_gaze_position[0]), int(next_gaze_position[1])), line_color, 1)
- cv2.line(image, (int(start_gaze_position[0]), int(start_gaze_position[1])), (int(next_gaze_position[0]), int(next_gaze_position[1])), line_color, 1)
+ # Draw position if required
+ if position_color is not None:
+ start_gaze_position.draw(image, position_color, draw_precision=False)
- # Draw position if required
- if position_color is not None:
+ def draw(self, image: numpy.array, **kwargs):
+ """Draw gaze movement into image."""
- start_gaze_position.draw(image, position_color, draw_precision=False)
+ raise NotImplementedError('draw() method not implemented')
- def draw(self, image: numpy.array, **kwargs):
- """Draw gaze movement into image."""
-
- raise NotImplementedError('draw() method not implemented')
class Fixation(GazeMovement):
- """Define abstract fixation as gaze movement."""
+ """Define abstract fixation as gaze movement."""
- def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False,
+ message: str = None, **kwargs):
+ super().__init__(positions, finished, message, **kwargs)
- super().__init__(positions, finished, message, **kwargs)
+ self._focus = ()
- self._focus = ()
+ @property
+ def focus(self) -> tuple:
+ """Get representative position of the fixation."""
+ return self._focus
- @property
- def focus(self) -> tuple:
- """Get representative position of the fixation."""
- return self._focus
+ @focus.setter
+ def focus(self, focus: tuple):
+ """Set representative position of the fixation."""
+ self._focus = focus
- @focus.setter
- def focus(self, focus: tuple):
- """Set representative position of the fixation."""
- self._focus = focus
+ def merge(self, fixation) -> Self:
+ """Merge another fixation into this fixation."""
- def merge(self, fixation) -> Self:
- """Merge another fixation into this fixation."""
+ raise NotImplementedError('merge() method not implemented')
- raise NotImplementedError('merge() method not implemented')
def is_fixation(gaze_movement):
- """Is a gaze movement a fixation?"""
+ """Is a gaze movement a fixation?"""
+
+ return type(gaze_movement).__bases__[0] == Fixation or type(gaze_movement) == Fixation
- return type(gaze_movement).__bases__[0] == Fixation or type(gaze_movement) == Fixation
class Saccade(GazeMovement):
- """Define abstract saccade as gaze movement."""
+ """Define abstract saccade as gaze movement."""
- def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False, message: str = None, **kwargs):
+ def __init__(self, positions: TimeStampedGazePositions = TimeStampedGazePositions(), finished: bool = False,
+ message: str = None, **kwargs):
+ super().__init__(positions, finished, message, **kwargs)
- super().__init__(positions, finished, message, **kwargs)
def is_saccade(gaze_movement):
- """Is a gaze movement a saccade?"""
+ """Is a gaze movement a saccade?"""
+
+ return type(gaze_movement).__bases__[0] == Saccade or type(gaze_movement) == Saccade
- return type(gaze_movement).__bases__[0] == Saccade or type(gaze_movement) == Saccade
class TimeStampedGazeMovements(DataFeatures.TimestampedObjectsList):
- """Handle timestamped gaze movements into a list"""
+ """Handle timestamped gaze movements into a list"""
- def __init__(self, gaze_movements: list = []):
+ def __init__(self, gaze_movements: list = []):
+ DataFeatures.TimestampedObjectsList.__init__(self, GazeMovement, gaze_movements)
- DataFeatures.TimestampedObjectsList.__init__(self, GazeMovement, gaze_movements)
class GazeStatus(list, DataFeatures.TimestampedObject):
- """Define gaze status as a list of 1 or 2 (index, GazeMovement) tuples.
+ """Define gaze status as a list of 1 or 2 (index, GazeMovement) tuples.
- Parameters:
- position: the position that the status represents.
- """
+ Parameters:
+ position: the position that the status represents.
+ """
- def __init__(self, position: GazePosition):
+ def __init__(self, position: GazePosition):
+ DataFeatures.TimestampedObject.__init__(self, timestamp=position.timestamp)
- DataFeatures.TimestampedObject.__init__(self, timestamp=position.timestamp)
+ self.__position = position
- self.__position = position
+ @property
+ def position(self) -> GazePosition:
+ """Get gaze status position."""
+ return self.__position
- @property
- def position(self) -> GazePosition:
- """Get gaze status position."""
- return self.__position
+ def append(self, movement_index: int, movement_type: type):
+ """Append movement index and type."""
- def append(self, movement_index: int, movement_type:type):
- """Append movement index and type."""
+ super().append((movement_index, movement_type))
- super().append((movement_index, movement_type))
class TimeStampedGazeStatus(DataFeatures.TimestampedObjectsList):
- """Handle timestamped gaze status into a list."""
+ """Handle timestamped gaze status into a list."""
- def __init__(self):
+ def __init__(self):
+ super().__init__(GazeStatus)
- super().__init__(GazeStatus)
class GazeMovementIdentifier(DataFeatures.PipelineStepObject):
- """Abstract class to define what should provide a gaze movement identifier."""
+ """Abstract class to define what should provide a gaze movement identifier."""
- # noinspection PyMissingConstructor
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
+ # noinspection PyMissingConstructor
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
- pass
+ pass
- @DataFeatures.PipelineStepMethod
- def identify(self, timestamped_gaze_position: GazePosition, terminate:bool=False) -> GazeMovement:
- """Identify gaze movement from successive timestamped gaze positions.
+ @DataFeatures.PipelineStepMethod
+ def identify(self, timestamped_gaze_position: GazePosition, terminate: bool = False) -> GazeMovement:
+ """Identify gaze movement from successive timestamped gaze positions.
- !!! warning "Mandatory"
- Each identified gaze movement have to share its first/last gaze position with previous/next gaze movement.
+ !!! warning "Mandatory"
+ Each identified gaze movement have to share its first/last gaze position with previous/next gaze movement.
- Parameters:
- timestamped_gaze_position: new gaze position from where identification have to be done considering former gaze positions.
- terminate: allows to notify identification algorithm that given gaze position will be the last one.
-
- Returns:
- gaze_movement: identified gaze movement once it is finished otherwise it returns empty gaze movement at least.
- """
+ Parameters:
+ timestamped_gaze_position: new gaze position from where identification have to be done considering former gaze positions.
+ terminate: allows to notify identification algorithm that given gaze position will be the last one.
+
+ Returns:
+ gaze_movement: identified gaze movement once it is finished otherwise it returns empty gaze movement at least.
+ """
- raise NotImplementedError('identify() method not implemented')
+ raise NotImplementedError('identify() method not implemented')
- def current_gaze_movement(self) -> GazeMovement:
- """Get the current identified gaze movement (finished or in progress) if it exists otherwise, an empty gaze movement."""
+ def current_gaze_movement(self) -> GazeMovement:
+ """Get the current identified gaze movement (finished or in progress) if it exists otherwise, an empty gaze movement."""
- raise NotImplementedError('current_gaze_movement getter not implemented')
+ raise NotImplementedError('current_gaze_movement getter not implemented')
- def current_fixation(self) -> Fixation:
- """Get the current identified fixation (finished or in progress) if it exists otherwise, an empty gaze movement."""
+ def current_fixation(self) -> Fixation:
+ """Get the current identified fixation (finished or in progress) if it exists otherwise, an empty gaze movement."""
- raise NotImplementedError('current_fixation getter not implemented')
+ raise NotImplementedError('current_fixation getter not implemented')
- def current_saccade(self) -> Saccade:
- """Get the current identified saccade (finished or in progress) if it exists otherwise, an empty gaze movement."""
+ def current_saccade(self) -> Saccade:
+ """Get the current identified saccade (finished or in progress) if it exists otherwise, an empty gaze movement."""
- raise NotImplementedError('current_saccade getter not implemented')
+ raise NotImplementedError('current_saccade getter not implemented')
- def browse(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[TimeStampedGazeMovements, TimeStampedGazeMovements, TimeStampedGazeStatus]:
- """Identify fixations and saccades browsing timestamped gaze positions.
+ def browse(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[
+ TimeStampedGazeMovements, TimeStampedGazeMovements, TimeStampedGazeStatus]:
+ """Identify fixations and saccades browsing timestamped gaze positions.
- Returns:
- timestamped_fixations: all fixations stored by timestamped.
- timestamped_saccades: all saccades stored by timestamped.
- timestamped_gaze_status: all gaze status stored by timestamped.
- """
+ Returns:
+ timestamped_fixations: all fixations stored by timestamped.
+ timestamped_saccades: all saccades stored by timestamped.
+ timestamped_gaze_status: all gaze status stored by timestamped.
+ """
- assert(type(ts_gaze_positions) == TimeStampedGazePositions)
+ assert (type(ts_gaze_positions) == TimeStampedGazePositions)
- ts_fixations = TimeStampedGazeMovements()
- ts_saccades = TimeStampedGazeMovements()
- ts_status = TimeStampedGazeStatus()
+ ts_fixations = TimeStampedGazeMovements()
+ ts_saccades = TimeStampedGazeMovements()
+ ts_status = TimeStampedGazeStatus()
- # Get last ts to terminate identification on last gaze position
- last_ts = ts_gaze_positions[-1].timestamp
+ # Get last ts to terminate identification on last gaze position
+ last_ts = ts_gaze_positions[-1].timestamp
- # Iterate on gaze positions
- for gaze_position in ts_gaze_positions:
+ # Iterate on gaze positions
+ for gaze_position in ts_gaze_positions:
- gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts))
+ gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts))
- if gaze_movement:
+ if gaze_movement:
- # First gaze movement position is always shared with previous gaze movement
- for movement_position in gaze_movement:
+ # First gaze movement position is always shared with previous gaze movement
+ for movement_position in gaze_movement:
- # Is a status already exist for this position?
- gaze_status = ts_status.look_for(movement_position.timestamp)
+ # Is a status already exist for this position?
+ gaze_status = ts_status.look_for(movement_position.timestamp)
- if not gaze_status:
-
- gaze_status = GazeStatus(movement_position)
- ts_status.append(gaze_status)
+ if not gaze_status:
+ gaze_status = GazeStatus(movement_position)
+ ts_status.append(gaze_status)
- gaze_status.append(len(ts_fixations), type(gaze_movement))
+ gaze_status.append(len(ts_fixations), type(gaze_movement))
- # Store gaze movement into the appropriate list
- if is_fixation(gaze_movement):
+ # Store gaze movement into the appropriate list
+ if is_fixation(gaze_movement):
- ts_fixations.append(gaze_movement)
+ ts_fixations.append(gaze_movement)
- elif is_saccade(gaze_movement):
+ elif is_saccade(gaze_movement):
- ts_saccades.append(gaze_movement)
+ ts_saccades.append(gaze_movement)
- return ts_fixations, ts_saccades, ts_status
+ return ts_fixations, ts_saccades, ts_status
- def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[int|float, GazeMovement]:
- """GazeMovement generator.
+ def __call__(self, ts_gaze_positions: TimeStampedGazePositions) -> tuple[int | float, GazeMovement]:
+ """GazeMovement generator.
- Parameters:
- ts_gaze_positions: timestamped gaze positions to process.
+ Parameters:
+ ts_gaze_positions: timestamped gaze positions to process.
- Returns:
- timestamp: first gaze position date of identified gaze movement
- gaze_movement: identified gaze movement once it is finished
- """
+ Returns:
+ timestamp: first gaze position date of identified gaze movement
+ gaze_movement: identified gaze movement once it is finished
+ """
- assert(type(ts_gaze_positions) == TimeStampedGazePositions)
+ assert (type(ts_gaze_positions) == TimeStampedGazePositions)
- # Get last ts to terminate identification on last gaze position
- last_ts = ts_gaze_positions[-1]
+ # Get last ts to terminate identification on last gaze position
+ last_ts = ts_gaze_positions[-1]
- # Iterate on gaze positions
- for gaze_position in ts_gaze_positions:
+ # Iterate on gaze positions
+ for gaze_position in ts_gaze_positions:
- gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts))
+ gaze_movement = self.identify(gaze_position, terminate=(gaze_position.timestamp == last_ts))
- if gaze_movement:
+ if gaze_movement:
+ yield gaze_movement
- yield gaze_movement
class ScanStepError(Exception):
- """Exception raised at ScanStep creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade."""
+ """Exception raised at ScanStep creation if a aoi scan step doesn't start by a fixation or doesn't end by a saccade."""
- def __init__(self, message):
+ def __init__(self, message):
+ super().__init__(message)
- super().__init__(message)
class ScanStep():
- """Define a scan step as a fixation and a consecutive saccade.
+ """Define a scan step as a fixation and a consecutive saccade.
- Parameters:
- first_fixation: a fixation that comes before the next saccade.
- last_saccade: a saccade that comes after the previous fixation.
-
- !!! warning
- Scan step have to start by a fixation and then end by a saccade.
- """
+ Parameters:
+ first_fixation: a fixation that comes before the next saccade.
+ last_saccade: a saccade that comes after the previous fixation.
+
+ !!! warning
+ Scan step have to start by a fixation and then end by a saccade.
+ """
- def __init__(self, first_fixation: Fixation, last_saccade: Saccade):
+ def __init__(self, first_fixation: Fixation, last_saccade: Saccade):
- self.__first_fixation = first_fixation
- self.__last_saccade = last_saccade
+ self.__first_fixation = first_fixation
+ self.__last_saccade = last_saccade
- # First movement have to be a fixation
- if not is_fixation(self.__first_fixation):
+ # First movement have to be a fixation
+ if not is_fixation(self.__first_fixation):
+ raise ScanStepError('First step movement is not a fixation')
- raise ScanStepError('First step movement is not a fixation')
+ # Last movement have to be a saccade
+ if not is_saccade(self.__last_saccade):
+ raise ScanStepError('Last step movement is not a saccade')
- # Last movement have to be a saccade
- if not is_saccade(self.__last_saccade):
-
- raise ScanStepError('Last step movement is not a saccade')
+ @property
+ def first_fixation(self):
+ """Get scan step first fixation."""
+ return self.__first_fixation
- @property
- def first_fixation(self):
- """Get scan step first fixation."""
- return self.__first_fixation
+ @property
+ def last_saccade(self):
+ """Get scan step last saccade."""
+ return self.__last_saccade
- @property
- def last_saccade(self):
- """Get scan step last saccade."""
- return self.__last_saccade
-
- @property
- def fixation_duration(self) -> int|float:
- """Time spent on AOI
+ @property
+ def fixation_duration(self) -> int | float:
+ """Time spent on AOI
- Returns:
- fixation duration
- """
+ Returns:
+ fixation duration
+ """
- return self.__first_fixation.duration
+ return self.__first_fixation.duration
- @property
- def duration(self) -> int|float:
- """Time spent on AOI and time spent to go to next AOI
+ @property
+ def duration(self) -> int | float:
+ """Time spent on AOI and time spent to go to next AOI
- Returns:
- duration
- """
+ Returns:
+ duration
+ """
- return self.__first_fixation.duration + self.__last_saccade.duration
+ return self.__first_fixation.duration + self.__last_saccade.duration
-class ScanPath(list):
- """List of scan steps."""
- def __init__(self, duration_max: int|float = 0):
-
- super().__init__()
+class ScanPath(list):
+ """List of scan steps."""
- self.__duration_max = duration_max
- self.__last_fixation = None
- self.__duration = 0
+ def __init__(self, duration_max: int | float = 0):
- @property
- def duration_max(self) -> float:
- """Duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration."""
- return self.__duration_max
+ super().__init__()
- @duration_max.setter
- def duration_max(self, duration_max: float):
+ self.__duration_max = duration_max
+ self.__last_fixation = None
+ self.__duration = 0
- self.__duration_max = duration_max
+ @property
+ def duration_max(self) -> float:
+ """Duration from which older scan steps are removed each time new scan steps are added. 0 means no maximal duration."""
+ return self.__duration_max
- @property
- def duration(self) -> int|float:
- """Sum of all scan steps duration
+ @duration_max.setter
+ def duration_max(self, duration_max: float):
- Returns:
- duration
- """
+ self.__duration_max = duration_max
- return self.__duration
+ @property
+ def duration(self) -> int | float:
+ """Sum of all scan steps duration
- def __check_duration(self):
- """Constrain path duration to maximal duration."""
+ Returns:
+ duration
+ """
- if self.__duration_max > 0:
+ return self.__duration
- while self.__duration > self.__duration_max:
+ def __check_duration(self):
+ """Constrain path duration to maximal duration."""
- oldest_step = self.pop(0)
+ if self.__duration_max > 0:
- self.__duration -= oldest_step.duration
+ while self.__duration > self.__duration_max:
+ oldest_step = self.pop(0)
- def append_saccade(self, saccade) -> ScanStep:
- """Append new saccade to scan path and return last new scan step if one have been created."""
+ self.__duration -= oldest_step.duration
- # Ignore saccade if no fixation came before
- if self.__last_fixation != None:
+ def append_saccade(self, saccade) -> ScanStep:
+ """Append new saccade to scan path and return last new scan step if one have been created."""
- try:
+ # Ignore saccade if no fixation came before
+ if self.__last_fixation != None:
- # Edit new step
- new_step = ScanStep(self.__last_fixation, saccade)
+ try:
- # Append new step
- super().append(new_step)
+ # Edit new step
+ new_step = ScanStep(self.__last_fixation, saccade)
- # Update duration
- self.__duration += new_step.duration
+ # Append new step
+ super().append(new_step)
- # Constrain path duration to maximal duration
- self.__check_duration()
+ # Update duration
+ self.__duration += new_step.duration
- # Return new step
- return new_step
+ # Constrain path duration to maximal duration
+ self.__check_duration()
- finally:
+ # Return new step
+ return new_step
- # Clear last fixation
- self.__last_fixation = None
+ finally:
- def append_fixation(self, fixation):
- """Append new fixation to scan path.
- !!! warning
- Consecutive fixations are ignored keeping the last fixation"""
+ # Clear last fixation
+ self.__last_fixation = None
- self.__last_fixation = fixation
+ def append_fixation(self, fixation):
+ """Append new fixation to scan path.
+ !!! warning
+ Consecutive fixations are ignored keeping the last fixation"""
- def draw(self, image: numpy.array, draw_fixations: dict = None, draw_saccades: dict = None, deepness: int = 0):
- """Draw scan path into image.
+ self.__last_fixation = fixation
- Parameters:
- image: where to draw
- draw_fixations: Fixation.draw parameters (which depends on the loaded gaze movement identifier module,
- if None, no fixation is drawn)
- draw_saccades: Saccade.draw parameters (which depends on the loaded gaze movement identifier module,
- if None, no saccade is drawn)
- deepness: number of steps back to draw
- """
+ def draw(self, image: numpy.array, draw_fixations: dict = None, draw_saccades: dict = None, deepness: int = 0):
+ """Draw scan path into image.
- for step in self[-deepness:]:
+ Parameters:
+ image: where to draw
+ draw_fixations: Fixation.draw parameters (which depends on the loaded gaze movement identifier module,
+ if None, no fixation is drawn)
+ draw_saccades: Saccade.draw parameters (which depends on the loaded gaze movement identifier module,
+ if None, no saccade is drawn)
+ deepness: number of steps back to draw
+ """
- # Draw fixation if required
- if draw_fixations is not None:
+ for step in self[-deepness:]:
- step.first_fixation.draw(image, **draw_fixations)
+ # Draw fixation if required
+ if draw_fixations is not None:
+ step.first_fixation.draw(image, **draw_fixations)
- # Draw saccade if required
- if draw_saccades is not None:
+ # Draw saccade if required
+ if draw_saccades is not None:
+ step.last_saccade.draw(image, **draw_saccades)
- step.last_saccade.draw(image, **draw_saccades)
class ScanPathAnalyzer(DataFeatures.PipelineStepObject):
- """Abstract class to define what should provide a scan path analyzer."""
+ """Abstract class to define what should provide a scan path analyzer."""
- # noinspection PyMissingConstructor
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
+ # noinspection PyMissingConstructor
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+ self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if
+ isinstance(value, property) and value.fset is None]
- self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if isinstance(value, property) and value.fset is None]
+ def analysis(self) -> DataFeatures.DataDictionary:
+ """Get all scan path analyzer analysis as data dictionary."""
- def analysis(self) -> DataFeatures.DataDictionary:
- """Get all scan path analyzer analysis as data dictionary."""
+ return DataFeatures.DataDictionary({a: getattr(self, a) for a in self.__analysis})
- return DataFeatures.DataDictionary( {a: getattr(self, a) for a in self.__analysis} )
+ @DataFeatures.PipelineStepMethod
+ def analyze(self, scan_path: ScanPath):
+ """Analyze scan path."""
- @DataFeatures.PipelineStepMethod
- def analyze(self, scan_path: ScanPath):
- """Analyze scan path."""
+ raise NotImplementedError('analyze() method not implemented')
- raise NotImplementedError('analyze() method not implemented')
class AOIMatcher(DataFeatures.PipelineStepObject):
- """Abstract class to define what should provide an AOI matcher algorithm."""
+ """Abstract class to define what should provide an AOI matcher algorithm."""
- # noinspection PyMissingConstructor
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
+ # noinspection PyMissingConstructor
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+ self.__exclude = []
- self.__exclude = []
+ @property
+ def exclude(self):
+ """List of AOI to exclude from matching."""
+ return self.__exclude
- @property
- def exclude(self):
- """List of AOI to exclude from matching."""
- return self.__exclude
+ @exclude.setter
+ def exclude(self, exclude: list[str]):
+ self.__exclude = exclude
- @exclude.setter
- def exclude(self, exclude: list[str]):
+ def match(self, gaze_movement: GazeMovement, aoi_scene: AOIFeatures.AOIScene) -> tuple[
+ str, AOIFeatures.AreaOfInterest]:
+ """Which AOI is looked in the scene?"""
- self.__exclude = exclude
-
- def match(self, gaze_movement: GazeMovement, aoi_scene: AOIFeatures.AOIScene) -> tuple[str, AOIFeatures.AreaOfInterest]:
- """Which AOI is looked in the scene?"""
+ raise NotImplementedError('match() method not implemented')
- raise NotImplementedError('match() method not implemented')
+ def draw(self, image: numpy.array, aoi_scene: AOIFeatures.AOIScene):
+ """Draw matching into image.
+
+ Parameters:
+ image: where to draw
+ aoi_scene: to refresh looked aoi if required
+ """
- def draw(self, image: numpy.array, aoi_scene: AOIFeatures.AOIScene):
- """Draw matching into image.
-
- Parameters:
- image: where to draw
- aoi_scene: to refresh looked aoi if required
- """
+ raise NotImplementedError('draw() method not implemented')
- raise NotImplementedError('draw() method not implemented')
+ def looked_aoi(self) -> AOIFeatures.AreaOfInterest:
+ """Get most likely looked aoi."""
- def looked_aoi(self) -> AOIFeatures.AreaOfInterest:
- """Get most likely looked aoi."""
+ raise NotImplementedError('looked_aoi() method not implemented')
- raise NotImplementedError('looked_aoi() method not implemented')
+ def looked_aoi_name(self) -> str:
+ """Get most likely looked aoi name."""
+ raise NotImplementedError('looked_aoi_name() method not implemented')
- def looked_aoi_name(self) -> str:
- """Get most likely looked aoi name."""
- raise NotImplementedError('looked_aoi_name() method not implemented')
class AOIScanStepError(Exception):
- """
- Exception raised at AOIScanStep creation if an aoi scan step doesn't start by a fixation or
- doesn't end by a saccade.
- """
+ """
+ Exception raised at AOIScanStep creation if an aoi scan step doesn't start by a fixation or
+ doesn't end by a saccade.
+ """
- def __init__(self, message, aoi=''):
+ def __init__(self, message, aoi=''):
+ super().__init__(message)
- super().__init__(message)
+ self.aoi = aoi
- self.aoi = aoi
class AOIScanStep():
- """Define an aoi scan step as a set of successive gaze movements onto a same AOI.
-
- Parameters:
- movements: all movements over an AOI and the last saccade that comes out.
- aoi: AOI name
- letter: AOI unique letter to ease sequence analysis.
-
- !!! warning
- Aoi scan step have to start by a fixation and then end by a saccade.
- """
-
- def __init__(self, movements: TimeStampedGazeMovements, aoi: str = '', letter: str = ''):
-
- self.__movements = movements
- self.__aoi = aoi
- self.__letter = letter
-
- # First movement have to be a fixation
- if not is_fixation(self.first_fixation):
-
- raise AOIScanStepError('First step movement is not a fixation', self.aoi)
-
- # Last movement have to be a saccade
- if not is_saccade(self.last_saccade):
-
- raise AOIScanStepError('Last step movement is not a saccade', self.aoi)
-
- @property
- def movements(self):
- """Get AOI scan step movements."""
- return self.__movements
-
- @property
- def aoi(self):
- """Get AOI scan step aoi."""
- return self.__aoi
-
- @property
- def letter(self):
- """Get AOI scan step letter."""
- return self.__letter
-
- @property
- def first_fixation(self):
- """First fixation on AOI."""
- return self.movements[0]
-
- @property
- def last_saccade(self):
- """Last saccade that comes out AOI."""
- return self.movements[-1]
-
- @property
- def fixation_duration(self) -> int|float:
- """Time spent on AOI
-
- Returns:
- fixation duration
- """
- return self.last_saccade[0].timestamp - self.first_fixation[0].timestamp
-
- @property
- def duration(self) -> int|float:
- """Time spent on AOI and time spent to go to next AOI
-
- Returns:
- duration
- """
- return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp
+ """Define an aoi scan step as a set of successive gaze movements onto a same AOI.
+
+ Parameters:
+ movements: all movements over an AOI and the last saccade that comes out.
+ aoi: AOI name
+ letter: AOI unique letter to ease sequence analysis.
+
+ !!! warning
+ Aoi scan step have to start by a fixation and then end by a saccade.
+ """
+
+ def __init__(self, movements: TimeStampedGazeMovements, aoi: str = '', letter: str = ''):
+
+ self.__movements = movements
+ self.__aoi = aoi
+ self.__letter = letter
+
+ # First movement have to be a fixation
+ if not is_fixation(self.first_fixation):
+ raise AOIScanStepError('First step movement is not a fixation', self.aoi)
+
+ # Last movement have to be a saccade
+ if not is_saccade(self.last_saccade):
+ raise AOIScanStepError('Last step movement is not a saccade', self.aoi)
+
+ @property
+ def movements(self):
+ """Get AOI scan step movements."""
+ return self.__movements
+
+ @property
+ def aoi(self):
+ """Get AOI scan step aoi."""
+ return self.__aoi
+
+ @property
+ def letter(self):
+ """Get AOI scan step letter."""
+ return self.__letter
+
+ @property
+ def first_fixation(self):
+ """First fixation on AOI."""
+ return self.movements[0]
+
+ @property
+ def last_saccade(self):
+ """Last saccade that comes out AOI."""
+ return self.movements[-1]
+
+ @property
+ def fixation_duration(self) -> int | float:
+ """Time spent on AOI
+
+ Returns:
+ fixation duration
+ """
+ return self.last_saccade[0].timestamp - self.first_fixation[0].timestamp
+
+ @property
+ def duration(self) -> int | float:
+ """Time spent on AOI and time spent to go to next AOI
+
+ Returns:
+ duration
+ """
+ return self.last_saccade[-1].timestamp - self.first_fixation[0].timestamp
+
# Define strings for outside AOI case
OutsideAOI = 'GazeFeatures.OutsideAOI'
-class AOIScanPath(list):
- """List of aoi scan steps over successive aoi."""
- def __init__(self, expected_aoi: list[str] = [], duration_max: int|float = 0):
+class AOIScanPath(list):
+ """List of aoi scan steps over successive aoi."""
- super().__init__()
+ def __init__(self, expected_aoi: list[str] = [], duration_max: int | float = 0):
- self.__expected_aoi = expected_aoi
- self.__duration_max = duration_max
- self.__duration = 0
+ super().__init__()
- self.clear()
+ self.__expected_aoi = expected_aoi
+ self.__duration_max = duration_max
+ self.__duration = 0
- @property
- def expected_aoi(self):
- """List of all expected aoi."""
+ self.clear()
- return self.__expected_aoi
+ @property
+ def expected_aoi(self):
+ """List of all expected aoi."""
- @expected_aoi.setter
- def expected_aoi(self, expected_aoi: list[str] = []):
- """Edit list of all expected aoi.
+ return self.__expected_aoi
- !!! warning
- This will clear the AOIScanPath
- """
+ @expected_aoi.setter
+ def expected_aoi(self, expected_aoi: list[str] = []):
+ """Edit list of all expected aoi.
- # Check expected aoi are not the same as previous ones
- if len(expected_aoi) == len(self.__expected_aoi[1:]):
+ !!! warning
+ This will clear the AOIScanPath
+ """
- equal = [a == b for a, b in zip(expected_aoi, self.__expected_aoi[1:])]
+ # Check expected aoi are not the same as previous ones
+ if len(expected_aoi) == len(self.__expected_aoi[1:]):
- if all(equal):
+ equal = [a == b for a, b in zip(expected_aoi, self.__expected_aoi[1:])]
- return
-
- # Otherwise, update expected aoi
- self.__expected_aoi = [OutsideAOI]
- self.__expected_aoi += expected_aoi
+ if all(equal):
+ return
- self.clear()
+ # Otherwise, update expected aoi
+ self.__expected_aoi = [OutsideAOI]
+ self.__expected_aoi += expected_aoi
- @property
- def duration_max(self) -> float:
- """Duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration."""
- return self.__duration_max
+ self.clear()
- @duration_max.setter
- def duration_max(self, duration_max: float):
+ @property
+ def duration_max(self) -> float:
+ """Duration from which older aoi scan steps are removed each time new aoi scan steps are added. 0 means no maximal duration."""
+ return self.__duration_max
- self.__duration_max = duration_max
+ @duration_max.setter
+ def duration_max(self, duration_max: float):
- @property
- def duration(self) -> float:
- """Sum of all scan steps duration"""
+ self.__duration_max = duration_max
- return self.__duration
+ @property
+ def duration(self) -> float:
+ """Sum of all scan steps duration"""
- def __check_duration(self):
- """Constrain path duration to maximal duration."""
+ return self.__duration
- if self.__duration_max > 0:
+ def __check_duration(self):
+ """Constrain path duration to maximal duration."""
- while self.__duration > self.__duration_max:
+ if self.__duration_max > 0:
- oldest_step = self.pop(0)
+ while self.__duration > self.__duration_max:
- self.__duration -= oldest_step.duration
+ oldest_step = self.pop(0)
- # Edit transition matrix
- if len(self) > 0:
+ self.__duration -= oldest_step.duration
- # Decrement [index: source, columns: destination] value
- self.__transition_matrix.loc[oldest_step.aoi, self[0].aoi,] -= 1
+ # Edit transition matrix
+ if len(self) > 0:
+ # Decrement [index: source, columns: destination] value
+ self.__transition_matrix.loc[oldest_step.aoi, self[0].aoi,] -= 1
- def clear(self):
- """Clear aoi scan steps list, letter sequence and transition matrix."""
+ def clear(self):
+ """Clear aoi scan steps list, letter sequence and transition matrix."""
- super().clear()
+ super().clear()
- # noinspection PyAttributeOutsideInit
- self.__movements = TimeStampedGazeMovements()
- # noinspection PyAttributeOutsideInit
- self.__current_aoi = ''
- # noinspection PyAttributeOutsideInit
- self.__index = ord('A')
- # noinspection PyAttributeOutsideInit
- self.__aoi_letter = {}
- # noinspection PyAttributeOutsideInit
- self.__letter_aoi = {}
+ # noinspection PyAttributeOutsideInit
+ self.__movements = TimeStampedGazeMovements()
+ # noinspection PyAttributeOutsideInit
+ self.__current_aoi = ''
+ # noinspection PyAttributeOutsideInit
+ self.__index = ord('A')
+ # noinspection PyAttributeOutsideInit
+ self.__aoi_letter = {}
+ # noinspection PyAttributeOutsideInit
+ self.__letter_aoi = {}
- size = len(self.__expected_aoi)
- # noinspection PyAttributeOutsideInit
- self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi, columns=self.__expected_aoi)
+ size = len(self.__expected_aoi)
+ # noinspection PyAttributeOutsideInit
+ self.__transition_matrix = pandas.DataFrame(numpy.zeros((size, size)), index=self.__expected_aoi,
+ columns=self.__expected_aoi)
- def __get_aoi_letter(self, aoi):
+ def __get_aoi_letter(self, aoi):
- try :
+ try:
- return self.__aoi_letter[aoi]
+ return self.__aoi_letter[aoi]
- except KeyError:
+ except KeyError:
- letter = chr(self.__index)
- self.__aoi_letter[aoi] = letter
- self.__index += 1
- return letter
+ letter = chr(self.__index)
+ self.__aoi_letter[aoi] = letter
+ self.__index += 1
+ return letter
- def get_letter_aoi(self, letter):
- """Get which aoi is related to a unique letter."""
+ def get_letter_aoi(self, letter):
+ """Get which aoi is related to a unique letter."""
- return self.__letter_aoi[letter]
+ return self.__letter_aoi[letter]
- @property
- def letter_sequence(self) -> str:
- """Convert aoi scan path into a string with unique letter per aoi step."""
+ @property
+ def letter_sequence(self) -> str:
+ """Convert aoi scan path into a string with unique letter per aoi step."""
- sequence = ''
- for step in self:
- sequence += step.letter
+ sequence = ''
+ for step in self:
+ sequence += step.letter
- return sequence
-
- @property
- def current_aoi(self):
- """AOI name of aoi scan step under construction"""
+ return sequence
- return self.__current_aoi
+ @property
+ def current_aoi(self):
+ """AOI name of aoi scan step under construction"""
- @property
- def transition_matrix(self) -> pandas.DataFrame:
- """[Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) where indexes are transition departures and columns are transition destinations."""
+ return self.__current_aoi
- return self.__transition_matrix
+ @property
+ def transition_matrix(self) -> pandas.DataFrame:
+ """[Pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) where indexes are transition departures and columns are transition destinations."""
- def append_saccade(self, saccade):
- """Append new saccade to aoi scan path."""
+ return self.__transition_matrix
- # Ignore saccade if no fixation have been stored before
- if len(self.__movements) > 0:
+ def append_saccade(self, saccade):
+ """Append new saccade to aoi scan path."""
- self.__movements.append(saccade)
+ # Ignore saccade if no fixation have been stored before
+ if len(self.__movements) > 0:
+ self.__movements.append(saccade)
- def append_fixation(self, fixation, looked_aoi: str):
- """Append new fixation to aoi scan path and return last new aoi scan step if one have been created.
+ def append_fixation(self, fixation, looked_aoi: str):
+ """Append new fixation to aoi scan path and return last new aoi scan step if one have been created.
- !!! warning
- It could raise AOIScanStepError
- """
+ !!! warning
+ It could raise AOIScanStepError
+ """
- # Replace None aoi by generic OutsideAOI name
- if looked_aoi is None:
+ # Replace None aoi by generic OutsideAOI name
+ if looked_aoi is None:
- looked_aoi = OutsideAOI
+ looked_aoi = OutsideAOI
- # Raise error when aoi is not expected
- elif looked_aoi not in self.__expected_aoi:
+ # Raise error when aoi is not expected
+ elif looked_aoi not in self.__expected_aoi:
- raise AOIScanStepError('AOI not expected', looked_aoi)
+ raise AOIScanStepError('AOI not expected', looked_aoi)
- # Is it fixation onto a new aoi?
- if looked_aoi != self.__current_aoi and len(self.__movements) > 0:
+ # Is it fixation onto a new aoi?
+ if looked_aoi != self.__current_aoi and len(self.__movements) > 0:
- try:
+ try:
- # Edit unique letter per aoi
- letter = self.__get_aoi_letter(self.__current_aoi)
+ # Edit unique letter per aoi
+ letter = self.__get_aoi_letter(self.__current_aoi)
- # Remember which letter identify which aoi
- self.__letter_aoi[letter] = self.__current_aoi
+ # Remember which letter identify which aoi
+ self.__letter_aoi[letter] = self.__current_aoi
- # Edit new step
- new_step = AOIScanStep(self.__movements, self.__current_aoi, letter)
+ # Edit new step
+ new_step = AOIScanStep(self.__movements, self.__current_aoi, letter)
- # Edit transition matrix
- if len(self) > 0:
+ # Edit transition matrix
+ if len(self) > 0:
+ # Increment [index: source, columns: destination] value
+ self.__transition_matrix.loc[self[-1].aoi, self.__current_aoi,] += 1
- # Increment [index: source, columns: destination] value
- self.__transition_matrix.loc[self[-1].aoi, self.__current_aoi,] += 1
+ # Append new step
+ super().append(new_step)
- # Append new step
- super().append(new_step)
+ # Update duration
+ self.__duration += new_step.duration
- # Update duration
- self.__duration += new_step.duration
+ # Constrain path duration to maximal duration
+ self.__check_duration()
- # Constrain path duration to maximal duration
- self.__check_duration()
+ # Return new step
+ return new_step
- # Return new step
- return new_step
+ finally:
- finally:
+ # Clear movements
+ # noinspection PyAttributeOutsideInit
+ self.__movements = TimeStampedGazeMovements()
- # Clear movements
- # noinspection PyAttributeOutsideInit
- self.__movements = TimeStampedGazeMovements()
+ # Append new fixation
+ self.__movements.append(fixation)
- # Append new fixation
- self.__movements.append(fixation)
+ # Remember new aoi
+ self.__current_aoi = looked_aoi
+ else:
- # Remember new aoi
- self.__current_aoi = looked_aoi
- else:
+ # Append new fixation
+ self.__movements.append(fixation)
- # Append new fixation
- self.__movements.append(fixation)
+ # Remember aoi
+ # noinspection PyAttributeOutsideInit
+ self.__current_aoi = looked_aoi
- # Remember aoi
- # noinspection PyAttributeOutsideInit
- self.__current_aoi = looked_aoi
+ return None
- return None
+ def fixations_count(self):
+ """Get how many fixations are there in the scan path and how many fixation are there in each aoi."""
- def fixations_count(self):
- """Get how many fixations are there in the scan path and how many fixation are there in each aoi."""
+ scan_fixations_count = 0
+ aoi_fixations_count = {aoi: 0 for aoi in self.__expected_aoi}
- scan_fixations_count = 0
- aoi_fixations_count = {aoi: 0 for aoi in self.__expected_aoi}
+ for aoi_scan_step in self:
+ step_fixations_count = len(aoi_scan_step.movements) - 1 # -1: to ignore last saccade
- for aoi_scan_step in self:
+ scan_fixations_count += step_fixations_count
+ aoi_fixations_count[aoi_scan_step.aoi] += step_fixations_count
- step_fixations_count = len(aoi_scan_step.movements) - 1 # -1: to ignore last saccade
+ return scan_fixations_count, aoi_fixations_count
- scan_fixations_count += step_fixations_count
- aoi_fixations_count[aoi_scan_step.aoi] += step_fixations_count
-
- return scan_fixations_count, aoi_fixations_count
class AOIScanPathAnalyzer(DataFeatures.PipelineStepObject):
- """Abstract class to define what should provide an aoi scan path analyzer."""
-
- # noinspection PyMissingConstructor
- @DataFeatures.PipelineStepInit
- def __init__(self, **kwargs):
+ """Abstract class to define what should provide an aoi scan path analyzer."""
- self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if isinstance(value, property) and value.fset is None]
+ # noinspection PyMissingConstructor
+ @DataFeatures.PipelineStepInit
+ def __init__(self, **kwargs):
+ self.__analysis = [name for (name, value) in self.__class__.__dict__.items() if
+ isinstance(value, property) and value.fset is None]
- def analysis(self) -> DataFeatures.DataDictionary:
- """Get all aoi scan path analyzer analysis as data dictionary."""
+ def analysis(self) -> DataFeatures.DataDictionary:
+ """Get all aoi scan path analyzer analysis as data dictionary."""
- return DataFeatures.DataDictionary( {a: getattr(self, a) for a in self.__analysis} )
+ return DataFeatures.DataDictionary({a: getattr(self, a) for a in self.__analysis})
- @DataFeatures.PipelineStepMethod
- def analyze(self, aoi_scan_path: AOIScanPath):
- """Analyze aoi scan path."""
+ @DataFeatures.PipelineStepMethod
+ def analyze(self, aoi_scan_path: AOIScanPath):
+ """Analyze aoi scan path."""
- raise NotImplementedError('analyze() method not implemented')
+ raise NotImplementedError('analyze() method not implemented')
diff --git a/src/argaze/PupilAnalysis/WorkloadIndex.py b/src/argaze/PupilAnalysis/WorkloadIndex.py
index bced982..00995e9 100644
--- a/src/argaze/PupilAnalysis/WorkloadIndex.py
+++ b/src/argaze/PupilAnalysis/WorkloadIndex.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/PupilFeatures.py b/src/argaze/PupilFeatures.py
index c38d10a..e73176d 100644
--- a/src/argaze/PupilFeatures.py
+++ b/src/argaze/PupilFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/__main__.py b/src/argaze/__main__.py
index 15a78c1..3e1fe9e 100644
--- a/src/argaze/__main__.py
+++ b/src/argaze/__main__.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/utils/UtilsFeatures.py b/src/argaze/utils/UtilsFeatures.py
index 133809b..c04d20a 100644
--- a/src/argaze/utils/UtilsFeatures.py
+++ b/src/argaze/utils/UtilsFeatures.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/utils/aruco_markers_group_export.py b/src/argaze/utils/aruco_markers_group_export.py
index 46507b8..569ba6b 100644
--- a/src/argaze/utils/aruco_markers_group_export.py
+++ b/src/argaze/utils/aruco_markers_group_export.py
@@ -10,7 +10,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -27,199 +27,208 @@ from argaze import DataFeatures
from argaze.ArUcoMarkers import ArUcoDetector, ArUcoOpticCalibrator, ArUcoMarkersGroup
from argaze.utils import UtilsFeatures
-def main():
- """
- Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder.
- """
-
- # Manage arguments
- parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
- parser.add_argument('movie', metavar='MOVIE', type=str, default=None, help='movie path')
- parser.add_argument('dictionary', metavar='DICTIONARY', type=str, default=None, help='expected ArUco markers dictionary')
- parser.add_argument('size', metavar='SIZE', type=float, default=None, help='expected ArUco markers size (in cm)')
-
- parser.add_argument('-p', '--parameters', metavar='PARAMETERS', type=str, default=None, help='ArUco detector parameters file')
- parser.add_argument('-op', '--optic_parameters', metavar='OPTIC_PARAMETERS', type=str, default=None, help='ArUco detector optic parameters file')
-
- parser.add_argument('-s', '--start', metavar='START', type=float, default=0., help='start time in second')
- parser.add_argument('-o', '--output', metavar='OUTPUT', type=str, default='.', help='export folder path')
- parser.add_argument('-v', '--verbose', action='store_true', default=False, help='enable verbose mode to print information in console')
- args = parser.parse_args()
-
- # Load movie
- video_capture = cv2.VideoCapture(args.movie)
+def main():
+ """
+ Detect DICTIONARY and SIZE ArUco markers inside a MOVIE frame then, export detected ArUco markers group as .obj file into an OUTPUT folder.
+ """
- video_fps = video_capture.get(cv2.CAP_PROP_FPS)
- image_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
- image_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
+ # Manage arguments
+ parser = argparse.ArgumentParser(description=main.__doc__.split('-')[0])
+ parser.add_argument('movie', metavar='MOVIE', type=str, default=None, help='movie path')
+ parser.add_argument('dictionary', metavar='DICTIONARY', type=str, default=None,
+ help='expected ArUco markers dictionary')
+ parser.add_argument('size', metavar='SIZE', type=float, default=None, help='expected ArUco markers size (in cm)')
- # Edit ArUco detector configuration
- configuration = {
- "dictionary": args.dictionary
- }
+ parser.add_argument('-p', '--parameters', metavar='PARAMETERS', type=str, default=None,
+ help='ArUco detector parameters file')
+ parser.add_argument('-op', '--optic_parameters', metavar='OPTIC_PARAMETERS', type=str, default=None,
+ help='ArUco detector optic parameters file')
- if args.parameters:
+ parser.add_argument('-s', '--start', metavar='START', type=float, default=0., help='start time in second')
+ parser.add_argument('-o', '--output', metavar='OUTPUT', type=str, default='.', help='export folder path')
+ parser.add_argument('-v', '--verbose', action='store_true', default=False,
+ help='enable verbose mode to print information in console')
- configuration["parameters"] = args.parameters
+ args = parser.parse_args()
- if args.optic_parameters:
+ # Load movie
+ video_capture = cv2.VideoCapture(args.movie)
- configuration["optic_parameters"] = args.optic_parameters
+ video_fps = video_capture.get(cv2.CAP_PROP_FPS)
+ image_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
+ image_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
- # Load ArUco detector configuration
- aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
+ # Edit ArUco detector configuration
+ configuration = {
+ "dictionary": args.dictionary
+ }
- if args.verbose:
+ if args.parameters:
+ configuration["parameters"] = args.parameters
- print(aruco_detector)
+ if args.optic_parameters:
+ configuration["optic_parameters"] = args.optic_parameters
- # Create empty ArUco scene
- aruco_markers_group = None
+ # Load ArUco detector configuration
+ aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
- # Edit draw parameters
- draw_parameters = {
- "color": [255, 255, 255],
- "draw_axes": {
- "thickness": 4
- }
- }
+ if args.verbose:
+ print(aruco_detector)
- # Create a window
- cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE)
+ # Create empty ArUco scene
+ aruco_markers_group = None
- # Init image selection
- current_image_index = -1
- _, current_image = video_capture.read()
- next_image_index = int(args.start * video_fps)
- refresh = False
+ # Edit draw parameters
+ draw_parameters = {
+ "color": [255, 255, 255],
+ "draw_axes": {
+ "thickness": 4
+ }
+ }
- # Waiting for 'ctrl+C' interruption
- with contextlib.suppress(KeyboardInterrupt):
+ # Create a window
+ cv2.namedWindow("Export detected ArUco markers", cv2.WINDOW_AUTOSIZE)
- while True:
+ # Init image selection
+ current_image_index = -1
+ _, current_image = video_capture.read()
+ next_image_index = int(args.start * video_fps)
+ refresh = False
- # Select a new image and detect markers once
- if next_image_index != current_image_index or refresh:
+ # Waiting for 'ctrl+C' interruption
+ with contextlib.suppress(KeyboardInterrupt):
- video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index)
+ while True:
- success, video_image = video_capture.read()
+ # Select a new image and detect markers once
+ if next_image_index != current_image_index or refresh:
- video_height, video_width, _ = video_image.shape
+ video_capture.set(cv2.CAP_PROP_POS_FRAMES, next_image_index)
- # Create default optic parameters adapted to frame size
- if aruco_detector.optic_parameters is None:
+ success, video_image = video_capture.read()
- # Note: The choice of 1000 for default focal length should be discussed...
- aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.), width=video_width, height=video_height))
+ video_height, video_width, _ = video_image.shape
- if success:
+ # Create default optic parameters adapted to frame size
+ if aruco_detector.optic_parameters is None:
+ # Note: The choice of 1000 for default focal length should be discussed...
+ aruco_detector.optic_parameters = ArUcoOpticCalibrator.OpticParameters(rms=-1, dimensions=(
+ video_width, video_height), K=ArUcoOpticCalibrator.K0(focal_length=(1000., 1000.),
+ width=video_width, height=video_height))
- # Refresh once
- refresh = False
+ if success:
- current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1
- current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC)
+ # Refresh once
+ refresh = False
- try:
+ current_image_index = video_capture.get(cv2.CAP_PROP_POS_FRAMES) - 1
+ current_image_time = video_capture.get(cv2.CAP_PROP_POS_MSEC)
- # Detect and project AR features
- aruco_detector.detect_markers(video_image)
+ try:
- # Estimate all detected markers pose
- aruco_detector.estimate_markers_pose(args.size)
+ # Detect and project AR features
+ aruco_detector.detect_markers(video_image)
- # Build aruco scene from detected markers
- aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary, aruco_detector.detected_markers())
+ # Estimate all detected markers pose
+ aruco_detector.estimate_markers_pose(args.size)
- # Detection succeeded
- exception = None
+ # Build aruco scene from detected markers
+ aruco_markers_group = ArUcoMarkersGroup.ArUcoMarkersGroup(aruco_detector.dictionary,
+ aruco_detector.detected_markers())
- # Write errors
- except Exception as e:
+ # Detection succeeded
+ exception = None
- aruco_markers_group = None
+ # Write errors
+ except Exception as e:
- exception = e
+ aruco_markers_group = None
- # Draw detected markers
- aruco_detector.draw_detected_markers(video_image, draw_parameters)
+ exception = e
- # Write detected markers
- cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}', (20, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ # Draw detected markers
+ aruco_detector.draw_detected_markers(video_image, draw_parameters)
- # Write timing
- cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
+ # Write detected markers
+ cv2.putText(video_image, f'Detecting markers {list(aruco_detector.detected_markers().keys())}',
+ (20, video_height - 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
- # Write exception
- if exception is not None:
+ # Write timing
+ cv2.putText(video_image, f'Frame at {int(current_image_time)}ms', (20, 40),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ # Write exception
+ if exception is not None:
+ cv2.putText(video_image, f'error: {exception}', (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1,
+ (0, 255, 255), 1, cv2.LINE_AA)
- # Write documentation
- cv2.putText(video_image, f'<- previous image', (video_width-500, video_height-160), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'-> next image', (video_width-500, video_height-120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'r: reload config', (video_width-500, video_height-80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width-500, video_height-40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ # Write documentation
+ cv2.putText(video_image, f'<- previous image', (video_width - 500, video_height - 160),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'-> next image', (video_width - 500, video_height - 120),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'r: reload config', (video_width - 500, video_height - 80),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
+ cv2.putText(video_image, f'Ctrl+s: export ArUco markers', (video_width - 500, video_height - 40),
+ cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 1, cv2.LINE_AA)
- # Copy image
- current_image = video_image.copy()
+ # Copy image
+ current_image = video_image.copy()
- # Keep last image
- else:
+ # Keep last image
+ else:
- video_image = current_image.copy()
+ video_image = current_image.copy()
- key_pressed = cv2.waitKey(10)
+ key_pressed = cv2.waitKey(10)
- #if key_pressed != -1:
- # print(key_pressed)
+ #if key_pressed != -1:
+ # print(key_pressed)
- # Select previous image with left arrow
- if key_pressed == 2:
- next_image_index -= 1
+ # Select previous image with left arrow
+ if key_pressed == 2:
+ next_image_index -= 1
- # Select next image with right arrow
- if key_pressed == 3:
- next_image_index += 1
+ # Select next image with right arrow
+ if key_pressed == 3:
+ next_image_index += 1
- # Clip image index
- if next_image_index < 0:
- next_image_index = 0
+ # Clip image index
+ if next_image_index < 0:
+ next_image_index = 0
- # r: reload configuration
- if key_pressed == 114:
+ # r: reload configuration
+ if key_pressed == 114:
+ aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
+ refresh = True
+ print('Configuration reloaded')
- aruco_detector = DataFeatures.from_dict(ArUcoDetector.ArUcoDetector, configuration)
- refresh = True
- print('Configuration reloaded')
+ # Save selected marker edition using 'Ctrl + s'
+ if key_pressed == 19:
- # Save selected marker edition using 'Ctrl + s'
- if key_pressed == 19:
+ if aruco_markers_group:
- if aruco_markers_group:
+ aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj')
+ print(f'ArUco markers saved into {args.output}')
- aruco_markers_group.to_obj(f'{args.output}/{int(current_image_time)}-aruco_markers_group.obj')
- print(f'ArUco markers saved into {args.output}')
+ else:
- else:
+ print(f'No ArUco markers to export')
- print(f'No ArUco markers to export')
+ # Close window using 'Esc' key
+ if key_pressed == 27:
+ break
- # Close window using 'Esc' key
- if key_pressed == 27:
- break
+ # Display video
+ cv2.imshow(aruco_detector.name, video_image)
- # Display video
- cv2.imshow(aruco_detector.name, video_image)
+ # Close movie capture
+ video_capture.release()
- # Close movie capture
- video_capture.release()
+ # Stop image display
+ cv2.destroyAllWindows()
- # Stop image display
- cv2.destroyAllWindows()
if __name__ == '__main__':
-
- main() \ No newline at end of file
+ main()
diff --git a/src/argaze/utils/contexts/OpenCV.py b/src/argaze/utils/contexts/OpenCV.py
index f89189d..20be1a4 100644
--- a/src/argaze/utils/contexts/OpenCV.py
+++ b/src/argaze/utils/contexts/OpenCV.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
diff --git a/src/argaze/utils/contexts/PupilLabs.py b/src/argaze/utils/contexts/PupilLabs.py
index 43fe47e..d814deb 100644
--- a/src/argaze/utils/contexts/PupilLabs.py
+++ b/src/argaze/utils/contexts/PupilLabs.py
@@ -1,5 +1,5 @@
"""Handle network connection to Pupil Labs devices. Tested with Pupil Invisible.
- Based on Pupil Labs' Realtime Python API.
+ Based on Pupil Labs' Realtime Python API.
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Damien Mouratille"
@@ -31,6 +31,7 @@ import cv2
from pupil_labs.realtime_api.simple import discover_one_device
+
class LiveStream(ArFeatures.ArContext):
@DataFeatures.PipelineStepInit
@@ -59,75 +60,71 @@ class LiveStream(ArFeatures.ArContext):
logging.info('Device found. Stream loading.')
# Open gaze stream
- self.__gaze_thread = threading.Thread(target = self.__stream_gaze)
+ self.__gaze_thread = threading.Thread(target=self.__stream_gaze)
logging.debug('> starting gaze thread...')
self.__gaze_thread.start()
-
+
# Open video stream
- self.__video_thread = threading.Thread(target = self.__stream_video)
+ self.__video_thread = threading.Thread(target=self.__stream_video)
logging.debug('> starting video thread...')
self.__video_thread.start()
-
-
return self
-
def __stream_gaze(self):
"""Stream gaze."""
logging.debug('Stream gaze from Pupil Device')
while not self.__stop_event.is_set():
-
+
try:
while True:
gaze = self.__device.receive_gaze_datum()
gaze_timestamp = int((gaze.timestamp_unix_seconds - self.__start_time) * 1e3)
-
+
logging.debug('Gaze received at %i timestamp', gaze_timestamp)
-
+
# When gaze position is valid
if gaze.worn is True:
-
+
self._process_gaze_position(
- timestamp = gaze_timestamp,
- x = int(gaze.x),
- y = int(gaze.y))
+ timestamp=gaze_timestamp,
+ x=int(gaze.x),
+ y=int(gaze.y))
else:
# Process empty gaze position
logging.debug('Not worn at %i timestamp', gaze_timestamp)
- self._process_gaze_position(timestamp = gaze_timestamp)
-
+ self._process_gaze_position(timestamp=gaze_timestamp)
+
except KeyboardInterrupt:
pass
-
-
+
def __stream_video(self):
"""Stream video."""
logging.debug('Stream video from Pupil Device')
while not self.__stop_event.is_set():
-
+
try:
while True:
scene_frame, frame_datetime = self.__device.receive_scene_video_frame()
-
+
scene_timestamp = int((frame_datetime - self.__start_time) * 1e3)
-
+
logging.debug('Video received at %i timestamp', scene_timestamp)
-
+
self._process_camera_image(
- timestamp = scene_timestamp,
- image = scene_frame)
-
+ timestamp=scene_timestamp,
+ image=scene_frame)
+
except KeyboardInterrupt:
pass
@@ -135,10 +132,10 @@ class LiveStream(ArFeatures.ArContext):
def __exit__(self, exception_type, exception_value, exception_traceback):
logging.debug('Pupil-Labs context stops...')
-
+
# Close data stream
self.__stop_event.set()
-
+
# Stop streaming
threading.Thread.join(self.__gaze_thread)
- threading.Thread.join(self.__video_thread) \ No newline at end of file
+ threading.Thread.join(self.__video_thread)
diff --git a/src/argaze/utils/contexts/TobiiProGlasses2.py b/src/argaze/utils/contexts/TobiiProGlasses2.py
index 0fba2ff..0c2b8f9 100644
--- a/src/argaze/utils/contexts/TobiiProGlasses2.py
+++ b/src/argaze/utils/contexts/TobiiProGlasses2.py
@@ -9,7 +9,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"
@@ -75,13 +75,15 @@ DEFAULT_TOBII_IMAGE_PARAMETERS = {
"draw_something": False
}
+
# Define extra classes to support Tobii data parsing
@dataclass
class DirSig():
"""Define dir sig data (dir sig)."""
- dir: int # meaning ?
- sig: int # meaning ?
+ dir: int # meaning ?
+ sig: int # meaning ?
+
@dataclass
class PresentationTimeStamp():
@@ -90,6 +92,7 @@ class PresentationTimeStamp():
value: int
"""Pts value."""
+
@dataclass
class VideoTimeStamp():
"""Define video time stamp (vts) data."""
@@ -100,20 +103,23 @@ class VideoTimeStamp():
offset: int
"""Primary time stamp value."""
+
@dataclass
class EventSynch():
"""Define event synch (evts) data."""
- value: int # meaning ?
+ value: int # meaning ?
"""Evts value."""
+
@dataclass
class Event():
"""Define event data (ets type tag)."""
- ets: int # meaning ?
+ ets: int # meaning ?
type: str
- tag: str # dict ?
+ tag: str # dict ?
+
@dataclass
class Accelerometer():
@@ -122,6 +128,7 @@ class Accelerometer():
value: numpy.array
"""Accelerometer value"""
+
@dataclass
class Gyroscope():
"""Define gyroscope data (gy)."""
@@ -129,6 +136,7 @@ class Gyroscope():
value: numpy.array
"""Gyroscope value"""
+
@dataclass
class PupilCenter():
"""Define pupil center data (gidx pc eye)."""
@@ -136,7 +144,8 @@ class PupilCenter():
validity: int
index: int
value: tuple[(float, float, float)]
- eye: str # 'right' or 'left'
+ eye: str # 'right' or 'left'
+
@dataclass
class PupilDiameter():
@@ -145,7 +154,8 @@ class PupilDiameter():
validity: int
index: int
value: float
- eye: str # 'right' or 'left'
+ eye: str # 'right' or 'left'
+
@dataclass
class GazeDirection():
@@ -154,7 +164,8 @@ class GazeDirection():
validity: int
index: int
value: tuple[(float, float, float)]
- eye: str # 'right' or 'left'
+ eye: str # 'right' or 'left'
+
@dataclass
class GazePosition():
@@ -162,9 +173,10 @@ class GazePosition():
validity: int
index: int
- l: str # ?
+ l: str # ?
value: tuple[(float, float)]
+
@dataclass
class GazePosition3D():
"""Define gaze position 3D data (gidx gp3)."""
@@ -173,6 +185,7 @@ class GazePosition3D():
index: int
value: tuple[(float, float)]
+
@dataclass
class MarkerPosition():
"""Define marker data (marker3d marker2d)."""
@@ -180,6 +193,7 @@ class MarkerPosition():
value_3d: tuple[(float, float, float)]
value_2d: tuple[(float, float)]
+
class TobiiJsonDataParser():
def __init__(self):
@@ -319,6 +333,7 @@ class TobiiJsonDataParser():
return MarkerPosition(data['marker3d'], data['marker2d'])
+
class LiveStream(ArFeatures.ArContext):
@DataFeatures.PipelineStepInit
@@ -343,14 +358,14 @@ class LiveStream(ArFeatures.ArContext):
# Init protected attributes
self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS}
-
+
@property
def address(self) -> str:
"""Network address where to find the device."""
return self.__address
@address.setter
- def address(self, address:str):
+ def address(self, address: str):
self.__address = address
@@ -358,7 +373,6 @@ class LiveStream(ArFeatures.ArContext):
if "%" in self.__address:
if sys.platform == "win32":
-
self.__address = self.__address.split("%")[0]
# Define base url
@@ -372,7 +386,7 @@ class LiveStream(ArFeatures.ArContext):
self.__base_url = 'http://' + self.__address
@property
- def configuration(self)-> dict:
+ def configuration(self) -> dict:
"""Patch system configuration dictionary."""
return self.__configuration
@@ -388,15 +402,14 @@ class LiveStream(ArFeatures.ArContext):
return self.__project_name
@project.setter
- def project(self, project:str):
-
+ def project(self, project: str):
+
self.__project_name = project
def __bind_project(self):
"""Bind to a project or create one if it doesn't exist."""
if self.__project_name is None:
-
raise Exception(f'Project binding fails: setup project before.')
self.__project_id = None
@@ -409,7 +422,6 @@ class LiveStream(ArFeatures.ArContext):
try:
if project['pr_info']['Name'] == self.__project_name:
-
self.__project_id = project['pr_id']
logging.debug('> %s project already exist: %s', self.__project_name, self.__project_id)
@@ -420,13 +432,12 @@ class LiveStream(ArFeatures.ArContext):
# The project doesn't exist, create one
if self.__project_id is None:
-
logging.debug('> %s project doesn\'t exist', self.__project_name)
data = {
- 'pr_info' : {
+ 'pr_info': {
'CreationDate': self.__get_current_datetime(timeformat=TOBII_DATETIME_FORMAT_HUMREAD),
- 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)),
+ 'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__project_name)),
'Name': self.__project_name
},
'pr_created': self.__get_current_datetime()
@@ -439,12 +450,12 @@ class LiveStream(ArFeatures.ArContext):
logging.debug('> new %s project created: %s', self.__project_name, self.__project_id)
@property
- def participant(self)-> str:
+ def participant(self) -> str:
"""Participant name"""
return self.__participant_name
@participant.setter
- def participant(self, participant:str):
+ def participant(self, participant: str):
self.__participant_name = participant
@@ -456,13 +467,11 @@ class LiveStream(ArFeatures.ArContext):
"""
if self.__participant_name is None:
-
raise Exception(f'Participant binding fails: setup participant before.')
- if self.__project_id is None :
-
+ if self.__project_id is None:
raise Exception(f'Participant binding fails: bind to a project before')
-
+
self.__participant_id = None
# Check if participant exist
@@ -473,7 +482,6 @@ class LiveStream(ArFeatures.ArContext):
try:
if participant['pa_info']['Name'] == self.__participant_name:
-
self.__participant_id = participant['pa_id']
logging.debug('> %s participant already exist: %s', self.__participant_name, self.__participant_id)
@@ -484,15 +492,14 @@ class LiveStream(ArFeatures.ArContext):
# The participant doesn't exist, create one
if self.__participant_id is None:
-
logging.debug('> %s participant doesn\'t exist', self.__participant_name)
data = {
'pa_project': self.__project_id,
- 'pa_info': {
+ 'pa_info': {
'EagleId': str(uuid.uuid5(uuid.NAMESPACE_DNS, self.__participant_name)),
'Name': self.__participant_name,
- 'Notes': '' # TODO: set participant notes
+ 'Notes': '' # TODO: set participant notes
},
'pa_created': self.__get_current_datetime()
}
@@ -507,7 +514,7 @@ class LiveStream(ArFeatures.ArContext):
def __enter__(self):
logging.info('Tobii Pro Glasses 2 connexion starts...')
-
+
# Update current configuration with configuration patch
logging.debug('> updating configuration')
@@ -527,7 +534,6 @@ class LiveStream(ArFeatures.ArContext):
logging.info('Tobii Pro Glasses 2 configuration:')
for key, value in configuration.items():
-
logging.info('%s: %s', key, str(value))
# Store video stream info
@@ -546,7 +552,6 @@ class LiveStream(ArFeatures.ArContext):
# Bind to participant if required
if self.__participant_name is not None:
-
logging.debug('> binding participant %s', self.__participant_name)
self.__bind_participant()
@@ -558,21 +563,22 @@ class LiveStream(ArFeatures.ArContext):
# Open data stream
self.__data_socket = self.__make_socket()
- self.__data_thread = threading.Thread(target = self.__stream_data)
+ self.__data_thread = threading.Thread(target=self.__stream_data)
logging.debug('> starting data thread...')
self.__data_thread.start()
# Open video stream
self.__video_socket = self.__make_socket()
- self.__video_thread = threading.Thread(target = self.__stream_video)
+ self.__video_thread = threading.Thread(target=self.__stream_video)
logging.debug('> starting video thread...')
self.__video_thread.start()
# Keep connection alive
- self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \""+ str(uuid.uuid4()) +"\", \"op\": \"start\"}"
- self.__keep_alive_thread = threading.Thread(target = self.__keep_alive)
+ self.__keep_alive_msg = "{\"type\": \"live.data.unicast\", \"key\": \"" + str(
+ uuid.uuid4()) + "\", \"op\": \"start\"}"
+ self.__keep_alive_thread = threading.Thread(target=self.__keep_alive)
logging.debug('> starting keep alive thread...')
self.__keep_alive_thread.start()
@@ -583,7 +589,7 @@ class LiveStream(ArFeatures.ArContext):
def __exit__(self, exception_type, exception_value, exception_traceback):
logging.debug('%s.__exit__', DataFeatures.get_class_path(self))
-
+
# Close data stream
self.__stop_event.set()
@@ -612,7 +618,6 @@ class LiveStream(ArFeatures.ArContext):
image = super().image(**kwargs)
if draw_something:
-
cv2.putText(image, 'SOMETHING', (512, 512), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)
return image
@@ -623,10 +628,10 @@ class LiveStream(ArFeatures.ArContext):
iptype = socket.AF_INET
if ':' in self.__address:
-
iptype = socket.AF_INET6
- res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)
+ res = socket.getaddrinfo(self.__address, self.__udpport, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0,
+ socket.AI_PASSIVE)
family, socktype, proto, canonname, sockaddr = res[0]
new_socket = socket.socket(family, socktype, proto)
@@ -635,13 +640,11 @@ class LiveStream(ArFeatures.ArContext):
try:
if iptype == socket.AF_INET6:
-
new_socket.setsockopt(socket.SOL_SOCKET, 25, 1)
except socket.error as e:
if e.errno == 1:
-
logging.error('Binding to a network interface is permitted only for root users.')
return new_socket
@@ -672,7 +675,6 @@ class LiveStream(ArFeatures.ArContext):
# Store first timestamp
if first_ts == 0:
-
first_ts = data_ts
# Edit millisecond timestamp
@@ -689,15 +691,15 @@ class LiveStream(ArFeatures.ArContext):
# Process timestamped gaze position
self._process_gaze_position(
- timestamp = timestamp,
- x = int(data_object.value[0] * self.__video_width),
- y = int(data_object.value[1] * self.__video_height) )
+ timestamp=timestamp,
+ x=int(data_object.value[0] * self.__video_width),
+ y=int(data_object.value[1] * self.__video_height))
else:
# Process empty gaze position
- self._process_gaze_position(timestamp = timestamp)
-
+ self._process_gaze_position(timestamp=timestamp)
+
def __stream_video(self):
"""Stream video from dedicated socket."""
@@ -712,7 +714,7 @@ class LiveStream(ArFeatures.ArContext):
self.__video_buffer_lock = threading.Lock()
# Open video buffer reader
- self.__video_buffer_read_thread = threading.Thread(target = self.__video_buffer_read)
+ self.__video_buffer_read_thread = threading.Thread(target=self.__video_buffer_read)
logging.debug('> starting video buffer reader thread...')
self.__video_buffer_read_thread.start()
@@ -726,7 +728,6 @@ class LiveStream(ArFeatures.ArContext):
# Quit if the video acquisition thread have been stopped
if self.__stop_event.is_set():
-
logging.debug('> stop event is set')
break
@@ -736,7 +737,6 @@ class LiveStream(ArFeatures.ArContext):
# Store first timestamp
if first_ts == 0:
-
first_ts = image.time
# Edit millisecond timestamp
@@ -762,7 +762,6 @@ class LiveStream(ArFeatures.ArContext):
# Can't read image while it is locked
while self.__video_buffer_lock.locked():
-
# Check 10 times per frame
time.sleep(1 / (10 * self.__video_fps))
@@ -782,7 +781,6 @@ class LiveStream(ArFeatures.ArContext):
logging.debug('> read image at %i timestamp', timestamp)
if len(self.__video_buffer) > 0:
-
logging.warning('skipping %i image', len(self.__video_buffer))
# Clear buffer
@@ -790,9 +788,9 @@ class LiveStream(ArFeatures.ArContext):
# Process camera image
self._process_camera_image(
- timestamp = timestamp,
- image = image)
-
+ timestamp=timestamp,
+ image=image)
+
except Exception as e:
logging.warning('%s.__video_buffer_read: %s', DataFeatures.get_class_path(self), e)
@@ -806,7 +804,6 @@ class LiveStream(ArFeatures.ArContext):
logging.debug('%s.__keep_alive', DataFeatures.get_class_path(self))
while not self.__stop_event.is_set():
-
self.__data_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
self.__video_socket.sendto(self.__keep_alive_msg.encode('utf-8'), (self.__address, self.__udpport))
@@ -833,7 +830,7 @@ class LiveStream(ArFeatures.ArContext):
return data
- def __post_request(self, api_action, data = None, wait_for_response = True) -> any:
+ def __post_request(self, api_action, data=None, wait_for_response=True) -> any:
"""Send a POST request and get result back."""
url = self.__base_url + api_action
@@ -845,7 +842,6 @@ class LiveStream(ArFeatures.ArContext):
data = json.dumps(data)
if wait_for_response is False:
-
threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
return None
@@ -863,7 +859,7 @@ class LiveStream(ArFeatures.ArContext):
return res
- def __wait_for_status(self, api_action, key, values, timeout = None) -> any:
+ def __wait_for_status(self, api_action, key, values, timeout=None) -> any:
"""Wait until a status matches given values."""
url = self.__base_url + api_action
@@ -875,8 +871,8 @@ class LiveStream(ArFeatures.ArContext):
req.add_header('Content-Type', 'application/json')
try:
-
- response = urlopen(req, None, timeout = timeout)
+
+ response = urlopen(req, None, timeout=timeout)
except URLError as e:
@@ -910,12 +906,10 @@ class LiveStream(ArFeatures.ArContext):
status = self.calibration_status()
while status == 'calibrating':
-
time.sleep(1)
status = self.calibration_status()
if status == 'uncalibrated' or status == 'stale' or status == 'failed':
-
raise Exception(f'Calibration {status}')
# CALIBRATION
@@ -931,11 +925,10 @@ class LiveStream(ArFeatures.ArContext):
# Calibration have to be done for a project and a participant
if project_id is None or participant_id is None:
-
raise Exception(f'Setup project and participant before')
data = {
- 'ca_project': project_id,
+ 'ca_project': project_id,
'ca_type': 'default',
'ca_participant': participant_id,
'ca_created': self.__get_current_datetime()
@@ -954,11 +947,11 @@ class LiveStream(ArFeatures.ArContext):
if self.__calibration_id is not None:
- status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state', ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
+ status = self.__wait_for_status('/api/calibrations/' + self.__calibration_id + '/status', 'ca_state',
+ ['calibrating', 'calibrated', 'stale', 'uncalibrated', 'failed'])
# Forget calibration id
if status != 'calibrating':
-
# noinspection PyAttributeOutsideInit
self.__calibration_id = None
@@ -970,10 +963,12 @@ class LiveStream(ArFeatures.ArContext):
# RECORDING FEATURES
- def __wait_for_recording_status(self, recording_id, status_array = ['init', 'starting', 'recording', 'pausing', 'paused', 'stopping', 'stopped', 'done', 'stale', 'failed']):
+ def __wait_for_recording_status(self, recording_id,
+ status_array=['init', 'starting', 'recording', 'pausing', 'paused', 'stopping',
+ 'stopped', 'done', 'stale', 'failed']):
return self.__wait_for_status('/api/recordings/' + recording_id + '/status', 'rec_state', status_array)
- def create_recording(self, participant_name, recording_name = '', recording_notes = '') -> str:
+ def create_recording(self, participant_name, recording_name='', recording_notes='') -> str:
"""Create a new recording.
Returns:
@@ -1001,7 +996,7 @@ class LiveStream(ArFeatures.ArContext):
def start_recording(self, recording_id) -> bool:
"""Start recording on the Tobii interface's SD Card."""
-
+
self.__post_request('/api/recordings/' + recording_id + '/start')
return self.__wait_for_recording_status(recording_id, ['recording']) == 'recording'
@@ -1044,14 +1039,14 @@ class LiveStream(ArFeatures.ArContext):
# EVENTS AND EXPERIMENTAL VARIABLES
- def __post_recording_data(self, event_type: str, event_tag = ''):
+ def __post_recording_data(self, event_type: str, event_tag=''):
data = {'type': event_type, 'tag': event_tag}
self.__post_request('/api/events', data, wait_for_response=False)
- def send_event(self, event_type: str, event_value = None):
+ def send_event(self, event_type: str, event_value=None):
self.__post_recording_data('JsonEvent', "{'event_type': '%s','event_value': '%s'}" % (event_type, event_value))
- def send_variable(self, variable_name: str, variable_value = None):
+ def send_variable(self, variable_name: str, variable_value=None):
self.__post_recording_data(str(variable_name), str(variable_value))
# MISC
@@ -1060,7 +1055,8 @@ class LiveStream(ArFeatures.ArContext):
self.__get_request('/api/eject')
def get_battery_info(self):
- return ( "Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (float(self.get_battery_level()), float(self.get_battery_remaining_time())) )
+ return ("Battery info = [ Level: %.2f %% - Remaining Time: %.2f s ]" % (
+ float(self.get_battery_level()), float(self.get_battery_remaining_time())))
def get_battery_level(self):
return self.get_battery_status()['level']
@@ -1087,7 +1083,7 @@ class LiveStream(ArFeatures.ArContext):
return self.__get_request('/api/system/status')
def get_storage_info(self):
- return ( "Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()) )
+ return ("Storage info = [ Remaining Time: %.2f s ]" % float(self.get_battery_remaining_time()))
def get_storage_remaining_time(self):
return self.get_storage_status()['remaining_time']
@@ -1166,7 +1162,7 @@ class PostProcessing(ArFeatures.ArContext):
# Init protected attributes
self._image_parameters = {**ArFeatures.DEFAULT_ARCONTEXT_IMAGE_PARAMETERS, **DEFAULT_TOBII_IMAGE_PARAMETERS}
-
+
@property
def segment(self) -> str:
"""Path to segment folder."""
@@ -1202,7 +1198,7 @@ class PostProcessing(ArFeatures.ArContext):
# Read segment info
with open(os.path.join(self.__segment, TOBII_SEGMENT_INFO_FILENAME)) as info_file:
-
+
try:
info = json.load(info_file)
@@ -1212,10 +1208,10 @@ class PostProcessing(ArFeatures.ArContext):
raise RuntimeError(f'JSON fails to load {self.__path}/{TOBII_SEGMENT_INFO_FILENAME}')
# Constrain reading dates
- self.__end = min(self.__end, int(info["seg_length"] * 1e3)) if self.__end != None else int(info["seg_length"] * 1e3)
+ self.__end = min(self.__end, int(info["seg_length"] * 1e3)) if self.__end != None else int(
+ info["seg_length"] * 1e3)
if self.__start >= self.__end:
-
raise ValueError('Start reading timestamp is equal or greater than end reading timestamp.')
# TODO: log various info
@@ -1227,7 +1223,7 @@ class PostProcessing(ArFeatures.ArContext):
self.__stop_event = threading.Event()
# Open reading thread
- self.__reading_thread = threading.Thread(target = self.__read)
+ self.__reading_thread = threading.Thread(target=self.__read)
logging.debug('> starting reading thread...')
self.__reading_thread.start()
@@ -1236,7 +1232,7 @@ class PostProcessing(ArFeatures.ArContext):
def __exit__(self, exception_type, exception_value, exception_traceback):
logging.debug('%s.__exit__', DataFeatures.get_class_path(self))
-
+
# Close data stream
self.__stop_event.set()
@@ -1249,15 +1245,14 @@ class PostProcessing(ArFeatures.ArContext):
for video_ts, video_image, data_list in self:
if self.__stop_event.is_set():
-
break
logging.debug('> read image at %i timestamp', video_ts)
# Process camera image
self._process_camera_image(
- timestamp = video_ts,
- image = video_image)
+ timestamp=video_ts,
+ image=video_image)
height, width, _ = video_image.shape
@@ -1277,14 +1272,14 @@ class PostProcessing(ArFeatures.ArContext):
# Process timestamped gaze position
self._process_gaze_position(
- timestamp = data_ts,
- x = int(data_object.value[0] * width),
- y = int(data_object.value[1] * height) )
+ timestamp=data_ts,
+ x=int(data_object.value[0] * width),
+ y=int(data_object.value[1] * height))
else:
# Process empty gaze position
- self._process_gaze_position(timestamp = data_ts)
+ self._process_gaze_position(timestamp=data_ts)
def __iter__(self):
@@ -1304,7 +1299,6 @@ class PostProcessing(ArFeatures.ArContext):
next_data_ts, next_data_object, next_data_object_type = self.__next_data()
while next_data_ts < next_video_ts:
-
data_list.append((next_data_ts, next_data_object, next_data_object_type))
next_data_ts, next_data_object, next_data_object_type = self.__next_data()
@@ -1321,14 +1315,12 @@ class PostProcessing(ArFeatures.ArContext):
# Ignore before start timestamp
if ts < self.__start:
-
return self.__next__()
# Ignore images after end timestamp
if self.__end != None:
if ts >= self.__end:
-
raise StopIteration
# Return millisecond timestamp and image
@@ -1337,7 +1329,7 @@ class PostProcessing(ArFeatures.ArContext):
def __next_data(self):
data = json.loads(next(self.__data_file).decode('utf-8'))
-
+
# Parse data status
status = data.pop('s', -1)
@@ -1357,7 +1349,6 @@ class PostProcessing(ArFeatures.ArContext):
# Ignore data before first vts entry
if self.__vts_ts == -1:
-
return self.__next_data()
ts -= self.__vts_ts
@@ -1365,15 +1356,13 @@ class PostProcessing(ArFeatures.ArContext):
# Ignore timestamps out of the given time range
if ts < self.__start * 1e3:
-
return self.__next_data()
if ts >= self.__end * 1e3:
-
raise StopIteration
# Parse data
data_object, data_object_type = self.__parser.parse_data(status, data)
# Return millisecond timestamp, data object and type
- return ts * 1e-3, data_object, data_object_type \ No newline at end of file
+ return ts * 1e-3, data_object, data_object_type
diff --git a/src/argaze/utils/demo/recorders.py b/src/argaze/utils/demo/recorders.py
index 0debc12..679e6f7 100644
--- a/src/argaze/utils/demo/recorders.py
+++ b/src/argaze/utils/demo/recorders.py
@@ -8,7 +8,7 @@ This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
-this program. If not, see <http://www.gnu.org/licenses/>.
+this program. If not, see <https://www.gnu.org/licenses/>.
"""
__author__ = "Théo de la Hogue"