1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dcerpc.dnsserver"""
23 from samba
.auth
import system_session
24 from samba
.samdb
import SamDB
25 from samba
.ndr
import ndr_unpack
26 from samba
.dcerpc
import dnsp
, dnsserver
, security
27 from samba
.tests
import RpcInterfaceTestCase
, env_get_var_value
28 from samba
.dnsserver
import record_from_string
, flag_from_string
, ARecord
29 from samba
import sd_utils
, descriptor
30 from samba
import WERRORError
, werror
33 class DnsserverTests(RpcInterfaceTestCase
):
39 good_dns
= ["SAMDOM.EXAMPLE.COM",
41 "%sEXAMPLE.COM" % ("1." * 100),
52 "SAMDOM..EXAMPLE.COM"]
54 good_mx
= ["SAMDOM.EXAMPLE.COM 65535"]
57 good_srv
= ["SAMDOM.EXAMPLE.COM 65535 65535 65535"]
60 for bad_dn
in bad_dns
:
61 bad_mx
.append("%s 1" % bad_dn
)
62 bad_srv
.append("%s 0 0 0" % bad_dn
)
63 for good_dn
in good_dns
:
64 good_mx
.append("%s 1" % good_dn
)
65 good_srv
.append("%s 0 0 0" % good_dn
)
70 "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
71 "0000:0000:0000:0000:0000:0000:0000:0000",
72 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
74 "1234:1234:1234:1234:1234::",
75 "1234:5678:9ABC:DEF0::",
77 "1234::5678:9ABC:0000:0000:0000:0000",
86 "TXT": ["text", "", "@#!", "\n"]
90 "A": ["192.168.0.500",
91 "255.255.255.255/32"],
92 "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
93 "0000:0000:0000:0000:0000:0000:0000:0000/1",
94 "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
95 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
96 "1234:5678:9ABC:DEF0:1234:5678:9ABC",
105 # Because we use uint16_t for these numbers, we can't
106 # actually create these records.
107 invalid_mx
= ["SAMDOM.EXAMPLE.COM -1",
108 "SAMDOM.EXAMPLE.COM 65536",
109 "%s 1" % ("A" * 256)]
110 invalid_srv
= ["SAMDOM.EXAMPLE.COM 0 65536 0",
111 "SAMDOM.EXAMPLE.COM 0 0 65536",
112 "SAMDOM.EXAMPLE.COM 65536 0 0"]
113 cls
.invalid_records
= {
120 self
.server
= os
.environ
["DC_SERVER"]
121 self
.zone
= env_get_var_value("REALM").lower()
122 self
.conn
= dnsserver
.dnsserver("ncacn_ip_tcp:%s[sign]" % (self
.server
),
124 self
.get_credentials())
126 self
.samdb
= SamDB(url
="ldap://%s" % os
.environ
["DC_SERVER_IP"],
127 lp
=self
.get_loadparm(),
128 session_info
=system_session(),
129 credentials
=self
.get_credentials())
131 self
.custom_zone
= "zone"
132 zone_create_info
= dnsserver
.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
133 zone_create_info
.pszZoneName
= self
.custom_zone
134 zone_create_info
.dwZoneType
= dnsp
.DNS_ZONE_TYPE_PRIMARY
135 zone_create_info
.fAging
= 0
136 zone_create_info
.fDsIntegrated
= 1
137 zone_create_info
.fLoadExisting
= 1
138 zone_create_info
.dwDpFlags
= dnsserver
.DNS_DP_DOMAIN_DEFAULT
140 self
.conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
146 dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
,
150 self
.conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
156 dnsserver
.DNSSRV_TYPEID_NULL
,
160 def test_enum_is_sorted(self
):
162 Confirm the zone is sorted
165 record_str
= "192.168.50.50"
166 record_type_str
= "A"
167 self
.add_record(self
.custom_zone
, "atestrecord-1", record_type_str
, record_str
)
168 self
.add_record(self
.custom_zone
, "atestrecord-2", record_type_str
, record_str
)
169 self
.add_record(self
.custom_zone
, "atestrecord-3", record_type_str
, record_str
)
170 self
.add_record(self
.custom_zone
, "atestrecord-4", record_type_str
, record_str
)
171 self
.add_record(self
.custom_zone
, "atestrecord-0", record_type_str
, record_str
)
173 # This becomes an extra A on the zone itself by server-side magic
174 self
.add_record(self
.custom_zone
, self
.custom_zone
, record_type_str
, record_str
)
176 _
, result
= self
.conn
.DnssrvEnumRecords2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
182 flag_from_string(record_type_str
),
183 dnsserver
.DNS_RPC_VIEW_AUTHORITY_DATA
,
187 self
.assertEqual(len(result
.rec
), 6)
188 self
.assertEqual(result
.rec
[0].dnsNodeName
.str, "")
189 self
.assertEqual(result
.rec
[1].dnsNodeName
.str, "atestrecord-0")
190 self
.assertEqual(result
.rec
[2].dnsNodeName
.str, "atestrecord-1")
191 self
.assertEqual(result
.rec
[3].dnsNodeName
.str, "atestrecord-2")
192 self
.assertEqual(result
.rec
[4].dnsNodeName
.str, "atestrecord-3")
193 self
.assertEqual(result
.rec
[5].dnsNodeName
.str, "atestrecord-4")
195 def test_enum_is_sorted_with_zone_dup(self
):
197 Confirm the zone is sorted
200 record_str
= "192.168.50.50"
201 record_type_str
= "A"
202 self
.add_record(self
.custom_zone
, "atestrecord-1", record_type_str
, record_str
)
203 self
.add_record(self
.custom_zone
, "atestrecord-2", record_type_str
, record_str
)
204 self
.add_record(self
.custom_zone
, "atestrecord-3", record_type_str
, record_str
)
205 self
.add_record(self
.custom_zone
, "atestrecord-4", record_type_str
, record_str
)
206 self
.add_record(self
.custom_zone
, "atestrecord-0", record_type_str
, record_str
)
208 # This triggers a bug in old Samba
209 self
.add_record(self
.custom_zone
, self
.custom_zone
+ "1", record_type_str
, record_str
)
211 dn
, record
= self
.get_record_from_db(self
.custom_zone
, self
.custom_zone
+ "1")
213 new_dn
= ldb
.Dn(self
.samdb
, str(dn
))
214 new_dn
.set_component(0, "dc", self
.custom_zone
)
215 self
.samdb
.rename(dn
, new_dn
)
217 _
, result
= self
.conn
.DnssrvEnumRecords2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
223 flag_from_string(record_type_str
),
224 dnsserver
.DNS_RPC_VIEW_AUTHORITY_DATA
,
228 self
.assertEqual(len(result
.rec
), 7)
229 self
.assertEqual(result
.rec
[0].dnsNodeName
.str, "")
230 self
.assertEqual(result
.rec
[1].dnsNodeName
.str, "atestrecord-0")
231 self
.assertEqual(result
.rec
[2].dnsNodeName
.str, "atestrecord-1")
232 self
.assertEqual(result
.rec
[3].dnsNodeName
.str, "atestrecord-2")
233 self
.assertEqual(result
.rec
[4].dnsNodeName
.str, "atestrecord-3")
234 self
.assertEqual(result
.rec
[5].dnsNodeName
.str, "atestrecord-4")
236 # Windows doesn't reload the zone fast enough, but doesn't
237 # have the bug anyway, it will sort last on both names (where
239 if result
.rec
[6].dnsNodeName
.str != (self
.custom_zone
+ "1"):
240 self
.assertEqual(result
.rec
[6].dnsNodeName
.str, self
.custom_zone
)
242 def test_enum_is_sorted_children_prefix_first(self
):
244 Confirm the zone returns the selected prefix first but no more
245 as Samba is flappy for the full sort
248 record_str
= "192.168.50.50"
249 record_type_str
= "A"
250 self
.add_record(self
.custom_zone
, "atestrecord-1.a.b", record_type_str
, record_str
)
251 self
.add_record(self
.custom_zone
, "atestrecord-2.a.b", record_type_str
, record_str
)
252 self
.add_record(self
.custom_zone
, "atestrecord-3.a.b", record_type_str
, record_str
)
253 self
.add_record(self
.custom_zone
, "atestrecord-4.a.b", record_type_str
, record_str
)
254 self
.add_record(self
.custom_zone
, "atestrecord-0.a.b", record_type_str
, record_str
)
256 # Not expected to be returned
257 self
.add_record(self
.custom_zone
, "atestrecord-0.b.b", record_type_str
, record_str
)
259 _
, result
= self
.conn
.DnssrvEnumRecords2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
265 flag_from_string(record_type_str
),
266 dnsserver
.DNS_RPC_VIEW_AUTHORITY_DATA
,
270 self
.assertEqual(len(result
.rec
), 6)
271 self
.assertEqual(result
.rec
[0].dnsNodeName
.str, "")
273 def test_enum_is_sorted_children(self
):
275 Confirm the zone is sorted
278 record_str
= "192.168.50.50"
279 record_type_str
= "A"
280 self
.add_record(self
.custom_zone
, "atestrecord-1.a.b", record_type_str
, record_str
)
281 self
.add_record(self
.custom_zone
, "atestrecord-2.a.b", record_type_str
, record_str
)
282 self
.add_record(self
.custom_zone
, "atestrecord-3.a.b", record_type_str
, record_str
)
283 self
.add_record(self
.custom_zone
, "atestrecord-4.a.b", record_type_str
, record_str
)
284 self
.add_record(self
.custom_zone
, "atestrecord-0.a.b", record_type_str
, record_str
)
286 # Not expected to be returned
287 self
.add_record(self
.custom_zone
, "atestrecord-0.b.b", record_type_str
, record_str
)
289 _
, result
= self
.conn
.DnssrvEnumRecords2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
295 flag_from_string(record_type_str
),
296 dnsserver
.DNS_RPC_VIEW_AUTHORITY_DATA
,
300 self
.assertEqual(len(result
.rec
), 6)
301 self
.assertEqual(result
.rec
[0].dnsNodeName
.str, "")
302 self
.assertEqual(result
.rec
[1].dnsNodeName
.str, "atestrecord-0")
303 self
.assertEqual(result
.rec
[2].dnsNodeName
.str, "atestrecord-1")
304 self
.assertEqual(result
.rec
[3].dnsNodeName
.str, "atestrecord-2")
305 self
.assertEqual(result
.rec
[4].dnsNodeName
.str, "atestrecord-3")
306 self
.assertEqual(result
.rec
[5].dnsNodeName
.str, "atestrecord-4")
308 # This test fails against Samba (but passes against Windows),
309 # because Samba does not return the record when we enum records.
310 # Records can be given DNS_RANK_NONE when the zone they are in
311 # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
312 # deleted, however, we do not consider this urgent to fix and
313 # so this test is a knownfail.
314 def test_rank_none(self
):
316 See what happens when we set a record's rank to
320 record_str
= "192.168.50.50"
321 record_type_str
= "A"
322 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, record_str
)
324 dn
, record
= self
.get_record_from_db(self
.custom_zone
, "testrecord")
325 record
.rank
= 0 # DNS_RANK_NONE
326 res
= self
.samdb
.dns_replace_by_dn(dn
, [record
])
328 self
.fail("Unable to update dns record to have DNS_RANK_NONE.")
330 self
.assert_num_records(self
.custom_zone
, "testrecord", record_type_str
)
331 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, record_str
, assertion
=False)
332 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, record_str
)
333 self
.assert_num_records(self
.custom_zone
, "testrecord", record_type_str
, 0)
335 def test_dns_tombstoned_zero_timestamp(self
):
336 """What happens with a zero EntombedTime tombstone?"""
337 # A zero-timestamp tombstone record has a special meaning for
338 # dns_common_replace(), which is the function exposed by
339 # samdb.dns_replace_by_dn(), and which is *NOT* a general
340 # purpose record replacement function but a specialised part
341 # of the dns update mechanism (for both DLZ and internal).
343 # In the earlier stages of handling updates, a record that
344 # needs to be deleted is set to be a tombstone with a zero
345 # timestamp. dns_common_replace() notices this specific
346 # marker, and if there are no other records, marks the node as
347 # tombstoned, in the process adding a "real" tombstone.
349 # If the tombstone has a non-zero timestamp, as you'll see in
350 # the next test, dns_common_replace will decide that the node
351 # is already tombstoned, and that no action needs to be taken.
353 # This test has worked historically, entirely by accident, as
354 # changing the wType appears to
356 record_str
= "192.168.50.50"
357 self
.add_record(self
.custom_zone
, "testrecord", 'A', record_str
)
359 dn
, record
= self
.get_record_from_db(self
.custom_zone
, "testrecord")
360 record
.wType
= dnsp
.DNS_TYPE_TOMBSTONE
362 self
.samdb
.dns_replace_by_dn(dn
, [record
])
364 # there should be no A record, and one TOMBSTONE record.
365 self
.assert_num_records(self
.custom_zone
, "testrecord", 'A', 0)
366 # we can't make assertions about the tombstone count based on
367 # RPC calls, as there are no tombstones in RPCs (there is
368 # "DNS_TYPE_ZERO" instead). Nor do tombstones show up if we
370 self
.assert_num_records(self
.custom_zone
, "testrecord", 'ALL', 0)
372 # But we can use LDAP:
373 records
= self
.ldap_get_records(self
.custom_zone
, "testrecord")
374 self
.assertEqual(len(records
), 1)
376 self
.assertEqual(r
.wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
377 self
.assertGreater(r
.data
, 1e17
) # ~ October 1916
379 # this should fail, because no A records.
380 self
.delete_record(self
.custom_zone
, "testrecord", 'A', record_str
,
383 def test_dns_tombstoned_nonzero_timestamp(self
):
384 """See what happens when we set a record to be tombstoned with an
385 EntombedTime timestamp.
387 # Because this tombstone has a non-zero EntombedTime,
388 # dns_common_replace() will decide the node was already
389 # tombstoned and there is nothing to be done, leaving the A
390 # record where it was.
392 record_str
= "192.168.50.50"
393 self
.add_record(self
.custom_zone
, "testrecord", 'A', record_str
)
395 dn
, record
= self
.get_record_from_db(self
.custom_zone
, "testrecord")
396 record
.wType
= dnsp
.DNS_TYPE_TOMBSTONE
397 record
.data
= 0x123456789A
398 self
.samdb
.dns_replace_by_dn(dn
, [record
])
400 # there should be the A record and no TOMBSTONE
401 self
.assert_num_records(self
.custom_zone
, "testrecord", 'A', 1)
402 self
.assert_num_records(self
.custom_zone
, "testrecord", 'TOMBSTONE', 0)
403 # this should succeed
404 self
.delete_record(self
.custom_zone
, "testrecord", 'A', record_str
,
406 self
.assert_num_records(self
.custom_zone
, "testrecord", 'TOMBSTONE', 0)
407 self
.assert_num_records(self
.custom_zone
, "testrecord", 'A', 0)
409 def get_record_from_db(self
, zone_name
, record_name
):
411 Returns (dn of record, record)
414 zones
= self
.samdb
.search(base
="DC=DomainDnsZones,%s" % self
.samdb
.get_default_basedn(), scope
=ldb
.SCOPE_SUBTREE
,
415 expression
="(objectClass=dnsZone)",
420 if "DC=%s," % zone_name
in str(zone
.dn
):
425 raise AssertionError("Couldn't find zone '%s'." % zone_name
)
427 records
= self
.samdb
.search(base
=zone_dn
, scope
=ldb
.SCOPE_SUBTREE
,
428 expression
="(objectClass=dnsNode)",
431 for old_packed_record
in records
:
432 if record_name
in str(old_packed_record
.dn
):
433 rec
= ndr_unpack(dnsp
.DnssrvRpcRecord
, old_packed_record
["dnsRecord"][0])
434 return (old_packed_record
.dn
, rec
)
436 def ldap_get_records(self
, zone
, name
):
437 zone_dn
= (f
"DC={zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
438 f
"{self.samdb.get_default_basedn()}")
440 expr
= f
"(&(objectClass=dnsNode)(name={name}))"
441 nodes
= self
.samdb
.search(base
=zone_dn
,
442 scope
=ldb
.SCOPE_SUBTREE
,
446 records
= nodes
[0].get('dnsRecord')
447 return [ndr_unpack(dnsp
.DnssrvRpcRecord
, r
) for r
in records
]
449 def test_duplicate_matching(self
):
451 Make sure that records which should be distinct from each other or duplicate
452 to each other behave as expected.
455 distinct_dns
= [("SAMDOM.EXAMPLE.COM",
457 "EXAMPLE.COM", "SAMDOM.EXAMPLE")]
458 duplicate_dns
= [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
459 ("EXAMPLE.", "EXAMPLE")]
461 # Every tuple has entries which should be considered duplicate to one another.
463 "AAAA": [("AAAA::", "aaaa::"),
464 ("AAAA::", "AAAA:0000::"),
465 ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
466 ("AAAA::", "AAAA:0:0:0:0:0:0:0"),
468 ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
471 # Every tuple has entries which should be considered distinct from one another.
473 "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
474 "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
475 ("1000::", "::1000"),
476 ("::1", "::11", "::1111"),
477 ("1234::", "0234::")],
478 "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
479 "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
480 "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
481 "TXT": [("A RECORD", "B RECORD", "a record")]
484 for record_type_str
in ("PTR", "CNAME", "NS"):
485 distinct
[record_type_str
] = distinct_dns
486 duplicates
[record_type_str
] = duplicate_dns
488 for record_type_str
in duplicates
:
489 for duplicate_tuple
in duplicates
[record_type_str
]:
490 # Attempt to add duplicates and make sure that all after the first fails
491 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, duplicate_tuple
[0])
492 for record
in duplicate_tuple
:
493 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, record
, assertion
=False)
494 self
.assert_num_records(self
.custom_zone
, "testrecord", record_type_str
)
495 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, duplicate_tuple
[0])
497 # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
498 for record
in duplicate_tuple
:
499 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, duplicate_tuple
[0])
500 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, record
)
502 for record_type_str
in distinct
:
503 for distinct_tuple
in distinct
[record_type_str
]:
504 # Attempt to add distinct and make sure that they all succeed within a tuple
506 for record
in distinct_tuple
:
509 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, record
)
510 # All records should have been added.
511 self
.assert_num_records(self
.custom_zone
, "testrecord", record_type_str
, expected_num
=i
)
512 except AssertionError as e
:
513 raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
514 "Original error: %s\nDistinct set: %s." % (record
, e
, distinct_tuple
))
515 for record
in distinct_tuple
:
516 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, record
)
517 # CNAMEs should not have been added, since they conflict.
518 if record_type_str
== 'CNAME':
521 # Add the first distinct and attempt to remove all of the others, making sure this fails
522 # Windows fails this test. This is probably due to weird tombstoning behavior.
523 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, distinct_tuple
[0])
524 for record
in distinct_tuple
:
525 if record
== distinct_tuple
[0]:
528 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, record
, assertion
=False)
529 except AssertionError as e
:
530 raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
531 % (distinct_tuple
[0], record
, e
))
532 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, distinct_tuple
[0])
def test_accept_valid_commands(self):
    """
    Exercise every known-good record value, one at a time.

    Each value listed in ``self.good_records`` is added under the name
    "testrecord" in the custom zone, checked to be the only record of
    its type on that name, and then deleted again.
    """
    zone = self.custom_zone
    node = "testrecord"
    for rtype in self.good_records:
        for value in self.good_records[rtype]:
            self.add_record(zone, node, rtype, value)
            # assert_num_records() expects exactly one record by default.
            self.assert_num_records(zone, node, rtype)
            self.delete_record(zone, node, rtype, value)
545 def check_params(self
, wDataLength
, rank
, flags
, dwTtlSeconds
, dwReserved
, data
,
546 wType
, dwTimeStamp
=0, zone
="zone", rec_name
="testrecord"):
547 res
= self
.get_record_from_db(zone
, rec_name
)
548 self
.assertIsNotNone(res
, "Expected record %s but was not found over LDAP." % data
)
550 self
.assertEqual(wDataLength
, rec
.wDataLength
, "Unexpected data length for record %s. Got %s, expected %s." % (data
, rec
.wDataLength
, wDataLength
))
551 self
.assertEqual(rank
, rec
.rank
, "Unexpected rank for record %s. Got %s, expected %s." % (data
, rec
.rank
, rank
))
552 self
.assertEqual(flags
, rec
.flags
, "Unexpected flags for record %s. Got %s, expected %s." % (data
, rec
.flags
, flags
))
553 self
.assertEqual(dwTtlSeconds
, rec
.dwTtlSeconds
, "Unexpected time to live for record %s. Got %s, expected %s." % (data
, rec
.dwTtlSeconds
, dwTtlSeconds
))
554 self
.assertEqual(dwReserved
, rec
.dwReserved
, "Unexpected dwReserved for record %s. Got %s, expected %s." % (data
, rec
.dwReserved
, dwReserved
))
555 self
.assertEqual(data
.lower(), rec
.data
.lower(), "Unexpected data for record %s. Got %s, expected %s." % (data
, rec
.data
.lower(), data
.lower()))
556 self
.assertEqual(wType
, rec
.wType
, "Unexpected wType for record %s. Got %s, expected %s." % (data
, rec
.wType
, wType
))
557 self
.assertEqual(dwTimeStamp
, rec
.dwTimeStamp
, "Unexpected timestamp for record %s. Got %s, expected %s." % (data
, rec
.dwTimeStamp
, dwTimeStamp
))
def test_record_params(self):
    """
    Make sure that, when we add records to the database,
    they're added with reasonable parameters.

    One A, one AAAA and one CNAME record are added in turn; after each
    add, check_params() compares the stored record against the expected
    values before the record is deleted again.
    """
    # Each entry: (record type, value handed to add/delete, positional
    # arguments for check_params: wDataLength, rank, flags,
    # dwTtlSeconds, dwReserved, data, wType).
    cases = (
        ("A", "192.168.50.50",
         (4, 240, 0, 900, 0, "192.168.50.50", 1)),
        ("AAAA", "AAAA:AAAA::",
         (16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28)),
        ("CNAME", "cnamedest",
         (13, 240, 0, 900, 0, "cnamedest", 5)),
    )
    for rtype, value, expected in cases:
        self.add_record(self.custom_zone, "testrecord", rtype, value)
        self.check_params(*expected)
        self.delete_record(self.custom_zone, "testrecord", rtype, value)
574 def test_reject_invalid_commands(self
):
576 Make sure that we can't add a variety of invalid records,
577 and that we can't update valid records to invalid ones.
580 for record_type_str
in self
.bad_records
:
581 for record_str
in self
.bad_records
[record_type_str
]:
582 # Attempt to add the bad record, which should fail. Then, attempt to query for and delete
583 # it. Since it shouldn't exist, these should fail too.
585 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, record_str
, assertion
=False)
586 self
.assert_num_records(self
.custom_zone
, "testrecord", record_type_str
, expected_num
=0)
587 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, record_str
, assertion
=False)
588 except AssertionError as e
:
590 num_failures
= num_failures
+ 1
592 # Also try to update valid records to invalid ones, making sure this fails
593 for record_type_str
in self
.bad_records
:
594 for record_str
in self
.bad_records
[record_type_str
]:
595 good_record_str
= self
.good_records
[record_type_str
][0]
596 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, good_record_str
)
598 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, record_str
, assertion
=False)
599 except AssertionError as e
:
601 num_failures
= num_failures
+ 1
602 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, good_record_str
)
604 self
.assertTrue(num_failures
== 0, "Failed to reject invalid commands. Total failures: %d." % num_failures
)
606 def test_add_duplicate_different_type(self
):
608 Attempt to add some values which have the same name as
609 existing ones, just a different type.
612 for record_type_str_1
in self
.good_records
:
613 record1
= self
.good_records
[record_type_str_1
][0]
614 self
.add_record(self
.custom_zone
, "testrecord", record_type_str_1
, record1
)
615 for record_type_str_2
in self
.good_records
:
616 if record_type_str_1
== record_type_str_2
:
619 record2
= self
.good_records
[record_type_str_2
][0]
621 has_a
= record_type_str_1
== 'A' or record_type_str_2
== 'A'
622 has_aaaa
= record_type_str_1
== 'AAAA' or record_type_str_2
== 'AAAA'
623 has_cname
= record_type_str_1
== 'CNAME' or record_type_str_2
== 'CNAME'
624 has_ptr
= record_type_str_1
== 'PTR' or record_type_str_2
== 'PTR'
625 has_mx
= record_type_str_1
== 'MX' or record_type_str_2
== 'MX'
626 has_srv
= record_type_str_1
== 'SRV' or record_type_str_2
== 'SRV'
627 has_txt
= record_type_str_1
== 'TXT' or record_type_str_2
== 'TXT'
629 # If we attempt to add any record except A or AAAA when we already have an NS record,
630 # the add should fail.
632 if record_type_str_1
== 'NS' and not has_a
and not has_aaaa
:
634 # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail.
635 if record_type_str_2
== 'CNAME' and (has_ptr
or has_mx
or has_a
or has_aaaa
):
637 # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
638 # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
639 if has_cname
and (has_a
or has_aaaa
or has_srv
or has_txt
):
643 self
.add_record(self
.custom_zone
, "testrecord", record_type_str_2
, record2
)
645 num_failures
= num_failures
+ 1
646 print("Expected error when adding %s while a %s existed."
647 % (record_type_str_2
, record_type_str_1
))
648 except AssertionError as e
:
650 num_failures
= num_failures
+ 1
651 print("Didn't expect error when adding %s while a %s existed."
652 % (record_type_str_2
, record_type_str_1
))
655 # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
656 expected_num_type_1
= 1
657 expected_num_type_2
= 1
659 # If we have an MX record, a PTR record should replace it when added.
660 # If we have a PTR record, an MX record should replace it when added.
661 if has_ptr
and has_mx
:
662 expected_num_type_1
= 0
664 # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
665 if (has_cname
or has_srv
or has_txt
) and (record_type_str_2
== 'PTR' or record_type_str_2
== 'MX'):
666 expected_num_type_1
= 0
668 if (record_type_str_1
== 'NS' and (has_a
or has_aaaa
)):
669 expected_num_type_2
= 0
672 self
.assert_num_records(self
.custom_zone
, "testrecord", record_type_str_1
, expected_num
=expected_num_type_1
)
673 except AssertionError as e
:
674 num_failures
= num_failures
+ 1
675 print("Expected %s %s records after adding a %s record and a %s record already existed."
676 % (expected_num_type_1
, record_type_str_1
, record_type_str_2
, record_type_str_1
))
678 self
.assert_num_records(self
.custom_zone
, "testrecord", record_type_str_2
, expected_num
=expected_num_type_2
)
679 except AssertionError as e
:
680 num_failures
= num_failures
+ 1
681 print("Expected %s %s records after adding a %s record and a %s record already existed."
682 % (expected_num_type_2
, record_type_str_2
, record_type_str_2
, record_type_str_1
))
685 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str_2
, record2
)
686 except AssertionError as e
:
689 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str_1
, record1
)
691 self
.assertTrue(num_failures
== 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures
)
693 # Windows fails this test in the same way we do.
def _test_cname(self):
    """
    Test some special properties of CNAME records.

    A CNAME is placed on "testrecord"; every further add that would
    conflict with it is issued with assertion=False, i.e. it is
    expected to be refused.
    """
    zone = self.custom_zone

    # RFC 1912: When there is a CNAME record, there must not be any
    # other records with the same alias
    cname_record = self.good_records["CNAME"][1]
    self.add_record(zone, "testrecord", "CNAME", cname_record)

    for rtype in self.good_records:
        candidate = self.good_records[rtype][0]
        self.add_record(zone, "testrecord", rtype, candidate,
                        assertion=False)
        self.assert_num_records(zone, "testrecord", rtype, expected_num=0)

    # RFC 2181: MX & NS records must not be allowed to point to a
    # CNAME alias
    pointing_at_alias = (
        ("mxrec", "MX", "testrecord 1"),
        ("nsrec", "NS", "testrecord"),
    )
    for node, rtype, target in pointing_at_alias:
        self.add_record(zone, node, rtype, target, assertion=False)

    self.delete_record(zone, "testrecord", "CNAME", cname_record)
def test_add_duplicate_value(self):
    """
    Make sure that we can't add duplicate values of any type.

    For each record type the first known-good value is added, then
    added a second time expecting failure, and the name must still
    hold exactly one record of that type before it is cleaned up.
    """
    for rtype in self.good_records:
        value = self.good_records[rtype][0]
        target = (self.custom_zone, "testrecord", rtype, value)

        self.add_record(*target)
        # Second, identical add: must be rejected.
        self.add_record(*target, assertion=False)
        self.assert_num_records(self.custom_zone, "testrecord", rtype)
        self.delete_record(*target)
729 def test_add_similar_value(self
):
731 Attempt to add values with the same name and type in the same
732 zone. This should work, and should result in both values
733 existing (except with some types).
735 for record_type_str
in self
.good_records
:
736 for i
in range(1, len(self
.good_records
[record_type_str
])):
737 record1
= self
.good_records
[record_type_str
][i
- 1]
738 record2
= self
.good_records
[record_type_str
][i
]
740 if record_type_str
== 'CNAME':
742 # We expect CNAME records to override one another, as
743 # an alias can only map to one CNAME record.
744 # Also, on Windows, when the empty string is added and
745 # another record is added afterwards, the empty string
746 # will be silently overridden by the new one, so it
747 # fails this test for the empty string.
748 expected_num
= 1 if record_type_str
== 'CNAME' else 2
750 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, record1
)
751 self
.add_record(self
.custom_zone
, "testrecord", record_type_str
, record2
)
752 self
.assert_num_records(self
.custom_zone
, "testrecord", record_type_str
, expected_num
=expected_num
)
753 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, record1
)
754 self
.delete_record(self
.custom_zone
, "testrecord", record_type_str
, record2
)
756 def assert_record(self
, zone
, name
, record_type_str
, expected_record_str
,
757 assertion
=True, client_version
=dnsserver
.DNS_CLIENT_VERSION_LONGHORN
):
759 Asserts whether or not the given record with the given type exists in the
763 _
, result
= self
.query_records(zone
, name
, record_type_str
)
764 except RuntimeError as e
:
766 raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
767 % (expected_record_str
, record_type_str
))
772 for record
in result
.rec
[0].records
:
773 if record
.data
== expected_record_str
:
777 if found
and not assertion
:
778 raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str
, record_type_str
))
779 elif not found
and assertion
:
780 raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str
, record_type_str
))
782 def assert_num_records(self
, zone
, name
, record_type_str
, expected_num
=1,
783 client_version
=dnsserver
.DNS_CLIENT_VERSION_LONGHORN
):
785 Asserts that there are a given amount of records with the given type in
789 _
, result
= self
.query_records(zone
, name
, record_type_str
)
790 num_results
= len(result
.rec
[0].records
)
791 if not num_results
== expected_num
:
792 raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
793 % (num_results
, record_type_str
, name
, expected_num
))
795 if not expected_num
== 0:
796 raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
797 % (record_type_str
, name
, expected_num
))
799 def query_records(self
, zone
, name
, record_type_str
, client_version
=dnsserver
.DNS_CLIENT_VERSION_LONGHORN
):
800 return self
.conn
.DnssrvEnumRecords2(client_version
,
806 flag_from_string(record_type_str
),
807 dnsserver
.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver
.DNS_RPC_VIEW_NO_CHILDREN
,
811 def add_record(self
, zone
, name
, record_type_str
, record_str
,
812 assertion
=True, client_version
=dnsserver
.DNS_CLIENT_VERSION_LONGHORN
):
# Test helper: parses `record_str` into a record of type `record_type_str`,
# wraps it in a DNS_RPC_RECORD_BUF and submits it with
# self.conn.DnssrvUpdateRecord2, then raises AssertionError when the outcome
# is not the expected one.
# `assertion` presumably states whether the add is expected to succeed
# (default True); the two messages below cover "succeeded but should have
# failed" and "failed but should have succeeded".
# NOTE(review): source lines 815, 822-823, 825-831 and 835 are missing from
# this extract, so the remaining call arguments and the conditions guarding
# each raise are not visible here.
814 Attempts to add a map from the given name to a record of the given type,
816 Also asserts whether or not the add was successful.
817 This can also update existing records if they have the same name.
# The fields inside record_str are separated by single spaces.
819 record
= record_from_string(record_type_str
, record_str
, sep
=' ')
820 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
821 add_rec_buf
.rec
= record
824 self
.conn
.DnssrvUpdateRecord2(client_version
,
832 raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
833 % (record_str
, record_type_str
))
# A RuntimeError is treated as "the add failed"; its text is included in the
# failure message below.
834 except RuntimeError as e
:
836 raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
837 % (record_str
, record_type_str
, str(e
)))
839 def delete_record(self
, zone
, name
, record_type_str
, record_str
,
840 assertion
=True, client_version
=dnsserver
.DNS_CLIENT_VERSION_LONGHORN
):
# Test helper, the deletion counterpart of add_record(): parses `record_str`
# into a record of type `record_type_str`, wraps it in a DNS_RPC_RECORD_BUF
# and submits it with self.conn.DnssrvUpdateRecord2, then raises
# AssertionError when the outcome is not the expected one.
# `assertion` presumably states whether the deletion is expected to succeed
# (default True).
# NOTE(review): source lines 843, 849-850, 852-858 and 861 are missing from
# this extract, so the remaining call arguments (including how the buffer is
# passed as the record to delete) and the conditions guarding each raise are
# not visible here.
842 Attempts to delete a record with the given name, record and record type
844 Also asserts whether or not the deletion was successful.
# The fields inside record_str are separated by single spaces.
846 record
= record_from_string(record_type_str
, record_str
, sep
=' ')
847 del_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
848 del_rec_buf
.rec
= record
851 self
.conn
.DnssrvUpdateRecord2(client_version
,
859 raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str
, record_type_str
))
# A RuntimeError is treated as "the deletion failed"; its text is included
# in the failure message below.
860 except RuntimeError as e
:
862 raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str
, record_type_str
, str(e
)))
864 def test_query2(self
):
# Calls DnssrvQuery2 once per client version (W2K, DOTNET, LONGHORN) and
# checks that the returned type id is the matching server-info variant:
# SERVER_INFO_W2K, SERVER_INFO_DOTNET and the unsuffixed SERVER_INFO.
# The `result` half of each returned pair is not inspected in the visible
# lines.
# NOTE(review): the remaining arguments of each call (source lines 866-869,
# 873-876, 880-883) are missing from this extract.
865 typeid
, result
= self
.conn
.DnssrvQuery2(dnsserver
.DNS_CLIENT_VERSION_W2K
,
870 self
.assertEqual(dnsserver
.DNSSRV_TYPEID_SERVER_INFO_W2K
, typeid
)
872 typeid
, result
= self
.conn
.DnssrvQuery2(dnsserver
.DNS_CLIENT_VERSION_DOTNET
,
877 self
.assertEqual(dnsserver
.DNSSRV_TYPEID_SERVER_INFO_DOTNET
, typeid
)
879 typeid
, result
= self
.conn
.DnssrvQuery2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
884 self
.assertEqual(dnsserver
.DNSSRV_TYPEID_SERVER_INFO
, typeid
)
887 # This test is to confirm that we do not support multizone operations,
888 # which are designated by a non-zero dwContext value (the 3rd argument
889 # to DnssrvOperation).
890 def test_operation_invalid(self
):
# Issues a 'ResetDwordProperty' DnssrvOperation whose payload is a
# DNS_RPC_NAME_AND_PARAM (pszNodeName 'AllowUpdate', dwParam
# DNS_ZONE_UPDATE_SECURE) and inspects the WERRORError it raises: the
# handler compares the error code with WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST.
# NOTE(review): source lines 896, 898-899, 901-902, 905-906 and 908 are
# missing from this extract, so the zone and context arguments of the call
# (presumably `non_zone` and a non-zero dwContext) and what happens after
# the error-code check are not visible here.
891 non_zone
= 'a-zone-that-does-not-exist'
892 typeid
= dnsserver
.DNSSRV_TYPEID_NAME_AND_PARAM
893 name_and_param
= dnsserver
.DNS_RPC_NAME_AND_PARAM()
894 name_and_param
.pszNodeName
= 'AllowUpdate'
895 name_and_param
.dwParam
= dnsp
.DNS_ZONE_UPDATE_SECURE
897 res
= self
.conn
.DnssrvOperation(self
.server
,
900 'ResetDwordProperty',
903 except WERRORError
as e
:
# args[0] of the WERRORError carries the numeric error code compared here.
904 if e
.args
[0] == werror
.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
:
907 # We should always encounter a DOES_NOT_EXIST error.
910 # This test is to confirm that we do not support multizone operations,
911 # which are designated by a non-zero dwContext value (the 5th argument
912 # to DnssrvOperation2).
913 def test_operation2_invalid(self
):
# DnssrvOperation2 counterpart of test_operation_invalid: issues a
# 'ResetDwordProperty' operation with client version LONGHORN and a
# DNS_RPC_NAME_AND_PARAM payload (pszNodeName 'AllowUpdate', dwParam
# DNS_ZONE_UPDATE_SECURE), then compares the code of the raised WERRORError
# with WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST.
# NOTE(review): source lines 920, 922-925, 927-928, 931-932 and 934 are
# missing from this extract, so the server/zone/context arguments of the
# call (presumably including `non_zone`) and what happens after the
# error-code check are not visible here.
914 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
915 non_zone
= 'a-zone-that-does-not-exist'
916 typeid
= dnsserver
.DNSSRV_TYPEID_NAME_AND_PARAM
917 name_and_param
= dnsserver
.DNS_RPC_NAME_AND_PARAM()
918 name_and_param
.pszNodeName
= 'AllowUpdate'
919 name_and_param
.dwParam
= dnsp
.DNS_ZONE_UPDATE_SECURE
921 res
= self
.conn
.DnssrvOperation2(client_version
,
926 'ResetDwordProperty',
929 except WERRORError
as e
:
# args[0] of the WERRORError carries the numeric error code compared here.
930 if e
.args
[0] == werror
.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
:
933 # We should always encounter a DOES_NOT_EXIST error.
936 def test_operation2(self
):
# Drives a reverse zone ('1.168.192.in-addr.arpa') through two
# DnssrvOperation2 calls and checks the zone count after each one with
# DnssrvComplexOperation2: 1 zone after the call typed
# DNSSRV_TYPEID_ZONE_CREATE, 0 zones after the call typed DNSSRV_TYPEID_NULL.
# NOTE(review): the operation names and most call arguments (source lines
# 949-953, 955-956, 960-963, 965, 970-974, 976-977, 979-982, 984) are
# missing from this extract; presumably the first call creates the zone and
# the second deletes it -- confirm.
937 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
938 rev_zone
= '1.168.192.in-addr.arpa'
# Zone-creation request: primary zone, secure updates, fAging 0, and the
# DNS_DP_DOMAIN_DEFAULT directory-partition flag.
940 zone_create
= dnsserver
.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
941 zone_create
.pszZoneName
= rev_zone
942 zone_create
.dwZoneType
= dnsp
.DNS_ZONE_TYPE_PRIMARY
943 zone_create
.fAllowUpdate
= dnsp
.DNS_ZONE_UPDATE_SECURE
944 zone_create
.fAging
= 0
945 zone_create
.dwDpFlags
= dnsserver
.DNS_DP_DOMAIN_DEFAULT
948 self
.conn
.DnssrvOperation2(client_version
,
954 dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
,
# Filter selecting zones that are both reverse-lookup and primary.
957 request_filter
= (dnsserver
.DNS_ZONE_REQUEST_REVERSE |
958 dnsserver
.DNS_ZONE_REQUEST_PRIMARY
)
959 _
, zones
= self
.conn
.DnssrvComplexOperation2(client_version
,
964 dnsserver
.DNSSRV_TYPEID_DWORD
,
966 self
.assertEqual(1, zones
.dwZoneCount
)
969 self
.conn
.DnssrvOperation2(client_version
,
975 dnsserver
.DNSSRV_TYPEID_NULL
,
978 typeid
, zones
= self
.conn
.DnssrvComplexOperation2(client_version
,
983 dnsserver
.DNSSRV_TYPEID_DWORD
,
985 self
.assertEqual(0, zones
.dwZoneCount
)
987 def test_complexoperation2(self
):
# Issues DnssrvComplexOperation2 twice with client version LONGHORN and
# checks both the returned type id (DNSSRV_TYPEID_ZONE_LIST each time) and
# the zone count: 3 after building a FORWARD|PRIMARY filter, 0 after
# building a REVERSE|PRIMARY filter.
# NOTE(review): source lines 993-996, 998, 1005-1008 and 1010 are missing
# from this extract, so how `request_filter` is passed to each call is not
# visible here; the expected counts presumably reflect the zones present in
# the test environment.
988 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
989 request_filter
= (dnsserver
.DNS_ZONE_REQUEST_FORWARD |
990 dnsserver
.DNS_ZONE_REQUEST_PRIMARY
)
992 typeid
, zones
= self
.conn
.DnssrvComplexOperation2(client_version
,
997 dnsserver
.DNSSRV_TYPEID_DWORD
,
999 self
.assertEqual(dnsserver
.DNSSRV_TYPEID_ZONE_LIST
, typeid
)
1000 self
.assertEqual(3, zones
.dwZoneCount
)
# Second pass: same call shape, now with the reverse-zone filter.
1002 request_filter
= (dnsserver
.DNS_ZONE_REQUEST_REVERSE |
1003 dnsserver
.DNS_ZONE_REQUEST_PRIMARY
)
1004 typeid
, zones
= self
.conn
.DnssrvComplexOperation2(client_version
,
1009 dnsserver
.DNSSRV_TYPEID_DWORD
,
1011 self
.assertEqual(dnsserver
.DNSSRV_TYPEID_ZONE_LIST
, typeid
)
1012 self
.assertEqual(0, zones
.dwZoneCount
)
1014 def test_enumrecords2(self
):
# Calls DnssrvEnumRecords2 with client version LONGHORN and expects the
# returned `roothints` buffer to report a count of 14.
# Locals prepared for the call: record type DNS_TYPE_NS and select flags
# ROOT_HINT_DATA | ADDITIONAL_DATA.
# NOTE(review): source lines 1020-1028 are missing from this extract, so how
# `record_type`, `select_flags` and the zone/node names are passed to the
# call is not visible here.
1015 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1016 record_type
= dnsp
.DNS_TYPE_NS
1017 select_flags
= (dnsserver
.DNS_RPC_VIEW_ROOT_HINT_DATA |
1018 dnsserver
.DNS_RPC_VIEW_ADDITIONAL_DATA
)
1019 _
, roothints
= self
.conn
.DnssrvEnumRecords2(client_version
,
1029 self
.assertEqual(14, roothints
.count
) # 1 NS + 13 A records (a-m)
1031 def test_updaterecords2(self
):
# Walks one A record through three DnssrvUpdateRecord2 calls and checks the
# outcome with DnssrvEnumRecords2 after each one:
#   1. a buffer holding 1.2.3.4 is prepared -> enumeration shows one A
#      record with data '1.2.3.4';
#   2. buffers holding 5.6.7.8 (add) and 1.2.3.4 (delete) are prepared ->
#      enumeration shows one A record with data '5.6.7.8';
#   3. a buffer holding 5.6.7.8 (delete) is prepared -> the enumeration call
#      is expected to raise RuntimeError.
# NOTE(review): the argument lists of every RPC call (source lines
# 1043-1049, 1051-1059, 1071-1077, 1079-1087, 1097-1103, 1105 onwards) are
# missing from this extract, so how the buffers, `record_type` and
# `select_flags` are passed is not visible here.
1032 client_version
= dnsserver
.DNS_CLIENT_VERSION_LONGHORN
1033 record_type
= dnsp
.DNS_TYPE_A
1034 select_flags
= dnsserver
.DNS_RPC_VIEW_AUTHORITY_DATA
1036 rec
= ARecord('1.2.3.4')
1037 rec2
= ARecord('5.6.7.8')
# Step 1: submit the first record.
1040 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1041 add_rec_buf
.rec
= rec
1042 self
.conn
.DnssrvUpdateRecord2(client_version
,
1050 _
, result
= self
.conn
.DnssrvEnumRecords2(client_version
,
1060 self
.assertEqual(1, result
.count
)
1061 self
.assertEqual(1, result
.rec
[0].wRecordCount
)
1062 self
.assertEqual(dnsp
.DNS_TYPE_A
, result
.rec
[0].records
[0].wType
)
1063 self
.assertEqual('1.2.3.4', result
.rec
[0].records
[0].data
)
# Step 2: one update carrying both the new record and the one it replaces.
1066 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1067 add_rec_buf
.rec
= rec2
1068 del_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1069 del_rec_buf
.rec
= rec
1070 self
.conn
.DnssrvUpdateRecord2(client_version
,
1078 buflen
, result
= self
.conn
.DnssrvEnumRecords2(client_version
,
1088 self
.assertEqual(1, result
.count
)
1089 self
.assertEqual(1, result
.rec
[0].wRecordCount
)
1090 self
.assertEqual(dnsp
.DNS_TYPE_A
, result
.rec
[0].records
[0].wType
)
1091 self
.assertEqual('5.6.7.8', result
.rec
[0].records
[0].data
)
# Step 3: remove the remaining record; the follow-up enumeration must fail.
1094 del_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
1095 del_rec_buf
.rec
= rec2
1096 self
.conn
.DnssrvUpdateRecord2(client_version
,
1104 self
.assertRaises(RuntimeError, self
.conn
.DnssrvEnumRecords2
,
1116 # The following tests do not pass against Samba because the owner and
1117 # group are not consistent with Windows, as well as some ACEs.
1119 # The following ACEs are also required for 2012R2:
1121 # (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS)
1122 # (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)"
1124 # [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity]
1125 def test_security_descriptor_msdcs_zone(self
):
# Compares the stored nTSecurityDescriptor of the _msdcs DNS zone with an
# expected descriptor built from an SDDL string, and fails with a readable
# diff when they differ.
# NOTE(review): source lines 1126, 1128-1130 and 1151 are missing from this
# extract, so the docstring is truncated and one line inside the
# sddl2binary() call (between the DA and WD ACEs) is not visible here.
1127 Make sure that security descriptors of the msdcs zone is
# Find the zone object: a dnsZone whose name starts with "_msdcs", searched
# in the subtree of DC=ForestDnsZones under the default base DN. Exactly one
# match is required.
1131 zones
= self
.samdb
.search(base
="DC=ForestDnsZones,%s" % self
.samdb
.get_default_basedn(),
1132 scope
=ldb
.SCOPE_SUBTREE
,
1133 expression
="(&(objectClass=dnsZone)(name=_msdcs*))",
1134 attrs
=["nTSecurityDescriptor", "objectClass"])
1135 self
.assertEqual(len(zones
), 1)
1136 self
.assertIn("nTSecurityDescriptor", zones
[0])
1137 tmp
= zones
[0]["nTSecurityDescriptor"][0]
# `utils` is only used for utils.domain_sid when rendering SDDL in the
# failure message at the end.
1138 utils
= sd_utils
.SDUtils(self
.samdb
)
1139 sd
= ndr_unpack(security
.descriptor
, tmp
)
1141 domain_sid
= security
.dom_sid(self
.samdb
.get_domain_sid())
# Resolve the SID of the DnsAdmins account so it can be supplied to
# sddl2binary() under the "DnsAdmins" key.
1143 res
= self
.samdb
.search(base
=self
.samdb
.get_default_basedn(), scope
=ldb
.SCOPE_SUBTREE
,
1144 expression
="(sAMAccountName=DnsAdmins)",
1145 attrs
=["objectSid"])
1147 dns_admin
= str(ndr_unpack(security
.dom_sid
, res
[0]['objectSid'][0]))
# Expected descriptor, written as SDDL; the owner/group here is SY/BA.
1149 packed_sd
= descriptor
.sddl2binary("O:SYG:BA"
1150 "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
1152 "(A;;RPLCLORC;;;WD)"
1153 "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
1154 "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1155 domain_sid
, {"DnsAdmins": dns_admin
})
1156 expected_sd
= descriptor
.get_clean_sd(ndr_unpack(security
.descriptor
, packed_sd
))
# An empty diff string means the two descriptors match.
1158 diff
= descriptor
.get_diff_sds(expected_sd
, sd
, domain_sid
)
1159 self
.assertEqual(diff
, '', "SD of msdcs zone different to expected.\n"
1160 "Difference was:\n%s\nExpected: %s\nGot: %s" %
1161 (diff
, expected_sd
.as_sddl(utils
.domain_sid
),
1162 sd
.as_sddl(utils
.domain_sid
)))
1164 def test_security_descriptor_forest_zone(self
):
# Issues a DnssrvOperation2 typed DNSSRV_TYPEID_ZONE_CREATE for a zone named
# "test_forest_zone" with the DNS_DP_FOREST_DEFAULT partition flag, then
# checks the security descriptors of three objects against expected values:
# the zone object itself, the CN=MicrosoftDNS container and the
# DC=ForestDnsZones partition. A final DnssrvOperation2 typed
# DNSSRV_TYPEID_NULL follows the checks.
# NOTE(review): source lines 1165, 1167-1168, 1175, 1180-1184, 1186-1187,
# 1211, 1224-1225, 1243-1244, 1246-1250 and 1252 onwards are missing from
# this extract, so the operation names, several call arguments and one line
# inside the sddl2binary() call are not visible; presumably the last
# operation deletes the zone again -- confirm.
1166 Make sure that security descriptors of forest dns zones are
1169 forest_zone
= "test_forest_zone"
1170 zone_create_info
= dnsserver
.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1171 zone_create_info
.dwZoneType
= dnsp
.DNS_ZONE_TYPE_PRIMARY
1172 zone_create_info
.fAging
= 0
1173 zone_create_info
.fDsIntegrated
= 1
1174 zone_create_info
.fLoadExisting
= 1
1176 zone_create_info
.pszZoneName
= forest_zone
1177 zone_create_info
.dwDpFlags
= dnsserver
.DNS_DP_FOREST_DEFAULT
1179 self
.conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1185 dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
,
# Build the partition DN by adding DC=ForestDnsZones as a child of the
# default base DN, then locate the zone object by name inside it.
1188 partition_dn
= self
.samdb
.get_default_basedn()
1189 partition_dn
.add_child("DC=ForestDnsZones")
1190 zones
= self
.samdb
.search(base
=partition_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1191 expression
="(name=%s)" % forest_zone
,
1192 attrs
=["nTSecurityDescriptor"])
1193 self
.assertEqual(len(zones
), 1)
1194 current_dn
= zones
[0].dn
1195 self
.assertIn("nTSecurityDescriptor", zones
[0])
1196 tmp
= zones
[0]["nTSecurityDescriptor"][0]
1197 utils
= sd_utils
.SDUtils(self
.samdb
)
1198 sd
= ndr_unpack(security
.descriptor
, tmp
)
1200 domain_sid
= security
.dom_sid(self
.samdb
.get_domain_sid())
# Resolve the SID of the DnsAdmins account; it is supplied to the
# descriptor builders below under the "DnsAdmins" key.
1202 res
= self
.samdb
.search(base
=self
.samdb
.get_default_basedn(),
1203 scope
=ldb
.SCOPE_SUBTREE
,
1204 expression
="(sAMAccountName=DnsAdmins)",
1205 attrs
=["objectSid"])
1207 dns_admin
= str(ndr_unpack(security
.dom_sid
, res
[0]['objectSid'][0]))
# Expected descriptor of the zone object, written as SDDL (owner/group DA).
1209 packed_sd
= descriptor
.sddl2binary("O:DAG:DA"
1210 "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
1212 "(A;;RPLCLORC;;;WD)"
1213 "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
1214 "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1215 domain_sid
, {"DnsAdmins": dns_admin
})
1216 expected_sd
= descriptor
.get_clean_sd(ndr_unpack(security
.descriptor
, packed_sd
))
# Expected descriptors of the MicrosoftDNS container and of the partition
# come from the descriptor module's own builders.
1218 packed_msdns
= descriptor
.get_dns_forest_microsoft_dns_descriptor(domain_sid
,
1219 {"DnsAdmins": dns_admin
})
1220 expected_msdns_sd
= descriptor
.get_clean_sd(ndr_unpack(security
.descriptor
, packed_msdns
))
1222 packed_part_sd
= descriptor
.get_dns_partition_descriptor(domain_sid
)
1223 expected_part_sd
= descriptor
.get_clean_sd(ndr_unpack(security
.descriptor
,
# Despite its name, security_desc_dict is a list of (DN string, expected
# descriptor) pairs.
1226 msdns_dn
= ldb
.Dn(self
.samdb
, "CN=MicrosoftDNS,%s" % str(partition_dn
))
1227 security_desc_dict
= [(current_dn
.get_linearized(), expected_sd
),
1228 (msdns_dn
.get_linearized(), expected_msdns_sd
),
1229 (partition_dn
.get_linearized(), expected_part_sd
)]
# For each pair: read the object itself (SCOPE_BASE), unpack its descriptor
# and require an empty diff against the expected one.
1231 for (key
, sec_desc
) in security_desc_dict
:
1232 zones
= self
.samdb
.search(base
=key
, scope
=ldb
.SCOPE_BASE
,
1233 attrs
=["nTSecurityDescriptor"])
1234 self
.assertIn("nTSecurityDescriptor", zones
[0])
1235 tmp
= zones
[0]["nTSecurityDescriptor"][0]
1236 utils
= sd_utils
.SDUtils(self
.samdb
)
1238 sd
= ndr_unpack(security
.descriptor
, tmp
)
1239 diff
= descriptor
.get_diff_sds(sec_desc
, sd
, domain_sid
)
1241 self
.assertEqual(diff
, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1242 % (key
, diff
, sec_desc
.as_sddl(utils
.domain_sid
), sd
.as_sddl(utils
.domain_sid
)))
1245 self
.conn
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
1251 dnsserver
.DNSSRV_TYPEID_NULL
,
1254 def test_security_descriptor_domain_zone(self
):
# Checks the security descriptors of three objects against expected values:
# the zone object named self.custom_zone inside DC=DomainDnsZones, the
# CN=MicrosoftDNS container and the DC=DomainDnsZones partition.
# Unlike the forest-zone test, no zone-creating RPC call appears in the
# visible lines; the zone named by self.custom_zone is presumably set up
# elsewhere in the test class -- confirm.
# NOTE(review): source lines 1255, 1257-1259, 1283 and 1296-1297 are missing
# from this extract, so the docstring is truncated and one line inside the
# sddl2binary() call is not visible here.
1256 Make sure that security descriptors of domain dns zones are
# Build the partition DN by adding DC=DomainDnsZones as a child of the
# default base DN, then locate the zone object by name inside it.
1260 partition_dn
= self
.samdb
.get_default_basedn()
1261 partition_dn
.add_child("DC=DomainDnsZones")
1262 zones
= self
.samdb
.search(base
=partition_dn
, scope
=ldb
.SCOPE_SUBTREE
,
1263 expression
="(name=%s)" % self
.custom_zone
,
1264 attrs
=["nTSecurityDescriptor"])
1265 self
.assertEqual(len(zones
), 1)
1266 current_dn
= zones
[0].dn
1267 self
.assertIn("nTSecurityDescriptor", zones
[0])
1268 tmp
= zones
[0]["nTSecurityDescriptor"][0]
1269 utils
= sd_utils
.SDUtils(self
.samdb
)
1270 sd
= ndr_unpack(security
.descriptor
, tmp
)
# NOTE(review): `sddl` is not referenced again in the visible lines.
1271 sddl
= sd
.as_sddl(utils
.domain_sid
)
1273 domain_sid
= security
.dom_sid(self
.samdb
.get_domain_sid())
# Resolve the SID of the DnsAdmins account; it is supplied to the
# descriptor builders below under the "DnsAdmins" key.
1275 res
= self
.samdb
.search(base
=self
.samdb
.get_default_basedn(), scope
=ldb
.SCOPE_SUBTREE
,
1276 expression
="(sAMAccountName=DnsAdmins)",
1277 attrs
=["objectSid"])
1279 dns_admin
= str(ndr_unpack(security
.dom_sid
, res
[0]['objectSid'][0]))
# Expected descriptor of the zone object, written as SDDL (owner/group DA).
1281 packed_sd
= descriptor
.sddl2binary("O:DAG:DA"
1282 "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
1284 "(A;;RPLCLORC;;;WD)"
1285 "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
1286 "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1287 domain_sid
, {"DnsAdmins": dns_admin
})
1288 expected_sd
= descriptor
.get_clean_sd(ndr_unpack(security
.descriptor
, packed_sd
))
# Expected descriptors of the MicrosoftDNS container and of the partition
# come from the descriptor module's own builders.
1290 packed_msdns
= descriptor
.get_dns_domain_microsoft_dns_descriptor(domain_sid
,
1291 {"DnsAdmins": dns_admin
})
1292 expected_msdns_sd
= descriptor
.get_clean_sd(ndr_unpack(security
.descriptor
, packed_msdns
))
1294 packed_part_sd
= descriptor
.get_dns_partition_descriptor(domain_sid
)
1295 expected_part_sd
= descriptor
.get_clean_sd(ndr_unpack(security
.descriptor
,
# Despite its name, security_desc_dict is a list of (DN string, expected
# descriptor) pairs.
1298 msdns_dn
= ldb
.Dn(self
.samdb
, "CN=MicrosoftDNS,%s" % str(partition_dn
))
1299 security_desc_dict
= [(current_dn
.get_linearized(), expected_sd
),
1300 (msdns_dn
.get_linearized(), expected_msdns_sd
),
1301 (partition_dn
.get_linearized(), expected_part_sd
)]
# For each pair: read the object itself (SCOPE_BASE), unpack its descriptor
# and require an empty diff against the expected one.
1303 for (key
, sec_desc
) in security_desc_dict
:
1304 zones
= self
.samdb
.search(base
=key
, scope
=ldb
.SCOPE_BASE
,
1305 attrs
=["nTSecurityDescriptor"])
1306 self
.assertIn("nTSecurityDescriptor", zones
[0])
1307 tmp
= zones
[0]["nTSecurityDescriptor"][0]
1308 utils
= sd_utils
.SDUtils(self
.samdb
)
1310 sd
= ndr_unpack(security
.descriptor
, tmp
)
1311 diff
= descriptor
.get_diff_sds(sec_desc
, sd
, domain_sid
)
1313 self
.assertEqual(diff
, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1314 % (key
, diff
, sec_desc
.as_sddl(utils
.domain_sid
), sd
.as_sddl(utils
.domain_sid
)))