1 # -*- coding: utf-8 -*-
4 # CTRL interface implementation
6 # (C) 2016-2020 by Vadim Yanitskiy <axilirator@gmail.com>
7 # Contributions by sysmocom - s.f.m.c. GmbH
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
import logging as log
import time

from udp_link import UDPLink
class CTRLInterface(UDPLink):
    """Text based command / response (TRXC) interface on top of a UDP link.

    A request is a string starting with the "CMD" signature, followed by
    a command name and space separated arguments, e.g. "CMD TXTUNE 941600".
    Every request is answered with "RSP <command> <status> [<params>...]"
    terminated by a NUL character, e.g. "RSP TXTUNE 0 941600\\0".

    Command handling itself is delegated to parse_cmd(), which has to be
    implemented by a subclass.

    NOTE(review): self.sock, self.sendto() and self.desc_link() are used
    here but not defined in this class -- presumably inherited from
    UDPLink; confirm against udp_link.py.
    """

    def __init__(self, *udp_link_args):
        """Set up the underlying UDP link.

        All positional arguments are passed to UDPLink.__init__() as is.
        """
        UDPLink.__init__(self, *udp_link_args)
        log.debug("Init TRXC interface (%s)", self.desc_link())

        # Do not delay RSP messages by default
        self.rsp_delay_ms = 0

    # NOTE(review): the header of this method was missing from the reviewed
    # text. The name `handle_rx` and the tuple check at the end of the body
    # are reconstructed -- confirm against the callers before merging.
    def handle_rx(self):
        """Receive one request, run the command and send the response."""
        # Read data from socket
        data, remote = self.sock.recvfrom(128)

        # Sockets deliver bytes, while the checks below operate on text
        if isinstance(data, bytes):
            try:
                data = data.decode()
            except UnicodeDecodeError:
                log.error("Wrong data on TRXC interface")
                return

        if not self.verify_req(data):
            log.error("Wrong data on TRXC interface")
            return

        # Attempt to parse a command
        request = self.prepare_req(data)
        rc = self.parse_cmd(request)

        # A handler returns either a bare status code,
        # or a (status code, parameters) tuple
        if isinstance(rc, tuple):
            self.send_response(request, remote, rc[0], rc[1])
        else:
            self.send_response(request, remote, rc)

    def verify_req(self, data):
        """Return True if data starts with the "CMD" signature."""
        # Verify command signature
        return data.startswith("CMD")

    def prepare_req(self, data):
        """Convert a raw request string into a list of its fields.

        Returns the command name followed by its arguments,
        e.g. "CMD TXTUNE 941600\\0" becomes ["TXTUNE", "941600"].
        """
        # Strip signature, paddings and \0
        request = data[4:].strip().strip("\0")
        # Split into a command and arguments
        # Now we have something like ["TXTUNE", "941600"]
        return request.split(" ")

    def verify_cmd(self, request, cmd, argc, va=False):
        """Check a request against the expected command name and arity.

        request -- list as returned by prepare_req()
        cmd     -- expected command name
        argc    -- expected number of arguments
        va      -- if True, the command can have variable number of
                   arguments, so argc is treated as the required minimum

        Returns True if the request matches, False otherwise.
        """
        # Check if requested command matches
        if not request or request[0] != cmd:
            return False

        # And has enough arguments
        req_len = len(request) - 1
        if va:
            return req_len >= argc
        return req_len == argc

    def send_response(self, request, remote, response_code, params=None):
        """Compose a RSP message for the given request and send it.

        request       -- list as returned by prepare_req()
        remote        -- destination, passed to self.sendto() as is
        response_code -- status code, converted with str()
        params        -- optional list of command specific parameters
                         (strings) to be appended to the response
        """
        # Work on a copy, so the caller's list is left intact and
        # a repeated call would not insert the status code twice
        fields = list(request)

        # Include status code, for example ["TXTUNE", "0", "941600"]
        fields.insert(1, str(response_code))

        # Optionally append command specific parameters
        if params is not None:
            fields.extend(params)

        # Add the response signature, and join back to string
        response = "RSP " + " ".join(fields) + "\0"

        # If configured, delay sending the RSP message
        if self.rsp_delay_ms > 0:
            time.sleep(self.rsp_delay_ms / 1000.0)

        # Now we have something like "RSP TXTUNE 0 941600"
        self.sendto(response, remote)

    def parse_cmd(self, request):
        """Handle a command; to be implemented by a subclass.

        request -- list as returned by prepare_req()

        The result is consumed by handle_rx(): either a status code, or
        a tuple of a status code and a list of response parameters.
        """
        raise NotImplementedError