trxcon/l1sched: clarify TDMA Fn (mod 26) maps
[osmocom-bb.git] / src / target / trx_toolkit / data_if.py
blob8e8089432058284fb589ff0a6daca91bc6f1c9e2
1 # -*- coding: utf-8 -*-
3 # TRX Toolkit
4 # DATA interface implementation
6 # (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com>
8 # All Rights Reserved
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 2 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 import logging as log
22 from udp_link import UDPLink
23 from data_msg import *
25 class DATAInterface(UDPLink):
26 def __init__(self, *udp_link_args):
27 # Default header version (legacy)
28 self._hdr_ver = 0x00
30 UDPLink.__init__(self, *udp_link_args)
31 log.debug("Init TRXD interface (%s)" % self.desc_link())
33 def set_hdr_ver(self, ver):
34 if not ver in Msg.KNOWN_VERSIONS:
35 return False
37 self._hdr_ver = ver
38 return True
40 def pick_hdr_ver(self, ver_req):
41 # Pick a version that is lower or equal to ver_req
42 for ver in Msg.KNOWN_VERSIONS[::-1]:
43 if ver <= ver_req:
44 return ver
46 # No suitable version found
47 return -1
49 def match_hdr_ver(self, msg):
50 if msg.ver == self._hdr_ver:
51 return True
53 log.error("(%s) Rx DATA message (%s) with unexpected header "
54 "version %u (!= expected %u), ignoring..."
55 % (self.desc_link(), msg.desc_hdr(),
56 msg.ver, self._hdr_ver))
57 return False
59 def recv_raw_data(self):
60 data, _ = self.sock.recvfrom(512)
61 return data
63 def recv_tx_msg(self):
64 # Read raw data from socket
65 data = self.recv_raw_data()
67 # Attempt to parse a TRXD Tx message
68 try:
69 msg = TxMsg()
70 msg.parse_msg(bytearray(data))
71 except:
72 log.error("Failed to parse a TRXD Tx message "
73 "from R:%s:%u" % (self.remote_addr, self.remote_port))
74 return None
76 # Make sure the header version matches
77 # the configured one (self._hdr_ver)
78 if not self.match_hdr_ver(msg):
79 return None
81 return msg
83 def recv_rx_msg(self):
84 # Read raw data from socket
85 data = self.recv_raw_data()
87 # Attempt to parse a TRXD Rx message
88 try:
89 msg = RxMsg()
90 msg.parse_msg(bytearray(data))
91 except:
92 log.error("Failed to parse a TRXD Rx message "
93 "from R:%s:%u" % (self.remote_addr, self.remote_port))
94 return None
96 # Make sure the header version matches
97 # the configured one (self._hdr_ver)
98 if not self.match_hdr_ver(msg):
99 return None
101 return msg
103 def send_msg(self, msg, legacy = False):
104 try:
105 # Validate and encode a TRXD message
106 payload = msg.gen_msg(legacy)
107 except ValueError as e:
108 log.error("Failed to encode a TRXD message ('%s') "
109 "due to error: %s" % (msg.desc_hdr(), e))
110 # TODO: we may want to send a NOPE.ind here
111 return
113 # Send message
114 self.send(payload)