1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba
.credentials
import Credentials
21 from samba
.samdb
import SamDB
22 from samba
.auth
import system_session
23 from samba
.tests
import TestCase
24 from samba
.tests
import delete_force
25 from samba
.ndr
import ndr_unpack
, ndr_pack
26 from samba
.dcerpc
import drsblobs
, security
, misc
27 from samba
.param
import LoadParm
28 from samba
import dsdb
, functional_level
29 from samba
import werror
35 class DsdbAccountTests(TestCase
):
39 self
.lp
= samba
.tests
.env_loadparm()
40 self
.creds
= Credentials()
41 self
.creds
.guess(self
.lp
)
42 self
.session
= system_session()
43 self
.samdb
= SamDB(session_info
=self
.session
,
44 credentials
=self
.creds
,
48 user_name
= "dsdb-user-" + str(uuid
.uuid4().hex[0:6])
49 user_pass
= samba
.generate_random_password(32, 32)
50 user_description
= "Test user for dsdb test"
52 base_dn
= self
.samdb
.domain_dn()
54 self
.account_dn
= "CN=" + user_name
+ ",CN=Users," + base_dn
55 self
.samdb
.newuser(username
=user_name
,
57 description
=user_description
)
59 self
.addCleanup(delete_force
, self
.samdb
, self
.account_dn
)
61 # Get server reference DN
62 res
= self
.samdb
.search(base
=ldb
.Dn(self
.samdb
,
63 self
.samdb
.get_serverName()),
65 attrs
=["serverReference"])
66 # Get server reference
67 self
.server_ref_dn
= ldb
.Dn(
68 self
.samdb
, res
[0]["serverReference"][0].decode("utf-8"))
71 res
= self
.samdb
.search(base
=self
.server_ref_dn
,
73 attrs
=["rIDSetReferences"])
75 self
.assertIn("rIDSetReferences", rid_set_refs
)
76 rid_set_str
= rid_set_refs
["rIDSetReferences"][0].decode("utf-8")
77 self
.rid_set_dn
= ldb
.Dn(self
.samdb
, rid_set_str
)
79 def get_rid_set(self
, rid_set_dn
):
80 res
= self
.samdb
.search(base
=rid_set_dn
,
82 attrs
=["rIDAllocationPool",
83 "rIDPreviousAllocationPool",
88 def test_ridalloc_next_free_rid(self
):
89 # Test RID allocation. We assume that RID
90 # pools allocated to us are contiguous.
91 self
.samdb
.transaction_start()
93 orig_rid_set
= self
.get_rid_set(self
.rid_set_dn
)
94 self
.assertIn("rIDAllocationPool", orig_rid_set
)
95 self
.assertIn("rIDPreviousAllocationPool", orig_rid_set
)
96 self
.assertIn("rIDUsedPool", orig_rid_set
)
97 self
.assertIn("rIDNextRID", orig_rid_set
)
99 # Get rIDNextRID value from RID set.
100 next_rid
= int(orig_rid_set
["rIDNextRID"][0])
102 # Check the result of next_free_rid().
103 next_free_rid
= self
.samdb
.next_free_rid()
104 self
.assertEqual(next_rid
+ 1, next_free_rid
)
106 # Check calling it twice in succession gives the same result.
107 next_free_rid2
= self
.samdb
.next_free_rid()
108 self
.assertEqual(next_free_rid
, next_free_rid2
)
110 # Ensure that the RID set attributes have not changed.
111 rid_set2
= self
.get_rid_set(self
.rid_set_dn
)
112 self
.assertEqual(orig_rid_set
, rid_set2
)
114 self
.samdb
.transaction_cancel()
116 def test_ridalloc_no_ridnextrid(self
):
117 self
.samdb
.transaction_start()
119 # Delete the rIDNextRID attribute of the RID set,
120 # and set up previous and next pools.
126 msg
.dn
= self
.rid_set_dn
127 msg
["rIDNextRID"] = ldb
.MessageElement([],
130 msg
["rIDPreviousAllocationPool"] = (
131 ldb
.MessageElement(str((prev_hi
<< 32) | prev_lo
),
132 ldb
.FLAG_MOD_REPLACE
,
133 "rIDPreviousAllocationPool"))
134 msg
["rIDAllocationPool"] = (
135 ldb
.MessageElement(str((next_hi
<< 32) | next_lo
),
136 ldb
.FLAG_MOD_REPLACE
,
137 "rIDAllocationPool"))
138 self
.samdb
.modify(msg
)
140 # Ensure that next_free_rid() returns the start of the next pool.
141 next_free_rid3
= self
.samdb
.next_free_rid()
142 self
.assertEqual(next_lo
, next_free_rid3
)
144 # Check the result of allocate_rid() matches.
145 rid
= self
.samdb
.allocate_rid()
146 self
.assertEqual(next_free_rid3
, rid
)
148 # Check that the result of next_free_rid() has now changed.
149 next_free_rid4
= self
.samdb
.next_free_rid()
150 self
.assertEqual(rid
+ 1, next_free_rid4
)
152 # Check the range of available RIDs.
153 free_lo
, free_hi
= self
.samdb
.free_rid_bounds()
154 self
.assertEqual(rid
+ 1, free_lo
)
155 self
.assertEqual(next_hi
, free_hi
)
157 self
.samdb
.transaction_cancel()
159 def test_ridalloc_no_free_rids(self
):
160 self
.samdb
.transaction_start()
162 # Exhaust our current pool of RIDs.
166 msg
.dn
= self
.rid_set_dn
167 msg
["rIDPreviousAllocationPool"] = (
168 ldb
.MessageElement(str((pool_hi
<< 32) | pool_lo
),
169 ldb
.FLAG_MOD_REPLACE
,
170 "rIDPreviousAllocationPool"))
171 msg
["rIDAllocationPool"] = (
172 ldb
.MessageElement(str((pool_hi
<< 32) | pool_lo
),
173 ldb
.FLAG_MOD_REPLACE
,
174 "rIDAllocationPool"))
175 msg
["rIDNextRID"] = (
176 ldb
.MessageElement(str(pool_hi
),
177 ldb
.FLAG_MOD_REPLACE
,
179 self
.samdb
.modify(msg
)
181 # Ensure that calculating the next free RID fails.
182 with self
.assertRaises(ldb
.LdbError
) as err
:
183 self
.samdb
.next_free_rid()
185 self
.assertEqual("RID pools out of RIDs", err
.exception
.args
[1])
187 # Ensure we can still allocate a new RID.
188 self
.samdb
.allocate_rid()
190 self
.samdb
.transaction_cancel()
192 def test_ridalloc_new_ridset(self
):
193 self
.samdb
.transaction_start()
195 # Test what happens with RID Set values set to zero (similar to
196 # when a RID Set is first created, except we also set
197 # rIDAllocationPool to zero).
199 msg
.dn
= self
.rid_set_dn
200 msg
["rIDPreviousAllocationPool"] = (
201 ldb
.MessageElement("0",
202 ldb
.FLAG_MOD_REPLACE
,
203 "rIDPreviousAllocationPool"))
204 msg
["rIDAllocationPool"] = (
205 ldb
.MessageElement("0",
206 ldb
.FLAG_MOD_REPLACE
,
207 "rIDAllocationPool"))
208 msg
["rIDNextRID"] = (
209 ldb
.MessageElement("0",
210 ldb
.FLAG_MOD_REPLACE
,
212 self
.samdb
.modify(msg
)
214 # Ensure that calculating the next free RID fails.
215 with self
.assertRaises(ldb
.LdbError
) as err
:
216 self
.samdb
.next_free_rid()
218 self
.assertEqual("RID pools out of RIDs", err
.exception
.args
[1])
220 # Set values for the next pool.
224 msg
.dn
= self
.rid_set_dn
225 msg
["rIDAllocationPool"] = (
226 ldb
.MessageElement(str((pool_hi
<< 32) | pool_lo
),
227 ldb
.FLAG_MOD_REPLACE
,
228 "rIDAllocationPool"))
229 self
.samdb
.modify(msg
)
231 # Ensure the next free RID value is equal to the next pool's lower
233 next_free_rid5
= self
.samdb
.next_free_rid()
234 self
.assertEqual(pool_lo
, next_free_rid5
)
236 # Check the range of available RIDs.
237 free_lo
, free_hi
= self
.samdb
.free_rid_bounds()
238 self
.assertEqual(pool_lo
, free_lo
)
239 self
.assertEqual(pool_hi
, free_hi
)
241 self
.samdb
.transaction_cancel()
243 def test_ridalloc_move_to_new_pool(self
):
244 self
.samdb
.transaction_start()
246 # Test moving to a new pool from the previous pool.
252 msg
.dn
= self
.rid_set_dn
253 msg
["rIDPreviousAllocationPool"] = (
254 ldb
.MessageElement(str((pool_hi
<< 32) | pool_lo
),
255 ldb
.FLAG_MOD_REPLACE
,
256 "rIDPreviousAllocationPool"))
257 msg
["rIDAllocationPool"] = (
258 ldb
.MessageElement(str((new_pool_hi
<< 32) | new_pool_lo
),
259 ldb
.FLAG_MOD_REPLACE
,
260 "rIDAllocationPool"))
261 msg
["rIDNextRID"] = (
262 ldb
.MessageElement(str(pool_hi
- 1),
263 ldb
.FLAG_MOD_REPLACE
,
265 self
.samdb
.modify(msg
)
267 # We should have remained in the previous pool.
268 next_free_rid6
= self
.samdb
.next_free_rid()
269 self
.assertEqual(pool_hi
, next_free_rid6
)
271 # Check the range of available RIDs.
272 free_lo
, free_hi
= self
.samdb
.free_rid_bounds()
273 self
.assertEqual(pool_hi
, free_lo
)
274 self
.assertEqual(pool_hi
, free_hi
)
276 # Allocate a new RID.
277 rid2
= self
.samdb
.allocate_rid()
278 self
.assertEqual(next_free_rid6
, rid2
)
280 # We should now move to the next pool.
281 next_free_rid7
= self
.samdb
.next_free_rid()
282 self
.assertEqual(new_pool_lo
, next_free_rid7
)
284 # Check the new range of available RIDs.
285 free_lo2
, free_hi2
= self
.samdb
.free_rid_bounds()
286 self
.assertEqual(new_pool_lo
, free_lo2
)
287 self
.assertEqual(new_pool_hi
, free_hi2
)
289 # Ensure that allocate_rid() matches.
290 rid3
= self
.samdb
.allocate_rid()
291 self
.assertEqual(next_free_rid7
, rid3
)
293 self
.samdb
.transaction_cancel()
295 def test_ridalloc_no_ridsetreferences(self
):
296 self
.samdb
.transaction_start()
298 # Delete the rIDSetReferences attribute.
300 msg
.dn
= self
.server_ref_dn
301 msg
["rIDSetReferences"] = (
302 ldb
.MessageElement([],
305 self
.samdb
.modify(msg
)
307 # Ensure calculating the next free RID fails.
308 with self
.assertRaises(ldb
.LdbError
) as err
:
309 self
.samdb
.next_free_rid()
311 enum
, estr
= err
.exception
.args
312 self
.assertEqual(ldb
.ERR_NO_SUCH_ATTRIBUTE
, enum
)
313 self
.assertIn("No RID Set DN - "
314 "Cannot find attribute rIDSetReferences of %s "
315 "to calculate reference dn" % self
.server_ref_dn
,
318 # Ensure allocating a new RID fails.
319 with self
.assertRaises(ldb
.LdbError
) as err
:
320 self
.samdb
.allocate_rid()
322 enum
, estr
= err
.exception
.args
323 self
.assertEqual(ldb
.ERR_ENTRY_ALREADY_EXISTS
, enum
)
324 self
.assertIn("No RID Set DN - "
325 "Failed to add RID Set %s - "
326 "Entry %s already exists" %
327 (self
.rid_set_dn
, self
.rid_set_dn
),
330 self
.samdb
.transaction_cancel()
332 def test_ridalloc_no_rid_set(self
):
333 self
.samdb
.transaction_start()
335 # Set the rIDSetReferences attribute to not point to a RID Set.
336 fake_rid_set_str
= self
.account_dn
338 msg
.dn
= self
.server_ref_dn
339 msg
["rIDSetReferences"] = (
340 ldb
.MessageElement(fake_rid_set_str
,
341 ldb
.FLAG_MOD_REPLACE
,
343 self
.samdb
.modify(msg
)
345 # Ensure calculating the next free RID fails.
346 with self
.assertRaises(ldb
.LdbError
) as err
:
347 self
.samdb
.next_free_rid()
349 enum
, estr
= err
.exception
.args
350 self
.assertEqual(ldb
.ERR_OPERATIONS_ERROR
, enum
)
351 self
.assertIn("Bad RID Set " + fake_rid_set_str
, estr
)
353 # Ensure allocating a new RID fails.
354 with self
.assertRaises(ldb
.LdbError
) as err
:
355 self
.samdb
.allocate_rid()
357 enum
, estr
= err
.exception
.args
358 self
.assertEqual(ldb
.ERR_OPERATIONS_ERROR
, enum
)
359 self
.assertIn("Bad RID Set " + fake_rid_set_str
, estr
)
361 self
.samdb
.transaction_cancel()
363 def test_error_replpropertymetadata(self
):
364 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
365 base
=self
.account_dn
,
366 attrs
=["replPropertyMetaData"])
367 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
368 res
[0]["replPropertyMetaData"][0])
371 # Search for Description
373 old_version
= o
.version
374 o
.version
= o
.version
+ 1
375 replBlob
= ndr_pack(repl
)
378 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
379 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
381 def test_error_replpropertymetadata_nochange(self
):
382 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
383 base
=self
.account_dn
,
384 attrs
=["replPropertyMetaData"])
385 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
386 res
[0]["replPropertyMetaData"][0])
387 replBlob
= ndr_pack(repl
)
390 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
391 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
393 def test_error_replpropertymetadata_allow_sort(self
):
394 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
395 base
=self
.account_dn
,
396 attrs
=["replPropertyMetaData"])
397 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
398 res
[0]["replPropertyMetaData"][0])
399 replBlob
= ndr_pack(repl
)
402 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
403 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
405 def test_twoatt_replpropertymetadata(self
):
406 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
407 base
=self
.account_dn
,
408 attrs
=["replPropertyMetaData", "uSNChanged"])
409 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
410 res
[0]["replPropertyMetaData"][0])
413 # Search for Description
415 old_version
= o
.version
416 o
.version
= o
.version
+ 1
417 o
.local_usn
= int(str(res
[0]["uSNChanged"])) + 1
418 replBlob
= ndr_pack(repl
)
421 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
422 msg
["description"] = ldb
.MessageElement("new val", ldb
.FLAG_MOD_REPLACE
, "description")
423 self
.assertRaises(ldb
.LdbError
, self
.samdb
.modify
, msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
425 def test_set_replpropertymetadata(self
):
426 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
427 base
=self
.account_dn
,
428 attrs
=["replPropertyMetaData", "uSNChanged"])
429 repl
= ndr_unpack(drsblobs
.replPropertyMetaDataBlob
,
430 res
[0]["replPropertyMetaData"][0])
433 # Search for Description
435 old_version
= o
.version
436 o
.version
= o
.version
+ 1
437 o
.local_usn
= int(str(res
[0]["uSNChanged"])) + 1
438 o
.originating_usn
= int(str(res
[0]["uSNChanged"])) + 1
439 replBlob
= ndr_pack(repl
)
442 msg
["replPropertyMetaData"] = ldb
.MessageElement(replBlob
, ldb
.FLAG_MOD_REPLACE
, "replPropertyMetaData")
443 self
.samdb
.modify(msg
, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
445 def test_get_attribute_replmetadata_version(self
):
446 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
447 base
=self
.account_dn
,
449 self
.assertEqual(len(res
), 1)
451 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "unicodePwd"), 2)
453 def test_set_attribute_replmetadata_version(self
):
454 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
455 base
=self
.account_dn
,
457 self
.assertEqual(len(res
), 1)
459 version
= self
.samdb
.get_attribute_replmetadata_version(dn
, "description")
460 self
.samdb
.set_attribute_replmetadata_version(dn
, "description", version
+ 2)
461 self
.assertEqual(self
.samdb
.get_attribute_replmetadata_version(dn
, "description"), version
+ 2)
463 def test_no_error_on_invalid_control(self
):
465 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
466 base
=self
.account_dn
,
467 attrs
=["replPropertyMetaData"],
468 controls
=["local_oid:%s:0"
469 % dsdb
.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED
])
470 except ldb
.LdbError
as e
:
471 self
.fail("Should have not raised an exception")
473 def test_error_on_invalid_critical_control(self
):
475 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
476 base
=self
.account_dn
,
477 attrs
=["replPropertyMetaData"],
478 controls
=["local_oid:%s:1"
479 % dsdb
.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED
])
480 except ldb
.LdbError
as e
:
481 (errno
, estr
) = e
.args
482 if errno
!= ldb
.ERR_UNSUPPORTED_CRITICAL_EXTENSION
:
483 self
.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
486 class DsdbTests(TestCase
):
489 self
.lp
= samba
.tests
.env_loadparm()
490 self
.creds
= Credentials()
491 self
.creds
.guess(self
.lp
)
492 self
.session
= system_session()
493 self
.samdb
= SamDB(session_info
=self
.session
,
494 credentials
=self
.creds
,
497 # Allocate a unique RID for use in the objectSID tests.
499 def allocate_rid(self
):
500 self
.samdb
.transaction_start()
502 rid
= self
.samdb
.allocate_rid()
504 self
.samdb
.transaction_cancel()
506 self
.samdb
.transaction_commit()
509 def test_get_oid_from_attrid(self
):
510 oid
= self
.samdb
.get_oid_from_attid(591614)
511 self
.assertEqual(oid
, "1.2.840.113556.1.4.1790")
513 def test_ok_get_attribute_from_attid(self
):
514 self
.assertEqual(self
.samdb
.get_attribute_from_attid(13), "description")
516 def test_ko_get_attribute_from_attid(self
):
517 self
.assertEqual(self
.samdb
.get_attribute_from_attid(11979), None)
519 # Ensure that duplicate objectSID's are permitted for foreign security
522 def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self
):
525 # We need to build a foreign security principal SID
526 # i.e a SID not in the current domain.
528 dom_sid
= self
.samdb
.get_domain_sid()
529 if str(dom_sid
).endswith("0"):
533 sid_str
= str(dom_sid
)[:-1] + c
+ "-1000"
534 sid
= ndr_pack(security
.dom_sid(sid_str
))
535 basedn
= self
.samdb
.get_default_basedn()
536 dn
= "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid_str
, basedn
)
539 # First without control
545 "objectClass": "foreignSecurityPrincipal"})
546 self
.fail("No exception should get ERR_OBJECT_CLASS_VIOLATION")
547 except ldb
.LdbError
as e
:
549 self
.assertEqual(code
, ldb
.ERR_OBJECT_CLASS_VIOLATION
, str(e
))
550 werr
= "%08X" % werror
.WERR_DS_MISSING_REQUIRED_ATT
551 self
.assertTrue(werr
in msg
, msg
)
556 "objectClass": "foreignSecurityPrincipal",
558 self
.fail("No exception should get ERR_UNWILLING_TO_PERFORM")
559 except ldb
.LdbError
as e
:
561 self
.assertEqual(code
, ldb
.ERR_UNWILLING_TO_PERFORM
, str(e
))
562 werr
= "%08X" % werror
.WERR_DS_ILLEGAL_MOD_OPERATION
563 self
.assertTrue(werr
in msg
, msg
)
566 # We need to use the provision control
567 # in order to add foreignSecurityPrincipal
571 controls
= ["provision:0"]
574 "objectClass": "foreignSecurityPrincipal"},
577 self
.samdb
.delete(dn
)
582 "objectClass": "foreignSecurityPrincipal"},
584 except ldb
.LdbError
as e
:
586 self
.fail("Got unexpected exception %d - %s "
590 self
.samdb
.delete(dn
)
592 def _test_foreignSecurityPrincipal(self
, obj_class
, fpo_attr
):
594 dom_sid
= self
.samdb
.get_domain_sid()
595 lsid_str
= str(dom_sid
) + "-4294967294"
596 bsid_str
= "S-1-5-32-4294967294"
597 fsid_str
= "S-1-5-4294967294"
598 basedn
= self
.samdb
.get_default_basedn()
600 dn_str
= "cn=%s,cn=Users,%s" % (cn
, basedn
)
601 dn
= ldb
.Dn(self
.samdb
, dn_str
)
603 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
605 expression
="(objectSid=%s)" % lsid_str
,
607 self
.assertEqual(len(res
), 0)
608 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
610 expression
="(objectSid=%s)" % bsid_str
,
612 self
.assertEqual(len(res
), 0)
613 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
615 expression
="(objectSid=%s)" % fsid_str
,
617 self
.assertEqual(len(res
), 0)
619 self
.addCleanup(delete_force
, self
.samdb
, dn_str
)
623 "objectClass": obj_class
})
627 msg
[fpo_attr
] = ldb
.MessageElement("<SID=%s>" % lsid_str
,
631 self
.samdb
.modify(msg
)
632 self
.fail("No exception should get LDB_ERR_UNWILLING_TO_PERFORM")
633 except ldb
.LdbError
as e
:
635 self
.assertEqual(code
, ldb
.ERR_UNWILLING_TO_PERFORM
, str(e
))
636 werr
= "%08X" % werror
.WERR_DS_INVALID_GROUP_TYPE
637 self
.assertTrue(werr
in msg
, msg
)
641 msg
[fpo_attr
] = ldb
.MessageElement("<SID=%s>" % bsid_str
,
645 self
.samdb
.modify(msg
)
646 self
.fail("No exception should get LDB_ERR_NO_SUCH_OBJECT")
647 except ldb
.LdbError
as e
:
649 self
.assertEqual(code
, ldb
.ERR_NO_SUCH_OBJECT
, str(e
))
650 werr
= "%08X" % werror
.WERR_NO_SUCH_MEMBER
651 self
.assertTrue(werr
in msg
, msg
)
655 msg
[fpo_attr
] = ldb
.MessageElement("<SID=%s>" % fsid_str
,
659 self
.samdb
.modify(msg
)
660 except ldb
.LdbError
as e
:
661 self
.fail("Should have not raised an exception")
663 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
665 expression
="(objectSid=%s)" % fsid_str
,
667 self
.assertEqual(len(res
), 1)
668 self
.samdb
.delete(res
[0].dn
)
669 self
.samdb
.delete(dn
)
670 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
672 expression
="(objectSid=%s)" % fsid_str
,
674 self
.assertEqual(len(res
), 0)
676 def test_foreignSecurityPrincipal_member(self
):
677 return self
._test
_foreignSecurityPrincipal
(
680 def test_foreignSecurityPrincipal_MembersForAzRole(self
):
681 return self
._test
_foreignSecurityPrincipal
(
682 "msDS-AzRole", "msDS-MembersForAzRole")
684 def test_foreignSecurityPrincipal_NeverRevealGroup(self
):
685 return self
._test
_foreignSecurityPrincipal
(
686 "computer", "msDS-NeverRevealGroup")
688 def test_foreignSecurityPrincipal_RevealOnDemandGroup(self
):
689 return self
._test
_foreignSecurityPrincipal
(
690 "computer", "msDS-RevealOnDemandGroup")
692 def _test_fail_foreignSecurityPrincipal(self
, obj_class
, fpo_attr
,
693 msg_exp
, lerr_exp
, werr_exp
,
694 allow_reference
=True):
696 dom_sid
= self
.samdb
.get_domain_sid()
697 lsid_str
= str(dom_sid
) + "-4294967294"
698 bsid_str
= "S-1-5-32-4294967294"
699 fsid_str
= "S-1-5-4294967294"
700 basedn
= self
.samdb
.get_default_basedn()
701 cn1
= "dsdb_test_fpo1"
702 dn1_str
= "cn=%s,cn=Users,%s" % (cn1
, basedn
)
703 dn1
= ldb
.Dn(self
.samdb
, dn1_str
)
704 cn2
= "dsdb_test_fpo2"
705 dn2_str
= "cn=%s,cn=Users,%s" % (cn2
, basedn
)
706 dn2
= ldb
.Dn(self
.samdb
, dn2_str
)
708 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
710 expression
="(objectSid=%s)" % lsid_str
,
712 self
.assertEqual(len(res
), 0)
713 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
715 expression
="(objectSid=%s)" % bsid_str
,
717 self
.assertEqual(len(res
), 0)
718 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
720 expression
="(objectSid=%s)" % fsid_str
,
722 self
.assertEqual(len(res
), 0)
724 self
.addCleanup(delete_force
, self
.samdb
, dn1_str
)
725 self
.addCleanup(delete_force
, self
.samdb
, dn2_str
)
729 "objectClass": obj_class
})
733 "objectClass": obj_class
})
737 msg
[fpo_attr
] = ldb
.MessageElement("<SID=%s>" % lsid_str
,
741 self
.samdb
.modify(msg
)
742 self
.fail("No exception should get %s" % msg_exp
)
743 except ldb
.LdbError
as e
:
745 self
.assertEqual(code
, lerr_exp
, str(e
))
746 werr
= "%08X" % werr_exp
747 self
.assertTrue(werr
in msg
, msg
)
751 msg
[fpo_attr
] = ldb
.MessageElement("<SID=%s>" % bsid_str
,
755 self
.samdb
.modify(msg
)
756 self
.fail("No exception should get %s" % msg_exp
)
757 except ldb
.LdbError
as e
:
759 self
.assertEqual(code
, lerr_exp
, str(e
))
760 werr
= "%08X" % werr_exp
761 self
.assertTrue(werr
in msg
, msg
)
765 msg
[fpo_attr
] = ldb
.MessageElement("<SID=%s>" % fsid_str
,
769 self
.samdb
.modify(msg
)
770 self
.fail("No exception should get %s" % msg
)
771 except ldb
.LdbError
as e
:
773 self
.assertEqual(code
, lerr_exp
, str(e
))
774 werr
= "%08X" % werr_exp
775 self
.assertTrue(werr
in msg
, msg
)
779 msg
[fpo_attr
] = ldb
.MessageElement("%s" % dn2
,
783 self
.samdb
.modify(msg
)
784 if not allow_reference
:
785 self
.fail("No exception should get %s" % msg_exp
)
786 except ldb
.LdbError
as e
:
788 self
.fail("Should have not raised an exception: %s" % e
)
790 self
.assertEqual(code
, lerr_exp
, str(e
))
791 werr
= "%08X" % werr_exp
792 self
.assertTrue(werr
in msg
, msg
)
794 self
.samdb
.delete(dn2
)
795 self
.samdb
.delete(dn1
)
797 def test_foreignSecurityPrincipal_NonMembers(self
):
798 return self
._test
_fail
_foreignSecurityPrincipal
(
799 "group", "msDS-NonMembers",
800 "LDB_ERR_UNWILLING_TO_PERFORM/WERR_NOT_SUPPORTED",
801 ldb
.ERR_UNWILLING_TO_PERFORM
, werror
.WERR_NOT_SUPPORTED
,
802 allow_reference
=False)
804 def test_foreignSecurityPrincipal_HostServiceAccount(self
):
805 return self
._test
_fail
_foreignSecurityPrincipal
(
806 "computer", "msDS-HostServiceAccount",
807 "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID",
808 ldb
.ERR_CONSTRAINT_VIOLATION
,
809 werror
.WERR_DS_NAME_REFERENCE_INVALID
)
811 def test_foreignSecurityPrincipal_manager(self
):
812 return self
._test
_fail
_foreignSecurityPrincipal
(
814 "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID",
815 ldb
.ERR_CONSTRAINT_VIOLATION
,
816 werror
.WERR_DS_NAME_REFERENCE_INVALID
)
819 # Duplicate objectSID's should not be permitted for sids in the local
820 # domain. The test sequence is add an object, delete it, then attempt to
821 # re-add it, this should fail with a constraint violation
823 def test_duplicate_objectSIDs_not_allowed_on_local_objects(self
):
825 dom_sid
= self
.samdb
.get_domain_sid()
826 rid
= self
.allocate_rid()
827 sid_str
= str(dom_sid
) + "-" + rid
828 sid
= ndr_pack(security
.dom_sid(sid_str
))
829 basedn
= self
.samdb
.get_default_basedn()
831 dn
= "cn=%s,cn=Users,%s" % (cn
, basedn
)
835 "objectClass": "user",
837 self
.samdb
.delete(dn
)
842 "objectClass": "user",
844 self
.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
845 except ldb
.LdbError
as e
:
847 if code
!= ldb
.ERR_CONSTRAINT_VIOLATION
:
848 self
.fail("Got %d - %s should have got "
849 "LDB_ERR_CONSTRAINT_VIOLATION"
852 def test_linked_vs_non_linked_reference(self
):
853 basedn
= self
.samdb
.get_default_basedn()
854 kept_dn_str
= "cn=reference_kept,cn=Users,%s" % (basedn
)
855 removed_dn_str
= "cn=reference_removed,cn=Users,%s" % (basedn
)
856 dom_sid
= self
.samdb
.get_domain_sid()
857 none_sid_str
= str(dom_sid
) + "-4294967294"
858 none_guid_str
= "afafafaf-fafa-afaf-fafa-afafafafafaf"
860 self
.addCleanup(delete_force
, self
.samdb
, kept_dn_str
)
861 self
.addCleanup(delete_force
, self
.samdb
, removed_dn_str
)
865 "objectClass": "user"})
866 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
868 attrs
=["objectGUID", "objectSID"])
869 self
.assertEqual(len(res
), 1)
870 kept_guid
= ndr_unpack(misc
.GUID
, res
[0]["objectGUID"][0])
871 kept_sid
= ndr_unpack(security
.dom_sid
, res
[0]["objectSid"][0])
875 "dn": removed_dn_str
,
876 "objectClass": "user"})
877 res
= self
.samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
879 attrs
=["objectGUID", "objectSID"])
880 self
.assertEqual(len(res
), 1)
881 removed_guid
= ndr_unpack(misc
.GUID
, res
[0]["objectGUID"][0])
882 removed_sid
= ndr_unpack(security
.dom_sid
, res
[0]["objectSid"][0])
883 self
.samdb
.delete(removed_dn_str
)
886 # First try the linked attribute 'manager'
892 msg
["manager"] = ldb
.MessageElement("<SID=%s>" % removed_sid
,
896 self
.samdb
.modify(msg
)
897 self
.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
898 except ldb
.LdbError
as e
:
900 self
.assertEqual(code
, ldb
.ERR_CONSTRAINT_VIOLATION
, str(e
))
901 werr
= "%08X" % werror
.WERR_DS_NAME_REFERENCE_INVALID
902 self
.assertTrue(werr
in msg
, msg
)
906 msg
["manager"] = ldb
.MessageElement("<GUID=%s>" % removed_guid
,
910 self
.samdb
.modify(msg
)
911 self
.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
912 except ldb
.LdbError
as e
:
914 self
.assertEqual(code
, ldb
.ERR_CONSTRAINT_VIOLATION
, str(e
))
915 werr
= "%08X" % werror
.WERR_DS_NAME_REFERENCE_INVALID
916 self
.assertTrue(werr
in msg
, msg
)
919 # Try the non-linked attribute 'assistant'
920 # by GUID and SID, which should work.
924 msg
["assistant"] = ldb
.MessageElement("<SID=%s>" % removed_sid
,
927 self
.samdb
.modify(msg
)
930 msg
["assistant"] = ldb
.MessageElement("<SID=%s>" % removed_sid
,
933 self
.samdb
.modify(msg
)
937 msg
["assistant"] = ldb
.MessageElement("<GUID=%s>" % removed_guid
,
940 self
.samdb
.modify(msg
)
943 msg
["assistant"] = ldb
.MessageElement("<GUID=%s>" % removed_guid
,
946 self
.samdb
.modify(msg
)
949 # Finally try the non-linked attribute 'assistant'
950 # but with non existing GUID, SID, DN
954 msg
["assistant"] = ldb
.MessageElement("CN=NoneNone,%s" % (basedn
),
958 self
.samdb
.modify(msg
)
959 self
.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
960 except ldb
.LdbError
as e
:
962 self
.assertEqual(code
, ldb
.ERR_CONSTRAINT_VIOLATION
, str(e
))
963 werr
= "%08X" % werror
.WERR_DS_NAME_REFERENCE_INVALID
964 self
.assertTrue(werr
in msg
, msg
)
968 msg
["assistant"] = ldb
.MessageElement("<SID=%s>" % none_sid_str
,
972 self
.samdb
.modify(msg
)
973 self
.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
974 except ldb
.LdbError
as e
:
976 self
.assertEqual(code
, ldb
.ERR_CONSTRAINT_VIOLATION
, str(e
))
977 werr
= "%08X" % werror
.WERR_DS_NAME_REFERENCE_INVALID
978 self
.assertTrue(werr
in msg
, msg
)
982 msg
["assistant"] = ldb
.MessageElement("<GUID=%s>" % none_guid_str
,
986 self
.samdb
.modify(msg
)
987 self
.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
988 except ldb
.LdbError
as e
:
990 self
.assertEqual(code
, ldb
.ERR_CONSTRAINT_VIOLATION
, str(e
))
991 werr
= "%08X" % werror
.WERR_DS_NAME_REFERENCE_INVALID
992 self
.assertTrue(werr
in msg
, msg
)
994 self
.samdb
.delete(kept_dn
)
996 def test_normalize_dn_in_domain_full(self
):
997 domain_dn
= self
.samdb
.domain_dn()
999 part_dn
= ldb
.Dn(self
.samdb
, "CN=Users")
1002 full_dn
.add_base(domain_dn
)
1004 full_str
= str(full_dn
)
1006 # That is, no change
1007 self
.assertEqual(full_dn
,
1008 self
.samdb
.normalize_dn_in_domain(full_str
))
1010 def test_normalize_dn_in_domain_part(self
):
1011 domain_dn
= self
.samdb
.domain_dn()
1013 part_str
= "CN=Users"
1015 full_dn
= ldb
.Dn(self
.samdb
, part_str
)
1016 full_dn
.add_base(domain_dn
)
1018 # That is, the domain DN appended
1019 self
.assertEqual(full_dn
,
1020 self
.samdb
.normalize_dn_in_domain(part_str
))
1022 def test_normalize_dn_in_domain_full_dn(self
):
1023 domain_dn
= self
.samdb
.domain_dn()
1025 part_dn
= ldb
.Dn(self
.samdb
, "CN=Users")
1028 full_dn
.add_base(domain_dn
)
1030 # That is, no change
1031 self
.assertEqual(full_dn
,
1032 self
.samdb
.normalize_dn_in_domain(full_dn
))
1034 def test_normalize_dn_in_domain_part_dn(self
):
1035 domain_dn
= self
.samdb
.domain_dn()
1037 part_dn
= ldb
.Dn(self
.samdb
, "CN=Users")
1039 # That is, the domain DN appended
1040 self
.assertEqual(ldb
.Dn(self
.samdb
,
1041 str(part_dn
) + "," + str(domain_dn
)),
1042 self
.samdb
.normalize_dn_in_domain(part_dn
))
1044 class DsdbNCRootTests(TestCase
):
1048 self
.lp
= samba
.tests
.env_loadparm()
1049 self
.creds
= Credentials()
1050 self
.creds
.guess(self
.lp
)
1051 self
.session
= system_session()
1052 self
.samdb
= SamDB(session_info
=self
.session
,
1053 credentials
=self
.creds
,
1057 # These all use the local mode of operation inside
1058 # dsdb_find_nc_root() using the partitions control
1059 def test_dsdb_dn_nc_root_sid(self
):
1060 dom_sid
= self
.samdb
.get_domain_sid()
1061 domain_dn
= ldb
.Dn(self
.samdb
, self
.samdb
.domain_dn())
1062 dn
= ldb
.Dn(self
.samdb
, f
"<SID={dom_sid}>")
1064 nc_root
= self
.samdb
.get_nc_root(dn
)
1065 except ldb
.LdbError
as e
:
1066 (code
, msg
) = e
.args
1067 self
.fail("Got unexpected exception %d - %s "
1069 self
.assertEqual(domain_dn
, nc_root
)
1071 def test_dsdb_dn_nc_root_admin_sid(self
):
1072 dom_sid
= self
.samdb
.get_domain_sid()
1073 domain_dn
= ldb
.Dn(self
.samdb
, self
.samdb
.domain_dn())
1074 dn
= ldb
.Dn(self
.samdb
, f
"<SID={dom_sid}-500>")
1076 nc_root
= self
.samdb
.get_nc_root(dn
)
1077 except ldb
.LdbError
as e
:
1078 (code
, msg
) = e
.args
1079 self
.fail("Got unexpected exception %d - %s "
1081 self
.assertEqual(domain_dn
, nc_root
)
1083 def test_dsdb_dn_nc_root_users_container(self
):
1084 dom_sid
= self
.samdb
.get_domain_sid()
1085 domain_dn
= ldb
.Dn(self
.samdb
, self
.samdb
.domain_dn())
1086 dn
= ldb
.Dn(self
.samdb
, f
"CN=Users,{domain_dn}")
1088 nc_root
= self
.samdb
.get_nc_root(dn
)
1089 except ldb
.LdbError
as e
:
1090 (code
, msg
) = e
.args
1091 self
.fail("Got unexpected exception %d - %s "
1093 self
.assertEqual(domain_dn
, nc_root
)
1095 def test_dsdb_dn_nc_root_new_dn(self
):
1096 dom_sid
= self
.samdb
.get_domain_sid()
1097 domain_dn
= ldb
.Dn(self
.samdb
, self
.samdb
.domain_dn())
1098 dn
= ldb
.Dn(self
.samdb
, f
"CN=Xnotexisting,CN=Users,{domain_dn}")
1100 nc_root
= self
.samdb
.get_nc_root(dn
)
1101 except ldb
.LdbError
as e
:
1102 (code
, msg
) = e
.args
1103 self
.fail("Got unexpected exception %d - %s "
1105 self
.assertEqual(domain_dn
, nc_root
)
1107 def test_dsdb_dn_nc_root_new_dn_with_guid(self
):
1108 domain_dn
= ldb
.Dn(self
.samdb
, self
.samdb
.domain_dn())
1109 dn
= ldb
.Dn(self
.samdb
, f
"<GUID=828e3baf-fa02-4d82-ba5d-6f647dab5fd8>;CN=Xnotexisting,CN=Users,{domain_dn}")
1111 nc_root
= self
.samdb
.get_nc_root(dn
)
1112 except ldb
.LdbError
as e
:
1113 (code
, msg
) = e
.args
1114 self
.fail("Got unexpected exception %d - %s "
1116 self
.assertEqual(domain_dn
, nc_root
)
1118 def test_dsdb_dn_nc_root_guid(self
):
1119 ntds_guid
= self
.samdb
.get_ntds_GUID()
1120 configuration_dn
= self
.samdb
.get_config_basedn()
1121 dn
= ldb
.Dn(self
.samdb
, f
"<GUID={ntds_guid}>")
1123 nc_root
= self
.samdb
.get_nc_root(dn
)
1124 except ldb
.LdbError
as e
:
1125 (code
, msg
) = e
.args
1126 self
.fail("Got unexpected exception %d - %s "
1128 self
.assertEqual(configuration_dn
, nc_root
)
1130 def test_dsdb_dn_nc_root_misleading_to_noexisting_guid(self
):
1131 ntds_guid
= self
.samdb
.get_ntds_GUID()
1132 configuration_dn
= self
.samdb
.get_config_basedn()
1133 domain_dn
= ldb
.Dn(self
.samdb
, self
.samdb
.domain_dn())
1134 dn
= ldb
.Dn(self
.samdb
, f
"<GUID={ntds_guid}>;CN=Xnotexisting,CN=Users,{domain_dn}")
1136 nc_root
= self
.samdb
.get_nc_root(dn
)
1137 except ldb
.LdbError
as e
:
1138 (code
, msg
) = e
.args
1139 self
.fail("Got unexpected exception %d - %s "
1141 self
.assertEqual(configuration_dn
, nc_root
)
1143 def test_dsdb_dn_nc_root_misleading_to_existing_guid(self
):
1144 ntds_guid
= self
.samdb
.get_ntds_GUID()
1145 configuration_dn
= self
.samdb
.get_config_basedn()
1146 domain_dn
= ldb
.Dn(self
.samdb
, self
.samdb
.domain_dn())
1147 dn
= ldb
.Dn(self
.samdb
, f
"<GUID={ntds_guid}>;{domain_dn}")
1149 nc_root
= self
.samdb
.get_nc_root(dn
)
1150 except ldb
.LdbError
as e
:
1151 (code
, msg
) = e
.args
1152 self
.fail("Got unexpected exception %d - %s "
1154 self
.assertEqual(configuration_dn
, nc_root
)
1156 class DsdbRemoteNCRootTests(DsdbNCRootTests
):
1159 # Reconnect to the remote LDAP port
1160 self
.samdb
= SamDB(url
="ldap://%s" % samba
.tests
.env_get_var_value('SERVER'),
1161 session_info
=self
.session
,
1162 credentials
=self
.get_credentials(),
1167 class DsdbFullScanTests(TestCase
):
1171 self
.lp
= samba
.tests
.env_loadparm()
1172 self
.creds
= Credentials()
1173 self
.creds
.guess(self
.lp
)
1174 self
.session
= system_session()
1176 def test_sam_ldb_open_no_full_scan(self
):
1178 self
.samdb
= SamDB(session_info
=self
.session
,
1179 credentials
=self
.creds
,
1181 options
=["disable_full_db_scan_for_self_test:1"])
1182 except ldb
.LdbError
as err
:
1184 self
.fail("sam.ldb required a full scan to start up")
1186 class DsdbStartUpTests(TestCase
):
1189 lp
= samba
.tests
.env_loadparm()
1190 path
= lp
.configfile
1192 # This is to avoid a tattoo of the global state
1193 self
.lp
= LoadParm(filename_for_non_global_lp
=path
)
1194 self
.creds
= Credentials()
1195 self
.creds
.guess(self
.lp
)
1196 self
.session
= system_session()
1197 self
.samdb
= SamDB(session_info
=self
.session
,
1198 credentials
=self
.creds
,
1201 def test_correct_fl(self
):
1202 res
= self
.samdb
.search(base
="",
1203 scope
=ldb
.SCOPE_BASE
,
1204 attrs
=["domainFunctionality"])
1205 # This confirms the domain is in FL 2016 by default, this is
1206 # important to verify the original state
1207 self
.assertEqual(int(res
[0]["domainFunctionality"][0]),
1208 dsdb
.DS_DOMAIN_FUNCTION_2016
)
1209 self
.assertEqual(functional_level
.dc_level_from_lp(self
.lp
),
1210 dsdb
.DS_DOMAIN_FUNCTION_2016
)
1211 dsdb
.check_and_update_fl(self
.samdb
, self
.lp
)
1213 def test_lower_smb_conf_fl(self
):
1214 old_lp_fl
= self
.lp
.get("ad dc functional level")
1215 self
.lp
.set("ad dc functional level",
1217 self
.addCleanup(self
.lp
.set, "ad dc functional level", old_lp_fl
)
1219 dsdb
.check_and_update_fl(self
.samdb
, self
.lp
)
1220 self
.fail("Should have failed to start DC with 2008 R2 FL in 2016 domain")
1221 except ldb
.LdbError
as err
:
1222 (errno
, estr
) = err
.args
1223 self
.assertEqual(errno
, ldb
.ERR_CONSTRAINT_VIOLATION
)