1 # -*- coding: utf-8 -*-
4 # DATA interface message definitions and helpers
6 # (C) 2018-2019 by Vadim Yanitskiy <axilirator@gmail.com>
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 2 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
24 from typing
import List
26 from gsm_shared
import *
28 class Modulation(Enum
):
29 """ Modulation types defined in 3GPP TS 45.002 """
30 ModGMSK
= (0b0000, 1 * GMSK_BURST_LEN
)
31 Mod8PSK
= (0b0100, 3 * GMSK_BURST_LEN
)
32 ModGMSK_AB
= (0b0110, 1 * GMSK_BURST_LEN
)
33 # ModRFU = (0b0111, 0) # Reserved for Future Use
34 Mod16QAM
= (0b1000, 4 * GMSK_BURST_LEN
)
35 Mod32QAM
= (0b1010, 5 * GMSK_BURST_LEN
)
36 ModAQPSK
= (0b1100, 2 * GMSK_BURST_LEN
)
38 def __init__(self
, coding
, bl
):
39 # Coding in TRXD header
45 def pick(self
, coding
):
46 for mod
in list(self
):
47 if mod
.coding
== coding
:
52 def pick_by_bl(self
, bl
):
53 for mod
in list(self
):
59 ''' TRXD (DATA) message coding API (common part). '''
61 # NOTE: up to 16 versions can be encoded
62 CHDR_VERSION_MAX
= 0b1111
63 KNOWN_VERSIONS
= (0, 1)
65 def __init__(self
, fn
= None, tn
= None, burst
= None, ver
= 0):
73 ''' The common header length. '''
74 return 1 + 4 # (VER + TN) + FN
78 ''' Generate message specific header. '''
81 def parse_hdr(self
, hdr
):
82 ''' Parse message specific header. '''
86 ''' Generate message specific burst. '''
89 def parse_burst(self
, burst
):
90 ''' Parse message specific burst. '''
94 ''' Generate a random message specific burst. '''
97 ''' Generate a random frame number. '''
98 return random
.randint(0, GSM_HYPERFRAME
)
101 ''' Generate a random timeslot number. '''
102 return random
.randint(0, 7)
105 ''' Randomize the message header. '''
106 self
.fn
= self
.rand_fn()
107 self
.tn
= self
.rand_tn()
110 ''' Generate human-readable header description. '''
115 result
+= ("ver=%u " % self
.ver
)
117 if self
.fn
is not None:
118 result
+= ("fn=%u " % self
.fn
)
120 if self
.tn
is not None:
121 result
+= ("tn=%u " % self
.tn
)
123 if self
.burst
is not None and len(self
.burst
) > 0:
124 result
+= ("bl=%u " % len(self
.burst
))
def usbit2sbit(bits: List[int]) -> List[int]:
    ''' Map unsigned soft-bits {254..0} onto signed soft-bits {-127..127}.

    Every value b becomes 127 - b, except 0xff, which is pinned
    to -127 instead of the -128 the subtraction would yield.
    '''
    sbits = []
    for usbit in bits:
        if usbit == 0xff:
            # Keep the result within the symmetric range
            sbits.append(-127)
        else:
            sbits.append(127 - usbit)
    return sbits
def sbit2usbit(bits: List[int]) -> List[int]:
    ''' Map signed soft-bits {-127..127} onto unsigned soft-bits {254..0}.

    Every value b becomes 127 - b, so -127 maps to 254 and 127 maps to 0.
    '''
    usbits = []
    for sbit in bits:
        usbits.append(127 - sbit)
    return usbits
def sbit2ubit(bits: List[int]) -> List[int]:
    ''' Slice soft-bits {-127..127} into hard bits {1..0}.

    Negative soft-bits become 1, everything else (zero included) becomes 0.
    '''
    ubits = []
    for sbit in bits:
        if sbit < 0:
            ubits.append(1)
        else:
            ubits.append(0)
    return ubits
def ubit2sbit(bits: List[int]) -> List[int]:
    ''' Expand hard bits {1..0} into soft-bits {-127..127}.

    A truthy bit becomes -127, a falsy bit becomes 127.
    '''
    sbits = []
    for ubit in bits:
        if ubit:
            sbits.append(-127)
        else:
            sbits.append(127)
    return sbits
149 ''' Validate the message fields (throws ValueError). '''
151 if not self
.ver
in self
.KNOWN_VERSIONS
:
152 raise ValueError("Unknown TRXD header version %d" % self
.ver
)
155 raise ValueError("TDMA frame-number is not set")
157 if self
.fn
< 0 or self
.fn
> GSM_HYPERFRAME
:
158 raise ValueError("TDMA frame-number %d is out of range" % self
.fn
)
161 raise ValueError("TDMA time-slot is not set")
163 if self
.tn
< 0 or self
.tn
> 7:
164 raise ValueError("TDMA time-slot %d is out of range" % self
.tn
)
166 def gen_msg(self
, legacy
= False):
167 ''' Generate a TRX DATA message. '''
169 # Validate all the fields
172 # Allocate an empty byte-array
175 # Put version (4 bits) and TDMA TN (3 bits)
176 buf
.append((self
.ver
<< 4) |
(self
.tn
& 0x07))
178 # Put TDMA FN (4 octets, BE)
179 buf
+= struct
.pack(">L", self
.fn
)
181 # Generate message specific header part
186 if self
.burst
is not None:
187 buf
+= self
.gen_burst()
189 # This is a rudiment from (legacy) OpenBTS transceiver,
190 # some L1 implementations still expect two dummy bytes.
191 if legacy
and self
.ver
== 0x00:
196 def parse_msg(self
, msg
):
197 ''' Parse a TRX DATA message. '''
199 # Make sure we have at least common header
200 if len(msg
) < self
.CHDR_LEN
:
201 raise ValueError("Message is to short: missing common header")
203 # Parse the header version first
204 self
.ver
= (msg
[0] >> 4)
205 if not self
.ver
in self
.KNOWN_VERSIONS
:
206 raise ValueError("Unknown TRXD header version %d" % self
.ver
)
208 # Parse TDMA TN and FN
209 self
.tn
= (msg
[0] & 0x07)
210 self
.fn
= struct
.unpack(">L", msg
[1:5])[0]
212 # Make sure we have the whole header,
213 # including the version specific fields
214 if len(msg
) < self
.HDR_LEN
:
215 raise ValueError("Message is to short: missing version specific header")
217 # Specific message part
220 # Copy burst, skipping header
221 msg_burst
= msg
[self
.HDR_LEN
:]
222 if len(msg_burst
) > 0:
223 self
.parse_burst(msg_burst
)
228 ''' Tx (L1 -> TRX) message coding API. '''
234 # Specific message fields
239 ''' Calculate header length depending on its version. '''
241 # Common header length
242 length
= self
.CHDR_LEN
244 # Message specific header length
245 if self
.ver
in (0x00, 0x01):
248 raise IndexError("Unhandled version %u" % self
.ver
)
253 ''' Validate the message fields (throws ValueError). '''
255 # Validate common fields
259 raise ValueError("Tx Attenuation level is not set")
261 if self
.pwr
< self
.PWR_MIN
or self
.pwr
> self
.PWR_MAX
:
262 raise ValueError("Tx Attenuation %d is out of range" % self
.pwr
)
264 # FIXME: properly handle IDLE / NOPE indications
265 if self
.burst
is None:
266 raise ValueError("Tx burst bits are not set")
268 # FIXME: properly handle IDLE / NOPE indications
269 if len(self
.burst
) not in (GMSK_BURST_LEN
, EDGE_BURST_LEN
):
270 raise ValueError("Tx burst has odd length %u" % len(self
.burst
))
272 def rand_pwr(self
, min = None, max = None):
273 ''' Generate a random power level. '''
281 return random
.randint(min, max)
284 ''' Randomize message specific header. '''
287 self
.pwr
= self
.rand_pwr()
290 ''' Generate human-readable header description. '''
292 # Describe the common part
293 result
= Msg
.desc_hdr(self
)
295 if self
.pwr
is not None:
296 result
+= ("pwr=%u " % self
.pwr
)
298 # Strip useless whitespace and return
299 return result
.strip()
302 ''' Generate message specific header part. '''
304 # Allocate an empty byte-array
312 def parse_hdr(self
, hdr
):
313 ''' Parse message specific header part. '''
319 ''' Generate message specific burst. '''
322 return bytearray(self
.burst
)
324 def parse_burst(self
, burst
):
325 ''' Parse message specific burst. '''
329 # Distinguish between GSM and EDGE
330 if length
>= EDGE_BURST_LEN
:
331 self
.burst
= list(burst
[:EDGE_BURST_LEN
])
333 self
.burst
= list(burst
[:GMSK_BURST_LEN
])
def rand_burst(self, length = GMSK_BURST_LEN):
    ''' Fill the burst with random hard bits.

    Stores a list of `length` values, each drawn with
    random.randint(0, 1), in self.burst.
    '''
    bits = []
    for _ in range(length):
        bits.append(random.randint(0, 1))
    self.burst = bits
339 def trans(self
, ver
= None):
340 ''' Transform this message into RxMsg. '''
342 # Allocate a new message
343 msg
= RxMsg(fn
= self
.fn
, tn
= self
.tn
,
344 ver
= self
.ver
if ver
is None else ver
)
347 if self
.burst
is not None:
348 msg
.burst
= self
.ubit2sbit(self
.burst
)
355 ''' Rx (TRX -> L1) message coding API. '''
357 # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise
361 # Min and max values of int16_t
365 # TSC (Training Sequence Code) range
366 TSC_RANGE
= range(0, 8)
368 # C/I range (in centiBels)
372 # IDLE frame / nope detection indicator
375 # Specific message fields
379 # Version 0x01 specific (default values)
380 mod_type
= Modulation
.ModGMSK
389 ''' Calculate header length depending on its version. '''
391 # Common header length
392 length
= self
.CHDR_LEN
394 # Message specific header length
398 elif self
.ver
== 0x01:
399 # RSSI + ToA + TS + C/I
400 length
+= 1 + 2 + 1 + 2
402 raise IndexError("Unhandled version %u" % self
.ver
)
def _validate_burst_v0(self):
    ''' Check the burst of a version 0 message (throws ValueError). '''
    bits = self.burst

    # The burst must be present
    if bits is None:
        raise ValueError("Rx burst bits are not set")

    # Only GSM (GMSK) and EDGE (8-PSK) burst lengths are accepted
    bl = len(bits)
    if bl != GMSK_BURST_LEN and bl != EDGE_BURST_LEN:
        raise ValueError("Rx burst has odd length %u" % bl)
415 def _validate_burst_v1(self
):
416 # Burst is omitted in case of an IDLE / NOPE indication
417 if self
.nope_ind
and self
.burst
is None:
420 if self
.nope_ind
and self
.burst
is not None:
421 raise ValueError("NOPE.ind comes with burst?!?")
422 if self
.burst
is None:
423 raise ValueError("Rx burst bits are not set")
425 # Burst length depends on modulation type
426 if len(self
.burst
) != self
.mod_type
.bl
:
427 raise ValueError("Rx burst has odd length %u" % len(self
.burst
))
429 def validate_burst(self
):
430 ''' Validate the burst (throws ValueError). '''
433 self
._validate
_burst
_v
0()
434 elif self
.ver
>= 0x01:
435 self
._validate
_burst
_v
1()
438 ''' Validate the message header fields (throws ValueError). '''
440 # Validate common fields
443 if self
.rssi
is None:
444 raise ValueError("RSSI is not set")
446 if self
.rssi
< self
.RSSI_MIN
or self
.rssi
> self
.RSSI_MAX
:
447 raise ValueError("RSSI %d is out of range" % self
.rssi
)
449 if self
.toa256
is None:
450 raise ValueError("ToA256 is not set")
452 if self
.toa256
< self
.TOA256_MIN
or self
.toa256
> self
.TOA256_MAX
:
453 raise ValueError("ToA256 %d is out of range" % self
.toa256
)
455 # Version specific parameters (omitted for NOPE.ind)
456 if self
.ver
>= 0x01 and not self
.nope_ind
:
457 if type(self
.mod_type
) is not Modulation
:
458 raise ValueError("Unknown Rx modulation type")
460 if self
.tsc_set
is None:
461 raise ValueError("TSC set is not set")
463 if self
.mod_type
is Modulation
.ModGMSK
:
464 if self
.tsc_set
not in range(0, 4):
465 raise ValueError("TSC set %d is out of range" % self
.tsc_set
)
467 if self
.tsc_set
not in range(0, 2):
468 raise ValueError("TSC set %d is out of range" % self
.tsc_set
)
471 raise ValueError("TSC is not set")
473 if self
.tsc
not in self
.TSC_RANGE
:
474 raise ValueError("TSC %d is out of range" % self
.tsc
)
476 # Version specific parameters (also present in NOPE.ind)
479 raise ValueError("C/I is not set")
481 if self
.ci
< self
.CI_MIN
or self
.ci
> self
.CI_MAX
:
482 raise ValueError("C/I %d is out of range" % self
.ci
)
484 self
.validate_burst()
486 def rand_rssi(self
, min = None, max = None):
487 ''' Generate a random RSSI value. '''
495 return random
.randint(min, max)
497 def rand_toa256(self
, min = None, max = None):
498 ''' Generate a random ToA (Time of Arrival) value. '''
501 min = self
.TOA256_MIN
504 max = self
.TOA256_MAX
506 return random
.randint(min, max)
509 ''' Randomize message specific header. '''
512 self
.rssi
= self
.rand_rssi()
513 self
.toa256
= self
.rand_toa256()
516 self
.mod_type
= random
.choice(list(Modulation
))
517 if self
.mod_type
is Modulation
.ModGMSK
:
518 self
.tsc_set
= random
.randint(0, 3)
520 self
.tsc_set
= random
.randint(0, 1)
521 self
.tsc
= random
.choice(self
.TSC_RANGE
)
523 # C/I: Carrier-to-Interference ratio
524 self
.ci
= random
.randint(self
.CI_MIN
, self
.CI_MAX
)
527 ''' Generate human-readable header description. '''
529 # Describe the common part
530 result
= Msg
.desc_hdr(self
)
532 if self
.rssi
is not None:
533 result
+= ("rssi=%d " % self
.rssi
)
535 if self
.toa256
is not None:
536 result
+= ("toa256=%d " % self
.toa256
)
539 if not self
.nope_ind
:
540 if self
.mod_type
is not None:
541 result
+= ("%s " % self
.mod_type
)
542 if self
.tsc_set
is not None:
543 result
+= ("set=%u " % self
.tsc_set
)
544 if self
.tsc
is not None:
545 result
+= ("tsc=%u " % self
.tsc
)
546 if self
.ci
is not None:
547 result
+= ("C/I=%d cB " % self
.ci
)
549 result
+= "(IDLE / NOPE IND) "
551 # Strip useless whitespace and return
552 return result
.strip()
555 ''' Encode Modulation and Training Sequence info. '''
557 # IDLE / nope indication has no MTS info
561 # TSC: . . . . . X X X
562 mts
= self
.tsc
& 0b111
564 # MTS: . X X X X . . .
565 mts |
= self
.mod_type
.coding
<< 3
566 mts |
= self
.tsc_set
<< 3
570 def parse_mts(self
, mts
):
571 ''' Parse Modulation and Training Sequence info. '''
573 # IDLE / nope indication has no MTS info
574 self
.nope_ind
= (mts
& self
.NOPE_IND
) > 0
581 # TSC: . . . . . X X X
582 self
.tsc
= mts
& 0b111
584 # MTS: . X X X X . . .
585 mts
= (mts
>> 3) & 0b1111
586 if (mts
& 0b1100) > 0:
587 # Mask: . . . . M M M S
588 self
.mod_type
= Modulation
.pick(mts
& 0b1110)
589 self
.tsc_set
= mts
& 0b1
591 # GMSK: . . . . 0 0 S S
592 self
.mod_type
= Modulation
.ModGMSK
593 self
.tsc_set
= mts
& 0b11
596 ''' Generate message specific header part. '''
598 # Allocate an empty byte-array
602 buf
.append(-self
.rssi
)
604 # Encode ToA (Time of Arrival)
605 # Big endian, 2 bytes (int16_t)
606 buf
+= struct
.pack(">h", self
.toa256
)
609 # Modulation and Training Sequence info
613 # C/I: Carrier-to-Interference ratio (in centiBels)
614 buf
+= struct
.pack(">h", self
.ci
)
618 def parse_hdr(self
, hdr
):
619 ''' Parse message specific header part. '''
622 self
.rssi
= -(hdr
[5])
624 # Parse ToA (Time of Arrival)
625 self
.toa256
= struct
.unpack(">h", hdr
[6:8])[0]
628 # Modulation and Training Sequence info
629 self
.parse_mts(hdr
[8])
631 # C/I: Carrier-to-Interference ratio (in centiBels)
632 self
.ci
= struct
.unpack(">h", hdr
[9:11])[0]
635 ''' Generate message specific burst. '''
637 # Convert soft-bits to unsigned soft-bits
638 burst_usbits
= self
.sbit2usbit(self
.burst
)
641 return bytearray(burst_usbits
)
643 def _parse_burst_v0(self
, burst
):
644 ''' Parse message specific burst for header version 0. '''
648 # We need to guess modulation by the length of burst
649 self
.mod_type
= Modulation
.pick_by_bl(bl
)
650 if self
.mod_type
is None:
651 # Some old transceivers append two dummy bytes
652 self
.mod_type
= Modulation
.pick_by_bl(bl
- 2)
654 if self
.mod_type
is None:
655 raise ValueError("Odd burst length %u" % bl
)
657 return burst
[:self
.mod_type
.bl
]
659 def parse_burst(self
, burst
):
660 ''' Parse message specific burst. '''
665 burst
= self
._parse
_burst
_v
0(burst
)
667 # Convert unsigned soft-bits to soft-bits
668 self
.burst
= self
.usbit2sbit(burst
)
670 def rand_burst(self
, length
= None):
671 ''' Generate a random message specific burst. '''
674 length
= self
.mod_type
.bl
676 self
.burst
= [random
.randint(-127, 127) for _
in range(length
)]
678 def trans(self
, ver
= None):
679 ''' Transform this message to TxMsg. '''
681 # Allocate a new message
682 msg
= TxMsg(fn
= self
.fn
, tn
= self
.tn
,
683 ver
= self
.ver
if ver
is None else ver
)
686 if self
.burst
is not None:
687 msg
.burst
= self
.sbit2ubit(self
.burst
)