ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / domain_backup.py
blobaf87a93f736ae33cac95f71dfc6322934ba64474
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org>
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from samba import provision, param
18 import os
19 import shutil
20 import subprocess
21 from samba.tests import (env_loadparm, create_test_ou, BlackboxProcessError,
22 BlackboxTestCase, connect_samdb)
23 import ldb
24 from samba.samdb import SamDB
25 from samba.auth import system_session
26 from samba import Ldb, dn_from_dns_name
27 from samba.netcmd import CommandError
28 from samba.netcmd.fsmo import get_fsmo_roleowner
29 import re
30 from samba import sites
31 from samba.dsdb import _dsdb_load_udv_v2
32 from samba import safe_tarfile as tarfile
35 def get_prim_dom(secrets_path, lp):
36 secrets_ldb = Ldb(secrets_path, session_info=system_session(), lp=lp)
37 return secrets_ldb.search(base="CN=Primary Domains",
38 attrs=['objectClass', 'samAccountName',
39 'secret', 'msDS-KeyVersionNumber'],
40 scope=ldb.SCOPE_SUBTREE,
41 expression="(objectClass=kerberosSecret)")
43 # The backup tests require that a completely clean LoadParm object gets used
44 # for the restore. Otherwise the same global LP gets re-used, and the LP
45 # settings can bleed from one test case to another.
46 # To do this, these tests should use check_output(), which executes the command
47 # in a separate process (as opposed to runcmd(), runsubcmd()).
48 # So although this is a samba-tool test, we don't inherit from SambaToolCmdTest
49 # so that we never inadvertently use .runcmd() by accident.
50 class DomainBackupBase(BlackboxTestCase):
52 def setUp(self):
53 super().setUp()
55 server = os.environ["DC_SERVER"]
56 self.user_auth = "-U%s%%%s" % (os.environ["DC_USERNAME"],
57 os.environ["DC_PASSWORD"])
59 # LDB connection to the original server being backed up
60 self.ldb = connect_samdb("ldap://%s" % server)
61 self.new_server = "BACKUPSERV"
62 self.server = server.upper()
63 self.base_cmd = None
64 self.backup_markers = ['sidForRestore', 'backupDate']
65 self.restore_domain = os.environ["DOMAIN"]
66 self.restore_realm = os.environ["REALM"]
67 self.backend = None
69 def use_backend(self, backend):
70 """Explicitly set the DB backend that the backup should use"""
71 self.backend = backend
72 self.base_cmd += ["--backend-store=" + backend]
74 def get_expected_partitions(self, samdb):
75 basedn = str(samdb.get_default_basedn())
76 config_dn = "CN=Configuration,%s" % basedn
77 return [basedn, config_dn, "CN=Schema,%s" % config_dn,
78 "DC=DomainDnsZones,%s" % basedn,
79 "DC=ForestDnsZones,%s" % basedn]
81 def assert_partitions_present(self, samdb):
82 """Asserts all expected partitions are present in the backup samdb"""
83 res = samdb.search(base="", scope=ldb.SCOPE_BASE,
84 attrs=['namingContexts'])
85 actual_ncs = [str(r) for r in res[0].get('namingContexts')]
87 expected_ncs = self.get_expected_partitions(samdb)
89 for nc in expected_ncs:
90 self.assertTrue(nc in actual_ncs,
91 "%s not in %s" % (nc, str(actual_ncs)))
93 def assert_repl_uptodate_vector(self, samdb):
94 """Asserts an replUpToDateVector entry exists for the original DC"""
95 orig_invoc_id = self.ldb.get_invocation_id()
96 expected_ncs = self.get_expected_partitions(samdb)
98 # loop through the partitions and check the upToDateness vector
99 for nc in expected_ncs:
100 found = False
101 for cursor in _dsdb_load_udv_v2(samdb, nc):
102 if orig_invoc_id == str(cursor.source_dsa_invocation_id):
103 found = True
104 break
105 self.assertTrue(found, "Couldn't find UDTV for original DC")
107 def assert_dcs_present(self, samdb, expected_server, expected_count=None):
108 """Checks that the expected server is present in the restored DB"""
109 search_expr = "(&(objectClass=Server)(serverReference=*))"
110 res = samdb.search(samdb.get_config_basedn(),
111 scope=ldb.SCOPE_SUBTREE,
112 expression=search_expr)
113 server_found = False
114 for msg in res:
115 if expected_server in str(msg.dn):
116 server_found = True
118 self.assertTrue(server_found,
119 "Could not find %s server" % expected_server)
121 if expected_count:
122 self.assertTrue(len(res) == expected_count)
124 def restore_dir(self):
125 extract_dir = os.path.join(self.tempdir, 'tree')
126 if not os.path.exists(extract_dir):
127 os.mkdir(extract_dir)
128 self.addCleanup(shutil.rmtree, extract_dir)
129 return extract_dir
131 def untar_backup(self, backup_file):
132 """Untar the backup file's raw contents (i.e. not a proper restore)"""
133 extract_dir = self.restore_dir()
134 with tarfile.open(backup_file) as tf:
135 tf.extractall(extract_dir)
136 return extract_dir
138 def _test_backup_untar(
139 self,
140 primary_domain_secrets=0,
141 verify_checksums=False
143 """Creates a backup, untars the raw files, and sanity-checks the DB"""
144 backup_file = self.create_backup()
145 extract_dir = self.untar_backup(backup_file)
147 if (verify_checksums):
148 p = subprocess.Popen(
149 ["sha256sum", "-c", "SHA256SUM"],
150 stdout=subprocess.PIPE,
151 stderr=subprocess.PIPE,
152 cwd=extract_dir,
154 (out, err) = p.communicate()
155 if p.returncode:
156 print("Error: " + err.decode('utf-8'))
157 raise CommandError('Failed to verify checksums')
159 private_dir = os.path.join(extract_dir, "private")
160 samdb_path = os.path.join(private_dir, "sam.ldb")
161 lp = env_loadparm()
162 samdb = SamDB(url=samdb_path, session_info=system_session(), lp=lp)
164 # check that backup markers were added to the DB
165 res = samdb.search(base=ldb.Dn(samdb, "@SAMBA_DSDB"),
166 scope=ldb.SCOPE_BASE,
167 attrs=self.backup_markers)
168 self.assertEqual(len(res), 1)
169 for marker in self.backup_markers:
170 self.assertIsNotNone(res[0].get(marker),
171 "%s backup marker missing" % marker)
173 # check the secrets.ldb entry for the primary domain. (Online/clone
174 # backups shouldn't have this, as they never got it during the backup)
175 secrets_path = os.path.join(private_dir, "secrets.ldb")
176 res = get_prim_dom(secrets_path, lp)
177 self.assertEqual(len(res), primary_domain_secrets)
179 # sanity-check that all the partitions got backed up
180 self.assert_partitions_present(samdb)
182 def _test_backup_restore(self):
183 """Does a backup/restore, with specific checks of the resulting DB"""
184 backup_file = self.create_backup()
185 self.restore_backup(backup_file)
186 lp = self.check_restored_smbconf()
187 self.check_restored_database(lp)
189 def _test_backup_restore_no_secrets(self):
190 """Does a backup/restore with secrets excluded from the resulting DB"""
192 # exclude secrets when we create the backup
193 backup_file = self.create_backup(extra_args=["--no-secrets"])
194 self.restore_backup(backup_file)
195 lp = self.check_restored_smbconf()
197 # assert that we don't find user secrets in the DB
198 self.check_restored_database(lp, expect_secrets=False)
200 def _test_backup_restore_into_site(self):
201 """Does a backup and restores into a non-default site"""
203 # create a new non-default site
204 sitename = "Test-Site-For-Backups"
205 sites.create_site(self.ldb, self.ldb.get_config_basedn(), sitename)
206 self.addCleanup(sites.delete_site, self.ldb,
207 self.ldb.get_config_basedn(), sitename)
209 # restore the backup DC into the site we just created
210 backup_file = self.create_backup()
211 self.restore_backup(backup_file, ["--site=" + sitename])
213 lp = self.check_restored_smbconf()
214 restored_ldb = self.check_restored_database(lp)
216 # check the restored DC was added to the site we created, i.e. there's
217 # an entry matching the new DC sitting underneath the site DN
218 site_dn = "CN={0},CN=Sites,{1}".format(sitename,
219 restored_ldb.get_config_basedn())
220 match_server = "(&(objectClass=server)(cn={0}))".format(self.new_server)
221 res = restored_ldb.search(site_dn, scope=ldb.SCOPE_SUBTREE,
222 expression=match_server)
223 self.assertTrue(len(res) == 1,
224 "Failed to find new DC under site")
226 def create_smbconf(self, settings):
227 """Creates a very basic smb.conf to pass to the restore tool"""
229 # without the testenv config's settings, the NTACL backup_restore()
230 # operation will fail (because we're not root). So first suck in all
231 # testenv's settings, so we retain these in the new config. Note we
232 # use a non-global LP so that these settings don't leak into other
233 # places we use LoadParms
234 testenv_conf = os.environ["SMB_CONF_PATH"]
235 local_lp = param.LoadParm(filename_for_non_global_lp=testenv_conf)
237 # add the new settings to the LP, then write the settings to file
238 for key, val in settings.items():
239 local_lp.set(key, val)
241 new_smbconf = os.path.join(self.tempdir, "smb.conf")
242 local_lp.dump(False, new_smbconf)
244 self.addCleanup(os.remove, new_smbconf)
245 return new_smbconf
247 def _test_backup_restore_with_conf(self):
248 """Checks smb.conf values passed to the restore are retained"""
249 backup_file = self.create_backup()
251 # create an smb.conf that we pass to the restore. The netbios/state
252 # dir should get overridden by the restore, the other settings should
253 # trickle through into the restored dir's smb.conf
254 settings = {'state directory': '/var/run',
255 'netbios name': 'FOOBAR',
256 'workgroup': 'NOTMYDOMAIN',
257 'realm': 'NOT.MY.REALM'}
258 assert_settings = {'drs: max link sync': '275',
259 'prefork children': '7'}
260 settings.update(assert_settings)
261 smbconf = self.create_smbconf(settings)
263 self.restore_backup(backup_file, ["--configfile=" + smbconf])
265 # this will check netbios name/state dir
266 lp = self.check_restored_smbconf()
267 self.check_restored_database(lp)
269 # check the remaining settings are still intact
270 for key, val in assert_settings.items():
271 self.assertEqual(str(lp.get(key)), val,
272 "'%s' was '%s' in smb.conf" % (key, lp.get(key)))
274 def check_restored_smbconf(self):
275 """Sanity-check important smb.conf values are restored correctly"""
276 smbconf = os.path.join(self.restore_dir(), "etc", "smb.conf")
277 bkp_lp = param.LoadParm(filename_for_non_global_lp=smbconf)
278 self.assertEqual(bkp_lp.get('netbios name'), self.new_server)
279 self.assertEqual(bkp_lp.get('workgroup'), self.restore_domain)
280 self.assertEqual(bkp_lp.get('realm'), self.restore_realm.upper())
282 # we restore with a fixed directory structure, so we can sanity-check
283 # that the core filepaths settings are what we expect them to be
284 private_dir = os.path.join(self.restore_dir(), "private")
285 self.assertEqual(bkp_lp.get('private dir'), private_dir)
286 state_dir = os.path.join(self.restore_dir(), "state")
287 self.assertEqual(bkp_lp.get('state directory'), state_dir)
288 return bkp_lp
290 def check_restored_database(self, bkp_lp, expect_secrets=True):
291 paths = provision.provision_paths_from_lp(bkp_lp, bkp_lp.get("realm"))
293 bkp_pd = get_prim_dom(paths.secrets, bkp_lp)
294 self.assertEqual(len(bkp_pd), 1)
295 account = bkp_pd[0].get('samAccountName')
296 self.assertIsNotNone(account)
297 self.assertEqual(str(account[0]), self.new_server + '$')
298 self.assertIsNotNone(bkp_pd[0].get('secret'))
300 samdb = SamDB(url=paths.samdb, session_info=system_session(),
301 lp=bkp_lp, credentials=self.get_credentials())
303 # check that the backup markers have been removed from the restored DB
304 res = samdb.search(base=ldb.Dn(samdb, "@SAMBA_DSDB"),
305 scope=ldb.SCOPE_BASE,
306 attrs=self.backup_markers)
307 self.assertEqual(len(res), 1)
308 for marker in self.backup_markers:
309 self.assertIsNone(res[0].get(marker),
310 "%s backup-marker left behind" % marker)
312 # check that the repsFrom and repsTo values have been removed
313 # from the restored DB
314 res = samdb.search(base=samdb.get_default_basedn(),
315 scope=ldb.SCOPE_BASE,
316 attrs=['repsFrom', 'repsTo'])
317 self.assertEqual(len(res), 1)
318 self.assertIsNone(res[0].get('repsFrom'))
319 self.assertIsNone(res[0].get('repsTo'))
321 res = samdb.search(base=samdb.get_config_basedn(),
322 scope=ldb.SCOPE_BASE,
323 attrs=['repsFrom', 'repsTo'])
324 self.assertEqual(len(res), 1)
325 self.assertIsNone(res[0].get('repsFrom'))
326 self.assertIsNone(res[0].get('repsTo'))
328 # check the DB is using the backend we supplied
329 if self.backend:
330 res = samdb.search(base="@PARTITION", scope=ldb.SCOPE_BASE,
331 attrs=["backendStore"])
332 backend = str(res[0].get("backendStore"))
333 self.assertEqual(backend, self.backend)
335 # check the restored DB has the expected partitions/DC/FSMO roles
336 self.assert_partitions_present(samdb)
337 self.assert_dcs_present(samdb, self.new_server, expected_count=1)
338 self.assert_fsmo_roles(samdb, self.new_server, self.server)
339 self.assert_secrets(samdb, expect_secrets=expect_secrets)
341 # check we still have an uptodateness vector for the original DC
342 self.assert_repl_uptodate_vector(samdb)
343 return samdb
345 def assert_user_secrets(self, samdb, username, expect_secrets):
346 """Asserts that a user has/doesn't have secrets as expected"""
347 basedn = str(samdb.get_default_basedn())
348 user_dn = "CN=%s,CN=users,%s" % (username, basedn)
350 if expect_secrets:
351 self.assertIsNotNone(samdb.searchone("unicodePwd", user_dn))
352 else:
353 # the search should throw an exception because the secrets
354 # attribute isn't actually there
355 self.assertRaises(KeyError, samdb.searchone, "unicodePwd", user_dn)
357 def assert_secrets(self, samdb, expect_secrets):
358 """Check the user secrets in the restored DB match what's expected"""
360 # check secrets for the built-in testenv users match what's expected
361 test_users = ["alice", "bob", "jane", "joe"]
362 for user in test_users:
363 self.assert_user_secrets(samdb, user, expect_secrets)
365 def assert_fsmo_roles(self, samdb, server, exclude_server):
366 """Asserts the expected server is the FSMO role owner"""
367 domain_dn = samdb.domain_dn()
368 forest_dn = dn_from_dns_name(samdb.forest_dns_name())
369 fsmos = {'infrastructure': "CN=Infrastructure," + domain_dn,
370 'naming': "CN=Partitions,%s" % samdb.get_config_basedn(),
371 'schema': str(samdb.get_schema_basedn()),
372 'rid': "CN=RID Manager$,CN=System," + domain_dn,
373 'pdc': domain_dn,
374 'domaindns':
375 "CN=Infrastructure,DC=DomainDnsZones," + domain_dn,
376 'forestdns':
377 "CN=Infrastructure,DC=ForestDnsZones," + forest_dn}
378 for role, dn in fsmos.items():
379 owner = get_fsmo_roleowner(samdb, ldb.Dn(samdb, dn), role)
380 self.assertTrue("CN={0},".format(server) in owner.extended_str(),
381 "Expected %s to own FSMO role %s" % (server, role))
382 self.assertTrue("CN={0},".format(exclude_server)
383 not in owner.extended_str(),
384 "%s found as FSMO %s role owner" % (server, role))
386 def cleanup_tempdir(self):
387 for filename in os.listdir(self.tempdir):
388 filepath = os.path.join(self.tempdir, filename)
389 if os.path.isfile(filepath):
390 os.remove(filepath)
391 elif os.path.isdir(filepath):
392 shutil.rmtree(filepath)
394 def run_cmd(self, args):
395 """Executes a samba-tool backup/restore command"""
397 cmd = " ".join(args)
398 print("Executing: samba-tool %s" % cmd)
399 try:
400 # note: it's important we run the cmd in a separate process here
401 out = self.check_output("samba-tool " + cmd)
402 except BlackboxProcessError as e:
403 # if the command failed, it may have left behind temporary files.
404 # We're going to fail the test, but first cleanup any temp files so
405 # that we skip the TestCaseInTempDir._remove_tempdir() assertions
406 self.cleanup_tempdir()
407 self.fail("Error calling samba-tool: %s" % e)
408 print(out)
410 def create_backup(self, extra_args=None):
411 """Runs the backup cmd to produce a backup file for the testenv DC"""
412 # Run the backup command and check we got one backup tar file
413 args = self.base_cmd + ["--targetdir=" + self.tempdir]
414 if extra_args:
415 args += extra_args
417 self.run_cmd(args)
419 # find the filename of the backup-file generated
420 tar_files = []
421 for fn in os.listdir(self.tempdir):
422 if (fn.startswith("samba-backup-") and fn.endswith(".tar.bz2")):
423 tar_files.append(fn)
425 self.assertTrue(len(tar_files) == 1,
426 "Domain backup created %u tar files" % len(tar_files))
428 # clean up the backup file once the test finishes
429 backup_file = os.path.join(self.tempdir, tar_files[0])
430 self.addCleanup(os.remove, backup_file)
431 return backup_file
433 def restore_backup(self, backup_file, extra_args=None):
434 """Restores the samba directory files from a given backup"""
435 # Run the restore command
436 extract_dir = self.restore_dir()
437 args = ["domain", "backup", "restore", "--backup-file=" + backup_file,
438 "--targetdir=" + extract_dir,
439 "--newservername=" + self.new_server]
440 if extra_args:
441 args += extra_args
443 self.run_cmd(args)
445 # sanity-check the restore doesn't modify the original DC by mistake
446 self.assert_partitions_present(self.ldb)
447 self.assert_dcs_present(self.ldb, self.server)
448 self.assert_fsmo_roles(self.ldb, self.server, self.new_server)
451 class DomainBackupOnline(DomainBackupBase):
453 def setUp(self):
454 super().setUp()
455 self.base_cmd = ["domain", "backup", "online",
456 "--server=" + self.server, self.user_auth]
458 # run the common test cases above using online backups
459 def test_backup_untar(self):
460 self._test_backup_untar()
462 def test_backup_restore(self):
463 self.use_backend("tdb")
464 self._test_backup_restore()
466 def test_backup_restore_with_conf(self):
467 self.use_backend("mdb")
468 self._test_backup_restore_with_conf()
470 def test_backup_restore_no_secrets(self):
471 self.use_backend("tdb")
472 self._test_backup_restore_no_secrets()
474 def test_backup_restore_into_site(self):
475 self.use_backend("mdb")
476 self._test_backup_restore_into_site()
479 class DomainBackupRename(DomainBackupBase):
481 # run the above test cases using a rename backup
482 def setUp(self):
483 super().setUp()
484 self.new_server = "RENAMESERV"
485 self.restore_domain = "NEWDOMAIN"
486 self.restore_realm = "rename.test.net"
487 self.new_basedn = "DC=rename,DC=test,DC=net"
488 self.base_cmd = ["domain", "backup", "rename", self.restore_domain,
489 self.restore_realm, "--server=" + self.server,
490 self.user_auth]
491 self.backup_markers += ['backupRename']
493 # run the common test case code for backup-renames
494 def test_backup_untar(self):
495 self._test_backup_untar()
497 def test_backup_restore(self):
498 self.use_backend("mdb")
499 self._test_backup_restore()
501 def test_backup_restore_with_conf(self):
502 self.use_backend("tdb")
503 self._test_backup_restore_with_conf()
505 def test_backup_restore_no_secrets(self):
506 self.use_backend("mdb")
507 self._test_backup_restore_no_secrets()
509 def test_backup_restore_into_site(self):
510 self.use_backend("tdb")
511 self._test_backup_restore_into_site()
513 def test_backup_invalid_args(self):
514 """Checks that rename commands with invalid args are rejected"""
516 # try a "rename" using the same realm as the DC currently has
517 rename_cmd = "samba-tool domain backup rename "
518 bad_cmd = "{cmd} {domain} {realm}".format(cmd=rename_cmd,
519 domain=self.restore_domain,
520 realm=os.environ["REALM"])
521 self.assertRaises(BlackboxProcessError, self.check_output, bad_cmd)
523 # try a "rename" using the same domain as the DC currently has
524 bad_cmd = "{cmd} {domain} {realm}".format(cmd=rename_cmd,
525 domain=os.environ["DOMAIN"],
526 realm=self.restore_realm)
527 self.assertRaises(BlackboxProcessError, self.check_output, bad_cmd)
529 def add_link(self, attr, source, target):
530 m = ldb.Message()
531 m.dn = ldb.Dn(self.ldb, source)
532 m[attr] = ldb.MessageElement(target, ldb.FLAG_MOD_ADD, attr)
533 self.ldb.modify(m)
535 def test_one_way_links(self):
536 """Sanity-check that a rename handles one-way links correctly"""
538 # Do some initial setup on the DC before back it up:
539 # create an OU to hold the test objects we'll create
540 test_ou = create_test_ou(self.ldb, "rename_test")
541 self.addCleanup(self.ldb.delete, test_ou, ["tree_delete:1"])
543 # create the source and target objects and link them together.
544 # We use addressBookRoots2 here because it's a one-way link
545 src_dn = "CN=link_src,%s" % test_ou
546 self.ldb.add({"dn": src_dn,
547 "objectclass": "msExchConfigurationContainer"})
548 target_dn = "OU=link_tgt,%s" % test_ou
549 self.ldb.add({"dn": target_dn, "objectclass": "organizationalunit"})
550 link_attr = "addressBookRoots2"
551 self.add_link(link_attr, src_dn, target_dn)
553 # add a second link target that's in a different partition
554 server_dn = ("CN=testrename,CN=Servers,CN=Default-First-Site-Name,"
555 "CN=Sites,%s" % str(self.ldb.get_config_basedn()))
556 self.ldb.add({"dn": server_dn, "objectclass": "server"})
557 self.addCleanup(self.ldb.delete, server_dn)
558 self.add_link(link_attr, src_dn, server_dn)
560 # do the backup/restore
561 backup_file = self.create_backup()
562 self.restore_backup(backup_file)
563 lp = self.check_restored_smbconf()
564 restored_ldb = self.check_restored_database(lp)
566 # work out what the new DNs should be
567 old_basedn = str(self.ldb.get_default_basedn())
568 new_target_dn = re.sub(old_basedn + '$', self.new_basedn, target_dn)
569 new_src_dn = re.sub(old_basedn + '$', self.new_basedn, src_dn)
570 new_server_dn = re.sub(old_basedn + '$', self.new_basedn, server_dn)
572 # check the links exist in the renamed DB with the correct DNs
573 res = restored_ldb.search(base=new_src_dn, scope=ldb.SCOPE_BASE,
574 attrs=[link_attr])
575 self.assertEqual(len(res), 1,
576 "Failed to find renamed link source object")
577 self.assertTrue(link_attr in res[0], "Missing link attribute")
578 link_values = [str(x) for x in res[0][link_attr]]
579 self.assertTrue(new_target_dn in link_values)
580 self.assertTrue(new_server_dn in link_values)
582 # extra checks we run on the restored DB in the rename case
583 def check_restored_database(self, bkp_lp, expect_secrets=True):
584 # run the common checks over the restored DB
585 common_test = super()
586 samdb = common_test.check_restored_database(bkp_lp, expect_secrets)
588 # check we have actually renamed the DNs
589 basedn = str(samdb.get_default_basedn())
590 self.assertEqual(basedn, self.new_basedn)
592 # check the partition and netBIOS name match the new domain
593 partitions_dn = samdb.get_partitions_dn()
594 nc_name = ldb.binary_encode(str(basedn))
595 res = samdb.search(base=partitions_dn, scope=ldb.SCOPE_ONELEVEL,
596 attrs=["nETBIOSName", "cn"],
597 expression='ncName=%s' % nc_name)
598 self.assertEqual(len(res), 1,
599 "Looking up partition's NetBIOS name failed")
600 self.assertEqual(str(res[0].get("nETBIOSName")), self.restore_domain)
601 self.assertEqual(str(res[0].get("cn")), self.restore_domain)
603 # check the DC has the correct dnsHostname
604 realm = self.restore_realm
605 dn = "CN=%s,OU=Domain Controllers,%s" % (self.new_server,
606 self.new_basedn)
607 res = samdb.search(base=dn, scope=ldb.SCOPE_BASE,
608 attrs=["dNSHostName"])
609 self.assertEqual(len(res), 1,
610 "Looking up new DC's dnsHostname failed")
611 expected_val = "%s.%s" % (self.new_server.lower(), realm)
612 self.assertEqual(str(res[0].get("dNSHostName")), expected_val)
614 # check the DNS zones for the new realm are present
615 dn = "DC=%s,CN=MicrosoftDNS,DC=DomainDnsZones,%s" % (realm, basedn)
616 res = samdb.search(base=dn, scope=ldb.SCOPE_BASE)
617 self.assertEqual(len(res), 1, "Lookup of new domain's DNS zone failed")
619 forestdn = samdb.get_root_basedn().get_linearized()
620 dn = "DC=_msdcs.%s,CN=MicrosoftDNS,DC=ForestDnsZones,%s" % (realm,
621 forestdn)
622 res = samdb.search(base=dn, scope=ldb.SCOPE_BASE)
623 self.assertEqual(len(res), 1, "Lookup of new domain's DNS zone failed")
624 return samdb
627 class DomainBackupOffline(DomainBackupBase):
629 def setUp(self):
630 super().setUp()
631 self.base_cmd = ["domain", "backup", "offline"]
633 def test_backup_untar(self):
634 self._test_backup_untar(primary_domain_secrets=1, verify_checksums=True)
636 def test_backup_restore_with_conf(self):
637 self._test_backup_restore_with_conf()
639 def test_backup_restore(self):
640 self._test_backup_restore()
642 def test_backup_restore_into_site(self):
643 self._test_backup_restore_into_site()