3 # By Gerald Combs <gerald@wireshark.org>
5 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
7 # SPDX-License-Identifier: GPL-2.0-or-later
9 '''Subprocess test case superclass'''
19 # - Add a subprocesstest.SkipUnlessCapture decorator?
20 # - Try to catch crashes? See the comments below in waitProcess.
# Timeout value, in seconds.
# NOTE(review): presumably the upper bound on how long a child process may run -- confirm at the call sites.
process_timeout = 300  # Seconds
class ExitCodes(enum.IntEnum):
    '''Symbolic names for integer process exit codes.

    Members are IntEnum values, so they compare equal to plain integers
    such as a child process's return code.

    NOTE(review): presumably these mirror the exit statuses of the programs
    under test -- confirm against their sources.'''
    INVALID_FILE_ERROR = 3
    INVALID_FILTER_ERROR = 4
    INVALID_CAPABILITY = 5
    IFACE_NO_LINK_TYPES = 6
    IFACE_HAS_NO_TIMESTAMP_TYPES = 7
    PCAP_NOT_SUPPORTED = 10
    DUMPCAP_NOT_SUPPORTED = 11
def run(*args, capture_output=False, stdout=None, stderr=None, encoding='utf-8', **kwargs):
    '''Wrapper for subprocess.run() that decodes child output as text.

    If the caller passes a truthy capture_output, stdout or stderr, those
    settings are forwarded to subprocess.run() unchanged. Otherwise the child
    is started with this process's sys.stdout and sys.stderr.

    All other positional and keyword arguments are handed to subprocess.run()
    as-is. Returns the subprocess.CompletedProcess it produces.'''

    caller_chose_streams = capture_output or stdout or stderr

    if not caller_chose_streams:
        # The caller doesn't want to capture output, so try to ensure the child
        # inherits the parent's stdout and stderr on all platforms. That lets
        # pytest reliably capture it and do the right thing; otherwise the child
        # output may be interleaved on the console with the parent's and mangle
        # pytest's status output.
        #
        # Make the child inherit only the parent's stdout and stderr.
        return subprocess.run(
            *args,
            close_fds=True,
            stdout=sys.stdout,
            stderr=sys.stderr,
            encoding=encoding,
            **kwargs
        )

    # The caller told us what to do with the standard streams; use that.
    return subprocess.run(
        *args,
        capture_output=capture_output,
        stdout=stdout,
        stderr=stderr,
        encoding=encoding,
        **kwargs
    )
def check_run(*args, **kwargs):
    '''Same as run(), but also check that the child process returns 0 (success).

    All arguments are forwarded to run() with check=True added, so a non-zero
    exit status raises subprocess.CalledProcessError.

    Returns whatever run() returns, so callers can inspect the result.'''
    return run(*args, check=True, **kwargs)
def cat_dhcp_command(mode):
    '''Create a command string for dumping dhcp.pcap to stdout.

    The command runs util_dump_dhcp_pcap.py, looked up in the directory of
    this file, with mode appended verbatim as its only argument. When the
    interpreter path is known it is prepended so the script runs under the
    same Python.

    Returns the command line as a single string with the paths double-quoted.'''
    # XXX Do this in Python in a thread?
    sd_cmd = ''
    # sys.executable may be an empty string or None when Python cannot
    # determine its own path; in that case fall back to the bare script path.
    if sys.executable:
        sd_cmd = '"{}" '.format(sys.executable)
    this_dir = os.path.dirname(__file__)
    sd_cmd += '"{}" {}'.format(os.path.join(this_dir, 'util_dump_dhcp_pcap.py'), mode)
    return sd_cmd
def cat_cap_file_command(cap_files):
    '''Create a command string for dumping one or more capture files to stdout.

    cap_files is either a single path (a str or an os.PathLike object such as
    pathlib.Path) or an iterable of paths. Each path is wrapped in double
    quotes and the results are joined with spaces.

    Returns a 'copy ... CON' command on Windows and a 'cat ...' command
    everywhere else.'''
    # XXX Do this in Python in a thread?
    # Treat a lone path as a one-element list. os.PathLike is checked too,
    # because a pathlib.Path is not iterable and would otherwise raise below.
    if isinstance(cap_files, (str, os.PathLike)):
        cap_files = [cap_files]
    quoted_paths = ' '.join('"{}"'.format(cap_file) for cap_file in cap_files)
    if sys.platform.startswith('win32'):
        # https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-xp/bb491026(v=technet.10)
        # says that the `type` command "displays the contents of a text
        # file." Copy to the console instead.
        return 'copy {} CON'.format(quoted_paths)
    return 'cat {}'.format(quoted_paths)
def count_output(text, search_pat=None):
    '''Returns the number of output lines (search_pat=None), otherwise returns a match count.

    text is the output to examine; empty or missing text counts as 0.
    search_pat is a regular expression. When given, the result is the number
    of lines in which the pattern is found anywhere (re.search), not the
    total number of occurrences.'''

    # Nothing to count in empty (or missing) output.
    if not text:
        return 0

    lines = text.splitlines()

    # No pattern: the caller just wants the number of lines.
    if not search_pat:
        return len(lines)

    # Compile once, outside the loop, and count the lines that match.
    search_re = re.compile(search_pat)
    return sum(1 for line in lines if search_re.search(line))
def grep_output(text, search_pat):
    '''Returns True if count_output(text, search_pat) reports at least one match.'''
    match_total = count_output(text, search_pat)
    return match_total > 0
def check_packet_count(cmd_capinfos, num_packets, cap_file):
    '''Make sure a capture file contains a specific number of packets.

    Runs cmd_capinfos with cap_file as its only argument and looks for a
    "Number of packets:" line in its standard output whose value is exactly
    num_packets.

    Raises subprocess.CalledProcessError if the command exits with a non-zero
    status, and AssertionError if there is no output or the count differs.'''
    # check=True already raises on a non-zero exit status, so no separate
    # return code assertion is needed.
    capinfos_testout = subprocess.run([cmd_capinfos, cap_file], capture_output=True, check=True, encoding='utf-8')
    assert capinfos_testout.stdout
    # Anchor the count at the end of its line. Without the anchor a request
    # for 1 packet would also accept "10" or "100".
    count_pat = r'Number of packets:\s+{}\s*$'.format(num_packets)
    got_num_packets = re.search(count_pat, capinfos_testout.stdout, re.MULTILINE) is not None
    assert got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets)
def get_capture_info(cmd_capinfos, capinfos_args, cap_file):
    '''Run capinfos on a capture file and return its output.

    capinfos_args must be a sequence of extra command-line arguments, or None
    for no extra arguments. They are placed between cmd_capinfos and cap_file.

    Returns the command's standard output decoded as UTF-8, with undecodable
    bytes replaced. Raises subprocess.CalledProcessError if the command exits
    with a non-zero status.'''

    capinfos_cmd = [cmd_capinfos]
    # Allow callers to pass None when they have no extra arguments.
    if capinfos_args is not None:
        capinfos_cmd += capinfos_args
    capinfos_cmd.append(cap_file)
    capinfos_data = subprocess.check_output(capinfos_cmd)
    return capinfos_data.decode('UTF-8', 'replace')