ctdb-scripts: Support storing statd-callout state in cluster filesystem
[samba4-gss.git] / source4 / torture / drs / python / getnc_unpriv.py
blobc53906acd8fc4a0dbce74208e3ae3eea4e32b460
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # Tests replication scenarios with different user privileges.
5 # We want to test every replication scenario we can think of against:
6 # - users with only GET_CHANGES privileges
7 # - users with only GET_ALL_CHANGES privileges
8 # - users with both GET_CHANGES and GET_ALL_CHANGES privileges
9 # - users with no privileges
11 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
12 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
14 # This program is free software; you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation; either version 3 of the License, or
17 # (at your option) any later version.
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 # GNU General Public License for more details.
24 # You should have received a copy of the GNU General Public License
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 # Usage:
30 # export DC1=dc1_dns_name
31 # export DC2=dc2_dns_name
32 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
33 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
36 import drs_base
37 import samba.tests
38 from samba import werror, WERRORError
40 from samba import sd_utils
41 import ldb
42 from ldb import SCOPE_BASE
43 import random
45 from samba.dcerpc import drsuapi, security
46 from samba.credentials import DONT_USE_KERBEROS
class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
    """Confirm the behaviour of DsGetNCChanges for unprivileged users.

    Each test grants a freshly created user a different combination of the
    GET_CHANGES / GET_ALL_CHANGES control access rights on the domain base DN
    and then checks which DsGetNCChanges requests are accepted or rejected
    when made with that user's credentials.
    """

    def setUp(self):
        """Create the test OU and user on DC1 and bind to DRSUAPI as that user.

        Also records the base DN's current security descriptor (as SDDL) so
        that tearDown() can undo any ACEs the individual tests add.
        """
        super(DrsReplicaSyncUnprivTestCase, self).setUp()
        self.get_changes_user = "get-changes-user"
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.user_pass = samba.generate_random_password(12, 16)

        # add some randomness to the test OU. (Deletion of the last test's
        # objects can be slow to replicate out. So the OU created by a previous
        # testenv may still exist at this point).
        rand = random.randint(1, 10000000)
        test_ou = "OU=test_getnc_unpriv%d" % rand
        self.ou = "%s,%s" % (test_ou, self.base_dn)
        self.ldb_dc1.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})
        self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
                             userou=test_ou)
        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)

        self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
        self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
        user_sid = self.sd_utils.get_object_sid(self.user_dn)

        # SDDL object ACEs ("OA" with the "CR" control-access right) naming
        # the replication right's GUID and the test user's SID. They are only
        # prepared here; each test decides which of them to apply.
        self.acl_mod_get_changes = "(OA;;CR;%s;;%s)" % (
            security.GUID_DRS_GET_CHANGES, str(user_sid))
        self.acl_mod_get_all_changes = "(OA;;CR;%s;;%s)" % (
            security.GUID_DRS_GET_ALL_CHANGES, str(user_sid))

        # Snapshot of the base DN's descriptor before any test modifies it
        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)

        # We set DONT_USE_KERBEROS to avoid a race with getting the
        # user replicated to our selected KDC
        self.user_creds = self.insta_creds(template=self.get_credentials(),
                                           username=self.get_changes_user,
                                           userpass=self.user_pass,
                                           kerberos_state=DONT_USE_KERBEROS)
        (self.user_drs, self.user_drs_handle) = self._ds_bind(
            self.dnsname_dc1, self.user_creds)

    def tearDown(self):
        """Restore the base DN's security descriptor and remove the test OU.

        The cleanup steps are chained with try/finally so that a failure in
        one of them does not prevent the remaining ones (including the base
        class tearDown) from running.
        """
        try:
            try:
                # Drop whatever ACEs the test added to the base DN
                self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
            finally:
                try:
                    self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
                except ldb.LdbError as err:
                    (enum, _) = err.args
                    # An OU that is already gone is fine; any other failure
                    # is unexpected and must not be silently discarded.
                    if enum != ldb.ERR_NO_SUCH_OBJECT:
                        raise
        finally:
            super(DrsReplicaSyncUnprivTestCase, self).tearDown()

    def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None,
                        partial_attribute_set=None):
        """
        Common function to send a replication request and check the result
        matches what's expected.

        :param exop: the drsuapi.DRSUAPI_EXOP_* extended operation to request
        :param repl_obj: DN of the object/NC the request is made against
        :param expected_error: None if the request must succeed, otherwise a
            list of WERROR codes, any one of which is an acceptable failure
        :param dest_dsa: passed through to _exop_req8() as the destination DSA
        :param partial_attribute_set: passed through to _exop_req8()
        """
        req8 = self._exop_req8(dest_dsa=dest_dsa,
                               invocation_id=self.ldb_dc1.get_invocation_id(),
                               nc_dn_str=repl_obj,
                               exop=exop,
                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP,
                               partial_attribute_set=partial_attribute_set)

        if expected_error is None:
            # user is OK, request should be accepted without throwing an error
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
            return

        # check the request is rejected (with the error we're expecting)
        try:
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
        except WERRORError as err:
            (enum, estr) = err.args
            self.assertIn(enum, expected_error,
                          "Got unexpected error: %s" % estr)
        else:
            self.fail("Should have failed with user denied access")

    def _test_repl_single_obj(self, repl_obj, expected_error,
                              partial_attribute_set=None):
        """
        Checks that replication on a single object either succeeds or fails as
        expected (based on the user's access rights)

        :param repl_obj: DN of the object to replicate
        :param expected_error: None for success, else list of accepted WERRORs
        :param partial_attribute_set: optional partial attribute set to send
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                             repl_obj=repl_obj,
                             expected_error=expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None):
        """
        Checks that REPL_SECRET on an object either succeeds or fails as
        expected (based on the user's access rights)

        :param repl_obj: DN of the object whose secrets are requested
        :param expected_error: None for success, else list of accepted WERRORs
        :param dest_dsa: optional destination DSA to put in the request
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
                             repl_obj=repl_obj,
                             expected_error=expected_error,
                             dest_dsa=dest_dsa)

    def _test_repl_full(self, expected_error, partial_attribute_set=None):
        """
        Checks that a full replication either succeeds or fails as expected
        (based on the user's access rights)

        The request is made against DC1's default base DN with EXOP_NONE.

        :param expected_error: None for success, else list of accepted WERRORs
        :param partial_attribute_set: optional partial attribute set to send
        """
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
                             repl_obj=self.ldb_dc1.get_default_basedn(),
                             expected_error=expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_full_on_ou(self, repl_obj, expected_error):
        """
        Full replication on a specific OU should always fail (it should be done
        against a base NC). The error may vary based on the user's access rights

        :param repl_obj: DN of the OU to (incorrectly) use as the NC
        :param expected_error: list of accepted WERRORs
        """
        # Same EXOP_NONE request as a full replication, but aimed at the
        # caller-supplied OU rather than the base NC
        self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE,
                             repl_obj=repl_obj,
                             expected_error=expected_error)

    def test_repl_getchanges_userpriv(self):
        """
        Tests various replication requests made by a user with only GET_CHANGES
        rights. Some requests will be accepted, but most will be rejected.
        """

        # Assign the user GET_CHANGES rights
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes)

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])
        bad_ou = "OU=bad_obj,%s" % self.ou
        self._test_repl_single_obj(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_DN,
                            werror.WERR_DS_DRA_ACCESS_DENIED])

        self._test_repl_secret(
            repl_obj=self.ou,
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_secret(
            repl_obj=self.user_dn,
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_secret(
            repl_obj=self.user_dn,
            dest_dsa=self.ldb_dc1.get_ntds_GUID(),
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_secret(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_DN])

        self._test_repl_full(
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_full_on_ou(
            repl_obj=self.ou,
            expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC,
                            werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_full_on_ou(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_NC,
                            werror.WERR_DS_DRA_ACCESS_DENIED])

        # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so we
        # expect the following to succeed
        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())

    def test_repl_getallchanges_userpriv(self):
        """
        Tests various replication requests made by a user with only
        GET_ALL_CHANGES rights. Note that assigning these rights is possible,
        but doesn't make a lot of sense. We test it anyway for consistency.
        """

        # Assign the user GET_ALL_CHANGES rights
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_all_changes)

        # We can expect to get the same responses as an unprivileged user,
        # i.e. we have permission to see the results, but don't have permission
        # to ask
        self.test_repl_no_userpriv()

    def test_repl_both_userpriv(self):
        """
        Tests various replication requests made by a privileged user (i.e. has
        both GET_CHANGES and GET_ALL_CHANGES). We expect any valid requests
        to be accepted.
        """

        # Assign the user both GET_CHANGES and GET_ALL_CHANGES rights
        both_rights = self.acl_mod_get_changes + self.acl_mod_get_all_changes
        self.sd_utils.dacl_add_ace(self.base_dn, both_rights)

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=None)
        bad_ou = "OU=bad_obj,%s" % self.ou
        self._test_repl_single_obj(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_DN])

        # Microsoft returns DB_ERROR, Samba returns ACCESS_DENIED
        self._test_repl_secret(
            repl_obj=self.ou,
            expected_error=[werror.WERR_DS_DRA_DB_ERROR,
                            werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_secret(
            repl_obj=self.user_dn,
            expected_error=[werror.WERR_DS_DRA_DB_ERROR,
                            werror.WERR_DS_DRA_ACCESS_DENIED])
        # Note that Windows accepts this but Samba rejects it
        self._test_repl_secret(
            repl_obj=self.user_dn,
            dest_dsa=self.ldb_dc1.get_ntds_GUID(),
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])

        self._test_repl_secret(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_DN])

        self._test_repl_full(expected_error=None)
        self._test_repl_full_on_ou(
            repl_obj=self.ou,
            expected_error=[werror.WERR_DS_CANT_FIND_EXPECTED_NC])
        self._test_repl_full_on_ou(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_NC,
                            werror.WERR_DS_DRA_BAD_DN])

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=None,
            partial_attribute_set=self.get_partial_attribute_set())

    def test_repl_no_userpriv(self):
        """
        Tests various replication requests made by an unprivileged user.
        We expect all these requests to be rejected.
        """

        # Microsoft usually returns BAD_DN, Samba returns ACCESS_DENIED
        usual_error = [werror.WERR_DS_DRA_BAD_DN,
                       werror.WERR_DS_DRA_ACCESS_DENIED]

        self._test_repl_single_obj(repl_obj=self.ou,
                                   expected_error=usual_error)
        bad_ou = "OU=bad_obj,%s" % self.ou
        self._test_repl_single_obj(repl_obj=bad_ou,
                                   expected_error=usual_error)

        self._test_repl_secret(repl_obj=self.ou,
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=self.user_dn,
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=self.user_dn,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID(),
                               expected_error=usual_error)
        self._test_repl_secret(repl_obj=bad_ou,
                               expected_error=usual_error)

        self._test_repl_full(
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED])
        self._test_repl_full_on_ou(repl_obj=self.ou,
                                   expected_error=usual_error)
        self._test_repl_full_on_ou(
            repl_obj=bad_ou,
            expected_error=[werror.WERR_DS_DRA_BAD_NC,
                            werror.WERR_DS_DRA_ACCESS_DENIED])

        self._test_repl_single_obj(
            repl_obj=self.ou,
            expected_error=usual_error,
            partial_attribute_set=self.get_partial_attribute_set())
        self._test_repl_full(
            expected_error=[werror.WERR_DS_DRA_ACCESS_DENIED],
            partial_attribute_set=self.get_partial_attribute_set())