trxcon/l1sched: clarify TDMA Fn (mod 26) maps
[osmocom-bb.git] / src / target / trx_toolkit / fake_trx.py
blob0daecb403ab0bcb28e61c61ea3d562907fa477da
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # TRX Toolkit
5 # Virtual Um-interface (fake transceiver)
7 # (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com>
9 # All Rights Reserved
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
# Copyright holders as (years, holder) pairs; handed over to
# app_print_copyright() when the application starts
APP_CR_HOLDERS = [
    ("2017-2019", "Vadim Yanitskiy <axilirator@gmail.com>"),
]
23 import logging as log
24 import signal
25 import argparse
26 import random
27 import select
28 import sys
29 import re
31 from app_common import ApplicationBase
32 from burst_fwd import BurstForwarder
33 from transceiver import Transceiver
34 from data_msg import Modulation
35 from clck_gen import CLCKGen
36 from trx_list import TRXList
37 from fake_pm import FakePM
38 from gsm_shared import *
class FakeTRX(Transceiver):
    """Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.

    == ToA / RSSI measurement simulation

    Since this is a virtual environment, we can simulate different
    parameters of the physical RF interface:

      - ToA (Timing of Arrival) - measured difference between expected
        and actual time of burst arrival in units of 1/256 of GSM symbol
        periods. A pair of both base and threshold values defines a range
        of ToA value randomization:

          from (toa256_base - toa256_rand_threshold)
            to (toa256_base + toa256_rand_threshold).

      - RSSI (Received Signal Strength Indication) - measured "power" of
        the signal (per burst) in dBm. A pair of both base and threshold
        values defines a range of RSSI value randomization:

          from (rssi_base - rssi_rand_threshold)
            to (rssi_base + rssi_rand_threshold).

        The fake RSSI is only applied after it has been enabled by the
        FAKE_RSSI command; otherwise the RSSI is derived from the Tx power
        of the sending transceiver and the path loss.

      - C/I (Carrier-to-Interference ratio) - value in cB (centiBels),
        computed from the training sequence of each received burst, by
        comparing the "ideal" training sequence with the actual one.
        A pair of both base and threshold values defines a range of
        C/I randomization:

          from (ci_base - ci_rand_threshold)
            to (ci_base + ci_rand_threshold).

    Please note that the randomization is optional and disabled by default.

    == Timing Advance handling

    The BTS is using ToA measurements for UL bursts in order to calculate
    Timing Advance value, that is then indicated to a MS, which in its turn
    shall apply this value to the transmitted signal in order to compensate
    the delay. Basically, every burst is transmitted in advance defined by
    the indicated Timing Advance value. The valid range is 0..63, where
    each unit means one GSM symbol advance. The actual Timing Advance value
    is set using SETTA control command from MS. By default, it's set to 0.

    == Path loss simulation

    === Burst dropping

    In some cases, e.g. due to a weak signal or high interference, a burst
    can be lost, i.e. not detected by the receiver. This can also be
    simulated using FAKE_DROP command on the control interface:

      - burst_drop_amount - the amount of DL/UL bursts
        to be dropped (i.e. not forwarded towards the MS/BTS),

      - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.

    == Configuration

    All simulation parameters mentioned above can be changed at runtime
    using the commands with prefix 'FAKE_' on the control interface.
    All of them are handled by our custom CTRL command handler.

    """

    NOMINAL_TX_POWER_DEFAULT = 50  # dBm
    TX_ATT_DEFAULT = 0  # dB
    PATH_LOSS_DEFAULT = 110  # dB

    TOA256_BASE_DEFAULT = 0
    CI_BASE_DEFAULT = 90

    # Default values for NOPE / IDLE indications
    TOA256_NOISE_DEFAULT = 0
    RSSI_NOISE_DEFAULT = -110
    CI_NOISE_DEFAULT = -30

    def __init__(self, *trx_args, **trx_kwargs):
        """Initialize the base transceiver and reset simulation parameters.

        All positional and keyword arguments are passed through
        to Transceiver.__init__() unchanged.
        """
        Transceiver.__init__(self, *trx_args, **trx_kwargs)

        # Fake RSSI is disabled by default, only enabled through TRXC
        # FAKE_RSSI. When disabled, RSSI is calculated based on
        # Tx power and Rx path loss
        self.fake_rssi_enabled = False

        self.rf_muted = False

        # Actual ToA, RSSI, C/I, TA values
        self.tx_power_base = self.NOMINAL_TX_POWER_DEFAULT
        self.tx_att_base = self.TX_ATT_DEFAULT
        self.toa256_base = self.TOA256_BASE_DEFAULT
        self.rssi_base = (self.NOMINAL_TX_POWER_DEFAULT
                          - self.TX_ATT_DEFAULT
                          - self.PATH_LOSS_DEFAULT)
        self.ci_base = self.CI_BASE_DEFAULT
        self.ta = 0

        # ToA, RSSI, C/I randomization thresholds
        self.toa256_rand_threshold = 0
        self.rssi_rand_threshold = 0
        self.ci_rand_threshold = 0

        # Path loss simulation (burst dropping)
        self.burst_drop_amount = 0
        self.burst_drop_period = 1

    @staticmethod
    def _randomize(base, threshold):
        """Return base, optionally randomized within base +/- threshold.

        A zero threshold means that randomization is disabled, so base is
        returned as is. The magnitude of the threshold is used, because
        random.randint() raises ValueError when the lower bound exceeds
        the upper one (which a negative threshold would cause).
        """
        if threshold == 0:
            return base

        spread = abs(threshold)
        return random.randint(base - spread, base + spread)

    @property
    def toa256(self):
        """ToA value (optionally randomized) for the next burst."""
        return self._randomize(self.toa256_base, self.toa256_rand_threshold)

    @property
    def rssi(self):
        """Fake RSSI value (optionally randomized) for the next burst."""
        return self._randomize(self.rssi_base, self.rssi_rand_threshold)

    @property
    def tx_power(self):
        """Effective Tx power: base Tx power minus Tx attenuation."""
        return self.tx_power_base - self.tx_att_base

    @property
    def ci(self):
        """C/I value (optionally randomized) for the next burst."""
        return self._randomize(self.ci_base, self.ci_rand_threshold)

    def sim_burst_drop(self, msg):
        """Path loss simulation: decide whether to drop the given burst.

        A burst is dropped while burst_drop_amount is non-zero and the
        frame number of the message is a multiple of burst_drop_period.
        Each dropped burst decrements burst_drop_amount.

        Returns: True - drop, False - keep
        """
        # Check if dropping is required
        if self.burst_drop_amount == 0:
            return False

        if msg.fn % self.burst_drop_period != 0:
            return False

        log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)",
                 self, msg.fn, self.burst_drop_period)
        self.burst_drop_amount -= 1
        return True

    def _handle_data_msg_v1(self, src_msg, msg):
        """Fill in the header fields specific to TRXD version 1 and later.

        Sets C/I, modulation type (picked by the burst length of src_msg),
        as well as TSC and TSC set of the given (outgoing) message.
        """
        # C/I (Carrier-to-Interference ratio)
        msg.ci = self.ci

        # Pick modulation type by burst length
        burst_len = len(src_msg.burst)
        msg.mod_type = Modulation.pick_by_bl(burst_len)

        # Pick TSC (Training Sequence Code) and TSC set
        if msg.mod_type is Modulation.ModGMSK:
            train_seq = TrainingSeqGMSK.pick(src_msg.burst)
            if train_seq is not None:
                msg.tsc = train_seq.tsc
                msg.tsc_set = train_seq.tsc_set
            else:
                msg.tsc = 0
                msg.tsc_set = 0
        else:  # TODO: other modulation types (at least 8-PSK)
            msg.tsc_set = 0
            msg.tsc = 0

    def handle_data_msg(self, src_trx, src_msg, msg):
        """Simulate RF path parameters and send the message towards the L1.

        Takes a (partially initialized) TRXD Rx message, simulates RF path
        parameters (such as RSSI), and sends it towards the L1.

        src_trx - the transceiver the burst originates from,
        src_msg - the original message sent by src_trx,
        msg - the message to be completed and delivered.
        """
        if self.rf_muted:
            msg.nope_ind = True
        elif not msg.nope_ind:
            # Path loss simulation
            msg.nope_ind = self.sim_burst_drop(msg)

        if msg.nope_ind:
            # Before TRXDv1, we simply drop the message
            if msg.ver < 0x01:
                return

            # Since TRXDv1, we should send a NOPE.ind
            del msg.burst  # burst bits are omitted
            msg.burst = None

            # TODO: should we make these values configurable?
            msg.toa256 = self.TOA256_NOISE_DEFAULT
            msg.rssi = self.RSSI_NOISE_DEFAULT
            msg.ci = self.CI_NOISE_DEFAULT

            self.data_if.send_msg(msg)
            return

        # Complete message header
        msg.toa256 = self.toa256

        if self.fake_rssi_enabled:
            # Apply fake RSSI
            msg.rssi = self.rssi
        else:
            # Apply RSSI based on transmitter
            msg.rssi = (src_trx.tx_power - src_msg.pwr
                        - self.PATH_LOSS_DEFAULT)

        # Version specific fields
        if msg.ver >= 0x01:
            self._handle_data_msg_v1(src_msg, msg)

        # Apply optional Timing Advance
        if src_trx.ta != 0:
            msg.toa256 -= src_trx.ta * 256

        Transceiver.handle_data_msg(self, msg)

    def ctrl_cmd_handler(self, request):
        """Simulation specific CTRL command handler.

        request is a sequence, where request[0] is the command
        name followed by its arguments (as strings).

        Returns 0 if the command was handled successfully, -1 if it was
        recognized but carried an invalid value, and None if the command
        is not handled here.
        """
        # Timing Advance
        # Syntax: CMD SETTA <TA>
        if self.ctrl_if.verify_cmd(request, "SETTA", 1):
            log.debug("(%s) Recv SETTA cmd", self)

            # Store indicated value
            self.ta = int(request[1])
            return 0

        # Timing of Arrival simulation
        # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
        elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
            log.debug("(%s) Recv FAKE_TOA cmd", self)

            # Parse and apply both base and threshold
            self.toa256_base = int(request[1])
            self.toa256_rand_threshold = int(request[2])
            return 0

        # Timing of Arrival simulation
        # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
        elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
            log.debug("(%s) Recv FAKE_TOA cmd", self)

            # Parse and apply delta
            self.toa256_base += int(request[1])
            return 0

        # RSSI simulation
        # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
        elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
            log.debug("(%s) Recv FAKE_RSSI cmd", self)

            # Use negative threshold to disable fake_rssi
            # if previously enabled
            if int(request[2]) < 0:
                self.fake_rssi_enabled = False
                return 0

            # Parse and apply both base and threshold
            self.rssi_base = int(request[1])
            self.rssi_rand_threshold = int(request[2])
            self.fake_rssi_enabled = True
            return 0

        # RSSI simulation
        # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
        elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
            log.debug("(%s) Recv FAKE_RSSI cmd", self)

            # Parse and apply delta
            self.rssi_base += int(request[1])
            return 0

        # C/I simulation
        # Absolute form: CMD FAKE_CI <BASE> <THRESH>
        elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 2):
            log.debug("(%s) Recv FAKE_CI cmd", self)

            # Parse and apply both base and threshold
            self.ci_base = int(request[1])
            self.ci_rand_threshold = int(request[2])
            return 0

        # C/I simulation
        # Relative form: CMD FAKE_CI <+-BASE_DELTA>
        elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 1):
            log.debug("(%s) Recv FAKE_CI cmd", self)

            # Parse and apply delta
            self.ci_base += int(request[1])
            return 0

        # Path loss simulation: burst dropping
        # Syntax: CMD FAKE_DROP <AMOUNT>
        # Dropping pattern: fn % 1 == 0
        elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
            log.debug("(%s) Recv FAKE_DROP cmd", self)

            # Parse / validate amount of bursts
            num = int(request[1])
            if num < 0:
                log.error("(%s) FAKE_DROP amount shall not "
                          "be negative", self)
                return -1

            self.burst_drop_amount = num
            self.burst_drop_period = 1
            return 0

        # Path loss simulation: burst dropping
        # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
        # Dropping pattern: fn % period == 0
        elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
            log.debug("(%s) Recv FAKE_DROP cmd", self)

            # Parse / validate amount of bursts
            num = int(request[1])
            if num < 0:
                log.error("(%s) FAKE_DROP amount shall not "
                          "be negative", self)
                return -1

            # Parse / validate period
            period = int(request[2])
            if period <= 0:
                log.error("(%s) FAKE_DROP period shall "
                          "be greater than zero", self)
                return -1

            self.burst_drop_amount = num
            self.burst_drop_period = period
            return 0

        # Artificial delay for the TRXC interface
        # Syntax: CMD FAKE_TRXC_DELAY <DELAY_MS>
        elif self.ctrl_if.verify_cmd(request, "FAKE_TRXC_DELAY", 1):
            log.debug("(%s) Recv FAKE_TRXC_DELAY cmd", self)

            self.ctrl_if.rsp_delay_ms = int(request[1])
            log.info("(%s) Artificial TRXC delay set to %d",
                     self, self.ctrl_if.rsp_delay_ms)
            # Report success like every other handled command, instead
            # of falling through to the 'unhandled command' result
            return 0

        # Unhandled command
        return None
class Application(ApplicationBase):
    """FakeTRX application.

    Creates the BTS and MS transceivers (plus optional additional ones
    given on the command line), then forwards bursts between them while
    serving their control interfaces.
    """

    def __init__(self):
        """Parse the command line and set up all transceivers."""
        self.app_print_copyright(APP_CR_HOLDERS)
        self.argv = self.parse_argv()

        # Set up signal handlers
        signal.signal(signal.SIGINT, self.sig_handler)

        # Configure logging
        self.app_init_logging(self.argv)

        # List of all transceivers
        self.trx_list = TRXList()

        # Init shared clock generator
        self.clck_gen = CLCKGen([])

        # Power measurement emulation
        # Noise: -120 .. -105
        # BTS: -75 .. -50
        self.fake_pm = FakePM(-120, -105, -75, -50)
        self.fake_pm.trx_list = self.trx_list

        # Init TRX instance for BTS
        self.append_trx(self.argv.bts_addr,
                        self.argv.bts_base_port, name="BTS")

        # Init TRX instance for BB
        self.append_trx(self.argv.bb_addr,
                        self.argv.bb_base_port, name="MS", child_mgt=False)

        # Additional transceivers (optional)
        if self.argv.trx_list is not None:
            for (name, addr, port, idx) in self.argv.trx_list:
                self.append_child_trx(addr, port, name=name, child_idx=idx)

        # Burst forwarding between transceivers
        self.burst_fwd = BurstForwarder(self.trx_list.trx_list)

        log.info("Init complete")

    def append_trx(self, remote_addr, base_port, **kwargs):
        """Create a transceiver driven by the shared clock generator
        and add it to the list of transceivers.

        Additional keyword arguments are passed to FakeTRX as is.
        """
        trx = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
                      clck_gen=self.clck_gen, pwr_meas=self.fake_pm,
                      **kwargs)
        self.trx_list.add_trx(trx)

    def append_child_trx(self, remote_addr, base_port, **kwargs):
        """Create a transceiver, linking it to its parent if it is a child.

        The 'child_idx' keyword argument (0 if omitted) selects the role:
        index 0 indicates a parent transceiver, any other value a child
        of the transceiver found by the given remote address and port.

        Raises IndexError if the parent transceiver cannot be found.
        """
        child_idx = kwargs.get("child_idx", 0)
        if child_idx == 0:  # Index 0 indicates parent transceiver
            self.append_trx(remote_addr, base_port, **kwargs)
            return

        # Find 'parent' transceiver for a new child
        trx_parent = self.trx_list.find_trx(remote_addr, base_port)
        if trx_parent is None:
            raise IndexError("Couldn't find parent transceiver "
                             "for '%s:%d/%d'"
                             % (remote_addr, base_port, child_idx))

        # Allocate a new child
        trx_child = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
                            pwr_meas=self.fake_pm, **kwargs)
        self.trx_list.add_trx(trx_child)

        # Link a new 'child' with its 'parent'
        trx_parent.child_trx_list.add_trx(trx_child)

    def run(self):
        """Main loop: wait for socket events and dispatch them.

        Incoming DATA messages are handed to the burst forwarder,
        incoming CTRL messages to the control interface of the
        respective transceiver. This method does not return.
        """
        # Compose list of to be monitored sockets
        sock_list = []
        for trx in self.trx_list.trx_list:
            sock_list.append(trx.ctrl_if.sock)
            sock_list.append(trx.data_if.sock)

        # Enter main loop
        while True:
            # Wait until we get any data on any socket
            r_event, _, _ = select.select(sock_list, [], [])

            # Iterate over all transceivers
            for trx in self.trx_list.trx_list:
                # DATA interface
                if trx.data_if.sock in r_event:
                    msg = trx.recv_data_msg()
                    if msg is not None:
                        self.burst_fwd.forward_msg(trx, msg)

                # CTRL interface
                if trx.ctrl_if.sock in r_event:
                    trx.ctrl_if.handle_rx()

    def shutdown(self):
        """Stop the shared clock generator."""
        log.info("Shutting down...")

        # Stop clock generator
        self.clck_gen.stop()

    @staticmethod
    def trx_def(val):
        """Parse a TRX definition given on the command line.

        Expected format: [NAME@]REMOTE_ADDR:BIND_PORT[/TRX_NUM]
          e.g. [2001:0db8:85a3:0000:0000:8a2e:0370:7334]:5700/5
          e.g. 127.0.0.1:5700 or 127.0.0.1:5700/1
          e.g. foo@127.0.0.1:5700 or bar@127.0.0.1:5700/1

        Returns a tuple (name, addr, port, idx), where name is None
        if not given, and idx is 0 if TRX_NUM is omitted.

        Raises argparse.ArgumentTypeError if val does not match the
        format as a whole (trailing characters are not tolerated).
        """
        try:
            # The whole string must match, so that trailing
            # garbage is rejected rather than silently ignored
            result = re.fullmatch(r"(.+@)?(.+):([0-9]+)(/[0-9]+)?", val)
        except TypeError:
            # Not a string at all
            result = None

        if result is None:
            raise argparse.ArgumentTypeError(
                "Invalid TRX definition: %s" % val)

        (name, addr, port, idx) = result.groups()

        # Cut '/' from TRX number
        idx = int(idx[1:]) if idx is not None else 0

        # Cut '@' from TRX name
        if name is not None:
            name = name[:-1]

        return (name, addr, int(port), idx)

    def parse_argv(self):
        """Define and parse command line arguments.

        Returns the namespace produced by argparse. Exits with a usage
        error if BTS and BB base ports are equal.
        """
        parser = argparse.ArgumentParser(
            prog="fake_trx",
            description="Virtual Um-interface (fake transceiver)")

        # Register common logging options
        self.app_reg_logging_options(parser)

        trx_group = parser.add_argument_group("TRX interface")
        trx_group.add_argument(
            "-b", "--trx-bind-addr",
            dest="trx_bind_addr", type=str, default="0.0.0.0",
            help="Set FakeTRX bind address (default %(default)s)")
        trx_group.add_argument(
            "-R", "--bts-addr",
            dest="bts_addr", type=str, default="127.0.0.1",
            help="Set BTS remote address (default %(default)s)")
        trx_group.add_argument(
            "-r", "--bb-addr",
            dest="bb_addr", type=str, default="127.0.0.1",
            help="Set BB remote address (default %(default)s)")
        trx_group.add_argument(
            "-P", "--bts-base-port",
            dest="bts_base_port", type=int, default=5700,
            help="Set BTS base port number (default %(default)s)")
        trx_group.add_argument(
            "-p", "--bb-base-port",
            dest="bb_base_port", type=int, default=6700,
            help="Set BB base port number (default %(default)s)")

        mtrx_group = parser.add_argument_group("Additional transceivers")
        mtrx_group.add_argument(
            "--trx",
            metavar="REMOTE_ADDR:BASE_PORT[/TRX_NUM]",
            dest="trx_list", type=self.trx_def, action="append",
            help="Add a transceiver for BTS or MS (e.g. 127.0.0.1:5703)")

        argv = parser.parse_args()

        # Make sure there is no overlap between ports
        if argv.bts_base_port == argv.bb_base_port:
            parser.error("BTS and BB base ports shall be different")

        return argv

    def sig_handler(self, signum, frame):
        """Signal handler: shut down and exit on SIGINT."""
        log.info("Signal %d received", signum)
        if signum == signal.SIGINT:
            self.shutdown()
            sys.exit(0)
# Script entry point: instantiate the application (which parses the
# command line and sets up the transceivers), then enter its main loop
if __name__ == "__main__":
    app = Application()
    app.run()