3 # By Gerald Combs <gerald@wireshark.org>
5 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 # SPDX-License-Identifier: GPL-2.0-or-later
14 from subprocesstest
import count_output
, grep_output
18 class TestDissectDtnTcpcl
:
19 def test_tcpclv3_xfer(self
, cmd_tshark
, capture_file
, test_env
):
20 stdout
= subprocess
.check_output((cmd_tshark
,
21 '-r', capture_file('dtn_tcpclv3_bpv6_transfer.pcapng'),
22 '-Tfields', '-etcpcl.ack.length',
23 ), encoding
='utf-8', env
=test_env
)
24 assert count_output(stdout
, r
'1064') == 2
26 def test_tcpclv4_xfer(self
, cmd_tshark
, capture_file
, test_env
):
27 stdout
= subprocess
.check_output((cmd_tshark
,
28 '-r', capture_file('dtn_tcpclv4_bpv7_transfer.pcapng'),
29 '-Tfields', '-etcpcl.v4.xfer_ack.ack_len',
30 ), encoding
='utf-8', env
=test_env
)
31 assert count_output(stdout
, r
'199') == 2
34 class TestDissectBpv7
:
35 def test_bpv7_admin_status(self
, cmd_tshark
, capture_file
, test_env
):
36 stdout
= subprocess
.check_output((cmd_tshark
,
37 '-r', capture_file('dtn_udpcl_bpv7_bpsec_bib_admin.pcapng'),
38 '-Tfields', '-ebpv7.status_rep.identity',
39 ), encoding
='utf-8', env
=test_env
)
40 assert grep_output(stdout
, r
'Source: ipn:93.185, DTN Time: 1396536125, Seq: 281')
42 def test_bpv7_bpsec_bib(self
, cmd_tshark
, capture_file
, test_env
):
43 stdout
= subprocess
.check_output((cmd_tshark
,
44 '-r', capture_file('dtn_udpcl_bpv7_bpsec_bib_admin.pcapng'),
45 '-Tfields', '-ebpsec.asb.ctxid',
46 ), encoding
='utf-8', env
=test_env
)
47 assert count_output(stdout
, r
'1') == 1
49 def test_bpv7_bpsec_bib_admin_type(self
, cmd_tshark
, capture_file
, test_env
):
50 # BIB doesn't alter payload
51 stdout
= subprocess
.check_output((cmd_tshark
,
52 '-r', capture_file('dtn_udpcl_bpv7_bpsec_bib_admin.pcapng'),
53 '-Tfields', '-ebpv7.admin_rec.type_code',
54 ), encoding
='utf-8', env
=test_env
)
55 assert count_output(stdout
, r
'1') == 1
57 def test_bpv7_bpsec_bcb(self
, cmd_tshark
, capture_file
, test_env
):
58 stdout
= subprocess
.check_output((cmd_tshark
,
59 '-r', capture_file('dtn_udpcl_bpv7_bpsec_bcb_admin.pcapng'),
60 '-Tfields', '-ebpsec.asb.ctxid',
61 ), encoding
='utf-8', env
=test_env
)
62 assert count_output(stdout
, r
'2') == 1
64 def test_bpv7_bpsec_bcb_admin_type(self
, cmd_tshark
, capture_file
, test_env
):
65 # BCB inhibits payload dissection
66 stdout
= subprocess
.check_output((cmd_tshark
,
67 '-r', capture_file('dtn_udpcl_bpv7_bpsec_bcb_admin.pcapng'),
68 '-Tfields', '-ebpv7.admin_rec.type_code',
69 ), encoding
='utf-8', env
=test_env
)
70 assert count_output(stdout
, r
'1') == 0
73 class TestDissectCose
:
75 These test captures were generated from the COSE example files with command:
76 for FN in test/captures/cose*.cbordiag; do python3 tools/generate_cbor_pcap.py --content-type 'application/cose' --infile $FN --outfile ${FN%.cbordiag}.pcap; done
78 def test_cose_sign_tagged(self
, cmd_tshark
, capture_file
, test_env
):
79 stdout
= subprocess
.check_output((cmd_tshark
,
80 '-r', capture_file('cose_sign_tagged.pcap'),
81 '-Tfields', '-ecose.msg.signature',
82 ), encoding
='utf-8', env
=test_env
)
83 assert grep_output(stdout
, 'e2aeafd40d69d19dfe6e52077c5d7ff4e408282cbefb5d06cbf414af2e19d982ac45ac98b8544c908b4507de1e90b717c3d34816fe926a2b98f53afd2fa0f30a')
85 def test_cose_sign1_tagged(self
, cmd_tshark
, capture_file
, test_env
):
86 stdout
= subprocess
.check_output((cmd_tshark
,
87 '-r', capture_file('cose_sign1_tagged.pcap'),
88 '-Tfields', '-ecose.msg.signature',
89 ), encoding
='utf-8', env
=test_env
)
90 assert grep_output(stdout
, '8eb33e4ca31d1c465ab05aac34cc6b23d58fef5c083106c4d25a91aef0b0117e2af9a291aa32e14ab834dc56ed2a223444547e01f11d3b0916e5a4c345cacb36')
92 def test_cose_encrypt_tagged(self
, cmd_tshark
, capture_file
, test_env
):
93 stdout
= subprocess
.check_output((cmd_tshark
,
94 '-r', capture_file('cose_encrypt_tagged.pcap'),
95 '-Tfields', '-ecose.kid',
96 ), encoding
='utf-8', env
=test_env
)
97 assert grep_output(stdout
, '6f75722d736563726574')
99 def test_cose_encrypt0_tagged(self
, cmd_tshark
, capture_file
, test_env
):
100 stdout
= subprocess
.check_output((cmd_tshark
,
101 '-r', capture_file('cose_encrypt0_tagged.pcap'),
102 '-Tfields', '-ecose.iv',
103 ), encoding
='utf-8', env
=test_env
)
104 assert grep_output(stdout
, '89f52f65a1c580933b5261a78c')
106 def test_cose_mac_tagged(self
, cmd_tshark
, capture_file
, test_env
):
107 stdout
= subprocess
.check_output((cmd_tshark
,
108 '-r', capture_file('cose_mac_tagged.pcap'),
109 '-Tfields', '-ecose.kid',
110 ), encoding
='utf-8', env
=test_env
)
111 assert grep_output(stdout
, '30313863306165352d346439622d343731622d626664362d656566333134626337303337')
113 def test_cose_mac0_tagged(self
, cmd_tshark
, capture_file
, test_env
):
114 stdout
= subprocess
.check_output((cmd_tshark
,
115 '-r', capture_file('cose_mac0_tagged.pcap'),
116 '-Tfields', '-ecose.msg.mac_tag',
117 ), encoding
='utf-8', env
=test_env
)
118 assert grep_output(stdout
, '726043745027214f')
120 def test_cose_keyset(self
, cmd_tshark
, capture_file
, test_env
):
121 stdout
= subprocess
.check_output((cmd_tshark
,
122 '-r', capture_file('cose_keyset.pcap'),
123 '-Tfields', '-ecose.key.k',
124 ), encoding
='utf-8', env
=test_env
)
125 assert grep_output(stdout
, '849b57219dae48de646d07dbb533566e976686457c1491be3a76dcea6c427188')
128 class TestDissectGprpc
:
129 def test_grpc_with_json(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
130 '''gRPC with JSON payload'''
131 if not features
.have_nghttp2
:
132 pytest
.skip('Requires nghttp2.')
133 stdout
= subprocess
.check_output((cmd_tshark
,
134 '-r', capture_file('grpc_person_search_json_with_image.pcapng.gz'),
135 '-d', 'tcp.port==50052,http2',
137 '-Y', 'grpc.message_length == 208 && json.value.string == "87561234"',
138 ), encoding
='utf-8', env
=test_env
)
139 assert grep_output(stdout
, 'GRPC/JSON')
141 def test_grpc_with_protobuf(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
142 '''gRPC with Protobuf payload'''
143 if not features
.have_nghttp2
:
144 pytest
.skip('Requires nghttp2.')
145 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
146 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
147 stdout
= subprocess
.check_output((cmd_tshark
,
148 '-r', capture_file('grpc_person_search_protobuf_with_image.pcapng.gz'),
149 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
150 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
151 '-d', 'tcp.port==50051,http2',
153 '-Y', 'protobuf.message.name == "tutorial.PersonSearchRequest"'
154 ' || (grpc.message_length == 66 && protobuf.field.value.string == "Jason"'
155 ' && protobuf.field.value.int64 == 1602601886)',
156 ), encoding
='utf-8', env
=test_env
)
157 assert grep_output(stdout
, 'tutorial.PersonSearchService/Search') # grpc request
158 assert grep_output(stdout
, 'tutorial.Person') # grpc response
160 def test_grpc_streaming_mode_reassembly(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
161 '''gRPC/HTTP2 streaming mode reassembly'''
162 if not features
.have_nghttp2
:
163 pytest
.skip('Requires nghttp2.')
164 stdout
= subprocess
.check_output((cmd_tshark
,
165 '-r', capture_file('grpc_stream_reassembly_sample.pcapng.gz'),
166 '-d', 'tcp.port==50051,http2',
167 '-d', 'tcp.port==44363,http2',
168 '-2', # make http2.body.reassembled.in available
169 '-Y', # Case1: In frame28, one http DATA contains 4 completed grpc messages (json data seq=1,2,3,4).
170 '(frame.number == 28 && grpc && json.value.number == 1 && json.value.number == 2'
171 ' && json.value.number == 3 && json.value.number == 4 && http2.body.reassembled.in == 45) ||'
172 # Case2: In frame28, last grpc message (the 5th) only has 4 bytes, which need one more byte
173 # to be a message head. a completed message is reassembled in frame45. (json data seq=5)
174 '(frame.number == 45 && grpc && http2.body.fragment == 28 && json.value.number == 5'
175 ' && http2.body.reassembled.in == 61) ||'
176 # Case3: In frame45, one http DATA frame contains two partial fragment, one is part of grpc
177 # message of previous http DATA (frame28), another is first part of grpc message of next http
178 # DATA (which will be reassembled in next http DATA frame61). (json data seq=6)
179 '(frame.number == 61 && grpc && http2.body.fragment == 45 && json.value.number == 6) ||'
180 # Case4: A big grpc message across frame100, frame113, frame126 and finally reassembled in frame139.
181 '(frame.number == 100 && grpc && http2.body.reassembled.in == 139) ||'
182 '(frame.number == 113 && !grpc && http2.body.reassembled.in == 139) ||'
183 '(frame.number == 126 && !grpc && http2.body.reassembled.in == 139) ||'
184 '(frame.number == 139 && grpc && json.value.number == 9) ||'
185 # Case5: An large grpc message of 200004 bytes.
186 '(frame.number == 164 && grpc && grpc.message_length == 200004)',
187 ), encoding
='utf-8', env
=test_env
)
188 assert count_output(stdout
, 'DATA') == 8
190 def test_grpc_http2_fake_headers(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
191 '''HTTP2/gRPC fake headers (used when HTTP2 initial HEADERS frame is missing)'''
192 if not features
.have_nghttp2
:
193 pytest
.skip('Requires nghttp2.')
194 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
195 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
196 stdout
= subprocess
.check_output((cmd_tshark
,
197 '-r', capture_file('grpc_person_search_protobuf_with_image-missing_headers.pcapng.gz'),
198 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
199 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
200 '-o', 'uat:http2_fake_headers: "{}","{}","{}","{}","{}","{}","{}"'.format(
201 '50051','3','IN',':path','/tutorial.PersonSearchService/Search','FALSE', 'TRUE'),
202 '-o', 'uat:http2_fake_headers: "{}","{}","{}","{}","{}","{}","{}"'.format(
203 '50051','0','IN','content-type','application/grpc','FALSE','TRUE'),
204 '-o', 'uat:http2_fake_headers: "{}","{}","{}","{}","{}","{}","{}"'.format(
205 '50051','0','OUT','content-type','application/grpc','FALSE','TRUE'),
206 '-d', 'tcp.port==50051,http2',
208 '-Y', 'protobuf.field.value.string == "Jason" || protobuf.field.value.string == "Lily"',
209 ), encoding
='utf-8', env
=test_env
)
210 assert count_output(stdout
, 'DATA') == 2
213 class TestDissectGrpcWeb
:
214 def test_grpc_web_unary_call_over_http1(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
215 '''gRPC-Web unary call over http1'''
216 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
217 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
218 stdout
= subprocess
.check_output((cmd_tshark
,
219 '-r', capture_file('grpc_web.pcapng.gz'),
220 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
221 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
222 '-o', 'protobuf.preload_protos: TRUE',
223 '-o', 'protobuf.pbf_as_hf: TRUE',
224 '-d', 'tcp.port==57226,http',
226 '-Y', '(tcp.stream eq 0) && (pbf.greet.HelloRequest.name == "88888888"'
227 '|| pbf.greet.HelloRequest.name == "99999999"'
228 '|| pbf.greet.HelloReply.message == "Hello 99999999")',
229 ), encoding
='utf-8', env
=test_env
)
230 assert count_output(stdout
, 'greet.HelloRequest') == 2
231 assert count_output(stdout
, 'greet.HelloReply') == 1
233 def test_grpc_web_unary_call_over_http2(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
234 '''gRPC-Web unary call over http2'''
235 if not features
.have_nghttp2
:
236 pytest
.skip('Requires nghttp2.')
237 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
238 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
239 stdout
= subprocess
.check_output((cmd_tshark
,
240 '-r', capture_file('grpc_web.pcapng.gz'),
241 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
242 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
243 '-o', 'protobuf.preload_protos: TRUE',
244 '-o', 'protobuf.pbf_as_hf: TRUE',
245 '-d', 'tcp.port==57228,http2',
247 '-Y', '(tcp.stream eq 1) && (pbf.greet.HelloRequest.name == "88888888"'
248 '|| pbf.greet.HelloRequest.name == "99999999"'
249 '|| pbf.greet.HelloReply.message == "Hello 99999999")',
250 ), encoding
='utf-8', env
=test_env
)
251 assert count_output(stdout
, 'greet.HelloRequest') == 2
252 assert count_output(stdout
, 'greet.HelloReply') == 1
254 def test_grpc_web_reassembly_and_stream_over_http2(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
255 '''gRPC-Web data reassembly and server stream over http2'''
256 if not features
.have_nghttp2
:
257 pytest
.skip('Requires nghttp2.')
258 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
259 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
260 stdout
= subprocess
.check_output((cmd_tshark
,
261 '-r', capture_file('grpc_web.pcapng.gz'),
262 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
263 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
264 '-o', 'protobuf.preload_protos: TRUE',
265 '-o', 'protobuf.pbf_as_hf: TRUE',
266 '-d', 'tcp.port==57228,http2',
268 '-Y', '(tcp.stream eq 2) && ((pbf.greet.HelloRequest.name && grpc.message_length == 80004)'
269 '|| (pbf.greet.HelloReply.message && (grpc.message_length == 23 || grpc.message_length == 80012)))',
270 ), encoding
='utf-8', env
=test_env
)
271 assert count_output(stdout
, 'greet.HelloRequest') == 2
272 assert count_output(stdout
, 'greet.HelloReply') == 4
274 def test_grpc_web_text_unary_call_over_http1(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
275 '''gRPC-Web-Text unary call over http1'''
276 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
277 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
278 stdout
= subprocess
.check_output((cmd_tshark
,
279 '-r', capture_file('grpc_web.pcapng.gz'),
280 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
281 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
282 '-o', 'protobuf.preload_protos: TRUE',
283 '-o', 'protobuf.pbf_as_hf: TRUE',
284 '-d', 'tcp.port==57226,http',
286 '-Y', '(tcp.stream eq 5) && (pbf.greet.HelloRequest.name == "88888888"'
287 '|| pbf.greet.HelloRequest.name == "99999999"'
288 '|| pbf.greet.HelloReply.message == "Hello 99999999")',
289 ), encoding
='utf-8', env
=test_env
)
290 assert grep_output(stdout
, 'GRPC-Web-Text')
291 assert count_output(stdout
, 'greet.HelloRequest') == 2
292 assert count_output(stdout
, 'greet.HelloReply') == 1
294 def test_grpc_web_text_unary_call_over_http2(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
295 '''gRPC-Web-Text unary call over http2'''
296 if not features
.have_nghttp2
:
297 pytest
.skip('Requires nghttp2.')
298 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
299 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
300 stdout
= subprocess
.check_output((cmd_tshark
,
301 '-r', capture_file('grpc_web.pcapng.gz'),
302 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
303 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
304 '-o', 'protobuf.preload_protos: TRUE',
305 '-o', 'protobuf.pbf_as_hf: TRUE',
306 '-d', 'tcp.port==57228,http2',
308 '-Y', '(tcp.stream eq 6) && (pbf.greet.HelloRequest.name == "88888888"'
309 '|| pbf.greet.HelloRequest.name == "99999999"'
310 '|| pbf.greet.HelloReply.message == "Hello 99999999")',
311 ), encoding
='utf-8', env
=test_env
)
312 assert grep_output(stdout
, 'GRPC-Web-Text')
313 assert count_output(stdout
, 'greet.HelloRequest') == 2
314 assert count_output(stdout
, 'greet.HelloReply') == 1
316 def test_grpc_web_text_reassembly_and_stream_over_http2(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
317 '''gRPC-Web-Text data reassembly and server stream over http2'''
318 if not features
.have_nghttp2
:
319 pytest
.skip('Requires nghttp2.')
320 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
321 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
322 stdout
= subprocess
.check_output((cmd_tshark
,
323 '-r', capture_file('grpc_web.pcapng.gz'),
324 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
325 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
326 '-o', 'protobuf.preload_protos: TRUE',
327 '-o', 'protobuf.pbf_as_hf: TRUE',
328 '-d', 'tcp.port==57228,http2',
330 '-Y', '(tcp.stream eq 8) && ((pbf.greet.HelloRequest.name && grpc.message_length == 80004)'
331 '|| (pbf.greet.HelloReply.message && (grpc.message_length == 23 || grpc.message_length == 80012)))',
332 ), encoding
='utf-8', env
=test_env
)
333 assert grep_output(stdout
, 'GRPC-Web-Text')
334 assert count_output(stdout
, 'greet.HelloRequest') == 2
335 assert count_output(stdout
, 'greet.HelloReply') == 4
337 def test_grpc_web_text_reassembly_over_http1(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
338 '''gRPC-Web-Text data reassembly over http1'''
339 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
340 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
341 stdout
= subprocess
.check_output((cmd_tshark
,
342 '-r', capture_file('grpc_web.pcapng.gz'),
343 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
344 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
345 '-o', 'protobuf.preload_protos: TRUE',
346 '-o', 'protobuf.pbf_as_hf: TRUE',
347 '-d', 'tcp.port==57226,http',
349 '-Y', '(tcp.stream eq 7) && (grpc.message_length == 80004 || grpc.message_length == 80010)',
350 ), encoding
='utf-8', env
=test_env
)
351 assert grep_output(stdout
, 'GRPC-Web-Text')
352 assert count_output(stdout
, 'greet.HelloRequest') == 1
353 assert count_output(stdout
, 'greet.HelloReply') == 1
355 def test_grpc_web_server_stream_over_http1(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
356 '''gRPC-Web data server stream over http1'''
357 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
358 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
359 stdout
= subprocess
.check_output((cmd_tshark
,
360 '-r', capture_file('grpc_web.pcapng.gz'),
361 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
362 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
363 '-o', 'protobuf.preload_protos: TRUE',
364 '-o', 'protobuf.pbf_as_hf: TRUE',
365 '-d', 'tcp.port==57226,http',
367 '-Y', '(tcp.stream eq 9) && ((pbf.greet.HelloRequest.name && grpc.message_length == 10)'
368 '|| (pbf.greet.HelloReply.message && grpc.message_length == 18))',
369 ), encoding
='utf-8', env
=test_env
)
370 assert grep_output(stdout
, 'GRPC-Web')
371 assert count_output(stdout
, 'greet.HelloRequest') == 1
372 assert count_output(stdout
, 'greet.HelloReply') == 9
374 def test_grpc_web_reassembly_and_stream_over_http1(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
375 '''gRPC-Web data reassembly and server stream over http1'''
376 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
377 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
378 stdout
= subprocess
.check_output((cmd_tshark
,
379 '-r', capture_file('grpc_web.pcapng.gz'),
380 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
381 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
382 '-o', 'protobuf.preload_protos: TRUE',
383 '-o', 'protobuf.pbf_as_hf: TRUE',
384 '-d', 'tcp.port==57226,http',
386 '-Y', '(tcp.stream eq 10) && ((pbf.greet.HelloRequest.name && grpc.message_length == 80004)'
387 '|| (pbf.greet.HelloReply.message && (grpc.message_length == 23 || grpc.message_length == 80012)))',
388 ), encoding
='utf-8', env
=test_env
)
389 assert grep_output(stdout
, 'GRPC-Web')
390 assert count_output(stdout
, 'greet.HelloRequest') == 2
391 assert count_output(stdout
, 'greet.HelloReply') == 6
395 class TestDissectHttp
:
396 def test_http_brotli_decompression(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
397 '''HTTP brotli decompression'''
398 if not features
.have_brotli
:
399 pytest
.skip('Requires brotli.')
400 stdout
= subprocess
.check_output((cmd_tshark
,
401 '-r', capture_file('http-brotli.pcapng'),
402 '-Y', 'http.response.code==200',
403 '-Tfields', '-etext',
404 ), encoding
='utf-8', env
=test_env
)
405 assert grep_output(stdout
, 'This is a test file for testing brotli decompression in Wireshark')
407 class TestDissectHttp2
:
408 def test_http2_data_reassembly(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
409 '''HTTP2 data reassembly'''
410 if not features
.have_nghttp2
:
411 pytest
.skip('Requires nghttp2.')
412 key_file
= os
.path
.join(dirs
.key_dir
, 'http2-data-reassembly.keys')
413 stdout
= subprocess
.check_output((cmd_tshark
,
414 '-r', capture_file('http2-data-reassembly.pcap'),
415 '-o', 'tls.keylog_file: {}'.format(key_file
),
416 '-d', 'tcp.port==8443,tls',
417 '-Y', 'http2.data.data matches "PNG" && http2.data.data matches "END"',
418 ), encoding
='utf-8', env
=test_env
)
419 assert grep_output(stdout
, 'DATA')
421 def test_http2_brotli_decompression(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
422 '''HTTP2 brotli decompression'''
423 if not features
.have_nghttp2
:
424 pytest
.skip('Requires nghttp2.')
425 if not features
.have_brotli
:
426 pytest
.skip('Requires brotli.')
427 stdout
= subprocess
.check_output((cmd_tshark
,
428 '-r', capture_file('http2-brotli.pcapng'),
429 '-Y', 'http2.data.data matches "This is a test file for testing brotli decompression in Wireshark"',
430 ), encoding
='utf-8', env
=test_env
)
431 assert grep_output(stdout
, 'DATA')
433 def test_http2_zstd_decompression(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
434 '''HTTP/2 zstd decompression'''
435 if not features
.have_nghttp2
:
436 pytest
.skip('Requires nghttp2.')
437 if not features
.have_zstd
:
438 pytest
.skip('Requires zstd.')
439 stdout
= subprocess
.check_output((cmd_tshark
,
440 '-r', capture_file('http2-zstd.pcapng'),
441 '-Y', 'http2.data.data matches "Your browser supports decompressing Zstandard."',
442 ), encoding
='utf-8', env
=test_env
)
443 assert grep_output(stdout
, 'DATA')
445 def test_http2_follow_0(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
446 '''Follow HTTP/2 Stream ID 0 test'''
447 if not features
.have_nghttp2
:
448 pytest
.skip('Requires nghttp2.')
449 key_file
= os
.path
.join(dirs
.key_dir
, 'http2-data-reassembly.keys')
450 stdout
= subprocess
.check_output((cmd_tshark
,
451 '-r', capture_file('http2-data-reassembly.pcap'),
452 '-o', 'tls.keylog_file: {}'.format(key_file
),
453 '-z', 'follow,http2,hex,0,0'
454 ), encoding
='utf-8', env
=test_env
)
456 assert grep_output(stdout
, '00000000 00 00 12 04 00 00 00 00')
457 # Stream ID 1 bytes, decrypted but compressed by HPACK
458 assert not grep_output(stdout
, '00000000 00 00 2c 01 05 00 00 00')
459 # Stream ID 1 bytes, decrypted and uncompressed, human readable
460 assert not grep_output(stdout
, '00000000 3a 6d 65 74 68 6f 64 3a')
462 def test_http2_follow_1(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
463 '''Follow HTTP/2 Stream ID 1 test'''
464 if not features
.have_nghttp2
:
465 pytest
.skip('Requires nghttp2.')
466 key_file
= os
.path
.join(dirs
.key_dir
, 'http2-data-reassembly.keys')
467 stdout
= subprocess
.check_output((cmd_tshark
,
468 '-r', capture_file('http2-data-reassembly.pcap'),
469 '-o', 'tls.keylog_file: {}'.format(key_file
),
470 '-z', 'follow,http2,hex,0,1'
471 ), encoding
='utf-8', env
=test_env
)
473 assert not grep_output(stdout
, '00000000 00 00 12 04 00 00 00 00')
474 # Stream ID 1 bytes, decrypted but compressed by HPACK
475 assert not grep_output(stdout
, '00000000 00 00 2c 01 05 00 00 00')
476 # Stream ID 1 bytes, decrypted and uncompressed, human readable
477 assert grep_output(stdout
, '00000000 3a 6d 65 74 68 6f 64 3a')
479 class TestDissectHttp2
:
480 def test_http3_qpack_reassembly(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
481 '''HTTP/3 QPACK encoder stream reassembly'''
482 if not features
.have_nghttp3
:
483 pytest
.skip('Requires nghttp3.')
484 stdout
= subprocess
.check_output((cmd_tshark
,
485 '-r', capture_file('http3-qpack-reassembly-anon.pcapng'),
486 '-Y', 'http3.frame_type == "HEADERS"',
487 '-T', 'fields', '-e', 'http3.headers.method',
488 '-e', 'http3.headers.authority',
489 '-e', 'http3.headers.referer', '-e', 'http3.headers.user_agent',
490 '-e', 'http3.qpack.encoder.icnt'
491 ), encoding
='utf-8', env
=test_env
)
492 assert grep_output(stdout
, 'POST')
493 assert grep_output(stdout
, 'googlevideo.com')
494 assert grep_output(stdout
, 'https://www.youtube.com')
495 assert grep_output(stdout
, 'Mozilla/5.0')
496 assert grep_output(stdout
, '21') # Total number of QPACK insertions
498 class TestDissectProtobuf
:
499 def test_protobuf_udp_message_mapping(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
500 '''Test Protobuf UDP Message Mapping and parsing google.protobuf.Timestamp features'''
501 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
502 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
503 stdout
= subprocess
.check_output((cmd_tshark
,
504 '-r', capture_file('protobuf_udp_addressbook_with_image_ts.pcapng'),
505 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
506 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
507 '-o', 'uat:protobuf_udp_message_types: "8127","tutorial.AddressBook"',
508 '-o', 'protobuf.preload_protos: TRUE',
509 '-o', 'protobuf.pbf_as_hf: TRUE',
510 '-Y', 'pbf.tutorial.Person.name == "Jason"'
511 ' && pbf.tutorial.Person.last_updated > "2020-10-15"'
512 ' && pbf.tutorial.Person.last_updated < "2020-10-19"',
513 ), encoding
='utf-8', env
=test_env
)
514 assert grep_output(stdout
, 'tutorial.AddressBook')
516 def test_protobuf_message_type_leading_with_dot(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
517 '''Test Protobuf Message type is leading with dot'''
518 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
519 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
520 stdout
= subprocess
.check_output((cmd_tshark
,
521 '-r', capture_file('protobuf_test_leading_dot.pcapng'),
522 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
523 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
524 '-o', 'uat:protobuf_udp_message_types: "8123","a.b.msg"',
525 '-o', 'protobuf.preload_protos: TRUE',
526 '-o', 'protobuf.pbf_as_hf: TRUE',
527 '-Y', 'pbf.a.b.a.b.c.param3 contains "in a.b.a.b.c" && pbf.a.b.c.param6 contains "in a.b.c"',
528 ), encoding
='utf-8', env
=test_env
)
529 assert grep_output(stdout
, 'PB[(]a.b.msg[)]')
531 def test_protobuf_map_and_oneof_types(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
532 '''Test Protobuf map and oneof types, and taking keyword as identification'''
533 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
534 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
535 stdout
= subprocess
.check_output((cmd_tshark
,
536 '-r', capture_file('protobuf_test_map_and_oneof_types.pcapng'),
537 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
538 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
539 '-o', 'uat:protobuf_udp_message_types: "8124","test.map.MapMaster"',
540 '-o', 'protobuf.preload_protos: TRUE',
541 '-o', 'protobuf.pbf_as_hf: TRUE',
542 '-Y', 'pbf.test.map.MapMaster.param3 == "I\'m param3 for oneof test."' # test oneof type
543 ' && pbf.test.map.MapMaster.param4MapEntry.value == 1234' # test map type
544 ' && pbf.test.map.Foo.param1 == 88 && pbf.test.map.MapMaster.param5MapEntry.key == 88'
545 ), encoding
='utf-8', env
=test_env
)
546 assert grep_output(stdout
, 'PB[(]test.map.MapMaster[)]')
548 def test_protobuf_default_value(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
549 '''Test Protobuf feature adding missing fields with default values'''
550 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
551 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
552 stdout
= subprocess
.check_output((cmd_tshark
,
553 '-r', capture_file('protobuf_test_default_value.pcapng'),
554 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
555 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
556 '-o', 'uat:protobuf_udp_message_types: "8128","wireshark.protobuf.test.TestDefaultValueMessage"',
557 '-o', 'protobuf.preload_protos: TRUE',
558 '-o', 'protobuf.pbf_as_hf: TRUE',
559 '-o', 'protobuf.add_default_value: all',
561 '-Y', 'pbf.wireshark.protobuf.test.TestDefaultValueMessage.enumFooWithDefaultValue_Fouth == -4'
562 ' && pbf.wireshark.protobuf.test.TestDefaultValueMessage.boolWithDefaultValue_False == false'
563 ' && pbf.wireshark.protobuf.test.TestDefaultValueMessage.int32WithDefaultValue_0 == 0'
564 ' && pbf.wireshark.protobuf.test.TestDefaultValueMessage.doubleWithDefaultValue_Negative0point12345678 == -0.12345678'
565 ' && pbf.wireshark.protobuf.test.TestDefaultValueMessage.stringWithDefaultValue_SymbolPi contains "Pi."'
566 ' && pbf.wireshark.protobuf.test.TestDefaultValueMessage.bytesWithDefaultValue_1F2F890D0A00004B == 1f:2f:89:0d:0a:00:00:4b'
567 ' && pbf.wireshark.protobuf.test.TestDefaultValueMessage.optional' # test taking keyword 'optional' as identification
568 ' && pbf.wireshark.protobuf.test.TestDefaultValueMessage.message' # test taking keyword 'message' as identification
569 ), encoding
='utf-8', env
=test_env
)
570 assert grep_output(stdout
, 'floatWithDefaultValue_0point23: 0.23') # another default value will be displayed
571 assert grep_output(stdout
, 'missing required field \'missingRequiredField\'') # check the missing required field export warn
573 def test_protobuf_field_subdissector(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
574 '''Test "protobuf_field" subdissector table'''
575 if not features
.have_lua
:
576 pytest
.skip('Test requires Lua scripting support.')
577 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
578 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
579 lua_file
= os
.path
.join(dirs
.lua_dir
, 'protobuf_test_field_subdissector_table.lua')
580 stdout
= subprocess
.check_output((cmd_tshark
,
581 '-r', capture_file('protobuf_udp_addressbook_with_image_ts.pcapng'),
582 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
583 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
584 '-o', 'uat:protobuf_udp_message_types: "8127","tutorial.AddressBook"',
585 '-o', 'protobuf.preload_protos: TRUE',
586 '-o', 'protobuf.pbf_as_hf: TRUE',
587 '-X', 'lua_script:{}'.format(lua_file
),
588 '-Y', 'pbf.tutorial.Person.name == "Jason" && pbf.tutorial.Person.last_updated && png',
589 ), encoding
='utf-8', env
=test_env
)
590 assert grep_output(stdout
, 'PB[(]tutorial.AddressBook[)]')
592 def test_protobuf_called_by_custom_dissector(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
593 '''Test Protobuf invoked by other dissector (passing type by pinfo.private)'''
594 if not features
.have_lua
:
595 pytest
.skip('Test requires Lua scripting support.')
596 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
597 user_defined_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'user_defined_types').replace('\\', '/')
598 lua_file
= os
.path
.join(dirs
.lua_dir
, 'protobuf_test_called_by_custom_dissector.lua')
599 stdout
= subprocess
.check_output((cmd_tshark
,
600 '-r', capture_file('protobuf_tcp_addressbook.pcapng.gz'),
601 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
602 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(user_defined_types_dir
, 'TRUE'),
603 '-o', 'protobuf.preload_protos: TRUE',
604 '-o', 'protobuf.pbf_as_hf: TRUE',
605 '-X', 'lua_script:{}'.format(lua_file
),
606 '-d', 'tcp.port==18127,addrbook',
607 '-Y', 'pbf.tutorial.Person.name == "Jason" && pbf.tutorial.Person.last_updated',
608 ), encoding
='utf-8', env
=test_env
)
609 assert grep_output(stdout
, 'tutorial.AddressBook')
611 def test_protobuf_complex_syntax(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
612 '''Test Protobuf parsing complex syntax .proto files'''
613 well_know_types_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'well_know_types').replace('\\', '/')
614 complex_proto_files_dir
= os
.path
.join(dirs
.protobuf_lang_files_dir
, 'complex_proto_files').replace('\\', '/')
615 stdout
= subprocess
.check_output((cmd_tshark
,
616 '-r', capture_file('protobuf_udp_addressbook_with_image_ts.pcapng'),
617 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(well_know_types_dir
, 'FALSE'),
618 '-o', 'uat:protobuf_search_paths: "{}","{}"'.format(complex_proto_files_dir
, 'TRUE'),
619 '-o', 'protobuf.preload_protos: TRUE',
620 '-o', 'protobuf.pbf_as_hf: TRUE',
621 '-Y', 'pbf.wireshark.protobuf.test.complex.syntax.TestFileParsed.last_field_for_wireshark_test'
622 ' && pbf.protobuf_unittest.TestFileParsed.last_field_for_wireshark_test',
623 ), encoding
='utf-8', env
=test_env
)
624 # the output must be empty and not contain something like:
625 # tshark: "pbf.xxx.TestFileParsed.last_field_for_wireshark_test" is neither a field nor a protocol name.
627 # tshark: Protobuf: Error(s)
628 assert not grep_output(stdout
, '.last_field_for_wireshark_test')
629 assert not grep_output(stdout
, 'Protobuf: Error')
631 class TestDissectTcp
:
633 def check_tcp_out_of_order(cmd_tshark
, dirs
, test_env
, extraArgs
=[]):
634 capture_file
= os
.path
.join(dirs
.capture_dir
, 'http-ooo.pcap')
635 stdout
= subprocess
.check_output([cmd_tshark
,
637 '-otcp.reassemble_out_of_order:TRUE',
639 ] + extraArgs
, encoding
='utf-8', env
=test_env
)
640 assert count_output(stdout
, 'HTTP') == 5
641 assert grep_output(stdout
, r
'^\s*4\s.*PUT /1 HTTP/1.1')
642 assert grep_output(stdout
, r
'^\s*7\s.*GET /2 HTTP/1.1')
643 assert grep_output(stdout
, r
'^\s*10\s.*PUT /3 HTTP/1.1')
644 assert grep_output(stdout
, r
'^\s*11\s.*PUT /4 HTTP/1.1')
645 assert grep_output(stdout
, r
'^\s*15\s.*PUT /5 HTTP/1.1')
647 def test_tcp_out_of_order_onepass(self
, cmd_tshark
, dirs
, test_env
):
648 self
.check_tcp_out_of_order(cmd_tshark
, dirs
, test_env
)
650 def test_tcp_out_of_order_twopass(self
, cmd_tshark
, dirs
, test_env
):
651 self
.check_tcp_out_of_order(cmd_tshark
, dirs
, test_env
, extraArgs
=['-2'])
653 def test_tcp_out_of_order_data_after_syn(self
, cmd_tshark
, capture_file
, test_env
):
654 '''Test when the first non-empty segment is OoO.'''
655 stdout
= subprocess
.check_output((cmd_tshark
,
656 '-r', capture_file('dns-ooo.pcap'),
657 '-otcp.reassemble_out_of_order:TRUE',
658 '-Y', 'dns', '-Tfields', '-edns.qry.name',
659 ), encoding
='utf-8', env
=test_env
)
660 assert stdout
.strip() == 'example.com'
662 def test_tcp_out_of_order_first_gap(self
, cmd_tshark
, capture_file
, test_env
):
664 Test reporting of "reassembled_in" in the OoO frame that contains the
665 initial segment (Bug 15420). Additionally, test for proper reporting
666 when the initial segment is retransmitted.
667 For PDU H123 (where H is the HTTP Request header and 1, 2 and 3 are part
668 of the body), the order is: (SYN) 2 H H 1 3 H.
670 stdout
= subprocess
.check_output((cmd_tshark
,
671 '-r', capture_file('http-ooo2.pcap'),
672 '-otcp.reassemble_out_of_order:TRUE',
674 '-eframe.number', '-etcp.reassembled_in', '-e_ws.col.info',
676 ), encoding
='utf-8', env
=test_env
)
677 lines
= stdout
.split('\n')
678 # 2 - start of OoO MSP
679 assert '2\t6\t[TCP Previous segment not captured]' in lines
[1]
680 assert '[TCP segment of a reassembled PDU]' in lines
[1] or '[TCP PDU reassembled in' in lines
[1]
682 # H - first time that the start of the MSP is delivered
683 assert '3\t6\t[TCP Out-Of-Order]' in lines
[2]
684 assert '[TCP segment of a reassembled PDU]' in lines
[2] or '[TCP PDU reassembled in' in lines
[2]
686 # H - first retransmission. Because this is before the reassembly
687 # completes we can add it to the reassembly
688 assert '4\t6\t[TCP Retransmission]' in lines
[3]
689 assert '[TCP segment of a reassembled PDU]' in lines
[3] or '[TCP PDU reassembled in' in lines
[3]
691 # 1 - continue reassembly
692 assert '5\t6\t[TCP Out-Of-Order]' in lines
[4]
693 assert '[TCP segment of a reassembled PDU]' in lines
[4] or '[TCP PDU reassembled in' in lines
[4]
695 # 3 - finish reassembly
696 assert '6\t\tPUT /0 HTTP/1.1' in lines
[5]
698 # H - second retransmission. This is after the reassembly completes
699 # so we do not add it to the reassembly (but throw a ReassemblyError.)
700 assert '7\t\t' in lines
[6]
701 assert '[TCP segment of a reassembled PDU]' not in lines
[6] and '[TCP PDU reassembled in' not in lines
[6]
703 def test_tcp_reassembly_more_data_1(self
, cmd_tshark
, capture_file
, test_env
):
705 Tests that reassembly also works when a new packet begins at the same
706 sequence number as the initial segment. This models behavior with the
707 ZeroWindowProbe: the initial segment contains a single byte. The second
708 segment contains that byte, plus the remainder.
710 stdout
= subprocess
.check_output((cmd_tshark
,
711 '-r', capture_file('retrans-tls.pcap'),
712 '-Ytls', '-Tfields', '-eframe.number', '-etls.record.length',),
713 encoding
='utf-8', env
=test_env
)
714 # First pass dissection actually accepted the first frame as TLS, but
715 # subsequently requested reassembly.
716 assert stdout
== '1\t\n2\t16\n'
718 def test_tcp_reassembly_more_data_2(self
, cmd_tshark
, capture_file
, test_env
):
720 Like test_tcp_reassembly_more_data_1, but checks the second pass (-2).
722 stdout
= subprocess
.check_output((cmd_tshark
,
723 '-r', capture_file('retrans-tls.pcap'),
724 '-Ytls', '-Tfields', '-eframe.number', '-etls.record.length', '-2'),
725 encoding
='utf-8', env
=test_env
)
726 assert stdout
== '2\t16\n'
728 class TestDissectGit
:
729 def test_git_prot(self
, cmd_tshark
, capture_file
, features
, test_env
):
731 Check for Git protocol version 2, flush and delimiter packets.
732 Ensure there are no malformed packets.
734 stdout
= subprocess
.check_output((cmd_tshark
,
735 '-r', capture_file('gitOverTCP.pcap'),
736 '-Ygit', '-Tfields', '-egit.version', '-egit.packet_type',
737 '-zexpert', '-e_ws.expert',
738 ), encoding
='utf-8', env
=test_env
)
739 # `epan/dissectors/packet-git.c` parses the Git Protocol version
740 # from ASCII '1' or '2' to integers 49 or 50 in grep output.
741 # 0x0000 are flush packets.
742 # 0x0001 are delimiter packets.
743 # Pre-existing git Malformed Packets in this pcap were addressed
744 # with the parsing of the delimiter packets. This test ensures
745 # pcap gitOverTCP's delim packets are parsed and that there are no
746 # malformed packets with "Expert Info/Errors" in the same pcap.
747 # Additional test cases for other scenarios, i.e actually malformed
748 # git packets, might be required.
749 assert stdout
== '50\t\t\n\t0\t\n\t\t\n\t1,0\t\n'
751 class TestDissectTls
:
753 def check_tls_handshake_reassembly(cmd_tshark
, capture_file
, test_env
,
755 # Include -zexpert just to be sure that no exception has occurred. It
756 # is not strictly necessary as the extension to be matched is the last
757 # one in the handshake message.
758 stdout
= subprocess
.check_output([cmd_tshark
,
759 '-r', capture_file('tls-fragmented-handshakes.pcap.gz'),
761 '-Ytls.handshake.extension.data',
762 '-Tfields', '-etls.handshake.extension.data'] + extraArgs
,
763 encoding
='utf-8', env
=test_env
)
764 stdout
= stdout
.replace(',', '\n')
765 # Expected output are lines with 0001, 0002, ..., 03e8
766 expected
= ''.join('%04x\n' % i
for i
in range(1, 1001))
767 assert stdout
== expected
770 def check_tls_reassembly_over_tcp_reassembly(cmd_tshark
, capture_file
, test_env
,
772 stdout
= subprocess
.check_output([cmd_tshark
,
773 '-r', capture_file('tls-fragmented-over-tcp-segmented.pcapng.gz'),
776 '-Tfields', '-ehttp.host'] + extraArgs
,
777 encoding
='utf-8', env
=test_env
)
778 stdout
= stdout
.replace(',', '\n')
779 assert stdout
== 'reports.crashlytics.com\n'
781 def test_tls_handshake_reassembly(self
, cmd_tshark
, capture_file
, test_env
):
782 '''Verify that TCP and TLS handshake reassembly works.'''
783 self
.check_tls_handshake_reassembly(cmd_tshark
, capture_file
, test_env
)
785 def test_tls_handshake_reassembly_2(self
, cmd_tshark
, capture_file
, test_env
):
786 '''Verify that TCP and TLS handshake reassembly works (second pass).'''
787 self
.check_tls_handshake_reassembly(
788 cmd_tshark
, capture_file
, test_env
, extraArgs
=['-2'])
790 def test_tls_reassembly_over_tcp_reassembly(self
, cmd_tshark
, capture_file
, features
, test_env
):
791 '''Verify that TLS reassembly over TCP reassembly works.'''
792 if not features
.have_gnutls
:
793 pytest
.skip('Requires GnuTLS.')
794 self
.check_tls_reassembly_over_tcp_reassembly(cmd_tshark
, capture_file
, test_env
)
796 def test_tls_reassembly_over_tcp_reassembly_2(self
, cmd_tshark
, capture_file
, features
, test_env
):
797 '''Verify that TLS reassembly over TCP reassembly works (second pass).'''
798 # pinfo->curr_layer_num can be different on the second pass than the
799 # first pass, because the HTTP dissector isn't called for the first
800 # TLS record on the second pass.
801 if not features
.have_gnutls
:
802 pytest
.skip('Requires GnuTLS.')
803 self
.check_tls_reassembly_over_tcp_reassembly(cmd_tshark
, capture_file
,
804 test_env
, extraArgs
=['-2'])
807 def check_tls_out_of_order(cmd_tshark
, capture_file
, test_env
, extraArgs
=[]):
808 stdout
= subprocess
.check_output([cmd_tshark
,
809 '-r', capture_file('challenge01_ooo_stream.pcapng.gz'),
810 '-otcp.reassemble_out_of_order:TRUE',
812 '-zhttp,stat,png or image-jfif',
813 ] + extraArgs
, encoding
='utf-8', env
=test_env
)
814 assert grep_output(stdout
, r
'200 OK\s*11')
816 def test_tls_out_of_order(self
, cmd_tshark
, capture_file
, features
, test_env
):
817 '''Verify that TLS reassembly over TCP reassembly works.'''
818 if not features
.have_gnutls
:
819 pytest
.skip('Requires GnuTLS.')
820 self
.check_tls_out_of_order(cmd_tshark
, capture_file
, test_env
)
822 def test_tls_out_of_order_second_pass(self
, cmd_tshark
, capture_file
, features
, test_env
):
823 '''Verify that TLS reassembly over TCP reassembly works (second pass).'''
824 if not features
.have_gnutls
:
825 pytest
.skip('Requires GnuTLS.')
826 self
.check_tls_out_of_order(cmd_tshark
, capture_file
,
827 test_env
, extraArgs
=['-2'])
829 class TestDissectQuic
:
831 def check_quic_tls_handshake_reassembly(cmd_tshark
, capture_file
, test_env
,
833 # An assortment of QUIC carrying TLS handshakes that need to be
834 # reassembled, including fragmented in one packet, fragmented in
835 # multiple packets, fragmented in multiple out of order packets,
836 # retried, retried with overlap from the original packets, and retried
837 # with one of the original packets missing (but all data there.)
838 # Include -zexpert just to be sure that nothing Warn or higher occurred.
839 # Note level expert infos may be expected with the overlaps and
841 stdout
= subprocess
.check_output([cmd_tshark
,
842 '-r', capture_file('quic-fragmented-handshakes.pcapng.gz'),
844 '-Ytls.handshake.type',
845 '-o', 'gui.column.format:"Handshake Type","%Cus:tls.handshake.type:0:R"',
847 encoding
='utf-8', env
=test_env
)
848 assert count_output(stdout
, 'Client Hello') == 18
849 assert count_output(stdout
, 'Server Hello') == 2
850 assert count_output(stdout
, 'Finished') == 2
851 assert count_output(stdout
, 'New Session Ticket,New Session Ticket') == 1
852 assert count_output(stdout
, 'Certificate') == 2
853 assert not grep_output(stdout
, 'Warns')
854 assert not grep_output(stdout
, 'Errors')
856 def test_quic_tls_handshake_reassembly(self
, cmd_tshark
, capture_file
, test_env
):
857 '''Verify that QUIC and TLS handshake reassembly works.'''
858 self
.check_quic_tls_handshake_reassembly(cmd_tshark
, capture_file
, test_env
)
860 def test_quic_tls_handshake_reassembly_2(self
, cmd_tshark
, capture_file
, test_env
):
861 '''Verify that QUIC and TLS handshake reassembly works (second pass).'''
862 self
.check_quic_tls_handshake_reassembly(
863 cmd_tshark
, capture_file
, test_env
, extraArgs
=['-2'])
865 class TestDecompressSmb2
:
867 def extract_compressed_payload(cmd_tshark
, capture_file
, test_env
, frame_num
):
868 stdout
= subprocess
.check_output((cmd_tshark
,
869 '-r', capture_file('smb311-lz77-lz77huff-lznt1.pcap.gz'),
870 '-Tfields', '-edata.data',
871 '-Y', 'frame.number == %d'%frame
_num
,
872 ), encoding
='utf-8', env
=test_env
)
873 assert b
'a'*4096 == bytes
.fromhex(stdout
.strip())
875 def test_smb311_read_lz77(self
, cmd_tshark
, capture_file
, test_env
):
876 self
.extract_compressed_payload(cmd_tshark
, capture_file
, test_env
, 1)
878 def test_smb311_read_lz77huff(self
, cmd_tshark
, capture_file
, test_env
):
879 self
.extract_compressed_payload(cmd_tshark
, capture_file
, test_env
, 2)
881 def test_smb311_read_lznt1(self
, cmd_tshark
, capture_file
, test_env
):
882 if sys
.byteorder
== 'big':
883 pytest
.skip('this test is supported on little endian only')
884 self
.extract_compressed_payload(cmd_tshark
, capture_file
, test_env
, 3)
886 def extract_chained_compressed_payload(self
, cmd_tshark
, capture_file
, test_env
, frame_num
):
887 stdout
= subprocess
.check_output((cmd_tshark
,
888 '-r', capture_file('smb311-chained-patternv1-lznt1.pcapng.gz'),
889 '-Tfields', '-edata.data',
890 '-Y', 'frame.number == %d'%frame
_num
,
891 ), encoding
='utf-8', env
=test_env
)
892 assert b
'\xaa'*256 == bytes
.fromhex(stdout
.strip())
894 def test_smb311_chained_lznt1_patternv1(self
, cmd_tshark
, capture_file
, test_env
):
895 if sys
.byteorder
== 'big':
896 pytest
.skip('this test is supported on little endian only')
897 self
.extract_chained_compressed_payload(cmd_tshark
, capture_file
, test_env
, 1)
899 def test_smb311_chained_none_patternv1(self
, cmd_tshark
, capture_file
, test_env
):
900 self
.extract_chained_compressed_payload(cmd_tshark
, capture_file
, test_env
, 2)
902 class TestDissectCommunityId
:
904 def check_baseline(dirs
, output
, baseline
):
905 baseline_file
= os
.path
.join(dirs
.baseline_dir
, baseline
)
906 with
open(baseline_file
) as f
:
907 baseline_data
= f
.read()
909 assert output
== baseline_data
911 def test_communityid(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
912 # Run tshark on our Community ID test pcap, enabling the
913 # postdissector (it is disabled by default), and asking for
914 # the Community ID value as field output. Verify that this
915 # exits successfully:
916 stdout
= subprocess
.check_output(
918 '--enable-protocol', 'communityid',
919 '-r', capture_file('communityid.pcap.gz'),
920 '-Tfields', '-ecommunityid',
921 ), encoding
='utf-8', env
=test_env
)
923 self
.check_baseline(dirs
, stdout
, 'communityid.txt')
925 def test_communityid_filter(self
, cmd_tshark
, features
, dirs
, capture_file
, test_env
):
926 # Run tshark on our Community ID test pcap, enabling the
927 # postdissector and filtering the result.
928 stdout
= subprocess
.check_output(
930 '--enable-protocol', 'communityid',
931 '-r', capture_file('communityid.pcap.gz'),
932 '-Tfields', '-ecommunityid',
933 'communityid=="1:d/FP5EW3wiY1vCndhwleRRKHowQ="'
934 ), encoding
='utf-8', env
=test_env
)
936 self
.check_baseline(dirs
, stdout
, 'communityid-filtered.txt')
938 class TestDecompressMongo
:
939 def test_decompress_zstd(self
, cmd_tshark
, features
, capture_file
, test_env
):
940 if not features
.have_zstd
:
941 pytest
.skip('Requires zstd.')
942 stdout
= subprocess
.check_output((cmd_tshark
,
943 '-d', 'tcp.port==27017,mongo',
944 '-r', capture_file('mongo-zstd.pcapng'),
945 '-Tfields', '-emongo.element.name'
946 ), encoding
='utf-8', env
=test_env
)
947 # Check the element names of the decompressed body.
948 assert 'drop,lsid,id,$db' == stdout
.strip()