2 # Wireshark dissector tests
3 # By Atli Guðmundsson <atli@tern.is>
5 # SPDX-License-Identifier: GPL-2.0-or-later
13 class _dissection_validator_real
:
15 Collects a set of byte bundles, matching json objects and a protocol
16 name and verifies that a byte bundle converts into the matching json
17 object using the following execution chain:
19 byte bundle -> text2pcap -> tshark <protocol> -> json
21 Note: The idea for this approach came about when it was realized that
22 calling text2pcap and tshark for each byte bundle resulted in
23 unacceptable overhead during execution of the unittests.
26 def __init__(self
, protocol
, request
, cmd_tshark
, cmd_text2pcap
, result_file
, env
):
27 self
.dissection_list
= []
28 self
.protocol
= protocol
29 self
.cmd_tshark
= cmd_tshark
30 self
.cmd_text2pcap
= cmd_text2pcap
31 self
.test_case
= request
.instance
32 self
.result_file
= result_file
35 def add_dissection(self
, byte_list
, expected_result
, line_no
=None):
36 '''Adds a byte bundle and an expected result to the set of byte
39 byte bundles must be iterable.'''
41 hex_string
= ' '.join('{:02x}'.format(ele
) for ele
in bytes(byte_list
))
44 caller
= inspect
.getframeinfo(inspect
.stack()[1][0])
45 line_no
= caller
.lineno
47 self
.dissection_list
.append((line_no
, hex_string
, expected_result
))
49 # Uncomment the following lines to record in a text file all the dissector byte
50 # bundles, in the order they are presented:
52 # with open("full.txt", 'a') as f:
53 # f.write("0 {}\n".format(hex_string))
55 # Then use the following command to convert full.txt into a pcap file,
56 # replacing <port> with the default port of your protocol:
57 # # text2pcap -u <port>,<port> full.txt out.pcap
59 def check_dissections(self
):
60 '''Processes and verifies all added byte bundles and their expected
61 results. At the end of processing the current set is emptied.'''
63 text_file
= self
.result_file('txt')
64 pcap_file
= self
.result_file('pcap')
66 # create our text file of hex encoded messages
67 with
open(text_file
, 'w') as f
:
68 for line_no
, hex_string
, expected_result
in self
.dissection_list
:
69 f
.write("0 {}\n".format(hex_string
))
71 # generate our pcap file by feeding the messages to text2pcap
72 subprocess
.check_call((
78 # generate our dissection from our pcap file
79 tshark_stdout
= subprocess
.check_output((
83 '-d', 'udp.port==1234,{}'.format(self
.protocol
),
85 ), encoding
='utf-8', env
=self
.env
)
87 dissections
= json
.loads(tshark_stdout
)
88 for (line_no
, hex_string
, expected_result
), dissection
in zip(self
.dissection_list
, dissections
):
90 # strip away everything except the protocol
91 result
= dissection
['_source']['layers']
92 assert self
.protocol
in result
93 result
= result
[self
.protocol
]
95 # verify that the dissection is as expected
96 assert expected_result
== result
, \
97 "expected != result, while dissecting [{}] from line {}.".format(hex_string
, line_no
)
99 # cleanup for next test
100 self
.dissection_list
= []
def dissection_validator(request, cmd_tshark, cmd_text2pcap, result_file, test_env):
    '''Returns a factory that creates a dissection validator for a protocol.

    The returned callable takes the protocol name and hands it, together
    with the arguments given here, to _dissection_validator_real; test_env
    is passed on as the validator's subprocess environment.

    NOTE(review): the parameter names look like pytest fixtures - confirm
    that this function carries the matching decorator.'''

    def generate_validator(protocol):
        # arguments follow the order of _dissection_validator_real.__init__
        return _dissection_validator_real(
            protocol,
            request,
            cmd_tshark,
            cmd_text2pcap,
            result_file,
            test_env)

    return generate_validator