1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba
import dsdb
19 from samba
.ndr
import ndr_unpack
, ndr_pack
20 from samba
.samdb
import SamDB
21 from samba
.auth
import system_session
23 from ldb
import ERR_OPERATIONS_ERROR
28 import samba
.ndr
as ndr
29 from samba
import credentials
30 from samba
.dcerpc
import dns
, dnsp
, dnsserver
31 from samba
.dnsserver
import TXTRecord
32 from samba
.dnsserver
import record_from_string
, dns_record_match
33 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
34 from samba
import werror
, WERRORError
35 from samba
.tests
.dns_base
import DNSTest
36 import samba
.getopt
as options
40 parser
= optparse
.OptionParser("dns.py <server name> <server ip> [options]")
41 sambaopts
= options
.SambaOptions(parser
)
42 parser
.add_option_group(sambaopts
)
44 # This timeout only has relevance when testing against Windows
45 # Format errors tend to return patchy responses, so a timeout is needed.
46 parser
.add_option("--timeout", type="int", dest
="timeout",
47 help="Specify timeout for DNS requests")
49 # use command line creds if available
50 credopts
= options
.CredentialsOptions(parser
)
51 parser
.add_option_group(credopts
)
52 subunitopts
= SubunitOptions(parser
)
53 parser
.add_option_group(subunitopts
)
55 opts
, args
= parser
.parse_args()
57 lp
= sambaopts
.get_loadparm()
58 creds
= credopts
.get_credentials(lp
)
60 timeout
= opts
.timeout
68 creds
.set_krb_forwardable(credentials
.NO_KRB_FORWARDABLE
)
71 class TestSimpleQueries(DNSTest
):
74 global server
, server_ip
, lp
, creds
, timeout
75 self
.server
= server_name
76 self
.server_ip
= server_ip
79 self
.timeout
= timeout
81 def test_one_a_query(self
):
82 "create a query packet containing one query record"
83 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
86 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
87 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
88 print("asking for ", q
.name
)
91 self
.finish_name_packet(p
, questions
)
92 (response
, response_packet
) =\
93 self
.dns_transaction_udp(p
, host
=server_ip
)
94 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
95 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
96 self
.assertEqual(response
.ancount
, 1)
97 self
.assertEqual(response
.answers
[0].rdata
,
100 def test_one_SOA_query(self
):
101 "create a query packet containing one query record for the SOA"
102 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
105 name
= "%s" % (self
.get_dns_domain())
106 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
107 print("asking for ", q
.name
)
110 self
.finish_name_packet(p
, questions
)
111 (response
, response_packet
) =\
112 self
.dns_transaction_udp(p
, host
=server_ip
)
113 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
114 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
115 self
.assertEqual(response
.ancount
, 1)
117 response
.answers
[0].rdata
.mname
.upper(),
118 ("%s.%s" % (self
.server
, self
.get_dns_domain())).upper())
120 def test_one_a_query_tcp(self
):
121 "create a query packet containing one query record via TCP"
122 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
125 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
126 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
127 print("asking for ", q
.name
)
130 self
.finish_name_packet(p
, questions
)
131 (response
, response_packet
) =\
132 self
.dns_transaction_tcp(p
, host
=server_ip
)
133 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
134 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
135 self
.assertEqual(response
.ancount
, 1)
136 self
.assertEqual(response
.answers
[0].rdata
,
139 def test_one_mx_query(self
):
140 "create a query packet causing an empty RCODE_OK answer"
141 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
144 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
145 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
146 print("asking for ", q
.name
)
149 self
.finish_name_packet(p
, questions
)
150 (response
, response_packet
) =\
151 self
.dns_transaction_udp(p
, host
=server_ip
)
152 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
153 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
154 self
.assertEqual(response
.ancount
, 0)
156 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
159 name
= "invalid-%s.%s" % (self
.server
, self
.get_dns_domain())
160 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
161 print("asking for ", q
.name
)
164 self
.finish_name_packet(p
, questions
)
165 (response
, response_packet
) =\
166 self
.dns_transaction_udp(p
, host
=server_ip
)
167 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
168 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
169 self
.assertEqual(response
.ancount
, 0)
171 def test_two_queries(self
):
172 "create a query packet containing two query records"
173 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
176 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
177 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
180 name
= "%s.%s" % ('bogusname', self
.get_dns_domain())
181 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
184 self
.finish_name_packet(p
, questions
)
186 (response
, response_packet
) =\
187 self
.dns_transaction_udp(p
, host
=server_ip
)
188 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
189 except socket
.timeout
:
190 # Windows chooses not to respond to incorrectly formatted queries.
191 # Although this appears to be non-deterministic even for the same
192 # request twice, it also appears to be based on how poorly the
193 # request is formatted.
196 def test_qtype_all_query(self
):
197 "create a QTYPE_ALL query"
198 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
201 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
202 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_ALL
, dns
.DNS_QCLASS_IN
)
203 print("asking for ", q
.name
)
206 self
.finish_name_packet(p
, questions
)
207 (response
, response_packet
) =\
208 self
.dns_transaction_udp(p
, host
=server_ip
)
211 dc_ipv6
= os
.getenv('SERVER_IPV6')
212 if dc_ipv6
is not None:
215 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
216 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
217 self
.assertEqual(response
.ancount
, num_answers
)
218 self
.assertEqual(response
.answers
[0].rdata
,
220 if dc_ipv6
is not None:
221 self
.assertEqual(response
.answers
[1].rdata
, dc_ipv6
)
223 def test_qclass_none_query(self
):
224 "create a QCLASS_NONE query"
225 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
228 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
229 q
= self
.make_name_question(
235 self
.finish_name_packet(p
, questions
)
237 (response
, response_packet
) =\
238 self
.dns_transaction_udp(p
, host
=server_ip
)
239 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
240 except socket
.timeout
:
241 # Windows chooses not to respond to incorrectly formatted queries.
242 # Although this appears to be non-deterministic even for the same
243 # request twice, it also appears to be based on how poorly the
244 # request is formatted.
247 def test_soa_hostname_query(self
):
248 "create a SOA query for a hostname"
249 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
252 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
253 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
256 self
.finish_name_packet(p
, questions
)
257 (response
, response_packet
) =\
258 self
.dns_transaction_udp(p
, host
=server_ip
)
259 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
260 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
261 # We don't get SOA records for single hosts
262 self
.assertEqual(response
.ancount
, 0)
263 # But we do respond with an authority section
264 self
.assertEqual(response
.nscount
, 1)
266 def test_soa_unknown_hostname_query(self
):
267 "create a SOA query for an unknown hostname"
268 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
271 name
= "foobar.%s" % (self
.get_dns_domain())
272 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
275 self
.finish_name_packet(p
, questions
)
276 (response
, response_packet
) =\
277 self
.dns_transaction_udp(p
, host
=server_ip
)
278 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
279 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
280 # We don't get SOA records for single hosts
281 self
.assertEqual(response
.ancount
, 0)
282 # But we do respond with an authority section
283 self
.assertEqual(response
.nscount
, 1)
285 def test_soa_domain_query(self
):
286 "create a SOA query for a domain"
287 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
290 name
= self
.get_dns_domain()
291 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
294 self
.finish_name_packet(p
, questions
)
295 (response
, response_packet
) =\
296 self
.dns_transaction_udp(p
, host
=server_ip
)
297 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
298 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
299 self
.assertEqual(response
.ancount
, 1)
300 self
.assertEqual(response
.answers
[0].rdata
.minimum
, 3600)
303 class TestDNSUpdates(DNSTest
):
306 global server
, server_ip
, lp
, creds
, timeout
307 self
.server
= server_name
308 self
.server_ip
= server_ip
311 self
.timeout
= timeout
313 def test_two_updates(self
):
314 "create two update requests"
315 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
318 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
319 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
322 name
= self
.get_dns_domain()
323 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
326 self
.finish_name_packet(p
, updates
)
328 (response
, response_packet
) =\
329 self
.dns_transaction_udp(p
, host
=server_ip
)
330 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
331 except socket
.timeout
:
332 # Windows chooses not to respond to incorrectly formatted queries.
333 # Although this appears to be non-deterministic even for the same
334 # request twice, it also appears to be based on how poorly the
335 # request is formatted.
338 def test_update_wrong_qclass(self
):
339 "create update with DNS_QCLASS_NONE"
340 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
343 name
= self
.get_dns_domain()
344 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_NONE
)
347 self
.finish_name_packet(p
, updates
)
348 (response
, response_packet
) =\
349 self
.dns_transaction_udp(p
, host
=server_ip
)
350 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NOTIMP
)
352 def test_update_prereq_with_non_null_ttl(self
):
353 "test update with a non-null TTL"
354 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
357 name
= self
.get_dns_domain()
359 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
361 self
.finish_name_packet(p
, updates
)
365 r
.name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
366 r
.rr_type
= dns
.DNS_QTYPE_TXT
367 r
.rr_class
= dns
.DNS_QCLASS_NONE
372 p
.ancount
= len(prereqs
)
376 (response
, response_packet
) =\
377 self
.dns_transaction_udp(p
, host
=server_ip
)
378 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_FORMERR
)
379 except socket
.timeout
:
380 # Windows chooses not to respond to incorrectly formatted queries.
381 # Although this appears to be non-deterministic even for the same
382 # request twice, it also appears to be based on how poorly the
383 # request is formatted.
386 def test_update_prereq_with_non_null_length(self
):
387 "test update with a non-null length"
388 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
391 name
= self
.get_dns_domain()
393 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
395 self
.finish_name_packet(p
, updates
)
399 r
.name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
400 r
.rr_type
= dns
.DNS_QTYPE_TXT
401 r
.rr_class
= dns
.DNS_QCLASS_ANY
406 p
.ancount
= len(prereqs
)
409 (response
, response_packet
) =\
410 self
.dns_transaction_udp(p
, host
=server_ip
)
411 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
413 def test_update_prereq_nonexisting_name(self
):
414 "test update with a nonexisting name"
415 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
418 name
= self
.get_dns_domain()
420 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
422 self
.finish_name_packet(p
, updates
)
426 r
.name
= "idontexist.%s" % self
.get_dns_domain()
427 r
.rr_type
= dns
.DNS_QTYPE_TXT
428 r
.rr_class
= dns
.DNS_QCLASS_ANY
433 p
.ancount
= len(prereqs
)
436 (response
, response_packet
) =\
437 self
.dns_transaction_udp(p
, host
=server_ip
)
438 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXRRSET
)
def test_update_add_txt_record(self):
    """test adding records works

    Builds a TXT update for the 'textrec' prefix with make_txt_update,
    sends it over UDP to the module-level server_ip, requires
    DNS_RCODE_OK, and then passes the same prefix and strings to
    check_query_txt.
    """
    record_prefix = 'textrec'
    record_strings = ['"This is a test"']
    update_packet = self.make_txt_update(record_prefix, record_strings)
    # The transaction yields a pair; only the first element is inspected.
    reply, _raw_reply = self.dns_transaction_udp(update_packet,
                                                 host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(record_prefix, record_strings)
449 def test_delete_record(self
):
450 "Test if deleting records works"
452 NAME
= "deleterec.%s" % self
.get_dns_domain()
454 # First, create a record to make sure we have a record to delete.
455 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
458 name
= self
.get_dns_domain()
460 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
462 self
.finish_name_packet(p
, updates
)
467 r
.rr_type
= dns
.DNS_QTYPE_TXT
468 r
.rr_class
= dns
.DNS_QCLASS_IN
471 rdata
= self
.make_txt_record(['"This is a test"'])
474 p
.nscount
= len(updates
)
477 (response
, response_packet
) =\
478 self
.dns_transaction_udp(p
, host
=server_ip
)
479 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
481 # Now check the record is around
482 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
484 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
487 self
.finish_name_packet(p
, questions
)
488 (response
, response_packet
) =\
489 self
.dns_transaction_udp(p
, host
=server_ip
)
490 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
492 # Now delete the record
493 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
496 name
= self
.get_dns_domain()
498 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
500 self
.finish_name_packet(p
, updates
)
505 r
.rr_type
= dns
.DNS_QTYPE_TXT
506 r
.rr_class
= dns
.DNS_QCLASS_NONE
509 rdata
= self
.make_txt_record(['"This is a test"'])
512 p
.nscount
= len(updates
)
515 (response
, response_packet
) =\
516 self
.dns_transaction_udp(p
, host
=server_ip
)
517 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
519 # And finally check it's gone
520 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
523 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
526 self
.finish_name_packet(p
, questions
)
527 (response
, response_packet
) =\
528 self
.dns_transaction_udp(p
, host
=server_ip
)
529 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
531 def test_readd_record(self
):
532 "Test if adding, deleting and then re-adding a records works"
534 NAME
= "readdrec.%s" % self
.get_dns_domain()
537 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
540 name
= self
.get_dns_domain()
542 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
544 self
.finish_name_packet(p
, updates
)
549 r
.rr_type
= dns
.DNS_QTYPE_TXT
550 r
.rr_class
= dns
.DNS_QCLASS_IN
553 rdata
= self
.make_txt_record(['"This is a test"'])
556 p
.nscount
= len(updates
)
559 (response
, response_packet
) =\
560 self
.dns_transaction_udp(p
, host
=server_ip
)
561 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
563 # Now check the record is around
564 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
566 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
569 self
.finish_name_packet(p
, questions
)
570 (response
, response_packet
) =\
571 self
.dns_transaction_udp(p
, host
=server_ip
)
572 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
574 # Now delete the record
575 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
578 name
= self
.get_dns_domain()
580 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
582 self
.finish_name_packet(p
, updates
)
587 r
.rr_type
= dns
.DNS_QTYPE_TXT
588 r
.rr_class
= dns
.DNS_QCLASS_NONE
591 rdata
= self
.make_txt_record(['"This is a test"'])
594 p
.nscount
= len(updates
)
597 (response
, response_packet
) =\
598 self
.dns_transaction_udp(p
, host
=server_ip
)
599 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
602 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
605 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
608 self
.finish_name_packet(p
, questions
)
609 (response
, response_packet
) =\
610 self
.dns_transaction_udp(p
, host
=server_ip
)
611 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
613 # recreate the record
614 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
617 name
= self
.get_dns_domain()
619 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
621 self
.finish_name_packet(p
, updates
)
626 r
.rr_type
= dns
.DNS_QTYPE_TXT
627 r
.rr_class
= dns
.DNS_QCLASS_IN
630 rdata
= self
.make_txt_record(['"This is a test"'])
633 p
.nscount
= len(updates
)
636 (response
, response_packet
) =\
637 self
.dns_transaction_udp(p
, host
=server_ip
)
638 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
640 # Now check the record is around
641 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
643 q
= self
.make_name_question(NAME
, dns
.DNS_QTYPE_TXT
, dns
.DNS_QCLASS_IN
)
646 self
.finish_name_packet(p
, questions
)
647 (response
, response_packet
) =\
648 self
.dns_transaction_udp(p
, host
=server_ip
)
649 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
651 def test_update_add_mx_record(self
):
652 "test adding MX records works"
653 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
656 name
= self
.get_dns_domain()
658 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
660 self
.finish_name_packet(p
, updates
)
664 r
.name
= "%s" % self
.get_dns_domain()
665 r
.rr_type
= dns
.DNS_QTYPE_MX
666 r
.rr_class
= dns
.DNS_QCLASS_IN
669 rdata
= dns
.mx_record()
670 rdata
.preference
= 10
671 rdata
.exchange
= 'mail.%s' % self
.get_dns_domain()
674 p
.nscount
= len(updates
)
677 (response
, response_packet
) =\
678 self
.dns_transaction_udp(p
, host
=server_ip
)
679 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
681 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
684 name
= "%s" % self
.get_dns_domain()
685 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_MX
, dns
.DNS_QCLASS_IN
)
688 self
.finish_name_packet(p
, questions
)
689 (response
, response_packet
) =\
690 self
.dns_transaction_udp(p
, host
=server_ip
)
691 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
692 self
.assertEqual(response
.ancount
, 1)
693 ans
= response
.answers
[0]
694 self
.assertEqual(ans
.rr_type
, dns
.DNS_QTYPE_MX
)
695 self
.assertEqual(ans
.rdata
.preference
, 10)
696 self
.assertEqual(ans
.rdata
.exchange
, 'mail.%s' % self
.get_dns_domain())
699 class TestComplexQueries(DNSTest
):
700 def make_dns_update(self
, key
, value
, qtype
):
701 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
703 name
= self
.get_dns_domain()
704 u
= self
.make_name_question(name
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
705 self
.finish_name_packet(p
, [u
])
710 r
.rr_class
= dns
.DNS_QCLASS_IN
716 (response
, response_packet
) =\
717 self
.dns_transaction_udp(p
, host
=server_ip
)
718 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
723 global server
, server_ip
, lp
, creds
, timeout
724 self
.server
= server_name
725 self
.server_ip
= server_ip
728 self
.timeout
= timeout
730 def test_one_a_query(self
):
731 "create a query packet containing one query record"
736 name
= "cname_test.%s" % self
.get_dns_domain()
737 rdata
= "%s.%s" % (self
.server
, self
.get_dns_domain())
738 self
.make_dns_update(name
, rdata
, dns
.DNS_QTYPE_CNAME
)
740 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
744 name
= "cname_test.%s" % self
.get_dns_domain()
745 q
= self
.make_name_question(name
,
748 print("asking for ", q
.name
)
751 self
.finish_name_packet(p
, questions
)
752 (response
, response_packet
) =\
753 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
754 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
755 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
756 self
.assertEqual(response
.ancount
, 2)
757 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
758 self
.assertEqual(response
.answers
[0].rdata
, "%s.%s" %
759 (self
.server
, self
.get_dns_domain()))
760 self
.assertEqual(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_A
)
761 self
.assertEqual(response
.answers
[1].rdata
,
766 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
769 name
= self
.get_dns_domain()
771 u
= self
.make_name_question(name
,
775 self
.finish_name_packet(p
, updates
)
779 r
.name
= "cname_test.%s" % self
.get_dns_domain()
780 r
.rr_type
= dns
.DNS_QTYPE_CNAME
781 r
.rr_class
= dns
.DNS_QCLASS_NONE
784 r
.rdata
= "%s.%s" % (self
.server
, self
.get_dns_domain())
786 p
.nscount
= len(updates
)
789 (response
, response_packet
) =\
790 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
791 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
793 def test_cname_two_chain(self
):
794 name0
= "cnamechain0.%s" % self
.get_dns_domain()
795 name1
= "cnamechain1.%s" % self
.get_dns_domain()
796 name2
= "cnamechain2.%s" % self
.get_dns_domain()
797 self
.make_dns_update(name1
, name2
, dns
.DNS_QTYPE_CNAME
)
798 self
.make_dns_update(name2
, name0
, dns
.DNS_QTYPE_CNAME
)
799 self
.make_dns_update(name0
, server_ip
, dns
.DNS_QTYPE_A
)
801 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
803 q
= self
.make_name_question(name1
, dns
.DNS_QTYPE_A
,
807 self
.finish_name_packet(p
, questions
)
808 (response
, response_packet
) =\
809 self
.dns_transaction_udp(p
, host
=server_ip
)
810 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
811 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
812 self
.assertEqual(response
.ancount
, 3)
814 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
815 self
.assertEqual(response
.answers
[0].name
, name1
)
816 self
.assertEqual(response
.answers
[0].rdata
, name2
)
818 self
.assertEqual(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_CNAME
)
819 self
.assertEqual(response
.answers
[1].name
, name2
)
820 self
.assertEqual(response
.answers
[1].rdata
, name0
)
822 self
.assertEqual(response
.answers
[2].rr_type
, dns
.DNS_QTYPE_A
)
823 self
.assertEqual(response
.answers
[2].rdata
,
826 def test_invalid_empty_cname(self
):
827 name0
= "cnamedotprefix0.%s" % self
.get_dns_domain()
829 self
.make_dns_update(name0
, "", dns
.DNS_QTYPE_CNAME
)
830 except AssertionError:
833 self
.fail("Successfully added empty CNAME, which is invalid.")
835 def test_cname_two_chain_not_matching_qtype(self
):
836 name0
= "cnamechain0.%s" % self
.get_dns_domain()
837 name1
= "cnamechain1.%s" % self
.get_dns_domain()
838 name2
= "cnamechain2.%s" % self
.get_dns_domain()
839 self
.make_dns_update(name1
, name2
, dns
.DNS_QTYPE_CNAME
)
840 self
.make_dns_update(name2
, name0
, dns
.DNS_QTYPE_CNAME
)
841 self
.make_dns_update(name0
, server_ip
, dns
.DNS_QTYPE_A
)
843 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
845 q
= self
.make_name_question(name1
, dns
.DNS_QTYPE_TXT
,
849 self
.finish_name_packet(p
, questions
)
850 (response
, response_packet
) =\
851 self
.dns_transaction_udp(p
, host
=server_ip
)
852 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
853 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
855 # CNAME should return all intermediate results!
856 # Only the A record exists, not the TXT.
857 self
.assertEqual(response
.ancount
, 2)
859 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
860 self
.assertEqual(response
.answers
[0].name
, name1
)
861 self
.assertEqual(response
.answers
[0].rdata
, name2
)
863 self
.assertEqual(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_CNAME
)
864 self
.assertEqual(response
.answers
[1].name
, name2
)
865 self
.assertEqual(response
.answers
[1].rdata
, name0
)
867 def test_cname_loop(self
):
868 cname1
= "cnamelooptestrec." + self
.get_dns_domain()
869 cname2
= "cnamelooptestrec2." + self
.get_dns_domain()
870 cname3
= "cnamelooptestrec3." + self
.get_dns_domain()
871 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
872 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
873 self
.make_dns_update(cname3
, cname1
, dnsp
.DNS_TYPE_CNAME
)
875 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
878 q
= self
.make_name_question(cname1
,
882 self
.finish_name_packet(p
, questions
)
884 (response
, response_packet
) =\
885 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
887 max_recursion_depth
= 20
888 self
.assertEqual(len(response
.answers
), max_recursion_depth
)
890 # Make sure cname limit doesn't count other records. This is a generic
891 # test called in tests below
892 def max_rec_test(self
, rtype
, rec_gen
):
893 name
= "limittestrec{0}.{1}".format(rtype
, self
.get_dns_domain())
895 num_recs_to_enter
= limit
+ 5
897 for i
in range(1, num_recs_to_enter
+1):
899 self
.make_dns_update(name
, ip
, rtype
)
901 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
904 q
= self
.make_name_question(name
,
908 self
.finish_name_packet(p
, questions
)
910 (response
, response_packet
) =\
911 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
913 self
.assertEqual(len(response
.answers
), num_recs_to_enter
)
915 def test_record_limit_A(self
):
917 return "127.0.0." + str(i
)
918 self
.max_rec_test(rtype
=dns
.DNS_QTYPE_A
, rec_gen
=ip4_gen
)
920 def test_record_limit_AAAA(self
):
922 return "AAAA:0:0:0:0:0:0:" + str(i
)
923 self
.max_rec_test(rtype
=dns
.DNS_QTYPE_AAAA
, rec_gen
=ip6_gen
)
925 def test_record_limit_SRV(self
):
927 rec
= dns
.srv_record()
931 rec
.target
= "srvtestrec" + str(i
)
933 self
.max_rec_test(rtype
=dns
.DNS_QTYPE_SRV
, rec_gen
=srv_gen
)
935 # Same as test_record_limit_A but with a preceding CNAME follow
936 def test_cname_limit(self
):
937 cname1
= "cnamelimittestrec." + self
.get_dns_domain()
938 cname2
= "cnamelimittestrec2." + self
.get_dns_domain()
939 cname3
= "cnamelimittestrec3." + self
.get_dns_domain()
940 ip_prefix
= '127.0.0.'
942 num_recs_to_enter
= limit
+ 5
944 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
945 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
946 num_arecs_to_enter
= num_recs_to_enter
- 2
947 for i
in range(1, num_arecs_to_enter
+1):
948 ip
= ip_prefix
+ str(i
)
949 self
.make_dns_update(cname3
, ip
, dns
.DNS_QTYPE_A
)
951 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
954 q
= self
.make_name_question(cname1
,
958 self
.finish_name_packet(p
, questions
)
960 (response
, response_packet
) =\
961 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
963 self
.assertEqual(len(response
.answers
), num_recs_to_enter
)
965 # ANY query on cname record shouldn't follow the link
966 def test_cname_any_query(self
):
967 cname1
= "cnameanytestrec." + self
.get_dns_domain()
968 cname2
= "cnameanytestrec2." + self
.get_dns_domain()
969 cname3
= "cnameanytestrec3." + self
.get_dns_domain()
971 self
.make_dns_update(cname1
, cname2
, dnsp
.DNS_TYPE_CNAME
)
972 self
.make_dns_update(cname2
, cname3
, dnsp
.DNS_TYPE_CNAME
)
974 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
977 q
= self
.make_name_question(cname1
,
981 self
.finish_name_packet(p
, questions
)
983 (response
, response_packet
) =\
984 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
986 self
.assertEqual(len(response
.answers
), 1)
987 self
.assertEqual(response
.answers
[0].name
, cname1
)
988 self
.assertEqual(response
.answers
[0].rdata
, cname2
)
991 class TestInvalidQueries(DNSTest
):
994 global server
, server_ip
, lp
, creds
, timeout
995 self
.server
= server_name
996 self
.server_ip
= server_ip
999 self
.timeout
= timeout
1001 def test_one_a_query(self
):
1002 """send 0 bytes follows by create a query packet
1003 containing one query record"""
1007 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
, 0)
1008 s
.connect((self
.server_ip
, 53))
1014 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
1017 name
= "%s.%s" % (self
.server
, self
.get_dns_domain())
1018 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
1019 print("asking for ", q
.name
)
1022 self
.finish_name_packet(p
, questions
)
1023 (response
, response_packet
) =\
1024 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
1025 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1026 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1027 self
.assertEqual(response
.ancount
, 1)
1028 self
.assertEqual(response
.answers
[0].rdata
,
1031 def test_one_a_reply(self
):
1032 "send a reply instead of a query"
1035 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
1038 name
= "%s.%s" % ('fakefakefake', self
.get_dns_domain())
1039 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
1040 print("asking for ", q
.name
)
1043 self
.finish_name_packet(p
, questions
)
1044 p
.operation |
= dns
.DNS_FLAG_REPLY
1047 send_packet
= ndr
.ndr_pack(p
)
1048 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
1049 s
.settimeout(timeout
)
1050 host
= self
.server_ip
1051 s
.connect((host
, 53))
1052 tcp_packet
= struct
.pack('!H', len(send_packet
))
1053 tcp_packet
+= send_packet
1054 s
.send(tcp_packet
, 0)
1055 recv_packet
= s
.recv(0xffff + 2, 0)
1056 self
.assertEqual(0, len(recv_packet
))
1057 except socket
.timeout
:
1058 # Windows chooses not to respond to incorrectly formatted queries.
1059 # Although this appears to be non-deterministic even for the same
1060 # request twice, it also appears to be based on how poorly the
1061 # request is formatted.
1068 class TestZones(DNSTest
):
1071 global server
, server_ip
, lp
, creds
, timeout
1072 self
.server
= server_name
1073 self
.server_ip
= server_ip
1076 self
.timeout
= timeout
1078 self
.zone
= "test.lan"
1079 self
.rpc_conn
= dnsserver
.dnsserver("ncacn_ip_tcp:%s[sign]" %
1081 self
.lp
, self
.creds
)
1083 self
.samdb
= SamDB(url
="ldap://" + self
.server_ip
,
1084 lp
=self
.get_loadparm(),
1085 session_info
=system_session(),
1086 credentials
=self
.creds
)
1087 self
.zone_dn
= "DC=" + self
.zone
+\
1088 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
1089 str(self
.samdb
.get_default_basedn())
1095 self
.delete_zone(self
.zone
)
1096 except RuntimeError as e
:
1097 (num
, string
) = e
.args
1098 if num
!= werror
.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
:
1101 def make_zone_obj(self
, zone
, aging_enabled
=False):
1102 zone_create
= dnsserver
.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1103 zone_create
.pszZoneName
= zone
1104 zone_create
.dwZoneType
= dnsp
.DNS_ZONE_TYPE_PRIMARY
1105 zone_create
.fAging
= int(aging_enabled
)
1106 zone_create
.dwDpFlags
= dnsserver
.DNS_DP_DOMAIN_DEFAULT
1107 zone_create
.fDsIntegrated
= 1
1108 zone_create
.fLoadExisting
= 1
1109 zone_create
.fAllowUpdate
= dnsp
.DNS_ZONE_UPDATE_UNSECURE
1112 def create_zone(self
, zone
, aging_enabled
=False):
1113 zone_create
= self
.make_zone_obj(zone
, aging_enabled
)
1115 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1116 self
.rpc_conn
.DnssrvOperation2(client_version
,
1122 dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
,
1124 except WERRORError
as e
:
1127 def set_params(self
, **kwargs
):
1128 for key
, val
in kwargs
.items():
1129 name_param
= dnsserver
.DNS_RPC_NAME_AND_PARAM()
1130 name_param
.dwParam
= val
1131 name_param
.pszNodeName
= key
1133 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1134 nap_type
= dnsserver
.DNSSRV_TYPEID_NAME_AND_PARAM
1136 self
.rpc_conn
.DnssrvOperation2(client_version
,
1141 'ResetDwordProperty',
1144 except WERRORError
as e
:
1147 def ldap_modify_dnsrecs(self
, name
, func
):
1148 dn
= 'DC={0},{1}'.format(name
, self
.zone_dn
)
1149 dns_recs
= self
.ldap_get_dns_records(name
)
1150 for rec
in dns_recs
:
1152 update_dict
= {'dn': dn
, 'dnsRecord': [ndr_pack(r
) for r
in dns_recs
]}
1153 self
.samdb
.modify(ldb
.Message
.from_dict(self
.samdb
,
1155 ldb
.FLAG_MOD_REPLACE
))
1157 def dns_update_record(self
, prefix
, txt
):
1158 p
= self
.make_txt_update(prefix
, txt
, self
.zone
)
1159 (code
, response
) = self
.dns_transaction_udp(p
, host
=self
.server_ip
)
1160 self
.assert_dns_rcode_equals(code
, dns
.DNS_RCODE_OK
)
1161 recs
= self
.ldap_get_dns_records(prefix
)
1162 recs
= [r
for r
in recs
if r
.data
.str == txt
]
1163 self
.assertEqual(len(recs
), 1)
def dns_tombstone(self, prefix, txt, zone):
    """Hand samdb.dns_replace a single tombstone record for ``prefix.zone``.

    The tombstone carries a fixed dwTimeStamp of 1000.  ``txt`` is not
    used by this helper.
    """
    fqdn = ".".join((prefix, zone))

    tombstone = dnsp.DnssrvRpcRecord()
    tombstone.wType = dnsp.DNS_TYPE_TOMBSTONE
    tombstone.dwTimeStamp = 1000

    replacement = [tombstone]
    self.samdb.dns_replace(fqdn, replacement)
def ldap_get_records(self, name):
    """Search below the test zone DN for dnsNode objects named ``name``.

    Returns the result of ``samdb.search`` with all attributes requested.
    """
    # The use of SCOPE_SUBTREE here avoids raising an exception in the
    # 0 results case for a test below.
    node_filter = "(&(objectClass=dnsNode)(name={0}))".format(name)
    search_args = dict(base=self.zone_dn,
                       scope=ldb.SCOPE_SUBTREE,
                       expression=node_filter,
                       attrs=["*"])
    return self.samdb.search(**search_args)
def ldap_get_dns_records(self, name):
    """Return the unpacked dnsRecord values of the first node named ``name``.

    Only the first search result from ldap_get_records() is inspected;
    each of its 'dnsRecord' values is NDR-unpacked as a
    dnsp.DnssrvRpcRecord.
    """
    first_node = self.ldap_get_records(name)[0]
    unpacked = []
    for blob in first_node.get('dnsRecord'):
        unpacked.append(ndr_unpack(dnsp.DnssrvRpcRecord, blob))
    return unpacked
1188 def ldap_get_zone_settings(self
):
1189 records
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_BASE
,
1190 expression
="(&(objectClass=dnsZone)" +
1191 "(name={0}))".format(self
.zone
),
1192 attrs
=["dNSProperty"])
1193 self
.assertEqual(len(records
), 1)
1194 props
= [ndr_unpack(dnsp
.DnsProperty
, r
)
1195 for r
in records
[0].get('dNSProperty')]
1197 # We have no choice but to repeat these here.
1198 zone_prop_ids
= {0x00: "EMPTY",
1200 0x02: "ALLOW_UPDATE",
1201 0x08: "SECURE_TIME",
1202 0x10: "NOREFRESH_INTERVAL",
1203 0x11: "SCAVENGING_SERVERS",
1204 0x12: "AGING_ENABLED_TIME",
1205 0x20: "REFRESH_INTERVAL",
1206 0x40: "AGING_STATE",
1207 0x80: "DELETED_FROM_HOSTNAME",
1208 0x81: "MASTER_SERVERS",
1209 0x82: "AUTO_NS_SERVERS",
1210 0x83: "DCPROMO_CONVERT",
1211 0x90: "SCAVENGING_SERVERS_DA",
1212 0x91: "MASTER_SERVERS_DA",
1213 0x92: "NS_SERVERS_DA",
1214 0x100: "NODE_DBFLAGS"}
1215 return {zone_prop_ids
[p
.id].lower(): p
.data
for p
in props
}
1217 def set_aging(self
, enable
=False):
1218 self
.create_zone(self
.zone
, aging_enabled
=enable
)
1219 self
.set_params(NoRefreshInterval
=1,
1221 Aging
=int(bool(enable
)),
1222 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1224 def test_set_aging(self
, enable
=True, name
='agingtest', txt
=None):
1227 self
.set_aging(enable
=True)
1228 settings
= self
.ldap_get_zone_settings()
1229 self
.assertTrue(settings
['aging_state'] is not None)
1230 self
.assertTrue(settings
['aging_state'])
1232 rec
= self
.dns_update_record('agingtest', ['test txt'])
1233 self
.assertNotEqual(rec
.dwTimeStamp
, 0)
1235 def test_set_aging_disabled(self
):
1236 self
.set_aging(enable
=False)
1237 settings
= self
.ldap_get_zone_settings()
1238 self
.assertTrue(settings
['aging_state'] is not None)
1239 self
.assertFalse(settings
['aging_state'])
1241 rec
= self
.dns_update_record('agingtest', ['test txt'])
1242 self
.assertNotEqual(rec
.dwTimeStamp
, 0)
1244 def test_aging_update(self
, enable
=True):
1245 name
, txt
= 'agingtest', ['test txt']
1246 self
.set_aging(enable
=True)
1247 before_mod
= self
.dns_update_record(name
, txt
)
1249 self
.set_params(Aging
=0)
1253 self
.assertTrue(rec
.dwTimeStamp
> 0)
1254 rec
.dwTimeStamp
-= dec
1255 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1256 after_mod
= self
.ldap_get_dns_records(name
)
1257 self
.assertEqual(len(after_mod
), 1)
1258 after_mod
= after_mod
[0]
1259 self
.assertEqual(after_mod
.dwTimeStamp
,
1260 before_mod
.dwTimeStamp
- dec
)
1261 after_update
= self
.dns_update_record(name
, txt
)
1262 after_should_equal
= before_mod
if enable
else after_mod
1263 self
.assertEqual(after_should_equal
.dwTimeStamp
,
1264 after_update
.dwTimeStamp
)
1266 def test_aging_update_disabled(self
):
1267 self
.test_aging_update(enable
=False)
1269 def test_aging_refresh(self
):
1270 name
, txt
= 'agingtest', ['test txt']
1271 self
.create_zone(self
.zone
, aging_enabled
=True)
1273 self
.set_params(NoRefreshInterval
=interval
,
1274 RefreshInterval
=interval
,
1276 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1277 before_mod
= self
.dns_update_record(name
, txt
)
1280 self
.assertTrue(rec
.dwTimeStamp
> 0)
1281 rec
.dwTimeStamp
-= interval
// 2
1282 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1283 update_during_norefresh
= self
.dns_update_record(name
, txt
)
1286 self
.assertTrue(rec
.dwTimeStamp
> 0)
1287 rec
.dwTimeStamp
-= interval
+ interval
// 2
1288 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1289 update_during_refresh
= self
.dns_update_record(name
, txt
)
1290 self
.assertEqual(update_during_norefresh
.dwTimeStamp
,
1291 before_mod
.dwTimeStamp
- interval
/ 2)
1292 self
.assertEqual(update_during_refresh
.dwTimeStamp
,
1293 before_mod
.dwTimeStamp
)
1295 def test_rpc_add_no_timestamp(self
):
1296 name
, txt
= 'agingtest', ['test txt']
1297 self
.set_aging(enable
=True)
1298 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1299 rec_buf
.rec
= TXTRecord(txt
)
1300 self
.rpc_conn
.DnssrvUpdateRecord2(
1301 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1308 recs
= self
.ldap_get_dns_records(name
)
1309 self
.assertEqual(len(recs
), 1)
1310 self
.assertEqual(recs
[0].dwTimeStamp
, 0)
1312 def test_static_record_dynamic_update(self
):
1313 name
, txt
= 'agingtest', ['test txt']
1314 txt2
= ['test txt2']
1315 self
.set_aging(enable
=True)
1316 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1317 rec_buf
.rec
= TXTRecord(txt
)
1318 self
.rpc_conn
.DnssrvUpdateRecord2(
1319 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1327 rec2
= self
.dns_update_record(name
, txt2
)
1328 self
.assertEqual(rec2
.dwTimeStamp
, 0)
1330 def test_dynamic_record_static_update(self
):
1331 name
, txt
= 'agingtest', ['test txt']
1332 txt2
= ['test txt2']
1333 txt3
= ['test txt3']
1334 self
.set_aging(enable
=True)
1336 self
.dns_update_record(name
, txt
)
1338 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1339 rec_buf
.rec
= TXTRecord(txt2
)
1340 self
.rpc_conn
.DnssrvUpdateRecord2(
1341 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1349 self
.dns_update_record(name
, txt3
)
1351 recs
= self
.ldap_get_dns_records(name
)
1352 # Put in dict because ldap recs might be out of order
1353 recs
= {str(r
.data
.str): r
for r
in recs
}
1354 self
.assertNotEqual(recs
[str(txt
)].dwTimeStamp
, 0)
1355 self
.assertEqual(recs
[str(txt2
)].dwTimeStamp
, 0)
1356 self
.assertEqual(recs
[str(txt3
)].dwTimeStamp
, 0)
1358 def test_dns_tombstone_custom_match_rule(self
):
1359 lp
= self
.get_loadparm()
1360 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1361 session_info
=system_session(),
1362 credentials
=self
.creds
)
1364 name
, txt
= 'agingtest', ['test txt']
1365 name2
, txt2
= 'agingtest2', ['test txt2']
1366 name3
, txt3
= 'agingtest3', ['test txt3']
1367 name4
, txt4
= 'agingtest4', ['test txt4']
1368 name5
, txt5
= 'agingtest5', ['test txt5']
1370 self
.create_zone(self
.zone
, aging_enabled
=True)
1372 self
.set_params(NoRefreshInterval
=interval
,
1373 RefreshInterval
=interval
,
1375 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1377 self
.dns_update_record(name
, txt
)
1379 self
.dns_update_record(name2
, txt
)
1380 self
.dns_update_record(name2
, txt2
)
1382 self
.dns_update_record(name3
, txt
)
1383 self
.dns_update_record(name3
, txt2
)
1384 last_update
= self
.dns_update_record(name3
, txt3
)
1386 # Modify txt1 of the first 2 names
1388 if rec
.data
.str == txt
:
1389 rec
.dwTimeStamp
-= 2
1390 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1391 self
.ldap_modify_dnsrecs(name2
, mod_ts
)
1393 # create a static dns record.
1394 rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1395 rec_buf
.rec
= TXTRecord(txt4
)
1396 self
.rpc_conn
.DnssrvUpdateRecord2(
1397 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1405 # Create a tombstoned record.
1406 self
.dns_update_record(name5
, txt5
)
1407 self
.dns_tombstone(name5
, txt5
, self
.zone
)
1409 self
.ldap_get_dns_records(name3
)
1410 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1411 expr
= expr
.format(int(last_update
.dwTimeStamp
) - 1)
1413 res
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1414 expression
=expr
, attrs
=["*"])
1415 except ldb
.LdbError
as e
:
1417 updated_names
= {str(r
.get('name')) for r
in res
}
1418 self
.assertEqual(updated_names
, set([name
, name2
]))
1420 def test_dns_tombstone_custom_match_rule_no_records(self
):
1421 lp
= self
.get_loadparm()
1422 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1423 session_info
=system_session(),
1424 credentials
=self
.creds
)
1426 self
.create_zone(self
.zone
, aging_enabled
=True)
1428 self
.set_params(NoRefreshInterval
=interval
,
1429 RefreshInterval
=interval
,
1431 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1433 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1434 expr
= expr
.format(1)
1437 res
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1438 expression
=expr
, attrs
=["*"])
1439 except ldb
.LdbError
as e
:
1441 self
.assertEqual(0, len(res
))
1443 def test_dns_tombstone_custom_match_rule_fail(self
):
1444 self
.create_zone(self
.zone
, aging_enabled
=True)
1445 samdb
= SamDB(url
=lp
.samdb_url(),
1447 session_info
=system_session(),
1448 credentials
=self
.creds
)
1450 # Property name is not dnsRecord
1451 expr
= "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1452 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1453 expression
=expr
, attrs
=["*"])
1454 self
.assertEqual(len(res
), 0)
1456 # No value for tombstone time
1458 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1459 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1460 expression
=expr
, attrs
=["*"])
1461 self
.assertEqual(len(res
), 0)
1462 self
.fail("Exception: ldb.ldbError not generated")
1463 except ldb
.LdbError
as e
:
1465 self
.assertEqual(num
, ERR_OPERATIONS_ERROR
)
1467 # Tombstone time = -
1469 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1470 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1471 expression
=expr
, attrs
=["*"])
1472 self
.assertEqual(len(res
), 0)
1473 self
.fail("Exception: ldb.ldbError not generated")
1474 except ldb
.LdbError
as e
:
1476 self
.assertEqual(num
, ERR_OPERATIONS_ERROR
)
1478 # Tombstone time longer than 64 characters
1480 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1481 expr
= expr
.format("1" * 65)
1482 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1483 expression
=expr
, attrs
=["*"])
1484 self
.assertEqual(len(res
), 0)
1485 self
.fail("Exception: ldb.ldbError not generated")
1486 except ldb
.LdbError
as e
:
1488 self
.assertEqual(num
, ERR_OPERATIONS_ERROR
)
1490 # Non numeric Tombstone time
1492 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1493 res
= samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1494 expression
=expr
, attrs
=["*"])
1495 self
.assertEqual(len(res
), 0)
1496 self
.fail("Exception: ldb.ldbError not generated")
1497 except ldb
.LdbError
as e
:
1499 self
.assertEqual(num
, ERR_OPERATIONS_ERROR
)
1501 # Non system session
1503 db
= SamDB(url
="ldap://" + self
.server_ip
,
1504 lp
=self
.get_loadparm(),
1505 credentials
=self
.creds
)
1507 expr
= "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1508 res
= db
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1509 expression
=expr
, attrs
=["*"])
1510 self
.assertEqual(len(res
), 0)
1511 self
.fail("Exception: ldb.ldbError not generated")
1512 except ldb
.LdbError
as e
:
1514 self
.assertEqual(num
, ERR_OPERATIONS_ERROR
)
1516 def test_basic_scavenging(self
):
1517 lp
= self
.get_loadparm()
1518 self
.samdb
= SamDB(url
=lp
.samdb_url(), lp
=lp
,
1519 session_info
=system_session(),
1520 credentials
=self
.creds
)
1522 self
.create_zone(self
.zone
, aging_enabled
=True)
1524 self
.set_params(NoRefreshInterval
=interval
,
1525 RefreshInterval
=interval
,
1527 AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
1528 name
, txt
= 'agingtest', ['test txt']
1529 name2
, txt2
= 'agingtest2', ['test txt2']
1530 name3
, txt3
= 'agingtest3', ['test txt3']
1531 name4
, txt4
= 'agingtest4', ['test txt4']
1532 name5
, txt5
= 'agingtest5', ['test txt5']
1533 self
.dns_update_record(name
, txt
)
1534 self
.dns_update_record(name2
, txt
)
1535 self
.dns_update_record(name2
, txt2
)
1536 self
.dns_update_record(name3
, txt
)
1537 self
.dns_update_record(name3
, txt2
)
1539 # Create a tombstoned record.
1540 self
.dns_update_record(name4
, txt4
)
1541 self
.dns_tombstone(name4
, txt4
, self
.zone
)
1542 records
= self
.ldap_get_records(name4
)
1543 self
.assertIn("dNSTombstoned", records
[0])
1544 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"TRUE")
1546 # Create an un-tombstoned record, with dnsTombstoned: FALSE
1547 self
.dns_update_record(name5
, txt5
)
1548 self
.dns_tombstone(name5
, txt5
, self
.zone
)
1549 self
.dns_update_record(name5
, txt5
)
1550 records
= self
.ldap_get_records(name5
)
1551 self
.assertIn("dNSTombstoned", records
[0])
1552 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"FALSE")
1554 last_add
= self
.dns_update_record(name3
, txt3
)
1557 self
.assertTrue(rec
.dwTimeStamp
> 0)
1558 if rec
.data
.str == txt
:
1559 rec
.dwTimeStamp
-= interval
* 5
1561 def mod_ts_all(rec
):
1562 rec
.dwTimeStamp
-= interval
* 5
1563 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1564 self
.ldap_modify_dnsrecs(name2
, mod_ts
)
1565 self
.ldap_modify_dnsrecs(name3
, mod_ts
)
1566 self
.ldap_modify_dnsrecs(name5
, mod_ts_all
)
1567 self
.assertTrue(callable(getattr(dsdb
, '_scavenge_dns_records', None)))
1568 dsdb
._scavenge
_dns
_records
(self
.samdb
)
1570 recs
= self
.ldap_get_dns_records(name
)
1571 self
.assertEqual(len(recs
), 1)
1572 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
1573 records
= self
.ldap_get_records(name
)
1574 self
.assertIn("dNSTombstoned", records
[0])
1575 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"TRUE")
1577 recs
= self
.ldap_get_dns_records(name2
)
1578 self
.assertEqual(len(recs
), 1)
1579 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TXT
)
1580 self
.assertEqual(recs
[0].data
.str, txt2
)
1582 recs
= self
.ldap_get_dns_records(name3
)
1583 self
.assertEqual(len(recs
), 2)
1584 txts
= {str(r
.data
.str) for r
in recs
}
1585 self
.assertEqual(txts
, {str(txt2
), str(txt3
)})
1586 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TXT
)
1587 self
.assertEqual(recs
[1].wType
, dnsp
.DNS_TYPE_TXT
)
1589 recs
= self
.ldap_get_dns_records(name4
)
1590 self
.assertEqual(len(recs
), 1)
1591 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
1592 records
= self
.ldap_get_records(name4
)
1593 self
.assertIn("dNSTombstoned", records
[0])
1594 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"TRUE")
1596 recs
= self
.ldap_get_dns_records(name5
)
1597 self
.assertEqual(len(recs
), 1)
1598 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
1599 records
= self
.ldap_get_records(name5
)
1600 self
.assertIn("dNSTombstoned", records
[0])
1601 self
.assertEqual(records
[0]["dNSTombstoned"][0], b
"TRUE")
1603 for make_it_work
in [False, True]:
1604 inc
= -1 if make_it_work
else 1
1607 rec
.data
= (last_add
.dwTimeStamp
- 24 * 14) + inc
1608 self
.ldap_modify_dnsrecs(name
, mod_ts
)
1609 dsdb
._dns
_delete
_tombstones
(self
.samdb
)
1610 recs
= self
.ldap_get_records(name
)
1612 self
.assertEqual(len(recs
), 0)
1614 self
.assertEqual(len(recs
), 1)
1616 def test_fully_qualified_zone(self
):
1618 def create_zone_expect_exists(zone
):
1620 zone_create
= self
.make_zone_obj(zone
)
1621 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1622 zc_type
= dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
1623 self
.rpc_conn
.DnssrvOperation2(client_version
,
1631 except WERRORError
as e
:
1633 if enum
!= werror
.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS
:
1636 self
.fail("Zone {} should already exist".format(zone
))
1638 # Create unqualified, then check creating qualified fails.
1639 self
.create_zone(self
.zone
)
1640 create_zone_expect_exists(self
.zone
+ '.')
1642 # Same again, but the other way around.
1643 self
.create_zone(self
.zone
+ '2.')
1644 create_zone_expect_exists(self
.zone
+ '2')
1646 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1647 request_filter
= dnsserver
.DNS_ZONE_REQUEST_PRIMARY
1648 tid
= dnsserver
.DNSSRV_TYPEID_DWORD
1649 typeid
, res
= self
.rpc_conn
.DnssrvComplexOperation2(client_version
,
1657 self
.delete_zone(self
.zone
)
1658 self
.delete_zone(self
.zone
+ '2')
1660 # Two zones should've been created, neither of them fully qualified.
1661 zones_we_just_made
= []
1662 zones
= [str(z
.pszZoneName
) for z
in res
.ZoneArray
]
1664 if zone
.startswith(self
.zone
):
1665 zones_we_just_made
.append(zone
)
1666 self
.assertEqual(len(zones_we_just_made
), 2)
1667 self
.assertEqual(set(zones_we_just_made
), {self
.zone
+ '2', self
.zone
})
1669 def delete_zone(self
, zone
):
1670 self
.rpc_conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1676 dnsserver
.DNSSRV_TYPEID_NULL
,
1679 def test_soa_query(self
):
1681 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
1684 q
= self
.make_name_question(zone
, dns
.DNS_QTYPE_SOA
, dns
.DNS_QCLASS_IN
)
1686 self
.finish_name_packet(p
, questions
)
1688 (response
, response_packet
) =\
1689 self
.dns_transaction_udp(p
, host
=server_ip
)
1690 # Windows returns OK while BIND logically seems to return NXDOMAIN
1691 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
1692 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1693 self
.assertEqual(response
.ancount
, 0)
1695 self
.create_zone(zone
)
1696 (response
, response_packet
) =\
1697 self
.dns_transaction_udp(p
, host
=server_ip
)
1698 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1699 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1700 self
.assertEqual(response
.ancount
, 1)
1701 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_SOA
)
1703 self
.delete_zone(zone
)
1704 (response
, response_packet
) =\
1705 self
.dns_transaction_udp(p
, host
=server_ip
)
1706 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_NXDOMAIN
)
1707 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
1708 self
.assertEqual(response
.ancount
, 0)
1710 def set_dnsProperty_zero_length(self
, dnsproperty_id
):
1711 records
= self
.samdb
.search(base
=self
.zone_dn
, scope
=ldb
.SCOPE_BASE
,
1712 expression
="(&(objectClass=dnsZone)" +
1713 "(name={0}))".format(self
.zone
),
1714 attrs
=["dNSProperty"])
1715 self
.assertEqual(len(records
), 1)
1716 props
= [ndr_unpack(dnsp
.DnsProperty
, r
)
1717 for r
in records
[0].get('dNSProperty')]
1718 new_props
= [ndr
.ndr_pack(p
) for p
in props
if p
.id == dnsproperty_id
]
1720 zero_length_p
= dnsp
.DnsProperty_short()
1721 zero_length_p
.id = dnsproperty_id
1722 zero_length_p
.namelength
= 1
1723 zero_length_p
.name
= 1
1724 new_props
+= [ndr
.ndr_pack(zero_length_p
)]
1727 update_dict
= {'dn': dn
, 'dnsProperty': new_props
}
1728 self
.samdb
.modify(ldb
.Message
.from_dict(self
.samdb
,
1730 ldb
.FLAG_MOD_REPLACE
))
def test_update_while_dnsProperty_zero_length(self):
    """A DNS update still yields a timestamped record after the zone's
    ALLOW_UPDATE dnsProperty has been rewritten by
    set_dnsProperty_zero_length()."""
    self.create_zone(self.zone)

    property_id = dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE
    self.set_dnsProperty_zero_length(property_id)

    updated = self.dns_update_record('dnspropertytest', ['test txt'])
    self.assertNotEqual(updated.dwTimeStamp, 0)
1738 def test_enum_zones_while_dnsProperty_zero_length(self
):
1739 self
.create_zone(self
.zone
)
1740 self
.set_dnsProperty_zero_length(dnsp
.DSPROPERTY_ZONE_ALLOW_UPDATE
)
1741 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1742 request_filter
= dnsserver
.DNS_ZONE_REQUEST_PRIMARY
1743 tid
= dnsserver
.DNSSRV_TYPEID_DWORD
1744 typeid
, res
= self
.rpc_conn
.DnssrvComplexOperation2(client_version
,
def test_rpc_zone_update_while_dnsProperty_zero_length(self):
    """Set AllowUpdate through set_params() after the zone's ALLOW_UPDATE
    dnsProperty has been rewritten by set_dnsProperty_zero_length().

    The test passes as long as none of the helper calls raises.
    """
    self.create_zone(self.zone)

    property_id = dnsp.DSPROPERTY_ZONE_ALLOW_UPDATE
    self.set_dnsProperty_zero_length(property_id)

    rpc_params = {'AllowUpdate': dnsp.DNS_ZONE_UPDATE_SECURE}
    self.set_params(**rpc_params)
def test_rpc_zone_update_while_other_dnsProperty_zero_length(self):
    """Set AllowUpdate through set_params() after a different property,
    MASTER_SERVERS_DA, has been rewritten by
    set_dnsProperty_zero_length().

    The test passes as long as none of the helper calls raises.
    """
    self.create_zone(self.zone)

    property_id = dnsp.DSPROPERTY_ZONE_MASTER_SERVERS_DA
    self.set_dnsProperty_zero_length(property_id)

    rpc_params = {'AllowUpdate': dnsp.DNS_ZONE_UPDATE_SECURE}
    self.set_params(**rpc_params)
1762 class TestRPCRoundtrip(DNSTest
):
1765 global server
, server_ip
, lp
, creds
1766 self
.server
= server_name
1767 self
.server_ip
= server_ip
1770 self
.rpc_conn
= dnsserver
.dnsserver("ncacn_ip_tcp:%s[sign]" %
1775 def rpc_update(self
, fqn
=None, data
=None, wType
=None, delete
=False):
1776 fqn
= fqn
or ("rpctestrec." + self
.get_dns_domain())
1778 rec
= record_from_string(wType
, data
)
1779 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1780 add_rec_buf
.rec
= rec
1782 add_arg
= add_rec_buf
1786 del_arg
= add_rec_buf
1788 self
.rpc_conn
.DnssrvUpdateRecord2(
1789 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1792 self
.get_dns_domain(),
1797 def test_rpc_self_referencing_cname(self
):
1798 cname
= "cnametest2_unqual_rec_loop"
1799 cname_fqn
= "%s.%s" % (cname
, self
.get_dns_domain())
1802 self
.rpc_update(fqn
=cname
, data
=cname_fqn
,
1803 wType
=dnsp
.DNS_TYPE_CNAME
, delete
=True)
1804 except WERRORError
as e
:
1805 if e
.args
[0] != werror
.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST
:
1806 self
.fail("RPC DNS gaven wrong error on pre-test cleanup "
1807 "for self referencing CNAME: %s" % e
.args
[0])
1810 self
.rpc_update(fqn
=cname
, wType
=dnsp
.DNS_TYPE_CNAME
, data
=cname_fqn
)
1811 except WERRORError
as e
:
1812 if e
.args
[0] != werror
.WERR_DNS_ERROR_CNAME_LOOP
:
1813 self
.fail("RPC DNS gaven wrong error on insertion of "
1814 "self referencing CNAME: %s" % e
.args
[0])
1817 self
.fail("RPC DNS allowed insertion of self referencing CNAME")
1819 def test_update_add_txt_rpc_to_dns(self
):
1820 prefix
, txt
= 'rpctextrec', ['"This is a test"']
1822 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1824 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
, '"\\"This is a test\\""')
1825 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1826 add_rec_buf
.rec
= rec
1828 self
.rpc_conn
.DnssrvUpdateRecord2(
1829 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1832 self
.get_dns_domain(),
1837 except WERRORError
as e
:
1841 self
.check_query_txt(prefix
, txt
)
1843 self
.rpc_conn
.DnssrvUpdateRecord2(
1844 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1847 self
.get_dns_domain(),
1852 def test_update_add_null_padded_txt_record(self
):
1853 "test adding records works"
1854 prefix
, txt
= 'pad1textrec', ['"This is a test"', '', '']
1855 p
= self
.make_txt_update(prefix
, txt
)
1856 (response
, response_packet
) =\
1857 self
.dns_transaction_udp(p
, host
=server_ip
)
1858 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1859 self
.check_query_txt(prefix
, txt
)
1860 self
.assertIsNotNone(
1861 dns_record_match(self
.rpc_conn
,
1863 self
.get_dns_domain(),
1864 "%s.%s" % (prefix
, self
.get_dns_domain()),
1866 '"\\"This is a test\\"" "" ""'))
1868 prefix
, txt
= 'pad2textrec', ['"This is a test"', '', '', 'more text']
1869 p
= self
.make_txt_update(prefix
, txt
)
1870 (response
, response_packet
) =\
1871 self
.dns_transaction_udp(p
, host
=server_ip
)
1872 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1873 self
.check_query_txt(prefix
, txt
)
1874 self
.assertIsNotNone(
1878 self
.get_dns_domain(),
1879 "%s.%s" % (prefix
, self
.get_dns_domain()),
1881 '"\\"This is a test\\"" "" "" "more text"'))
1883 prefix
, txt
= 'pad3textrec', ['', '', '"This is a test"']
1884 p
= self
.make_txt_update(prefix
, txt
)
1885 (response
, response_packet
) =\
1886 self
.dns_transaction_udp(p
, host
=server_ip
)
1887 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
1888 self
.check_query_txt(prefix
, txt
)
1889 self
.assertIsNotNone(
1893 self
.get_dns_domain(),
1894 "%s.%s" % (prefix
, self
.get_dns_domain()),
1896 '"" "" "\\"This is a test\\""'))
1898 def test_update_add_padding_rpc_to_dns(self
):
1899 prefix
, txt
= 'pad1textrec', ['"This is a test"', '', '']
1900 prefix
= 'rpc' + prefix
1901 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1903 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
,
1904 '"\\"This is a test\\"" "" ""')
1905 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1906 add_rec_buf
.rec
= rec
1908 self
.rpc_conn
.DnssrvUpdateRecord2(
1909 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1912 self
.get_dns_domain(),
1917 except WERRORError
as e
:
1921 self
.check_query_txt(prefix
, txt
)
1923 self
.rpc_conn
.DnssrvUpdateRecord2(
1924 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1927 self
.get_dns_domain(),
1932 prefix
, txt
= 'pad2textrec', ['"This is a test"', '', '', 'more text']
1933 prefix
= 'rpc' + prefix
1934 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1936 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
,
1937 '"\\"This is a test\\"" "" "" "more text"')
1938 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1939 add_rec_buf
.rec
= rec
1941 self
.rpc_conn
.DnssrvUpdateRecord2(
1942 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1945 self
.get_dns_domain(),
1950 except WERRORError
as e
:
1954 self
.check_query_txt(prefix
, txt
)
1956 self
.rpc_conn
.DnssrvUpdateRecord2(
1957 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1960 self
.get_dns_domain(),
1965 prefix
, txt
= 'pad3textrec', ['', '', '"This is a test"']
1966 prefix
= 'rpc' + prefix
1967 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
1969 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
,
1970 '"" "" "\\"This is a test\\""')
1971 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1972 add_rec_buf
.rec
= rec
1974 self
.rpc_conn
.DnssrvUpdateRecord2(
1975 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1978 self
.get_dns_domain(),
1982 except WERRORError
as e
:
1986 self
.check_query_txt(prefix
, txt
)
1988 self
.rpc_conn
.DnssrvUpdateRecord2(
1989 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1992 self
.get_dns_domain(),
1997 # Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
    """Add TXT records whose strings contain a NUL byte.

    Each record is sent as a DNS update, then checked both with a DNS
    query and with dns_record_match() over RPC; in both checks the text
    is expected to be just 'NULL'.
    """
    # (prefix, strings sent, strings expected back, RPC record text)
    cases = (
        ('nulltextrec',
         ['NULL\x00BYTE'],
         ['NULL'],
         '"NULL"'),
        ('nulltextrec2',
         ['NULL\x00BYTE', 'NULL\x00BYTE'],
         ['NULL', 'NULL'],
         '"NULL" "NULL"'),
    )

    for prefix, txt, expected_txt, rpc_text in cases:
        update = self.make_txt_update(prefix, txt)
        response, _ = self.dns_transaction_udp(update, host=server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)

        zone = self.get_dns_domain()
        fqdn = "%s.%s" % (prefix, self.get_dns_domain())
        match = dns_record_match(self.rpc_conn, self.server_ip,
                                 zone, fqdn,
                                 dnsp.DNS_TYPE_TXT, rpc_text)
        self.assertIsNotNone(match)
2022 def test_update_add_null_char_rpc_to_dns(self
):
2023 prefix
= 'rpcnulltextrec'
2024 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2026 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
, '"NULL\x00BYTE"')
2027 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2028 add_rec_buf
.rec
= rec
2030 self
.rpc_conn
.DnssrvUpdateRecord2(
2031 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2034 self
.get_dns_domain(),
2039 except WERRORError
as e
:
2043 self
.check_query_txt(prefix
, ['NULL'])
2045 self
.rpc_conn
.DnssrvUpdateRecord2(
2046 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2049 self
.get_dns_domain(),
def test_update_add_hex_char_txt_record(self):
    """Add a TXT record containing a 0xFF character.

    The record is sent as a DNS update, then checked with a DNS query
    and with dns_record_match() over RPC.
    """
    prefix = 'hextextrec'
    txt = ['HIGH\xFFBYTE']

    update = self.make_txt_update(prefix, txt)
    response, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    zone = self.get_dns_domain()
    fqdn = "%s.%s" % (prefix, self.get_dns_domain())
    match = dns_record_match(self.rpc_conn, self.server_ip,
                             zone, fqdn,
                             dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
    self.assertIsNotNone(match)
2067 def test_update_add_hex_rpc_to_dns(self
):
2068 prefix
, txt
= 'hextextrec', ['HIGH\xFFBYTE']
2069 prefix
= 'rpc' + prefix
2070 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2072 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
, '"HIGH\xFFBYTE"')
2073 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2074 add_rec_buf
.rec
= rec
2076 self
.rpc_conn
.DnssrvUpdateRecord2(
2077 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2080 self
.get_dns_domain(),
2085 except WERRORError
as e
:
2089 self
.check_query_txt(prefix
, txt
)
2091 self
.rpc_conn
.DnssrvUpdateRecord2(
2092 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2095 self
.get_dns_domain(),
def test_update_add_slash_txt_record(self):
    """Add a TXT record containing a backslash.

    The record is sent as a DNS update, then checked with a DNS query
    and with dns_record_match() over RPC, where the backslash appears
    doubled in the record text.
    """
    prefix = 'slashtextrec'
    txt = ['Th\\=is=is a test']

    update = self.make_txt_update(prefix, txt)
    response, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    zone = self.get_dns_domain()
    fqdn = "%s.%s" % (prefix, self.get_dns_domain())
    match = dns_record_match(self.rpc_conn, self.server_ip,
                             zone, fqdn,
                             dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
    self.assertIsNotNone(match)
2113 # This test fails against Windows as it eliminates slashes in RPC
2114 # One typical use for a slash is in records like 'var=value' to
2115 # escape '=' characters.
2116 def test_update_add_slash_rpc_to_dns(self
):
2117 prefix
, txt
= 'slashtextrec', ['Th\\=is=is a test']
2118 prefix
= 'rpc' + prefix
2119 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
2121 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
, '"Th\\\\=is=is a test"')
2122 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2123 add_rec_buf
.rec
= rec
2125 self
.rpc_conn
.DnssrvUpdateRecord2(
2126 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2129 self
.get_dns_domain(),
2134 except WERRORError
as e
:
2138 self
.check_query_txt(prefix
, txt
)
2141 self
.rpc_conn
.DnssrvUpdateRecord2(
2142 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2145 self
.get_dns_domain(),
def test_update_add_two_txt_records(self):
    """Add a TXT record made of two quoted strings.

    The record is sent as a DNS update, then checked with a DNS query
    and with dns_record_match() over RPC.
    """
    prefix = 'textrec2'
    txt = ['"This is a test"',
           '"and this is a test, too"']

    update = self.make_txt_update(prefix, txt)
    response, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    zone = self.get_dns_domain()
    fqdn = "%s.%s" % (prefix, self.get_dns_domain())
    rpc_text = ('"\\"This is a test\\""' +
                ' "\\"and this is a test, too\\""')
    match = dns_record_match(self.rpc_conn, self.server_ip,
                             zone, fqdn,
                             dnsp.DNS_TYPE_TXT, rpc_text)
    self.assertIsNotNone(match)
# Builds a TXT record from two quoted strings, passes it to
# rpc_conn.DnssrvUpdateRecord2 and then checks it with check_query_txt().
# NOTE(review): parts of this method are not visible in this view (the
# remaining DnssrvUpdateRecord2 arguments and the body of the WERRORError
# handler) -- confirm the details against the full file.
2165 def test_update_add_two_rpc_to_dns(self
):
# txt is the value later handed to check_query_txt().
2166 prefix
, txt
= 'textrec2', ['"This is a test"',
2167 '"and this is a test, too"']
# The record name gets an 'rpc' prefix: 'rpctextrec2'.
2168 prefix
= 'rpc' + prefix
2169 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
# The same two strings in quoted form, separated by a space; the double
# quotes that are part of the text are backslash-escaped.
2171 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
,
2172 '"\\"This is a test\\""' +
2173 ' "\\"and this is a test, too\\""')
2174 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2175 add_rec_buf
.rec
= rec
# First DnssrvUpdateRecord2 call; presumably adds add_rec_buf -- TODO
# confirm, the arguments that would show this are not visible here.
2177 self
.rpc_conn
.DnssrvUpdateRecord2(
2178 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2181 self
.get_dns_domain(),
2186 except WERRORError
as e
:
2190 self
.check_query_txt(prefix
, txt
)
# Second DnssrvUpdateRecord2 call; presumably removes the record again
# as cleanup -- TODO confirm against the full argument list.
2192 self
.rpc_conn
.DnssrvUpdateRecord2(
2193 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2196 self
.get_dns_domain(),
def test_update_add_empty_txt_records(self):
    """Add a TXT record with an empty string list and verify it.

    The update is sent over UDP and must return DNS_RCODE_OK. The record
    is then checked with check_query_txt() and matched through the RPC
    connection, where it is expected to compare equal to the empty
    string ''.
    """
    # No strings at all: this is the empty-TXT case.
    prefix, txt = 'emptytextrec', []
    p = self.make_txt_update(prefix, txt)
    # Only the parsed response is needed; the second tuple element is
    # not used by this test.
    response, _ = self.dns_transaction_udp(p, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)
    self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                                          self.get_dns_domain(),
                                          "%s.%s" % (prefix, self.get_dns_domain()),
                                          dnsp.DNS_TYPE_TXT, ''))
# Builds a TXT record from the empty string, passes it to
# rpc_conn.DnssrvUpdateRecord2 and then checks it with check_query_txt().
# NOTE(review): parts of this method are not visible in this view (the
# remaining DnssrvUpdateRecord2 arguments and the body of the WERRORError
# handler) -- confirm the details against the full file.
2214 def test_update_add_empty_rpc_to_dns(self
):
# Empty string list; this is the value later handed to check_query_txt().
2215 prefix
, txt
= 'rpcemptytextrec', []
2217 name
= "%s.%s" % (prefix
, self
.get_dns_domain())
# The record is created from an empty record string.
2219 rec
= record_from_string(dnsp
.DNS_TYPE_TXT
, '')
2220 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
2221 add_rec_buf
.rec
= rec
# First DnssrvUpdateRecord2 call; presumably adds add_rec_buf -- TODO
# confirm, the arguments that would show this are not visible here.
2223 self
.rpc_conn
.DnssrvUpdateRecord2(
2224 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2227 self
.get_dns_domain(),
2231 except WERRORError
as e
:
2235 self
.check_query_txt(prefix
, txt
)
# Second DnssrvUpdateRecord2 call; presumably removes the record again
# as cleanup -- TODO confirm against the full argument list.
2237 self
.rpc_conn
.DnssrvUpdateRecord2(
2238 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2241 self
.get_dns_domain(),
# Run the tests defined in this module, passing on the subunit options
# that were registered with the command-line parser.
TestProgram(module=__name__, opts=subunitopts)