treewide: remove FSF address
[osmocom-bb.git] / src / target / trx_toolkit / trx_sniff.py
blob c91e3e0b0d5726eab9b14257eb7434bf5b23325f
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # TRX Toolkit
5 # Scapy-based TRX interface sniffer
7 # (C) 2018-2020 by Vadim Yanitskiy <axilirator@gmail.com>
9 # All Rights Reserved
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
# Copyright holders as (years, holder) pairs; passed to app_print_copyright()
APP_CR_HOLDERS = [
    ("2018-2020", "Vadim Yanitskiy <axilirator@gmail.com>"),
]
23 import logging as log
24 import argparse
25 import sys
27 import scapy.all
29 from app_common import ApplicationBase
30 from data_dump import DATADumpFile
31 from data_msg import *
class Application(ApplicationBase):
    """Scapy-based sniffer for the TRX DATA interface.

    Captures UDP datagrams (live from an interface or offline from a PCAP
    file), parses every payload as a DATA message (TxMsg or RxMsg), applies
    the filters requested on the command line, optionally prints the burst
    bits and/or appends the message to a capture file, and keeps simple
    burst / frame statistics.
    """

    # Counters
    cnt_burst_dropped_num = 0
    cnt_burst_num = 0

    cnt_frame_last = None
    cnt_frame_num = 0

    # Internal variables
    lo_trigger = False

    # Capture file writer; replaced in __init__() when an output file is given
    ddf = None

    def __init__(self):
        """Print the copyright, parse argv, set up logging and the output file."""
        self.app_print_copyright(APP_CR_HOLDERS)
        self.argv = self.parse_argv()

        # Configure logging
        self.app_init_logging(self.argv)

        # Open requested capture file
        if self.argv.output_file is not None:
            self.ddf = DATADumpFile(self.argv.output_file)

    def run(self):
        """Build the capture filter and start sniffing.

        Does not return: shutdown() terminates the process once sniffing
        is over.
        """
        # Compose a list of permitted UDP ports (base + 102 and base + 2)
        rx_port_list = ["port %d" % (port + 102) for port in self.argv.base_ports]
        tx_port_list = ["port %d" % (port + 2) for port in self.argv.base_ports]

        # Arguments to be passed to scapy.all.sniff()
        sniff_args = {
            "filter": "udp and (%s)" % " or ".join(rx_port_list + tx_port_list),
            "prn": self.pkt_handler,
            "store": 0,
        }

        if self.argv.cap_file is not None:
            log.info("Reading packets from '%s'...", self.argv.cap_file)
            sniff_args["offline"] = self.argv.cap_file
        else:
            log.info("Listening on interface '%s'...", self.argv.sniff_if)
            sniff_args["iface"] = self.argv.sniff_if

        if self.argv.cap_filter is not None:
            log.info("Using additional capture filter '%s'", self.argv.cap_filter)
            sniff_args["filter"] += " and (%s)" % self.argv.cap_filter

        # Start sniffing...
        scapy.all.sniff(**sniff_args)

        # Scapy registers its own signal handler
        self.shutdown()

    def pkt_handler(self, ether):
        """Handle a single sniffed packet (callback for scapy.all.sniff()).

        Extracts the UDP payload, parses it as a DATA message, runs it
        through the pass filter, hands it to msg_handle() and updates the
        counters.  Calls shutdown() once a requested count limit is reached.
        """
        # Prevent loopback packet duplication: when sniffing live on 'lo',
        # only every other packet is handled (1st, 3rd, ...)
        if self.argv.sniff_if == "lo" and self.argv.cap_file is None:
            self.lo_trigger = not self.lo_trigger
            if not self.lo_trigger:
                return

        # Extract a TRX payload
        ip = ether.payload
        udp = ip.payload
        trx = udp.payload

        # A datagram without raw payload has no 'load' attribute; drop it
        # instead of letting AttributeError terminate the sniffer
        load = getattr(trx, "load", None)
        if load is None:
            log.warning("Ignoring a packet without payload")
            self.cnt_burst_dropped_num += 1
            return

        # Convert to bytearray
        msg_raw = bytearray(load)

        # Determine a burst direction (L1 <-> TRX)
        tx_dir = udp.sport > udp.dport

        # Create an empty DATA message
        msg = TxMsg() if tx_dir else RxMsg()

        # Attempt to parse the payload as a DATA message
        try:
            msg.parse_msg(msg_raw)
            msg.validate()
        except ValueError as e:
            desc = msg.desc_hdr()
            if desc == "":
                desc = "parsing error"
            log.warning("Ignoring an incorrect message (%s): %s", desc, e)
            self.cnt_burst_dropped_num += 1
            return

        # Poke burst pass filter
        if not self.burst_pass_filter(msg):
            self.cnt_burst_dropped_num += 1
            return

        # Debug print
        log.debug("%s burst: %s", "L1 -> TRX" if tx_dir else "TRX -> L1", msg.desc_hdr())

        # Poke message handler
        self.msg_handle(msg)

        # Poke burst counter; it reports True once a count limit is reached
        if self.burst_count(msg.fn, msg.tn):
            self.shutdown()

    def burst_pass_filter(self, msg):
        """Return True if the message passes all filters set on the command line."""
        # Direction filter (no restriction when the option is not given)
        direction = self.argv.direction
        if direction == "TRX" and not isinstance(msg, TxMsg):  # L1 -> TRX
            return False
        if direction == "L1" and not isinstance(msg, RxMsg):  # TRX -> L1
            return False

        # Timeslot filter
        if self.argv.pf_tn is not None:
            if msg.tn != self.argv.pf_tn:
                return False

        # Frame number filter (NOTE: both bounds are inclusive)
        if self.argv.pf_fn_lt is not None:
            if msg.fn > self.argv.pf_fn_lt:
                return False
        if self.argv.pf_fn_gt is not None:
            if msg.fn < self.argv.pf_fn_gt:
                return False

        # Message type specific filtering
        if isinstance(msg, RxMsg):
            # NOPE.ind filter
            if not self.argv.pf_nope_ind and msg.nope_ind:
                return False

            # RSSI filter
            if self.argv.pf_rssi_min is not None:
                if msg.rssi < self.argv.pf_rssi_min:
                    return False
            if self.argv.pf_rssi_max is not None:
                if msg.rssi > self.argv.pf_rssi_max:
                    return False

        # Burst passed ;)
        return True

    def msg_handle(self, msg):
        """Print the burst bits (verbose mode) and store the message if requested."""
        if self.argv.verbose:
            print(msg.burst)

        # Append a new message to the capture
        if self.argv.output_file is not None:
            self.ddf.append_msg(msg)

    def burst_count(self, fn, tn):
        """Update burst / frame counters for a handled burst.

        fn is the TDMA frame number of the burst; tn (timeslot number) is
        accepted for interface compatibility but not used.

        Returns True if the requested number of bursts or frames has been
        collected (i.e. sniffing should stop), False otherwise.
        """
        # Update frame counter: a new frame starts whenever the frame number
        # differs from the previously seen one.  The last seen number must be
        # tracked, otherwise every burst of any later frame would be counted
        # as a new frame.
        if fn != self.cnt_frame_last:
            self.cnt_frame_last = fn
            self.cnt_frame_num += 1

        # Update burst counter
        self.cnt_burst_num += 1

        # Stop sniffing after N bursts
        if self.argv.burst_count is not None:
            if self.cnt_burst_num == self.argv.burst_count:
                log.info("Collected required amount of bursts")
                return True

        # Stop sniffing after N frames
        if self.argv.frame_count is not None:
            if self.cnt_frame_num == self.argv.frame_count:
                log.info("Collected required amount of frames")
                return True

        return False

    def shutdown(self):
        """Log the statistics and terminate the process with exit status 0."""
        log.info("Shutting down...")

        # Print statistics
        log.info("%u bursts handled, %u dropped",
                 self.cnt_burst_num, self.cnt_burst_dropped_num)

        # Exit
        sys.exit(0)

    def parse_argv(self):
        """Define the command line interface and return the parsed arguments."""
        parser = argparse.ArgumentParser(prog="trx_sniff",
            description="Scapy-based TRX interface sniffer")

        parser.add_argument("-v", "--verbose",
            dest="verbose", action="store_true",
            help="Print burst bits to stdout")

        # Register common logging options
        self.app_reg_logging_options(parser)

        trx_group = parser.add_argument_group("TRX interface")
        trx_group.add_argument("-p", "--base-port", "--base-ports",
            dest="base_ports", type=int, metavar="PORT",
            default=[5700, 6700], nargs="*",
            help="Set base port number (default %(default)s)")
        trx_group.add_argument("-o", "--output-file", metavar="FILE",
            dest="output_file", type=str,
            help="Write bursts to a capture file")

        # Live interface and PCAP file are alternative packet sources
        input_group = trx_group.add_mutually_exclusive_group()
        input_group.add_argument("-i", "--sniff-interface",
            dest="sniff_if", type=str, default="lo", metavar="IF",
            help="Set network interface (default '%(default)s')")
        input_group.add_argument("-r", "--capture-file",
            dest="cap_file", type=str, metavar="FILE",
            help="Read packets from a PCAP file")

        trx_group.add_argument("-f", "--capture-filter",
            dest="cap_filter", type=str, metavar="FILTER",
            help="Set additional capture filter (e.g. 'host 192.168.1.2')")

        cnt_group = parser.add_argument_group("Count limitations (optional)")
        cnt_group.add_argument("--frame-count", metavar="N",
            dest="frame_count", type=int,
            help="Stop after sniffing N frames")
        cnt_group.add_argument("--burst-count", metavar="N",
            dest="burst_count", type=int,
            help="Stop after sniffing N bursts")

        pf_group = parser.add_argument_group("Filtering (optional)")
        pf_group.add_argument("--direction",
            dest="direction", type=str, choices=["TRX", "L1"],
            help="Burst direction")
        pf_group.add_argument("--timeslot", metavar="TN",
            dest="pf_tn", type=int, choices=range(0, 8),
            help="TDMA timeslot number (equal TN)")
        pf_group.add_argument("--frame-num-lt", metavar="FN",
            dest="pf_fn_lt", type=int,
            help="TDMA frame number (lower than FN)")
        pf_group.add_argument("--frame-num-gt", metavar="FN",
            dest="pf_fn_gt", type=int,
            help="TDMA frame number (greater than FN)")
        pf_group.add_argument("--no-nope-ind",
            dest="pf_nope_ind", action="store_false",
            help="Ignore NOPE.ind (NOPE / IDLE indications)")
        pf_group.add_argument("--rssi-min", metavar="RSSI",
            dest="pf_rssi_min", type=int,
            help="Minimum RSSI value (e.g. -75)")
        pf_group.add_argument("--rssi-max", metavar="RSSI",
            dest="pf_rssi_max", type=int,
            help="Maximum RSSI value (e.g. -50)")

        return parser.parse_args()
# Script entry point: create the sniffer application and start capturing
if __name__ == "__main__":
    Application().run()