2 # -*- coding: utf-8 -*-
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Anatoliy Atanasov <anatoliy.atanasov@postpath.com> 2010
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 # export DC1=dc1_dns_name
23 # export DC2=dc2_dns_name
24 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
25 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN fsmo -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
32 sys
.path
.insert(0, "bin/python")
34 from ldb
import SCOPE_BASE
39 class DrsFsmoTestCase(drs_base
.DrsBaseTestCase
):
42 super(DrsFsmoTestCase
, self
).setUp()
44 # we have to wait for the replication before we make the check
45 self
.fsmo_wait_max_time
= 20
46 self
.fsmo_wait_sleep_time
= 0.2
48 # cache some of RootDSE props
49 self
.dsServiceName_dc1
= self
.info_dc1
["dsServiceName"][0]
50 self
.dsServiceName_dc2
= self
.info_dc2
["dsServiceName"][0]
51 self
.infrastructure_dn
= "CN=Infrastructure," + self
.domain_dn
52 self
.naming_dn
= "CN=Partitions," + self
.config_dn
53 self
.rid_dn
= "CN=RID Manager$,CN=System," + self
.domain_dn
54 self
.domain_dns_dn
= (
55 "CN=Infrastructure,DC=DomainDnsZones, %s" % self
.domain_dn
)
56 self
.forest_dns_dn
= (
57 "CN=Infrastructure,DC=ForestDnsZones, %s" % self
.domain_dn
)
60 super(DrsFsmoTestCase
, self
).tearDown()
62 def _net_fsmo_role_transfer(self
, DC
, role
, noop
=False):
63 # make command line credentials string
64 ccache_name
= self
.get_creds_ccache_name()
65 cmd_line_auth
= "--use-krb5-ccache=%s" % ccache_name
66 (result
, out
, err
) = self
.runsubcmd("fsmo", "transfer",
68 "-H", "ldap://%s:389" % DC
,
71 self
.assertCmdSuccess(result
, out
, err
)
72 self
.assertEqual(err
, "", "Shouldn't be any error messages")
74 self
.assertTrue("FSMO transfer of '%s' role successful" % role
in out
)
76 self
.assertTrue("This DC already has the '%s' FSMO role" % role
in out
)
78 def _wait_for_role_transfer(self
, ldb_dc
, role_dn
, master
):
79 """Wait for role transfer for certain amount of time
81 :return: (Result=True|False, CurrentMasterDnsName) tuple
84 retries
= int(self
.fsmo_wait_max_time
/ self
.fsmo_wait_sleep_time
) + 1
85 for i
in range(0, retries
):
86 # check if master has been transferred
87 res
= ldb_dc
.search(role_dn
,
88 scope
=SCOPE_BASE
, attrs
=["fSMORoleOwner"])
89 assert len(res
) == 1, "Only one fSMORoleOwner value expected!"
90 cur_master
= res
[0]["fSMORoleOwner"][0]
91 if master
== cur_master
:
92 return (True, cur_master
)
93 # skip last sleep, if no need to wait anymore
94 if i
!= (retries
- 1):
95 # wait a little bit before next retry
96 time
.sleep(self
.fsmo_wait_sleep_time
)
97 return (False, cur_master
)
99 def _role_transfer(self
, role
, role_dn
):
100 """Triggers transfer of role from DC1 to DC2
101 and vice versa so the role goes back to the original dc"""
102 # dc2 gets the role from dc1
103 print("Testing for %s role transfer from %s to %s" % (role
, self
.dnsname_dc1
, self
.dnsname_dc2
))
105 self
._net
_fsmo
_role
_transfer
(DC
=self
.dnsname_dc2
, role
=role
)
106 # check if the role is transferred
107 (res
, master
) = self
._wait
_for
_role
_transfer
(ldb_dc
=self
.ldb_dc2
,
109 master
=self
.dsServiceName_dc2
)
111 "Transferring %s role to %s has failed, master is: %s!" % (role
, self
.dsServiceName_dc2
, master
))
113 # dc1 gets back the role from dc2
114 print("Testing for %s role transfer from %s to %s" % (role
, self
.dnsname_dc2
, self
.dnsname_dc1
))
115 self
._net
_fsmo
_role
_transfer
(DC
=self
.dnsname_dc1
, role
=role
)
116 # check if the role is transferred
117 (res
, master
) = self
._wait
_for
_role
_transfer
(ldb_dc
=self
.ldb_dc1
,
119 master
=self
.dsServiceName_dc1
)
121 "Transferring %s role to %s has failed, master is: %s!" % (role
, self
.dsServiceName_dc1
, master
))
124 print("Testing for no-op %s role transfer from %s to %s" % (role
, self
.dnsname_dc2
, self
.dnsname_dc1
))
125 self
._net
_fsmo
_role
_transfer
(DC
=self
.dnsname_dc1
, role
=role
, noop
=True)
126 # check if the role is transferred
127 (res
, master
) = self
._wait
_for
_role
_transfer
(ldb_dc
=self
.ldb_dc1
,
129 master
=self
.dsServiceName_dc1
)
131 "Transferring %s role to %s has failed, master is: %s!" % (role
, self
.dsServiceName_dc1
, master
))
133 def test_SchemaMasterTransfer(self
):
134 self
._role
_transfer
(role
="schema", role_dn
=self
.schema_dn
)
136 def test_InfrastructureMasterTransfer(self
):
137 self
._role
_transfer
(role
="infrastructure", role_dn
=self
.infrastructure_dn
)
139 def test_PDCMasterTransfer(self
):
140 self
._role
_transfer
(role
="pdc", role_dn
=self
.domain_dn
)
142 def test_RIDMasterTransfer(self
):
143 self
._role
_transfer
(role
="rid", role_dn
=self
.rid_dn
)
145 def test_NamingMasterTransfer(self
):
146 self
._role
_transfer
(role
="naming", role_dn
=self
.naming_dn
)
148 def test_DomainDnsZonesMasterTransfer(self
):
149 self
._role
_transfer
(role
="domaindns", role_dn
=self
.domain_dns_dn
)
151 def test_ForestDnsZonesMasterTransfer(self
):
152 self
._role
_transfer
(role
="forestdns", role_dn
=self
.forest_dns_dn
)