ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / dsdb.py
blob3d5e74846297b612b0b8ad52a6f09bb1bac5cdf4
1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.tests import delete_force
25 from samba.ndr import ndr_unpack, ndr_pack
26 from samba.dcerpc import drsblobs, security, misc
27 from samba.param import LoadParm
28 from samba import dsdb, functional_level
29 from samba import werror
30 import ldb
31 import samba
32 import uuid
class DsdbAccountTests(TestCase):
    """dsdb tests that need a freshly created user account.

    Covers RID allocation against the local RID Set and direct
    manipulation of an object's replPropertyMetaData.
    """

    # attid used by the replPropertyMetaData tests to find the entry for
    # the "description" attribute.
    _ATTID_DESCRIPTION = 13

    # Controls passed to modify() when writing replPropertyMetaData.
    # NOTE(review): presumably the "change replPropertyMetaData" control and
    # its "re-sort" variant - confirm against the dsdb control OID list.
    _REPL_METADATA_CTRL = "local_oid:1.3.6.1.4.1.7165.4.3.14:0"
    _REPL_METADATA_SORT_CTRL = "local_oid:1.3.6.1.4.1.7165.4.3.25:0"

    # Error string expected from next_free_rid() when no RID is available.
    _OUT_OF_RIDS = "RID pools out of RIDs"

    def setUp(self):
        """Open a SamDB, create a throwaway user and locate the RID Set."""
        super().setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()
        self.creds.guess(self.lp)
        self.session = system_session()
        self.samdb = SamDB(session_info=self.session,
                           credentials=self.creds,
                           lp=self.lp)

        # Create a test user; the random suffix keeps names unique per test.
        user_name = "dsdb-user-" + uuid.uuid4().hex[0:6]
        user_pass = samba.generate_random_password(32, 32)
        user_description = "Test user for dsdb test"

        base_dn = self.samdb.domain_dn()

        self.account_dn = "CN=" + user_name + ",CN=Users," + base_dn
        self.samdb.newuser(username=user_name,
                           password=user_pass,
                           description=user_description)
        # Cleanup (teardown)
        self.addCleanup(delete_force, self.samdb, self.account_dn)

        # Get server reference DN
        res = self.samdb.search(base=ldb.Dn(self.samdb,
                                            self.samdb.get_serverName()),
                                scope=ldb.SCOPE_BASE,
                                attrs=["serverReference"])
        self.server_ref_dn = ldb.Dn(
            self.samdb, res[0]["serverReference"][0].decode("utf-8"))

        # Get RID Set DN
        res = self.samdb.search(base=self.server_ref_dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=["rIDSetReferences"])
        rid_set_refs = res[0]
        self.assertIn("rIDSetReferences", rid_set_refs)
        rid_set_str = rid_set_refs["rIDSetReferences"][0].decode("utf-8")
        self.rid_set_dn = ldb.Dn(self.samdb, rid_set_str)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _pool(lo, hi):
        """Return a RID pool encoded as a string: hi in the upper 32 bits."""
        return str((hi << 32) | lo)

    def _modify_rid_set(self, elements):
        """Apply one modify to the RID Set.

        elements is an iterable of (attribute, value, flags) tuples; they
        are added to the message in the order given.
        """
        msg = ldb.Message()
        msg.dn = self.rid_set_dn
        for attr, value, flags in elements:
            msg[attr] = ldb.MessageElement(value, flags, attr)
        self.samdb.modify(msg)

    def _assert_out_of_rids(self):
        """Assert that next_free_rid() fails because no RID is available."""
        with self.assertRaises(ldb.LdbError) as err:
            self.samdb.next_free_rid()

        self.assertEqual(self._OUT_OF_RIDS, err.exception.args[1])

    def _fetch_repl_metadata(self, extra_attrs=()):
        """Return (entry, unpacked replPropertyMetaData) for the test user."""
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=self.account_dn,
                                attrs=["replPropertyMetaData"] + list(extra_attrs))
        repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                          res[0]["replPropertyMetaData"][0])
        return res[0], repl

    @staticmethod
    def _repl_metadata_msg(dn, repl):
        """Build a message replacing replPropertyMetaData on dn with repl."""
        msg = ldb.Message()
        msg.dn = dn
        msg["replPropertyMetaData"] = ldb.MessageElement(
            ndr_pack(repl), ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
        return msg

    def get_rid_set(self, rid_set_dn):
        """Return the RID Set entry at rid_set_dn with its pool attributes."""
        res = self.samdb.search(base=rid_set_dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=["rIDAllocationPool",
                                       "rIDPreviousAllocationPool",
                                       "rIDUsedPool",
                                       "rIDNextRID"])
        return res[0]

    # ------------------------------------------------------------------
    # RID allocation
    # ------------------------------------------------------------------

    def test_ridalloc_next_free_rid(self):
        """next_free_rid() reports rIDNextRID + 1 without changing state."""
        # Test RID allocation. We assume that RID
        # pools allocated to us are contiguous.
        self.samdb.transaction_start()
        try:
            orig_rid_set = self.get_rid_set(self.rid_set_dn)
            for attr in ("rIDAllocationPool",
                         "rIDPreviousAllocationPool",
                         "rIDUsedPool",
                         "rIDNextRID"):
                self.assertIn(attr, orig_rid_set)

            # Get rIDNextRID value from RID set.
            next_rid = int(orig_rid_set["rIDNextRID"][0])

            # Check the result of next_free_rid().
            next_free_rid = self.samdb.next_free_rid()
            self.assertEqual(next_rid + 1, next_free_rid)

            # Check calling it twice in succession gives the same result.
            next_free_rid2 = self.samdb.next_free_rid()
            self.assertEqual(next_free_rid, next_free_rid2)

            # Ensure that the RID set attributes have not changed.
            rid_set2 = self.get_rid_set(self.rid_set_dn)
            self.assertEqual(orig_rid_set, rid_set2)
        finally:
            self.samdb.transaction_cancel()

    def test_ridalloc_no_ridnextrid(self):
        """Without rIDNextRID, allocation starts at the next pool's low end."""
        self.samdb.transaction_start()
        try:
            # Delete the rIDNextRID attribute of the RID set,
            # and set up previous and next pools.
            prev_lo = 1000
            prev_hi = 1999
            next_lo = 3000
            next_hi = 3999
            self._modify_rid_set([
                ("rIDNextRID", [], ldb.FLAG_MOD_DELETE),
                ("rIDPreviousAllocationPool",
                 self._pool(prev_lo, prev_hi), ldb.FLAG_MOD_REPLACE),
                ("rIDAllocationPool",
                 self._pool(next_lo, next_hi), ldb.FLAG_MOD_REPLACE),
            ])

            # Ensure that next_free_rid() returns the start of the next pool.
            next_free_rid3 = self.samdb.next_free_rid()
            self.assertEqual(next_lo, next_free_rid3)

            # Check the result of allocate_rid() matches.
            rid = self.samdb.allocate_rid()
            self.assertEqual(next_free_rid3, rid)

            # Check that the result of next_free_rid() has now changed.
            next_free_rid4 = self.samdb.next_free_rid()
            self.assertEqual(rid + 1, next_free_rid4)

            # Check the range of available RIDs.
            free_lo, free_hi = self.samdb.free_rid_bounds()
            self.assertEqual(rid + 1, free_lo)
            self.assertEqual(next_hi, free_hi)
        finally:
            self.samdb.transaction_cancel()

    def test_ridalloc_no_free_rids(self):
        """An exhausted pool makes next_free_rid() fail but not allocate_rid()."""
        self.samdb.transaction_start()
        try:
            # Exhaust our current pool of RIDs.
            pool_lo = 2000
            pool_hi = 2999
            self._modify_rid_set([
                ("rIDPreviousAllocationPool",
                 self._pool(pool_lo, pool_hi), ldb.FLAG_MOD_REPLACE),
                ("rIDAllocationPool",
                 self._pool(pool_lo, pool_hi), ldb.FLAG_MOD_REPLACE),
                ("rIDNextRID", str(pool_hi), ldb.FLAG_MOD_REPLACE),
            ])

            # Ensure that calculating the next free RID fails.
            self._assert_out_of_rids()

            # Ensure we can still allocate a new RID.
            self.samdb.allocate_rid()
        finally:
            self.samdb.transaction_cancel()

    def test_ridalloc_new_ridset(self):
        """A zeroed RID Set has no free RID until a pool is assigned."""
        self.samdb.transaction_start()
        try:
            # Test what happens with RID Set values set to zero (similar to
            # when a RID Set is first created, except we also set
            # rIDAllocationPool to zero).
            self._modify_rid_set([
                ("rIDPreviousAllocationPool", "0", ldb.FLAG_MOD_REPLACE),
                ("rIDAllocationPool", "0", ldb.FLAG_MOD_REPLACE),
                ("rIDNextRID", "0", ldb.FLAG_MOD_REPLACE),
            ])

            # Ensure that calculating the next free RID fails.
            self._assert_out_of_rids()

            # Set values for the next pool.
            pool_lo = 2000
            pool_hi = 2999
            self._modify_rid_set([
                ("rIDAllocationPool",
                 self._pool(pool_lo, pool_hi), ldb.FLAG_MOD_REPLACE),
            ])

            # Ensure the next free RID value is equal to the next pool's lower
            # bound.
            next_free_rid5 = self.samdb.next_free_rid()
            self.assertEqual(pool_lo, next_free_rid5)

            # Check the range of available RIDs.
            free_lo, free_hi = self.samdb.free_rid_bounds()
            self.assertEqual(pool_lo, free_lo)
            self.assertEqual(pool_hi, free_hi)
        finally:
            self.samdb.transaction_cancel()

    def test_ridalloc_move_to_new_pool(self):
        """Using the last RID of the previous pool moves on to the new pool."""
        self.samdb.transaction_start()
        try:
            # Test moving to a new pool from the previous pool.
            pool_lo = 2000
            pool_hi = 2999
            new_pool_lo = 4500
            new_pool_hi = 4599
            self._modify_rid_set([
                ("rIDPreviousAllocationPool",
                 self._pool(pool_lo, pool_hi), ldb.FLAG_MOD_REPLACE),
                ("rIDAllocationPool",
                 self._pool(new_pool_lo, new_pool_hi), ldb.FLAG_MOD_REPLACE),
                # Leave exactly one RID (pool_hi) free in the previous pool.
                ("rIDNextRID", str(pool_hi - 1), ldb.FLAG_MOD_REPLACE),
            ])

            # We should have remained in the previous pool.
            next_free_rid6 = self.samdb.next_free_rid()
            self.assertEqual(pool_hi, next_free_rid6)

            # Check the range of available RIDs.
            free_lo, free_hi = self.samdb.free_rid_bounds()
            self.assertEqual(pool_hi, free_lo)
            self.assertEqual(pool_hi, free_hi)

            # Allocate a new RID.
            rid2 = self.samdb.allocate_rid()
            self.assertEqual(next_free_rid6, rid2)

            # We should now move to the next pool.
            next_free_rid7 = self.samdb.next_free_rid()
            self.assertEqual(new_pool_lo, next_free_rid7)

            # Check the new range of available RIDs.
            free_lo2, free_hi2 = self.samdb.free_rid_bounds()
            self.assertEqual(new_pool_lo, free_lo2)
            self.assertEqual(new_pool_hi, free_hi2)

            # Ensure that allocate_rid() matches.
            rid3 = self.samdb.allocate_rid()
            self.assertEqual(next_free_rid7, rid3)
        finally:
            self.samdb.transaction_cancel()

    def test_ridalloc_no_ridsetreferences(self):
        """Both RID calls fail when rIDSetReferences has been removed."""
        self.samdb.transaction_start()
        try:
            # Delete the rIDSetReferences attribute.
            msg = ldb.Message()
            msg.dn = self.server_ref_dn
            msg["rIDSetReferences"] = (
                ldb.MessageElement([],
                                   ldb.FLAG_MOD_DELETE,
                                   "rIDSetReferences"))
            self.samdb.modify(msg)

            # Ensure calculating the next free RID fails.
            with self.assertRaises(ldb.LdbError) as err:
                self.samdb.next_free_rid()

            enum, estr = err.exception.args
            self.assertEqual(ldb.ERR_NO_SUCH_ATTRIBUTE, enum)
            self.assertIn("No RID Set DN - "
                          "Cannot find attribute rIDSetReferences of %s "
                          "to calculate reference dn" % self.server_ref_dn,
                          estr)

            # Ensure allocating a new RID fails.
            with self.assertRaises(ldb.LdbError) as err:
                self.samdb.allocate_rid()

            enum, estr = err.exception.args
            self.assertEqual(ldb.ERR_ENTRY_ALREADY_EXISTS, enum)
            self.assertIn("No RID Set DN - "
                          "Failed to add RID Set %s - "
                          "Entry %s already exists" %
                          (self.rid_set_dn, self.rid_set_dn),
                          estr)
        finally:
            self.samdb.transaction_cancel()

    def test_ridalloc_no_rid_set(self):
        """Both RID calls fail when rIDSetReferences points at a non RID Set."""
        self.samdb.transaction_start()
        try:
            # Set the rIDSetReferences attribute to not point to a RID Set.
            fake_rid_set_str = self.account_dn
            msg = ldb.Message()
            msg.dn = self.server_ref_dn
            msg["rIDSetReferences"] = (
                ldb.MessageElement(fake_rid_set_str,
                                   ldb.FLAG_MOD_REPLACE,
                                   "rIDSetReferences"))
            self.samdb.modify(msg)

            # Calculating the next free RID and allocating a new RID must
            # both fail in the same way.
            for rid_call in (self.samdb.next_free_rid,
                             self.samdb.allocate_rid):
                with self.assertRaises(ldb.LdbError) as err:
                    rid_call()

                enum, estr = err.exception.args
                self.assertEqual(ldb.ERR_OPERATIONS_ERROR, enum)
                self.assertIn("Bad RID Set " + fake_rid_set_str, estr)
        finally:
            self.samdb.transaction_cancel()

    # ------------------------------------------------------------------
    # replPropertyMetaData
    # ------------------------------------------------------------------

    def test_error_replpropertymetadata(self):
        """Bumping only the description version is rejected."""
        entry, repl = self._fetch_repl_metadata()
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self._ATTID_DESCRIPTION:
                o.version = o.version + 1
        msg = self._repl_metadata_msg(entry.dn, repl)
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self._REPL_METADATA_CTRL])

    def test_error_replpropertymetadata_nochange(self):
        """Writing back unchanged metadata without the sort control fails."""
        entry, repl = self._fetch_repl_metadata()
        msg = self._repl_metadata_msg(entry.dn, repl)
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self._REPL_METADATA_CTRL])

    def test_error_replpropertymetadata_allow_sort(self):
        """Writing back unchanged metadata succeeds with the sort control."""
        entry, repl = self._fetch_repl_metadata()
        msg = self._repl_metadata_msg(entry.dn, repl)
        self.samdb.modify(msg, [self._REPL_METADATA_CTRL,
                                self._REPL_METADATA_SORT_CTRL])

    def test_twoatt_replpropertymetadata(self):
        """Changing metadata together with another attribute is rejected."""
        entry, repl = self._fetch_repl_metadata(["uSNChanged"])
        next_usn = int(str(entry["uSNChanged"])) + 1
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self._ATTID_DESCRIPTION:
                o.version = o.version + 1
                o.local_usn = next_usn
        msg = self._repl_metadata_msg(entry.dn, repl)
        msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description")
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          [self._REPL_METADATA_CTRL])

    def test_set_replpropertymetadata(self):
        """Metadata with version and both USNs bumped can be written."""
        entry, repl = self._fetch_repl_metadata(["uSNChanged"])
        next_usn = int(str(entry["uSNChanged"])) + 1
        for o in repl.ctr.array:
            # Search for Description
            if o.attid == self._ATTID_DESCRIPTION:
                o.version = o.version + 1
                o.local_usn = next_usn
                o.originating_usn = next_usn
        msg = self._repl_metadata_msg(entry.dn, repl)
        self.samdb.modify(msg, [self._REPL_METADATA_CTRL])

    def test_get_attribute_replmetadata_version(self):
        """unicodePwd of the new user is expected to be at version 2."""
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=self.account_dn,
                                attrs=["dn"])
        self.assertEqual(len(res), 1)
        dn = str(res[0].dn)
        self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2)

    def test_set_attribute_replmetadata_version(self):
        """A version set via SamDB is read back unchanged."""
        res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                                base=self.account_dn,
                                attrs=["dn"])
        self.assertEqual(len(res), 1)
        dn = str(res[0].dn)
        version = self.samdb.get_attribute_replmetadata_version(dn, "description")
        self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
        self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)

    # ------------------------------------------------------------------
    # Unknown controls
    # ------------------------------------------------------------------

    def test_no_error_on_invalid_control(self):
        """An unknown non-critical control (":0") must not fail the search."""
        try:
            self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                              base=self.account_dn,
                              attrs=["replPropertyMetaData"],
                              controls=["local_oid:%s:0"
                                        % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
        except ldb.LdbError:
            self.fail("Should have not raised an exception")

    def test_error_on_invalid_critical_control(self):
        """An unknown critical control (":1") may only fail one way.

        NOTE(review): as written the test also passes when no exception is
        raised at all - confirm whether a missing error should be a failure.
        """
        try:
            self.samdb.search(scope=ldb.SCOPE_SUBTREE,
                              base=self.account_dn,
                              attrs=["replPropertyMetaData"],
                              controls=["local_oid:%s:1"
                                        % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
        except ldb.LdbError as e:
            (errno, estr) = e.args
            if errno != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
                # Use the unpacked error string: exceptions cannot be
                # indexed in Python 3.
                self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
                          % estr)
486 class DsdbTests(TestCase):
487 def setUp(self):
488 super().setUp()
489 self.lp = samba.tests.env_loadparm()
490 self.creds = Credentials()
491 self.creds.guess(self.lp)
492 self.session = system_session()
493 self.samdb = SamDB(session_info=self.session,
494 credentials=self.creds,
495 lp=self.lp)
497 # Allocate a unique RID for use in the objectSID tests.
499 def allocate_rid(self):
500 self.samdb.transaction_start()
501 try:
502 rid = self.samdb.allocate_rid()
503 except:
504 self.samdb.transaction_cancel()
505 raise
506 self.samdb.transaction_commit()
507 return str(rid)
509 def test_get_oid_from_attrid(self):
510 oid = self.samdb.get_oid_from_attid(591614)
511 self.assertEqual(oid, "1.2.840.113556.1.4.1790")
513 def test_ok_get_attribute_from_attid(self):
514 self.assertEqual(self.samdb.get_attribute_from_attid(13), "description")
516 def test_ko_get_attribute_from_attid(self):
517 self.assertEqual(self.samdb.get_attribute_from_attid(11979), None)
519 # Ensure that duplicate objectSID's are permitted for foreign security
520 # principals.
522 def test_duplicate_objectSIDs_allowed_on_foreign_security_principals(self):
525 # We need to build a foreign security principal SID
526 # i.e a SID not in the current domain.
528 dom_sid = self.samdb.get_domain_sid()
529 if str(dom_sid).endswith("0"):
530 c = "9"
531 else:
532 c = "0"
533 sid_str = str(dom_sid)[:-1] + c + "-1000"
534 sid = ndr_pack(security.dom_sid(sid_str))
535 basedn = self.samdb.get_default_basedn()
536 dn = "CN=%s,CN=ForeignSecurityPrincipals,%s" % (sid_str, basedn)
539 # First without control
542 try:
543 self.samdb.add({
544 "dn": dn,
545 "objectClass": "foreignSecurityPrincipal"})
546 self.fail("No exception should get ERR_OBJECT_CLASS_VIOLATION")
547 except ldb.LdbError as e:
548 (code, msg) = e.args
549 self.assertEqual(code, ldb.ERR_OBJECT_CLASS_VIOLATION, str(e))
550 werr = "%08X" % werror.WERR_DS_MISSING_REQUIRED_ATT
551 self.assertTrue(werr in msg, msg)
553 try:
554 self.samdb.add({
555 "dn": dn,
556 "objectClass": "foreignSecurityPrincipal",
557 "objectSid": sid})
558 self.fail("No exception should get ERR_UNWILLING_TO_PERFORM")
559 except ldb.LdbError as e:
560 (code, msg) = e.args
561 self.assertEqual(code, ldb.ERR_UNWILLING_TO_PERFORM, str(e))
562 werr = "%08X" % werror.WERR_DS_ILLEGAL_MOD_OPERATION
563 self.assertTrue(werr in msg, msg)
566 # We need to use the provision control
567 # in order to add foreignSecurityPrincipal
568 # objects
571 controls = ["provision:0"]
572 self.samdb.add({
573 "dn": dn,
574 "objectClass": "foreignSecurityPrincipal"},
575 controls=controls)
577 self.samdb.delete(dn)
579 try:
580 self.samdb.add({
581 "dn": dn,
582 "objectClass": "foreignSecurityPrincipal"},
583 controls=controls)
584 except ldb.LdbError as e:
585 (code, msg) = e.args
586 self.fail("Got unexpected exception %d - %s "
587 % (code, msg))
589 # cleanup
590 self.samdb.delete(dn)
592 def _test_foreignSecurityPrincipal(self, obj_class, fpo_attr):
594 dom_sid = self.samdb.get_domain_sid()
595 lsid_str = str(dom_sid) + "-4294967294"
596 bsid_str = "S-1-5-32-4294967294"
597 fsid_str = "S-1-5-4294967294"
598 basedn = self.samdb.get_default_basedn()
599 cn = "dsdb_test_fpo"
600 dn_str = "cn=%s,cn=Users,%s" % (cn, basedn)
601 dn = ldb.Dn(self.samdb, dn_str)
603 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
604 base=basedn,
605 expression="(objectSid=%s)" % lsid_str,
606 attrs=[])
607 self.assertEqual(len(res), 0)
608 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
609 base=basedn,
610 expression="(objectSid=%s)" % bsid_str,
611 attrs=[])
612 self.assertEqual(len(res), 0)
613 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
614 base=basedn,
615 expression="(objectSid=%s)" % fsid_str,
616 attrs=[])
617 self.assertEqual(len(res), 0)
619 self.addCleanup(delete_force, self.samdb, dn_str)
621 self.samdb.add({
622 "dn": dn_str,
623 "objectClass": obj_class})
625 msg = ldb.Message()
626 msg.dn = dn
627 msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % lsid_str,
628 ldb.FLAG_MOD_ADD,
629 fpo_attr)
630 try:
631 self.samdb.modify(msg)
632 self.fail("No exception should get LDB_ERR_UNWILLING_TO_PERFORM")
633 except ldb.LdbError as e:
634 (code, msg) = e.args
635 self.assertEqual(code, ldb.ERR_UNWILLING_TO_PERFORM, str(e))
636 werr = "%08X" % werror.WERR_DS_INVALID_GROUP_TYPE
637 self.assertTrue(werr in msg, msg)
639 msg = ldb.Message()
640 msg.dn = dn
641 msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % bsid_str,
642 ldb.FLAG_MOD_ADD,
643 fpo_attr)
644 try:
645 self.samdb.modify(msg)
646 self.fail("No exception should get LDB_ERR_NO_SUCH_OBJECT")
647 except ldb.LdbError as e:
648 (code, msg) = e.args
649 self.assertEqual(code, ldb.ERR_NO_SUCH_OBJECT, str(e))
650 werr = "%08X" % werror.WERR_NO_SUCH_MEMBER
651 self.assertTrue(werr in msg, msg)
653 msg = ldb.Message()
654 msg.dn = dn
655 msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % fsid_str,
656 ldb.FLAG_MOD_ADD,
657 fpo_attr)
658 try:
659 self.samdb.modify(msg)
660 except ldb.LdbError as e:
661 self.fail("Should have not raised an exception")
663 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
664 base=basedn,
665 expression="(objectSid=%s)" % fsid_str,
666 attrs=[])
667 self.assertEqual(len(res), 1)
668 self.samdb.delete(res[0].dn)
669 self.samdb.delete(dn)
670 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
671 base=basedn,
672 expression="(objectSid=%s)" % fsid_str,
673 attrs=[])
674 self.assertEqual(len(res), 0)
676 def test_foreignSecurityPrincipal_member(self):
677 return self._test_foreignSecurityPrincipal(
678 "group", "member")
680 def test_foreignSecurityPrincipal_MembersForAzRole(self):
681 return self._test_foreignSecurityPrincipal(
682 "msDS-AzRole", "msDS-MembersForAzRole")
684 def test_foreignSecurityPrincipal_NeverRevealGroup(self):
685 return self._test_foreignSecurityPrincipal(
686 "computer", "msDS-NeverRevealGroup")
688 def test_foreignSecurityPrincipal_RevealOnDemandGroup(self):
689 return self._test_foreignSecurityPrincipal(
690 "computer", "msDS-RevealOnDemandGroup")
692 def _test_fail_foreignSecurityPrincipal(self, obj_class, fpo_attr,
693 msg_exp, lerr_exp, werr_exp,
694 allow_reference=True):
696 dom_sid = self.samdb.get_domain_sid()
697 lsid_str = str(dom_sid) + "-4294967294"
698 bsid_str = "S-1-5-32-4294967294"
699 fsid_str = "S-1-5-4294967294"
700 basedn = self.samdb.get_default_basedn()
701 cn1 = "dsdb_test_fpo1"
702 dn1_str = "cn=%s,cn=Users,%s" % (cn1, basedn)
703 dn1 = ldb.Dn(self.samdb, dn1_str)
704 cn2 = "dsdb_test_fpo2"
705 dn2_str = "cn=%s,cn=Users,%s" % (cn2, basedn)
706 dn2 = ldb.Dn(self.samdb, dn2_str)
708 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
709 base=basedn,
710 expression="(objectSid=%s)" % lsid_str,
711 attrs=[])
712 self.assertEqual(len(res), 0)
713 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
714 base=basedn,
715 expression="(objectSid=%s)" % bsid_str,
716 attrs=[])
717 self.assertEqual(len(res), 0)
718 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
719 base=basedn,
720 expression="(objectSid=%s)" % fsid_str,
721 attrs=[])
722 self.assertEqual(len(res), 0)
724 self.addCleanup(delete_force, self.samdb, dn1_str)
725 self.addCleanup(delete_force, self.samdb, dn2_str)
727 self.samdb.add({
728 "dn": dn1_str,
729 "objectClass": obj_class})
731 self.samdb.add({
732 "dn": dn2_str,
733 "objectClass": obj_class})
735 msg = ldb.Message()
736 msg.dn = dn1
737 msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % lsid_str,
738 ldb.FLAG_MOD_ADD,
739 fpo_attr)
740 try:
741 self.samdb.modify(msg)
742 self.fail("No exception should get %s" % msg_exp)
743 except ldb.LdbError as e:
744 (code, msg) = e.args
745 self.assertEqual(code, lerr_exp, str(e))
746 werr = "%08X" % werr_exp
747 self.assertTrue(werr in msg, msg)
749 msg = ldb.Message()
750 msg.dn = dn1
751 msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % bsid_str,
752 ldb.FLAG_MOD_ADD,
753 fpo_attr)
754 try:
755 self.samdb.modify(msg)
756 self.fail("No exception should get %s" % msg_exp)
757 except ldb.LdbError as e:
758 (code, msg) = e.args
759 self.assertEqual(code, lerr_exp, str(e))
760 werr = "%08X" % werr_exp
761 self.assertTrue(werr in msg, msg)
763 msg = ldb.Message()
764 msg.dn = dn1
765 msg[fpo_attr] = ldb.MessageElement("<SID=%s>" % fsid_str,
766 ldb.FLAG_MOD_ADD,
767 fpo_attr)
768 try:
769 self.samdb.modify(msg)
770 self.fail("No exception should get %s" % msg)
771 except ldb.LdbError as e:
772 (code, msg) = e.args
773 self.assertEqual(code, lerr_exp, str(e))
774 werr = "%08X" % werr_exp
775 self.assertTrue(werr in msg, msg)
777 msg = ldb.Message()
778 msg.dn = dn1
779 msg[fpo_attr] = ldb.MessageElement("%s" % dn2,
780 ldb.FLAG_MOD_ADD,
781 fpo_attr)
782 try:
783 self.samdb.modify(msg)
784 if not allow_reference:
785 self.fail("No exception should get %s" % msg_exp)
786 except ldb.LdbError as e:
787 if allow_reference:
788 self.fail("Should have not raised an exception: %s" % e)
789 (code, msg) = e.args
790 self.assertEqual(code, lerr_exp, str(e))
791 werr = "%08X" % werr_exp
792 self.assertTrue(werr in msg, msg)
794 self.samdb.delete(dn2)
795 self.samdb.delete(dn1)
797 def test_foreignSecurityPrincipal_NonMembers(self):
798 return self._test_fail_foreignSecurityPrincipal(
799 "group", "msDS-NonMembers",
800 "LDB_ERR_UNWILLING_TO_PERFORM/WERR_NOT_SUPPORTED",
801 ldb.ERR_UNWILLING_TO_PERFORM, werror.WERR_NOT_SUPPORTED,
802 allow_reference=False)
804 def test_foreignSecurityPrincipal_HostServiceAccount(self):
805 return self._test_fail_foreignSecurityPrincipal(
806 "computer", "msDS-HostServiceAccount",
807 "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID",
808 ldb.ERR_CONSTRAINT_VIOLATION,
809 werror.WERR_DS_NAME_REFERENCE_INVALID)
811 def test_foreignSecurityPrincipal_manager(self):
812 return self._test_fail_foreignSecurityPrincipal(
813 "user", "manager",
814 "LDB_ERR_CONSTRAINT_VIOLATION/WERR_DS_NAME_REFERENCE_INVALID",
815 ldb.ERR_CONSTRAINT_VIOLATION,
816 werror.WERR_DS_NAME_REFERENCE_INVALID)
819 # Duplicate objectSID's should not be permitted for sids in the local
820 # domain. The test sequence is add an object, delete it, then attempt to
821 # re-add it, this should fail with a constraint violation
823 def test_duplicate_objectSIDs_not_allowed_on_local_objects(self):
825 dom_sid = self.samdb.get_domain_sid()
826 rid = self.allocate_rid()
827 sid_str = str(dom_sid) + "-" + rid
828 sid = ndr_pack(security.dom_sid(sid_str))
829 basedn = self.samdb.get_default_basedn()
830 cn = "dsdb_test_01"
831 dn = "cn=%s,cn=Users,%s" % (cn, basedn)
833 self.samdb.add({
834 "dn": dn,
835 "objectClass": "user",
836 "objectSID": sid})
837 self.samdb.delete(dn)
839 try:
840 self.samdb.add({
841 "dn": dn,
842 "objectClass": "user",
843 "objectSID": sid})
844 self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
845 except ldb.LdbError as e:
846 (code, msg) = e.args
847 if code != ldb.ERR_CONSTRAINT_VIOLATION:
848 self.fail("Got %d - %s should have got "
849 "LDB_ERR_CONSTRAINT_VIOLATION"
850 % (code, msg))
852 def test_linked_vs_non_linked_reference(self):
853 basedn = self.samdb.get_default_basedn()
854 kept_dn_str = "cn=reference_kept,cn=Users,%s" % (basedn)
855 removed_dn_str = "cn=reference_removed,cn=Users,%s" % (basedn)
856 dom_sid = self.samdb.get_domain_sid()
857 none_sid_str = str(dom_sid) + "-4294967294"
858 none_guid_str = "afafafaf-fafa-afaf-fafa-afafafafafaf"
860 self.addCleanup(delete_force, self.samdb, kept_dn_str)
861 self.addCleanup(delete_force, self.samdb, removed_dn_str)
863 self.samdb.add({
864 "dn": kept_dn_str,
865 "objectClass": "user"})
866 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
867 base=kept_dn_str,
868 attrs=["objectGUID", "objectSID"])
869 self.assertEqual(len(res), 1)
870 kept_guid = ndr_unpack(misc.GUID, res[0]["objectGUID"][0])
871 kept_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
872 kept_dn = res[0].dn
874 self.samdb.add({
875 "dn": removed_dn_str,
876 "objectClass": "user"})
877 res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
878 base=removed_dn_str,
879 attrs=["objectGUID", "objectSID"])
880 self.assertEqual(len(res), 1)
881 removed_guid = ndr_unpack(misc.GUID, res[0]["objectGUID"][0])
882 removed_sid = ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
883 self.samdb.delete(removed_dn_str)
886 # First try the linked attribute 'manager'
887 # by GUID and SID
890 msg = ldb.Message()
891 msg.dn = kept_dn
892 msg["manager"] = ldb.MessageElement("<SID=%s>" % removed_sid,
893 ldb.FLAG_MOD_ADD,
894 "manager")
895 try:
896 self.samdb.modify(msg)
897 self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
898 except ldb.LdbError as e:
899 (code, msg) = e.args
900 self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
901 werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
902 self.assertTrue(werr in msg, msg)
904 msg = ldb.Message()
905 msg.dn = kept_dn
906 msg["manager"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
907 ldb.FLAG_MOD_ADD,
908 "manager")
909 try:
910 self.samdb.modify(msg)
911 self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
912 except ldb.LdbError as e:
913 (code, msg) = e.args
914 self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
915 werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
916 self.assertTrue(werr in msg, msg)
919 # Try the non-linked attribute 'assistant'
920 # by GUID and SID, which should work.
922 msg = ldb.Message()
923 msg.dn = kept_dn
924 msg["assistant"] = ldb.MessageElement("<SID=%s>" % removed_sid,
925 ldb.FLAG_MOD_ADD,
926 "assistant")
927 self.samdb.modify(msg)
928 msg = ldb.Message()
929 msg.dn = kept_dn
930 msg["assistant"] = ldb.MessageElement("<SID=%s>" % removed_sid,
931 ldb.FLAG_MOD_DELETE,
932 "assistant")
933 self.samdb.modify(msg)
935 msg = ldb.Message()
936 msg.dn = kept_dn
937 msg["assistant"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
938 ldb.FLAG_MOD_ADD,
939 "assistant")
940 self.samdb.modify(msg)
941 msg = ldb.Message()
942 msg.dn = kept_dn
943 msg["assistant"] = ldb.MessageElement("<GUID=%s>" % removed_guid,
944 ldb.FLAG_MOD_DELETE,
945 "assistant")
946 self.samdb.modify(msg)
949 # Finally try the non-linked attribute 'assistant'
950 # but with non existing GUID, SID, DN
952 msg = ldb.Message()
953 msg.dn = kept_dn
954 msg["assistant"] = ldb.MessageElement("CN=NoneNone,%s" % (basedn),
955 ldb.FLAG_MOD_ADD,
956 "assistant")
957 try:
958 self.samdb.modify(msg)
959 self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
960 except ldb.LdbError as e:
961 (code, msg) = e.args
962 self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
963 werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
964 self.assertTrue(werr in msg, msg)
966 msg = ldb.Message()
967 msg.dn = kept_dn
968 msg["assistant"] = ldb.MessageElement("<SID=%s>" % none_sid_str,
969 ldb.FLAG_MOD_ADD,
970 "assistant")
971 try:
972 self.samdb.modify(msg)
973 self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
974 except ldb.LdbError as e:
975 (code, msg) = e.args
976 self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
977 werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
978 self.assertTrue(werr in msg, msg)
980 msg = ldb.Message()
981 msg.dn = kept_dn
982 msg["assistant"] = ldb.MessageElement("<GUID=%s>" % none_guid_str,
983 ldb.FLAG_MOD_ADD,
984 "assistant")
985 try:
986 self.samdb.modify(msg)
987 self.fail("No exception should get LDB_ERR_CONSTRAINT_VIOLATION")
988 except ldb.LdbError as e:
989 (code, msg) = e.args
990 self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION, str(e))
991 werr = "%08X" % werror.WERR_DS_NAME_REFERENCE_INVALID
992 self.assertTrue(werr in msg, msg)
994 self.samdb.delete(kept_dn)
def test_normalize_dn_in_domain_full(self):
    # A complete DN, handed over as a string, is expected to come
    # back equal to the very same DN.
    base = self.samdb.domain_dn()

    expected = ldb.Dn(self.samdb, "CN=Users")
    expected.add_base(base)

    # That is, no change
    normalized = self.samdb.normalize_dn_in_domain(str(expected))
    self.assertEqual(expected, normalized)
def test_normalize_dn_in_domain_part(self):
    # A partial DN, handed over as a string, is expected to come back
    # with the domain DN added as its base.
    base = self.samdb.domain_dn()
    relative = "CN=Users"

    expected = ldb.Dn(self.samdb, relative)
    expected.add_base(base)

    # That is, the domain DN appended
    normalized = self.samdb.normalize_dn_in_domain(relative)
    self.assertEqual(expected, normalized)
def test_normalize_dn_in_domain_full_dn(self):
    # A complete DN, handed over as an ldb.Dn object, is expected to
    # come back equal to the very same DN.
    base = self.samdb.domain_dn()

    expected = ldb.Dn(self.samdb, "CN=Users")
    expected.add_base(base)

    # That is, no change
    normalized = self.samdb.normalize_dn_in_domain(expected)
    self.assertEqual(expected, normalized)
def test_normalize_dn_in_domain_part_dn(self):
    # A partial DN, handed over as an ldb.Dn object, is expected to
    # come back with the domain DN added as its base.
    base = self.samdb.domain_dn()
    relative = ldb.Dn(self.samdb, "CN=Users")

    # That is, the domain DN appended
    expected = ldb.Dn(self.samdb, f"{relative},{base}")
    normalized = self.samdb.normalize_dn_in_domain(relative)
    self.assertEqual(expected, normalized)
class DsdbNCRootTests(TestCase):
    """Check SamDB.get_nc_root() for DNs given by SID, GUID and name.

    setUp() opens the SamDB without a URL; DsdbRemoteNCRootTests
    re-runs the same tests over an LDAP connection.
    """

    def setUp(self):
        super().setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()
        self.creds.guess(self.lp)
        self.session = system_session()
        self.samdb = SamDB(session_info=self.session,
                           credentials=self.creds,
                           lp=self.lp)
        self.remote = False

    def _get_nc_root_or_fail(self, dn):
        """Return self.samdb.get_nc_root(dn).

        An ldb.LdbError is turned into a test failure that reports
        the error code and message.
        """
        try:
            return self.samdb.get_nc_root(dn)
        except ldb.LdbError as e:
            (code, msg) = e.args
            self.fail("Got unexpected exception %d - %s "
                      % (code, msg))

    # These all use the local mode of operation inside
    # dsdb_find_nc_root() using the partitions control
    def test_dsdb_dn_nc_root_sid(self):
        # <SID=(domain SID)> is expected to be in the domain NC
        dom_sid = self.samdb.get_domain_sid()
        domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn())
        dn = ldb.Dn(self.samdb, f"<SID={dom_sid}>")
        nc_root = self._get_nc_root_or_fail(dn)
        self.assertEqual(domain_dn, nc_root)

    def test_dsdb_dn_nc_root_admin_sid(self):
        # <SID=(domain SID)-500> is expected to be in the domain NC
        dom_sid = self.samdb.get_domain_sid()
        domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn())
        dn = ldb.Dn(self.samdb, f"<SID={dom_sid}-500>")
        nc_root = self._get_nc_root_or_fail(dn)
        self.assertEqual(domain_dn, nc_root)

    def test_dsdb_dn_nc_root_users_container(self):
        # CN=Users under the domain DN is expected to be in the
        # domain NC
        domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn())
        dn = ldb.Dn(self.samdb, f"CN=Users,{domain_dn}")
        nc_root = self._get_nc_root_or_fail(dn)
        self.assertEqual(domain_dn, nc_root)

    def test_dsdb_dn_nc_root_new_dn(self):
        # A name below CN=Users (presumably not an existing object,
        # going by "Xnotexisting") is still expected to map to the
        # domain NC
        domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn())
        dn = ldb.Dn(self.samdb, f"CN=Xnotexisting,CN=Users,{domain_dn}")
        nc_root = self._get_nc_root_or_fail(dn)
        self.assertEqual(domain_dn, nc_root)

    def test_dsdb_dn_nc_root_new_dn_with_guid(self):
        # A fixed GUID (presumably matching no object) combined with
        # a name below CN=Users is expected to map to the domain NC
        domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn())
        dn = ldb.Dn(self.samdb, f"<GUID=828e3baf-fa02-4d82-ba5d-6f647dab5fd8>;CN=Xnotexisting,CN=Users,{domain_dn}")
        nc_root = self._get_nc_root_or_fail(dn)
        self.assertEqual(domain_dn, nc_root)

    def test_dsdb_dn_nc_root_guid(self):
        # <GUID=(NTDS GUID)> is expected to be in the configuration NC
        ntds_guid = self.samdb.get_ntds_GUID()
        configuration_dn = self.samdb.get_config_basedn()
        dn = ldb.Dn(self.samdb, f"<GUID={ntds_guid}>")
        nc_root = self._get_nc_root_or_fail(dn)
        self.assertEqual(configuration_dn, nc_root)

    def test_dsdb_dn_nc_root_misleading_to_noexisting_guid(self):
        # The NTDS GUID is paired with a name below the domain DN;
        # the configuration NC is still the expected answer
        ntds_guid = self.samdb.get_ntds_GUID()
        configuration_dn = self.samdb.get_config_basedn()
        domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn())
        dn = ldb.Dn(self.samdb, f"<GUID={ntds_guid}>;CN=Xnotexisting,CN=Users,{domain_dn}")
        nc_root = self._get_nc_root_or_fail(dn)
        self.assertEqual(configuration_dn, nc_root)

    def test_dsdb_dn_nc_root_misleading_to_existing_guid(self):
        # The NTDS GUID is paired with the domain DN itself; the
        # configuration NC is still the expected answer
        ntds_guid = self.samdb.get_ntds_GUID()
        configuration_dn = self.samdb.get_config_basedn()
        domain_dn = ldb.Dn(self.samdb, self.samdb.domain_dn())
        dn = ldb.Dn(self.samdb, f"<GUID={ntds_guid}>;{domain_dn}")
        nc_root = self._get_nc_root_or_fail(dn)
        self.assertEqual(configuration_dn, nc_root)
class DsdbRemoteNCRootTests(DsdbNCRootTests):
    """Run the inherited NC root tests over an ldap:// connection."""

    def setUp(self):
        super().setUp()
        # Reconnect to the remote LDAP port, replacing the SamDB
        # opened by the parent setUp()
        server = samba.tests.env_get_var_value('SERVER')
        remote_url = "ldap://%s" % server
        self.samdb = SamDB(url=remote_url,
                           session_info=self.session,
                           credentials=self.get_credentials(),
                           lp=self.lp)
        self.remote = True
class DsdbFullScanTests(TestCase):
    """Check that sam.ldb can be opened with full DB scans disabled."""

    def setUp(self):
        super().setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()
        self.creds.guess(self.lp)
        self.session = system_session()

    def test_sam_ldb_open_no_full_scan(self):
        # Open the database with the self-test option that disables
        # full scans; any LdbError raised while opening is reported
        # as start-up having needed a full scan.
        try:
            self.samdb = SamDB(session_info=self.session,
                               credentials=self.creds,
                               lp=self.lp,
                               options=["disable_full_db_scan_for_self_test:1"])
        except ldb.LdbError as err:
            estr = err.args[1]
            # Include the underlying ldb error text so the cause is
            # visible in the failure report
            self.fail("sam.ldb required a full scan to start up: %s" % estr)
class DsdbStartUpTests(TestCase):
    """Exercise dsdb.check_and_update_fl() using a non-global LoadParm."""

    def setUp(self):
        super().setUp()
        config_path = samba.tests.env_loadparm().configfile

        # Load the same config file into a non-global LoadParm; this
        # is to avoid a tattoo of the global state
        self.lp = LoadParm(filename_for_non_global_lp=config_path)
        self.creds = Credentials()
        self.creds.guess(self.lp)
        self.session = system_session()
        self.samdb = SamDB(session_info=self.session,
                           credentials=self.creds,
                           lp=self.lp)

    def test_correct_fl(self):
        rootdse = self.samdb.search(base="",
                                    scope=ldb.SCOPE_BASE,
                                    attrs=["domainFunctionality"])

        # This confirms the domain is in FL 2016 by default, this is
        # important to verify the original state
        domain_fl = int(rootdse[0]["domainFunctionality"][0])
        self.assertEqual(domain_fl, dsdb.DS_DOMAIN_FUNCTION_2016)

        lp_fl = functional_level.dc_level_from_lp(self.lp)
        self.assertEqual(lp_fl, dsdb.DS_DOMAIN_FUNCTION_2016)

        # With both levels matching, the check must not raise
        dsdb.check_and_update_fl(self.samdb, self.lp)

    def test_lower_smb_conf_fl(self):
        # Lower the configured functional level and restore the old
        # value on cleanup; check_and_update_fl() is then expected to
        # raise a constraint violation.
        option = "ad dc functional level"
        saved_fl = self.lp.get(option)
        self.lp.set(option, "2008_R2")
        self.addCleanup(self.lp.set, option, saved_fl)

        try:
            dsdb.check_and_update_fl(self.samdb, self.lp)
        except ldb.LdbError as err:
            code, _estr = err.args
            self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed to start DC with 2008 R2 FL in 2016 domain")