trxcon/l1sched: clarify TDMA Fn (mod 26) maps
[osmocom-bb.git] / src / target / trx_toolkit / burst_send.py
blob27f585e0ef50adf9f808931a9f3bfe220749e970
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # TRX Toolkit
5 # Auxiliary tool to send existing bursts via TRX DATA interface
7 # (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
9 # All Rights Reserved
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
# Copyright holders of this tool as (years, holder) pairs; the list is
# passed to app_print_copyright() when the application starts.
APP_CR_HOLDERS = [
    ("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>"),
]
23 import logging as log
24 import signal
25 import argparse
26 import sys
28 from app_common import ApplicationBase
29 from data_dump import DATADumpFile
30 from data_if import DATAInterface
31 from data_msg import *
class Application(ApplicationBase):
    """Send bursts stored in a capture file via the TRX DATA interface.

    The command line is parsed on construction; run() then reads the
    messages from the capture, drops the ones rejected by the optional
    filters and sends the rest to either TRX or L1.
    """

    def __init__(self):
        """Parse argv, install the SIGINT handler, set up logging and
        open the capture file given with -i / --capture-file."""
        self.app_print_copyright(APP_CR_HOLDERS)
        self.argv = self.parse_argv()

        # Set up signal handlers
        signal.signal(signal.SIGINT, self.sig_handler)

        # Configure logging
        self.app_init_logging(self.argv)

        # Open requested capture file
        self.ddf = DATADumpFile(self.argv.capture_file)

    def run(self):
        """Create the DATA interface and send all bursts passing the filters.

        Terminates the process with exit status 1 if the capture file
        could not be parsed (parse_all() returned False).
        """
        # Init DATA interface with TRX or L1. The two modes use the same
        # pair of ports (base + 2 and base + 102) with swapped roles.
        # NOTE(review): argument order is presumably (remote address,
        # remote port, bind address, bind port) -- confirm in data_if.
        if self.argv.conn_mode == "TRX":
            self.data_if = DATAInterface(
                self.argv.remote_addr, self.argv.base_port + 2,
                self.argv.bind_addr, self.argv.base_port + 102)
        elif self.argv.conn_mode == "L1":
            self.data_if = DATAInterface(
                self.argv.remote_addr, self.argv.base_port + 102,
                self.argv.bind_addr, self.argv.base_port + 2)

        # Read messages from the capture
        messages = self.ddf.parse_all(
            skip=self.argv.cnt_skip, count=self.argv.cnt_count)
        if messages is False:
            log.error("Parsing failed, nothing to send")
            sys.exit(1)

        for msg in messages:
            # Pass filter
            if not self.msg_pass_filter(msg):
                continue

            # Lazy logging arguments: the string is only formatted
            # when the INFO level is actually enabled.
            log.info("Sending a burst %s to %s...",
                     msg.desc_hdr(), self.argv.conn_mode)

            # Send message
            self.data_if.send_msg(msg)

    def msg_pass_filter(self, msg):
        """Return True if the given message shall be sent, False otherwise.

        A message is rejected when its direction does not fit the
        connection mode, when its timeslot differs from --timeslot, or
        when its frame number is outside of the inclusive range given
        by --frame-num-gt / --frame-num-lt. Unset filters are skipped.
        """
        argv = self.argv

        # Direction filter
        if isinstance(msg, RxMsg) and argv.conn_mode == "TRX":
            return False  # cannot send RxMsg to TRX
        if isinstance(msg, TxMsg) and argv.conn_mode == "L1":
            return False  # cannot send TxMsg to L1

        # Timeslot filter
        if argv.pf_tn is not None and msg.tn != argv.pf_tn:
            return False

        # Frame number filter. Both bounds are inclusive: a message
        # whose frame number equals the limit is still sent.
        if argv.pf_fn_lt is not None and msg.fn > argv.pf_fn_lt:
            return False
        if argv.pf_fn_gt is not None and msg.fn < argv.pf_fn_gt:
            return False

        # Burst passed ;)
        return True

    def parse_argv(self):
        """Define the command line interface and return the parsed
        arguments (the result of ArgumentParser.parse_args())."""
        parser = argparse.ArgumentParser(
            prog="burst_send",
            description="Auxiliary tool to send (replay) captured bursts")

        # Register common logging options
        self.app_reg_logging_options(parser)

        trx_group = parser.add_argument_group("TRX interface")
        trx_group.add_argument(
            "-r", "--remote-addr",
            dest="remote_addr", type=str, default="127.0.0.1",
            help="Set remote address (default %(default)s)")
        trx_group.add_argument(
            "-b", "--bind-addr",
            dest="bind_addr", type=str, default="0.0.0.0",
            help="Set bind address (default %(default)s)")
        trx_group.add_argument(
            "-p", "--base-port",
            dest="base_port", type=int, default=6700,
            help="Set base port number (default %(default)s)")
        trx_group.add_argument(
            "-m", "--conn-mode",
            dest="conn_mode", type=str,
            choices=["TRX", "L1"], default="TRX",
            help="Where to send bursts (default %(default)s)")
        trx_group.add_argument(
            "-i", "--capture-file", metavar="FILE",
            dest="capture_file", type=str, required=True,
            help="Capture file to read bursts from")

        cnt_group = parser.add_argument_group("Count limitations (optional)")
        cnt_group.add_argument(
            "--skip", metavar="N",
            dest="cnt_skip", type=int,
            help="Skip N messages before sending")
        cnt_group.add_argument(
            "--count", metavar="N",
            dest="cnt_count", type=int,
            help="Stop after sending N messages")

        pf_group = parser.add_argument_group("Filtering (optional)")
        pf_group.add_argument(
            "--timeslot", metavar="TN",
            dest="pf_tn", type=int, choices=range(0, 8),
            help="TDMA timeslot number (equal TN)")
        # The help texts state the inclusive bounds that
        # msg_pass_filter() actually applies.
        pf_group.add_argument(
            "--frame-num-lt", metavar="FN",
            dest="pf_fn_lt", type=int,
            help="TDMA frame number (lower than or equal to FN)")
        pf_group.add_argument(
            "--frame-num-gt", metavar="FN",
            dest="pf_fn_gt", type=int,
            help="TDMA frame number (greater than or equal to FN)")

        return parser.parse_args()

    def sig_handler(self, signum, frame):
        """Signal handler: log the signal number, exit with status 0
        on SIGINT. The frame argument is not used."""
        log.info("Signal %d received", signum)
        if signum == signal.SIGINT:
            sys.exit(0)
# Script entry point: constructing the application parses the command
# line, run() then sends the bursts.
if __name__ == "__main__":
    app = Application()
    app.run()