2 # https://developer.thingstream.io/guides/location-services/assistnow-user-guide
4 # Create a file named tokens.yaml
5 # Add your assist now tokens
6 # assistnow_online: XXXXXXXXXXX
7 # assistnow_offline: XXXXXXXXXXX
23 token_file
= "tokens.yaml"
35 # 0x?? len bytes payload
40 hasFirstHeader
= False
41 hasSecondHeader
= False
50 def resetUbloxState():
52 global hasSecondHeader
61 hasFirstHeader
= False
62 hasSecondHeader
= False
71 def splitUbloxCommands(ubxBytes
):
74 global hasSecondHeader
85 #print("%s" % (type(ubxBytes)))
86 #print("len: %i" % (len(ubxBytes)))
88 for i
in range(len(ubxBytes
)):
89 if not hasFirstHeader
:
90 if ubxBytes
[i
] == 0xb5:
92 currentCommand
.append(ubxBytes
[i
])
97 if not hasSecondHeader
:
98 if ubxBytes
[i
] == 0x62:
99 hasSecondHeader
= True
100 currentCommand
.append(ubxBytes
[i
])
107 #print ("ubxClass: %02x" % (ubxBytes[i]))
108 currentCommand
.append(ubxBytes
[i
])
112 #print ("ubxId: %02x" % (ubxBytes[i]))
113 currentCommand
.append(ubxBytes
[i
])
117 payloadLen
= int(ubxBytes
[i
])
118 currentCommand
.append(ubxBytes
[i
])
122 payloadLen
= (int(ubxBytes
[i
]) << 8) | payloadLen
123 #print ("Payload len %i" % (payloadLen))
124 payloadLen
+= 2 # add crc bytes
125 currentCommand
.append(ubxBytes
[i
])
127 if skipped
< payloadLen
- 1:
128 skipped
= skipped
+ 1
129 currentCommand
.append(ubxBytes
[i
])
131 if skipped
== payloadLen
- 1:
132 skipped
= skipped
+ 1
133 currentCommand
.append(ubxBytes
[i
])
134 ubxCommands
.append(currentCommand
)
140 def loadTokens(file):
145 with
open(file, "r") as stream
:
147 yaml_config
= yaml
.safe_load(stream
)
148 online_token
= yaml_config
['assistnow_online']
149 offline_token
= yaml_config
['assistnow_online']
150 except yaml
.YAMLError
as exc
:
154 def crc8_dvb_s2( crc
:int, b
:int) -> int:
157 if (crc
& 0x80) == 0x80:
158 crc
= ((crc
<< 1) ^
0xD5) & 0xFF
160 crc
= (crc
<< 1) & 0xFF
162 return int(crc
& 0xFF)
164 def ubloxToMsp(ubxCmd
):
165 ubloxLen
= len(ubxCmd
)
166 #msp = bytearray(b"$X<\x00d\x00\x00\x00\x8F")
168 msp
= bytearray(b
"$X<\x00\x50\x20")
169 crc
= crc8_dvb_s2(crc
, 0x00)
170 crc
= crc8_dvb_s2(crc
, 0x50)
171 crc
= crc8_dvb_s2(crc
, 0x20)
172 msp
.append(ubloxLen
& 0xFF)
173 crc
= crc8_dvb_s2(crc
, ubloxLen
& 0xFF)
174 msp
.append((ubloxLen
>> 8) & 0xFF)
175 crc
= crc8_dvb_s2(crc
, (ubloxLen
>> 8) & 0xFF)
180 for i
in range(ubloxLen
):
181 msp
.append(ubxCmd
[i
])
182 crc
= crc8_dvb_s2(crc
, ubxCmd
[i
])
184 #print ("msp: %s" % (bytes(msp)))
186 msp
.append(crc
& 0xFF)
187 print ("CRC: %i" % (crc
))
193 print ("assistnow.py -s /dev/ttyS0 [-t tokens.yaml]")
195 def sendUbxMessages(s
, ubxMessages
):
197 for cmd
in ubxMessages
:
200 print ("%i/%i ubx: %i" % (printed
, len(ubxMessages
), len(cmd
)))
203 except serial
.SerialException
as err
:
210 def sendMspMessages(s
, ubxMessages
):
212 for cmd
in ubxMessages
:
213 msp
= ubloxToMsp(cmd
)
214 #msp = bytearray(b"$X<\x00d\x00\x00\x00\x8F")
217 print ("%i/%i msp: %i ubx: %i" % (printed
, len(ubxMessages
), len(msp
), len(cmd
)))
221 except serial
.SerialException
as err
:
229 opts
, args
= getopt
.getopt(sys
.argv
[1:], "s:t:pd", ["serial=", "tokens=", "passthrough", "dry-run"])
230 except getopt
.GetoptError
as err
:
231 # print help information and exit:
232 print(err
) # will print something like "option -a not recognized"
237 if o
in ("-s", "--serial="):
239 elif o
in ("-t", "--tokens="):
241 elif o
in ("-p", "--passthrough"):
243 elif o
in ("-d", "--dry-run"):
249 #if serial_port == None and not dry_run:
253 loadTokens(token_file
)
258 gnss
="gps,gal,bds,glo,qzss"
259 offline_gnss
="gps,gal,bds,glo"
261 alm
="gps,qzss,gal,bds,glo"
263 online_servers
= ['online-live1.services.u-blox.com', 'online-live2.services.u-blox.com']
266 url
= "https://online-live1.services.u-blox.com/GetOnlineData.ashx?token=" + online_token
+ ";gnss=" + gnss
+ ";datatype=eph,alm,aux,pos;format=" + fmt
+ ";"
267 online_req
= requests
.get(url
)
270 #print (online_req.content)
271 print (len(online_req
.content
))
272 online
= io
.BytesIO(online_req
.content
)
273 online_bytes
= online
.read()
274 online_cmds
= splitUbloxCommands(online_bytes
)
275 print ("AssitnowOnline: %i" % (len(online_cmds
)))
279 for cmd
in online_cmds
:
280 if len(cmd
) > max_payload
:
281 max_payload
= len(cmd
)
282 max_msp_payload
= len(ubloxToMsp(cmd
))
283 print ("Max ubx payload: %i" % (max_payload
))
284 print ("Max msp payload: %i" % (max_msp_payload
))
286 of
= open("aon.ubx", "wb")
287 of
.write(online_req
.content
)
292 offline_servers
= ['offline-live1.services.u-blox.com', 'offline-live2.services.u-blox.com']
293 offline_url
= "https://offline-live1.services.u-blox.com/GetOfflineData.ashx?token=" + offline_token
+ ";gnss=" + offline_gnss
+ ";format=" + fmt
+ ";period=" + period
+ ";resolution=1;alm=" + alm
+ ";"
294 offline_req
= requests
.get(offline_url
)
297 #print(offline_req.content)
298 off
= io
.BytesIO(offline_req
.content
)
299 print(len(offline_req
.content
))
300 offline_bytes
= off
.read()
301 offline_cmds
= splitUbloxCommands(offline_bytes
)
302 print ("AssitnowOffline: %i" % (len(offline_cmds
)))
304 for cmd
in offline_cmds
:
305 if len(cmd
) > max_payload
:
306 max_payload
= len(cmd
)
307 print ("Max ubx payload: %i" % (max_payload
))
308 max_msp_payload
= len(ubloxToMsp(cmd
))
309 print ("Max msp payload: %i" % (max_msp_payload
))
311 of
= open("aoff.ubx", "wb")
312 of
.write(offline_req
.content
)
315 print ("Connecting...")
316 s
= serial
.Serial(serial_port
, 230400)
317 #s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
318 #s.connect(('localhost', 5760))
323 print ("Offline cmds...")
324 sendMspMessages(s
, offline_cmds
)
325 print ("Online cmds...")
326 sendMspMessages(s
, online_cmds
)
328 #serial.write('#\r\n')
329 #serial.write('gpspassthrough\r\n')
330 sendUbxMessages(s
, offline_cmds
)
331 sendUbxMessages(s
, online_cmds
)