1 # Blackbox tests for "samba-tool drs" command
2 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Blackbox tests for samba-tool drs showrepl."""
22 from samba
.dcerpc
import drsuapi
23 from samba
import drs_utils
28 from samba
.common
import get_string
30 GUID_RE
= r
'[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}'
31 HEX8_RE
= r
'0x[\da-f]{8}'
32 DN_RE
= r
'(?:(?:CN|DC)=[\\:\w -]+,)+DC=com'
35 class SambaToolDrsShowReplTests(drs_base
.DrsBaseTestCase
):
36 """Blackbox test case for samba-tool drs."""
39 super(SambaToolDrsShowReplTests
, self
).setUp()
41 self
.dc1
= samba
.tests
.env_get_var_value("DC1")
42 self
.dc2
= samba
.tests
.env_get_var_value("DC2")
44 creds
= self
.get_credentials()
45 self
.cmdline_creds
= "-U%s/%s%%%s" % (creds
.get_domain(),
49 def test_samba_tool_showrepl(self
):
50 """Tests 'samba-tool drs showrepl' command.
52 nc_list
= [self
.config_dn
, self
.domain_dn
, self
.schema_dn
]
53 dns_name
= self
.ldb_dc1
.domain_dns_name()
55 # Manually run kcc to create a "Connection" object, so we can find
56 # this for the expected output below.
57 kcc_out
= self
.check_output("samba-tool drs kcc %s %s" % (self
.dc1
,
59 self
.assertIn(b
"successful", kcc_out
)
61 # Run replicate to ensure there are incoming and outgoing partners
62 # exist, so we can expect these in the output below.
64 self
._net
_drs
_replicate
(DC
=self
.dnsname_dc2
, fromDC
=self
.dnsname_dc1
, nc_dn
=nc
, forced
=True)
65 self
._net
_drs
_replicate
(DC
=self
.dnsname_dc1
, fromDC
=self
.dnsname_dc2
, nc_dn
=nc
, forced
=True)
67 # Output should be like:
68 # <site-name>/<domain-name>
69 # DSA Options: <hex-options>
70 # DSA object GUID: <DSA-object-GUID>
71 # DSA invocationId: <DSA-invocationId>
72 # <Inbound-connections-list>
73 # <Outbound-connections-list>
76 # TODO: Perhaps we should check at least for
77 # DSA's objectGUDI and invocationId
78 out
= self
.check_output("samba-tool drs showrepl "
79 "%s %s" % (self
.dc1
, self
.cmdline_creds
))
82 # We want to assert that we are getting the same results, but
83 # dates and GUIDs change randomly.
85 # There are sections with headers like ==== THIS ===="
89 _conn
, conn
) = out
.split("====")
91 self
.assertEqual(_inbound
, ' INBOUND NEIGHBORS ')
92 self
.assertEqual(_outbound
, ' OUTBOUND NEIGHBORS ')
93 self
.assertEqual(_conn
, ' KCC CONNECTION OBJECTS ')
95 self
.assertRegex(header
,
96 r
'^Default-First-Site-Name\\%s\s+'
98 r
"DSA object GUID: %s\s+"
99 r
"DSA invocationId: %s" %
100 (self
.dc1
.upper(), HEX8_RE
, GUID_RE
, GUID_RE
))
102 # We don't assert the DomainDnsZones and ForestDnsZones are
103 # there because we don't know that they have been set up yet.
109 r
'\tDefault-First-Site-Name\\[A-Z0-9]+ via RPC\n'
110 r
'\t\tDSA object GUID: %s\n'
111 r
'\t\tLast attempt @ [^\n]+\n'
112 r
'\t\t\d+ consecutive failure\(s\).\n'
113 r
'\t\tLast success @ [^\n]+\n'
114 r
'\n' % (p
, GUID_RE
),
115 msg
="%s inbound missing" % p
)
120 r
'\tDefault-First-Site-Name\\[A-Z0-9]+ via RPC\n'
121 r
'\t\tDSA object GUID: %s\n'
122 r
'\t\tLast attempt @ [^\n]+\n'
123 r
'\t\t\d+ consecutive failure\(s\).\n'
124 r
'\t\tLast success @ [^\n]+\n'
125 r
'\n' % (p
, GUID_RE
),
126 msg
="%s outbound missing" % p
)
128 self
.assertRegex(conn
,
130 r
'\tConnection name: %s\n'
131 r
'\tEnabled : TRUE\n'
132 r
'\tServer DNS name : \w+.%s\n'
133 r
'\tServer DN name : %s'
134 r
'\n' % (GUID_RE
, dns_name
, DN_RE
))
136 def test_samba_tool_showrepl_json(self
):
137 """Tests 'samba-tool drs showrepl --json' command.
139 dns_name
= self
.ldb_dc1
.domain_dns_name()
140 out
= self
.check_output("samba-tool drs showrepl %s %s --json" %
141 (self
.dc1
, self
.cmdline_creds
))
142 d
= json
.loads(get_string(out
))
143 self
.assertEqual(set(d
), set(['repsFrom',
149 for k
in ["objectGUID", "invocationId"]:
150 self
.assertRegex(d
['dsa'][k
], '^%s$' % GUID_RE
)
151 self
.assertTrue(isinstance(d
['dsa']["options"], int))
153 # repsfrom and repsto
154 for reps
in (d
['repsFrom'], d
['repsTo']):
156 for k
in ('NC dn', "NTDS DN"):
157 self
.assertRegex(r
[k
], '^%s$' % DN_RE
)
158 for k
in ("last attempt time",
159 "last attempt message",
161 self
.assertTrue(isinstance(r
[k
], str))
162 self
.assertRegex(r
["DSA objectGUID"], '^%s$' % GUID_RE
)
163 self
.assertTrue(isinstance(r
["consecutive failures"], int))
166 for n
in d
["NTDSConnections"]:
167 self
.assertTrue(n
["dns name"].endswith(dns_name
))
168 self
.assertRegex(n
["name"], "^%s$" % GUID_RE
)
169 self
.assertTrue(isinstance(n
['enabled'], bool))
170 self
.assertTrue(isinstance(n
['options'], int))
171 self
.assertTrue(isinstance(n
['replicates NC'], list))
172 self
.assertRegex(n
["remote DN"], "^%s$" % DN_RE
)
174 def _force_all_reps(self
, samdb
, dc
, direction
):
175 if direction
== 'inbound':
176 info_type
= drsuapi
.DRSUAPI_DS_REPLICA_INFO_NEIGHBORS
177 elif direction
== 'outbound':
178 info_type
= drsuapi
.DRSUAPI_DS_REPLICA_INFO_REPSTO
180 raise ValueError("expected 'inbound' or 'outbound'")
182 self
._enable
_all
_repl
(dc
)
183 lp
= self
.get_loadparm()
184 creds
= self
.get_credentials()
185 drsuapi_conn
, drsuapi_handle
, _
= drs_utils
.drsuapi_connect(dc
, lp
, creds
)
186 req1
= drsuapi
.DsReplicaGetInfoRequest1()
187 req1
.info_type
= info_type
188 _
, info
= drsuapi_conn
.DsReplicaGetInfo(drsuapi_handle
, 1, req1
)
190 # you might think x.source_dsa_address was the thing, but no.
191 # and we need to filter out RODCs and deleted DCs
195 res
= samdb
.search(base
=x
.source_dsa_obj_dn
,
196 scope
=ldb
.SCOPE_BASE
,
197 attrs
=['msDS-isRODC', 'isDeleted'],
198 controls
=['show_deleted:0'])
199 except ldb
.LdbError
as e
:
200 if e
.args
[0] != ldb
.ERR_NO_SUCH_OBJECT
:
204 len(res
[0].get('msDS-isRODC', '')) > 0 or
205 res
[0]['isDeleted'] == 'TRUE'):
208 dsa_dn
= str(ldb
.Dn(samdb
, x
.source_dsa_obj_dn
).parent())
210 res
= samdb
.search(base
=dsa_dn
,
211 scope
=ldb
.SCOPE_BASE
,
212 attrs
=['dNSHostName'])
213 except ldb
.LdbError
as e
:
214 if e
.args
[0] != ldb
.ERR_NO_SUCH_OBJECT
:
219 print("server %s has no dNSHostName" % dsa_dn
)
222 remote
= res
[0].get('dNSHostName', [''])[0]
224 self
._enable
_all
_repl
(remote
)
226 if direction
== 'inbound':
227 src
, dest
= remote
, dc
229 src
, dest
= dc
, remote
230 self
._net
_drs
_replicate
(dest
, src
, forced
=True)
232 def test_samba_tool_showrepl_pull_summary_all_good(self
):
233 """Tests 'samba-tool drs showrepl --pull-summary' command."""
234 # To be sure that all is good we need to force replication
235 # with everyone (because others might have it turned off), and
236 # turn replication on for them in case they suddenly decide to
239 # We don't restore them to the non-auto-replication state.
240 samdb1
= self
.getSamDB("-H", "ldap://%s" % self
.dc1
,
242 self
._enable
_all
_repl
(self
.dc1
)
243 self
._force
_all
_reps
(samdb1
, self
.dc1
, 'inbound')
244 self
._force
_all
_reps
(samdb1
, self
.dc1
, 'outbound')
245 old_no_color
= os
.environ
.get('NO_COLOR')
246 all_good_green
= "\033[1;32m[ALL GOOD]\033[0m\n"
247 all_good
= "[ALL GOOD]\n"
250 out
= self
.check_output(
251 "samba-tool drs showrepl --pull-summary %s %s" %
252 (self
.dc1
, self
.cmdline_creds
))
253 out
= get_string(out
)
254 self
.assertStringsEqual(out
, all_good
)
255 out
= get_string(out
)
257 out
= self
.check_output("samba-tool drs showrepl --pull-summary "
258 "--color=yes %s %s" %
259 (self
.dc1
, self
.cmdline_creds
))
260 out
= get_string(out
)
261 self
.assertStringsEqual(out
, all_good_green
)
263 # --verbose output is still quiet when all is good.
264 out
= self
.check_output(
265 "samba-tool drs showrepl --pull-summary -v %s %s" %
266 (self
.dc1
, self
.cmdline_creds
))
267 out
= get_string(out
)
268 self
.assertStringsEqual(out
, all_good
)
270 out
= self
.check_output("samba-tool drs showrepl --pull-summary -v "
271 "--color=always %s %s" %
272 (self
.dc1
, self
.cmdline_creds
))
273 out
= get_string(out
)
274 self
.assertStringsEqual(out
, all_good_green
)
276 out
= self
.check_output("samba-tool drs showrepl --pull-summary -v "
277 "--color=never %s %s" %
278 (self
.dc1
, self
.cmdline_creds
))
279 out
= get_string(out
)
280 self
.assertStringsEqual(out
, all_good
)
282 os
.environ
['NO_COLOR'] = 'bean'
284 out
= self
.check_output("samba-tool drs showrepl --pull-summary -v "
285 "--color=auto %s %s" %
286 (self
.dc1
, self
.cmdline_creds
))
287 out
= get_string(out
)
288 self
.assertStringsEqual(out
, all_good
)
290 os
.environ
['NO_COLOR'] = ''
292 out
= self
.check_output("samba-tool drs showrepl --pull-summary -v "
293 "--color=auto %s %s" %
294 (self
.dc1
, self
.cmdline_creds
))
295 out
= get_string(out
)
296 self
.assertStringsEqual(out
, all_good_green
)
298 except samba
.tests
.BlackboxProcessError
as e
:
301 if old_no_color
is None:
302 os
.environ
.pop('NO_COLOR', None)
304 os
.environ
['NO_COLOR'] = old_no_color
306 def test_samba_tool_showrepl_summary_forced_failure(self
):
307 """Tests 'samba-tool drs showrepl --summary' command when we break the
310 self
.addCleanup(self
._enable
_all
_repl
, self
.dc1
)
311 self
._disable
_all
_repl
(self
.dc1
)
313 samdb1
= self
.getSamDB("-H", "ldap://%s" % self
.dc1
,
315 samdb2
= self
.getSamDB("-H", "ldap://%s" % self
.dc2
,
317 domain_dn
= samdb1
.domain_dn()
319 # Add some things to NOT replicate
320 ou1
= "OU=dc1.%x,%s" % (random
.randrange(1 << 64), domain_dn
)
321 ou2
= "OU=dc2.%x,%s" % (random
.randrange(1 << 64), domain_dn
)
324 "objectclass": "organizationalUnit"
326 self
.addCleanup(samdb1
.delete
, ou1
, ['tree_delete:1'])
329 "objectclass": "organizationalUnit"
331 self
.addCleanup(samdb2
.delete
, ou2
, ['tree_delete:1'])
333 dn1
= 'cn=u1.%%d,%s' % (ou1
)
334 dn2
= 'cn=u2.%%d,%s' % (ou2
)
340 "objectclass": "user"
344 "objectclass": "user"
346 out
= self
.check_output("samba-tool drs showrepl --summary -v "
348 (self
.dc1
, self
.cmdline_creds
))
349 out
= get_string(out
)
350 self
.assertStringsEqual('[ALL GOOD]', out
, strip
=True)
351 out
= self
.check_output("samba-tool drs showrepl --summary -v "
352 "--color=yes %s %s" %
353 (self
.dc2
, self
.cmdline_creds
))
354 out
= get_string(out
)
355 self
.assertIn('[ALL GOOD]', out
)
357 except samba
.tests
.BlackboxProcessError
as e
:
358 e_stdout
= get_string(e
.stdout
)
359 e_stderr
= get_string(e
.stderr
)
360 print("Good, failed as expected after %d rounds: %r" % (i
, e
.cmd
))
361 self
.assertIn('There are failing connections', e_stdout
,
362 msg
=('stdout: %r\nstderr: %r\nretcode: %s'
363 '\nmessage: %r\ncmd: %r') % (e_stdout
,
371 r
'\(WERR_DS_DRA_(SINK|SOURCE)_DISABLED\)',
372 msg
=("The process should have failed "
373 "because replication was forced off, "
374 "but it failed for some other reason."))
375 self
.assertIn('consecutive failure(s).', e_stdout
)
377 self
.fail("No DRS failure noticed after 100 rounds of trying")