trxcon/l1sched: clarify TDMA Fn (mod 26) maps
[osmocom-bb.git] / src / target / trx_toolkit / ctrl_if.py
blobf9d30c19f8ef6dd22ead88722fdda685efcb4e99
1 # -*- coding: utf-8 -*-
3 # TRX Toolkit
4 # CTRL interface implementation
6 # (C) 2016-2020 by Vadim Yanitskiy <axilirator@gmail.com>
7 # Contributions by sysmocom - s.f.m.c. GmbH
9 # All Rights Reserved
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 import logging as log
22 import time
24 from udp_link import UDPLink
26 class CTRLInterface(UDPLink):
27 def __init__(self, *udp_link_args):
28 UDPLink.__init__(self, *udp_link_args)
29 log.debug("Init TRXC interface (%s)" % self.desc_link())
31 # Do not delay RSP messages by default
32 self.rsp_delay_ms = 0
34 def handle_rx(self):
35 # Read data from socket
36 data, remote = self.sock.recvfrom(128)
37 data = data.decode()
39 if not self.verify_req(data):
40 log.error("Wrong data on TRXC interface")
41 return
43 # Attempt to parse a command
44 request = self.prepare_req(data)
45 rc = self.parse_cmd(request)
47 if type(rc) is tuple:
48 self.send_response(request, remote, rc[0], rc[1])
49 else:
50 self.send_response(request, remote, rc)
52 def verify_req(self, data):
53 # Verify command signature
54 return data.startswith("CMD")
56 def prepare_req(self, data):
57 # Strip signature, paddings and \0
58 request = data[4:].strip().strip("\0")
59 # Split into a command and arguments
60 request = request.split(" ")
61 # Now we have something like ["TXTUNE", "941600"]
62 return request
64 # If va is True, the command can have variable number of arguments
65 def verify_cmd(self, request, cmd, argc, va = False):
66 # Check if requested command matches
67 if request[0] != cmd:
68 return False
70 # And has enough arguments
71 req_len = len(request[1:])
72 if not va and req_len != argc:
73 return False
74 elif va and req_len < argc:
75 return False
77 return True
79 def send_response(self, request, remote, response_code, params = None):
80 # Include status code, for example ["TXTUNE", "0", "941600"]
81 request.insert(1, str(response_code))
83 # Optionally append command specific parameters
84 if params is not None:
85 request += params
87 # Add the response signature, and join back to string
88 response = "RSP " + " ".join(request) + "\0"
89 # If configured, delay sending the RSP message
90 if self.rsp_delay_ms > 0:
91 time.sleep(self.rsp_delay_ms / 1000.0)
92 # Now we have something like "RSP TXTUNE 0 941600"
93 self.sendto(response, remote)
95 def parse_cmd(self, request):
96 raise NotImplementedError