1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org>
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import shutil
import subprocess

import ldb

from samba import Ldb, dn_from_dns_name
from samba import param, provision
from samba import safe_tarfile as tarfile
from samba import sites
from samba.auth import system_session
from samba.dsdb import _dsdb_load_udv_v2
from samba.netcmd import CommandError
from samba.netcmd.fsmo import get_fsmo_roleowner
from samba.samdb import SamDB
from samba.tests import (
    BlackboxProcessError,
    BlackboxTestCase,
    connect_samdb,
    create_test_ou,
    env_loadparm,
)
def get_prim_dom(secrets_path, lp):
    """Search a secrets.ldb for its primary-domain kerberos secrets.

    The database at ``secrets_path`` is opened with a system session and the
    supplied LoadParm. The result of a subtree search below
    "CN=Primary Domains" for kerberosSecret objects is returned.
    """
    wanted_attrs = ['objectClass', 'samAccountName',
                    'secret', 'msDS-KeyVersionNumber']
    secrets_db = Ldb(secrets_path, session_info=system_session(), lp=lp)
    return secrets_db.search(base="CN=Primary Domains",
                             attrs=wanted_attrs,
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectClass=kerberosSecret)")
43 # The backup tests require that a completely clean LoadParm object gets used
44 # for the restore. Otherwise the same global LP gets re-used, and the LP
45 # settings can bleed from one test case to another.
46 # To do this, these tests should use check_output(), which executes the command
47 # in a separate process (as opposed to runcmd(), runsubcmd()).
# So although this is a samba-tool test, we don't inherit from SambaToolCmdTest
# so that we never use .runcmd() by accident.
50 class DomainBackupBase(BlackboxTestCase
):
55 server
= os
.environ
["DC_SERVER"]
56 self
.user_auth
= "-U%s%%%s" % (os
.environ
["DC_USERNAME"],
57 os
.environ
["DC_PASSWORD"])
59 # LDB connection to the original server being backed up
60 self
.ldb
= connect_samdb("ldap://%s" % server
)
61 self
.new_server
= "BACKUPSERV"
62 self
.server
= server
.upper()
64 self
.backup_markers
= ['sidForRestore', 'backupDate']
65 self
.restore_domain
= os
.environ
["DOMAIN"]
66 self
.restore_realm
= os
.environ
["REALM"]
69 def use_backend(self
, backend
):
70 """Explicitly set the DB backend that the backup should use"""
71 self
.backend
= backend
72 self
.base_cmd
+= ["--backend-store=" + backend
]
74 def get_expected_partitions(self
, samdb
):
75 basedn
= str(samdb
.get_default_basedn())
76 config_dn
= "CN=Configuration,%s" % basedn
77 return [basedn
, config_dn
, "CN=Schema,%s" % config_dn
,
78 "DC=DomainDnsZones,%s" % basedn
,
79 "DC=ForestDnsZones,%s" % basedn
]
81 def assert_partitions_present(self
, samdb
):
82 """Asserts all expected partitions are present in the backup samdb"""
83 res
= samdb
.search(base
="", scope
=ldb
.SCOPE_BASE
,
84 attrs
=['namingContexts'])
85 actual_ncs
= [str(r
) for r
in res
[0].get('namingContexts')]
87 expected_ncs
= self
.get_expected_partitions(samdb
)
89 for nc
in expected_ncs
:
90 self
.assertTrue(nc
in actual_ncs
,
91 "%s not in %s" % (nc
, str(actual_ncs
)))
93 def assert_repl_uptodate_vector(self
, samdb
):
94 """Asserts an replUpToDateVector entry exists for the original DC"""
95 orig_invoc_id
= self
.ldb
.get_invocation_id()
96 expected_ncs
= self
.get_expected_partitions(samdb
)
98 # loop through the partitions and check the upToDateness vector
99 for nc
in expected_ncs
:
101 for cursor
in _dsdb_load_udv_v2(samdb
, nc
):
102 if orig_invoc_id
== str(cursor
.source_dsa_invocation_id
):
105 self
.assertTrue(found
, "Couldn't find UDTV for original DC")
107 def assert_dcs_present(self
, samdb
, expected_server
, expected_count
=None):
108 """Checks that the expected server is present in the restored DB"""
109 search_expr
= "(&(objectClass=Server)(serverReference=*))"
110 res
= samdb
.search(samdb
.get_config_basedn(),
111 scope
=ldb
.SCOPE_SUBTREE
,
112 expression
=search_expr
)
115 if expected_server
in str(msg
.dn
):
118 self
.assertTrue(server_found
,
119 "Could not find %s server" % expected_server
)
122 self
.assertTrue(len(res
) == expected_count
)
124 def restore_dir(self
):
125 extract_dir
= os
.path
.join(self
.tempdir
, 'tree')
126 if not os
.path
.exists(extract_dir
):
127 os
.mkdir(extract_dir
)
128 self
.addCleanup(shutil
.rmtree
, extract_dir
)
131 def untar_backup(self
, backup_file
):
132 """Untar the backup file's raw contents (i.e. not a proper restore)"""
133 extract_dir
= self
.restore_dir()
134 with tarfile
.open(backup_file
) as tf
:
135 tf
.extractall(extract_dir
)
138 def _test_backup_untar(
140 primary_domain_secrets
=0,
141 verify_checksums
=False
143 """Creates a backup, untars the raw files, and sanity-checks the DB"""
144 backup_file
= self
.create_backup()
145 extract_dir
= self
.untar_backup(backup_file
)
147 if (verify_checksums
):
148 p
= subprocess
.Popen(
149 ["sha256sum", "-c", "SHA256SUM"],
150 stdout
=subprocess
.PIPE
,
151 stderr
=subprocess
.PIPE
,
154 (out
, err
) = p
.communicate()
156 print("Error: " + err
.decode('utf-8'))
157 raise CommandError('Failed to verify checksums')
159 private_dir
= os
.path
.join(extract_dir
, "private")
160 samdb_path
= os
.path
.join(private_dir
, "sam.ldb")
162 samdb
= SamDB(url
=samdb_path
, session_info
=system_session(), lp
=lp
)
164 # check that backup markers were added to the DB
165 res
= samdb
.search(base
=ldb
.Dn(samdb
, "@SAMBA_DSDB"),
166 scope
=ldb
.SCOPE_BASE
,
167 attrs
=self
.backup_markers
)
168 self
.assertEqual(len(res
), 1)
169 for marker
in self
.backup_markers
:
170 self
.assertIsNotNone(res
[0].get(marker
),
171 "%s backup marker missing" % marker
)
173 # check the secrets.ldb entry for the primary domain. (Online/clone
174 # backups shouldn't have this, as they never got it during the backup)
175 secrets_path
= os
.path
.join(private_dir
, "secrets.ldb")
176 res
= get_prim_dom(secrets_path
, lp
)
177 self
.assertEqual(len(res
), primary_domain_secrets
)
179 # sanity-check that all the partitions got backed up
180 self
.assert_partitions_present(samdb
)
182 def _test_backup_restore(self
):
183 """Does a backup/restore, with specific checks of the resulting DB"""
184 backup_file
= self
.create_backup()
185 self
.restore_backup(backup_file
)
186 lp
= self
.check_restored_smbconf()
187 self
.check_restored_database(lp
)
189 def _test_backup_restore_no_secrets(self
):
190 """Does a backup/restore with secrets excluded from the resulting DB"""
192 # exclude secrets when we create the backup
193 backup_file
= self
.create_backup(extra_args
=["--no-secrets"])
194 self
.restore_backup(backup_file
)
195 lp
= self
.check_restored_smbconf()
197 # assert that we don't find user secrets in the DB
198 self
.check_restored_database(lp
, expect_secrets
=False)
200 def _test_backup_restore_into_site(self
):
201 """Does a backup and restores into a non-default site"""
203 # create a new non-default site
204 sitename
= "Test-Site-For-Backups"
205 sites
.create_site(self
.ldb
, self
.ldb
.get_config_basedn(), sitename
)
206 self
.addCleanup(sites
.delete_site
, self
.ldb
,
207 self
.ldb
.get_config_basedn(), sitename
)
209 # restore the backup DC into the site we just created
210 backup_file
= self
.create_backup()
211 self
.restore_backup(backup_file
, ["--site=" + sitename
])
213 lp
= self
.check_restored_smbconf()
214 restored_ldb
= self
.check_restored_database(lp
)
216 # check the restored DC was added to the site we created, i.e. there's
217 # an entry matching the new DC sitting underneath the site DN
218 site_dn
= "CN={0},CN=Sites,{1}".format(sitename
,
219 restored_ldb
.get_config_basedn())
220 match_server
= "(&(objectClass=server)(cn={0}))".format(self
.new_server
)
221 res
= restored_ldb
.search(site_dn
, scope
=ldb
.SCOPE_SUBTREE
,
222 expression
=match_server
)
223 self
.assertTrue(len(res
) == 1,
224 "Failed to find new DC under site")
226 def create_smbconf(self
, settings
):
227 """Creates a very basic smb.conf to pass to the restore tool"""
229 # without the testenv config's settings, the NTACL backup_restore()
230 # operation will fail (because we're not root). So first suck in all
231 # testenv's settings, so we retain these in the new config. Note we
232 # use a non-global LP so that these settings don't leak into other
233 # places we use LoadParms
234 testenv_conf
= os
.environ
["SMB_CONF_PATH"]
235 local_lp
= param
.LoadParm(filename_for_non_global_lp
=testenv_conf
)
237 # add the new settings to the LP, then write the settings to file
238 for key
, val
in settings
.items():
239 local_lp
.set(key
, val
)
241 new_smbconf
= os
.path
.join(self
.tempdir
, "smb.conf")
242 local_lp
.dump(False, new_smbconf
)
244 self
.addCleanup(os
.remove
, new_smbconf
)
247 def _test_backup_restore_with_conf(self
):
248 """Checks smb.conf values passed to the restore are retained"""
249 backup_file
= self
.create_backup()
251 # create an smb.conf that we pass to the restore. The netbios/state
252 # dir should get overridden by the restore, the other settings should
253 # trickle through into the restored dir's smb.conf
254 settings
= {'state directory': '/var/run',
255 'netbios name': 'FOOBAR',
256 'workgroup': 'NOTMYDOMAIN',
257 'realm': 'NOT.MY.REALM'}
258 assert_settings
= {'drs: max link sync': '275',
259 'prefork children': '7'}
260 settings
.update(assert_settings
)
261 smbconf
= self
.create_smbconf(settings
)
263 self
.restore_backup(backup_file
, ["--configfile=" + smbconf
])
265 # this will check netbios name/state dir
266 lp
= self
.check_restored_smbconf()
267 self
.check_restored_database(lp
)
269 # check the remaining settings are still intact
270 for key
, val
in assert_settings
.items():
271 self
.assertEqual(str(lp
.get(key
)), val
,
272 "'%s' was '%s' in smb.conf" % (key
, lp
.get(key
)))
274 def check_restored_smbconf(self
):
275 """Sanity-check important smb.conf values are restored correctly"""
276 smbconf
= os
.path
.join(self
.restore_dir(), "etc", "smb.conf")
277 bkp_lp
= param
.LoadParm(filename_for_non_global_lp
=smbconf
)
278 self
.assertEqual(bkp_lp
.get('netbios name'), self
.new_server
)
279 self
.assertEqual(bkp_lp
.get('workgroup'), self
.restore_domain
)
280 self
.assertEqual(bkp_lp
.get('realm'), self
.restore_realm
.upper())
282 # we restore with a fixed directory structure, so we can sanity-check
283 # that the core filepaths settings are what we expect them to be
284 private_dir
= os
.path
.join(self
.restore_dir(), "private")
285 self
.assertEqual(bkp_lp
.get('private dir'), private_dir
)
286 state_dir
= os
.path
.join(self
.restore_dir(), "state")
287 self
.assertEqual(bkp_lp
.get('state directory'), state_dir
)
290 def check_restored_database(self
, bkp_lp
, expect_secrets
=True):
291 paths
= provision
.provision_paths_from_lp(bkp_lp
, bkp_lp
.get("realm"))
293 bkp_pd
= get_prim_dom(paths
.secrets
, bkp_lp
)
294 self
.assertEqual(len(bkp_pd
), 1)
295 account
= bkp_pd
[0].get('samAccountName')
296 self
.assertIsNotNone(account
)
297 self
.assertEqual(str(account
[0]), self
.new_server
+ '$')
298 self
.assertIsNotNone(bkp_pd
[0].get('secret'))
300 samdb
= SamDB(url
=paths
.samdb
, session_info
=system_session(),
301 lp
=bkp_lp
, credentials
=self
.get_credentials())
303 # check that the backup markers have been removed from the restored DB
304 res
= samdb
.search(base
=ldb
.Dn(samdb
, "@SAMBA_DSDB"),
305 scope
=ldb
.SCOPE_BASE
,
306 attrs
=self
.backup_markers
)
307 self
.assertEqual(len(res
), 1)
308 for marker
in self
.backup_markers
:
309 self
.assertIsNone(res
[0].get(marker
),
310 "%s backup-marker left behind" % marker
)
312 # check that the repsFrom and repsTo values have been removed
313 # from the restored DB
314 res
= samdb
.search(base
=samdb
.get_default_basedn(),
315 scope
=ldb
.SCOPE_BASE
,
316 attrs
=['repsFrom', 'repsTo'])
317 self
.assertEqual(len(res
), 1)
318 self
.assertIsNone(res
[0].get('repsFrom'))
319 self
.assertIsNone(res
[0].get('repsTo'))
321 res
= samdb
.search(base
=samdb
.get_config_basedn(),
322 scope
=ldb
.SCOPE_BASE
,
323 attrs
=['repsFrom', 'repsTo'])
324 self
.assertEqual(len(res
), 1)
325 self
.assertIsNone(res
[0].get('repsFrom'))
326 self
.assertIsNone(res
[0].get('repsTo'))
328 # check the DB is using the backend we supplied
330 res
= samdb
.search(base
="@PARTITION", scope
=ldb
.SCOPE_BASE
,
331 attrs
=["backendStore"])
332 backend
= str(res
[0].get("backendStore"))
333 self
.assertEqual(backend
, self
.backend
)
335 # check the restored DB has the expected partitions/DC/FSMO roles
336 self
.assert_partitions_present(samdb
)
337 self
.assert_dcs_present(samdb
, self
.new_server
, expected_count
=1)
338 self
.assert_fsmo_roles(samdb
, self
.new_server
, self
.server
)
339 self
.assert_secrets(samdb
, expect_secrets
=expect_secrets
)
341 # check we still have an uptodateness vector for the original DC
342 self
.assert_repl_uptodate_vector(samdb
)
345 def assert_user_secrets(self
, samdb
, username
, expect_secrets
):
346 """Asserts that a user has/doesn't have secrets as expected"""
347 basedn
= str(samdb
.get_default_basedn())
348 user_dn
= "CN=%s,CN=users,%s" % (username
, basedn
)
351 self
.assertIsNotNone(samdb
.searchone("unicodePwd", user_dn
))
353 # the search should throw an exception because the secrets
354 # attribute isn't actually there
355 self
.assertRaises(KeyError, samdb
.searchone
, "unicodePwd", user_dn
)
357 def assert_secrets(self
, samdb
, expect_secrets
):
358 """Check the user secrets in the restored DB match what's expected"""
360 # check secrets for the built-in testenv users match what's expected
361 test_users
= ["alice", "bob", "jane", "joe"]
362 for user
in test_users
:
363 self
.assert_user_secrets(samdb
, user
, expect_secrets
)
365 def assert_fsmo_roles(self
, samdb
, server
, exclude_server
):
366 """Asserts the expected server is the FSMO role owner"""
367 domain_dn
= samdb
.domain_dn()
368 forest_dn
= dn_from_dns_name(samdb
.forest_dns_name())
369 fsmos
= {'infrastructure': "CN=Infrastructure," + domain_dn
,
370 'naming': "CN=Partitions,%s" % samdb
.get_config_basedn(),
371 'schema': str(samdb
.get_schema_basedn()),
372 'rid': "CN=RID Manager$,CN=System," + domain_dn
,
375 "CN=Infrastructure,DC=DomainDnsZones," + domain_dn
,
377 "CN=Infrastructure,DC=ForestDnsZones," + forest_dn
}
378 for role
, dn
in fsmos
.items():
379 owner
= get_fsmo_roleowner(samdb
, ldb
.Dn(samdb
, dn
), role
)
380 self
.assertTrue("CN={0},".format(server
) in owner
.extended_str(),
381 "Expected %s to own FSMO role %s" % (server
, role
))
382 self
.assertTrue("CN={0},".format(exclude_server
)
383 not in owner
.extended_str(),
384 "%s found as FSMO %s role owner" % (server
, role
))
386 def cleanup_tempdir(self
):
387 for filename
in os
.listdir(self
.tempdir
):
388 filepath
= os
.path
.join(self
.tempdir
, filename
)
389 if os
.path
.isfile(filepath
):
391 elif os
.path
.isdir(filepath
):
392 shutil
.rmtree(filepath
)
394 def run_cmd(self
, args
):
395 """Executes a samba-tool backup/restore command"""
398 print("Executing: samba-tool %s" % cmd
)
400 # note: it's important we run the cmd in a separate process here
401 out
= self
.check_output("samba-tool " + cmd
)
402 except BlackboxProcessError
as e
:
403 # if the command failed, it may have left behind temporary files.
404 # We're going to fail the test, but first cleanup any temp files so
405 # that we skip the TestCaseInTempDir._remove_tempdir() assertions
406 self
.cleanup_tempdir()
407 self
.fail("Error calling samba-tool: %s" % e
)
410 def create_backup(self
, extra_args
=None):
411 """Runs the backup cmd to produce a backup file for the testenv DC"""
412 # Run the backup command and check we got one backup tar file
413 args
= self
.base_cmd
+ ["--targetdir=" + self
.tempdir
]
419 # find the filename of the backup-file generated
421 for fn
in os
.listdir(self
.tempdir
):
422 if (fn
.startswith("samba-backup-") and fn
.endswith(".tar.bz2")):
425 self
.assertTrue(len(tar_files
) == 1,
426 "Domain backup created %u tar files" % len(tar_files
))
428 # clean up the backup file once the test finishes
429 backup_file
= os
.path
.join(self
.tempdir
, tar_files
[0])
430 self
.addCleanup(os
.remove
, backup_file
)
433 def restore_backup(self
, backup_file
, extra_args
=None):
434 """Restores the samba directory files from a given backup"""
435 # Run the restore command
436 extract_dir
= self
.restore_dir()
437 args
= ["domain", "backup", "restore", "--backup-file=" + backup_file
,
438 "--targetdir=" + extract_dir
,
439 "--newservername=" + self
.new_server
]
445 # sanity-check the restore doesn't modify the original DC by mistake
446 self
.assert_partitions_present(self
.ldb
)
447 self
.assert_dcs_present(self
.ldb
, self
.server
)
448 self
.assert_fsmo_roles(self
.ldb
, self
.server
, self
.new_server
)
class DomainBackupOnline(DomainBackupBase):
    """Runs the common backup/restore test cases using online backups"""

    def setUp(self):
        """Point the base command at 'samba-tool domain backup online'"""
        super().setUp()
        self.base_cmd = ["domain", "backup", "online",
                         "--server=" + self.server, self.user_auth]

    # run the common test cases above using online backups
    def test_backup_untar(self):
        self._test_backup_untar()

    def test_backup_restore(self):
        self.use_backend("tdb")
        self._test_backup_restore()

    def test_backup_restore_with_conf(self):
        self.use_backend("mdb")
        self._test_backup_restore_with_conf()

    def test_backup_restore_no_secrets(self):
        self.use_backend("tdb")
        self._test_backup_restore_no_secrets()

    def test_backup_restore_into_site(self):
        self.use_backend("mdb")
        self._test_backup_restore_into_site()
479 class DomainBackupRename(DomainBackupBase
):
481 # run the above test cases using a rename backup
484 self
.new_server
= "RENAMESERV"
485 self
.restore_domain
= "NEWDOMAIN"
486 self
.restore_realm
= "rename.test.net"
487 self
.new_basedn
= "DC=rename,DC=test,DC=net"
488 self
.base_cmd
= ["domain", "backup", "rename", self
.restore_domain
,
489 self
.restore_realm
, "--server=" + self
.server
,
491 self
.backup_markers
+= ['backupRename']
493 # run the common test case code for backup-renames
494 def test_backup_untar(self
):
495 self
._test
_backup
_untar
()
497 def test_backup_restore(self
):
498 self
.use_backend("mdb")
499 self
._test
_backup
_restore
()
501 def test_backup_restore_with_conf(self
):
502 self
.use_backend("tdb")
503 self
._test
_backup
_restore
_with
_conf
()
505 def test_backup_restore_no_secrets(self
):
506 self
.use_backend("mdb")
507 self
._test
_backup
_restore
_no
_secrets
()
509 def test_backup_restore_into_site(self
):
510 self
.use_backend("tdb")
511 self
._test
_backup
_restore
_into
_site
()
513 def test_backup_invalid_args(self
):
514 """Checks that rename commands with invalid args are rejected"""
516 # try a "rename" using the same realm as the DC currently has
517 rename_cmd
= "samba-tool domain backup rename "
518 bad_cmd
= "{cmd} {domain} {realm}".format(cmd
=rename_cmd
,
519 domain
=self
.restore_domain
,
520 realm
=os
.environ
["REALM"])
521 self
.assertRaises(BlackboxProcessError
, self
.check_output
, bad_cmd
)
523 # try a "rename" using the same domain as the DC currently has
524 bad_cmd
= "{cmd} {domain} {realm}".format(cmd
=rename_cmd
,
525 domain
=os
.environ
["DOMAIN"],
526 realm
=self
.restore_realm
)
527 self
.assertRaises(BlackboxProcessError
, self
.check_output
, bad_cmd
)
529 def add_link(self
, attr
, source
, target
):
531 m
.dn
= ldb
.Dn(self
.ldb
, source
)
532 m
[attr
] = ldb
.MessageElement(target
, ldb
.FLAG_MOD_ADD
, attr
)
535 def test_one_way_links(self
):
536 """Sanity-check that a rename handles one-way links correctly"""
538 # Do some initial setup on the DC before back it up:
539 # create an OU to hold the test objects we'll create
540 test_ou
= create_test_ou(self
.ldb
, "rename_test")
541 self
.addCleanup(self
.ldb
.delete
, test_ou
, ["tree_delete:1"])
543 # create the source and target objects and link them together.
544 # We use addressBookRoots2 here because it's a one-way link
545 src_dn
= "CN=link_src,%s" % test_ou
546 self
.ldb
.add({"dn": src_dn
,
547 "objectclass": "msExchConfigurationContainer"})
548 target_dn
= "OU=link_tgt,%s" % test_ou
549 self
.ldb
.add({"dn": target_dn
, "objectclass": "organizationalunit"})
550 link_attr
= "addressBookRoots2"
551 self
.add_link(link_attr
, src_dn
, target_dn
)
553 # add a second link target that's in a different partition
554 server_dn
= ("CN=testrename,CN=Servers,CN=Default-First-Site-Name,"
555 "CN=Sites,%s" % str(self
.ldb
.get_config_basedn()))
556 self
.ldb
.add({"dn": server_dn
, "objectclass": "server"})
557 self
.addCleanup(self
.ldb
.delete
, server_dn
)
558 self
.add_link(link_attr
, src_dn
, server_dn
)
560 # do the backup/restore
561 backup_file
= self
.create_backup()
562 self
.restore_backup(backup_file
)
563 lp
= self
.check_restored_smbconf()
564 restored_ldb
= self
.check_restored_database(lp
)
566 # work out what the new DNs should be
567 old_basedn
= str(self
.ldb
.get_default_basedn())
568 new_target_dn
= re
.sub(old_basedn
+ '$', self
.new_basedn
, target_dn
)
569 new_src_dn
= re
.sub(old_basedn
+ '$', self
.new_basedn
, src_dn
)
570 new_server_dn
= re
.sub(old_basedn
+ '$', self
.new_basedn
, server_dn
)
572 # check the links exist in the renamed DB with the correct DNs
573 res
= restored_ldb
.search(base
=new_src_dn
, scope
=ldb
.SCOPE_BASE
,
575 self
.assertEqual(len(res
), 1,
576 "Failed to find renamed link source object")
577 self
.assertTrue(link_attr
in res
[0], "Missing link attribute")
578 link_values
= [str(x
) for x
in res
[0][link_attr
]]
579 self
.assertTrue(new_target_dn
in link_values
)
580 self
.assertTrue(new_server_dn
in link_values
)
582 # extra checks we run on the restored DB in the rename case
583 def check_restored_database(self
, bkp_lp
, expect_secrets
=True):
584 # run the common checks over the restored DB
585 common_test
= super()
586 samdb
= common_test
.check_restored_database(bkp_lp
, expect_secrets
)
588 # check we have actually renamed the DNs
589 basedn
= str(samdb
.get_default_basedn())
590 self
.assertEqual(basedn
, self
.new_basedn
)
592 # check the partition and netBIOS name match the new domain
593 partitions_dn
= samdb
.get_partitions_dn()
594 nc_name
= ldb
.binary_encode(str(basedn
))
595 res
= samdb
.search(base
=partitions_dn
, scope
=ldb
.SCOPE_ONELEVEL
,
596 attrs
=["nETBIOSName", "cn"],
597 expression
='ncName=%s' % nc_name
)
598 self
.assertEqual(len(res
), 1,
599 "Looking up partition's NetBIOS name failed")
600 self
.assertEqual(str(res
[0].get("nETBIOSName")), self
.restore_domain
)
601 self
.assertEqual(str(res
[0].get("cn")), self
.restore_domain
)
603 # check the DC has the correct dnsHostname
604 realm
= self
.restore_realm
605 dn
= "CN=%s,OU=Domain Controllers,%s" % (self
.new_server
,
607 res
= samdb
.search(base
=dn
, scope
=ldb
.SCOPE_BASE
,
608 attrs
=["dNSHostName"])
609 self
.assertEqual(len(res
), 1,
610 "Looking up new DC's dnsHostname failed")
611 expected_val
= "%s.%s" % (self
.new_server
.lower(), realm
)
612 self
.assertEqual(str(res
[0].get("dNSHostName")), expected_val
)
614 # check the DNS zones for the new realm are present
615 dn
= "DC=%s,CN=MicrosoftDNS,DC=DomainDnsZones,%s" % (realm
, basedn
)
616 res
= samdb
.search(base
=dn
, scope
=ldb
.SCOPE_BASE
)
617 self
.assertEqual(len(res
), 1, "Lookup of new domain's DNS zone failed")
619 forestdn
= samdb
.get_root_basedn().get_linearized()
620 dn
= "DC=_msdcs.%s,CN=MicrosoftDNS,DC=ForestDnsZones,%s" % (realm
,
622 res
= samdb
.search(base
=dn
, scope
=ldb
.SCOPE_BASE
)
623 self
.assertEqual(len(res
), 1, "Lookup of new domain's DNS zone failed")
class DomainBackupOffline(DomainBackupBase):
    """Runs the common backup/restore test cases using offline backups"""

    def setUp(self):
        """Point the base command at 'samba-tool domain backup offline'"""
        super().setUp()
        self.base_cmd = ["domain", "backup", "offline"]

    def test_backup_untar(self):
        # this variant expects one primary-domain secret in the backup and
        # also verifies the SHA256SUM file
        self._test_backup_untar(primary_domain_secrets=1, verify_checksums=True)

    def test_backup_restore_with_conf(self):
        self._test_backup_restore_with_conf()

    def test_backup_restore(self):
        self._test_backup_restore()

    def test_backup_restore_into_site(self):
        self._test_backup_restore_into_site()