1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org>
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba
.tests
import BlackboxTestCase
, BlackboxProcessError
22 from samba
.netcmd
import CommandError
23 from samba
.param
import LoadParm
24 from samba
import safe_tarfile
as tarfile
27 # The backup tests require that a completely clean LoadParm object gets used
28 # for the restore. Otherwise the same global LP gets re-used, and the LP
29 # settings can bleed from one test case to another.
30 # To do this, these tests should use check_output(), which executes the command
31 # in a separate process (as opposed to runcmd(), runsubcmd()).
32 # So although this is a samba-tool test, we don't inherit from SambaToolCmdTest
33 # so that we never inadvertently use .runcmd() by accident.
34 class DomainBackupOfflineCmp(BlackboxTestCase
):
36 def test_domain_backup_offline_nested_tdb(self
):
37 self
.nested_testcase('tdb')
39 def test_domain_backup_offline_nested_mdb(self
):
40 self
.nested_testcase('mdb')
42 def nested_testcase(self
, backend
):
43 self
.prov_dir
= self
.provision(backend
)
44 self
.extract_dir
= None
46 src
= os
.path
.join(self
.prov_dir
, "private")
47 dst
= os
.path
.join(self
.prov_dir
, "state", "private")
49 # Move private directory inside state directory
52 smbconf
= os
.path
.join(self
.prov_dir
, "etc", "smb.conf")
54 # Update the conf file
55 lp
= LoadParm(filename_for_non_global_lp
=smbconf
)
56 lp
.set("private dir", dst
)
57 lp
.dump(False, smbconf
)
59 backup_file
= self
.backup(self
.prov_dir
)
61 # Ensure each file is only present once in the tar file
62 tf
= tarfile
.open(backup_file
)
64 self
.assertEqual(len(names
), len(set(names
)))
66 def test_domain_backup_offline_join_restore_tdb(self
):
67 self
.join_restore_testcase('tdb')
69 def test_domain_backup_offline_join_restore_mdb(self
):
70 self
.join_restore_testcase('mdb')
72 def join_restore_testcase(self
, backend
):
73 self
.prov_dir
= self
.join(backend
)
74 self
.extract_dir
= None
77 backup_file
= self
.backup(self
.prov_dir
)
78 except BlackboxProcessError
as e
:
81 self
.extract_dir
= self
.restore(backup_file
)
83 def test_domain_backup_offline_hard_link_tdb(self
):
84 self
.hard_link_testcase('tdb')
86 def test_domain_backup_offline_hard_link_mdb(self
):
87 self
.hard_link_testcase('mdb')
89 def hard_link_testcase(self
, backend
):
90 self
.prov_dir
= self
.provision(backend
)
91 self
.extract_dir
= None
93 # Create hard links in the private and state directories
94 os
.link(os
.path
.join(self
.prov_dir
, "private", "krb5.conf"),
95 os
.path
.join(self
.prov_dir
, "state", "krb5.conf"))
97 backup_file
= self
.backup(self
.prov_dir
)
100 self
.extract_dir
= tempfile
.mkdtemp(dir=self
.tempdir
)
101 tf
= tarfile
.open(backup_file
)
102 tf
.extractall(self
.extract_dir
)
104 # Ensure that the hard link in the private directory was backed up,
105 # while the one in the state directory was not.
106 self
.assertTrue(os
.path
.exists(os
.path
.join(self
.extract_dir
,
107 "private", "krb5.conf")))
108 self
.assertFalse(os
.path
.exists(os
.path
.join(self
.extract_dir
,
109 "statedir", "krb5.conf")))
111 def test_domain_backup_offline_untar_tdb(self
):
112 self
.untar_testcase('tdb')
114 def test_domain_backup_offline_untar_mdb(self
):
115 self
.untar_testcase('mdb')
117 def test_domain_backup_offline_restore_tdb(self
):
118 self
.restore_testcase('tdb')
120 def test_domain_backup_offline_restore_mdb(self
):
121 self
.restore_testcase('mdb')
123 def restore_testcase(self
, backend
):
124 self
.prov_dir
= self
.provision(backend
)
125 self
.extract_dir
= None
126 backup_file
= self
.backup(self
.prov_dir
)
128 self
.extract_dir
= self
.restore(backup_file
)
130 # attrs that are altered by the restore process
131 ignore_attrs
= ["servicePrincipalName", "lastLogonTimestamp",
132 "rIDAllocationPool", "rIDAvailablePool", "rIDUsedPool",
133 "localPolicyFlags", "operatingSystem", "displayName",
134 "dnsRecord", "dNSTombstoned",
135 "msDS-NC-Replica-Locations", "msDS-HasInstantiatedNCs",
136 "interSiteTopologyGenerator", "msKds-DomainID"]
137 filter_arg
= "--filter=" + ",".join(ignore_attrs
)
138 args
= ["--two", filter_arg
]
139 self
.ldapcmp(self
.prov_dir
, self
.extract_dir
, args
)
141 def untar_testcase(self
, backend
):
142 self
.prov_dir
= self
.provision(backend
)
143 self
.extract_dir
= None
144 backup_file
= self
.backup(self
.prov_dir
)
146 self
.extract_dir
= tempfile
.mkdtemp(dir=self
.tempdir
)
147 tf
= tarfile
.open(backup_file
)
148 tf
.extractall(self
.extract_dir
)
150 self
.ldapcmp(self
.prov_dir
, self
.extract_dir
)
152 def ldapcmp(self
, prov_dir
, ex_dir
, args
=None):
155 sam_fn
= os
.path
.join("private", "sam.ldb")
156 url1
= "tdb://" + os
.path
.join(os
.path
.realpath(prov_dir
), sam_fn
)
157 url2
= "tdb://" + os
.path
.join(os
.path
.realpath(ex_dir
), sam_fn
)
159 # Compare the restored sam.ldb with the old one
160 for partition
in ["domain", "configuration", "schema",
161 "dnsdomain", "dnsforest"]:
162 cmd
= "samba-tool ldapcmp " + " ".join([url1
, url2
, partition
] + args
)
163 self
.check_output(cmd
)
165 # Test the "samba-tool domain backup" command with ldapcmp
166 def provision(self
, backend
):
167 target
= tempfile
.mkdtemp(dir=self
.tempdir
)
169 # Provision domain. Use fake ACLs and store xattrs in tdbs so that
170 # NTACL backup will work inside the testenv.
171 # host-name option must be given because if this test runs on a
172 # system with a very long hostname, it will be shortened in certain
173 # circumstances, causing the ldapcmp to fail.
174 prov_cmd
= "samba-tool domain provision " +\
175 "--domain FOO --realm foo.example.com " +\
176 "--targetdir {target} " +\
177 "--backend-store {backend} " +\
178 "--host-name OLDSERVER "+\
179 "--option=\"vfs objects=dfs_samba4 acl_xattr fake_acls xattr_tdb\""
180 prov_cmd
= prov_cmd
.format(target
=target
, backend
=backend
)
181 self
.check_output(prov_cmd
)
185 def join(self
, backend
):
186 target
= tempfile
.mkdtemp(dir=self
.tempdir
)
188 new_dc_name
= "offlinebackupdc"
190 join_cmd
= "samba-tool domain join {domain} DC " +\
191 "--server {server} " +\
192 "--realm {realm} " +\
193 "--username {username}%{password} " +\
194 "--targetdir {target} " +\
195 "--backend-store {backend} " +\
196 "--option='netbios name = {new_dc_name}' " +\
197 "--option=\"vfs objects=dfs_samba4 acl_xattr fake_acls xattr_tdb\""
198 join_cmd
= join_cmd
.format(server
=os
.environ
["DC_SERVER"],
199 domain
=os
.environ
["DOMAIN"],
200 realm
=os
.environ
["REALM"],
201 username
=os
.environ
["USERNAME"],
202 password
=os
.environ
["PASSWORD"],
205 new_dc_name
=new_dc_name
)
206 self
.check_output(join_cmd
)
208 demote_cmd
= "samba-tool domain demote " +\
209 "--server {server} " +\
210 "--username {username}%{password} " +\
211 "--remove-other-dead-server={new_dc_name}"
213 demote_cmd
= demote_cmd
.format(server
=os
.environ
["DC_SERVER"],
214 username
=os
.environ
["USERNAME"],
215 password
=os
.environ
["PASSWORD"],
216 new_dc_name
=new_dc_name
)
217 self
.check_output(demote_cmd
)
221 def backup(self
, prov_dir
):
222 # Run the backup and check we got one backup tar file
223 cmd
= ("samba-tool domain backup offline --targetdir={prov_dir} "
224 "--configfile={prov_dir}/etc/smb.conf").format(prov_dir
=prov_dir
)
225 self
.check_output(cmd
)
227 tar_files
= [fn
for fn
in os
.listdir(prov_dir
)
228 if fn
.startswith("samba-backup-") and
229 fn
.endswith(".tar.bz2")]
230 if len(tar_files
) != 1:
231 raise CommandError("expected domain backup to create one tar" +
232 " file but got {0}".format(len(tar_files
)))
234 backup_file
= os
.path
.join(prov_dir
, tar_files
[0])
237 def restore(self
, backup_file
):
238 # Restore from a backup file
239 extract_dir
= tempfile
.mkdtemp(dir=self
.tempdir
)
240 cmd
= ("samba-tool domain backup restore --backup-file={f}"
242 "--newservername=NEWSERVER").format(f
=backup_file
,
244 self
.check_output(cmd
)
249 # Remove temporary directories
250 shutil
.rmtree(self
.prov_dir
)
252 shutil
.rmtree(self
.extract_dir
)