aboutsummaryrefslogtreecommitdiff
path: root/src/argaze/TobiiGlassesPro2/TobiiNetworkInterface.py
blob: 157697585eda9d3efc96914d0c1b1618ce86fef9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
from typing import TypeVar, Any
import logging
import sys
import socket
import threading
import json
import time

# python2 backwards compatibility for errors
if sys.version_info[0] < 3:
	class ConnectionError(BaseException):
		pass

try:
	import netifaces
	TOBII_DISCOVERY_ALLOWED = True
except:
	TOBII_DISCOVERY_ALLOWED = False

try:
	from urllib.parse import urlparse, urlencode
	from urllib.request import urlopen, Request
	from urllib.error import URLError, HTTPError

except ImportError:
	from urlparse import urlparse
	from urllib import urlencode
	from urllib2 import urlopen, Request, HTTPError, URLError

socket.IPPROTO_IPV6 = 41

Socket = TypeVar('socket', bound="socket")
# Type definition for type annotation convenience

class TobiiNetworkInterface():
	"""Handle network connection to Tobii glasses Pro 2 device.  
	It is a major rewrite of [tobiiglassesctrl/controller.py](https://github.com/ddetommaso/TobiiGlassesPyController/blob/master/tobiiglassesctrl/controller.py)."""

	def __init__(self, address = None):

		self.udpport = 49152
		self.address = address
		self.iface_name = None

		if self.address is None:

			data, address = self.__discover_device()

			if address is None:
				raise ConnectionError("No device found using discovery process")
			else:
				try:
					self.address = data["ipv4"]
				except:
					self.address = address

		if "%" in self.address:
			if sys.platform == "win32":
				self.address,self.iface_name = self.address.split("%")
			else:
				self.iface_name = self.address.split("%")[1]

		if ':' in self.address:
			self.base_url = 'http://[%s]' % self.address
		else:
			self.base_url = 'http://' + self.address

		self.__peer = (self.address, self.udpport)

	def make_socket(self) -> Socket:
		"""Create a socket to enable network communication."""

		iptype = socket.AF_INET

		if ':' in self.__peer[0]:
			iptype = socket.AF_INET6

		res = socket.getaddrinfo(self.__peer[0], self.__peer[1], socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)
		family, socktype, proto, canonname, sockaddr = res[0]
		new_socket = socket.socket(family, socktype, proto)

		new_socket.settimeout(5.0)

		try:
			if iptype == socket.AF_INET6:
				new_socket.setsockopt(socket.SOL_SOCKET, 25, 1)

		except socket.error as e:
			if e.errno == 1:
				logging.warning("Binding to a network interface is permitted only for root users.")

		return new_socket
		
	def __discover_device(self):

		if TOBII_DISCOVERY_ALLOWED == False:
			logging.error("Device discovery is not available due to a missing dependency (netifaces)")
			exit(1)

		MULTICAST_ADDR = 'ff02::1'
		PORT = 13006

		for i in netifaces.interfaces():

			if netifaces.AF_INET6 in netifaces.ifaddresses(i).keys():

				if "%" in netifaces.ifaddresses(i)[netifaces.AF_INET6][0]['addr']:

					if_name = netifaces.ifaddresses(i)[netifaces.AF_INET6][0]['addr'].split("%")[1]
					if_idx = socket.getaddrinfo(MULTICAST_ADDR + "%" + if_name, PORT, socket.AF_INET6, socket.SOCK_DGRAM)[0][4][3]

					s6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
					s6.settimeout(30.0)
					s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, if_idx)
					s6.bind(('::', PORT))

					PORT_OUT = PORT if sys.platform == 'win32' or sys.platform == 'darwin' else PORT + 1

					try:

						# Sending discover request
						discover_json = '{"type":"discover"}'
						s6.sendto(discover_json.encode('utf-8'), (MULTICAST_ADDR, PORT_OUT))

						# Waiting for a reponse from the device ...
						data, address = s6.recvfrom(1024)
						jdata = json.loads(data.decode('utf-8'))

						addr = address[0]

						if sys.version_info.major == 3 and sys.version_info.minor >= 8:
							addr = address[0] + '%' + if_name

						return (jdata, addr)

					except:

						# No device found on interface
						pass

		return (None, None)

	def get_request(self, api_action) -> str:
		"""Send a GET request and get data back."""

		url = self.base_url + api_action
		res = urlopen(url).read()

		try:
			data = json.loads(res.decode('utf-8'))
		except json.JSONDecodeError:
			data = None

		return data

	def post_request(self, api_action, data=None, wait_for_response=True) -> str:
		"""Send a POST request and get result back."""

		url = self.base_url + api_action
		req = Request(url)
		req.add_header('Content-Type', 'application/json')
		data = json.dumps(data)

		logging.debug("Sending JSON: " + str(data))

		if wait_for_response is False:
			threading.Thread(target=urlopen, args=(req, data.encode('utf-8'),)).start()
			return None

		response = urlopen(req, data.encode('utf-8'))
		res = response.read()

		logging.debug("Response: " + str(res))

		try:
			res = json.loads(res.decode('utf-8'))
		except:
			pass

		return res

	def send_keep_alive_msg(self, socket, msg):
		"""Send a message to keep socket opened."""

		res = socket.sendto(msg.encode('utf-8'), self.__peer)
		
	def grab_data(self, socket) -> bytes:
		"""Read incoming socket data."""

		try:
			data, address = socket.recvfrom(1024)
			return data

		except TimeoutError:

			logging.error("A timeout occurred while receiving data")

	def wait_for_status(self, api_action, key, values, timeout = None) -> Any:
		"""Wait until a status matches given values."""

		url = self.base_url + api_action
		running = True

		while running:

			req = Request(url)
			req.add_header('Content-Type', 'application/json')

			try:
				
				response = urlopen(req, None, timeout = timeout)

			except URLError as e:

				logging.error(e.reason)
				return -1

			data = response.read()
			json_data = json.loads(data.decode('utf-8'))

			if json_data[key] in values:
				running = False

			time.sleep(1)

		return json_data[key]