Merge pull request #10355 from geoffsim/MSP-VTX
[inav.git] / src / utils / assistnow.py
blob4968a85a9dda0f873932d37cbd9de3ef42743aa5
#!/usr/bin/env python3
# https://developer.thingstream.io/guides/location-services/assistnow-user-guide
#
# Create a file named tokens.yaml (or pass another file with -t)
# and add your AssistNow tokens to it:
# assistnow_online: XXXXXXXXXXX
# assistnow_offline: XXXXXXXXXXX
9 import requests
10 import yaml
11 import sys
12 import socket
13 import selectors
14 import types
15 import serial
16 import getopt
17 import io
18 import time
# Run-time configuration. The two tokens are filled in by loadTokens(); the
# remaining values are overridden by the command-line options parsed below.
online_token = offline_token = ""
token_file = "tokens.yaml"
serial_port = None
passthrough = dry_run = False
# UBX frame layout:
#   0xB5          first header byte
#   0x62          second header byte
#   0x??          class
#   0x??          id
#   0x??          payload length, low byte
#   0x??          payload length, high byte
#   0x?? ...      payload (length bytes)
#   0x??          crc1
#   0x??          crc2
# Total frame length = 8 + payload length

# Module-level parser state; resetUbloxState() restores these initial values.
hasFirstHeader = hasSecondHeader = False
ubxClass = ubxId = False
lenLow = lenHigh = False
payloadLen = skipped = 0
currentCommand = []
def resetUbloxState():
    """Put every module-level UBX parser variable back to its initial value.

    All progress flags become False, both counters become 0 and
    ``currentCommand`` is rebound to a fresh empty list.
    """
    global hasFirstHeader, hasSecondHeader
    global ubxClass, ubxId
    global lenLow, lenHigh
    global payloadLen, skipped
    global currentCommand

    hasFirstHeader = hasSecondHeader = False
    ubxClass = ubxId = False
    lenLow = lenHigh = False
    payloadLen = skipped = 0
    # Rebind rather than clear, so a list handed out earlier is left intact.
    currentCommand = []
def splitUbloxCommands(ubxBytes):
    """Split a byte stream into individual UBX frames.

    A frame starts with the header bytes 0xB5 0x62, followed by class, id,
    a 16-bit payload length (low byte first), the payload and two checksum
    bytes, i.e. 8 + payload-length bytes in total. Checksums are NOT
    verified; frames are delimited purely by the header and length field.

    Bytes that do not start a frame are skipped one at a time, so a stray
    0xB5 directly in front of a real header no longer swallows that frame.
    An incomplete frame at the end of the input is dropped.

    Parsing keeps all of its state in local variables; the module-level
    parser variables are not read or modified.

    Args:
        ubxBytes: indexable, sliceable sequence of byte values
            (e.g. ``bytes``).

    Returns:
        A list of frames, each a list of ints covering header through
        checksum.
    """
    ubxCommands = []
    total = len(ubxBytes)
    pos = 0

    # Fewer than 8 remaining bytes can never hold a complete frame.
    while pos + 8 <= total:
        if ubxBytes[pos] != 0xb5 or ubxBytes[pos + 1] != 0x62:
            # Not a header here: advance a single byte so the next byte is
            # still considered as a possible frame start.
            pos += 1
            continue

        payload_len = int(ubxBytes[pos + 4]) | (int(ubxBytes[pos + 5]) << 8)
        end = pos + 8 + payload_len  # header(2) + class/id(2) + len(2) + crc(2)
        if end > total:
            # The announced frame runs past the end of the data: drop it.
            break

        ubxCommands.append(list(ubxBytes[pos:end]))
        pos = end

    return ubxCommands
def loadTokens(file):
    """Read the AssistNow tokens from a YAML file into the module globals.

    The file is expected to provide the keys ``assistnow_online`` and
    ``assistnow_offline``. A YAML syntax error is printed and leaves both
    tokens unchanged; a missing key raises ``KeyError``.

    Args:
        file: path of the YAML token file.
    """
    global online_token
    global offline_token

    with open(file, "r") as stream:
        try:
            yaml_config = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            return

    online_token = yaml_config['assistnow_online']
    # Previously read 'assistnow_online' here too, so the offline request
    # was sent with the online token.
    offline_token = yaml_config['assistnow_offline']
def crc8_dvb_s2( crc:int, b:int) -> int:
    """Fold one byte into a running 8-bit CRC that uses polynomial 0xD5.

    Args:
        crc: checksum accumulated so far (0 to start).
        b: next byte value to include.

    Returns:
        The updated checksum, always in the range 0..255.
    """
    acc = crc ^ b
    for _ in range(8):
        top_bit_set = (acc & 0x80) != 0
        acc = (acc << 1) & 0xFF
        if top_bit_set:
            # The bit shifted out was 1: apply the polynomial.
            acc ^= 0xD5
    return int(acc & 0xFF)
def ubloxToMsp(ubxCmd):
    """Wrap one UBX frame in an MSP message and return it as ``bytes``.

    Layout produced: ``$X<``, 0x00, 0x50, 0x20, the frame length as two
    bytes (low byte first), the frame itself and one CRC byte. The CRC is
    computed with crc8_dvb_s2() over everything after ``$X<``.
    (Presumably MSP v2 framing -- confirm against the MSP specification.)

    Args:
        ubxCmd: sequence of byte values making up one UBX frame.

    Returns:
        The complete message; its length is ``len(ubxCmd) + 9``.
    """
    frame_len = len(ubxCmd)
    size_low = frame_len & 0xFF
    size_high = (frame_len >> 8) & 0xFF

    #msp = bytearray(b"$X<\x00d\x00\x00\x00\x8F")
    msp = bytearray(b"$X<\x00\x50\x20")
    msp.append(size_low)
    msp.append(size_high)

    # The checksum starts after the "$X<" preamble.
    checksum = 0
    for header_byte in (0x00, 0x50, 0x20, size_low, size_high):
        checksum = crc8_dvb_s2(checksum, header_byte)

    if len(msp) != 8:
        print ("Wrong size")

    for value in ubxCmd:
        msp.append(value)
        checksum = crc8_dvb_s2(checksum, value)

    #print ("msp: %s" % (bytes(msp)))

    msp.append(checksum & 0xFF)
    print ("CRC: %i" % (checksum))

    return bytes(msp)
def usage():
    """Print the command-line synopsis, covering every accepted option."""
    print ("assistnow.py -s /dev/ttyS0 [-t tokens.yaml] [-p] [-d]")
    print ("  -s, --serial PORT   serial port to send the data to")
    print ("  -t, --tokens FILE   YAML file with the AssistNow tokens (default: tokens.yaml)")
    print ("  -p, --passthrough   send raw UBX frames instead of wrapping them in MSP")
    print ("  -d, --dry-run       download the data but do not send it")
def sendUbxMessages(s, ubxMessages):
    """Write raw UBX frames to ``s`` one by one, printing progress.

    Empty frames are skipped. This mirrors sendMspMessages(), whose
    ``len(msp) > 8`` guard is true exactly when the UBX frame is non-empty;
    the guard here used to reference the undefined name ``msp`` and raised
    NameError on the first frame.

    Sending stops at the first ``serial.SerialException``, after printing
    the error and the offending frame.

    Args:
        s: object with a ``write(data)`` method (a serial port).
        ubxMessages: list of UBX frames, each a sequence of byte values.
    """
    total = len(ubxMessages)
    for printed, cmd in enumerate(ubxMessages, start=1):
        if len(cmd) == 0:
            continue
        print ("%i/%i ubx: %i" % (printed, total, len(cmd)))
        try:
            # Frames arrive as lists of ints; hand the port real bytes.
            s.write(bytes(cmd))
        except serial.SerialException as err:
            print (err)
            print (cmd)
            break
        #time.sleep(0.1)
def sendMspMessages(s, ubxMessages):
    """Wrap each UBX frame with ubloxToMsp() and write it to ``s``.

    Progress is printed for every message sent. Sending stops at the first
    ``serial.SerialException``, after printing the error, the UBX frame and
    the wrapped message.

    Args:
        s: object with a ``write(data)`` method (a serial port).
        ubxMessages: list of UBX frames, each a sequence of byte values.
    """
    total = len(ubxMessages)
    for count, cmd in enumerate(ubxMessages, start=1):
        msp = ubloxToMsp(cmd)
        #msp = bytearray(b"$X<\x00d\x00\x00\x00\x8F")
        if len(msp) <= 8:
            # Nothing beyond the 8 header bytes: skip it.
            continue
        print ("%i/%i msp: %i ubx: %i" % (count, total, len(msp), len(cmd)))
        try:
            s.write(msp)
            #s.sendall(msp)
        except serial.SerialException as err:
            print (err)
            print (cmd)
            print (msp)
            break
        #time.sleep(1.0)
# Parse the command line into the module-level configuration variables.
try:
    opts, args = getopt.getopt(sys.argv[1:], "s:t:pd", ["serial=", "tokens=", "passthrough", "dry-run"])
except getopt.GetoptError as err:
    # print help information and exit:
    print(err)  # will print something like "option -a not recognized"
    usage()
    sys.exit(2)

for o, a in opts:
    # getopt reports long options without the trailing "=" used to declare
    # them, so compare against "--serial" / "--tokens".
    if o in ("-s", "--serial"):
        serial_port = a
    elif o in ("-t", "--tokens"):
        token_file = a
    elif o in ("-p", "--passthrough"):
        passthrough = True
    elif o in ("-d", "--dry-run"):
        dry_run = True
    else:
        usage()
        sys.exit(2)
# The check for a missing serial port is currently disabled:
#if serial_port == None and not dry_run:
#    usage()
#    sys.exit(2)

loadTokens(token_file)

# Request parameters shared by the online and offline downloads.
#m8 only
fmt = "mga"
gnss = "gps,gal,bds,glo,qzss"
offline_gnss = "gps,gal,bds,glo"
#m8 only
alm = "gps,qzss,gal,bds,glo"

online_servers = ['online-live1.services.u-blox.com', 'online-live2.services.u-blox.com']
#m8 format = mga
#m7 format = aid
# Only the first server in the list is queried.
url = "https://" + online_servers[0] + "/GetOnlineData.ashx?token=" + online_token + ";gnss=" + gnss + ";datatype=eph,alm,aux,pos;format=" + fmt + ";"
online_req = requests.get(url)

print (online_req)
#print (online_req.content)
print (len(online_req.content))
# The response body is already bytes; no BytesIO round trip is needed.
online_bytes = online_req.content
online_cmds = splitUbloxCommands(online_bytes)
print ("AssitnowOnline: %i" % (len(online_cmds)))

# Track the largest UBX frame and the size of its MSP-wrapped form.
max_payload = 0
max_msp_payload = 0
for cmd in online_cmds:
    if len(cmd) > max_payload:
        max_payload = len(cmd)
        max_msp_payload = len(ubloxToMsp(cmd))
print ("Max ubx payload: %i" % (max_payload))
print ("Max msp payload: %i" % (max_msp_payload))

# Keep a copy of the raw download; the context manager closes the file even
# if the write fails.
with open("aon.ubx", "wb") as of:
    of.write(online_req.content)
period = "5"

offline_servers = ['offline-live1.services.u-blox.com', 'offline-live2.services.u-blox.com']
# Only the first server in the list is queried.
offline_url = "https://" + offline_servers[0] + "/GetOfflineData.ashx?token=" + offline_token + ";gnss=" + offline_gnss + ";format=" + fmt + ";period=" + period + ";resolution=1;alm=" + alm + ";"
offline_req = requests.get(offline_url)

print(offline_req)
#print(offline_req.content)
print(len(offline_req.content))
# The response body is already bytes; no BytesIO round trip is needed.
offline_bytes = offline_req.content
offline_cmds = splitUbloxCommands(offline_bytes)
print ("AssitnowOffline: %i" % (len(offline_cmds)))

# Continue the running maximum started by the online download. The MSP size
# is updated together with the UBX size, so it always describes the largest
# frame rather than whichever command the loop happened to visit last.
for cmd in offline_cmds:
    if len(cmd) > max_payload:
        max_payload = len(cmd)
        max_msp_payload = len(ubloxToMsp(cmd))
print ("Max ubx payload: %i" % (max_payload))
print ("Max msp payload: %i" % (max_msp_payload))

# Keep a copy of the raw download; the context manager closes the file even
# if the write fails.
with open("aoff.ubx", "wb") as of:
    of.write(offline_req.content)
print ("Connecting...")
# The context manager closes the port once sending has finished or failed.
with serial.Serial(serial_port, 230400) as s:
    #s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #s.connect(('localhost', 5760))
    print ("Connected.")

    if not dry_run:
        if not passthrough:
            # Wrap every UBX frame in an MSP message before sending.
            print ("Offline cmds...")
            sendMspMessages(s, offline_cmds)
            print ("Online cmds...")
            sendMspMessages(s, online_cmds)
        else:
            # Passthrough: send the UBX frames unchanged.
            #serial.write('#\r\n')
            #serial.write('gpspassthrough\r\n')
            sendUbxMessages(s, offline_cmds)
            sendUbxMessages(s, online_cmds)