1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for the SamDb logging of password changes.
22 from samba
.dcerpc
.messaging
import MSG_DSDB_PWD_LOG
, DSDB_PWD_EVENT_NAME
23 from samba
.samdb
import SamDB
24 from samba
.auth
import system_session
26 from samba
.tests
.audit_log_base
import AuditLogTestBase
27 from samba
.tests
import delete_force
28 from samba
.net
import Net
29 from ldb
import ERR_INSUFFICIENT_ACCESS_RIGHTS
30 from samba
.dcerpc
.windows_event_ids
import (
31 EVT_ID_PASSWORD_CHANGE
,
# Primary test account: the setup code (re)creates it with USER_PASS, and the
# password operations under test are aimed at it.  The password is generated
# once, when the module is imported.
USER_NAME = "auditlogtestuser"
USER_PASS = samba.generate_random_password(32, 32)

# Second account, created on demand by the test that needs credentials other
# than the default ones.  Its password is likewise generated at import time.
SECOND_USER_NAME = "auditlogtestuser02"
SECOND_USER_PASS = samba.generate_random_password(32, 32)
class AuditLogPassChangeTests(AuditLogTestBase):
    """Verify the audit messages SamDb emits for password changes and resets.

    Each test performs one password operation, either through the ``Net``
    client or with a direct LDAP modify, and then checks the single
    ``passwordChange`` record received for the affected DN.
    """

    # NOTE(review): the copy of this class that was reviewed had lost a
    # number of lines (the setUp header, several call openers/arguments and
    # the try/except around the expected failure).  Every restored spot is
    # flagged with its own NOTE(review); confirm each one before merging.

    def setUp(self):
        """Connect to the server and (re)create the primary test user.

        Reads the SERVER and SERVER_IP environment variables.  The domain's
        dSHeuristics and minPwdAge are changed for the duration of the test
        and restored through cleanups.
        """
        # These are assigned before the base-class setUp runs.
        # NOTE(review): presumably the base class uses them to select which
        # messages to listen for -- confirm.
        self.message_type = MSG_DSDB_PWD_LOG
        self.event_type = DSDB_PWD_EVENT_NAME
        # NOTE(review): restored -- the base-class setUp call was not
        # visible at this position.
        super().setUp()

        self.server_ip = os.environ["SERVER_IP"]

        host = "ldap://%s" % os.environ["SERVER"]
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())
        self.server = os.environ["SERVER"]

        # Gets back the basedn
        self.base_dn = self.ldb.domain_dn()

        # Get the old "dSHeuristics" if it was set
        dsheuristics = self.ldb.get_dsheuristics()

        # Set the "dSHeuristics" to activate the correct "userPassword"
        # behaviour
        self.ldb.set_dsheuristics("000000001")

        # Reset the "dSHeuristics" as they were before
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Get the old "minPwdAge"
        min_pwd_age = self.ldb.get_minPwdAge()

        # Set it temporarily to "0"
        self.ldb.set_minPwdAge("0")

        # Reset the "minPwdAge" as it was before
        self.addCleanup(self.ldb.set_minPwdAge, min_pwd_age)

        # (Re)adds the test user USER_NAME with password USER_PASS
        delete_force(self.ldb,
                     "cn=" + USER_NAME + ",cn=users," + self.base_dn)
        # NOTE(review): restored call opener -- only the DN argument for the
        # second user was visible.
        delete_force(self.ldb,
                     "cn=" + SECOND_USER_NAME + ",cn=users," + self.base_dn)
        # NOTE(review): restored call opener -- only the attribute entries
        # were visible.
        self.ldb.add({
            "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS,
        })

    def _single_audit_record(self, messages):
        """Assert that exactly one message arrived and return its payload.

        Returns the ``passwordChange`` entry of that message.
        """
        print("Received %d messages" % len(messages))
        # NOTE(review): restored assertion -- only its message string was
        # visible; the expected count of 1 matches the waitForMessages(1, ...)
        # calls that produce ``messages``.
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")
        return messages[0]["passwordChange"]

    def _check_audit_record(self, audit, event_id, action, dn, service):
        """Run the assertions every test makes on a passwordChange record.

        ``event_id``, ``action`` and ``dn`` are compared with the record;
        ``service`` is compared with the service description reported by
        the test base class.
        """
        self.assertEqual(event_id, audit["eventId"])
        self.assertEqual(action, audit["action"])
        self.assertEqual(dn, audit["dn"])
        # NOTE(review): the pattern argument was not visible;
        # self.remoteAddress is presumably provided by AuditLogTestBase --
        # confirm.
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, service)
        self.assertTrue(self.is_guid(audit["transactionId"]))

    #
    # Discard the messages from the setup code
    #
    def discardSetupMessages(self, dn):
        """Wait for the message the setup code caused for ``dn``, then drop
        everything received so far."""
        self.waitForMessages(1, dn=dn)
        self.discardMessages()

    def test_net_change_password(self):
        """Net.change_password is audited as a "Change" over DCE/RPC."""
        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"

        # NOTE(review): the ``username`` argument was not visible; it is
        # restored because the old password supplied is USER_NAME's and the
        # audit record is awaited on USER_NAME's DN -- confirm.
        net.change_password(newpassword=password,
                            username=USER_NAME,
                            oldpassword=USER_PASS)

        messages = self.waitForMessages(1, net, dn)
        audit = self._single_audit_record(messages)
        self._check_audit_record(audit,
                                 EVT_ID_PASSWORD_CHANGE,
                                 "Change",
                                 dn,
                                 "DCE/RPC")

    def test_net_set_password_user_without_permission(self):
        """A password reset attempted by the second user is rejected and
        audited with an insufficient-access status."""
        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        self.ldb.newuser(SECOND_USER_NAME, SECOND_USER_PASS)

        #
        # Get the password reset from the user add
        #
        dn = "CN=" + SECOND_USER_NAME + ",CN=Users," + self.base_dn
        messages = self.waitForMessages(1, dn=dn)
        audit = self._single_audit_record(messages)
        self._check_audit_record(audit,
                                 EVT_ID_PASSWORD_RESET,
                                 "Reset",
                                 dn,
                                 "LDAP")
        self.assertEqual(0, audit["statusCode"])
        self.assertEqual("Success", audit["status"])
        self.discardMessages()

        # NOTE(review): the argument list was cut off after ``userpass``; no
        # further argument is passed here -- confirm none is required.
        creds = self.insta_creds(
            template=self.get_credentials(),
            username=SECOND_USER_NAME,
            userpass=SECOND_USER_PASS)

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"
        domain = lp.get("workgroup")

        # The reset must be refused.  The failure is raised from the ``else``
        # branch so that it cannot be swallowed by the handler below.
        # NOTE(review): the try/except lines and the ``domain_name`` argument
        # were not visible.  The concrete exception type is unknown, hence
        # the broad ``Exception`` -- narrow it once confirmed.
        try:
            net.set_password(newpassword=password,
                             account_name=USER_NAME,
                             domain_name=domain)
        except Exception:
            pass
        else:
            self.fail("Expected exception not thrown")

        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        messages = self.waitForMessages(1, net, dn=dn)
        audit = self._single_audit_record(messages)
        self._check_audit_record(audit,
                                 EVT_ID_PASSWORD_RESET,
                                 "Reset",
                                 dn,
                                 "DCE/RPC")
        self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, audit["statusCode"])
        self.assertEqual("insufficient access rights", audit["status"])

    def test_net_set_password(self):
        """Net.set_password is audited as a "Reset" over DCE/RPC."""
        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"
        domain = lp.get("workgroup")

        # NOTE(review): the ``domain_name`` argument was not visible; it is
        # restored because ``domain`` is computed just above and has no other
        # use -- confirm.
        net.set_password(newpassword=password,
                         account_name=USER_NAME,
                         domain_name=domain)

        messages = self.waitForMessages(1, net, dn)
        audit = self._single_audit_record(messages)
        self._check_audit_record(audit,
                                 EVT_ID_PASSWORD_RESET,
                                 "Reset",
                                 dn,
                                 "DCE/RPC")

    def test_ldap_change_password(self):
        """An LDAP delete+add of userPassword is audited as a "Change"."""
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        new_password = samba.generate_random_password(32, 32)
        # NOTE(review): the leading "dn:" line of the LDIF was not visible;
        # it is restored with the DN of the user being modified -- confirm.
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "delete: userPassword\n" +
            "userPassword: " + USER_PASS + "\n" +
            "add: userPassword\n" +
            "userPassword: " + new_password + "\n")

        messages = self.waitForMessages(1, dn=dn)
        audit = self._single_audit_record(messages)
        self._check_audit_record(audit,
                                 EVT_ID_PASSWORD_CHANGE,
                                 "Change",
                                 dn,
                                 "LDAP")
        self.assertTrue(self.is_guid(audit["sessionId"]))

    def test_ldap_replace_password(self):
        """An LDAP replace of userPassword is audited as a "Reset"."""
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        new_password = samba.generate_random_password(32, 32)
        # NOTE(review): the leading "dn:" line of the LDIF was not visible;
        # it is restored with the DN of the user being modified -- confirm.
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "replace: userPassword\n" +
            "userPassword: " + new_password + "\n")

        messages = self.waitForMessages(1, dn=dn)
        audit = self._single_audit_record(messages)
        self._check_audit_record(audit,
                                 EVT_ID_PASSWORD_RESET,
                                 "Reset",
                                 dn,
                                 "LDAP")
        self.assertTrue(self.is_guid(audit["sessionId"]))

    def test_ldap_add_user(self):
        """Adding the user in the setup code is audited as a "Reset"."""
        # The setup code adds a user, so we check for the password event
        # generated by it.
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        messages = self.waitForMessages(1, dn=dn)

        #
        # The first message should be the reset from the Setup code.
        #
        audit = self._single_audit_record(messages)
        self._check_audit_record(audit,
                                 EVT_ID_PASSWORD_RESET,
                                 "Reset",
                                 dn,
                                 "LDAP")
        self.assertTrue(self.is_guid(audit["sessionId"]))