2 # -*- coding: utf-8 -*-
5 # Auxiliary tool to send existing bursts via TRX DATA interface
7 # (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
# Copyright holders as (years, holder) pairs; handed to
# app_print_copyright() when the application starts.
APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
28 from app_common
import ApplicationBase
29 from data_dump
import DATADumpFile
30 from data_if
import DATAInterface
31 from data_msg
import *
class Application(ApplicationBase):
    """Auxiliary tool that sends existing bursts via the DATA interface.

    Bursts are read from the capture file given on the command line,
    optionally filtered by direction, timeslot and frame number, and
    then sent towards either TRX or L1 (see ``--conn-mode``).
    """

    # NOTE(review): the header of this method was not visible in the
    # reviewed source; `__init__` is assumed because the code below only
    # sets up instance state and nothing in view calls it by name.
    def __init__(self):
        """Parse the command line, set up signals and logging, open the capture."""
        self.app_print_copyright(APP_CR_HOLDERS)
        self.argv = self.parse_argv()

        # Set up signal handlers
        signal.signal(signal.SIGINT, self.sig_handler)

        # Configure logging from the parsed command line options
        self.app_init_logging(self.argv)

        # Open requested capture file
        self.ddf = DATADumpFile(self.argv.capture_file)

    # NOTE(review): the header of this method was not visible in the
    # reviewed source; the name `run` is assumed -- confirm against callers.
    def run(self):
        """Send every captured burst that passes the filters.

        Raises:
            SystemExit: with status 1 if the capture could not be parsed.
        """
        # Init DATA interface with TRX or L1. The two modes use the same
        # addresses but swap the +2 / +102 port offsets.
        # Presumably the arguments are (remote addr, remote port,
        # bind addr, bind port) -- verify against DATAInterface.
        if self.argv.conn_mode == "TRX":
            self.data_if = DATAInterface(
                self.argv.remote_addr, self.argv.base_port + 2,
                self.argv.bind_addr, self.argv.base_port + 102)
        elif self.argv.conn_mode == "L1":
            self.data_if = DATAInterface(
                self.argv.remote_addr, self.argv.base_port + 102,
                self.argv.bind_addr, self.argv.base_port + 2)

        # Read messages from the capture
        messages = self.ddf.parse_all(
            skip=self.argv.cnt_skip, count=self.argv.cnt_count)

        # NOTE(review): the failure check and what follows the error
        # message were not visible in the reviewed source. Both common
        # failure sentinels are handled; an empty result is not treated
        # as an error. Confirm against DATADumpFile.parse_all().
        if messages is None or messages is False:
            log.error("Parsing failed, nothing to send")
            raise SystemExit(1)

        for msg in messages:
            # Skip bursts rejected by the filters
            if not self.msg_pass_filter(msg):
                continue

            log.info("Sending a burst %s to %s..."
                     % (msg.desc_hdr(), self.argv.conn_mode))

            # Send message
            self.data_if.send_msg(msg)

    def msg_pass_filter(self, msg):
        """Decide whether a captured message should be sent.

        Args:
            msg: a message parsed from the capture; its type and its
                ``tn`` / ``fn`` attributes are inspected.

        Returns:
            True if the message passes all configured filters,
            False otherwise.
        """
        # Direction filter
        if isinstance(msg, RxMsg) and self.argv.conn_mode == "TRX":
            return False  # cannot send RxMsg to TRX
        if isinstance(msg, TxMsg) and self.argv.conn_mode == "L1":
            return False  # cannot send TxMsg to L1

        # Timeslot filter
        if self.argv.pf_tn is not None:
            if msg.tn != self.argv.pf_tn:
                return False

        # Frame number filters. Note that a frame number equal to the
        # configured bound is NOT rejected by either comparison.
        if self.argv.pf_fn_lt is not None:
            if msg.fn > self.argv.pf_fn_lt:
                return False
        if self.argv.pf_fn_gt is not None:
            if msg.fn < self.argv.pf_fn_gt:
                return False

        # Nothing rejected the message
        return True

    def parse_argv(self):
        """Build the command line parser and return the parsed arguments."""
        parser = argparse.ArgumentParser(prog="burst_send",
            description="Auxiliary tool to send (reply) captured bursts")

        # Register common logging options
        self.app_reg_logging_options(parser)

        trx_group = parser.add_argument_group("TRX interface")
        trx_group.add_argument("-r", "--remote-addr",
            dest="remote_addr", type=str, default="127.0.0.1",
            help="Set remote address (default %(default)s)")
        trx_group.add_argument("-b", "--bind-addr",
            dest="bind_addr", type=str, default="0.0.0.0",
            help="Set bind address (default %(default)s)")
        trx_group.add_argument("-p", "--base-port",
            dest="base_port", type=int, default=6700,
            help="Set base port number (default %(default)s)")
        trx_group.add_argument("-m", "--conn-mode",
            dest="conn_mode", type=str,
            choices=["TRX", "L1"], default="TRX",
            help="Where to send bursts (default %(default)s)")
        trx_group.add_argument("-i", "--capture-file", metavar="FILE",
            dest="capture_file", type=str, required=True,
            help="Capture file to read bursts from")

        cnt_group = parser.add_argument_group("Count limitations (optional)")
        cnt_group.add_argument("--skip", metavar="N",
            dest="cnt_skip", type=int,
            help="Skip N messages before sending")
        cnt_group.add_argument("--count", metavar="N",
            dest="cnt_count", type=int,
            help="Stop after sending N messages")

        pf_group = parser.add_argument_group("Filtering (optional)")
        pf_group.add_argument("--timeslot", metavar="TN",
            dest="pf_tn", type=int, choices=range(0, 8),
            help="TDMA timeslot number (equal TN)")
        pf_group.add_argument("--frame-num-lt", metavar="FN",
            dest="pf_fn_lt", type=int,
            help="TDMA frame number (lower than FN)")
        pf_group.add_argument("--frame-num-gt", metavar="FN",
            dest="pf_fn_gt", type=int,
            help="TDMA frame number (greater than FN)")

        return parser.parse_args()

    def sig_handler(self, signum, frame):
        """Log the received signal and terminate on SIGINT.

        Args:
            signum: number of the received signal.
            frame: current stack frame (unused).
        """
        log.info("Signal %d received" % signum)
        if signum == signal.SIGINT:
            # NOTE(review): the body of this branch was not visible in
            # the reviewed source; a clean exit is assumed.
            raise SystemExit(0)


if __name__ == '__main__':
    # NOTE(review): the body of this guard was not visible in the
    # reviewed source; constructing and running the application is assumed.
    app = Application()
    app.run()