# By Gerald Combs <gerald@wireshark.org>
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
# SPDX-License-Identifier: GPL-2.0-or-later
17 from subprocesstest
import cat_dhcp_command
, cat_cap_file_command
, count_output
, grep_output
, check_packet_count
# Output capture file names. The checks in this file pass these to the
# result_file fixture to obtain the path a capture is written to.
testout_pcap = 'testout.pcap'
testout_pcapng = 'testout.pcapng'
31 class UdpTrafficGenerator(threading
.Thread
):
33 super().__init
__(daemon
=True)
34 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
38 while not self
.stopped
:
40 self
.sock
.sendto(b
'Wireshark test\n', ('127.0.0.1', 9))
49 def traffic_generator():
51 Traffic generator factory. Invoking it returns a tuple (start_func, cfilter)
52 where cfilter is a capture filter to match the generated traffic.
53 start_func can be invoked to start generating traffic and returns a function
54 which can be used to stop traffic generation early.
55 Currently generates a bunch of UDP traffic to localhost.
58 def start_processes():
59 thread
= UdpTrafficGenerator()
61 threads
.append(thread
)
64 yield start_processes
, 'udp port 9'
66 for thread
in threads
:
@pytest.fixture(scope='session')
def wireshark_k(wireshark_command):
    '''Return the Wireshark command line with '-k' appended, as a tuple.

    wireshark_command is copied into a list before '-k' is appended, so the
    fixture value it came from is not modified. The result is a tuple
    (not a str), which lets callers tell it apart from a plain program path.
    '''
    return tuple(list(wireshark_command) + ['-k'])
75 def capture_command(*args
, shell
=False, quoted
=False):
77 if type(cmd_args
[0]) != str:
78 # Assume something like ['wireshark', '-k']
79 cmd_args
= list(cmd_args
[0]) + list(cmd_args
)[1:]
81 cmd_args
[0] = f
'"{cmd_args[0]}"'
82 cmd_args
= ' '.join(cmd_args
)
87 def check_capture_10_packets(capture_interface
, cmd_capinfos
, traffic_generator
, result_file
):
88 start_traffic
, cfilter
= traffic_generator
89 def check_capture_10_packets_real(self
, cmd
=None, to_stdout
=False, env
=None):
90 assert cmd
is not None
91 testout_file
= result_file(testout_pcap
)
92 stop_traffic
= start_traffic()
94 subprocesstest
.check_run(capture_command(cmd
,
95 '-i', '"{}"'.format(capture_interface
),
99 '-a', 'duration:{}'.format(capture_duration
),
100 '-f', '"{}"'.format(cfilter
),
106 subprocesstest
.check_run(capture_command(cmd
,
107 '-i', capture_interface
,
111 '-a', 'duration:{}'.format(capture_duration
),
115 check_packet_count(cmd_capinfos
, 10, testout_file
)
116 return check_capture_10_packets_real
120 def check_capture_fifo(cmd_capinfos
, result_file
):
121 if sys
.platform
== 'win32':
122 pytest
.skip('Test requires OS fifo support.')
124 def check_capture_fifo_real(self
, cmd
=None, env
=None):
125 assert cmd
is not None
126 testout_file
= result_file(testout_pcap
)
127 fifo_file
= result_file('testout.fifo')
129 # If a previous test left its fifo laying around, e.g. from a failure, remove it.
134 slow_dhcp_cmd
= cat_dhcp_command('slow')
135 fifo_proc
= subprocess
.Popen(
136 ('{0} > {1}'.format(slow_dhcp_cmd
, fifo_file
)),
138 subprocesstest
.check_run(capture_command(cmd
,
142 '-a', 'duration:{}'.format(capture_duration
),
145 assert os
.path
.isfile(testout_file
)
146 check_packet_count(cmd_capinfos
, 8, testout_file
)
147 return check_capture_fifo_real
151 def check_capture_stdin(cmd_capinfos
, result_file
):
152 # Capturing always requires dumpcap, hence the dependency on it.
153 def check_capture_stdin_real(self
, cmd
=None, env
=None):
154 # Similar to suite_io.check_io_4_packets.
155 assert cmd
is not None
156 testout_file
= result_file(testout_pcap
)
157 slow_dhcp_cmd
= cat_dhcp_command('slow')
158 capture_cmd
= capture_command(cmd
,
160 '-w', f
'"{testout_file}"',
161 '-a', 'duration:{}'.format(capture_duration
),
164 is_gui
= type(cmd
) != str and '-k' in cmd
[0]
166 capture_cmd
+= ' --log-level=info'
167 if sysconfig
.get_platform().startswith('mingw'):
168 pytest
.skip('FIXME Pipes are broken with the MSYS2 shell')
169 pipe_proc
= subprocesstest
.check_run(slow_dhcp_cmd
+ ' | ' + capture_cmd
, shell
=True, capture_output
=True, env
=env
)
171 # Wireshark uses stdout and not stderr for diagnostic messages
173 assert grep_output(pipe_proc
.stdout
, 'Wireshark is up and ready to go'), 'No startup message.'
174 assert grep_output(pipe_proc
.stdout
, 'Capture started'), 'No capture start message.'
175 assert grep_output(pipe_proc
.stdout
, 'Capture stopped'), 'No capture stop message.'
176 assert os
.path
.isfile(testout_file
)
177 check_packet_count(cmd_capinfos
, 8, testout_file
)
178 return check_capture_stdin_real
182 def check_capture_read_filter(capture_interface
, traffic_generator
, cmd_capinfos
, result_file
):
183 start_traffic
, cfilter
= traffic_generator
184 def check_capture_read_filter_real(self
, cmd
=None, env
=None):
185 assert cmd
is not None
186 testout_file
= result_file(testout_pcap
)
187 stop_traffic
= start_traffic()
188 subprocesstest
.check_run(capture_command(cmd
,
189 '-i', capture_interface
,
193 '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
195 '-a', 'duration:{}'.format(capture_duration
),
199 check_packet_count(cmd_capinfos
, 0, testout_file
)
200 return check_capture_read_filter_real
203 def check_capture_snapshot_len(capture_interface
, cmd_tshark
, traffic_generator
, cmd_capinfos
, result_file
):
204 start_traffic
, cfilter
= traffic_generator
205 def check_capture_snapshot_len_real(self
, cmd
=None, env
=None):
206 assert cmd
is not None
207 stop_traffic
= start_traffic()
208 testout_file
= result_file(testout_pcap
)
209 subprocesstest
.check_run(capture_command(cmd
,
210 '-i', capture_interface
,
213 '-s', str(snapshot_len
),
214 '-a', 'duration:{}'.format(capture_duration
),
218 assert os
.path
.isfile(testout_file
)
220 # Use tshark to filter out all packets larger than 68 bytes.
221 testout2_file
= result_file('testout2.pcap')
223 subprocesstest
.check_run((cmd_tshark
,
226 '-Y', 'frame.cap_len>{}'.format(snapshot_len
),
228 check_packet_count(cmd_capinfos
, 0, testout2_file
)
229 return check_capture_snapshot_len_real
233 def check_dumpcap_autostop_stdin(cmd_dumpcap
, cmd_capinfos
, result_file
):
234 def check_dumpcap_autostop_stdin_real(self
, packets
=None, filesize
=None, env
=None):
235 # Similar to check_capture_stdin.
236 testout_file
= result_file(testout_pcap
)
237 cat100_dhcp_cmd
= cat_dhcp_command('cat100')
238 condition
='oops:invalid'
240 if packets
is not None:
241 condition
= 'packets:{}'.format(packets
)
242 elif filesize
is not None:
243 condition
= 'filesize:{}'.format(filesize
)
245 raise AssertionError('Need one of packets or filesize')
247 cmd_
= '"{}"'.format(cmd_dumpcap
)
248 capture_cmd
= ' '.join((cmd_
,
253 if sysconfig
.get_platform().startswith('mingw'):
254 pytest
.skip('FIXME Pipes are broken with the MSYS2 shell')
255 subprocesstest
.check_run(cat100_dhcp_cmd
+ ' | ' + capture_cmd
, shell
=True, env
=env
)
256 assert os
.path
.isfile(testout_file
)
258 if packets
is not None:
259 check_packet_count(cmd_capinfos
, packets
, testout_file
)
260 elif filesize
is not None:
261 capturekb
= os
.path
.getsize(testout_file
) / 1000
262 assert capturekb
>= filesize
263 return check_dumpcap_autostop_stdin_real
267 def check_dumpcap_ringbuffer_stdin(cmd_dumpcap
, cmd_capinfos
, result_file
):
268 def check_dumpcap_ringbuffer_stdin_real(self
, packets
=None, filesize
=None, env
=None):
269 # Similar to check_capture_stdin.
270 rb_unique
= 'dhcp_rb_' + uuid
.uuid4().hex[:6] # Random ID
271 testout_file
= result_file('testout.{}.pcapng'.format(rb_unique
))
272 testout_glob
= result_file('testout.{}_*.pcapng'.format(rb_unique
))
273 cat100_dhcp_cmd
= cat_dhcp_command('cat100')
274 condition
='oops:invalid'
276 if packets
is not None:
277 condition
= 'packets:{}'.format(packets
)
278 elif filesize
is not None:
279 condition
= 'filesize:{}'.format(filesize
)
281 raise AssertionError('Need one of packets or filesize')
283 cmd_
= '"{}"'.format(cmd_dumpcap
)
284 capture_cmd
= ' '.join((cmd_
,
290 if sysconfig
.get_platform().startswith('mingw'):
291 pytest
.skip('FIXME Pipes are broken with the MSYS2 shell')
292 subprocesstest
.check_run(cat100_dhcp_cmd
+ ' | ' + capture_cmd
, shell
=True, env
=env
)
294 rb_files
= glob
.glob(testout_glob
)
295 assert len(rb_files
) == 2
298 assert os
.path
.isfile(rbf
)
299 if packets
is not None:
300 check_packet_count(cmd_capinfos
, packets
, rbf
)
301 elif filesize
is not None:
302 capturekb
= os
.path
.getsize(rbf
) / 1000
303 assert capturekb
>= filesize
304 return check_dumpcap_ringbuffer_stdin_real
308 def check_dumpcap_pcapng_sections(cmd_dumpcap
, cmd_tshark
, cmd_capinfos
, capture_file
, result_file
):
309 if sys
.platform
== 'win32':
310 pytest
.skip('Test requires OS fifo support.')
311 def check_dumpcap_pcapng_sections_real(self
, multi_input
=False, multi_output
=False, env
=None):
312 # Make sure we always test multiple SHBs in an input.
314 capture_file('many_interfaces.pcapng.1'),
315 capture_file('many_interfaces.pcapng.2')
318 in_files_l
.append([ capture_file('many_interfaces.pcapng.3') ])
321 # Default values for our validity tests
331 check_vals
= [ check_val_d
]
333 for in_files
in in_files_l
:
334 fifo_file
= result_file('dumpcap_pcapng_sections_{}.fifo'.format(len(fifo_files
) + 1))
335 fifo_files
.append(fifo_file
)
336 # If a previous test left its fifo laying around, e.g. from a failure, remove it.
339 except Exception: pass
341 cat_cmd
= cat_cap_file_command(in_files
)
342 fifo_procs
.append(subprocess
.Popen(('{0} > {1}'.format(cat_cmd
, fifo_file
)), shell
=True))
345 rb_unique
= 'sections_rb_' + uuid
.uuid4().hex[:6] # Random ID
346 testout_file
= result_file('testout.{}.pcapng'.format(rb_unique
))
347 testout_glob
= result_file('testout.{}_*.pcapng'.format(rb_unique
))
348 check_vals
.append(check_val_d
.copy())
349 # check_vals[]['filename'] will be filled in below
351 testout_file
= result_file(testout_pcapng
)
352 check_vals
[0]['filename'] = testout_file
355 if not multi_input
and not multi_output
:
356 # Passthrough SHBs, single output file
361 check_vals
[0]['packet_count'] = 79
362 check_vals
[0]['idb_count'] = 22
363 check_vals
[0]['ua_pt1_count'] = 1
364 check_vals
[0]['ua_pt2_count'] = 1
365 elif not multi_input
and multi_output
:
366 # Passthrough SHBs, multiple output files
373 check_vals
[0]['packet_count'] = 53
374 check_vals
[0]['idb_count'] = 11
375 check_vals
[0]['ua_pt1_count'] = 1
376 check_vals
[1]['packet_count'] = 26
377 check_vals
[1]['idb_count'] = 22
378 check_vals
[1]['ua_pt1_count'] = 1
379 check_vals
[1]['ua_pt2_count'] = 1
380 elif multi_input
and not multi_output
:
381 # Dumpcap SHBs, single output file
387 check_vals
[0]['packet_count'] = 88
388 check_vals
[0]['idb_count'] = 33
389 check_vals
[0]['ua_dc_count'] = 1
391 # Dumpcap SHBs, multiple output files
399 check_vals
[0]['packet_count'] = 53
400 check_vals
[0]['idb_count'] = 11
401 check_vals
[0]['ua_dc_count'] = 1
402 check_vals
[1]['packet_count'] = 35
403 check_vals
[1]['idb_count'] = 33
404 check_vals
[1]['ua_dc_count'] = 1
406 capture_cmd
= capture_command(cmd_dumpcap
, *capture_cmd_args
)
408 subprocesstest
.check_run(capture_cmd
, env
=env
)
409 for fifo_proc
in fifo_procs
: fifo_proc
.kill()
413 rb_files
= sorted(glob
.glob(testout_glob
))
414 assert len(rb_files
) == 2
415 check_vals
[0]['filename'] = rb_files
[0]
416 check_vals
[1]['filename'] = rb_files
[1]
419 assert os
.path
.isfile(rbf
)
423 if not multi_input
and not multi_output
:
424 # Check strict bit-for-bit passthrough.
425 in_hash
= hashlib
.sha256()
426 out_hash
= hashlib
.sha256()
427 for in_file
in in_files_l
[0]:
428 in_cap_file
= capture_file(in_file
)
429 with
open(in_cap_file
, 'rb') as f
:
430 in_hash
.update(f
.read())
431 with
open(testout_file
, 'rb') as f
:
432 out_hash
.update(f
.read())
433 assert in_hash
.hexdigest() == out_hash
.hexdigest()
435 # many_interfaces.pcapng.1 : 64 packets written by "Passthrough test #1"
436 # many_interfaces.pcapng.2 : 15 packets written by "Passthrough test #2"
437 # many_interfaces.pcapng.3 : 9 packets written by "Passthrough test #3"
438 # Each has 11 interfaces.
439 idb_compare_eq
= True
440 if multi_input
and multi_output
:
441 # Having multiple inputs forces the use of threads. In our
442 # case this means that non-packet block counts in the first
443 # file in is nondeterministic.
444 idb_compare_eq
= False
445 for check_val
in check_vals
:
446 check_packet_count(cmd_capinfos
, check_val
['packet_count'], check_val
['filename'])
448 tshark_proc
= subprocesstest
.check_run(capture_command(cmd_tshark
,
449 '-r', check_val
['filename'],
451 '-X', 'read_format:MIME Files Format'
452 ), capture_output
=True, env
=env
)
453 # XXX Are there any other sanity checks we should run?
455 assert count_output(tshark_proc
.stdout
, r
'Block \d+: Interface Description Block \d+') \
456 == check_val
['idb_count']
458 assert count_output(tshark_proc
.stdout
, r
'Block \d+: Interface Description Block \d+') \
459 >= check_val
['idb_count']
460 idb_compare_eq
= True
461 assert count_output(tshark_proc
.stdout
, r
'Option: User Application = Passthrough test #1') \
462 == check_val
['ua_pt1_count']
463 assert count_output(tshark_proc
.stdout
, r
'Option: User Application = Passthrough test #2') \
464 == check_val
['ua_pt2_count']
465 assert count_output(tshark_proc
.stdout
, r
'Option: User Application = Passthrough test #3') \
466 == check_val
['ua_pt3_count']
467 assert count_output(tshark_proc
.stdout
, r
'Option: User Application = Dumpcap \(Wireshark\)') \
468 == check_val
['ua_dc_count']
469 return check_dumpcap_pcapng_sections_real
class TestWiresharkCapture:
    '''Capture tests driven through the Wireshark GUI binary.

    Every test is skipped when pytest is invoked with --disable-gui, and runs
    its check inside make_screenshot_on_error(). The check_* arguments are
    callables invoked as check(self, cmd=..., env=...).
    '''

    def test_wireshark_capture_10_packets_to_file(self, request, wireshark_k, check_capture_10_packets, make_screenshot_on_error, test_env):
        '''Capture 10 packets from the network to a file using Wireshark'''
        disabled = request.config.getoption('--disable-gui', default=False)
        if disabled:
            pytest.skip('GUI tests are disabled via --disable-gui')
        with make_screenshot_on_error():
            check_capture_10_packets(self, cmd=wireshark_k, env=test_env)

    # Wireshark doesn't currently support writing to stdout while capturing.
    # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
    #     '''Capture 10 packets from the network to stdout using Wireshark'''
    #     check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)

    def test_wireshark_capture_from_fifo(self, request, wireshark_k, check_capture_fifo, make_screenshot_on_error, test_env):
        '''Capture from a fifo using Wireshark'''
        disabled = request.config.getoption('--disable-gui', default=False)
        if disabled:
            pytest.skip('GUI tests are disabled via --disable-gui')
        with make_screenshot_on_error():
            check_capture_fifo(self, cmd=wireshark_k, env=test_env)

    def test_wireshark_capture_from_stdin(self, request, wireshark_k, check_capture_stdin, make_screenshot_on_error, test_env):
        '''Capture from stdin using Wireshark'''
        disabled = request.config.getoption('--disable-gui', default=False)
        if disabled:
            pytest.skip('GUI tests are disabled via --disable-gui')
        with make_screenshot_on_error():
            check_capture_stdin(self, cmd=wireshark_k, env=test_env)

    def test_wireshark_capture_snapshot_len(self, request, wireshark_k, check_capture_snapshot_len, make_screenshot_on_error, test_env):
        '''Capture truncated packets using Wireshark'''
        disabled = request.config.getoption('--disable-gui', default=False)
        if disabled:
            pytest.skip('GUI tests are disabled via --disable-gui')
        with make_screenshot_on_error():
            check_capture_snapshot_len(self, cmd=wireshark_k, env=test_env)
class TestTsharkCapture:
    '''Capture tests driven through TShark.

    Each test forwards cmd_tshark and test_env to one of the check_*
    callables, which is invoked as check(self, cmd=..., env=...).
    '''

    def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets, test_env):
        '''Capture 10 packets from the network to a file using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark, env=test_env)

    def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets, test_env):
        '''Capture 10 packets from the network to stdout using TShark'''
        check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True, env=test_env)

    def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo, test_env):
        '''Capture from a fifo using TShark'''
        check_capture_fifo(self, cmd=cmd_tshark, env=test_env)

    def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin, test_env):
        '''Capture from stdin using TShark'''
        check_capture_stdin(self, cmd=cmd_tshark, env=test_env)

    def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len, test_env):
        '''Capture truncated packets using TShark'''
        check_capture_snapshot_len(self, cmd=cmd_tshark, env=test_env)
class TestDumpcapCapture:
    '''Capture tests driven through Dumpcap.

    Mirrors the TShark capture tests, but passes cmd_dumpcap as the command
    and base_env (rather than test_env) as the environment.
    '''

    def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets, base_env):
        '''Capture 10 packets from the network to a file using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap, env=base_env)

    def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets, base_env):
        '''Capture 10 packets from the network to stdout using Dumpcap'''
        check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True, env=base_env)

    def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo, base_env):
        '''Capture from a fifo using Dumpcap'''
        check_capture_fifo(self, cmd=cmd_dumpcap, env=base_env)

    def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin, base_env):
        '''Capture from stdin using Dumpcap'''
        check_capture_stdin(self, cmd=cmd_dumpcap, env=base_env)

    def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap, base_env):
        '''Capture truncated packets using Dumpcap'''
        check_capture_snapshot_len(self, cmd=cmd_dumpcap, env=base_env)
class TestDumpcapAutostop:
    '''Dumpcap autostop tests fed from stdin.

    Each test passes exactly one stop condition (filesize or packets) to
    check_dumpcap_autostop_stdin.
    '''

    # duration, filesize, packets, files
    def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin, base_env):
        '''Capture from stdin using Dumpcap until we reach a file size limit'''
        check_dumpcap_autostop_stdin(self, filesize=15, env=base_env)

    def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin, base_env):
        '''Capture from stdin using Dumpcap until we reach a packet limit'''
        check_dumpcap_autostop_stdin(self, packets=97, env=base_env)  # Last prime before 100. Arbitrary.
class TestDumpcapRingbuffer:
    '''Dumpcap ring buffer tests fed from stdin.

    Each test passes exactly one file-switch condition (filesize or packets)
    to check_dumpcap_ringbuffer_stdin.
    '''

    # duration, interval, filesize, packets, files
    def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin, base_env):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
        check_dumpcap_ringbuffer_stdin(self, filesize=15, env=base_env)

    def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin, base_env):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
        check_dumpcap_ringbuffer_stdin(self, packets=47, env=base_env)  # Last prime before 50. Arbitrary.
class TestDumpcapPcapngSections:
    '''Dumpcap pcapng section tests.

    Covers the four combinations of single/multiple input and single/multiple
    output by toggling the multi_input and multi_output keyword arguments of
    check_dumpcap_pcapng_sections. Every test is skipped on big-endian hosts.
    '''

    def test_dumpcap_pcapng_single_in_single_out(self, check_dumpcap_pcapng_sections, base_env):
        '''Capture from a single pcapng source using Dumpcap and write a single file'''
        if sys.byteorder == 'big':
            pytest.skip('this test is supported on little endian only')
        check_dumpcap_pcapng_sections(self, env=base_env)

    def test_dumpcap_pcapng_single_in_multi_out(self, check_dumpcap_pcapng_sections, base_env):
        '''Capture from a single pcapng source using Dumpcap and write two files'''
        if sys.byteorder == 'big':
            pytest.skip('this test is supported on little endian only')
        check_dumpcap_pcapng_sections(self, multi_output=True, env=base_env)

    def test_dumpcap_pcapng_multi_in_single_out(self, check_dumpcap_pcapng_sections, base_env):
        '''Capture from two pcapng sources using Dumpcap and write a single file'''
        if sys.byteorder == 'big':
            pytest.skip('this test is supported on little endian only')
        check_dumpcap_pcapng_sections(self, multi_input=True, env=base_env)

    def test_dumpcap_pcapng_multi_in_multi_out(self, check_dumpcap_pcapng_sections, base_env):
        '''Capture from two pcapng sources using Dumpcap and write two files'''
        if sys.byteorder == 'big':
            pytest.skip('this test is supported on little endian only')
        check_dumpcap_pcapng_sections(self, multi_input=True, multi_output=True, env=base_env)