3 # Extcap for u-blox GNSS receiver
4 # By Timo Warns <timo.warns@gmail.com>
5 # Copyright 2024 Timo Warns
7 # The extcap is based on Wireshark's extcap_example.py with
8 # Copyright 2014 Roland Knall <rknall [AT] gmail.com>
10 # Wireshark - Network traffic analyzer
11 # By Gerald Combs <gerald@wireshark.org>
12 # Copyright 1998 Gerald Combs
14 # SPDX-License-Identifier: GPL-2.0-or-later
Extcap for UBX messages from a u-blox GNSS receiver.
19 Tested with UBX protocol version 18.
22 import argparse
, serial
.tools
.list_ports
, serial
, struct
, sys
, time
23 from threading
import Thread
28 ################################
29 # u-blox / UBX related constants
30 ################################
32 UBLOX_DEV_DESCRIPTION
= 'u-blox GNSS receiver'
34 # UBX message structure-related definitions
39 UBX_PAYLOAD_LEN_OFFSET
= 4
41 # UBX GNSS Identifiers
44 UBX_GNSS_ID_GALILEO
= 2
45 UBX_GNSS_ID_BEIDOU
= 3
48 UBX_GNSS_ID_GLONASS
= 6
# UBX message class and identifiers
#
# Each constant is a two-element list: [message class, message ID].
# ubxMsg() copies such a pair into bytes 2-3 of the frame it builds, and
# ubxCfgMsg() appends a rate to it with list concatenation, so these values
# must remain lists (not tuples).
# The class bytes (UBX_NAV, UBX_RXM, UBX_CFG, UBX_NMEA) are defined elsewhere
# in this file.

# Messages in the NAV class
UBX_NAV_POSECEF = [UBX_NAV, 0x01]
UBX_NAV_DOP = [UBX_NAV, 0x04]
UBX_NAV_PVT = [UBX_NAV, 0x07]
UBX_NAV_ODO = [UBX_NAV, 0x09]
UBX_NAV_VELECEF = [UBX_NAV, 0x11]
UBX_NAV_TIMEGPS = [UBX_NAV, 0x20]
UBX_NAV_TIMEUTC = [UBX_NAV, 0x21]
UBX_NAV_TIMELS = [UBX_NAV, 0x26]
UBX_NAV_SBAS = [UBX_NAV, 0x32]
UBX_NAV_SAT = [UBX_NAV, 0x35]
UBX_NAV_EOE = [UBX_NAV, 0x61]

# Messages in the RXM class
UBX_RXM_SFRBX = [UBX_RXM, 0x13]
UBX_RXM_MEASX = [UBX_RXM, 0x14]
UBX_RXM_RAWX = [UBX_RXM, 0x15]

# Messages in the CFG class (UBX_CFG_MSG is the one used to set message rates)
UBX_CFG_MSG = [UBX_CFG, 0x01]
UBX_CFG_SBAS = [UBX_CFG, 0x16]
UBX_CFG_GNSS = [UBX_CFG, 0x3e]

# Messages in the NMEA class; the rate table in this file sets all of these
# to rate 0, i.e. disabled.
UBX_NMEA_GGA = [UBX_NMEA, 0x00]
UBX_NMEA_GLL = [UBX_NMEA, 0x01]
UBX_NMEA_GSA = [UBX_NMEA, 0x02]
UBX_NMEA_GSV = [UBX_NMEA, 0x03]
UBX_NMEA_RMC = [UBX_NMEA, 0x04]
UBX_NMEA_VTG = [UBX_NMEA, 0x05]
UBX_NMEA_GRS = [UBX_NMEA, 0x06]
UBX_NMEA_GST = [UBX_NMEA, 0x07]
UBX_NMEA_ZDA = [UBX_NMEA, 0x08]
UBX_NMEA_GBS = [UBX_NMEA, 0x09]
UBX_NMEA_TXT = [UBX_NMEA, 0x41]
88 # Defines the desired rate per UBX message type.
89 # NMEA messages are disabled by setting their rate to 0.
91 # Eventually, this could be made configurable / controllable via the extcap
94 (UBX_NAV_POSECEF
, 0x01),
98 (UBX_NAV_VELECEF
, 0x01),
99 (UBX_NAV_TIMEGPS
, 0x01),
100 (UBX_NAV_TIMEUTC
, 0x01),
101 (UBX_NAV_TIMELS
, 0xff),
102 (UBX_NAV_SBAS
, 0x01),
105 (UBX_RXM_SFRBX
, 0x01),
106 (UBX_RXM_MEASX
, 0x01),
107 (UBX_RXM_RAWX
, 0x01),
108 (UBX_NMEA_GGA
, 0x00),
109 (UBX_NMEA_GLL
, 0x00),
110 (UBX_NMEA_GSA
, 0x00),
111 (UBX_NMEA_GSV
, 0x00),
112 (UBX_NMEA_RMC
, 0x00),
113 (UBX_NMEA_VTG
, 0x00),
114 (UBX_NMEA_GRS
, 0x00),
115 (UBX_NMEA_GST
, 0x00),
116 (UBX_NMEA_ZDA
, 0x00),
117 (UBX_NMEA_GBS
, 0x00),
118 (UBX_NMEA_TXT
, 0x00),
121 # Defines the desired GNSS config.
122 # Format is (GNSS ID, resTrkCh, maxTrkCh, enable, sigCfgMask).
124 # Eventually, this could be made configurable / controllable via the extcap
127 (UBX_GNSS_ID_GPS
, 8, 14, True, 0x01),
128 (UBX_GNSS_ID_GLONASS
, 0, 0, False, 0x00),
129 (UBX_GNSS_ID_SBAS
, 2, 4, True, 0x01),
130 (UBX_GNSS_ID_GALILEO
, 8, 14, True, 0x01)
133 ########################
134 # PCAP-related constants
135 ########################
138 DLT_NAME
= "DLT_USER0"
140 PCAP_MAGIC
= 0xa1b2c3d4
141 PCAP_VERSION_MAJOR
= 2
142 PCAP_VERSION_MINOR
= 4
145 PCAP_SNAPLEN
= 0xffffffff
147 ##########################
148 # extcap-related constants
149 ##########################
156 CTRL_CMD_INITIALIZED
= 0
162 CTRL_CMD_STATUSBAR_MSG
= 6
163 CTRL_CMD_INFO_MSG
= 7
164 CTRL_CMD_WARN_MSG
= 8
165 CTRL_CMD_ERROR_MSG
= 9
172 def extcap_config(option
):
    # No options are implemented at the moment.
def extcap_version():
    """Print the extcap identification line to stdout.

    The line carries this extcap's version (taken from the module-level
    VERSION constant), a help URL and a display name, each wrapped in the
    brace-delimited ``{key=value}`` form.
    """
    # Doubled braces in the f-string produce literal '{' and '}' characters.
    version_line = f"extcap {{version={VERSION}}}{{help=https://www.wireshark.org}}{{display=u-blox UBX extcap interface}}"
    print(version_line)
179 def extcap_interfaces():
181 for i
in serial
.tools
.list_ports
.grep(UBLOX_DEV_DESCRIPTION
):
182 print(f
"interface {{value={i.device}}}{{display=u-blox UBX capture}}")
184 print(f
"control {{number={CTRL_ARG_LOGGER}}}{{type=button}}{{role=logger}}{{display=Log}}{{tooltip=Show capture log}}")
187 print(f
"dlt {{number={DLT}}}{{name={DLT_NAME}}}{{display=UBX DLT ({DLT_NAME})}}")
190 control_write(CTRL_ARG_LOGGER
, CTRL_CMD_ADD
, msg
)
203 def pcap_packet(ubx_msg
):
206 caplength
= len(ubx_msg
)
207 timestamp
= int(time
.time())
209 pcap
+= struct
.pack("!IIII", int(timestamp
), 0, caplength
, caplength
)
214 def ubxChecksum(msg
):
222 return [ck_a
& 0xff, ck_b
& 0xff]
224 def ubxMsg(ubxClassId
, payload
):
226 payloadLength
= len(payload
)
228 msg
= bytearray(UBX_HEADER_SIZE
+ payloadLength
+ UBX_CHKSUM_SIZE
)
231 msg
[0:2] = [UBX_PREAMBLE_1
, UBX_PREAMBLE_2
]
234 msg
[2:4] = ubxClassId
237 struct
.pack_into('<H', msg
, UBX_PAYLOAD_LEN_OFFSET
, payloadLength
)
240 msg
[UBX_HEADER_SIZE
:-UBX_CHKSUM_SIZE
] = payload
243 msg
[-UBX_CHKSUM_SIZE
:] = ubxChecksum(msg
[2:-UBX_CHKSUM_SIZE
])
247 def sendUbxMsg(receiver
, msg
):
248 log("Sending UBX message: " + msg
.hex() + "\n")
def ubxCfgMsg(ubxMsgClassId, rate):
    """Build a UBX_CFG_MSG frame carrying a rate for one message type.

    Args:
        ubxMsgClassId: two-element list [message class, message ID] naming
            the message type the rate applies to (e.g. UBX_NAV_PVT).
        rate: rate value appended as the last payload byte.

    Returns:
        Whatever ubxMsg() returns for class/ID UBX_CFG_MSG and the payload
        [message class, message ID, rate].
    """
    # List concatenation leaves the caller's class/ID list untouched.
    cfg_payload = ubxMsgClassId + [rate]
    return ubxMsg(UBX_CFG_MSG, cfg_payload)
254 def ubxCfgGnss(gnssId
, resTrkCh
, maxTrkCh
, enable
, sigCfgMask
):
257 numTrkChHw
= 0x00 # read only
259 numConfigBlocks
= 0x01 # one config block only
261 payload
= bytearray(12)
264 payload
[1] = numTrkChHw
265 payload
[2] = numTrkChUse
266 payload
[3] = numConfigBlocks
269 payload
[5] = resTrkCh
270 payload
[6] = maxTrkCh
271 payload
[7] = 0 # reserved1
272 payload
[8] = 1 if enable
else 0
273 payload
[9] = 0 # flags, reserved
274 payload
[10] = sigCfgMask
275 payload
[11] = 0 # flags, reserved
277 return ubxMsg(UBX_CFG_GNSS
, payload
)
280 def control_read(fn
):
283 sp
, _
, length
, arg
, typ
= struct
.unpack('>sBHBB', header
)
285 payload
= fn
.read(length
- 2).decode('utf-8', 'replace')
288 return arg
, typ
, payload
290 return None, None, None
292 def control_read_thread(control_in
):
294 with
open(control_in
, 'rb', 0) as fn
:
296 while arg
is not None:
297 arg
, typ
, payload
= control_read(fn
)
299 if typ
== CTRL_CMD_INITIALIZED
:
302 def control_write(arg
, typ
, payload
):
305 if fn_out
is not None:
307 packet
+= struct
.pack('>sBHBB', b
'T', 0, len(payload
) + 2, arg
, typ
)
308 if sys
.version_info
[0] >= 3 and isinstance(payload
, str):
309 packet
+= payload
.encode('utf-8')
315 def extcap_capture(interface
, fifo
, control_in
, control_out
):
320 with
open(fifo
, 'wb', 0) as fh
:
322 fh
.write(pcap_header())
324 if control_out
is not None:
325 fn_out
= open(control_out
, 'wb', 0)
326 control_write(CTRL_ARG_LOGGER
, CTRL_CMD_SET
, "Log started at " + time
.strftime("%c") + "\n")
328 if control_in
is not None:
329 # Start reading thread
330 thread
= Thread(target
=control_read_thread
, args
=(control_in
,))
333 with serial
.Serial(baudrate
=9600,
334 bytesize
=serial
.EIGHTBITS
,
335 parity
=serial
.PARITY_NONE
,
337 stopbits
=serial
.STOPBITS_ONE
,
338 timeout
= 0.1) as receiver
:
341 log("Configuring GNSS constellations:\n")
342 for (gnssId
, resTrkCh
, maxTrkCh
, enable
, sigCfgMask
) in UBX_GNSS_CONFIGS
:
343 sendUbxMsg(receiver
, ubxCfgGnss(gnssId
, resTrkCh
, maxTrkCh
, enable
, sigCfgMask
))
346 log("Querying GNSS constellation config:\n")
347 sendUbxMsg(receiver
, ubxMsg(UBX_CFG_GNSS
, []))
350 log("Querying SBAS config:\n")
351 sendUbxMsg(receiver
, ubxMsg(UBX_CFG_SBAS
, []))
353 # set the message rates
354 log("Setting UBX msg rates:\n")
355 for (ubxClassId
, rate
) in UBX_MSG_RATES
:
356 sendUbxMsg(receiver
, ubxCfgMsg(ubxClassId
, rate
))
358 ubx_in_data
= bytearray()
361 ubx_in_data
+= receiver
.read(8192)
365 # Is there enough data remaining for a packet of min. possible size?
366 while i
< len(ubx_in_data
) - UBX_HEADER_SIZE
- UBX_CHKSUM_SIZE
+ 1:
368 if ubx_in_data
[i
] == UBX_PREAMBLE_1
and ubx_in_data
[i
+1] == UBX_PREAMBLE_2
:
370 (payload_len
,) = struct
.unpack("<H", ubx_in_data
[i
+ UBX_PAYLOAD_LEN_OFFSET
: i
+ UBX_PAYLOAD_LEN_OFFSET
+ 2])
372 # Is there enough data remaining for the complete message?
373 if i
+ UBX_HEADER_SIZE
+ payload_len
+ UBX_CHKSUM_SIZE
<= len(ubx_in_data
):
374 ubx_frame
= ubx_in_data
[i
: i
+ UBX_HEADER_SIZE
+ payload_len
+ UBX_CHKSUM_SIZE
]
376 log("Emitting UBX PCAP packet with header " + ubx_frame
[0:6].hex() + "\n")
378 fh
.write(pcap_packet(ubx_frame
))
380 i
= i
+ UBX_HEADER_SIZE
+ payload_len
+ UBX_CHKSUM_SIZE
388 ubx_in_data
= ubx_in_data
[i
:]
391 if fn_out
is not None:
394 def extcap_close_fifo(fifo
):
    # This is apparently needed to work around an issue on Windows/macOS
    # where the message cannot be read. (Is this really necessary?)
397 fh
= open(fifo
, 'wb', 0)
401 print("Usage: %s <--extcap-interfaces | --extcap-dlts | --extcap-interface | --extcap-config | --capture | --extcap-capture-filter | --fifo>" % sys
.argv
[0] )
403 if __name__
== '__main__':
406 parser
= argparse
.ArgumentParser(description
="u-blox UBX extcap")
409 parser
.add_argument("--capture", help="Start the capture routine", action
="store_true" )
410 parser
.add_argument("--extcap-interfaces", help="Provide a list of interfaces to capture from", action
="store_true")
411 parser
.add_argument("--extcap-interface", help="Provide the interface to capture from")
412 parser
.add_argument("--extcap-dlts", help="Provide a list of dlts for the given interface", action
="store_true")
413 parser
.add_argument("--extcap-config", help="Provide a list of configurations for the given interface", action
="store_true")
414 parser
.add_argument("--extcap-capture-filter", help="Used together with capture to provide a capture filter")
415 parser
.add_argument("--fifo", help="Use together with capture to provide the fifo to dump data to")
416 parser
.add_argument("--extcap-control-in", help="Used to get control messages from toolbar")
417 parser
.add_argument("--extcap-control-out", help="Used to send control messages to toolbar")
418 parser
.add_argument("--extcap-version", help="Shows the version of this utility", nargs
='?', default
="")
419 parser
.add_argument("--extcap-reload-option", help="Reload elements for the given option")
422 args
, unknown
= parser
.parse_known_args()
423 except argparse
.ArgumentError
as exc
:
424 print("%s: %s" % (exc
.argument
.dest
, exc
.message
), file=sys
.stderr
)
428 if arg
== "--fifo" or arg
== "--extcap-fifo":
430 elif fifo_found
== 1:
433 extcap_close_fifo(fifo
)
436 if len(sys
.argv
) <= 1:
437 parser
.exit("No arguments given!")
439 if args
.extcap_version
and not args
.extcap_interfaces
:
443 if not args
.extcap_interfaces
and args
.extcap_interface
is None:
444 parser
.exit("An interface must be provided or the selection must be displayed")
446 if args
.extcap_interfaces
or args
.extcap_interface
is None:
451 print(f
"{len(unknown)} unknown arguments given")
453 if args
.extcap_reload_option
and len(args
.extcap_reload_option
) > 0:
454 option
= args
.extcap_reload_option
456 if args
.extcap_config
:
457 extcap_config(option
)
458 elif args
.extcap_dlts
:
461 if args
.fifo
is None:
464 extcap_capture(args
.extcap_interface
, args
.fifo
, args
.extcap_control_in
, args
.extcap_control_out
)
465 except KeyboardInterrupt:
469 sys
.exit(ERROR_USAGE
)