ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / dns.py
blob a331e26209d496110ae21b325077db7427c16e9c
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba import dsdb
19 from samba.ndr import ndr_unpack, ndr_pack
20 from samba.samdb import SamDB
21 from samba.auth import system_session
22 import ldb
23 from ldb import ERR_OPERATIONS_ERROR
24 import os
25 import sys
26 import struct
27 import socket
28 import samba.ndr as ndr
29 from samba import credentials
30 from samba.dcerpc import dns, dnsp, dnsserver
31 from samba.dnsserver import TXTRecord
32 from samba.dnsserver import record_from_string, dns_record_match
33 from samba.tests.subunitrun import SubunitOptions, TestProgram
34 from samba import werror, WERRORError
35 from samba.tests.dns_base import DNSTest
36 import samba.getopt as options
37 import optparse
# Command line: two positional arguments (server name, server IP) followed by
# the usual Samba, credentials and subunit option groups.
parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

# The timeout is only relevant when testing against Windows: format errors
# tend to produce patchy responses there, so requests need a deadline.
parser.add_option(
    "--timeout",
    type="int",
    dest="timeout",
    help="Specify timeout for DNS requests",
)

# Use credentials given on the command line when available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

timeout = opts.timeout

# Both positional arguments are mandatory.
if len(args) < 2:
    parser.print_usage()
    sys.exit(1)

server_name, server_ip = args[0], args[1]
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
class TestSimpleQueries(DNSTest):
    """Single-packet DNS queries and the response codes expected for them.

    Every test talks to ``self.server_ip``; the connection settings are
    copied from the module-level command line handling in ``setUp``.
    """

    def setUp(self):
        """Copy the module-level connection settings onto the test case."""
        super().setUp()
        # The module-level names are only read, so no ``global`` statement
        # is required.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def test_one_a_query(self):
        """create a query packet containing one query record"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_SOA_query(self):
        """create a query packet containing one query record for the SOA"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        # The primary name server is compared case-insensitively.
        self.assertEqual(
            response.answers[0].rdata.mname.upper(),
            ("%s.%s" % (self.server, self.get_dns_domain())).upper())

    def test_one_a_query_tcp(self):
        """create a query packet containing one query record via TCP"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_tcp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_mx_query(self):
        """create a query packet causing an empty RCODE_OK answer"""
        # Existing host, but no MX record: OK with an empty answer section.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        # Non-existing host: NXDOMAIN, still with an empty answer section.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

    def test_two_queries(self):
        """create a query packet containing two query records"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        domain = self.get_dns_domain()

        questions = [
            self.make_name_question("%s.%s" % (self.server, domain),
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
            self.make_name_question("%s.%s" % ('bogusname', domain),
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
        ]

        self.finish_name_packet(p, questions)
        try:
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries. Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass

    def test_qtype_all_query(self):
        """create a QTYPE_ALL query"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)

        # One answer is expected for the IPv4 address, plus a second one
        # when the environment advertises an IPv6 address for the server.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata, self.server_ip)
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        """create a QCLASS_NONE query"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(
            name,
            dns.DNS_QTYPE_ALL,
            dns.DNS_QCLASS_NONE)

        self.finish_name_packet(p, [q])
        try:
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries. Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass

    def test_soa_hostname_query(self):
        """create a SOA query for a hostname"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # We don't get SOA records for single hosts
        self.assertEqual(response.ancount, 0)
        # But we do respond with an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_unknown_hostname_query(self):
        """create a SOA query for an unknown hostname"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "foobar.%s" % (self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # An unknown name yields no answer records
        self.assertEqual(response.ancount, 0)
        # But the NXDOMAIN reply still carries an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_domain_query(self):
        """create a SOA query for a domain"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)

        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.minimum, 3600)
class TestDNSUpdates(DNSTest):
    """DNS UPDATE requests: malformed updates, prerequisites, add and delete.

    Every test talks to ``self.server_ip``; the connection settings are
    copied from the module-level command line handling in ``setUp``.
    """

    def setUp(self):
        """Copy the module-level connection settings onto the test case."""
        super().setUp()
        # The module-level names are only read, so no ``global`` statement
        # is required.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    # ------------------------------------------------------------------
    # Private helpers shared by the tests below
    # ------------------------------------------------------------------

    def _zone_update_packet(self):
        """Return an UPDATE packet whose only question is our domain's SOA."""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_SOA,
                                       dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [zone])
        return p

    def _prereq_update_packet(self, name, rr_class, ttl, length):
        """Return a zone UPDATE packet with one TXT record as ``answers``.

        The record has no rdata; ``rr_class``, ``ttl`` and ``length`` are
        set exactly as given so callers can build invalid prerequisites.
        """
        p = self._zone_update_packet()
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        p.ancount = 1
        p.answers = [r]
        return p

    def _txt_update_packet(self, name, rr_class, ttl):
        """Return a zone UPDATE packet with one TXT record as ``nsrecs``.

        The record always carries the text '"This is a test"'. The tests
        use class IN with a TTL to add it and class NONE with TTL 0 to
        delete it.
        """
        p = self._zone_update_packet()
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = self.make_txt_record(['"This is a test"'])
        p.nscount = 1
        p.nsrecs = [r]
        return p

    def _assert_udp_rcode(self, p, rcode):
        """Send ``p`` over UDP, assert the response code, return the reply."""
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, rcode)
        return response

    def _assert_txt_query_rcode(self, name, rcode):
        """Query the TXT record of ``name`` and assert the response code."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        self._assert_udp_rcode(p, rcode)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_two_updates(self):
        """create two update requests"""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        domain = self.get_dns_domain()

        updates = [
            self.make_name_question("%s.%s" % (self.server, domain),
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
            self.make_name_question(domain,
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
        ]

        self.finish_name_packet(p, updates)
        try:
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries. Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass

    def test_update_wrong_qclass(self):
        """create update with DNS_QCLASS_NONE"""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)

        self.finish_name_packet(p, [u])
        self._assert_udp_rcode(p, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        """test update with a non-null TTL"""
        p = self._prereq_update_packet(
            "%s.%s" % (self.server, self.get_dns_domain()),
            dns.DNS_QCLASS_NONE, ttl=1, length=0)

        try:
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries. Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass

    def test_update_prereq_with_non_null_length(self):
        """test update with a non-null length"""
        p = self._prereq_update_packet(
            "%s.%s" % (self.server, self.get_dns_domain()),
            dns.DNS_QCLASS_ANY, ttl=0, length=1)

        self._assert_udp_rcode(p, dns.DNS_RCODE_NXRRSET)

    def test_update_prereq_nonexisting_name(self):
        """test update with a nonexisting name"""
        p = self._prereq_update_packet(
            "idontexist.%s" % self.get_dns_domain(),
            dns.DNS_QCLASS_ANY, ttl=0, length=0)

        self._assert_udp_rcode(p, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        """test adding records works"""
        prefix, txt = 'textrec', ['"This is a test"']
        p = self.make_txt_update(prefix, txt)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

    def test_delete_record(self):
        """Test if deleting records works"""
        name = "deleterec.%s" % self.get_dns_domain()

        # First, create a record to make sure we have a record to delete.
        p = self._txt_update_packet(name, dns.DNS_QCLASS_IN, 900)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

        # Now delete the record
        p = self._txt_update_packet(name, dns.DNS_QCLASS_NONE, 0)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # And finally check it's gone
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        """Test if adding, deleting and then re-adding a record works"""
        name = "readdrec.%s" % self.get_dns_domain()

        # Create the record
        p = self._txt_update_packet(name, dns.DNS_QCLASS_IN, 900)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

        # Now delete the record
        p = self._txt_update_packet(name, dns.DNS_QCLASS_NONE, 0)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # check it's gone
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_NXDOMAIN)

        # recreate the record
        p = self._txt_update_packet(name, dns.DNS_QCLASS_IN, 900)
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        """test adding MX records works"""
        domain = self.get_dns_domain()
        exchange = 'mail.%s' % domain

        # Add an MX record for the domain itself.
        p = self._zone_update_packet()
        r = dns.res_rec()
        r.name = domain
        r.rr_type = dns.DNS_QTYPE_MX
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = exchange
        r.rdata = rdata
        p.nscount = 1
        p.nsrecs = [r]
        self._assert_udp_rcode(p, dns.DNS_RCODE_OK)

        # Query it back and compare every field that was set above.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(domain, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        response = self._assert_udp_rcode(p, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, exchange)
class TestComplexQueries(DNSTest):
    """Queries that involve CNAME following and answer-count limits.

    Records are created through dynamic updates (``make_dns_update``) and
    then queried over UDP against ``self.server_ip``.
    """

    def make_dns_update(self, key, value, qtype):
        """Add one record via a DNS UPDATE and assert that it succeeded.

        :param key: owner name of the record to add
        :param value: rdata of the record (type depends on ``qtype``)
        :param qtype: record type of the new record
        :raises AssertionError: if the server does not answer RCODE_OK
        """
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])

        r = dns.res_rec()
        r.name = key
        r.rr_type = qtype
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        r.rdata = value
        p.nscount = 1
        p.nsrecs = [r]
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def setUp(self):
        """Copy the module-level connection settings onto the test case."""
        super().setUp()
        # The module-level names are only read, so no ``global`` statement
        # is required.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _udp_query(self, name, qtype):
        """Send a single class-IN question over UDP and return the reply."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        return response

    def test_one_a_query(self):
        """create a query packet containing one query record"""
        try:
            # Create the record
            name = "cname_test.%s" % self.get_dns_domain()
            rdata = "%s.%s" % (self.server, self.get_dns_domain())
            self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)

            p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

            # Check the record
            name = "cname_test.%s" % self.get_dns_domain()
            q = self.make_name_question(name,
                                        dns.DNS_QTYPE_A,
                                        dns.DNS_QCLASS_IN)
            print("asking for ", q.name)

            self.finish_name_packet(p, [q])
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
            self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
            # The CNAME itself plus the A record it points at.
            self.assertEqual(response.ancount, 2)
            self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
            self.assertEqual(response.answers[0].rdata, "%s.%s" %
                             (self.server, self.get_dns_domain()))
            self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)
            self.assertEqual(response.answers[1].rdata, self.server_ip)

        finally:
            # Delete the record
            p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

            name = self.get_dns_domain()
            u = self.make_name_question(name,
                                        dns.DNS_QTYPE_SOA,
                                        dns.DNS_QCLASS_IN)
            self.finish_name_packet(p, [u])

            r = dns.res_rec()
            r.name = "cname_test.%s" % self.get_dns_domain()
            r.rr_type = dns.DNS_QTYPE_CNAME
            r.rr_class = dns.DNS_QCLASS_NONE
            r.ttl = 0
            r.length = 0xffff
            r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
            p.nscount = 1
            p.nsrecs = [r]

            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def test_cname_two_chain(self):
        """An A query follows a two-step CNAME chain to the final A record."""
        name0 = "cnamechain0.%s" % self.get_dns_domain()
        name1 = "cnamechain1.%s" % self.get_dns_domain()
        name2 = "cnamechain2.%s" % self.get_dns_domain()
        self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
        self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
        self.make_dns_update(name0, self.server_ip, dns.DNS_QTYPE_A)

        response = self._udp_query(name1, dns.DNS_QTYPE_A)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 3)

        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].name, name1)
        self.assertEqual(response.answers[0].rdata, name2)

        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[1].name, name2)
        self.assertEqual(response.answers[1].rdata, name0)

        self.assertEqual(response.answers[2].rr_type, dns.DNS_QTYPE_A)
        self.assertEqual(response.answers[2].rdata, self.server_ip)

    def test_invalid_empty_cname(self):
        """Adding a CNAME with empty rdata must be rejected."""
        name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
        try:
            self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
        except AssertionError:
            # make_dns_update asserts RCODE_OK, so a rejection shows up as
            # an assertion failure there.
            pass
        else:
            self.fail("Successfully added empty CNAME, which is invalid.")

    def test_cname_two_chain_not_matching_qtype(self):
        """A TXT query through a CNAME chain returns only the CNAMEs."""
        name0 = "cnamechain0.%s" % self.get_dns_domain()
        name1 = "cnamechain1.%s" % self.get_dns_domain()
        name2 = "cnamechain2.%s" % self.get_dns_domain()
        self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
        self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
        self.make_dns_update(name0, self.server_ip, dns.DNS_QTYPE_A)

        response = self._udp_query(name1, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)

        # CNAME should return all intermediate results!
        # Only the A record exists, not the TXT.
        self.assertEqual(response.ancount, 2)

        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].name, name1)
        self.assertEqual(response.answers[0].rdata, name2)

        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[1].name, name2)
        self.assertEqual(response.answers[1].rdata, name0)

    def test_cname_loop(self):
        """A CNAME loop is cut off after a fixed number of answers."""
        cname1 = "cnamelooptestrec." + self.get_dns_domain()
        cname2 = "cnamelooptestrec2." + self.get_dns_domain()
        cname3 = "cnamelooptestrec3." + self.get_dns_domain()
        self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
        self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
        self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME)

        response = self._udp_query(cname1, dns.DNS_QTYPE_A)

        max_recursion_depth = 20
        self.assertEqual(len(response.answers), max_recursion_depth)

    # Make sure cname limit doesn't count other records. This is a generic
    # test called in tests below
    def max_rec_test(self, rtype, rec_gen):
        """Add more records than the CNAME limit and expect all of them back.

        :param rtype: record type to add and to query for
        :param rec_gen: callable mapping an index (1-based) to record rdata
        """
        name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain())
        limit = 20
        num_recs_to_enter = limit + 5

        for i in range(1, num_recs_to_enter + 1):
            self.make_dns_update(name, rec_gen(i), rtype)

        response = self._udp_query(name, rtype)

        self.assertEqual(len(response.answers), num_recs_to_enter)

    def test_record_limit_A(self):
        """The answer limit does not apply to plain A records."""
        def ip4_gen(i):
            return "127.0.0." + str(i)
        self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen)

    def test_record_limit_AAAA(self):
        """The answer limit does not apply to plain AAAA records."""
        def ip6_gen(i):
            return "AAAA:0:0:0:0:0:0:" + str(i)
        self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen)

    def test_record_limit_SRV(self):
        """The answer limit does not apply to plain SRV records."""
        def srv_gen(i):
            rec = dns.srv_record()
            rec.priority = 1
            rec.weight = 1
            rec.port = 92
            rec.target = "srvtestrec" + str(i)
            return rec
        self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen)

    # Same as test_record_limit_A but with a preceding CNAME follow
    def test_cname_limit(self):
        """Two CNAMEs plus many A records are all returned in one reply."""
        cname1 = "cnamelimittestrec." + self.get_dns_domain()
        cname2 = "cnamelimittestrec2." + self.get_dns_domain()
        cname3 = "cnamelimittestrec3." + self.get_dns_domain()
        ip_prefix = '127.0.0.'
        limit = 20
        num_recs_to_enter = limit + 5

        self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
        self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
        # The two CNAMEs above count towards the expected total.
        num_arecs_to_enter = num_recs_to_enter - 2
        for i in range(1, num_arecs_to_enter + 1):
            self.make_dns_update(cname3, ip_prefix + str(i), dns.DNS_QTYPE_A)

        response = self._udp_query(cname1, dns.DNS_QTYPE_A)

        self.assertEqual(len(response.answers), num_recs_to_enter)

    # ANY query on cname record shouldn't follow the link
    def test_cname_any_query(self):
        """An ANY query on a CNAME returns just that CNAME record."""
        cname1 = "cnameanytestrec." + self.get_dns_domain()
        cname2 = "cnameanytestrec2." + self.get_dns_domain()
        cname3 = "cnameanytestrec3." + self.get_dns_domain()

        self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
        self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)

        response = self._udp_query(cname1, dns.DNS_QTYPE_ALL)

        self.assertEqual(len(response.answers), 1)
        self.assertEqual(response.answers[0].name, cname1)
        self.assertEqual(response.answers[0].rdata, cname2)
class TestInvalidQueries(DNSTest):
    # Tests that send unusual traffic (an empty datagram, a packet flagged
    # as a reply) to the DNS server.

    def setUp(self):
        super().setUp()
        # Copy the module-level test configuration onto the instance.
        self.server, self.server_ip = server_name, server_ip
        self.lp, self.creds, self.timeout = lp, creds, timeout

    def test_one_a_query(self):
        """send 0 bytes follows by create a query packet
           containing one query record"""

        # Fire a zero-length datagram at port 53 first; the socket is
        # closed again as soon as the block exits.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) as probe:
            probe.connect((self.server_ip, 53))
            probe.send(b"", 0)

        # A regular A query for the server's own name must still succeed.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        self.finish_name_packet(p, [q])
        (response, response_packet) = self.dns_transaction_udp(
            p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_a_reply(self):
        "send a reply instead of a query"

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        self.finish_name_packet(p, [q])
        # Mark the packet as a reply even though it is sent as a request.
        p.operation |= dns.DNS_FLAG_REPLY

        # Over TCP the packed message is preceded by a 16-bit big-endian
        # length.
        payload = ndr.ndr_pack(p)
        framed = struct.pack('!H', len(payload)) + payload
        try:
            with socket.socket(socket.AF_INET,
                               socket.SOCK_STREAM, 0) as conn:
                conn.settimeout(timeout)
                conn.connect((self.server_ip, 53))
                conn.send(framed, 0)
                received = conn.recv(0xffff + 2, 0)
                # Nothing may come back for the bogus packet.
                self.assertEqual(0, len(received))
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on a how poorly the
            # request is formatted.
            pass
1068 class TestZones(DNSTest):
def setUp(self):
    super().setUp()
    # Copy the module-level test configuration onto the instance.
    self.server, self.server_ip = server_name, server_ip
    self.lp, self.creds, self.timeout = lp, creds, timeout

    # Zone used by the tests in this class, and an RPC connection to the
    # DNS management interface.
    self.zone = "test.lan"
    binding = "ncacn_ip_tcp:%s[sign]" % (self.server_ip)
    self.rpc_conn = dnsserver.dnsserver(binding, self.lp, self.creds)

    # LDAP connection used to inspect and modify the stored DNS objects.
    self.samdb = SamDB(url="ldap://" + self.server_ip,
                       lp=self.get_loadparm(),
                       session_info=system_session(),
                       credentials=self.creds)
    self.zone_dn = "DC=%s,CN=MicrosoftDNS,DC=DomainDNSZones,%s" % (
        self.zone, str(self.samdb.get_default_basedn()))
def tearDown(self):
    super().tearDown()

    # Remove the test zone.  A test may never have created it (or may
    # already have deleted it), so "zone does not exist" is tolerated;
    # any other failure is propagated.
    try:
        self.delete_zone(self.zone)
    except RuntimeError as err:
        (code, _message) = err.args
        if code == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
            return
        raise
def make_zone_obj(self, zone, aging_enabled=False):
    """Build the ZoneCreate RPC payload for a primary zone named `zone`.

    `aging_enabled` is stored (as 0/1) in the fAging field.  The object
    is only populated here; nothing is sent to the server.
    """
    info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    info.pszZoneName = zone
    info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
    info.fDsIntegrated = 1
    info.fLoadExisting = 1
    info.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
    info.fAging = int(aging_enabled)
    return info
def create_zone(self, zone, aging_enabled=False):
    """Create `zone` on the server through the 'ZoneCreate' RPC operation.

    The payload comes from make_zone_obj().  Any WERRORError raised by
    the RPC call is turned into a test failure.
    """
    zone_create = self.make_zone_obj(zone, aging_enabled)
    try:
        client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
        self.rpc_conn.DnssrvOperation2(client_version,
                                       0,     # dwSettingFlags
                                       self.server_ip,
                                       None,  # no zone: server-level op
                                       0,     # dwContext
                                       'ZoneCreate',
                                       dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                                       zone_create)
    except WERRORError as e:
        self.fail(str(e))
def set_params(self, **kwargs):
    """Set DWORD properties on self.zone, one RPC call per keyword.

    Each keyword name is sent as the property name and its value as the
    DWORD value, using the 'ResetDwordProperty' operation.  A WERRORError
    from any call fails the test.
    """
    # These do not depend on the property being set.
    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM

    for key, val in kwargs.items():
        name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
        name_param.dwParam = val
        name_param.pszNodeName = key

        try:
            self.rpc_conn.DnssrvOperation2(client_version,
                                           0,  # dwSettingFlags
                                           self.server,
                                           self.zone,
                                           0,  # dwContext
                                           'ResetDwordProperty',
                                           nap_type,
                                           name_param)
        except WERRORError as e:
            self.fail(str(e))
def ldap_modify_dnsrecs(self, name, func):
    """Apply `func` to every DNS record of node `name` and write them back.

    The records are fetched over LDAP, `func` is called once per record
    (it is expected to mutate the record in place), and the node's
    dnsRecord attribute is then replaced with the re-packed records.
    """
    records = self.ldap_get_dns_records(name)
    for record in records:
        func(record)

    message = ldb.Message.from_dict(
        self.samdb,
        {'dn': 'DC={0},{1}'.format(name, self.zone_dn),
         'dnsRecord': [ndr_pack(record) for record in records]},
        ldb.FLAG_MOD_REPLACE)
    self.samdb.modify(message)
def dns_update_record(self, prefix, txt):
    """Add a TXT record via a DNS update and return it as stored in LDAP.

    The update for `prefix` in self.zone must be answered with RCODE OK,
    and exactly one stored record of the node must carry `txt`; that
    record is returned.
    """
    update = self.make_txt_update(prefix, txt, self.zone)
    (reply, _raw) = self.dns_transaction_udp(update, host=self.server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)

    matching = []
    for record in self.ldap_get_dns_records(prefix):
        if record.data.str == txt:
            matching.append(record)
    self.assertEqual(len(matching), 1)
    return matching[0]
def dns_tombstone(self, prefix, txt, zone):
    """Replace the records of `prefix`.`zone` with one tombstone record.

    The tombstone gets a fixed timestamp of 1000.  `txt` is accepted but
    not used by this helper.
    """
    tombstone = dnsp.DnssrvRpcRecord()
    tombstone.wType = dnsp.DNS_TYPE_TOMBSTONE
    tombstone.dwTimeStamp = 1000

    self.samdb.dns_replace(prefix + "." + zone, [tombstone])
def ldap_get_records(self, name):
    """Return the LDAP search result for dnsNode objects called `name`."""
    # The use of SCOPE_SUBTREE here avoids raising an exception in the
    # 0 results case for a test below.
    node_filter = "(&(objectClass=dnsNode)(name={0}))".format(name)
    return self.samdb.search(base=self.zone_dn,
                             scope=ldb.SCOPE_SUBTREE,
                             expression=node_filter,
                             attrs=["*"])
def ldap_get_dns_records(self, name):
    """Return the unpacked dnsRecord values of the first node called `name`.

    Only the first search hit is examined; indexing fails if the search
    returns nothing.
    """
    first_node = self.ldap_get_records(name)[0]
    unpacked = []
    for blob in first_node.get('dnsRecord'):
        unpacked.append(ndr_unpack(dnsp.DnssrvRpcRecord, blob))
    return unpacked
def ldap_get_zone_settings(self):
    """Return the zone's dNSProperty values as {lower-case name: data}.

    Exactly one dnsZone object must match self.zone.  Property ids are
    translated to names with the local table below; an id missing from
    the table raises KeyError.
    """
    zone_filter = "(&(objectClass=dnsZone)(name={0}))".format(self.zone)
    records = self.samdb.search(base=self.zone_dn,
                                scope=ldb.SCOPE_BASE,
                                expression=zone_filter,
                                attrs=["dNSProperty"])
    self.assertEqual(len(records), 1)
    props = [ndr_unpack(dnsp.DnsProperty, raw)
             for raw in records[0].get('dNSProperty')]

    # We have no choice but to repeat these here.
    zone_prop_ids = {
        0x00: "EMPTY",
        0x01: "TYPE",
        0x02: "ALLOW_UPDATE",
        0x08: "SECURE_TIME",
        0x10: "NOREFRESH_INTERVAL",
        0x11: "SCAVENGING_SERVERS",
        0x12: "AGING_ENABLED_TIME",
        0x20: "REFRESH_INTERVAL",
        0x40: "AGING_STATE",
        0x80: "DELETED_FROM_HOSTNAME",
        0x81: "MASTER_SERVERS",
        0x82: "AUTO_NS_SERVERS",
        0x83: "DCPROMO_CONVERT",
        0x90: "SCAVENGING_SERVERS_DA",
        0x91: "MASTER_SERVERS_DA",
        0x92: "NS_SERVERS_DA",
        0x100: "NODE_DBFLAGS",
    }

    settings = {}
    for prop in props:
        # A later property with the same id overwrites an earlier one.
        settings[zone_prop_ids[prop.id].lower()] = prop.data
    return settings
def set_aging(self, enable=False):
    """Create self.zone and configure its aging-related properties.

    Both intervals are set to 1, Aging to 0 or 1 according to `enable`,
    and updates are left unsecured.
    """
    self.create_zone(self.zone, aging_enabled=enable)
    aging_flag = int(bool(enable))
    self.set_params(NoRefreshInterval=1,
                    RefreshInterval=1,
                    Aging=aging_flag,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
def test_set_aging(self, enable=True, name='agingtest', txt=None):
    # With aging switched on, the zone must report a truthy aging_state
    # and a dynamically added record must get a non-zero timestamp.
    # Note: the enable/name/txt parameters are not consulted below; the
    # body always uses the fixed values.
    if txt is None:
        txt = ['test txt']
    self.set_aging(enable=True)

    aging_state = self.ldap_get_zone_settings()['aging_state']
    self.assertTrue(aging_state is not None)
    self.assertTrue(aging_state)

    rec = self.dns_update_record('agingtest', ['test txt'])
    self.assertNotEqual(rec.dwTimeStamp, 0)
def test_set_aging_disabled(self):
    # With aging switched off, aging_state must be present but falsy;
    # a dynamically added record is still expected to carry a non-zero
    # timestamp.
    self.set_aging(enable=False)

    aging_state = self.ldap_get_zone_settings()['aging_state']
    self.assertTrue(aging_state is not None)
    self.assertFalse(aging_state)

    rec = self.dns_update_record('agingtest', ['test txt'])
    self.assertNotEqual(rec.dwTimeStamp, 0)
def test_aging_update(self, enable=True):
    # Add a record, push its timestamp back by `step` directly in LDAP,
    # then repeat the same DNS update.
    #  - enable=True:  the repeated update must restore the original
    #    timestamp.
    #  - enable=False: aging is switched off after the first add and the
    #    repeated update must leave the aged timestamp untouched.
    name, txt = 'agingtest', ['test txt']
    self.set_aging(enable=True)
    original = self.dns_update_record(name, txt)
    if not enable:
        self.set_params(Aging=0)
    step = 2

    def age_record(rec):
        self.assertTrue(rec.dwTimeStamp > 0)
        rec.dwTimeStamp -= step

    self.ldap_modify_dnsrecs(name, age_record)
    aged_records = self.ldap_get_dns_records(name)
    self.assertEqual(len(aged_records), 1)
    aged = aged_records[0]
    self.assertEqual(aged.dwTimeStamp, original.dwTimeStamp - step)

    refreshed = self.dns_update_record(name, txt)
    if enable:
        expected = original
    else:
        expected = aged
    self.assertEqual(expected.dwTimeStamp, refreshed.dwTimeStamp)
def test_aging_update_disabled(self):
    # Same scenario as test_aging_update, but with aging turned off
    # before the record is re-registered.
    self.test_aging_update(False)
def test_aging_refresh(self):
    # Checks how a repeated dynamic update treats the record timestamp
    # depending on how old the record is relative to the configured
    # no-refresh / refresh intervals (both set to `interval`):
    #  - aged by half an interval: the update must NOT change the
    #    timestamp;
    #  - aged by a further one and a half intervals: the update must
    #    bring the timestamp back to its original value.
    name, txt = 'agingtest', ['test txt']
    self.create_zone(self.zone, aging_enabled=True)
    interval = 10
    self.set_params(NoRefreshInterval=interval,
                    RefreshInterval=interval,
                    Aging=1,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
    before_mod = self.dns_update_record(name, txt)

    def mod_ts(rec):
        self.assertTrue(rec.dwTimeStamp > 0)
        rec.dwTimeStamp -= interval // 2
    self.ldap_modify_dnsrecs(name, mod_ts)
    update_during_norefresh = self.dns_update_record(name, txt)

    def mod_ts(rec):
        self.assertTrue(rec.dwTimeStamp > 0)
        rec.dwTimeStamp -= interval + interval // 2
    self.ldap_modify_dnsrecs(name, mod_ts)
    update_during_refresh = self.dns_update_record(name, txt)

    # Use the same integer arithmetic as the modification above; true
    # division would yield a float that no longer matches once
    # `interval` is odd.
    self.assertEqual(update_during_norefresh.dwTimeStamp,
                     before_mod.dwTimeStamp - interval // 2)
    self.assertEqual(update_during_refresh.dwTimeStamp,
                     before_mod.dwTimeStamp)
def test_rpc_add_no_timestamp(self):
    # A record added through the RPC interface must be stored with a
    # zero timestamp, even on a zone with aging enabled.
    name, txt = 'agingtest', ['test txt']
    self.set_aging(enable=True)
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = TXTRecord(txt)
    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,        # dwSettingFlags
        self.server_ip,
        self.zone,
        name,
        rec_buf,  # record to add
        None)     # nothing to delete
    recs = self.ldap_get_dns_records(name)
    self.assertEqual(len(recs), 1)
    self.assertEqual(recs[0].dwTimeStamp, 0)
def test_static_record_dynamic_update(self):
    # Add a record over RPC first, then add a second TXT value to the
    # same name with a dynamic DNS update.  The dynamically added record
    # is expected to be stored with a zero timestamp as well.
    name, txt = 'agingtest', ['test txt']
    txt2 = ['test txt2']
    self.set_aging(enable=True)
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = TXTRecord(txt)
    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,        # dwSettingFlags
        self.server_ip,
        self.zone,
        name,
        rec_buf,  # record to add
        None)     # nothing to delete

    rec2 = self.dns_update_record(name, txt2)
    self.assertEqual(rec2.dwTimeStamp, 0)
def test_dynamic_record_static_update(self):
    # Order of operations on one name: dynamic update (txt), RPC add
    # (txt2), dynamic update (txt3).  Expected timestamps afterwards:
    # txt keeps a non-zero one, txt2 and txt3 are both zero.
    name, txt = 'agingtest', ['test txt']
    txt2 = ['test txt2']
    txt3 = ['test txt3']
    self.set_aging(enable=True)

    self.dns_update_record(name, txt)

    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = TXTRecord(txt2)
    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,        # dwSettingFlags
        self.server_ip,
        self.zone,
        name,
        rec_buf,  # record to add
        None)     # nothing to delete

    self.dns_update_record(name, txt3)

    recs = self.ldap_get_dns_records(name)
    # Put in dict because ldap recs might be out of order
    recs = {str(r.data.str): r for r in recs}
    self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
    self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
    self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
def test_dns_tombstone_custom_match_rule(self):
    # Exercises the extensible-match rule 1.3.6.1.4.1.7165.4.5.3 on
    # dnsRecord.  Five names are prepared:
    #   name, name2 - have one record whose timestamp is moved back by 2
    #   name3       - only untouched dynamic records
    #   name4       - a record added over RPC
    #   name5       - replaced by a tombstone
    # Searching with (timestamp of the latest add - 1) must return
    # exactly name and name2.
    lp = self.get_loadparm()
    self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
                       session_info=system_session(),
                       credentials=self.creds)

    name, txt = 'agingtest', ['test txt']
    name2, txt2 = 'agingtest2', ['test txt2']
    name3, txt3 = 'agingtest3', ['test txt3']
    name4, txt4 = 'agingtest4', ['test txt4']
    name5, txt5 = 'agingtest5', ['test txt5']

    self.create_zone(self.zone, aging_enabled=True)
    interval = 10
    self.set_params(NoRefreshInterval=interval,
                    RefreshInterval=interval,
                    Aging=1,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)

    self.dns_update_record(name, txt)

    self.dns_update_record(name2, txt)
    self.dns_update_record(name2, txt2)

    self.dns_update_record(name3, txt)
    self.dns_update_record(name3, txt2)
    last_update = self.dns_update_record(name3, txt3)

    # Modify txt1 of the first 2 names
    def mod_ts(rec):
        if rec.data.str == txt:
            rec.dwTimeStamp -= 2
    self.ldap_modify_dnsrecs(name, mod_ts)
    self.ldap_modify_dnsrecs(name2, mod_ts)

    # create a static dns record.
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = TXTRecord(txt4)
    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,        # dwSettingFlags
        self.server_ip,
        self.zone,
        name4,
        rec_buf,  # record to add
        None)     # nothing to delete

    # Create a tomb stoned record.
    self.dns_update_record(name5, txt5)
    self.dns_tombstone(name5, txt5, self.zone)

    # Result unused; the call itself fails if name3 has no node.
    self.ldap_get_dns_records(name3)
    expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
    expr = expr.format(int(last_update.dwTimeStamp) - 1)
    try:
        res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
                                expression=expr, attrs=["*"])
    except ldb.LdbError as e:
        self.fail(str(e))
    updated_names = {str(r.get('name')) for r in res}
    self.assertEqual(updated_names, {name, name2})
def test_dns_tombstone_custom_match_rule_no_records(self):
    # On a freshly created zone without any added records, the custom
    # match rule must return an empty result rather than raise.
    lp = self.get_loadparm()
    self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
                       session_info=system_session(),
                       credentials=self.creds)

    self.create_zone(self.zone, aging_enabled=True)
    interval = 10
    self.set_params(NoRefreshInterval=interval,
                    RefreshInterval=interval,
                    Aging=1,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)

    match_rule = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})".format(1)

    try:
        found = self.samdb.search(base=self.zone_dn,
                                  scope=ldb.SCOPE_SUBTREE,
                                  expression=match_rule,
                                  attrs=["*"])
    except ldb.LdbError as e:
        self.fail(str(e))
    self.assertEqual(0, len(found))
def test_dns_tombstone_custom_match_rule_fail(self):
    # Misuse of the custom match rule: a wrong attribute yields an empty
    # result, while malformed values and a connection opened without a
    # system session must be rejected with ERR_OPERATIONS_ERROR.
    self.create_zone(self.zone, aging_enabled=True)
    samdb = SamDB(url=lp.samdb_url(),
                  lp=lp,
                  session_info=system_session(),
                  credentials=self.creds)

    def search(db, expr):
        return db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
                         expression=expr, attrs=["*"])

    def expect_operations_error(open_db, expr):
        # open_db is invoked inside the try block so that an LdbError
        # raised while connecting is checked the same way as one raised
        # by the search.
        try:
            res = search(open_db(), expr)
            self.assertEqual(len(res), 0)
            self.fail("Exception: ldb.ldbError not generated")
        except ldb.LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)

    # Property name in not dnsRecord
    res = search(samdb, "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)")
    self.assertEqual(len(res), 0)

    rejected_values = [
        "",          # No value for tombstone time
        "-",         # Tombstone time = -
        "1" * 65,    # Tombstone time longer than 64 characters
        "expired",   # Non numeric Tombstone time
    ]
    for value in rejected_values:
        expect_operations_error(
            lambda: samdb,
            "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})".format(value))

    # Non system session
    expect_operations_error(
        lambda: SamDB(url="ldap://" + self.server_ip,
                      lp=self.get_loadparm(),
                      credentials=self.creds),
        "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)")
def test_basic_scavenging(self):
    # End-to-end check of dsdb._scavenge_dns_records() followed by
    # dsdb._dns_delete_tombstones(), run against a direct SamDB handle.
    #
    # Records prepared before scavenging:
    #   name  : txt                (aged)
    #   name2 : txt (aged), txt2
    #   name3 : txt (aged), txt2, txt3
    #   name4 : already tombstoned
    #   name5 : tombstoned, then re-added, then aged
    lp = self.get_loadparm()
    self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
                       session_info=system_session(),
                       credentials=self.creds)

    self.create_zone(self.zone, aging_enabled=True)
    interval = 1
    self.set_params(NoRefreshInterval=interval,
                    RefreshInterval=interval,
                    Aging=1,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
    name, txt = 'agingtest', ['test txt']
    name2, txt2 = 'agingtest2', ['test txt2']
    name3, txt3 = 'agingtest3', ['test txt3']
    name4, txt4 = 'agingtest4', ['test txt4']
    name5, txt5 = 'agingtest5', ['test txt5']
    self.dns_update_record(name, txt)
    self.dns_update_record(name2, txt)
    self.dns_update_record(name2, txt2)
    self.dns_update_record(name3, txt)
    self.dns_update_record(name3, txt2)

    # Create a tombstoned record.
    self.dns_update_record(name4, txt4)
    self.dns_tombstone(name4, txt4, self.zone)
    records = self.ldap_get_records(name4)
    self.assertIn("dNSTombstoned", records[0])
    self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")

    # Create an un-tombstoned record, with dnsTombstoned: FALSE
    self.dns_update_record(name5, txt5)
    self.dns_tombstone(name5, txt5, self.zone)
    self.dns_update_record(name5, txt5)
    records = self.ldap_get_records(name5)
    self.assertIn("dNSTombstoned", records[0])
    self.assertEqual(records[0]["dNSTombstoned"][0], b"FALSE")

    # Timestamp of the most recent add; used as the reference time for
    # the tombstone-deletion loop at the end.
    last_add = self.dns_update_record(name3, txt3)

    # Age only the records carrying `txt` ...
    def mod_ts(rec):
        self.assertTrue(rec.dwTimeStamp > 0)
        if rec.data.str == txt:
            rec.dwTimeStamp -= interval * 5

    # ... or every record of the node, whatever its content.
    def mod_ts_all(rec):
        rec.dwTimeStamp -= interval * 5
    self.ldap_modify_dnsrecs(name, mod_ts)
    self.ldap_modify_dnsrecs(name2, mod_ts)
    self.ldap_modify_dnsrecs(name3, mod_ts)
    self.ldap_modify_dnsrecs(name5, mod_ts_all)
    self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
    dsdb._scavenge_dns_records(self.samdb)

    # name: its only record was aged, so the node must now hold a single
    # tombstone and be flagged dNSTombstoned.
    recs = self.ldap_get_dns_records(name)
    self.assertEqual(len(recs), 1)
    self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
    records = self.ldap_get_records(name)
    self.assertIn("dNSTombstoned", records[0])
    self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")

    # name2: the aged record is gone, txt2 survives.
    recs = self.ldap_get_dns_records(name2)
    self.assertEqual(len(recs), 1)
    self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
    self.assertEqual(recs[0].data.str, txt2)

    # name3: the aged record is gone, txt2 and txt3 survive.
    recs = self.ldap_get_dns_records(name3)
    self.assertEqual(len(recs), 2)
    txts = {str(r.data.str) for r in recs}
    self.assertEqual(txts, {str(txt2), str(txt3)})
    self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
    self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)

    # name4: was a tombstone before and must still be one.
    recs = self.ldap_get_dns_records(name4)
    self.assertEqual(len(recs), 1)
    self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
    records = self.ldap_get_records(name4)
    self.assertIn("dNSTombstoned", records[0])
    self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")

    # name5: the re-added record was aged, so the node must be a
    # tombstone again with dNSTombstoned back to TRUE.
    recs = self.ldap_get_dns_records(name5)
    self.assertEqual(len(recs), 1)
    self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
    records = self.ldap_get_records(name5)
    self.assertIn("dNSTombstoned", records[0])
    self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")

    # Tombstone deletion: rewrite the data of name's tombstone to a
    # value just above (first pass) and just below (second pass)
    # last_add - 24 * 14.  Only the second pass may remove the node.
    # NOTE(review): 24 * 14 presumably means 14 days expressed in hours
    # - confirm against the tombstone-deletion implementation.
    for make_it_work in [False, True]:
        inc = -1 if make_it_work else 1

        def mod_ts(rec):
            rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
        self.ldap_modify_dnsrecs(name, mod_ts)
        dsdb._dns_delete_tombstones(self.samdb)
        recs = self.ldap_get_records(name)
        if make_it_work:
            self.assertEqual(len(recs), 0)
        else:
            self.assertEqual(len(recs), 1)
def test_fully_qualified_zone(self):
    # A zone name with and without a trailing dot must refer to the same
    # zone: creating one after the other has to fail with "already
    # exists", and the zone list must show the unqualified names only.

    def create_zone_expect_exists(zone):
        # Attempt a ZoneCreate that is required to fail with
        # WERR_DNS_ERROR_ZONE_ALREADY_EXISTS.
        try:
            zone_create = self.make_zone_obj(zone)
            client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
            zc_type = dnsserver.DNSSRV_TYPEID_ZONE_CREATE
            self.rpc_conn.DnssrvOperation2(client_version,
                                           0,     # dwSettingFlags
                                           self.server_ip,
                                           None,  # no zone: server-level
                                           0,     # dwContext
                                           'ZoneCreate',
                                           zc_type,
                                           zone_create)
        except WERRORError as e:
            enum, _ = e.args
            if enum != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
                self.fail(e)
            return
        self.fail("Zone {} should already exist".format(zone))

    # Create unqualified, then check creating qualified fails.
    self.create_zone(self.zone)
    create_zone_expect_exists(self.zone + '.')

    # Same again, but the other way around.
    self.create_zone(self.zone + '2.')
    create_zone_expect_exists(self.zone + '2')

    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY
    tid = dnsserver.DNSSRV_TYPEID_DWORD
    _typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version,
                                                         0,  # dwSettingFlags
                                                         self.server_ip,
                                                         None,
                                                         'EnumZones',
                                                         tid,
                                                         request_filter)

    # Clean up before asserting so a failed assertion leaves no zones
    # behind.
    self.delete_zone(self.zone)
    self.delete_zone(self.zone + '2')

    # Two zones should've been created, neither of them fully qualified.
    zones = [str(z.pszZoneName) for z in res.ZoneArray]
    zones_we_just_made = [z for z in zones if z.startswith(self.zone)]
    self.assertEqual(len(zones_we_just_made), 2)
    self.assertEqual(set(zones_we_just_made), {self.zone + '2', self.zone})
def delete_zone(self, zone):
    """Delete `zone` through the 'DeleteZoneFromDs' RPC operation.

    Errors from the RPC call are not caught here; callers decide which
    ones are acceptable.
    """
    self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                   0,  # dwSettingFlags
                                   self.server_ip,
                                   zone,
                                   0,  # dwContext
                                   'DeleteZoneFromDs',
                                   dnsserver.DNSSRV_TYPEID_NULL,
                                   None)
def test_soa_query(self):
    # Ask for the zone's SOA three times with the same packet: before the
    # zone exists, after creating it, and after deleting it again.
    zone = "test.lan"
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    self.finish_name_packet(p, [q])

    def ask(expected_rcode, expected_ancount):
        # Send the prepared query and check rcode, opcode and answer
        # count; hand the response back for further checks.
        (response, response_packet) = self.dns_transaction_udp(
            p, host=server_ip)
        self.assert_dns_rcode_equals(response, expected_rcode)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, expected_ancount)
        return response

    # Windows returns OK while BIND logically seems to return NXDOMAIN
    ask(dns.DNS_RCODE_NXDOMAIN, 0)

    self.create_zone(zone)
    with_zone = ask(dns.DNS_RCODE_OK, 1)
    self.assertEqual(with_zone.answers[0].rr_type, dns.DNS_QTYPE_SOA)

    self.delete_zone(zone)
    ask(dns.DNS_RCODE_NXDOMAIN, 0)
def set_dnsProperty_zero_length(self, dnsproperty_id):
    """Rewrite the zone's dNSProperty values around a short property.

    The zone object's dnsProperty attribute is replaced with: the
    existing properties whose id equals `dnsproperty_id`, followed by a
    DnsProperty_short carrying that id.
    """
    zone_filter = "(&(objectClass=dnsZone)(name={0}))".format(self.zone)
    records = self.samdb.search(base=self.zone_dn,
                                scope=ldb.SCOPE_BASE,
                                expression=zone_filter,
                                attrs=["dNSProperty"])
    self.assertEqual(len(records), 1)
    zone_obj = records[0]

    # NOTE(review): only properties matching dnsproperty_id are kept, so
    # every other zone property is dropped by the replace below - confirm
    # this is intended rather than an inverted filter.
    new_props = []
    for blob in zone_obj.get('dNSProperty'):
        prop = ndr_unpack(dnsp.DnsProperty, blob)
        if prop.id == dnsproperty_id:
            new_props.append(ndr.ndr_pack(prop))

    short_prop = dnsp.DnsProperty_short()
    short_prop.id = dnsproperty_id
    short_prop.namelength = 1
    short_prop.name = 1
    new_props.append(ndr.ndr_pack(short_prop))

    message = ldb.Message.from_dict(
        self.samdb,
        {'dn': zone_obj.dn, 'dnsProperty': new_props},
        ldb.FLAG_MOD_REPLACE)
    self.samdb.modify(message)
def test_update_while_dnsProperty_zero_length(self):
    # A dynamic update must still work, and produce a timestamped
    # record, after the ALLOW_UPDATE property has been rewritten by
    # set_dnsProperty_zero_length().
    self.create_zone(self.zone)
    self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE)
    added = self.dns_update_record('dnspropertytest', ['test txt'])
    self.assertNotEqual(added.dwTimeStamp, 0)
def test_enum_zones_while_dnsProperty_zero_length(self):
    # Enumerating zones must not raise after the ALLOW_UPDATE property
    # has been rewritten by set_dnsProperty_zero_length().  The result
    # itself is not inspected; completing the call is the test.
    self.create_zone(self.zone)
    self.set_dnsProperty_zero_length(dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE)
    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY
    tid = dnsserver.DNSSRV_TYPEID_DWORD
    self.rpc_conn.DnssrvComplexOperation2(client_version,
                                          0,  # dwSettingFlags
                                          self.server_ip,
                                          None,
                                          'EnumZones',
                                          tid,
                                          request_filter)
def test_rpc_zone_update_while_dnsProperty_zero_length(self):
    # Setting AllowUpdate over RPC must succeed after that same property
    # has been rewritten by set_dnsProperty_zero_length().
    self.create_zone(self.zone)
    rewritten_property = dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE
    self.set_dnsProperty_zero_length(rewritten_property)
    self.set_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_SECURE)
def test_rpc_zone_update_while_other_dnsProperty_zero_length(self):
    # Setting AllowUpdate over RPC must succeed after a different
    # property (MASTER_SERVERS_DA) has been rewritten by
    # set_dnsProperty_zero_length().
    self.create_zone(self.zone)
    rewritten_property = dnsp.DSPROPERTY_ZONE_MASTER_SERVERS_DA
    self.set_dnsProperty_zero_length(rewritten_property)
    self.set_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_SECURE)
1762 class TestRPCRoundtrip(DNSTest):
def setUp(self):
    super().setUp()
    # Copy the module-level test configuration onto the instance and
    # open the RPC connection to the DNS management interface.
    self.server, self.server_ip = server_name, server_ip
    self.lp, self.creds = lp, creds
    binding = "ncacn_ip_tcp:%s[sign]" % (self.server_ip)
    self.rpc_conn = dnsserver.dnsserver(binding, self.lp, self.creds)
def rpc_update(self, fqn=None, data=None, wType=None, delete=False):
    """Add or delete one record in the test domain over RPC.

    fqn    -- node name; defaults to "rpctestrec.<dns domain>"
    data   -- textual record data, parsed by record_from_string()
    wType  -- record type passed to record_from_string()
    delete -- when true the record is passed as the record to delete,
              otherwise as the record to add

    Exceptions from the RPC call are not caught here.
    """
    fqn = fqn or ("rpctestrec." + self.get_dns_domain())

    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = record_from_string(wType, data)

    # The same buffer goes in either the "add" or the "delete" slot.
    add_arg, del_arg = (None, rec_buf) if delete else (rec_buf, None)

    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,  # dwSettingFlags
        self.server_ip,
        self.get_dns_domain(),
        fqn,
        add_arg,
        del_arg)
def test_rpc_self_referencing_cname(self):
    # Adding a CNAME that points at its own name must be refused with
    # WERR_DNS_ERROR_CNAME_LOOP.
    cname = "cnametest2_unqual_rec_loop"
    cname_fqn = "%s.%s" % (cname, self.get_dns_domain())

    # Pre-test cleanup: remove a leftover record; "does not exist" is
    # the only acceptable error.
    try:
        self.rpc_update(fqn=cname, data=cname_fqn,
                        wType=dnsp.DNS_TYPE_CNAME, delete=True)
    except WERRORError as cleanup_err:
        cleanup_code = cleanup_err.args[0]
        if cleanup_code != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
            self.fail("RPC DNS gaven wrong error on pre-test cleanup "
                      "for self referencing CNAME: %s" % cleanup_code)

    try:
        self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn)
    except WERRORError as insert_err:
        insert_code = insert_err.args[0]
        if insert_code != werror.WERR_DNS_ERROR_CNAME_LOOP:
            self.fail("RPC DNS gaven wrong error on insertion of "
                      "self referencing CNAME: %s" % insert_code)
    else:
        self.fail("RPC DNS allowed insertion of self referencing CNAME")
def test_update_add_txt_rpc_to_dns(self):
    # Add a TXT record containing literal double quotes over RPC and
    # check a DNS query returns the same text.  The record is removed
    # again whether or not the query check passes.
    prefix, txt = 'rpctextrec', ['"This is a test"']

    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = record_from_string(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags
            self.server_ip,
            self.get_dns_domain(),
            name,
            add_rec_buf,
            None)

    except WERRORError as e:
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Delete: the record goes in the "delete" slot this time.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,  # dwSettingFlags
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            add_rec_buf)
def test_update_add_null_padded_txt_record(self):
    "test adding records works"
    # Each case: (node prefix, TXT strings sent in the DNS update,
    # textual form the record must match when looked up over RPC).
    # Empty strings appear at the end, in the middle and at the start.
    cases = (
        ('pad1textrec',
         ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec',
         ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec',
         ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    )

    for prefix, txt, rpc_form in cases:
        p = self.make_txt_update(prefix, txt)
        (response, response_packet) = self.dns_transaction_udp(
            p, host=server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

        match = dns_record_match(
            self.rpc_conn,
            self.server_ip,
            self.get_dns_domain(),
            "%s.%s" % (prefix, self.get_dns_domain()),
            dnsp.DNS_TYPE_TXT,
            rpc_form)
        self.assertIsNotNone(match)
def test_update_add_padding_rpc_to_dns(self):
    # RPC -> DNS direction of the empty-string padding test: add a TXT
    # record over RPC, check a DNS query returns the expected strings,
    # and always delete the record afterwards.
    #
    # Each case: (base prefix, strings expected from the DNS query,
    # textual record form handed to record_from_string()).
    cases = (
        ('pad1textrec',
         ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec',
         ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec',
         ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    )

    for base_prefix, txt, rpc_form in cases:
        prefix = 'rpc' + base_prefix
        name = "%s.%s" % (prefix, self.get_dns_domain())

        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        add_rec_buf.rec = record_from_string(dnsp.DNS_TYPE_TXT, rpc_form)
        try:
            self.rpc_conn.DnssrvUpdateRecord2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0,  # dwSettingFlags
                self.server_ip,
                self.get_dns_domain(),
                name,
                add_rec_buf,
                None)
        except WERRORError as e:
            self.fail(str(e))

        try:
            self.check_query_txt(prefix, txt)
        finally:
            # Delete: the record goes in the "delete" slot this time.
            self.rpc_conn.DnssrvUpdateRecord2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0,  # dwSettingFlags
                self.server_ip,
                self.get_dns_domain(),
                name,
                None,
                add_rec_buf)
1997 # Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
    """Add TXT records with an embedded NUL byte through a DNS update.

    Two records are sent: one with a single 'NULL\\x00BYTE' string and
    one with two of them. After each update the test expects rcode OK,
    then checks with check_query_txt and dns_record_match that only the
    text before the NUL ('NULL') is seen for every string.
    """
    # Each entry: (prefix, strings sent in the update,
    #              strings expected by check_query_txt,
    #              textual record expected by dns_record_match)
    cases = (
        ('nulltextrec',
         ['NULL\x00BYTE'],
         ['NULL'],
         '"NULL"'),
        ('nulltextrec2',
         ['NULL\x00BYTE', 'NULL\x00BYTE'],
         ['NULL', 'NULL'],
         '"NULL" "NULL"'),
    )

    for prefix, txt, expected_txt, expected_rec in cases:
        update = self.make_txt_update(prefix, txt)
        # Only the first element of the returned pair is inspected.
        response, _ = self.dns_transaction_udp(update, host=server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)

        found = dns_record_match(
            self.rpc_conn,
            self.server_ip,
            self.get_dns_domain(),
            "%s.%s" % (prefix, self.get_dns_domain()),
            dnsp.DNS_TYPE_TXT,
            expected_rec)
        self.assertIsNotNone(found)
def test_update_add_null_char_rpc_to_dns(self):
    """Push a TXT record with an embedded NUL over RPC, read via DNS.

    The record text is '"NULL\\x00BYTE"'; check_query_txt is expected
    to see only ['NULL']. A WERRORError from the first RPC call is
    turned into a test failure, and the record is handed back to
    DnssrvUpdateRecord2 for cleanup whether or not the check passes.
    """
    prefix = 'rpcnulltextrec'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    txt_rec = record_from_string(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = txt_rec

    # NOTE(review): the numbering embedded in the source skips a line
    # right after the client-version argument of each
    # DnssrvUpdateRecord2 call; an argument may have been lost in
    # extraction -- confirm against upstream.
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            rec_buf,
            None)
    except WERRORError as err:
        self.fail(str(err))

    try:
        self.check_query_txt(prefix, ['NULL'])
    finally:
        # Clean up: pass the same buffer in the last argument, which
        # is expected to remove the record again.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            rec_buf)
def test_update_add_hex_char_txt_record(self):
    """Add a TXT record containing a \\xFF character through a DNS update.

    Expects rcode OK, then checks that the same string comes back from
    check_query_txt and that dns_record_match finds the record as
    '"HIGH\\xFFBYTE"'.
    """
    prefix = 'hextextrec'
    txt = ['HIGH\xFFBYTE']

    update = self.make_txt_update(prefix, txt)
    # Only the first element of the returned pair is inspected.
    response, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    found = dns_record_match(
        self.rpc_conn,
        self.server_ip,
        self.get_dns_domain(),
        "%s.%s" % (prefix, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT,
        '"HIGH\xFFBYTE"')
    self.assertIsNotNone(found)
def test_update_add_hex_rpc_to_dns(self):
    """Push a TXT record containing a \\xFF character over RPC, read via DNS.

    A WERRORError from the first RPC call is turned into a test
    failure. The record is handed back to DnssrvUpdateRecord2 for
    cleanup whether or not check_query_txt passes.
    """
    txt = ['HIGH\xFFBYTE']
    prefix = 'rpc' + 'hextextrec'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    txt_rec = record_from_string(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = txt_rec

    # NOTE(review): the numbering embedded in the source skips a line
    # right after the client-version argument of each
    # DnssrvUpdateRecord2 call; an argument may have been lost in
    # extraction -- confirm against upstream.
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            rec_buf,
            None)
    except WERRORError as err:
        self.fail(str(err))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Clean up: pass the same buffer in the last argument, which
        # is expected to remove the record again.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            rec_buf)
def test_update_add_slash_txt_record(self):
    """Add a TXT record containing a backslash through a DNS update.

    Expects rcode OK, then checks that the same string comes back from
    check_query_txt and that dns_record_match finds the record with
    the backslash doubled in its textual form.
    """
    prefix = 'slashtextrec'
    txt = ['Th\\=is=is a test']

    update = self.make_txt_update(prefix, txt)
    # Only the first element of the returned pair is inspected.
    response, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    found = dns_record_match(
        self.rpc_conn,
        self.server_ip,
        self.get_dns_domain(),
        "%s.%s" % (prefix, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT,
        '"Th\\\\=is=is a test"')
    self.assertIsNotNone(found)
2113 # This test fails against Windows as it eliminates slashes in RPC
2114 # One typical use for a slash is in records like 'var=value' to
2115 # escape '=' characters.
def test_update_add_slash_rpc_to_dns(self):
    """Push a TXT record containing a backslash over RPC, read via DNS.

    The record is built from text with the backslash doubled, and
    check_query_txt is expected to see the string with a single
    backslash. A WERRORError from the first RPC call is turned into a
    test failure. The record is handed back to DnssrvUpdateRecord2 for
    cleanup whether or not the check passes.
    """
    txt = ['Th\\=is=is a test']
    prefix = 'rpc' + 'slashtextrec'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    txt_rec = record_from_string(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = txt_rec

    # NOTE(review): the numbering embedded in the source skips a line
    # right after the client-version argument of each
    # DnssrvUpdateRecord2 call; an argument may have been lost in
    # extraction -- confirm against upstream.
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            rec_buf,
            None)
    except WERRORError as err:
        self.fail(str(err))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Clean up: pass the same buffer in the last argument, which
        # is expected to remove the record again.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            rec_buf)
def test_update_add_two_txt_records(self):
    """Add a TXT record made of two quoted strings through a DNS update.

    Expects rcode OK, then checks that both strings come back from
    check_query_txt and that dns_record_match finds the record in its
    escaped textual form.
    """
    prefix = 'textrec2'
    txt = ['"This is a test"',
           '"and this is a test, too"']

    update = self.make_txt_update(prefix, txt)
    # Only the first element of the returned pair is inspected.
    response, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    # Textual form of the record, with the inner quotes escaped.
    expected_rec = ('"\\"This is a test\\""' +
                    ' "\\"and this is a test, too\\""')
    found = dns_record_match(
        self.rpc_conn,
        self.server_ip,
        self.get_dns_domain(),
        "%s.%s" % (prefix, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT,
        expected_rec)
    self.assertIsNotNone(found)
def test_update_add_two_rpc_to_dns(self):
    """Push a TXT record made of two quoted strings over RPC, read via DNS.

    A WERRORError from the first RPC call is turned into a test
    failure. The record is handed back to DnssrvUpdateRecord2 for
    cleanup whether or not check_query_txt passes.
    """
    txt = ['"This is a test"',
           '"and this is a test, too"']
    prefix = 'rpc' + 'textrec2'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    # Textual form of the record, with the inner quotes escaped.
    rec_text = ('"\\"This is a test\\""' +
                ' "\\"and this is a test, too\\""')
    txt_rec = record_from_string(dnsp.DNS_TYPE_TXT, rec_text)
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = txt_rec

    # NOTE(review): the numbering embedded in the source skips a line
    # right after the client-version argument of each
    # DnssrvUpdateRecord2 call; an argument may have been lost in
    # extraction -- confirm against upstream.
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            rec_buf,
            None)
    except WERRORError as err:
        self.fail(str(err))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Clean up: pass the same buffer in the last argument, which
        # is expected to remove the record again.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            rec_buf)
def test_update_add_empty_txt_records(self):
    """Add a TXT record with an empty string list through a DNS update.

    Expects rcode OK, then checks that check_query_txt sees an empty
    list and that dns_record_match finds the record when given an
    empty textual form.
    """
    prefix = 'emptytextrec'
    txt = []

    update = self.make_txt_update(prefix, txt)
    # Only the first element of the returned pair is inspected.
    response, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    found = dns_record_match(
        self.rpc_conn,
        self.server_ip,
        self.get_dns_domain(),
        "%s.%s" % (prefix, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT,
        '')
    self.assertIsNotNone(found)
def test_update_add_empty_rpc_to_dns(self):
    """Push a TXT record built from an empty string over RPC, read via DNS.

    check_query_txt is expected to see an empty list. A WERRORError
    from the first RPC call is turned into a test failure. The record
    is handed back to DnssrvUpdateRecord2 for cleanup whether or not
    the check passes.
    """
    prefix = 'rpcemptytextrec'
    txt = []
    name = "%s.%s" % (prefix, self.get_dns_domain())

    txt_rec = record_from_string(dnsp.DNS_TYPE_TXT, '')
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = txt_rec

    # NOTE(review): the numbering embedded in the source skips a line
    # right after the client-version argument of each
    # DnssrvUpdateRecord2 call; an argument may have been lost in
    # extraction -- confirm against upstream.
    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            rec_buf,
            None)
    except WERRORError as err:
        self.fail(str(err))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Clean up: pass the same buffer in the last argument, which
        # is expected to remove the record again.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            self.server_ip,
            self.get_dns_domain(),
            name,
            None,
            rec_buf)
# Script entry point: hand this module and the subunit options to
# TestProgram (imported from samba.tests.subunitrun).
TestProgram(module=__name__, opts=subunitopts)