ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / dns_wildcard.py
blobd65a53729e3cb7d2d4df51b5f6971a26e6cf2273
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett 2007
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import sys
19 from samba import credentials
20 from samba.dcerpc import dns, dnsserver
21 from samba.dnsserver import record_from_string
22 from samba.tests.subunitrun import SubunitOptions, TestProgram
23 from samba import werror, WERRORError
24 from samba.tests.dns_base import DNSTest
25 import samba.getopt as options
26 import optparse
28 parser = optparse.OptionParser(
29 "dns_wildcard.py <server name> <server ip> [options]")
30 sambaopts = options.SambaOptions(parser)
31 parser.add_option_group(sambaopts)
33 # This timeout only has relevance when testing against Windows
34 # Format errors tend to return patchy responses, so a timeout is needed.
35 parser.add_option("--timeout", type="int", dest="timeout",
36 help="Specify timeout for DNS requests")
38 # To run against Windows
39 # python python/samba/tests/dns_wildcard.py computer_name ip
40 # -U"Administrator%admin_password"
41 # --realm=Domain_name
42 # --timeout 10
45 # use command line creds if available
46 credopts = options.CredentialsOptions(parser)
47 parser.add_option_group(credopts)
48 subunitopts = SubunitOptions(parser)
49 parser.add_option_group(subunitopts)
51 opts, args = parser.parse_args()
53 lp = sambaopts.get_loadparm()
54 creds = credopts.get_credentials(lp)
56 timeout = opts.timeout
58 if len(args) < 2:
59 parser.print_usage()
60 sys.exit(1)
62 server_name = args[0]
63 server_ip = args[1]
64 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
66 WILDCARD_IP = "1.1.1.1"
67 WILDCARD = "*.wildcardtest"
68 EXACT_IP = "1.1.1.2"
69 EXACT = "exact.wildcardtest"
70 LEVEL2_WILDCARD_IP = "1.1.1.3"
71 LEVEL2_WILDCARD = "*.level2.wildcardtest"
72 LEVEL2_EXACT_IP = "1.1.1.4"
73 LEVEL2_EXACT = "exact.level2.wildcardtest"
76 class TestWildCardQueries(DNSTest):
78 def setUp(self):
79 super().setUp()
80 global server, server_ip, lp, creds, timeout
81 self.server = server_name
82 self.server_ip = server_ip
83 self.lp = lp
84 self.creds = creds
85 self.timeout = timeout
87 # Create the dns records
88 self.dns_records = [(dns.DNS_QTYPE_A,
89 "%s.%s" % (WILDCARD, self.get_dns_domain()),
90 WILDCARD_IP),
91 (dns.DNS_QTYPE_A,
92 "%s.%s" % (EXACT, self.get_dns_domain()),
93 EXACT_IP),
94 (dns.DNS_QTYPE_A,
95 ("%s.%s" % (
96 LEVEL2_WILDCARD,
97 self.get_dns_domain())),
98 LEVEL2_WILDCARD_IP),
99 (dns.DNS_QTYPE_A,
100 ("%s.%s" % (
101 LEVEL2_EXACT,
102 self.get_dns_domain())),
103 LEVEL2_EXACT_IP)]
105 c = self.dns_connect()
106 for (typ, name, data) in self.dns_records:
107 self.add_record(c, typ, name, data)
109 def tearDown(self):
110 c = self.dns_connect()
111 for (typ, name, data) in self.dns_records:
112 self.delete_record(c, typ, name, data)
114 def dns_connect(self):
115 binding_str = "ncacn_ip_tcp:%s[sign]" % self.server_ip
116 return dnsserver.dnsserver(binding_str, self.lp, self.creds)
118 def delete_record(self, dns_conn, typ, name, data):
120 rec = record_from_string(typ, data)
121 del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
122 del_rec_buf.rec = rec
124 try:
125 dns_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
127 self.server,
128 self.get_dns_domain(),
129 name,
130 None,
131 del_rec_buf)
132 except WERRORError as e:
133 # Ignore record does not exist errors
134 if e.args[0] != werror.WERR_DNS_ERROR_NAME_DOES_NOT_EXIST:
135 raise e
137 def add_record(self, dns_conn, typ, name, data):
139 rec = record_from_string(typ, data)
140 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
141 add_rec_buf.rec = rec
142 try:
143 dns_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
145 self.server,
146 self.get_dns_domain(),
147 name,
148 add_rec_buf,
149 None)
150 except WERRORError as e:
151 raise e
153 def test_one_a_query_match_wildcard(self):
154 "Query an A record, should match the wildcard entry"
156 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
157 questions = []
159 # Check the record
160 name = "miss.wildcardtest.%s" % self.get_dns_domain()
161 q = self.make_name_question(name,
162 dns.DNS_QTYPE_A,
163 dns.DNS_QCLASS_IN)
164 questions.append(q)
166 self.finish_name_packet(p, questions)
167 (response, response_packet) =\
168 self.dns_transaction_udp(p, host=self.server_ip)
169 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
170 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
171 self.assertEqual(response.ancount, 1)
172 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
173 self.assertEqual(response.answers[0].rdata, WILDCARD_IP)
175 def test_one_a_query_match_wildcard_2_labels(self):
176 """ Query an A record, should match the wild card entry
177 have two labels to the left of the wild card target.
180 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
181 questions = []
183 # Check the record
184 name = "label2.label1.wildcardtest.%s" % self.get_dns_domain()
185 q = self.make_name_question(name,
186 dns.DNS_QTYPE_A,
187 dns.DNS_QCLASS_IN)
188 questions.append(q)
190 self.finish_name_packet(p, questions)
191 (response, response_packet) =\
192 self.dns_transaction_udp(p, host=self.server_ip)
193 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
194 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
195 self.assertEqual(response.ancount, 1)
196 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
197 self.assertEqual(response.answers[0].rdata, WILDCARD_IP)
199 def test_one_a_query_wildcard_entry(self):
200 "Query the wildcard entry"
202 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
203 questions = []
205 # Check the record
206 name = "%s.%s" % (WILDCARD, self.get_dns_domain())
207 q = self.make_name_question(name,
208 dns.DNS_QTYPE_A,
209 dns.DNS_QCLASS_IN)
210 questions.append(q)
212 self.finish_name_packet(p, questions)
213 (response, response_packet) =\
214 self.dns_transaction_udp(p, host=self.server_ip)
215 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
216 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
217 self.assertEqual(response.ancount, 1)
218 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
219 self.assertEqual(response.answers[0].rdata, WILDCARD_IP)
221 def test_one_a_query_exact_match(self):
222 """Query an entry that matches the wild card but has an exact match as
223 well.
225 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
226 questions = []
228 # Check the record
229 name = "%s.%s" % (EXACT, self.get_dns_domain())
230 q = self.make_name_question(name,
231 dns.DNS_QTYPE_A,
232 dns.DNS_QCLASS_IN)
233 questions.append(q)
235 self.finish_name_packet(p, questions)
236 (response, response_packet) =\
237 self.dns_transaction_udp(p, host=self.server_ip)
238 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
239 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
240 self.assertEqual(response.ancount, 1)
241 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
242 self.assertEqual(response.answers[0].rdata, EXACT_IP)
244 def test_one_a_query_match_wildcard_l2(self):
245 "Query an A record, should match the level 2 wildcard entry"
247 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
248 questions = []
250 # Check the record
251 name = "miss.level2.wildcardtest.%s" % self.get_dns_domain()
252 q = self.make_name_question(name,
253 dns.DNS_QTYPE_A,
254 dns.DNS_QCLASS_IN)
255 questions.append(q)
257 self.finish_name_packet(p, questions)
258 (response, response_packet) =\
259 self.dns_transaction_udp(p, host=self.server_ip)
260 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
261 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
262 self.assertEqual(response.ancount, 1)
263 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
264 self.assertEqual(response.answers[0].rdata, LEVEL2_WILDCARD_IP)
266 def test_one_a_query_match_wildcard_l2_2_labels(self):
267 """Query an A record, should match the level 2 wild card entry
268 have two labels to the left of the wild card target
271 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
272 questions = []
274 # Check the record
275 name = "label1.label2.level2.wildcardtest.%s" % self.get_dns_domain()
276 q = self.make_name_question(name,
277 dns.DNS_QTYPE_A,
278 dns.DNS_QCLASS_IN)
279 questions.append(q)
281 self.finish_name_packet(p, questions)
282 (response, response_packet) =\
283 self.dns_transaction_udp(p, host=self.server_ip)
284 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
285 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
286 self.assertEqual(response.ancount, 1)
287 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
288 self.assertEqual(response.answers[0].rdata, LEVEL2_WILDCARD_IP)
290 def test_one_a_query_exact_match_l2(self):
291 """Query an entry that matches the wild card but has an exact match as
292 well.
294 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
295 questions = []
297 # Check the record
298 name = "%s.%s" % (LEVEL2_EXACT, self.get_dns_domain())
299 q = self.make_name_question(name,
300 dns.DNS_QTYPE_A,
301 dns.DNS_QCLASS_IN)
302 questions.append(q)
304 self.finish_name_packet(p, questions)
305 (response, response_packet) =\
306 self.dns_transaction_udp(p, host=self.server_ip)
307 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
308 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
309 self.assertEqual(response.ancount, 1)
310 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
311 self.assertEqual(response.answers[0].rdata, LEVEL2_EXACT_IP)
313 def test_one_a_query_wildcard_entry_l2(self):
314 "Query the level 2 wildcard entry"
316 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
317 questions = []
319 # Check the record
320 name = "%s.%s" % (LEVEL2_WILDCARD, self.get_dns_domain())
321 q = self.make_name_question(name,
322 dns.DNS_QTYPE_A,
323 dns.DNS_QCLASS_IN)
324 questions.append(q)
326 self.finish_name_packet(p, questions)
327 (response, response_packet) =\
328 self.dns_transaction_udp(p, host=self.server_ip)
329 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
330 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
331 self.assertEqual(response.ancount, 1)
332 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
333 self.assertEqual(response.answers[0].rdata, LEVEL2_WILDCARD_IP)
336 TestProgram(module=__name__, opts=subunitopts)