trxcon/l1sched: clarify TDMA Fn (mod 26) maps
[osmocom-bb.git] / src / target / trx_toolkit / data_msg.py
blob898a4ae67e5b5b568ad598adc207ab93bf4f226b
1 # -*- coding: utf-8 -*-
3 # TRX Toolkit
4 # DATA interface message definitions and helpers
6 # (C) 2018-2019 by Vadim Yanitskiy <axilirator@gmail.com>
8 # All Rights Reserved
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 2 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 import random
21 import struct
22 import abc
24 from typing import List
25 from enum import Enum
26 from gsm_shared import *
28 class Modulation(Enum):
29 """ Modulation types defined in 3GPP TS 45.002 """
30 ModGMSK = (0b0000, 1 * GMSK_BURST_LEN)
31 Mod8PSK = (0b0100, 3 * GMSK_BURST_LEN)
32 ModGMSK_AB = (0b0110, 1 * GMSK_BURST_LEN)
33 # ModRFU = (0b0111, 0) # Reserved for Future Use
34 Mod16QAM = (0b1000, 4 * GMSK_BURST_LEN)
35 Mod32QAM = (0b1010, 5 * GMSK_BURST_LEN)
36 ModAQPSK = (0b1100, 2 * GMSK_BURST_LEN)
38 def __init__(self, coding, bl):
39 # Coding in TRXD header
40 self.coding = coding
41 # Burst length
42 self.bl = bl
44 @classmethod
45 def pick(self, coding):
46 for mod in list(self):
47 if mod.coding == coding:
48 return mod
49 return None
51 @classmethod
52 def pick_by_bl(self, bl):
53 for mod in list(self):
54 if mod.bl == bl:
55 return mod
56 return None
58 class Msg(abc.ABC):
59 ''' TRXD (DATA) message coding API (common part). '''
61 # NOTE: up to 16 versions can be encoded
62 CHDR_VERSION_MAX = 0b1111
63 KNOWN_VERSIONS = (0, 1)
65 def __init__(self, fn = None, tn = None, burst = None, ver = 0):
66 self.burst = burst
67 self.ver = ver
68 self.fn = fn
69 self.tn = tn
71 @property
72 def CHDR_LEN(self):
73 ''' The common header length. '''
74 return 1 + 4 # (VER + TN) + FN
76 @abc.abstractmethod
77 def gen_hdr(self):
78 ''' Generate message specific header. '''
80 @abc.abstractmethod
81 def parse_hdr(self, hdr):
82 ''' Parse message specific header. '''
84 @abc.abstractmethod
85 def gen_burst(self):
86 ''' Generate message specific burst. '''
88 @abc.abstractmethod
89 def parse_burst(self, burst):
90 ''' Parse message specific burst. '''
92 @abc.abstractmethod
93 def rand_burst(self):
94 ''' Generate a random message specific burst. '''
96 def rand_fn(self):
97 ''' Generate a random frame number. '''
98 return random.randint(0, GSM_HYPERFRAME)
100 def rand_tn(self):
101 ''' Generate a random timeslot number. '''
102 return random.randint(0, 7)
104 def rand_hdr(self):
105 ''' Randomize the message header. '''
106 self.fn = self.rand_fn()
107 self.tn = self.rand_tn()
109 def desc_hdr(self):
110 ''' Generate human-readable header description. '''
112 result = ""
114 if self.ver > 0:
115 result += ("ver=%u " % self.ver)
117 if self.fn is not None:
118 result += ("fn=%u " % self.fn)
120 if self.tn is not None:
121 result += ("tn=%u " % self.tn)
123 if self.burst is not None and len(self.burst) > 0:
124 result += ("bl=%u " % len(self.burst))
126 return result
128 @staticmethod
129 def usbit2sbit(bits: List[int]) -> List[int]:
130 ''' Convert unsigned soft-bits {254..0} to soft-bits {-127..127}. '''
131 return [-127 if (b == 0xff) else 127 - b for b in bits]
133 @staticmethod
134 def sbit2usbit(bits: List[int]) -> List[int]:
135 ''' Convert soft-bits {-127..127} to unsigned soft-bits {254..0}. '''
136 return [127 - b for b in bits]
138 @staticmethod
139 def sbit2ubit(bits: List[int]) -> List[int]:
140 ''' Convert soft-bits {-127..127} to bits {1..0}. '''
141 return [int(b < 0) for b in bits]
143 @staticmethod
144 def ubit2sbit(bits: List[int]) -> List[int]:
145 ''' Convert bits {1..0} to soft-bits {-127..127}. '''
146 return [-127 if b else 127 for b in bits]
148 def validate(self):
149 ''' Validate the message fields (throws ValueError). '''
151 if not self.ver in self.KNOWN_VERSIONS:
152 raise ValueError("Unknown TRXD header version %d" % self.ver)
154 if self.fn is None:
155 raise ValueError("TDMA frame-number is not set")
157 if self.fn < 0 or self.fn > GSM_HYPERFRAME:
158 raise ValueError("TDMA frame-number %d is out of range" % self.fn)
160 if self.tn is None:
161 raise ValueError("TDMA time-slot is not set")
163 if self.tn < 0 or self.tn > 7:
164 raise ValueError("TDMA time-slot %d is out of range" % self.tn)
166 def gen_msg(self, legacy = False):
167 ''' Generate a TRX DATA message. '''
169 # Validate all the fields
170 self.validate()
172 # Allocate an empty byte-array
173 buf = bytearray()
175 # Put version (4 bits) and TDMA TN (3 bits)
176 buf.append((self.ver << 4) | (self.tn & 0x07))
178 # Put TDMA FN (4 octets, BE)
179 buf += struct.pack(">L", self.fn)
181 # Generate message specific header part
182 hdr = self.gen_hdr()
183 buf += hdr
185 # Generate burst
186 if self.burst is not None:
187 buf += self.gen_burst()
189 # This is a rudiment from (legacy) OpenBTS transceiver,
190 # some L1 implementations still expect two dummy bytes.
191 if legacy and self.ver == 0x00:
192 buf += bytearray(2)
194 return buf
196 def parse_msg(self, msg):
197 ''' Parse a TRX DATA message. '''
199 # Make sure we have at least common header
200 if len(msg) < self.CHDR_LEN:
201 raise ValueError("Message is to short: missing common header")
203 # Parse the header version first
204 self.ver = (msg[0] >> 4)
205 if not self.ver in self.KNOWN_VERSIONS:
206 raise ValueError("Unknown TRXD header version %d" % self.ver)
208 # Parse TDMA TN and FN
209 self.tn = (msg[0] & 0x07)
210 self.fn = struct.unpack(">L", msg[1:5])[0]
212 # Make sure we have the whole header,
213 # including the version specific fields
214 if len(msg) < self.HDR_LEN:
215 raise ValueError("Message is to short: missing version specific header")
217 # Specific message part
218 self.parse_hdr(msg)
220 # Copy burst, skipping header
221 msg_burst = msg[self.HDR_LEN:]
222 if len(msg_burst) > 0:
223 self.parse_burst(msg_burst)
224 else:
225 self.burst = None
227 class TxMsg(Msg):
228 ''' Tx (L1 -> TRX) message coding API. '''
230 # Constants
231 PWR_MIN = 0x00
232 PWR_MAX = 0xff
234 # Specific message fields
235 pwr = None
237 @property
238 def HDR_LEN(self):
239 ''' Calculate header length depending on its version. '''
241 # Common header length
242 length = self.CHDR_LEN
244 # Message specific header length
245 if self.ver in (0x00, 0x01):
246 length += 1 # PWR
247 else:
248 raise IndexError("Unhandled version %u" % self.ver)
250 return length
252 def validate(self):
253 ''' Validate the message fields (throws ValueError). '''
255 # Validate common fields
256 Msg.validate(self)
258 if self.pwr is None:
259 raise ValueError("Tx Attenuation level is not set")
261 if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX:
262 raise ValueError("Tx Attenuation %d is out of range" % self.pwr)
264 # FIXME: properly handle IDLE / NOPE indications
265 if self.burst is None:
266 raise ValueError("Tx burst bits are not set")
268 # FIXME: properly handle IDLE / NOPE indications
269 if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
270 raise ValueError("Tx burst has odd length %u" % len(self.burst))
272 def rand_pwr(self, min = None, max = None):
273 ''' Generate a random power level. '''
275 if min is None:
276 min = self.PWR_MIN
278 if max is None:
279 max = self.PWR_MAX
281 return random.randint(min, max)
283 def rand_hdr(self):
284 ''' Randomize message specific header. '''
286 Msg.rand_hdr(self)
287 self.pwr = self.rand_pwr()
289 def desc_hdr(self):
290 ''' Generate human-readable header description. '''
292 # Describe the common part
293 result = Msg.desc_hdr(self)
295 if self.pwr is not None:
296 result += ("pwr=%u " % self.pwr)
298 # Strip useless whitespace and return
299 return result.strip()
301 def gen_hdr(self):
302 ''' Generate message specific header part. '''
304 # Allocate an empty byte-array
305 buf = bytearray()
307 # Put power
308 buf.append(self.pwr)
310 return buf
312 def parse_hdr(self, hdr):
313 ''' Parse message specific header part. '''
315 # Parse power level
316 self.pwr = hdr[5]
318 def gen_burst(self):
319 ''' Generate message specific burst. '''
321 # Copy burst 'as is'
322 return bytearray(self.burst)
324 def parse_burst(self, burst):
325 ''' Parse message specific burst. '''
327 length = len(burst)
329 # Distinguish between GSM and EDGE
330 if length >= EDGE_BURST_LEN:
331 self.burst = list(burst[:EDGE_BURST_LEN])
332 else:
333 self.burst = list(burst[:GMSK_BURST_LEN])
335 def rand_burst(self, length = GMSK_BURST_LEN):
336 ''' Generate a random message specific burst. '''
337 self.burst = [random.randint(0, 1) for _ in range(length)]
339 def trans(self, ver = None):
340 ''' Transform this message into RxMsg. '''
342 # Allocate a new message
343 msg = RxMsg(fn = self.fn, tn = self.tn,
344 ver = self.ver if ver is None else ver)
346 # Convert burst bits
347 if self.burst is not None:
348 msg.burst = self.ubit2sbit(self.burst)
349 else:
350 msg.nope_ind = True
352 return msg
354 class RxMsg(Msg):
355 ''' Rx (TRX -> L1) message coding API. '''
357 # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise
358 RSSI_MIN = -120
359 RSSI_MAX = -47
361 # Min and max values of int16_t
362 TOA256_MIN = -32768
363 TOA256_MAX = 32767
365 # TSC (Training Sequence Code) range
366 TSC_RANGE = range(0, 8)
368 # C/I range (in centiBels)
369 CI_MIN = -1280
370 CI_MAX = 1280
372 # IDLE frame / nope detection indicator
373 NOPE_IND = (1 << 7)
375 # Specific message fields
376 rssi = None
377 toa256 = None
379 # Version 0x01 specific (default values)
380 mod_type = Modulation.ModGMSK
381 nope_ind = False
383 tsc_set = None
384 tsc = None
385 ci = None
387 @property
388 def HDR_LEN(self):
389 ''' Calculate header length depending on its version. '''
391 # Common header length
392 length = self.CHDR_LEN
394 # Message specific header length
395 if self.ver == 0x00:
396 # RSSI + ToA
397 length += 1 + 2
398 elif self.ver == 0x01:
399 # RSSI + ToA + TS + C/I
400 length += 1 + 2 + 1 + 2
401 else:
402 raise IndexError("Unhandled version %u" % self.ver)
404 return length
406 def _validate_burst_v0(self):
407 # Burst is mandatory
408 if self.burst is None:
409 raise ValueError("Rx burst bits are not set")
411 # ... and can be either of GSM (GMSK) or EDGE (8-PSK)
412 if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
413 raise ValueError("Rx burst has odd length %u" % len(self.burst))
415 def _validate_burst_v1(self):
416 # Burst is omitted in case of an IDLE / NOPE indication
417 if self.nope_ind and self.burst is None:
418 return
420 if self.nope_ind and self.burst is not None:
421 raise ValueError("NOPE.ind comes with burst?!?")
422 if self.burst is None:
423 raise ValueError("Rx burst bits are not set")
425 # Burst length depends on modulation type
426 if len(self.burst) != self.mod_type.bl:
427 raise ValueError("Rx burst has odd length %u" % len(self.burst))
429 def validate_burst(self):
430 ''' Validate the burst (throws ValueError). '''
432 if self.ver == 0x00:
433 self._validate_burst_v0()
434 elif self.ver >= 0x01:
435 self._validate_burst_v1()
437 def validate(self):
438 ''' Validate the message header fields (throws ValueError). '''
440 # Validate common fields
441 Msg.validate(self)
443 if self.rssi is None:
444 raise ValueError("RSSI is not set")
446 if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX:
447 raise ValueError("RSSI %d is out of range" % self.rssi)
449 if self.toa256 is None:
450 raise ValueError("ToA256 is not set")
452 if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:
453 raise ValueError("ToA256 %d is out of range" % self.toa256)
455 # Version specific parameters (omited for NOPE.ind)
456 if self.ver >= 0x01 and not self.nope_ind:
457 if type(self.mod_type) is not Modulation:
458 raise ValueError("Unknown Rx modulation type")
460 if self.tsc_set is None:
461 raise ValueError("TSC set is not set")
463 if self.mod_type is Modulation.ModGMSK:
464 if self.tsc_set not in range(0, 4):
465 raise ValueError("TSC set %d is out of range" % self.tsc_set)
466 else:
467 if self.tsc_set not in range(0, 2):
468 raise ValueError("TSC set %d is out of range" % self.tsc_set)
470 if self.tsc is None:
471 raise ValueError("TSC is not set")
473 if self.tsc not in self.TSC_RANGE:
474 raise ValueError("TSC %d is out of range" % self.tsc)
476 # Version specific parameters (also present in NOPE.ind)
477 if self.ver >= 0x01:
478 if self.ci is None:
479 raise ValueError("C/I is not set")
481 if self.ci < self.CI_MIN or self.ci > self.CI_MAX:
482 raise ValueError("C/I %d is out of range" % self.ci)
484 self.validate_burst()
486 def rand_rssi(self, min = None, max = None):
487 ''' Generate a random RSSI value. '''
489 if min is None:
490 min = self.RSSI_MIN
492 if max is None:
493 max = self.RSSI_MAX
495 return random.randint(min, max)
497 def rand_toa256(self, min = None, max = None):
498 ''' Generate a random ToA (Time of Arrival) value. '''
500 if min is None:
501 min = self.TOA256_MIN
503 if max is None:
504 max = self.TOA256_MAX
506 return random.randint(min, max)
508 def rand_hdr(self):
509 ''' Randomize message specific header. '''
511 Msg.rand_hdr(self)
512 self.rssi = self.rand_rssi()
513 self.toa256 = self.rand_toa256()
515 if self.ver >= 0x01:
516 self.mod_type = random.choice(list(Modulation))
517 if self.mod_type is Modulation.ModGMSK:
518 self.tsc_set = random.randint(0, 3)
519 else:
520 self.tsc_set = random.randint(0, 1)
521 self.tsc = random.choice(self.TSC_RANGE)
523 # C/I: Carrier-to-Interference ratio
524 self.ci = random.randint(self.CI_MIN, self.CI_MAX)
526 def desc_hdr(self):
527 ''' Generate human-readable header description. '''
529 # Describe the common part
530 result = Msg.desc_hdr(self)
532 if self.rssi is not None:
533 result += ("rssi=%d " % self.rssi)
535 if self.toa256 is not None:
536 result += ("toa256=%d " % self.toa256)
538 if self.ver >= 0x01:
539 if not self.nope_ind:
540 if self.mod_type is not None:
541 result += ("%s " % self.mod_type)
542 if self.tsc_set is not None:
543 result += ("set=%u " % self.tsc_set)
544 if self.tsc is not None:
545 result += ("tsc=%u " % self.tsc)
546 if self.ci is not None:
547 result += ("C/I=%d cB " % self.ci)
548 else:
549 result += "(IDLE / NOPE IND) "
551 # Strip useless whitespace and return
552 return result.strip()
554 def gen_mts(self):
555 ''' Encode Modulation and Training Sequence info. '''
557 # IDLE / nope indication has no MTS info
558 if self.nope_ind:
559 return self.NOPE_IND
561 # TSC: . . . . . X X X
562 mts = self.tsc & 0b111
564 # MTS: . X X X X . . .
565 mts |= self.mod_type.coding << 3
566 mts |= self.tsc_set << 3
568 return mts
570 def parse_mts(self, mts):
571 ''' Parse Modulation and Training Sequence info. '''
573 # IDLE / nope indication has no MTS info
574 self.nope_ind = (mts & self.NOPE_IND) > 0
575 if self.nope_ind:
576 self.mod_type = None
577 self.tsc_set = None
578 self.tsc = None
579 return
581 # TSC: . . . . . X X X
582 self.tsc = mts & 0b111
584 # MTS: . X X X X . . .
585 mts = (mts >> 3) & 0b1111
586 if (mts & 0b1100) > 0:
587 # Mask: . . . . M M M S
588 self.mod_type = Modulation.pick(mts & 0b1110)
589 self.tsc_set = mts & 0b1
590 else:
591 # GMSK: . . . . 0 0 S S
592 self.mod_type = Modulation.ModGMSK
593 self.tsc_set = mts & 0b11
595 def gen_hdr(self):
596 ''' Generate message specific header part. '''
598 # Allocate an empty byte-array
599 buf = bytearray()
601 # Put RSSI
602 buf.append(-self.rssi)
604 # Encode ToA (Time of Arrival)
605 # Big endian, 2 bytes (int32_t)
606 buf += struct.pack(">h", self.toa256)
608 if self.ver >= 0x01:
609 # Modulation and Training Sequence info
610 mts = self.gen_mts()
611 buf.append(mts)
613 # C/I: Carrier-to-Interference ratio (in centiBels)
614 buf += struct.pack(">h", self.ci)
616 return buf
618 def parse_hdr(self, hdr):
619 ''' Parse message specific header part. '''
621 # Parse RSSI
622 self.rssi = -(hdr[5])
624 # Parse ToA (Time of Arrival)
625 self.toa256 = struct.unpack(">h", hdr[6:8])[0]
627 if self.ver >= 0x01:
628 # Modulation and Training Sequence info
629 self.parse_mts(hdr[8])
631 # C/I: Carrier-to-Interference ratio (in centiBels)
632 self.ci = struct.unpack(">h", hdr[9:11])[0]
634 def gen_burst(self):
635 ''' Generate message specific burst. '''
637 # Convert soft-bits to unsigned soft-bits
638 burst_usbits = self.sbit2usbit(self.burst)
640 # Encode to bytes
641 return bytearray(burst_usbits)
643 def _parse_burst_v0(self, burst):
644 ''' Parse message specific burst for header version 0. '''
646 bl = len(burst)
648 # We need to guess modulation by the length of burst
649 self.mod_type = Modulation.pick_by_bl(bl)
650 if self.mod_type is None:
651 # Some old transceivers append two dummy bytes
652 self.mod_type = Modulation.pick_by_bl(bl - 2)
654 if self.mod_type is None:
655 raise ValueError("Odd burst length %u" % bl)
657 return burst[:self.mod_type.bl]
659 def parse_burst(self, burst):
660 ''' Parse message specific burst. '''
662 burst = list(burst)
664 if self.ver == 0x00:
665 burst = self._parse_burst_v0(burst)
667 # Convert unsigned soft-bits to soft-bits
668 self.burst = self.usbit2sbit(burst)
670 def rand_burst(self, length = None):
671 ''' Generate a random message specific burst. '''
673 if length is None:
674 length = self.mod_type.bl
676 self.burst = [random.randint(-127, 127) for _ in range(length)]
678 def trans(self, ver = None):
679 ''' Transform this message to TxMsg. '''
681 # Allocate a new message
682 msg = TxMsg(fn = self.fn, tn = self.tn,
683 ver = self.ver if ver is None else ver)
685 # Convert burst bits
686 if self.burst is not None:
687 msg.burst = self.sbit2ubit(self.burst)
689 return msg