trxcon/l1sched: clarify TDMA Fn (mod 26) maps
[osmocom-bb.git] / src / target / trx_toolkit / data_dump.py
blob8475ceb2af2ba9282e50b91fec9b2ed7e05d1a5c
1 # -*- coding: utf-8 -*-
3 # TRX Toolkit
4 # Helpers for DATA capture management
6 # (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
8 # All Rights Reserved
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 2 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 import logging as log
21 import struct
23 from data_msg import *
25 class DATADump:
26 # Constants
27 TAG_TxMsg = b'\x01'
28 TAG_RxMsg = b'\x02'
29 HDR_LENGTH = 3
31 # Generates raw bytes from a DATA message
32 # Return value: raw message bytes
33 def dump_msg(self, msg):
34 # Determine a message type
35 if isinstance(msg, TxMsg):
36 tag = self.TAG_TxMsg
37 elif isinstance(msg, RxMsg):
38 tag = self.TAG_RxMsg
39 else:
40 raise ValueError("Unknown message type")
42 # Generate a message payload
43 msg_raw = msg.gen_msg()
45 # Calculate and pack the message length
46 msg_len = len(msg_raw)
48 # Pack to unsigned short (2 bytes, BE)
49 msg_len = struct.pack(">H", msg_len)
51 # Concatenate a message with header
52 return bytearray(tag + msg_len) + msg_raw
54 def parse_hdr(self, hdr):
55 # Extract the header info
56 msg_len = struct.unpack(">H", hdr[1:3])[0]
57 tag = hdr[:1]
59 # Check if tag is known
60 if tag == self.TAG_TxMsg:
61 msg = TxMsg()
62 elif tag == self.TAG_RxMsg:
63 msg = RxMsg()
64 else:
65 # Unknown tag
66 return False
68 return (msg, msg_len)
70 class DATADumpFile(DATADump):
71 def __init__(self, capture):
72 # Check if capture file is already opened
73 if isinstance(capture, str):
74 log.info("Opening capture file '%s'..." % capture)
75 self.f = open(capture, "a+b")
76 else:
77 self.f = capture
79 def __del__(self):
80 log.info("Closing the capture file")
81 self.f.close()
83 # Moves the file descriptor before a specified message
84 # Return value:
85 # True in case of success,
86 # or False in case of EOF or header parsing error.
87 def _seek2msg(self, idx):
88 # Seek to the beginning of the capture
89 self.f.seek(0)
91 # Read the capture in loop...
92 for _ in range(idx):
93 # Attempt to read a message header
94 hdr_raw = self.f.read(self.HDR_LENGTH)
95 if len(hdr_raw) != self.HDR_LENGTH:
96 return False
98 # Attempt to parse it
99 rc = self.parse_hdr(hdr_raw)
100 if rc is False:
101 log.error("Couldn't parse a message header")
102 return False
104 # Expand the header
105 (_, msg_len) = rc
107 # Skip a message
108 self.f.seek(msg_len, 1)
110 return True
112 # Parses a single message at the current descriptor position
113 # Return value:
114 # a parsed message in case of success,
115 # or None in case of EOF or header parsing error,
116 # or False in case of message parsing error.
117 def _parse_msg(self):
118 # Attempt to read a message header
119 hdr_raw = self.f.read(self.HDR_LENGTH)
120 if len(hdr_raw) != self.HDR_LENGTH:
121 return None
123 # Attempt to parse it
124 rc = self.parse_hdr(hdr_raw)
125 if rc is False:
126 log.error("Couldn't parse a message header")
127 return None
129 # Expand the header
130 (msg, msg_len) = rc
132 # Attempt to read a message
133 msg_raw = self.f.read(msg_len)
134 if len(msg_raw) != msg_len:
135 log.error("Message length mismatch")
136 return None
138 # Attempt to parse a message
139 try:
140 msg_raw = bytearray(msg_raw)
141 msg.parse_msg(msg_raw)
142 except:
143 log.error("Couldn't parse a message, skipping...")
144 return False
146 # Success
147 return msg
149 # Parses a particular message defined by index idx
150 # Return value:
151 # a parsed message in case of success,
152 # or None in case of EOF, out of range, or header parsing error,
153 # or False in case of message parsing error.
154 def parse_msg(self, idx):
155 # Move descriptor to the beginning of requested message
156 rc = self._seek2msg(idx)
157 if not rc:
158 log.error("Couldn't find requested message")
159 return None
161 # Attempt to parse a message
162 return self._parse_msg()
164 # Parses all messages from a given file
165 # Return value:
166 # list of parsed messages,
167 # or False in case of range error.
168 def parse_all(self, skip = None, count = None):
169 result = []
171 # Should we skip some messages?
172 if skip is None:
173 # Seek to the beginning of the capture
174 self.f.seek(0)
175 else:
176 rc = self._seek2msg(skip)
177 if not rc:
178 log.error("Couldn't find requested message")
179 return False
181 # Read the capture in loop...
182 while True:
183 # Attempt to parse a message
184 msg = self._parse_msg()
186 # EOF or broken header
187 if msg is None:
188 break
190 # Skip unparsed messages
191 if msg is False:
192 continue
194 # Success, append a message
195 result.append(msg)
197 # Count limitation
198 if count is not None:
199 if len(result) == count:
200 break
202 return result
204 # Writes a new message at the end of the capture
205 def append_msg(self, msg):
206 # Generate raw bytes and write
207 msg_raw = self.dump_msg(msg)
208 self.f.write(msg_raw)
210 # Writes a list of messages at the end of the capture
211 def append_all(self, msgs):
212 for msg in msgs:
213 self.append_msg(msg)