2 # -*- coding: utf-8 -*-
4 # Tests replication scenarios with different user privileges.
5 # We want to test every replication scenario we can think of against:
6 # - users with only GET_CHANGES privileges
7 # - users with only GET_ALL_CHANGES privileges
8 # - users with both GET_CHANGES and GET_ALL_CHANGES privileges
9 # - users with no privileges
11 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
12 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
14 # This program is free software; you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation; either version 3 of the License, or
17 # (at your option) any later version.
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 # GNU General Public License for more details.
24 # You should have received a copy of the GNU General Public License
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
30 # export DC1=dc1_dns_name
31 # export DC2=dc2_dns_name
32 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
33 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
38 from samba
import werror
, WERRORError
40 from samba
import sd_utils
42 from ldb
import SCOPE_BASE
45 from samba
.dcerpc
import drsuapi
, security
46 from samba
.credentials
import DONT_USE_KERBEROS
49 class DrsReplicaSyncUnprivTestCase(drs_base
.DrsBaseTestCase
):
50 """Confirm the behaviour of DsGetNCChanges for unprivileged users"""
53 super(DrsReplicaSyncUnprivTestCase
, self
).setUp()
54 self
.get_changes_user
= "get-changes-user"
55 self
.base_dn
= self
.ldb_dc1
.get_default_basedn()
56 self
.user_pass
= samba
.generate_random_password(12, 16)
58 # add some randomness to the test OU. (Deletion of the last test's
59 # objects can be slow to replicate out. So the OU created by a previous
60 # testenv may still exist at this point).
61 rand
= random
.randint(1, 10000000)
62 test_ou
= "OU=test_getnc_unpriv%d" % rand
63 self
.ou
= "%s,%s" % (test_ou
, self
.base_dn
)
66 "objectclass": "organizationalUnit"})
67 self
.ldb_dc1
.newuser(self
.get_changes_user
, self
.user_pass
,
69 (self
.drs
, self
.drs_handle
) = self
._ds
_bind
(self
.dnsname_dc1
)
71 self
.sd_utils
= sd_utils
.SDUtils(self
.ldb_dc1
)
72 self
.user_dn
= "cn=%s,%s" % (self
.get_changes_user
, self
.ou
)
73 user_sid
= self
.sd_utils
.get_object_sid(self
.user_dn
)
74 self
.acl_mod_get_changes
= "(OA;;CR;%s;;%s)" % (security
.GUID_DRS_GET_CHANGES
,
76 self
.acl_mod_get_all_changes
= "(OA;;CR;%s;;%s)" % (security
.GUID_DRS_GET_ALL_CHANGES
,
78 self
.desc_sddl
= self
.sd_utils
.get_sd_as_sddl(self
.base_dn
)
80 # We set DONT_USE_KERBEROS to avoid a race with getting the
81 # user replicated to our selected KDC
82 self
.user_creds
= self
.insta_creds(template
=self
.get_credentials(),
83 username
=self
.get_changes_user
,
84 userpass
=self
.user_pass
,
85 kerberos_state
=DONT_USE_KERBEROS
)
86 (self
.user_drs
, self
.user_drs_handle
) = self
._ds
_bind
(self
.dnsname_dc1
,
90 self
.sd_utils
.modify_sd_on_dn(self
.base_dn
, self
.desc_sddl
)
92 self
.ldb_dc1
.delete(self
.ou
, ["tree_delete:1"])
93 except ldb
.LdbError
as e1
:
94 (enum
, string
) = e1
.args
95 if enum
== ldb
.ERR_NO_SUCH_OBJECT
:
97 super(DrsReplicaSyncUnprivTestCase
, self
).tearDown()
99 def _test_repl_exop(self
, exop
, repl_obj
, expected_error
, dest_dsa
=None,
100 partial_attribute_set
=None):
102 Common function to send a replication request and check the result
103 matches what's expected.
105 req8
= self
._exop
_req
8(dest_dsa
=dest_dsa
,
106 invocation_id
=self
.ldb_dc1
.get_invocation_id(),
109 replica_flags
=drsuapi
.DRSUAPI_DRS_WRIT_REP
,
110 partial_attribute_set
=partial_attribute_set
)
112 if expected_error
is None:
113 # user is OK, request should be accepted without throwing an error
114 (level
, ctr
) = self
.user_drs
.DsGetNCChanges(self
.user_drs_handle
,
117 # check the request is rejected (with the error we're expecting)
119 (level
, ctr
) = self
.user_drs
.DsGetNCChanges(self
.user_drs_handle
,
121 self
.fail("Should have failed with user denied access")
122 except WERRORError
as e
:
123 (enum
, estr
) = e
.args
124 self
.assertTrue(enum
in expected_error
,
125 "Got unexpected error: %s" % estr
)
127 def _test_repl_single_obj(self
, repl_obj
, expected_error
,
128 partial_attribute_set
=None):
130 Checks that replication on a single object either succeeds or fails as
131 expected (based on the user's access rights)
133 self
._test
_repl
_exop
(exop
=drsuapi
.DRSUAPI_EXOP_REPL_OBJ
,
135 expected_error
=expected_error
,
136 partial_attribute_set
=partial_attribute_set
)
138 def _test_repl_secret(self
, repl_obj
, expected_error
, dest_dsa
=None):
140 Checks that REPL_SECRET on an object either succeeds or fails as
141 expected (based on the user's access rights)
143 self
._test
_repl
_exop
(exop
=drsuapi
.DRSUAPI_EXOP_REPL_SECRET
,
145 expected_error
=expected_error
,
148 def _test_repl_full(self
, expected_error
, partial_attribute_set
=None):
150 Checks that a full replication either succeeds or fails as expected
151 (based on the user's access rights)
153 self
._test
_repl
_exop
(exop
=drsuapi
.DRSUAPI_EXOP_NONE
,
154 repl_obj
=self
.ldb_dc1
.get_default_basedn(),
155 expected_error
=expected_error
,
156 partial_attribute_set
=partial_attribute_set
)
158 def _test_repl_full_on_ou(self
, repl_obj
, expected_error
):
160 Full replication on a specific OU should always fail (it should be done
161 against a base NC). The error may vary based on the user's access rights
163 # Just try against the OU created in the test setup
164 self
._test
_repl
_exop
(exop
=drsuapi
.DRSUAPI_EXOP_NONE
,
166 expected_error
=expected_error
)
168 def test_repl_getchanges_userpriv(self
):
170 Tests various replication requests made by a user with only GET_CHANGES
171 rights. Some requests will be accepted, but most will be rejected.
174 # Assign the user GET_CHANGES rights
175 self
.sd_utils
.dacl_add_ace(self
.base_dn
, self
.acl_mod_get_changes
)
177 self
._test
_repl
_single
_obj
(repl_obj
=self
.ou
,
178 expected_error
=[werror
.WERR_DS_DRA_ACCESS_DENIED
])
179 bad_ou
= "OU=bad_obj,%s" % self
.ou
180 self
._test
_repl
_single
_obj
(repl_obj
=bad_ou
,
181 expected_error
=[werror
.WERR_DS_DRA_BAD_DN
,
182 werror
.WERR_DS_DRA_ACCESS_DENIED
])
184 self
._test
_repl
_secret
(repl_obj
=self
.ou
,
185 expected_error
=[werror
.WERR_DS_DRA_ACCESS_DENIED
])
186 self
._test
_repl
_secret
(repl_obj
=self
.user_dn
,
187 expected_error
=[werror
.WERR_DS_DRA_ACCESS_DENIED
])
188 self
._test
_repl
_secret
(repl_obj
=self
.user_dn
,
189 dest_dsa
=self
.ldb_dc1
.get_ntds_GUID(),
190 expected_error
=[werror
.WERR_DS_DRA_ACCESS_DENIED
])
191 self
._test
_repl
_secret
(repl_obj
=bad_ou
,
192 expected_error
=[werror
.WERR_DS_DRA_BAD_DN
])
194 self
._test
_repl
_full
(expected_error
=[werror
.WERR_DS_DRA_ACCESS_DENIED
])
195 self
._test
_repl
_full
_on
_ou
(repl_obj
=self
.ou
,
196 expected_error
=[werror
.WERR_DS_CANT_FIND_EXPECTED_NC
,
197 werror
.WERR_DS_DRA_ACCESS_DENIED
])
198 self
._test
_repl
_full
_on
_ou
(repl_obj
=bad_ou
,
199 expected_error
=[werror
.WERR_DS_DRA_BAD_NC
,
200 werror
.WERR_DS_DRA_ACCESS_DENIED
])
202 # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so we
203 # expect the following to succeed
204 self
._test
_repl
_single
_obj
(repl_obj
=self
.ou
,
206 partial_attribute_set
=self
.get_partial_attribute_set())
207 self
._test
_repl
_full
(expected_error
=None,
208 partial_attribute_set
=self
.get_partial_attribute_set())
210 def test_repl_getallchanges_userpriv(self
):
212 Tests various replication requests made by a user with only
213 GET_ALL_CHANGES rights. Note that assigning these rights is possible,
214 but doesn't make a lot of sense. We test it anyway for consistency.
217 # Assign the user GET_ALL_CHANGES rights
218 self
.sd_utils
.dacl_add_ace(self
.base_dn
, self
.acl_mod_get_all_changes
)
220 # We can expect to get the same responses as an unprivileged user,
221 # i.e. we have permission to see the results, but don't have permission
223 self
.test_repl_no_userpriv()
225 def test_repl_both_userpriv(self
):
227 Tests various replication requests made by a privileged user (i.e. has
228 both GET_CHANGES and GET_ALL_CHANGES). We expect any valid requests
232 # Assign the user both GET_CHANGES and GET_ALL_CHANGES rights
233 both_rights
= self
.acl_mod_get_changes
+ self
.acl_mod_get_all_changes
234 self
.sd_utils
.dacl_add_ace(self
.base_dn
, both_rights
)
236 self
._test
_repl
_single
_obj
(repl_obj
=self
.ou
,
238 bad_ou
= "OU=bad_obj,%s" % self
.ou
239 self
._test
_repl
_single
_obj
(repl_obj
=bad_ou
,
240 expected_error
=[werror
.WERR_DS_DRA_BAD_DN
])
242 # Microsoft returns DB_ERROR, Samba returns ACCESS_DENIED
243 self
._test
_repl
_secret
(repl_obj
=self
.ou
,
244 expected_error
=[werror
.WERR_DS_DRA_DB_ERROR
,
245 werror
.WERR_DS_DRA_ACCESS_DENIED
])
246 self
._test
_repl
_secret
(repl_obj
=self
.user_dn
,
247 expected_error
=[werror
.WERR_DS_DRA_DB_ERROR
,
248 werror
.WERR_DS_DRA_ACCESS_DENIED
])
249 # Note that Windows accepts this but Samba rejects it
250 self
._test
_repl
_secret
(repl_obj
=self
.user_dn
,
251 dest_dsa
=self
.ldb_dc1
.get_ntds_GUID(),
252 expected_error
=[werror
.WERR_DS_DRA_ACCESS_DENIED
])
254 self
._test
_repl
_secret
(repl_obj
=bad_ou
,
255 expected_error
=[werror
.WERR_DS_DRA_BAD_DN
])
257 self
._test
_repl
_full
(expected_error
=None)
258 self
._test
_repl
_full
_on
_ou
(repl_obj
=self
.ou
,
259 expected_error
=[werror
.WERR_DS_CANT_FIND_EXPECTED_NC
])
260 self
._test
_repl
_full
_on
_ou
(repl_obj
=bad_ou
,
261 expected_error
=[werror
.WERR_DS_DRA_BAD_NC
,
262 werror
.WERR_DS_DRA_BAD_DN
])
264 self
._test
_repl
_single
_obj
(repl_obj
=self
.ou
,
266 partial_attribute_set
=self
.get_partial_attribute_set())
267 self
._test
_repl
_full
(expected_error
=None,
268 partial_attribute_set
=self
.get_partial_attribute_set())
270 def test_repl_no_userpriv(self
):
272 Tests various replication requests made by a unprivileged user.
273 We expect all these requests to be rejected.
276 # Microsoft usually returns BAD_DN, Samba returns ACCESS_DENIED
277 usual_error
= [werror
.WERR_DS_DRA_BAD_DN
, werror
.WERR_DS_DRA_ACCESS_DENIED
]
279 self
._test
_repl
_single
_obj
(repl_obj
=self
.ou
,
280 expected_error
=usual_error
)
281 bad_ou
= "OU=bad_obj,%s" % self
.ou
282 self
._test
_repl
_single
_obj
(repl_obj
=bad_ou
,
283 expected_error
=usual_error
)
285 self
._test
_repl
_secret
(repl_obj
=self
.ou
,
286 expected_error
=usual_error
)
287 self
._test
_repl
_secret
(repl_obj
=self
.user_dn
,
288 expected_error
=usual_error
)
289 self
._test
_repl
_secret
(repl_obj
=self
.user_dn
,
290 dest_dsa
=self
.ldb_dc1
.get_ntds_GUID(),
291 expected_error
=usual_error
)
292 self
._test
_repl
_secret
(repl_obj
=bad_ou
,
293 expected_error
=usual_error
)
295 self
._test
_repl
_full
(expected_error
=[werror
.WERR_DS_DRA_ACCESS_DENIED
])
296 self
._test
_repl
_full
_on
_ou
(repl_obj
=self
.ou
,
297 expected_error
=usual_error
)
298 self
._test
_repl
_full
_on
_ou
(repl_obj
=bad_ou
,
299 expected_error
=[werror
.WERR_DS_DRA_BAD_NC
,
300 werror
.WERR_DS_DRA_ACCESS_DENIED
])
302 self
._test
_repl
_single
_obj
(repl_obj
=self
.ou
,
303 expected_error
=usual_error
,
304 partial_attribute_set
=self
.get_partial_attribute_set())
305 self
._test
_repl
_full
(expected_error
=[werror
.WERR_DS_DRA_ACCESS_DENIED
],
306 partial_attribute_set
=self
.get_partial_attribute_set())