2 # -*- coding: utf-8 -*-
5 # Auxiliary tool to generate and send random bursts via TRX DATA
6 # interface, which may be useful for fuzzing and testing
8 # (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com>
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation; either version 2 of the License, or
15 # (at your option) any later version.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
# Copyright holders as a list of (years, author) tuples; this list is
# passed to app_print_copyright() further below in this file
22 APP_CR_HOLDERS
= [("2017-2019", "Vadim Yanitskiy <axilirator@gmail.com>")]
29 from app_common
import ApplicationBase
30 from rand_burst_gen
import RandBurstGen
31 from data_dump
import DATADumpFile
32 from data_if
import DATAInterface
33 from gsm_shared
import *
34 from data_msg
import *
36 class Application(ApplicationBase
):
38 self
.app_print_copyright(APP_CR_HOLDERS
)
39 self
.argv
= self
.parse_argv()
41 # Set up signal handlers
42 signal
.signal(signal
.SIGINT
, self
.sig_handler
)
45 self
.app_init_logging(self
.argv
)
47 # Open requested capture file
48 if self
.argv
.output_file
is not None:
49 self
.ddf
= DATADumpFile(self
.argv
.output_file
)
52 # Init DATA interface with TRX or L1
53 if self
.argv
.conn_mode
== "TRX":
54 self
.data_if
= DATAInterface(
55 self
.argv
.remote_addr
, self
.argv
.base_port
+ 2,
56 self
.argv
.bind_addr
, self
.argv
.base_port
+ 102)
57 elif self
.argv
.conn_mode
== "L1":
58 self
.data_if
= DATAInterface(
59 self
.argv
.remote_addr
, self
.argv
.base_port
+ 102,
60 self
.argv
.bind_addr
, self
.argv
.base_port
+ 2)
62 # Init random burst generator
63 burst_gen
= RandBurstGen()
65 # Init an empty DATA message
66 if self
.argv
.conn_mode
== "TRX":
67 msg
= TxMsg(ver
= self
.argv
.hdr_ver
)
68 elif self
.argv
.conn_mode
== "L1":
69 msg
= RxMsg(ver
= self
.argv
.hdr_ver
)
71 # Generate a random frame number or use provided one
72 fn_init
= msg
.rand_fn() if self
.argv
.tdma_fn
is None \
73 else self
.argv
.tdma_fn
75 # Send as many bursts as required
76 for i
in range(self
.argv
.burst_count
):
77 # Randomize the message header
80 # Increase and set frame number
81 msg
.fn
= (fn_init
+ i
) % GSM_HYPERFRAME
84 if self
.argv
.tdma_tn
is not None:
85 msg
.tn
= self
.argv
.tdma_tn
87 # Set transmit power level
88 if self
.argv
.pwr
is not None:
89 msg
.pwr
= self
.argv
.pwr
92 if self
.argv
.toa
is not None:
93 msg
.toa256
= int(float(self
.argv
.toa
) * 256.0 + 0.5)
94 elif self
.argv
.toa256
is not None:
95 msg
.toa256
= self
.argv
.toa256
98 if self
.argv
.rssi
is not None:
99 msg
.rssi
= self
.argv
.rssi
102 # TODO: Only GMSK and TSC set 0 for now
103 msg
.mod_type
= Modulation
.ModGMSK
106 if self
.argv
.tsc
is not None:
107 msg
.tsc
= self
.argv
.tsc
109 if self
.argv
.ci
is not None:
110 msg
.ci
= self
.argv
.ci
112 # Generate a random burst
113 if self
.argv
.burst_type
== "NB":
114 burst
= burst_gen
.gen_nb()
115 elif self
.argv
.burst_type
== "FB":
116 burst
= burst_gen
.gen_fb()
117 elif self
.argv
.burst_type
== "SB":
118 burst
= burst_gen
.gen_sb()
119 elif self
.argv
.burst_type
== "AB":
120 burst
= burst_gen
.gen_ab()
122 # Convert to soft-bits in case of TRX -> L1 message
123 if self
.argv
.conn_mode
== "L1":
124 burst
= msg
.ubit2sbit(burst
)
129 log
.info("Sending %d/%d %s burst %s to %s..."
130 % (i
+ 1, self
.argv
.burst_count
, self
.argv
.burst_type
,
131 msg
.desc_hdr(), self
.argv
.conn_mode
))
134 self
.data_if
.send_msg(msg
)
136 # Append a new message to the capture
137 if self
.argv
.output_file
is not None:
138 self
.ddf
.append_msg(msg
)
def parse_argv(self):
    """Build the command line parser and parse the process arguments.

    Registers the common logging options (through
    app_reg_logging_options()), followed by the "TRX interface" and
    "Burst generation" option groups, and then parses the command line.

    Returns:
        The argparse.Namespace returned by parse_args(). Optional
        burst parameters that were not given on the command line
        (tdma_fn, tdma_tn, pwr, rssi, toa, toa256, tsc, ci and
        output_file) are left as None.
    """
    parser = argparse.ArgumentParser(
        prog="burst_gen",
        description="Auxiliary tool to generate and send random bursts")

    # Register common logging options
    self.app_reg_logging_options(parser)

    # Connection related options
    trx_group = parser.add_argument_group("TRX interface")
    trx_group.add_argument(
        "-r", "--remote-addr",
        dest="remote_addr", type=str, default="127.0.0.1",
        help="Set remote address (default %(default)s)")
    trx_group.add_argument(
        "-b", "--bind-addr",
        dest="bind_addr", type=str, default="0.0.0.0",
        help="Set bind address (default %(default)s)")
    trx_group.add_argument(
        "-p", "--base-port",
        dest="base_port", type=int, default=6700,
        help="Set base port number (default %(default)s)")
    trx_group.add_argument(
        "-m", "--conn-mode",
        dest="conn_mode", type=str,
        choices=["TRX", "L1"], default="TRX",
        help="Where to send bursts (default %(default)s)")
    trx_group.add_argument(
        "-o", "--output-file",
        dest="output_file", type=str,
        help="Write bursts to a capture file")

    # Burst and message header related options
    bg_group = parser.add_argument_group("Burst generation")
    bg_group.add_argument(
        "-B", "--burst-type",
        dest="burst_type", type=str,
        choices=["NB", "FB", "SB", "AB"], default="NB",
        help="Random burst type (default %(default)s)")
    bg_group.add_argument(
        "-c", "--burst-count", metavar="N",
        dest="burst_count", type=int, default=1,
        help="How many bursts to send (default %(default)s)")
    bg_group.add_argument(
        "-v", "--hdr-version", metavar="VER",
        dest="hdr_ver", type=int,
        default=0, choices=Msg.KNOWN_VERSIONS,
        help="TRXD header version (default %(default)s)")
    bg_group.add_argument(
        "-f", "--frame-number", metavar="FN",
        dest="tdma_fn", type=int,
        help="Set TDMA frame number (default random)")
    bg_group.add_argument(
        "-t", "--timeslot", metavar="TN",
        dest="tdma_tn", type=int, choices=range(0, 8),
        help="Set TDMA timeslot (default random)")

    # Power level and RSSI cannot be given together
    bg_pwr_group = bg_group.add_mutually_exclusive_group()
    bg_pwr_group.add_argument(
        "--pwr", metavar="dBm",
        dest="pwr", type=int,
        help="Set power level (default random)")
    bg_pwr_group.add_argument(
        "--rssi", metavar="dBm",
        dest="rssi", type=int,
        help="Set RSSI (default random)")

    # ToA may be given either in symbols or in 1/256 symbol periods
    bg_toa_group = bg_group.add_mutually_exclusive_group()
    # The value in symbols is parsed as a float: the sending code
    # converts it with float(), scales it by 256 and rounds, so
    # fractional symbol values are meaningful. Integer input is still
    # accepted and yields the same toa256 value as before.
    bg_toa_group.add_argument(
        "--toa",
        dest="toa", type=float,
        help="Set Timing of Arrival in symbols (default random)")
    bg_toa_group.add_argument(
        "--toa256",
        dest="toa256", type=int,
        help="Set Timing of Arrival in 1/256 symbol periods")

    bg_group.add_argument(
        "--tsc", metavar="TSC",
        dest="tsc", type=int, choices=range(0, 8),
        help="Set Training Sequence Code (default random)")
    bg_group.add_argument(
        "--ci", metavar="CI",
        dest="ci", type=int,
        help="C/I: Carrier-to-Interference ratio "
             "in centiBels (default random)")

    return parser.parse_args()
210 def sig_handler(self
, signum
, frame
):
211 log
.info("Signal %d received" % signum
)
212 if signum
== signal
.SIGINT
:
215 if __name__
== '__main__':