ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / dcerpc / dnsserver.py
blob13c9af818206b8b3fe741fa26878da922a447154
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dcerpc.dnsserver"""
20 import os
21 import ldb
23 from samba.auth import system_session
24 from samba.samdb import SamDB
25 from samba.ndr import ndr_unpack
26 from samba.dcerpc import dnsp, dnsserver, security
27 from samba.tests import RpcInterfaceTestCase, env_get_var_value
28 from samba.dnsserver import record_from_string, flag_from_string, ARecord
29 from samba import sd_utils, descriptor
30 from samba import WERRORError, werror
33 class DnsserverTests(RpcInterfaceTestCase):
@classmethod
def setUpClass(cls):
    """Build the record-string fixtures shared by the tests in this class.

    Sets three class attributes, each a dict mapping a record type name
    to a list of record strings:

    * ``cls.good_records`` -- strings the tests expect to be accepted.
    * ``cls.bad_records`` -- strings the tests expect to be rejected.
    * ``cls.invalid_records`` -- MX/SRV strings which cannot be turned
      into records at all (see the comment next to them).

    NOTE(review): the closing braces of the three dict literals were
    missing from the listing this was reconstructed from; they have
    been restored here.
    """
    super().setUpClass()

    good_dns = ["SAMDOM.EXAMPLE.COM",
                "1.EXAMPLE.COM",
                "%sEXAMPLE.COM" % ("1." * 100),
                "EXAMPLE",
                "\n.COM",
                "!@#$%^&*()_",
                "HIGH\xFFBYTE",
                "@.EXAMPLE.COM",
                "."]
    bad_dns = ["...",
               ".EXAMPLE.COM",
               ".EXAMPLE.",
               "",
               "SAMDOM..EXAMPLE.COM"]

    # MX strings are a name followed by one number, SRV strings a name
    # followed by three.  Each list starts with a hand-written maximal
    # entry (good lists only) and is then derived from the name lists,
    # in the same order as those lists.
    good_mx = ["SAMDOM.EXAMPLE.COM 65535"] + ["%s 1" % name for name in good_dns]
    bad_mx = ["%s 1" % name for name in bad_dns]

    good_srv = (["SAMDOM.EXAMPLE.COM 65535 65535 65535"] +
                ["%s 0 0 0" % name for name in good_dns])
    bad_srv = ["%s 0 0 0" % name for name in bad_dns]

    # PTR, CNAME and NS deliberately share the very same list object.
    cls.good_records = {
        "A": ["192.168.0.1",
              "255.255.255.255"],
        "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
                 "0000:0000:0000:0000:0000:0000:0000:0000",
                 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
                 "1234:1234:1234::",
                 "1234:1234:1234:1234:1234::",
                 "1234:5678:9ABC:DEF0::",
                 "0000:0000::0000",
                 "1234::5678:9ABC:0000:0000:0000:0000",
                 "::1",
                 "::",
                 "1:1:1:1:1:1:1:1"],
        "PTR": good_dns,
        "CNAME": good_dns,
        "NS": good_dns,
        "MX": good_mx,
        "SRV": good_srv,
        "TXT": ["text", "", "@#!", "\n"]
    }

    cls.bad_records = {
        "A": ["192.168.0.500",
              "255.255.255.255/32"],
        "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
                 "0000:0000:0000:0000:0000:0000:0000:0000/1",
                 "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
                 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
                 "1234:5678:9ABC:DEF0:1234:5678:9ABC",
                 "1111::1111::1111"],
        "PTR": bad_dns,
        "CNAME": bad_dns,
        "NS": bad_dns,
        "MX": bad_mx,
        "SRV": bad_srv
    }

    # Because we use uint16_t for these numbers, we can't
    # actually create these records.
    invalid_mx = ["SAMDOM.EXAMPLE.COM -1",
                  "SAMDOM.EXAMPLE.COM 65536",
                  "%s 1" % ("A" * 256)]
    invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
                   "SAMDOM.EXAMPLE.COM 0 0 65536",
                   "SAMDOM.EXAMPLE.COM 65536 0 0"]
    cls.invalid_records = {
        "MX": invalid_mx,
        "SRV": invalid_srv
    }
def setUp(self):
    """Connect to the DC over RPC and LDAP and create a scratch zone.

    Reads ``DC_SERVER`` and ``DC_SERVER_IP`` from the environment and
    ``REALM`` via ``env_get_var_value``.  After this runs the test has:

    * ``self.conn`` -- a dnsserver RPC connection (signed ncacn_ip_tcp),
    * ``self.samdb`` -- an LDAP ``SamDB`` connection to the same DC,
    * ``self.custom_zone`` -- the name of a freshly created zone.
    """
    super().setUp()
    self.server = os.environ["DC_SERVER"]
    self.zone = env_get_var_value("REALM").lower()
    self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
                                    self.get_loadparm(),
                                    self.get_credentials())

    self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"],
                       lp=self.get_loadparm(),
                       session_info=system_session(),
                       credentials=self.get_credentials())

    self.custom_zone = "zone"
    zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    zone_create_info.pszZoneName = self.custom_zone
    zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    zone_create_info.fAging = 0
    zone_create_info.fDsIntegrated = 1
    zone_create_info.fLoadExisting = 1
    zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT

    # NOTE(review): the two literal 0 arguments below were missing from
    # the listing (its embedded line numbers skip them).  They are
    # presumably dwSettingFlags and dwContext of DnssrvOperation2 --
    # confirm against the dnsserver IDL.
    self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                               0,
                               self.server,
                               None,
                               0,
                               'ZoneCreate',
                               dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                               zone_create_info)
def tearDown(self):
    """Delete the scratch zone created in setUp, then run the base tearDown."""
    # NOTE(review): the two literal 0 arguments were missing from the
    # listing (its embedded line numbers skip them); presumably
    # dwSettingFlags and dwContext -- confirm against the dnsserver IDL.
    self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                               0,
                               self.server,
                               self.custom_zone,
                               0,
                               'DeleteZoneFromDs',
                               dnsserver.DNSSRV_TYPEID_NULL,
                               None)
    super().tearDown()
def test_enum_is_sorted(self):
    """
    Confirm the zone is sorted
    """
    record_str = "192.168.50.50"
    record_type_str = "A"

    # Added out of order on purpose ("-0" goes in last).
    for node in ("atestrecord-1", "atestrecord-2", "atestrecord-3",
                 "atestrecord-4", "atestrecord-0"):
        self.add_record(self.custom_zone, node, record_type_str, record_str)

    # This becomes an extra A on the zone itself by server-side magic
    self.add_record(self.custom_zone, self.custom_zone, record_type_str, record_str)

    # NOTE(review): the literal 0 (second argument) was missing from the
    # listing; presumably dwSettingFlags -- confirm against the IDL.
    _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                             0,
                                             self.server,
                                             self.custom_zone,
                                             "@",
                                             None,
                                             flag_from_string(record_type_str),
                                             dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
                                             None,
                                             None)

    # The zone node itself comes first (empty name), then the children
    # in name order.
    expected_names = ["",
                      "atestrecord-0",
                      "atestrecord-1",
                      "atestrecord-2",
                      "atestrecord-3",
                      "atestrecord-4"]
    self.assertEqual(len(result.rec), 6)
    for index, node_name in enumerate(expected_names):
        self.assertEqual(result.rec[index].dnsNodeName.str, node_name)
def test_enum_is_sorted_with_zone_dup(self):
    """
    Confirm the zone is sorted

    Here one child node is renamed over LDAP so that its name equals
    the zone name before the records are enumerated.
    """
    record_str = "192.168.50.50"
    record_type_str = "A"

    # Added out of order on purpose ("-0" goes in last).
    for node in ("atestrecord-1", "atestrecord-2", "atestrecord-3",
                 "atestrecord-4", "atestrecord-0"):
        self.add_record(self.custom_zone, node, record_type_str, record_str)

    # This triggers a bug in old Samba
    self.add_record(self.custom_zone, self.custom_zone + "1", record_type_str, record_str)

    dn, record = self.get_record_from_db(self.custom_zone, self.custom_zone + "1")

    # Rename DC=<zone>1 to DC=<zone> directly in the directory.
    new_dn = ldb.Dn(self.samdb, str(dn))
    new_dn.set_component(0, "dc", self.custom_zone)
    self.samdb.rename(dn, new_dn)

    # NOTE(review): the literal 0 (second argument) was missing from the
    # listing; presumably dwSettingFlags -- confirm against the IDL.
    _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                             0,
                                             self.server,
                                             self.custom_zone,
                                             "@",
                                             None,
                                             flag_from_string(record_type_str),
                                             dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
                                             None,
                                             None)

    expected_names = ["",
                      "atestrecord-0",
                      "atestrecord-1",
                      "atestrecord-2",
                      "atestrecord-3",
                      "atestrecord-4"]
    self.assertEqual(len(result.rec), 7)
    for index, node_name in enumerate(expected_names):
        self.assertEqual(result.rec[index].dnsNodeName.str, node_name)

    # Windows doesn't reload the zone fast enough, but doesn't
    # have the bug anyway, it will sort last on both names (where
    # it should)
    last_name = result.rec[6].dnsNodeName.str
    if last_name != (self.custom_zone + "1"):
        self.assertEqual(result.rec[6].dnsNodeName.str, self.custom_zone)
def test_enum_is_sorted_children_prefix_first(self):
    """
    Confirm the zone returns the selected prefix first but no more
    as Samba is flappy for the full sort
    """
    record_str = "192.168.50.50"
    record_type_str = "A"

    # Added out of order on purpose ("-0" goes in last).
    for node in ("atestrecord-1.a.b", "atestrecord-2.a.b", "atestrecord-3.a.b",
                 "atestrecord-4.a.b", "atestrecord-0.a.b"):
        self.add_record(self.custom_zone, node, record_type_str, record_str)

    # Not expected to be returned
    self.add_record(self.custom_zone, "atestrecord-0.b.b", record_type_str, record_str)

    # NOTE(review): the literal 0 (second argument) was missing from the
    # listing; presumably dwSettingFlags -- confirm against the IDL.
    _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                             0,
                                             self.server,
                                             self.custom_zone,
                                             "a.b",
                                             None,
                                             flag_from_string(record_type_str),
                                             dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
                                             None,
                                             None)

    # Only the count and the first entry (the queried node itself,
    # reported with an empty name) are checked here.
    self.assertEqual(len(result.rec), 6)
    self.assertEqual(result.rec[0].dnsNodeName.str, "")
def test_enum_is_sorted_children(self):
    """
    Confirm the zone is sorted
    """
    record_str = "192.168.50.50"
    record_type_str = "A"

    # Added out of order on purpose ("-0" goes in last).
    for node in ("atestrecord-1.a.b", "atestrecord-2.a.b", "atestrecord-3.a.b",
                 "atestrecord-4.a.b", "atestrecord-0.a.b"):
        self.add_record(self.custom_zone, node, record_type_str, record_str)

    # Not expected to be returned
    self.add_record(self.custom_zone, "atestrecord-0.b.b", record_type_str, record_str)

    # NOTE(review): the literal 0 (second argument) was missing from the
    # listing; presumably dwSettingFlags -- confirm against the IDL.
    _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                             0,
                                             self.server,
                                             self.custom_zone,
                                             "a.b",
                                             None,
                                             flag_from_string(record_type_str),
                                             dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
                                             None,
                                             None)

    # Names are reported relative to the queried node "a.b".
    expected_names = ["",
                      "atestrecord-0",
                      "atestrecord-1",
                      "atestrecord-2",
                      "atestrecord-3",
                      "atestrecord-4"]
    self.assertEqual(len(result.rec), 6)
    for index, node_name in enumerate(expected_names):
        self.assertEqual(result.rec[index].dnsNodeName.str, node_name)
308 # This test fails against Samba (but passes against Windows),
309 # because Samba does not return the record when we enum records.
310 # Records can be given DNS_RANK_NONE when the zone they are in
311 # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
312 # deleted, however, we do not consider this urgent to fix and
313 # so this test is a knownfail.
def test_rank_none(self):
    """
    See what happens when we set a record's rank to
    DNS_RANK_NONE.
    """
    node = "testrecord"
    rtype = "A"
    address = "192.168.50.50"
    self.add_record(self.custom_zone, node, rtype, address)

    # Rewrite the stored record with rank 0 directly in the directory.
    node_dn, stored = self.get_record_from_db(self.custom_zone, node)
    stored.rank = 0  # DNS_RANK_NONE
    outcome = self.samdb.dns_replace_by_dn(node_dn, [stored])
    if outcome is not None:
        self.fail("Unable to update dns record to have DNS_RANK_NONE.")

    # The record must still be listed, must still count as a duplicate
    # when re-added, and must still be deletable.
    self.assert_num_records(self.custom_zone, node, rtype)
    self.add_record(self.custom_zone, node, rtype, address, assertion=False)
    self.delete_record(self.custom_zone, node, rtype, address)
    self.assert_num_records(self.custom_zone, node, rtype, 0)
def test_dns_tombstoned_zero_timestamp(self):
    """What happens with a zero EntombedTime tombstone?"""
    # samdb.dns_replace_by_dn() exposes dns_common_replace(), which is
    # *NOT* a general purpose record replacement function: it is a
    # specialised part of the dns update mechanism (DLZ and internal
    # alike), and a tombstone with a zero timestamp has a special
    # meaning for it.
    #
    # Early in update handling, a record due for deletion is turned
    # into a zero-timestamp tombstone.  dns_common_replace() spots that
    # marker and, when no other records remain, marks the node as
    # tombstoned and adds a "real" tombstone while doing so.
    #
    # A tombstone with a non-zero timestamp (see the next test) makes
    # dns_common_replace decide the node is already tombstoned, so it
    # takes no action.
    #
    # This test has worked historically, entirely by accident, as
    # changing the wType appears to
    # (the original note is cut off at this point)

    address = "192.168.50.50"
    self.add_record(self.custom_zone, "testrecord", 'A', address)

    node_dn, rec = self.get_record_from_db(self.custom_zone, "testrecord")
    rec.wType = dnsp.DNS_TYPE_TOMBSTONE
    rec.data = 0
    self.samdb.dns_replace_by_dn(node_dn, [rec])

    # there should be no A record, and one TOMBSTONE record.
    self.assert_num_records(self.custom_zone, "testrecord", 'A', 0)
    # RPC calls cannot tell us how many tombstones exist: RPCs have
    # "DNS_TYPE_ZERO" instead of tombstones, and DNS_TYPE_ALL does not
    # show tombstones either.
    self.assert_num_records(self.custom_zone, "testrecord", 'ALL', 0)

    # But we can use LDAP:
    ldap_records = self.ldap_get_records(self.custom_zone, "testrecord")
    self.assertEqual(len(ldap_records), 1)
    tombstone = ldap_records[0]
    self.assertEqual(tombstone.wType, dnsp.DNS_TYPE_TOMBSTONE)
    self.assertGreater(tombstone.data, 1e17)  # ~ October 1916

    # this should fail, because no A records.
    self.delete_record(self.custom_zone, "testrecord", 'A', address,
                       assertion=False)
def test_dns_tombstoned_nonzero_timestamp(self):
    """See what happens when we set a record to be tombstoned with an
    EntombedTime timestamp.
    """
    # With a non-zero EntombedTime, dns_common_replace() concludes the
    # node was already tombstoned and does nothing, so the A record
    # stays exactly where it was.

    address = "192.168.50.50"
    self.add_record(self.custom_zone, "testrecord", 'A', address)

    node_dn, rec = self.get_record_from_db(self.custom_zone, "testrecord")
    rec.wType = dnsp.DNS_TYPE_TOMBSTONE
    rec.data = 0x123456789A
    self.samdb.dns_replace_by_dn(node_dn, [rec])

    # there should be the A record and no TOMBSTONE
    self.assert_num_records(self.custom_zone, "testrecord", 'A', 1)
    self.assert_num_records(self.custom_zone, "testrecord", 'TOMBSTONE', 0)
    # this should succeed
    self.delete_record(self.custom_zone, "testrecord", 'A', address,
                       assertion=True)
    self.assert_num_records(self.custom_zone, "testrecord", 'TOMBSTONE', 0)
    self.assert_num_records(self.custom_zone, "testrecord", 'A', 0)
def get_record_from_db(self, zone_name, record_name):
    """
    Returns (dn of record, record)

    The zone is located by looking for "DC=<zone_name>," inside the DN
    of each dnsZone object under DC=DomainDnsZones; the node is located
    by looking for record_name anywhere inside the DN of each dnsNode
    in that zone.  Both are substring matches and the first hit wins.
    Only the first dnsRecord value of the node is unpacked.

    Raises AssertionError if the zone cannot be found.  Returns None
    if the zone exists but no node matches.
    """
    zone_marker = "DC=%s," % zone_name
    zone_entries = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
                                     expression="(objectClass=dnsZone)",
                                     attrs=["cn"])

    zone_dn = next((entry.dn for entry in zone_entries
                    if zone_marker in str(entry.dn)), None)
    if zone_dn is None:
        raise AssertionError("Couldn't find zone '%s'." % zone_name)

    node_entries = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
                                     expression="(objectClass=dnsNode)",
                                     attrs=["dnsRecord"])

    for node in node_entries:
        if record_name not in str(node.dn):
            continue
        unpacked = ndr_unpack(dnsp.DnssrvRpcRecord, node["dnsRecord"][0])
        return (node.dn, unpacked)

    # No matching node: callers such as check_params test for None.
    return None
def ldap_get_records(self, zone, name):
    """Return every dnsRecord value of node `name` in `zone`, unpacked.

    The node is searched for over LDAP beneath the zone's container in
    DC=DomainDNSZones.  Only the first matching node is used; indexing
    that first result fails if the search returns nothing.
    """
    basedn = self.samdb.get_default_basedn()
    zone_dn = f"DC={zone},CN=MicrosoftDNS,DC=DomainDNSZones,{basedn}"

    node_filter = f"(&(objectClass=dnsNode)(name={name}))"
    found = self.samdb.search(base=zone_dn,
                              scope=ldb.SCOPE_SUBTREE,
                              expression=node_filter,
                              attrs=["dnsRecord"])

    unpacked = []
    for packed in found[0].get('dnsRecord'):
        unpacked.append(ndr_unpack(dnsp.DnssrvRpcRecord, packed))
    return unpacked
def test_duplicate_matching(self):
    """
    Make sure that records which should be distinct from each other or duplicate
    to each other behave as expected.

    NOTE(review): the closing braces of the `duplicates` and `distinct`
    dict literals were missing from the listing this was reconstructed
    from and have been restored.  Loop nesting was inferred from the
    statements themselves -- confirm against the upstream file.
    """
    distinct_dns = [("SAMDOM.EXAMPLE.COM",
                     "SAMDOM.EXAMPLE.CO",
                     "EXAMPLE.COM", "SAMDOM.EXAMPLE")]
    duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
                     ("EXAMPLE.", "EXAMPLE")]

    # Every tuple has entries which should be considered duplicate to one another.
    duplicates = {
        "AAAA": [("AAAA::", "aaaa::"),
                 ("AAAA::", "AAAA:0000::"),
                 ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
                 ("AAAA::", "AAAA:0:0:0:0:0:0:0"),
                 ("0123::", "123::"),
                 ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
    }

    # Every tuple has entries which should be considered distinct from one another.
    distinct = {
        "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
        "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
                 ("1000::", "::1000"),
                 ("::1", "::11", "::1111"),
                 ("1234::", "0234::")],
        "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
                 "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
        "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
        "TXT": [("A RECORD", "B RECORD", "a record")]
    }

    # The name-valued record types all share the same name tuples.
    for record_type_str in ("PTR", "CNAME", "NS"):
        distinct[record_type_str] = distinct_dns
        duplicates[record_type_str] = duplicate_dns

    for record_type_str in duplicates:
        for duplicate_tuple in duplicates[record_type_str]:
            # Attempt to add duplicates and make sure that all after the first fails
            self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
            for record in duplicate_tuple:
                self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
                self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
            self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])

            # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
            for record in duplicate_tuple:
                self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
                self.delete_record(self.custom_zone, "testrecord", record_type_str, record)

    for record_type_str in distinct:
        for distinct_tuple in distinct[record_type_str]:
            # Attempt to add distinct and make sure that they all succeed within a tuple
            for count, record in enumerate(distinct_tuple, start=1):
                try:
                    self.add_record(self.custom_zone, "testrecord", record_type_str, record)
                    # All records should have been added.
                    self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=count)
                except AssertionError as e:
                    raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
                                         "Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple))
            for record in distinct_tuple:
                self.delete_record(self.custom_zone, "testrecord", record_type_str, record)

            # CNAMEs should not have been added, since they conflict.
            if record_type_str == 'CNAME':
                continue

            # Add the first distinct and attempt to remove all of the others, making sure this fails
            # Windows fails this test. This is probably due to weird tombstoning behavior.
            self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
            for record in distinct_tuple:
                if record == distinct_tuple[0]:
                    continue
                try:
                    self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
                except AssertionError as e:
                    raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
                                         % (distinct_tuple[0], record, e))
            self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
def test_accept_valid_commands(self):
    """
    Make sure that we can add, update and delete a variety
    of valid records.
    """
    for rtype, record_strings in self.good_records.items():
        for value in record_strings:
            # Add, confirm exactly one record exists, then clean up.
            self.add_record(self.custom_zone, "testrecord", rtype, value)
            self.assert_num_records(self.custom_zone, "testrecord", rtype)
            self.delete_record(self.custom_zone, "testrecord", rtype, value)
def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data,
                 wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"):
    """Compare the stored fields of a record against expected values.

    The record is fetched with get_record_from_db(zone, rec_name); the
    test fails if nothing is found.  Each remaining argument is the
    expected value of the like-named field of the stored record.  The
    `data` comparison is case-insensitive.
    """
    lookup = self.get_record_from_db(zone, rec_name)
    self.assertIsNotNone(lookup, "Expected record %s but was not found over LDAP." % data)
    (rec_dn, rec) = lookup

    def mismatch(what, got, expected):
        # Shared wording for every field comparison below.
        return "Unexpected %s for record %s. Got %s, expected %s." % (what, data, got, expected)

    self.assertEqual(wDataLength, rec.wDataLength,
                     mismatch("data length", rec.wDataLength, wDataLength))
    self.assertEqual(rank, rec.rank,
                     mismatch("rank", rec.rank, rank))
    self.assertEqual(flags, rec.flags,
                     mismatch("flags", rec.flags, flags))
    self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds,
                     mismatch("time to live", rec.dwTtlSeconds, dwTtlSeconds))
    self.assertEqual(dwReserved, rec.dwReserved,
                     mismatch("dwReserved", rec.dwReserved, dwReserved))
    self.assertEqual(data.lower(), rec.data.lower(),
                     mismatch("data", rec.data.lower(), data.lower()))
    self.assertEqual(wType, rec.wType,
                     mismatch("wType", rec.wType, wType))
    self.assertEqual(dwTimeStamp, rec.dwTimeStamp,
                     mismatch("timestamp", rec.dwTimeStamp, dwTimeStamp))
def test_record_params(self):
    """
    Make sure that, when we add records to the database,
    they're added with reasonable parameters.
    """
    # (type, string to add, expected stored fields as passed to
    #  check_params: wDataLength, rank, flags, dwTtlSeconds,
    #  dwReserved, data, wType)
    cases = (
        ("A", "192.168.50.50",
         (4, 240, 0, 900, 0, "192.168.50.50", 1)),
        ("AAAA", "AAAA:AAAA::",
         (16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28)),
        ("CNAME", "cnamedest",
         (13, 240, 0, 900, 0, "cnamedest", 5)),
    )
    for rtype, value, expected_fields in cases:
        self.add_record(self.custom_zone, "testrecord", rtype, value)
        self.check_params(*expected_fields)
        self.delete_record(self.custom_zone, "testrecord", rtype, value)
def test_reject_invalid_commands(self):
    """
    Make sure that we can't add a variety of invalid records,
    and that we can't update valid records to invalid ones.
    """
    num_failures = 0

    # Pass 1: a bad record must not be addable, must not be listed
    # afterwards, and deleting it must fail too.  Failures are printed
    # and counted rather than aborting the run.
    for rtype, bad_values in self.bad_records.items():
        for bad_value in bad_values:
            try:
                self.add_record(self.custom_zone, "testrecord", rtype, bad_value, assertion=False)
                self.assert_num_records(self.custom_zone, "testrecord", rtype, expected_num=0)
                self.delete_record(self.custom_zone, "testrecord", rtype, bad_value, assertion=False)
            except AssertionError as e:
                print(e)
                num_failures += 1

    # Pass 2: with a valid record of the same type already present,
    # adding the bad one must still fail.
    for rtype, bad_values in self.bad_records.items():
        for bad_value in bad_values:
            good_value = self.good_records[rtype][0]
            self.add_record(self.custom_zone, "testrecord", rtype, good_value)
            try:
                self.add_record(self.custom_zone, "testrecord", rtype, bad_value, assertion=False)
            except AssertionError as e:
                print(e)
                num_failures += 1
            self.delete_record(self.custom_zone, "testrecord", rtype, good_value)

    self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures)
def test_add_duplicate_different_type(self):
    """
    Attempt to add some values which have the same name as
    existing ones, just a different type.

    For every ordered pair of distinct record types, the first value of
    type 1 is added, then the first value of type 2 is added under the
    same name.  Whether the second add should fail, and how many
    records of each type should remain, is decided by the rules spelled
    out inline.  Mismatches are printed and counted; the test fails at
    the end if the count is non-zero.
    """
    num_failures = 0
    for record_type_str_1 in self.good_records:
        record1 = self.good_records[record_type_str_1][0]
        self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1)
        for record_type_str_2 in self.good_records:
            if record_type_str_1 == record_type_str_2:
                continue

            record2 = self.good_records[record_type_str_2][0]

            # Each has_* flag is true if EITHER record of the pair has
            # that type.
            has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A'
            has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA'
            has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME'
            has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR'
            has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX'
            has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV'
            has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT'

            # If we attempt to add any record except A or AAAA when we already have an NS record,
            # the add should fail.
            add_error_ok = False
            if record_type_str_1 == 'NS' and not has_a and not has_aaaa:
                add_error_ok = True
            # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail.
            if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa):
                add_error_ok = True
            # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
            # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
            if has_cname and (has_a or has_aaaa or has_srv or has_txt):
                add_error_ok = True

            # add_record raises AssertionError when the add fails, so
            # reaching the line after it means the add succeeded.
            try:
                self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2)
                if add_error_ok:
                    num_failures = num_failures + 1
                    print("Expected error when adding %s while a %s existed."
                          % (record_type_str_2, record_type_str_1))
            except AssertionError as e:
                if not add_error_ok:
                    num_failures = num_failures + 1
                    print("Didn't expect error when adding %s while a %s existed."
                          % (record_type_str_2, record_type_str_1))

            if not add_error_ok:
                # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
                expected_num_type_1 = 1
                expected_num_type_2 = 1

                # If we have an MX record, a PTR record should replace it when added.
                # If we have a PTR record, an MX record should replace it when added.
                if has_ptr and has_mx:
                    expected_num_type_1 = 0

                # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
                if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'):
                    expected_num_type_1 = 0

                # With an existing NS record, an added A/AAAA record is
                # expected not to be listed afterwards.
                if (record_type_str_1 == 'NS' and (has_a or has_aaaa)):
                    expected_num_type_2 = 0

                try:
                    self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1)
                except AssertionError as e:
                    num_failures = num_failures + 1
                    print("Expected %s %s records after adding a %s record and a %s record already existed."
                          % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1))
                try:
                    self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2)
                except AssertionError as e:
                    num_failures = num_failures + 1
                    print("Expected %s %s records after adding a %s record and a %s record already existed."
                          % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1))

            # Best-effort cleanup: the second record may never have
            # been added, so a failed delete is deliberately ignored.
            try:
                self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2)
            except AssertionError as e:
                pass

        self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1)

    self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures)
693 # Windows fails this test in the same way we do.
def _test_cname(self):
    """
    Test some special properties of CNAME records.
    """
    # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias
    alias_target = self.good_records["CNAME"][1]
    self.add_record(self.custom_zone, "testrecord", "CNAME", alias_target)

    # No record of any type may join the CNAME under the same name.
    for rtype, values in self.good_records.items():
        candidate = values[0]
        self.add_record(self.custom_zone, "testrecord", rtype, candidate, assertion=False)
        self.assert_num_records(self.custom_zone, "testrecord", rtype, expected_num=0)

    # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias
    self.add_record(self.custom_zone, "mxrec", "MX", "testrecord 1", assertion=False)
    self.add_record(self.custom_zone, "nsrec", "NS", "testrecord", assertion=False)

    self.delete_record(self.custom_zone, "testrecord", "CNAME", alias_target)
def test_add_duplicate_value(self):
    """
    Make sure that we can't add duplicate values of any type.
    """
    for rtype, values in self.good_records.items():
        value = values[0]

        # First add succeeds, the identical second add must fail and
        # leave exactly one record behind.
        self.add_record(self.custom_zone, "testrecord", rtype, value)
        self.add_record(self.custom_zone, "testrecord", rtype, value, assertion=False)
        self.assert_num_records(self.custom_zone, "testrecord", rtype)
        self.delete_record(self.custom_zone, "testrecord", rtype, value)
def test_add_similar_value(self):
    """
    Attempt to add values with the same name and type in the same
    zone. This should work, and should result in both values
    existing (except with some types).
    """
    for rtype, values in self.good_records.items():
        # Walk over each pair of neighbouring values.
        for first, second in zip(values, values[1:]):
            if rtype == 'CNAME':
                continue
            # We expect CNAME records to override one another, as
            # an alias can only map to one CNAME record.
            # Also, on Windows, when the empty string is added and
            # another record is added afterwards, the empty string
            # will be silently overridden by the new one, so it
            # fails this test for the empty string.
            if rtype == 'CNAME':
                expected_num = 1
            else:
                expected_num = 2

            self.add_record(self.custom_zone, "testrecord", rtype, first)
            self.add_record(self.custom_zone, "testrecord", rtype, second)
            self.assert_num_records(self.custom_zone, "testrecord", rtype, expected_num=expected_num)
            self.delete_record(self.custom_zone, "testrecord", rtype, first)
            self.delete_record(self.custom_zone, "testrecord", rtype, second)
def assert_record(self, zone, name, record_type_str, expected_record_str,
                  assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Asserts whether or not the given record with the given type exists in the
    given zone.

    With assertion=True an AssertionError is raised if no record whose
    data equals expected_record_str is found (a RuntimeError from the
    query counts as "not found").  With assertion=False an
    AssertionError is raised if such a record IS found.

    client_version is passed on to query_records; previously it was
    accepted but silently ignored.
    """
    try:
        _, result = self.query_records(zone, name, record_type_str,
                                       client_version=client_version)
    except RuntimeError:
        # The enumeration failing is treated as the record being absent.
        if assertion:
            raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
                                 % (expected_record_str, record_type_str))
        return

    found = any(record.data == expected_record_str
                for record in result.rec[0].records)

    if found and not assertion:
        raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str))
    elif not found and assertion:
        raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str))
def assert_num_records(self, zone, name, record_type_str, expected_num=1,
                       client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Asserts that there are a given amount of records with the given type in
    the given zone.

    :param zone: zone that is queried
    :param name: node name that is queried
    :param record_type_str: record type, in the string form accepted by
        query_records()
    :param expected_num: number of records that must be returned
    :param client_version: client version forwarded to query_records()
    :raises AssertionError: if the number of records differs from
        ``expected_num``
    """
    try:
        _, result = self.query_records(zone, name, record_type_str,
                                       client_version=client_version)
    except RuntimeError:
        # A failing query counts as zero records.
        if expected_num != 0:
            raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
                                 % (record_type_str, name, expected_num))
        return

    num_results = len(result.rec[0].records)
    if num_results != expected_num:
        raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
                             % (num_results, record_type_str, name, expected_num))
def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Enumerate the records of one type stored under ``name`` in ``zone``.

    The record type string is converted with flag_from_string() and the
    enumeration is limited to authority data without child nodes.

    :return: whatever DnssrvEnumRecords2 returns (callers in this file
        unpack it as a 2-tuple and read the second element)
    """
    record_type = flag_from_string(record_type_str)
    select_flags = (dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA |
                    dnsserver.DNS_RPC_VIEW_NO_CHILDREN)
    return self.conn.DnssrvEnumRecords2(client_version,
                                        0,
                                        self.server,
                                        zone,
                                        name,
                                        None,
                                        record_type,
                                        select_flags,
                                        None,
                                        None)
def add_record(self, zone, name, record_type_str, record_str,
               assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Try to add a record of the given type under ``name`` in ``zone`` and
    check that the outcome matches ``assertion``.

    With ``assertion=True`` the add must succeed; with ``assertion=False``
    it must fail with a RuntimeError. Adding under an existing name can
    also update the records already stored there.

    :raises AssertionError: if the outcome does not match ``assertion``
    """
    new_record = record_from_string(record_type_str, record_str, sep=' ')
    buf = dnsserver.DNS_RPC_RECORD_BUF()
    buf.rec = new_record

    try:
        self.conn.DnssrvUpdateRecord2(client_version,
                                      0,
                                      self.server,
                                      zone,
                                      name,
                                      buf,
                                      None)
    except RuntimeError as err:
        if assertion:
            raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
                                 % (record_str, record_type_str, str(err)))
    else:
        if not assertion:
            raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
                                 % (record_str, record_type_str))
def delete_record(self, zone, name, record_type_str, record_str,
                  assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Try to delete the given record of the given type from ``name`` in
    ``zone`` and check that the outcome matches ``assertion``.

    With ``assertion=True`` the deletion must succeed; with
    ``assertion=False`` it must fail with a RuntimeError.

    :raises AssertionError: if the outcome does not match ``assertion``
    """
    old_record = record_from_string(record_type_str, record_str, sep=' ')
    buf = dnsserver.DNS_RPC_RECORD_BUF()
    buf.rec = old_record

    try:
        self.conn.DnssrvUpdateRecord2(client_version,
                                      0,
                                      self.server,
                                      zone,
                                      name,
                                      None,
                                      buf)
    except RuntimeError as err:
        if assertion:
            raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(err)))
    else:
        if not assertion:
            raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str))
def test_query2(self):
    # Each client version must get back the matching ServerInfo type id.
    expected_typeids = (
        (dnsserver.DNS_CLIENT_VERSION_W2K,
         dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K),
        (dnsserver.DNS_CLIENT_VERSION_DOTNET,
         dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET),
        (dnsserver.DNS_CLIENT_VERSION_LONGHORN,
         dnsserver.DNSSRV_TYPEID_SERVER_INFO),
    )
    for client_version, expected_typeid in expected_typeids:
        typeid, _ = self.conn.DnssrvQuery2(client_version,
                                           0,
                                           self.server,
                                           None,
                                           'ServerInfo')
        self.assertEqual(expected_typeid, typeid)
# This test is to confirm that we do not support multizone operations,
# which are designated by a non-zero dwContext value (the 3rd argument
# to DnssrvOperation).
def test_operation_invalid(self):
    non_zone = 'a-zone-that-does-not-exist'
    name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
    name_and_param.pszNodeName = 'AllowUpdate'
    name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE

    zone_missing = False
    try:
        self.conn.DnssrvOperation(self.server,
                                  non_zone,
                                  1,
                                  'ResetDwordProperty',
                                  dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
                                  name_and_param)
    except WERRORError as err:
        zone_missing = (err.args[0] ==
                        werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST)

    # We should always encounter a DOES_NOT_EXIST error.
    if not zone_missing:
        self.fail()
# This test is to confirm that we do not support multizone operations,
# which are designated by a non-zero dwContext value (the 5th argument
# to DnssrvOperation2).
def test_operation2_invalid(self):
    non_zone = 'a-zone-that-does-not-exist'
    name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
    name_and_param.pszNodeName = 'AllowUpdate'
    name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE

    zone_missing = False
    try:
        self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                   0,
                                   self.server,
                                   non_zone,
                                   1,
                                   'ResetDwordProperty',
                                   dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
                                   name_and_param)
    except WERRORError as err:
        zone_missing = (err.args[0] ==
                        werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST)

    # We should always encounter a DOES_NOT_EXIST error.
    if not zone_missing:
        self.fail()
def test_operation2(self):
    # Create a reverse zone, check that it is enumerated, delete it again
    # and check that it is gone.
    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    rev_zone = '1.168.192.in-addr.arpa'
    request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
                      dnsserver.DNS_ZONE_REQUEST_PRIMARY)

    def count_reverse_zones():
        # Number of zones matching request_filter right now.
        _, zones = self.conn.DnssrvComplexOperation2(client_version,
                                                     0,
                                                     self.server,
                                                     None,
                                                     'EnumZones',
                                                     dnsserver.DNSSRV_TYPEID_DWORD,
                                                     request_filter)
        return zones.dwZoneCount

    zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    zone_create.pszZoneName = rev_zone
    zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
    zone_create.fAging = 0
    zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT

    # Create zone
    self.conn.DnssrvOperation2(client_version,
                               0,
                               self.server,
                               None,
                               0,
                               'ZoneCreate',
                               dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                               zone_create)
    self.assertEqual(1, count_reverse_zones())

    # Delete zone
    self.conn.DnssrvOperation2(client_version,
                               0,
                               self.server,
                               rev_zone,
                               0,
                               'DeleteZoneFromDs',
                               dnsserver.DNSSRV_TYPEID_NULL,
                               None)
    self.assertEqual(0, count_reverse_zones())
def test_complexoperation2(self):
    # Enumerate primary zones, first forward then reverse, and check the
    # returned type id and zone count for each.
    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    cases = (
        (dnsserver.DNS_ZONE_REQUEST_FORWARD |
         dnsserver.DNS_ZONE_REQUEST_PRIMARY, 3),
        (dnsserver.DNS_ZONE_REQUEST_REVERSE |
         dnsserver.DNS_ZONE_REQUEST_PRIMARY, 0),
    )
    for request_filter, expected_count in cases:
        typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
                                                          0,
                                                          self.server,
                                                          None,
                                                          'EnumZones',
                                                          dnsserver.DNSSRV_TYPEID_DWORD,
                                                          request_filter)
        self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
        self.assertEqual(expected_count, zones.dwZoneCount)
def test_enumrecords2(self):
    # Enumerate the NS records of the root hints together with their
    # additional data and check the total count.
    root_hint_view = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
                      dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
    _, roothints = self.conn.DnssrvEnumRecords2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server,
        '..RootHints',
        '.',
        None,
        dnsp.DNS_TYPE_NS,
        root_hint_view,
        None,
        None)
    self.assertEqual(14, roothints.count)  # 1 NS + 13 A records (a-m)
def test_updaterecords2(self):
    # Add an A record, replace it with another one, delete that one, and
    # check the enumeration result after every step.
    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    record_type = dnsp.DNS_TYPE_A
    select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
    name = 'dummy'
    first = ARecord('1.2.3.4')
    second = ARecord('5.6.7.8')

    def as_buf(rec):
        # Wrap a record in the buffer type handed to DnssrvUpdateRecord2.
        buf = dnsserver.DNS_RPC_RECORD_BUF()
        buf.rec = rec
        return buf

    def update(add_buf, del_buf):
        self.conn.DnssrvUpdateRecord2(client_version,
                                      0,
                                      self.server,
                                      self.zone,
                                      name,
                                      add_buf,
                                      del_buf)

    def enum():
        return self.conn.DnssrvEnumRecords2(client_version,
                                            0,
                                            self.server,
                                            self.zone,
                                            name,
                                            None,
                                            record_type,
                                            select_flags,
                                            None,
                                            None)

    def check_single_a_record(expected_data):
        _, result = enum()
        self.assertEqual(1, result.count)
        self.assertEqual(1, result.rec[0].wRecordCount)
        self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
        self.assertEqual(expected_data, result.rec[0].records[0].data)

    # Add record
    update(as_buf(first), None)
    check_single_a_record('1.2.3.4')

    # Update record
    replacement = as_buf(second)
    replaced = as_buf(first)
    update(replacement, replaced)
    check_single_a_record('5.6.7.8')

    # Delete record
    update(None, as_buf(second))
    with self.assertRaises(RuntimeError):
        enum()
# The following tests do not pass against Samba because the owner and
# group are not consistent with Windows, as well as some ACEs.
#
# The following ACEs are also required for 2012R2:
#
# (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS)
# (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)
#
# [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity]
def test_security_descriptor_msdcs_zone(self):
    """
    Make sure that security descriptors of the msdcs zone is
    as expected.
    """
    forest_base = "DC=ForestDnsZones,%s" % self.samdb.get_default_basedn()
    zones = self.samdb.search(base=forest_base,
                              scope=ldb.SCOPE_SUBTREE,
                              expression="(&(objectClass=dnsZone)(name=_msdcs*))",
                              attrs=["nTSecurityDescriptor", "objectClass"])
    self.assertEqual(len(zones), 1)
    msdcs_zone = zones[0]
    self.assertIn("nTSecurityDescriptor", msdcs_zone)
    raw_sd = msdcs_zone["nTSecurityDescriptor"][0]
    utils = sd_utils.SDUtils(self.samdb)
    actual_sd = ndr_unpack(security.descriptor, raw_sd)

    domain_sid = security.dom_sid(self.samdb.get_domain_sid())

    # The DnsAdmins SID is handed to sddl2binary as a name substitution.
    admins = self.samdb.search(base=self.samdb.get_default_basedn(),
                               scope=ldb.SCOPE_SUBTREE,
                               expression="(sAMAccountName=DnsAdmins)",
                               attrs=["objectSid"])
    dns_admin = str(ndr_unpack(security.dom_sid, admins[0]['objectSid'][0]))

    packed_sd = descriptor.sddl2binary("O:SYG:BA"
                                       "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
                                       "(A;;CC;;;AU)"
                                       "(A;;RPLCLORC;;;WD)"
                                       "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
                                       "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
                                       domain_sid, {"DnsAdmins": dns_admin})
    expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))

    diff = descriptor.get_diff_sds(expected_sd, actual_sd, domain_sid)
    failure_msg = ("SD of msdcs zone different to expected.\n"
                   "Difference was:\n%s\nExpected: %s\nGot: %s" %
                   (diff, expected_sd.as_sddl(utils.domain_sid),
                    actual_sd.as_sddl(utils.domain_sid)))
    self.assertEqual(diff, '', failure_msg)
def test_security_descriptor_forest_zone(self):
    """
    Make sure that security descriptors of forest dns zones are
    as expected.

    A temporary zone is created in the forest DNS partition; the security
    descriptors of the zone, of the MicrosoftDNS container and of the
    partition itself are then compared with the expected ones. The zone
    is deleted again even when a check fails.
    """
    forest_zone = "test_forest_zone"
    zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    zone_create_info.fAging = 0
    zone_create_info.fDsIntegrated = 1
    zone_create_info.fLoadExisting = 1

    zone_create_info.pszZoneName = forest_zone
    zone_create_info.dwDpFlags = dnsserver.DNS_DP_FOREST_DEFAULT

    self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                               0,
                               self.server,
                               None,
                               0,
                               'ZoneCreate',
                               dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                               zone_create_info)

    # Everything after the zone creation runs under try/finally so that a
    # failing lookup or assertion cannot leave the test zone behind.
    try:
        partition_dn = self.samdb.get_default_basedn()
        partition_dn.add_child("DC=ForestDnsZones")
        zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
                                  expression="(name=%s)" % forest_zone,
                                  attrs=["nTSecurityDescriptor"])
        self.assertEqual(len(zones), 1)
        current_dn = zones[0].dn
        self.assertIn("nTSecurityDescriptor", zones[0])

        # Only needed to render SDDL in failure messages; built once.
        utils = sd_utils.SDUtils(self.samdb)
        domain_sid = security.dom_sid(self.samdb.get_domain_sid())

        res = self.samdb.search(base=self.samdb.get_default_basedn(),
                                scope=ldb.SCOPE_SUBTREE,
                                expression="(sAMAccountName=DnsAdmins)",
                                attrs=["objectSid"])
        dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))

        packed_sd = descriptor.sddl2binary("O:DAG:DA"
                                           "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
                                           "(A;;CC;;;AU)"
                                           "(A;;RPLCLORC;;;WD)"
                                           "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
                                           "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
                                           domain_sid, {"DnsAdmins": dns_admin})
        expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))

        packed_msdns = descriptor.get_dns_forest_microsoft_dns_descriptor(domain_sid,
                                                                          {"DnsAdmins": dns_admin})
        expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))

        packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
        expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
                                                              packed_part_sd))

        msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
        expected_by_dn = [(current_dn.get_linearized(), expected_sd),
                          (msdns_dn.get_linearized(), expected_msdns_sd),
                          (partition_dn.get_linearized(), expected_part_sd)]

        for (key, sec_desc) in expected_by_dn:
            zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
                                      attrs=["nTSecurityDescriptor"])
            self.assertIn("nTSecurityDescriptor", zones[0])
            sd = ndr_unpack(security.descriptor,
                            zones[0]["nTSecurityDescriptor"][0])
            diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)

            self.assertEqual(diff, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
                             % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))
    finally:
        self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                   0,
                                   self.server,
                                   forest_zone,
                                   0,
                                   'DeleteZoneFromDs',
                                   dnsserver.DNSSRV_TYPEID_NULL,
                                   None)
def test_security_descriptor_domain_zone(self):
    """
    Make sure that security descriptors of domain dns zones are
    as expected.

    The security descriptors of the custom zone, of the MicrosoftDNS
    container and of the domain DNS partition are compared with the
    expected ones.
    """
    partition_dn = self.samdb.get_default_basedn()
    partition_dn.add_child("DC=DomainDnsZones")
    zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
                              expression="(name=%s)" % self.custom_zone,
                              attrs=["nTSecurityDescriptor"])
    self.assertEqual(len(zones), 1)
    current_dn = zones[0].dn
    self.assertIn("nTSecurityDescriptor", zones[0])

    # Only needed to render SDDL in failure messages; built once.
    utils = sd_utils.SDUtils(self.samdb)
    domain_sid = security.dom_sid(self.samdb.get_domain_sid())

    res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
                            expression="(sAMAccountName=DnsAdmins)",
                            attrs=["objectSid"])
    dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))

    packed_sd = descriptor.sddl2binary("O:DAG:DA"
                                       "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
                                       "(A;;CC;;;AU)"
                                       "(A;;RPLCLORC;;;WD)"
                                       "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
                                       "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
                                       domain_sid, {"DnsAdmins": dns_admin})
    expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))

    packed_msdns = descriptor.get_dns_domain_microsoft_dns_descriptor(domain_sid,
                                                                      {"DnsAdmins": dns_admin})
    expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))

    packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
    expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
                                                          packed_part_sd))

    msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
    expected_by_dn = [(current_dn.get_linearized(), expected_sd),
                      (msdns_dn.get_linearized(), expected_msdns_sd),
                      (partition_dn.get_linearized(), expected_part_sd)]

    for (key, sec_desc) in expected_by_dn:
        zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
                                  attrs=["nTSecurityDescriptor"])
        self.assertIn("nTSecurityDescriptor", zones[0])
        sd = ndr_unpack(security.descriptor,
                        zones[0]["nTSecurityDescriptor"][0])
        diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)

        self.assertEqual(diff, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
                         % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))