2 # -*- coding: utf-8 -*-
5 # Scapy-based TRX interface sniffer
7 # (C) 2018-2020 by Vadim Yanitskiy <axilirator@gmail.com>
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 APP_CR_HOLDERS
= [("2018-2020", "Vadim Yanitskiy <axilirator@gmail.com>")]
29 from app_common
import ApplicationBase
30 from data_dump
import DATADumpFile
31 from data_msg
import *
33 class Application(ApplicationBase
):
35 cnt_burst_dropped_num
= 0
45 self
.app_print_copyright(APP_CR_HOLDERS
)
46 self
.argv
= self
.parse_argv()
49 self
.app_init_logging(self
.argv
)
51 # Open requested capture file
52 if self
.argv
.output_file
is not None:
53 self
.ddf
= DATADumpFile(self
.argv
.output_file
)
56 # Compose a list of permitted UDP ports
57 rx_port_list
= ["port %d" % (port
+ 102) for port
in self
.argv
.base_ports
]
58 tx_port_list
= ["port %d" % (port
+ 2) for port
in self
.argv
.base_ports
]
60 # Arguments to be passed to scapy.all.sniff()
62 "filter" : "udp and (%s)" % " or ".join(rx_port_list
+ tx_port_list
),
63 "prn" : self
.pkt_handler
,
67 if self
.argv
.cap_file
is not None:
68 log
.info("Reading packets from '%s'..." % self
.argv
.cap_file
)
69 sniff_args
["offline"] = self
.argv
.cap_file
71 log
.info("Listening on interface '%s'..." % self
.argv
.sniff_if
)
72 sniff_args
["iface"] = self
.argv
.sniff_if
74 if self
.argv
.cap_filter
is not None:
75 log
.info("Using additional capture filter '%s'" % self
.argv
.cap_filter
)
76 sniff_args
["filter"] += " and (%s)" % self
.argv
.cap_filter
79 scapy
.all
.sniff(**sniff_args
)
81 # Scapy registers its own signal handler
84 def pkt_handler(self
, ether
):
85 # Prevent loopback packet duplication
86 if self
.argv
.sniff_if
== "lo" and self
.argv
.cap_file
is None:
87 self
.lo_trigger
= not self
.lo_trigger
88 if not self
.lo_trigger
:
91 # Extract a TRX payload
96 # Convert to bytearray
97 msg_raw
= bytearray(trx
.load
)
99 # Determine a burst direction (L1 <-> TRX)
100 tx_dir
= udp
.sport
> udp
.dport
102 # Create an empty DATA message
103 msg
= TxMsg() if tx_dir
else RxMsg()
105 # Attempt to parse the payload as a DATA message
107 msg
.parse_msg(msg_raw
)
109 except ValueError as e
:
110 desc
= msg
.desc_hdr()
112 desc
= "parsing error"
113 log
.warning("Ignoring an incorrect message (%s): %s" % (desc
, e
))
114 self
.cnt_burst_dropped_num
+= 1
117 # Poke burst pass filter
118 if not self
.burst_pass_filter(msg
):
119 self
.cnt_burst_dropped_num
+= 1
123 log
.debug("%s burst: %s", "L1 -> TRX" if tx_dir
else "TRX -> L1", msg
.desc_hdr())
125 # Poke message handler
129 rc
= self
.burst_count(msg
.fn
, msg
.tn
)
133 def burst_pass_filter(self
, msg
):
135 if self
.argv
.direction
is not None:
136 if self
.argv
.direction
== "TRX": # L1 -> TRX
137 if not isinstance(msg
, TxMsg
):
139 elif self
.argv
.direction
== "L1": # TRX -> L1
140 if not isinstance(msg
, RxMsg
):
144 if self
.argv
.pf_tn
is not None:
145 if msg
.tn
!= self
.argv
.pf_tn
:
148 # Frame number filter
149 if self
.argv
.pf_fn_lt
is not None:
150 if msg
.fn
> self
.argv
.pf_fn_lt
:
152 if self
.argv
.pf_fn_gt
is not None:
153 if msg
.fn
< self
.argv
.pf_fn_gt
:
156 # Message type specific filtering
157 if isinstance(msg
, RxMsg
):
159 if not self
.argv
.pf_nope_ind
and msg
.nope_ind
:
163 if self
.argv
.pf_rssi_min
is not None:
164 if msg
.rssi
< self
.argv
.pf_rssi_min
:
166 if self
.argv
.pf_rssi_max
is not None:
167 if msg
.rssi
> self
.argv
.pf_rssi_max
:
173 def msg_handle(self
, msg
):
174 if self
.argv
.verbose
:
177 # Append a new message to the capture
178 if self
.argv
.output_file
is not None:
179 self
.ddf
.append_msg(msg
)
181 def burst_count(self
, fn
, tn
):
182 # Update frame counter
183 if self
.cnt_frame_last
is None:
184 self
.cnt_frame_last
= fn
185 self
.cnt_frame_num
+= 1
187 if fn
!= self
.cnt_frame_last
:
188 self
.cnt_frame_num
+= 1
190 # Update burst counter
191 self
.cnt_burst_num
+= 1
193 # Stop sniffing after N bursts
194 if self
.argv
.burst_count
is not None:
195 if self
.cnt_burst_num
== self
.argv
.burst_count
:
196 log
.info("Collected required amount of bursts")
199 # Stop sniffing after N frames
200 if self
.argv
.frame_count
is not None:
201 if self
.cnt_frame_num
== self
.argv
.frame_count
:
202 log
.info("Collected required amount of frames")
208 log
.info("Shutting down...")
211 log
.info("%u bursts handled, %u dropped" \
212 % (self
.cnt_burst_num
, self
.cnt_burst_dropped_num
))
217 def parse_argv(self
):
218 parser
= argparse
.ArgumentParser(prog
= "trx_sniff",
219 description
= "Scapy-based TRX interface sniffer")
221 parser
.add_argument("-v", "--verbose",
222 dest
= "verbose", action
= "store_true",
223 help = "Print burst bits to stdout")
225 # Register common logging options
226 self
.app_reg_logging_options(parser
)
228 trx_group
= parser
.add_argument_group("TRX interface")
229 trx_group
.add_argument("-p", "--base-port", "--base-ports",
230 dest
= "base_ports", type = int, metavar
= "PORT",
231 default
= [5700, 6700], nargs
= "*",
232 help = "Set base port number (default %(default)s)")
233 trx_group
.add_argument("-o", "--output-file", metavar
= "FILE",
234 dest
= "output_file", type = str,
235 help = "Write bursts to a capture file")
237 input_group
= trx_group
.add_mutually_exclusive_group()
238 input_group
.add_argument("-i", "--sniff-interface",
239 dest
= "sniff_if", type = str, default
= "lo", metavar
= "IF",
240 help = "Set network interface (default '%(default)s')")
241 input_group
.add_argument("-r", "--capture-file",
242 dest
= "cap_file", type = str, metavar
= "FILE",
243 help = "Read packets from a PCAP file")
245 trx_group
.add_argument("-f", "--capture-filter",
246 dest
= "cap_filter", type = str, metavar
= "FILTER",
247 help = "Set additional capture filter (e.g. 'host 192.168.1.2')")
249 cnt_group
= parser
.add_argument_group("Count limitations (optional)")
250 cnt_group
.add_argument("--frame-count", metavar
= "N",
251 dest
= "frame_count", type = int,
252 help = "Stop after sniffing N frames")
253 cnt_group
.add_argument("--burst-count", metavar
= "N",
254 dest
= "burst_count", type = int,
255 help = "Stop after sniffing N bursts")
257 pf_group
= parser
.add_argument_group("Filtering (optional)")
258 pf_group
.add_argument("--direction",
259 dest
= "direction", type = str, choices
= ["TRX", "L1"],
260 help = "Burst direction")
261 pf_group
.add_argument("--timeslot", metavar
= "TN",
262 dest
= "pf_tn", type = int, choices
= range(0, 8),
263 help = "TDMA timeslot number (equal TN)")
264 pf_group
.add_argument("--frame-num-lt", metavar
= "FN",
265 dest
= "pf_fn_lt", type = int,
266 help = "TDMA frame number (lower than FN)")
267 pf_group
.add_argument("--frame-num-gt", metavar
= "FN",
268 dest
= "pf_fn_gt", type = int,
269 help = "TDMA frame number (greater than FN)")
270 pf_group
.add_argument("--no-nope-ind",
271 dest
= "pf_nope_ind", action
= "store_false",
272 help = "Ignore NOPE.ind (NOPE / IDLE indications)")
273 pf_group
.add_argument("--rssi-min", metavar
= "RSSI",
274 dest
= "pf_rssi_min", type = int,
275 help = "Minimum RSSI value (e.g. -75)")
276 pf_group
.add_argument("--rssi-max", metavar
= "RSSI",
277 dest
= "pf_rssi_max", type = int,
278 help = "Maximum RSSI value (e.g. -50)")
280 return parser
.parse_args()
282 if __name__
== '__main__':