# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''File format conversion tests'''
import os.path
import subprocess
from pathlib import PurePath

import pytest

from subprocesstest import count_output
# XXX Currently unused. It would be nice to be able to use this below.
# The tshark output options that select the three timestamp fields.
time_output_args = ('-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta')

# Microsecond pcap, direct read was used to generate the baseline:
# tshark -Tfields -e frame.number -e frame.time_epoch -e frame.time_delta \
#   -r captures/dhcp.pcap > baseline/ff-ts-usec-pcap-direct.txt
baseline_file = 'ff-ts-usec-pcap-direct.txt'
@pytest.fixture(scope='session')
def fileformats_baseline_str(dirs):
    '''Return the contents of the timestamp baseline file as a string.

    The path is built by joining ``dirs.baseline_dir`` with the module-level
    ``baseline_file`` name. The fixture is session scoped, so the file is
    read once per test session.
    '''
    # Decode as UTF-8 to match how the tests decode tshark's output.
    with open(os.path.join(dirs.baseline_dir, baseline_file), 'r', encoding='utf-8') as f:
        return f.read()
class TestFileFormatPcap:
    '''Compare tshark's timestamp fields for pcap captures with the baseline.

    Every test prints frame.number, frame.time_epoch and frame.time_delta
    and expects the output to equal the baseline string exactly.
    '''

    def test_pcap_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str, test_env):
        '''Microsecond pcap direct vs microsecond pcap stdin'''
        # The capture is handed to tshark as its standard input ("-r -")
        # through a file handle rather than a shell "<" redirection, so the
        # capture path needs no quoting and no shell is involved.
        with open(capture_file('dhcp.pcap'), 'rb') as capture_stdin:
            capture_stdout = subprocess.check_output((cmd_tshark,
                    '-r', '-',
                    '-Tfields',
                    '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                ),
                stdin=capture_stdin, encoding='utf-8', env=test_env)
        assert capture_stdout == fileformats_baseline_str

    def test_pcap_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str, test_env):
        '''Microsecond pcap direct vs nanosecond pcap stdin'''
        # Same stdin hand-off as above, with the nanosecond capture.
        with open(capture_file('dhcp-nanosecond.pcap'), 'rb') as capture_stdin:
            capture_stdout = subprocess.check_output((cmd_tshark,
                    '-r', '-',
                    '-Tfields',
                    '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                ),
                stdin=capture_stdin, encoding='utf-8', env=test_env)
        assert capture_stdout == fileformats_baseline_str

    def test_pcap_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str, test_env):
        '''Microsecond pcap direct vs nanosecond pcap direct'''
        capture_stdout = subprocess.check_output((cmd_tshark,
                '-r', capture_file('dhcp-nanosecond.pcap'),
                '-Tfields',
                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
            ),
            encoding='utf-8', env=test_env)
        assert capture_stdout == fileformats_baseline_str
class TestFileFormatsPcapng:
    '''Compare tshark's timestamp fields for pcapng captures with the baseline.

    Every test prints frame.number, frame.time_epoch and frame.time_delta
    and expects the output to equal the baseline string exactly.
    '''

    def test_pcapng_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str, test_env):
        '''Microsecond pcap direct vs microsecond pcapng stdin'''
        # The capture is handed to tshark as its standard input ("-r -")
        # through a file handle rather than a shell "<" redirection, so the
        # capture path needs no quoting and no shell is involved.
        with open(capture_file('dhcp.pcapng'), 'rb') as capture_stdin:
            capture_stdout = subprocess.check_output((cmd_tshark,
                    '-r', '-',
                    '-Tfields',
                    '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                ),
                stdin=capture_stdin, encoding='utf-8', env=test_env)
        assert capture_stdout == fileformats_baseline_str

    def test_pcapng_usec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str, test_env):
        '''Microsecond pcap direct vs microsecond pcapng direct'''
        capture_stdout = subprocess.check_output((cmd_tshark,
                '-r', capture_file('dhcp.pcapng'),
                '-Tfields',
                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
            ),
            encoding='utf-8', env=test_env)
        assert capture_stdout == fileformats_baseline_str

    def test_pcapng_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str, test_env):
        '''Microsecond pcap direct vs nanosecond pcapng stdin'''
        # Same stdin hand-off as test_pcapng_usec_stdin, with the
        # nanosecond capture.
        with open(capture_file('dhcp-nanosecond.pcapng'), 'rb') as capture_stdin:
            capture_stdout = subprocess.check_output((cmd_tshark,
                    '-r', '-',
                    '-Tfields',
                    '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                ),
                stdin=capture_stdin, encoding='utf-8', env=test_env)
        assert capture_stdout == fileformats_baseline_str

    def test_pcapng_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str, test_env):
        '''Microsecond pcap direct vs nanosecond pcapng direct'''
        capture_stdout = subprocess.check_output((cmd_tshark,
                '-r', capture_file('dhcp-nanosecond.pcapng'),
                '-Tfields',
                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
            ),
            encoding='utf-8', env=test_env)
        assert capture_stdout == fileformats_baseline_str
@pytest.fixture
def check_pcapng_dsb_fields(request, cmd_tshark):
    '''Factory that checks whether the DSB within the capture file matches.'''
    def check_dsb_fields_real(outfile, fields, env=None):
        '''Assert that the DSBs tshark reports for outfile equal fields.

        outfile is read with the "MIME Files Format" reader and the
        pcapng.dsb.secrets_type, secrets_length and secrets_data fields are
        printed for frames matching the "pcapng.dsb.secrets_data" filter.

        fields is an iterable of (type, length, data) triples: type is an
        integer, length is converted with str(), and data is an iterable of
        byte values (e.g. bytes).  env is passed to the tshark subprocess.
        '''
        proc_stdout = subprocess.check_output((cmd_tshark,
                '-r', outfile,
                '-Xread_format:MIME Files Format',
                '-Tfields',
                '-e', 'pcapng.dsb.secrets_type',
                '-e', 'pcapng.dsb.secrets_length',
                '-e', 'pcapng.dsb.secrets_data',
                '-Y', 'pcapng.dsb.secrets_data'
            ), encoding='utf-8', env=env)
        # Convert "t1,t2 l1,l2 v1,2" -> [(t1, l1, v1), (t2, l2, v2)]
        # Columns are tab separated; the values within a column are comma
        # separated, and zip() regroups them into one tuple per DSB.
        output = proc_stdout.strip()
        actual = list(zip(*[x.split(",") for x in output.split('\t')]))

        def format_field(field):
            # Render one expected triple as the strings the code above
            # compares against: zero-padded hex type, decimal length, and
            # the data as lowercase hex digits.
            secrets_type, secrets_length, secrets_data = field
            data_hex = ''.join(f'{byte:02x}' for byte in secrets_data)
            return (f'0x{secrets_type:08x}', str(secrets_length), data_hex)

        fields = [format_field(field) for field in fields]
        assert fields == actual
    return check_dsb_fields_real
class TestFileFormatsPcapngDsb:
    '''Decryption Secrets Block (DSB) tests: rewrite, inject and extract.'''

    # Secrets type expected in every DSB checked below; the four bytes are
    # the ASCII characters "TLSK".
    _secrets_type_tls = 0x544c534b

    # NOTE(review): every checker call below passes env=base_env so that the
    # verifying tshark run uses the same environment as the command under
    # test; confirm this matches the intended call.

    @staticmethod
    def _read_text_as_bytes(path):
        '''Read path in text mode and return the contents encoded as UTF-8.

        Text mode applies Python's universal newline translation, so the
        result always uses "\\n" line endings.  Presumably this is why the
        key files compared with DSB contents are not read as raw bytes.
        '''
        with open(path, 'r', encoding='utf-8') as f:
            return f.read().encode('utf8')

    @staticmethod
    def _read_bytes(path):
        '''Return the unmodified bytes stored in path.'''
        with open(path, 'rb') as f:
            return f.read()

    def test_pcapng_dsb_1(self, cmd_tshark, dirs, capture_file, result_file, check_pcapng_dsb_fields, base_env):
        '''Check that DSBs are preserved while rewriting files.'''
        dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
        dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
        outfile = result_file('tls12-dsb-same.pcapng')
        subprocess.run((cmd_tshark,
            '-r', capture_file('tls12-dsb.pcapng'),
            '-w', outfile,
        ), check=True, env=base_env)
        dsb1_contents = self._read_text_as_bytes(dsb_keys1)
        dsb2_contents = self._read_text_as_bytes(dsb_keys2)
        check_pcapng_dsb_fields(outfile, (
            (self._secrets_type_tls, len(dsb1_contents), dsb1_contents),
            (self._secrets_type_tls, len(dsb2_contents), dsb2_contents),
        ), env=base_env)

    def test_pcapng_dsb_2(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields, base_env):
        '''Insert a single DSB into a pcapng file.'''
        key_file = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        outfile = result_file('dhe1-dsb.pcapng')
        subprocess.run((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file,
            capture_file('dhe1.pcapng.gz'), outfile
        ), check=True, env=base_env)
        # Injected key files are compared byte for byte.
        keylog_contents = self._read_bytes(key_file)
        check_pcapng_dsb_fields(outfile, (
            (self._secrets_type_tls, len(keylog_contents), keylog_contents),
        ), env=base_env)

    def test_pcapng_dsb_3(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields, base_env):
        '''Insert two DSBs into a pcapng file.'''
        key_file1 = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        key_file2 = os.path.join(dirs.key_dir, 'http2-data-reassembly.keys')
        outfile = result_file('dhe1-dsb.pcapng')
        subprocess.run((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file1,
            '--inject-secrets', 'tls,%s' % key_file2,
            capture_file('dhe1.pcapng.gz'), outfile
        ), check=True, env=base_env)
        keylog1_contents = self._read_bytes(key_file1)
        keylog2_contents = self._read_bytes(key_file2)
        check_pcapng_dsb_fields(outfile, (
            (self._secrets_type_tls, len(keylog1_contents), keylog1_contents),
            (self._secrets_type_tls, len(keylog2_contents), keylog2_contents),
        ), env=base_env)

    def test_pcapng_dsb_4(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields, base_env):
        '''Insert a single DSB into a pcapng file with existing DSBs.'''
        dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
        dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
        key_file = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        outfile = result_file('tls12-dsb-extra.pcapng')
        subprocess.run((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file,
            capture_file('tls12-dsb.pcapng'), outfile
        ), check=True, env=base_env)
        dsb1_contents = self._read_text_as_bytes(dsb_keys1)
        dsb2_contents = self._read_text_as_bytes(dsb_keys2)
        keylog_contents = self._read_bytes(key_file)
        # New DSBs are inserted before the first record. Due to the current
        # implementation, this is inserted before other (existing) DSBs. This
        # might change in the future if it is deemed more logical.
        check_pcapng_dsb_fields(outfile, (
            (self._secrets_type_tls, len(keylog_contents), keylog_contents),
            (self._secrets_type_tls, len(dsb1_contents), dsb1_contents),
            (self._secrets_type_tls, len(dsb2_contents), dsb2_contents),
        ), env=base_env)

    def test_pcapng_dsb_bad_key(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields, base_env):
        '''Insertion of a RSA key file is not very effective.'''
        rsa_keyfile = os.path.join(dirs.key_dir, 'rsasnakeoil2.key')
        p12_keyfile = os.path.join(dirs.key_dir, 'key.p12')
        outfile = result_file('rsasnakeoil2-dsb.pcapng')
        proc = subprocess.run((cmd_editcap,
            '--inject-secrets', 'tls,%s' % rsa_keyfile,
            '--inject-secrets', 'tls,%s' % p12_keyfile,
            capture_file('rsasnakeoil2.pcap'), outfile
        ), capture_output=True, encoding='utf-8', check=True, env=base_env)
        # editcap must exit successfully (check=True) while reporting the
        # message once for each of the two key files on stderr.
        assert count_output(proc.stderr, 'unsupported private key file') == 2
        dsb1_contents = self._read_bytes(rsa_keyfile)
        dsb2_contents = self._read_bytes(p12_keyfile)
        # Both files are still expected to be stored verbatim as DSBs.
        check_pcapng_dsb_fields(outfile, (
            (self._secrets_type_tls, len(dsb1_contents), dsb1_contents),
            (self._secrets_type_tls, len(dsb2_contents), dsb2_contents),
        ), env=base_env)

    def test_pcapng_dsb_extract(self, cmd_editcap, dirs, capture_file, result_file, check_pcapng_dsb_fields, base_env):
        '''Check that extracted DSBs match the original key log files.'''
        dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
        dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
        outfile = result_file('tls12-dsb-extract.key')
        # NOTE(review): option name reconstructed from the test's purpose;
        # confirm against the editcap manual.
        subprocess.run((cmd_editcap,
            '--extract-secrets',
            capture_file('tls12-dsb.pcapng'), outfile
        ), check=True, env=base_env)
        p = PurePath(outfile)
        dsb1_contents = self._read_text_as_bytes(dsb_keys1)
        dsb2_contents = self._read_text_as_bytes(dsb_keys2)
        # The extracted secrets are read from files named after outfile with
        # "_00000" / "_00001" inserted before the suffix.
        # Python 3.9 and higher has p.with_stem(p.stem + "_00000"))
        dsb1_out = self._read_text_as_bytes(p.with_name(p.stem + "_00000" + p.suffix))
        dsb2_out = self._read_text_as_bytes(p.with_name(p.stem + "_00001" + p.suffix))
        assert dsb1_contents == dsb1_out
        assert dsb2_contents == dsb2_out
class TestFileFormatMime:
    '''Tests for reading capture files through the "MIME Files Format" reader.'''

    def test_mime_pcapng_gz(self, cmd_tshark, capture_file, test_env):
        '''Test that the full uncompressed contents is shown.'''
        # NOTE(review): the first output field is assumed to be frame.len;
        # the expected 480 equals the sum of the four block lengths
        # (128 + 88 + 132 + 132).  Confirm the field name.
        proc_stdout = subprocess.check_output((cmd_tshark,
                '-r', capture_file('icmp.pcapng.gz'),
                '-Xread_format:MIME Files Format',
                '-Tfields',
                '-e', 'frame.len',
                '-e', 'pcapng.block.length',
                '-e', 'pcapng.block.length_trailer',
            ), encoding='utf-8', env=test_env)
        # One tab-separated column per -e option; repeated values within a
        # column are comma separated.
        assert proc_stdout.strip() == '480\t128,88,132,132\t128,88,132,132'
class TestFileFormatCllog:
    '''Tests for the CSS Electronics CLX000 CAN log reader.'''

    def test_cllog_cl2000(self, cmd_tshark, capture_file, test_env):
        '''Basic test of CAN Logger file format reader.'''
        # NOTE(review): the printed field is assumed to be can.id; the
        # expected values 2015 and 2024 are 0x7DF and 0x7E8 in decimal.
        # Confirm the field name.
        proc_stdout = subprocess.check_output((cmd_tshark,
                '-r', capture_file('canlogger-cl2000.txt'),
                '-Xread_format:CSS Electronics CLX000 CAN log',
                '-Tfields',
                '-e', 'can.id',
            ), encoding='utf-8', env=test_env)
        # One value is printed per frame; join the lines so that all eight
        # frames can be compared in a single assertion.
        assert ' '.join(proc_stdout.strip().splitlines()) == \
            '2015 2024 2015 2024 2015 2024 2015 2024'