8 from subprocess
import Popen
, PIPE
def run_proc(args, shell=False, input=None):
    """Run a command to completion and return ``(rc, stdout, stderr)``.

    A thin wrapper around ``Popen``: the child is started in text mode with
    stdin, stdout and stderr all attached to pipes.

    Args:
        args: Command to run, passed to ``Popen`` unchanged.
        shell: Forwarded to ``Popen`` as its ``shell`` argument.
        input: Optional text written to the child's stdin; with ``None``
            nothing is written. (The name shadows the builtin but is kept
            because callers pass it by keyword.)

    Returns:
        A tuple of the child's exit status, the captured stdout text and
        the captured stderr text.
    """
    process = Popen(args, text=True, shell=shell,
                    stdin=PIPE, stdout=PIPE, stderr=PIPE)
    # communicate() sends the input, reads both pipes to EOF and waits for
    # the child, so returncode is populated afterwards.
    output, error = process.communicate(input)
    return (process.returncode, output, error)
# Command names a line must start with to be picked up as a test's source
# line; kept as a tuple so it can be handed straight to str.startswith().
keywords = (
    "iptables-translate",
    "ip6tables-translate",
    "arptables-translate",
    "ebtables-translate",
)
# Binary that is executed with a test's source line as its arguments.
xtables_nft_multi = 'xtables-nft-multi'
20 if sys
.stdout
.isatty():
21 colors
= {"magenta": "\033[95m", "green": "\033[92m", "yellow": "\033[93m",
22 "red": "\033[91m", "end": "\033[0m"}
24 colors
= {"magenta": "", "green": "", "yellow": "", "red": "", "end": ""}
28 return colors
["magenta"] + string
+ colors
["end"]
32 return colors
["red"] + string
+ colors
["end"]
36 return colors
["yellow"] + string
+ colors
["end"]
40 return colors
["green"] + string
+ colors
["end"]
43 def test_one_xlate(name
, sourceline
, expected
, result
):
44 cmd
= [xtables_nft_multi
] + shlex
.split(sourceline
)
45 rc
, output
, error
= run_proc(cmd
)
47 result
.append(name
+ ": " + red("Error: ") + "Call failed: " + " ".join(cmd
))
51 translation
= output
.rstrip(" \n")
52 if translation
!= expected
:
53 result
.append(name
+ ": " + red("Fail"))
54 result
.append(magenta("src: ") + sourceline
.rstrip(" \n"))
55 result
.append(magenta("exp: ") + expected
)
56 result
.append(magenta("res: ") + translation
+ "\n")
61 def test_one_replay(name
, sourceline
, expected
, result
):
65 if sourceline
.find(';') >= 0:
66 sourceline
, searchline
= sourceline
.split(';')
68 srcwords
= shlex
.split(sourceline
)
71 ipt
= srccmd
.split('-')[0]
76 for idx
in range(1, len(srcwords
)):
77 if srcwords
[idx
] in ["-A", "-I", "--append", "--insert"]:
79 chain_name
= srcwords
[idx
+ 1]
80 elif srcwords
[idx
] in ["-t", "--table"]:
82 table_name
= srcwords
[idx
+ 1]
85 return True # nothing to do?
87 if searchline
is None:
88 # adjust sourceline as required
89 checkcmd
= srcwords
[:]
91 checkcmd
[chain_idx
] = "--check"
93 checkcmd
= [ipt
, "-t", table_name
]
94 checkcmd
+= ["--check", chain_name
, searchline
]
97 if srccmd
.startswith("ip6"):
99 elif srccmd
.startswith("arp"):
101 elif srccmd
.startswith("ebt"):
104 expected
= [ l
.removeprefix("nft ").strip(" '") for l
in expected
.split("\n") ]
107 "add table " + fam
+ table_name
,
108 "add chain " + fam
+ table_name
+ " " + chain_name
,
111 rc
, output
, error
= run_proc([args
.nft
, "-f", "-"], shell
= False, input = "\n".join(nft_input
))
113 result
.append(name
+ ": " + red("Replay Fail"))
114 result
.append(args
.nft
+ " call failed: " + error
.rstrip('\n'))
115 for line
in nft_input
:
116 result
.append(magenta("input: ") + line
)
119 rc
, output
, error
= run_proc([xtables_nft_multi
] + checkcmd
)
121 result
.append(name
+ ": " + red("Check Fail"))
122 result
.append(magenta("check: ") + " ".join(checkcmd
))
123 result
.append(magenta("error: ") + error
)
124 rc
, output
, error
= run_proc([xtables_nft_multi
, ipt
+ "-save"])
125 for l
in output
.split("\n"):
126 result
.append(magenta("ipt: ") + l
)
127 rc
, output
, error
= run_proc([args
.nft
, "list", "ruleset"])
128 for l
in output
.split("\n"):
129 result
.append(magenta("nft: ") + l
)
135 def run_test(name
, payload
):
136 global xtables_nft_multi
140 tests
= passed
= failed
= errors
= 0
143 line
= payload
.readline()
145 if not line
.startswith(keywords
):
146 line
= payload
.readline()
149 sourceline
= replayline
= line
.rstrip("\n")
150 if line
.find(';') >= 0:
151 sourceline
= line
.split(';')[0]
153 expected
= payload
.readline().rstrip(" \n")
154 next_expected
= payload
.readline()
155 if next_expected
.startswith("nft"):
156 expected
+= "\n" + next_expected
.rstrip(" \n")
157 line
= payload
.readline()
162 if test_one_xlate(name
, sourceline
, expected
, result
):
171 if test_one_replay(name
, replayline
, expected
, result
):
177 rc
, output
, error
= run_proc([args
.nft
, "flush", "ruleset"])
179 result
.append(name
+ ": " + red("Fail"))
180 result
.append("nft flush ruleset call failed: " + error
)
182 if (passed
== tests
):
183 print(name
+ ": " + green("OK"))
185 print("\n".join(result
), file=sys
.stderr
)
186 return tests
, passed
, failed
, errors
189 def load_test_files():
190 test_files
= total_tests
= total_passed
= total_error
= total_failed
= 0
191 tests_path
= os
.path
.join(os
.path
.dirname(sys
.argv
[0]), "extensions")
192 tests
= sorted(os
.listdir(tests_path
))
193 for test
in [os
.path
.join(tests_path
, f
)
194 for f
in tests
if f
.endswith(".txlate")]:
195 with
open(test
, "r") as payload
:
196 tests
, passed
, failed
, errors
= run_test(test
, payload
)
199 total_passed
+= passed
200 total_failed
+= failed
201 total_error
+= errors
202 return (test_files
, total_tests
, total_passed
, total_failed
, total_error
)
206 # prefer unshare module
209 unshare
.unshare(unshare
.CLONE_NEWNET
)
214 # sledgehammer style:
215 # - call ourselves prefixed by 'unshare -n' if found
216 # - pass extra --no-netns parameter to avoid another recursion
220 unshare
= shutil
.which("unshare")
224 sys
.argv
.append("--no-netns")
225 os
.execv(unshare
, [unshare
, "-n", sys
.executable
] + sys
.argv
)
233 global xtables_nft_multi
237 print("Replay test requires root, sorry", file=sys
.stderr
)
239 if not args
.no_netns
and not spawn_netns():
240 print("Cannot run in own namespace, connectivity might break",
244 os
.putenv("XTABLES_LIBDIR", os
.path
.abspath("extensions"))
245 xtables_nft_multi
= os
.path
.abspath(os
.path
.curdir
) \
246 + '/iptables/' + xtables_nft_multi
248 files
= tests
= passed
= failed
= errors
= 0
249 for test
in args
.test
:
250 if not test
.endswith(".txlate"):
253 with
open(test
, "r") as payload
:
254 t
, p
, f
, e
= run_test(test
, payload
)
261 print(red("Error: ") + "test file does not exist", file=sys
.stderr
)
265 files
, tests
, passed
, failed
, errors
= load_test_files()
271 print("%d test %s, %d tests, %d tests passed, %d tests failed, %d errors"
272 % (files
, file_word
, tests
, passed
, failed
, errors
))
273 return passed
- tests
# Command-line interface. The parsed result is stored in the module-level
# ``args``, which other parts of the script read (args.nft, args.no_netns,
# args.test).
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', action='store_true',
                    help='Run tests against installed binaries')
parser.add_argument('-R', '--replay', action='store_true',
                    help='Replay tests to check iptables-nft parser')
parser.add_argument('-n', '--nft', type=str, default='nft',
                    help='Replay using given nft binary (default: \'%(default)s\')')
parser.add_argument('--no-netns', action='store_true',
                    help='Do not run testsuite in own network namespace')
# Zero or more positional test files; an empty list means none were named.
parser.add_argument("test", nargs="*", help="run only the specified test file(s)")
args = parser.parse_args()