ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / audit_log_pass_change.py
blob1039e17a1807d46c351d65d6d18012f0d6abc3fa
1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for the SamDb logging of password changes.
19 """
21 import samba.tests
22 from samba.dcerpc.messaging import MSG_DSDB_PWD_LOG, DSDB_PWD_EVENT_NAME
23 from samba.samdb import SamDB
24 from samba.auth import system_session
25 import os
26 from samba.tests.audit_log_base import AuditLogTestBase
27 from samba.tests import delete_force
28 from samba.net import Net
29 from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
30 from samba.dcerpc.windows_event_ids import (
31 EVT_ID_PASSWORD_CHANGE,
32 EVT_ID_PASSWORD_RESET
# Account names used by the tests below.  The passwords are generated once,
# when this module is imported, by samba.generate_random_password(32, 32).
USER_NAME, SECOND_USER_NAME = "auditlogtestuser", "auditlogtestuser02"

USER_PASS = samba.generate_random_password(32, 32)
SECOND_USER_PASS = samba.generate_random_password(32, 32)
class AuditLogPassChangeTests(AuditLogTestBase):
    """Check the "passwordChange" audit messages logged for SamDb.

    Each test triggers a password change or reset for a test account,
    either through samba.net.Net or through an LDAP add/modify, waits for
    exactly one audit message for the affected DN and asserts on its
    contents.
    """

    def setUp(self):
        """Connect to the server and (re)create the USER_NAME account.

        Reads the SERVER and SERVER_IP environment variables.  The
        "dSHeuristics" and "minPwdAge" settings are changed for the
        duration of the test and restored through addCleanup().
        """
        self.message_type = MSG_DSDB_PWD_LOG
        self.event_type = DSDB_PWD_EVENT_NAME
        super().setUp()

        self.server_ip = os.environ["SERVER_IP"]

        host = "ldap://%s" % os.environ["SERVER"]
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())
        self.server = os.environ["SERVER"]

        # Gets back the basedn
        self.base_dn = self.ldb.domain_dn()

        # Get the old "dSHeuristics" if it was set
        dsheuristics = self.ldb.get_dsheuristics()

        # Set the "dSHeuristics" to activate the correct "userPassword"
        # behaviour
        self.ldb.set_dsheuristics("000000001")

        # Reset the "dSHeuristics" as they were before
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Get the old "minPwdAge"
        minPwdAge = self.ldb.get_minPwdAge()

        # Set it temporarily to "0"
        self.ldb.set_minPwdAge("0")

        # Reset the "minPwdAge" as it was before
        self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)

        # (Re)adds the test user USER_NAME with password USER_PASS
        delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
        delete_force(
            self.ldb,
            "cn=" + SECOND_USER_NAME + ",cn=users," + self.base_dn)
        self.ldb.add({
            "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })

    #
    # Discard the messages from the setup code
    #
    def discardSetupMessages(self, dn):
        """Wait for one message for dn, then discard what was received."""
        self.waitForMessages(1, dn=dn)
        self.discardMessages()

    def _wait_for_password_event(self, dn, connection=None):
        """Wait for exactly one audit message for dn and return its payload.

        connection, when given, is forwarded to waitForMessages() as its
        second argument, as the tests using samba.net.Net do.
        NOTE(review): presumably this keeps the connection referenced
        while waiting -- confirm against AuditLogTestBase.

        Returns the "passwordChange" entry of the single message received.
        """
        if connection is None:
            messages = self.waitForMessages(1, dn=dn)
        else:
            messages = self.waitForMessages(1, connection, dn=dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")
        return messages[0]["passwordChange"]

    def _assert_password_event(self, audit, event_id, action, dn, service):
        """Assert the fields every password audit record is checked for.

        audit is a "passwordChange" payload; event_id, action and dn are
        the expected "eventId", "action" and "dn" values, and service is
        the expected result of get_service_description().
        """
        self.assertEqual(event_id, audit["eventId"])
        self.assertEqual(action, audit["action"])
        self.assertEqual(dn, audit["dn"])
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, service)
        self.assertTrue(self.is_guid(audit["transactionId"]))

    def test_net_change_password(self):
        """Net.change_password() with the old password logs a "Change"."""
        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"

        net.change_password(newpassword=password,
                            username=USER_NAME,
                            oldpassword=USER_PASS)

        audit = self._wait_for_password_event(dn, net)
        self._assert_password_event(
            audit, EVT_ID_PASSWORD_CHANGE, "Change", dn, "DCE/RPC")

    def test_net_set_password_user_without_permission(self):
        """A rejected Net.set_password() logs a failed "Reset".

        SECOND_USER_NAME is created first, and the successful "Reset"
        logged for that creation is checked.  That user then tries to set
        USER_NAME's password; the call must raise and the audit record
        must carry ERR_INSUFFICIENT_ACCESS_RIGHTS.
        """
        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        self.ldb.newuser(SECOND_USER_NAME, SECOND_USER_PASS)

        #
        # Get the password reset from the user add
        #
        dn = "CN=" + SECOND_USER_NAME + ",CN=Users," + self.base_dn
        audit = self._wait_for_password_event(dn)
        self._assert_password_event(
            audit, EVT_ID_PASSWORD_RESET, "Reset", dn, "LDAP")
        self.assertEqual(0, audit["statusCode"])
        self.assertEqual("Success", audit["status"])
        self.discardMessages()

        creds = self.insta_creds(
            template=self.get_credentials(),
            username=SECOND_USER_NAME,
            userpass=SECOND_USER_PASS,
            kerberos_state=None)

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"
        domain = lp.get("workgroup")

        try:
            net.set_password(newpassword=password,
                             account_name=USER_NAME,
                             domain_name=domain)
        except Exception:
            # Expected: the second user may not reset another account's
            # password.  The concrete exception type is not pinned here.
            pass
        else:
            # Must be outside the try body: self.fail() raises an
            # AssertionError, which "except Exception" would swallow.
            self.fail("Expected exception not thrown")

        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        audit = self._wait_for_password_event(dn, net)
        self._assert_password_event(
            audit, EVT_ID_PASSWORD_RESET, "Reset", dn, "DCE/RPC")
        self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, audit["statusCode"])
        self.assertEqual("insufficient access rights", audit["status"])

    def test_net_set_password(self):
        """Net.set_password() logs a "Reset" event."""
        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"
        domain = lp.get("workgroup")

        net.set_password(newpassword=password,
                         account_name=USER_NAME,
                         domain_name=domain)

        audit = self._wait_for_password_event(dn, net)
        self._assert_password_event(
            audit, EVT_ID_PASSWORD_RESET, "Reset", dn, "DCE/RPC")

    def test_ldap_change_password(self):
        """An LDAP delete+add of userPassword logs a "Change" event."""
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        new_password = samba.generate_random_password(32, 32)
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "delete: userPassword\n" +
            "userPassword: " + USER_PASS + "\n" +
            "add: userPassword\n" +
            "userPassword: " + new_password + "\n")

        audit = self._wait_for_password_event(dn)
        self._assert_password_event(
            audit, EVT_ID_PASSWORD_CHANGE, "Change", dn, "LDAP")
        self.assertTrue(self.is_guid(audit["sessionId"]))

    def test_ldap_replace_password(self):
        """An LDAP replace of userPassword logs a "Reset" event."""
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        new_password = samba.generate_random_password(32, 32)
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "replace: userPassword\n" +
            "userPassword: " + new_password + "\n")

        audit = self._wait_for_password_event(dn)
        self._assert_password_event(
            audit, EVT_ID_PASSWORD_RESET, "Reset", dn, "LDAP")
        self.assertTrue(self.is_guid(audit["sessionId"]))

    def test_ldap_add_user(self):
        """The user add performed by setUp() logs a "Reset" event."""
        # The setup code adds a user, so we check for the password event
        # generated by it.
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn

        #
        # The first message should be the reset from the Setup code.
        #
        audit = self._wait_for_password_event(dn)
        self._assert_password_event(
            audit, EVT_ID_PASSWORD_RESET, "Reset", dn, "LDAP")
        self.assertTrue(self.is_guid(audit["sessionId"]))