nft: Drop interface mask leftovers from post_parse callbacks
[iptables-mirror.git] / xlate-test.py
blob1c8cfe71ffd46eb6bf898c7232a4da074799f7c4
1 #!/usr/bin/env python3
2 # encoding: utf-8
4 import os
5 import sys
6 import shlex
7 import argparse
8 from subprocess import Popen, PIPE
10 def run_proc(args, shell = False, input = None):
11 """A simple wrapper around Popen, returning (rc, stdout, stderr)"""
12 process = Popen(args, text = True, shell = shell,
13 stdin = PIPE, stdout = PIPE, stderr = PIPE)
14 output, error = process.communicate(input)
15 return (process.returncode, output, error)
17 keywords = ("iptables-translate", "ip6tables-translate", "arptables-translate", "ebtables-translate")
18 xtables_nft_multi = 'xtables-nft-multi'
20 if sys.stdout.isatty():
21 colors = {"magenta": "\033[95m", "green": "\033[92m", "yellow": "\033[93m",
22 "red": "\033[91m", "end": "\033[0m"}
23 else:
24 colors = {"magenta": "", "green": "", "yellow": "", "red": "", "end": ""}
27 def magenta(string):
28 return colors["magenta"] + string + colors["end"]
31 def red(string):
32 return colors["red"] + string + colors["end"]
35 def yellow(string):
36 return colors["yellow"] + string + colors["end"]
39 def green(string):
40 return colors["green"] + string + colors["end"]
43 def test_one_xlate(name, sourceline, expected, result):
44 cmd = [xtables_nft_multi] + shlex.split(sourceline)
45 rc, output, error = run_proc(cmd)
46 if rc != 0:
47 result.append(name + ": " + red("Error: ") + "Call failed: " + " ".join(cmd))
48 result.append(error)
49 return False
51 translation = output.rstrip(" \n")
52 if translation != expected:
53 result.append(name + ": " + red("Fail"))
54 result.append(magenta("src: ") + sourceline.rstrip(" \n"))
55 result.append(magenta("exp: ") + expected)
56 result.append(magenta("res: ") + translation + "\n")
57 return False
59 return True
61 def test_one_replay(name, sourceline, expected, result):
62 global args
64 searchline = None
65 if sourceline.find(';') >= 0:
66 sourceline, searchline = sourceline.split(';')
68 srcwords = shlex.split(sourceline)
70 srccmd = srcwords[0]
71 ipt = srccmd.split('-')[0]
72 table_idx = -1
73 chain_idx = -1
74 table_name = "filter"
75 chain_name = None
76 for idx in range(1, len(srcwords)):
77 if srcwords[idx] in ["-A", "-I", "--append", "--insert"]:
78 chain_idx = idx
79 chain_name = srcwords[idx + 1]
80 elif srcwords[idx] in ["-t", "--table"]:
81 table_idx = idx
82 table_name = srcwords[idx + 1]
84 if not chain_name:
85 return True # nothing to do?
87 if searchline is None:
88 # adjust sourceline as required
89 checkcmd = srcwords[:]
90 checkcmd[0] = ipt
91 checkcmd[chain_idx] = "--check"
92 else:
93 checkcmd = [ipt, "-t", table_name]
94 checkcmd += ["--check", chain_name, searchline]
96 fam = ""
97 if srccmd.startswith("ip6"):
98 fam = "ip6 "
99 elif srccmd.startswith("arp"):
100 fam = "arp "
101 elif srccmd.startswith("ebt"):
102 fam = "bridge "
104 expected = [ l.removeprefix("nft ").strip(" '") for l in expected.split("\n") ]
105 nft_input = [
106 "flush ruleset",
107 "add table " + fam + table_name,
108 "add chain " + fam + table_name + " " + chain_name,
109 ] + expected
111 rc, output, error = run_proc([args.nft, "-f", "-"], shell = False, input = "\n".join(nft_input))
112 if rc != 0:
113 result.append(name + ": " + red("Replay Fail"))
114 result.append(args.nft + " call failed: " + error.rstrip('\n'))
115 for line in nft_input:
116 result.append(magenta("input: ") + line)
117 return False
119 rc, output, error = run_proc([xtables_nft_multi] + checkcmd)
120 if rc != 0:
121 result.append(name + ": " + red("Check Fail"))
122 result.append(magenta("check: ") + " ".join(checkcmd))
123 result.append(magenta("error: ") + error)
124 rc, output, error = run_proc([xtables_nft_multi, ipt + "-save"])
125 for l in output.split("\n"):
126 result.append(magenta("ipt: ") + l)
127 rc, output, error = run_proc([args.nft, "list", "ruleset"])
128 for l in output.split("\n"):
129 result.append(magenta("nft: ") + l)
130 return False
132 return True
135 def run_test(name, payload):
136 global xtables_nft_multi
137 global args
139 test_passed = True
140 tests = passed = failed = errors = 0
141 result = []
143 line = payload.readline()
144 while line:
145 if not line.startswith(keywords):
146 line = payload.readline()
147 continue
149 sourceline = replayline = line.rstrip("\n")
150 if line.find(';') >= 0:
151 sourceline = line.split(';')[0]
153 expected = payload.readline().rstrip(" \n")
154 next_expected = payload.readline()
155 if next_expected.startswith("nft"):
156 expected += "\n" + next_expected.rstrip(" \n")
157 line = payload.readline()
158 else:
159 line = next_expected
161 tests += 1
162 if test_one_xlate(name, sourceline, expected, result):
163 passed += 1
164 else:
165 errors += 1
166 test_passed = False
167 continue
169 if args.replay:
170 tests += 1
171 if test_one_replay(name, replayline, expected, result):
172 passed += 1
173 else:
174 errors += 1
175 test_passed = False
177 rc, output, error = run_proc([args.nft, "flush", "ruleset"])
178 if rc != 0:
179 result.append(name + ": " + red("Fail"))
180 result.append("nft flush ruleset call failed: " + error)
182 if (passed == tests):
183 print(name + ": " + green("OK"))
184 if not test_passed:
185 print("\n".join(result), file=sys.stderr)
186 return tests, passed, failed, errors
189 def load_test_files():
190 test_files = total_tests = total_passed = total_error = total_failed = 0
191 tests_path = os.path.join(os.path.dirname(sys.argv[0]), "extensions")
192 tests = sorted(os.listdir(tests_path))
193 for test in [os.path.join(tests_path, f)
194 for f in tests if f.endswith(".txlate")]:
195 with open(test, "r") as payload:
196 tests, passed, failed, errors = run_test(test, payload)
197 test_files += 1
198 total_tests += tests
199 total_passed += passed
200 total_failed += failed
201 total_error += errors
202 return (test_files, total_tests, total_passed, total_failed, total_error)
205 def spawn_netns():
206 # prefer unshare module
207 try:
208 import unshare
209 unshare.unshare(unshare.CLONE_NEWNET)
210 return True
211 except:
212 pass
214 # sledgehammer style:
215 # - call ourselves prefixed by 'unshare -n' if found
216 # - pass extra --no-netns parameter to avoid another recursion
217 try:
218 import shutil
220 unshare = shutil.which("unshare")
221 if unshare is None:
222 return False
224 sys.argv.append("--no-netns")
225 os.execv(unshare, [unshare, "-n", sys.executable] + sys.argv)
226 except:
227 pass
229 return False
232 def main():
233 global xtables_nft_multi
235 if args.replay:
236 if os.getuid() != 0:
237 print("Replay test requires root, sorry", file=sys.stderr)
238 return
239 if not args.no_netns and not spawn_netns():
240 print("Cannot run in own namespace, connectivity might break",
241 file=sys.stderr)
243 if not args.host:
244 os.putenv("XTABLES_LIBDIR", os.path.abspath("extensions"))
245 xtables_nft_multi = os.path.abspath(os.path.curdir) \
246 + '/iptables/' + xtables_nft_multi
248 files = tests = passed = failed = errors = 0
249 for test in args.test:
250 if not test.endswith(".txlate"):
251 test += ".txlate"
252 try:
253 with open(test, "r") as payload:
254 t, p, f, e = run_test(test, payload)
255 files += 1
256 tests += t
257 passed += p
258 failed += f
259 errors += e
260 except IOError:
261 print(red("Error: ") + "test file does not exist", file=sys.stderr)
262 return 99
264 if files == 0:
265 files, tests, passed, failed, errors = load_test_files()
267 if files > 1:
268 file_word = "files"
269 else:
270 file_word = "file"
271 print("%d test %s, %d tests, %d tests passed, %d tests failed, %d errors"
272 % (files, file_word, tests, passed, failed, errors))
273 return passed - tests
276 parser = argparse.ArgumentParser()
277 parser.add_argument('-H', '--host', action='store_true',
278 help='Run tests against installed binaries')
279 parser.add_argument('-R', '--replay', action='store_true',
280 help='Replay tests to check iptables-nft parser')
281 parser.add_argument('-n', '--nft', type=str, default='nft',
282 help='Replay using given nft binary (default: \'%(default)s\')')
283 parser.add_argument('--no-netns', action='store_true',
284 help='Do not run testsuite in own network namespace')
285 parser.add_argument("test", nargs="*", help="run only the specified test file(s)")
286 args = parser.parse_args()
287 sys.exit(main())