1 # -*- coding: utf-8 -*-
4 # Helpers for DATA capture management
6 # (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 2 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
23 from data_msg
import *
31 # Generates raw bytes from a DATA message
32 # Return value: raw message bytes
def dump_msg(self, msg):
    """Generate the raw capture record for a DATA message.

    The record consists of a type tag, the payload length packed as a
    big-endian unsigned short (2 bytes), and the payload returned by
    ``msg.gen_msg()``.

    Return value: raw message bytes (bytearray).
    Raises ValueError if ``msg`` is neither a TxMsg nor an RxMsg.
    """
    # Determine a message type
    # NOTE(review): the two tag assignments and the `else` were missing
    # from the reviewed listing; restored from the TAG_* names that
    # parse_hdr() compares against -- confirm.
    if isinstance(msg, TxMsg):
        tag = self.TAG_TxMsg
    elif isinstance(msg, RxMsg):
        tag = self.TAG_RxMsg
    else:
        raise ValueError("Unknown message type")

    # Generate a message payload
    msg_raw = msg.gen_msg()

    # Calculate the message length and pack it
    # to unsigned short (2 bytes, BE)
    msg_len = struct.pack(">H", len(msg_raw))

    # Concatenate a message with header
    return bytearray(tag + msg_len) + msg_raw
def parse_hdr(self, hdr):
    """Parse a capture record header.

    ``hdr`` holds the type tag in its first byte and the payload length
    as a big-endian unsigned short in bytes 1..2.

    Return value:
      (msg, msg_len) with a fresh, still empty message object
      matching the tag, or False if the tag is unknown.

    NOTE(review): the tag extraction, the message construction and both
    return statements were missing from the reviewed listing; they are
    restored from how the callers use the result -- confirm the tuple
    order against the upstream file.
    """
    # Extract the header info
    msg_len = struct.unpack(">H", hdr[1:3])[0]
    # Slice rather than index, so the tag stays a bytes object and can
    # be compared with the TAG_* constants that dump_msg() concatenates
    tag = hdr[:1]

    # Check if tag is known
    if tag == self.TAG_TxMsg:
        msg = TxMsg()
    elif tag == self.TAG_RxMsg:
        msg = RxMsg()
    else:
        # Unknown tag
        return False

    return (msg, msg_len)
70 class DATADumpFile(DATADump
):
def __init__(self, capture):
    """Attach the dumper to a capture file.

    ``capture`` is either a path (str), which is opened in binary
    append-and-read mode ("a+b"), or an already opened file object,
    which is used as is.
    """
    # Check if capture file is already opened
    if isinstance(capture, str):
        log.info("Opening capture file '%s'..." % capture)
        self.f = open(capture, "a+b")
    else:
        # NOTE(review): this branch was missing from the reviewed
        # listing; restored from the comment above -- confirm.
        self.f = capture
80 log
.info("Closing the capture file")
83 # Moves the file descriptor before a specified message
85 # True in case of success,
86 # or False in case of EOF or header parsing error.
def _seek2msg(self, idx):
    """Move the file descriptor before the message with index ``idx``.

    The capture is rewound and ``idx`` records are skipped by reading
    each header and seeking over its payload.

    Return value:
      True in case of success,
      or False in case of EOF or header parsing error.

    NOTE(review): the rewind, the loop statement, the return statements
    and the header unpacking were missing from the reviewed listing and
    are restored from the surrounding comments -- confirm.
    """
    # Seek to the beginning of the capture
    self.f.seek(0)

    # Read the capture in loop...
    for _ in range(idx):
        # Attempt to read a message header
        hdr_raw = self.f.read(self.HDR_LENGTH)
        if len(hdr_raw) != self.HDR_LENGTH:
            return False

        # Attempt to parse it
        rc = self.parse_hdr(hdr_raw)
        if not rc:
            log.error("Couldn't parse a message header")
            return False

        # Only the payload length is needed to skip the message
        _, msg_len = rc

        # Skip the payload (whence=1: relative to the current position)
        self.f.seek(msg_len, 1)

    return True
112 # Parses a single message at the current descriptor position
114 # a parsed message in case of success,
115 # or None in case of EOF or header parsing error,
116 # or False in case of message parsing error.
def _parse_msg(self):
    """Parse a single message at the current descriptor position.

    Return value:
      a parsed message in case of success,
      or None in case of EOF or header parsing error,
      or False in case of message parsing error.

    NOTE(review): the return statements, the header unpacking and the
    try/except around the payload parsing were missing from the
    reviewed listing and are restored from the contract above --
    confirm, in particular which exceptions upstream catches.
    """
    # Attempt to read a message header
    hdr_raw = self.f.read(self.HDR_LENGTH)
    if len(hdr_raw) != self.HDR_LENGTH:
        return None

    # Attempt to parse it
    rc = self.parse_hdr(hdr_raw)
    if not rc:
        log.error("Couldn't parse a message header")
        return None

    # Expand the header: an empty message object and the payload length
    msg, msg_len = rc

    # Attempt to read a message
    msg_raw = self.f.read(msg_len)
    if len(msg_raw) != msg_len:
        log.error("Message length mismatch")
        return None

    # Attempt to parse a message
    try:
        msg_raw = bytearray(msg_raw)
        msg.parse_msg(msg_raw)
    except Exception:
        # The payload was consumed completely, so the descriptor already
        # points at the next record and the caller may simply go on
        log.error("Couldn't parse a message, skipping...")
        return False

    # Success
    return msg
149 # Parses a particular message defined by index idx
151 # a parsed message in case of success,
152 # or None in case of EOF, out of range, or header parsing error,
153 # or False in case of message parsing error.
def parse_msg(self, idx):
    """Parse a particular message defined by index ``idx``.

    Return value:
      a parsed message in case of success,
      or None in case of EOF, out of range, or header parsing error,
      or False in case of message parsing error.
    """
    # Move descriptor to the beginning of requested message
    # NOTE(review): the failure check and its `return None` were missing
    # from the reviewed listing; restored from the contract above.
    if not self._seek2msg(idx):
        log.error("Couldn't find requested message")
        return None

    # Attempt to parse a message
    return self._parse_msg()
164 # Parses all messages from a given file
166 # list of parsed messages,
167 # or False in case of range error.
def parse_all(self, skip=None, count=None):
    """Parse all messages from the capture file.

    ``skip``  -- number of leading messages to skip (None: start at the
                 beginning of the capture);
    ``count`` -- stop after this many parsed messages (None: no limit).

    Return value:
      list of parsed messages,
      or False in case of range error.

    NOTE(review): the result list, the skip branch, the loop statement
    and the break/return statements were missing from the reviewed
    listing and are restored from the surrounding comments -- confirm.
    """
    result = []

    # Should we skip some messages?
    if skip is None:
        # Seek to the beginning of the capture
        self.f.seek(0)
    else:
        rc = self._seek2msg(skip)
        if not rc:
            log.error("Couldn't find requested message")
            return False

    # Read the capture in loop...
    while True:
        # Attempt to parse a message
        msg = self._parse_msg()

        # EOF or broken header
        if msg is None:
            break

        # Skip unparsed messages
        if msg is False:
            continue

        # Success, append a message
        result.append(msg)

        # Stop as soon as the requested number of messages is collected
        if count is not None:
            if len(result) == count:
                break

    return result
204 # Writes a new message at the end of the capture
def append_msg(self, msg):
    """Write a new message to the capture file.

    The message is serialized by dump_msg() (header plus payload) and
    the resulting bytes are written to ``self.f``.
    """
    # Generate raw bytes and write
    msg_raw = self.dump_msg(msg)
    self.f.write(msg_raw)
210 # Writes a list of messages at the end of the capture
211 def append_all(self
, msgs
):