drsuapi.idl: fix source_dsa spelling
[samba4-gss.git] / python / samba / tests / audit_log_dsdb.py
blobaf623376cd16f845fd89cac672ada6d65167bf5f
# Tests for SamDb password change audit logging.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""Tests for the SamDb logging of password changes.
"""
import os
import time

import samba
import samba.tests
from ldb import ERR_NO_SUCH_OBJECT, LdbError
from samba.auth import system_session
from samba.dcerpc import security, lsa
from samba.dcerpc.messaging import MSG_DSDB_LOG, DSDB_EVENT_NAME
from samba.net import Net
from samba.samdb import SamDB
from samba.tests import delete_force
from samba.tests.audit_log_base import AuditLogTestBase
# Name of the account that AuditLogDsdbTests.setUp (re)creates; it is used
# both as the CN and as the sAMAccountName of the test user.
USER_NAME = "auditlogtestuser"

# Password given to that account. It is generated once, when this module is
# imported.
# NOTE(review): the two arguments are presumably the minimum and maximum
# password length -- confirm against samba.generate_random_password.
USER_PASS = samba.generate_random_password(32, 32)
class AuditLogDsdbTests(AuditLogTestBase):
    """Exercise the DSDB audit log for changes made to the directory.

    Every test performs an operation against the server named by the
    SERVER environment variable and then inspects the audit messages
    handed back by the inherited waitForMessages() helper.
    """

    def setUp(self):
        """Connect to the server and (re)create the test user.

        The "dSHeuristics" and "minPwdAge" values are changed for the
        duration of the test; the cleanups registered here put the
        previous values back.
        """
        # These two attributes are set before the base class setUp() runs.
        self.message_type = MSG_DSDB_LOG
        self.event_type = DSDB_EVENT_NAME
        super().setUp()

        self.server_ip = os.environ["SERVER_IP"]
        self.server = os.environ["SERVER"]

        self.ldb = SamDB(url="ldap://%s" % self.server,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())

        # Gets back the basedn
        self.base_dn = self.ldb.domain_dn()

        # Set the "dSHeuristics" to activate the correct "userPassword"
        # behaviour, and reset them as they were before once the test ends.
        dsheuristics = self.ldb.get_dsheuristics()
        self.ldb.set_dsheuristics("000000001")
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Set "minPwdAge" temporarily to "0", and reset it as it was before
        # once the test ends.
        minPwdAge = self.ldb.get_minPwdAge()
        self.ldb.set_minPwdAge("0")
        self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)

        # (Re)adds the test user USER_NAME with password USER_PASS
        user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        delete_force(self.ldb, user_dn)
        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })

    def discardSetupMessages(self, dn):
        """Discard the messages from the setup code.

        Waits for two messages for *dn* and then drops everything that has
        been collected so far.
        """
        self.waitForMessages(2, dn=dn)
        self.discardMessages()

    def tearDown(self):
        """Drop any messages still pending, then run the base tearDown()."""
        self.discardMessages()
        super().tearDown()

    def haveExpectedTxn(self, expected):
        """Return True if the stored transaction message has id *expected*.

        Returns False while no transaction message has been stored, or
        when the stored one carries a different "transactionId".
        """
        txn_message = self.context["txnMessage"]
        if txn_message is None:
            return False
        return txn_message["dsdbTransaction"]["transactionId"] == expected

    def waitForTransaction(self, expected, connection=None):
        """Wait for a transaction message to arrive

        The connection is passed through to keep the connection alive
        until all the logging messages have been received.

        Returns the transaction message, or "" if the message with
        transaction id *expected* has not arrived after about one second.
        """
        self.connection = connection
        try:
            # A monotonic clock keeps the timeout immune to wall clock
            # adjustments made while we are waiting.
            start_time = time.monotonic()
            while not self.haveExpectedTxn(expected):
                self.msg_ctx.loop_once(0.1)
                if time.monotonic() - start_time > 1:
                    return ""
            return self.context["txnMessage"]
        finally:
            # Always release the connection, even if the loop raised.
            self.connection = None

    def _waitForSingleMessage(self, *args, **kwargs):
        """Wait for exactly one audit message and return it.

        Any arguments are passed straight through to waitForMessages()
        after the expected message count of 1. The test fails unless
        exactly one message is returned.
        """
        messages = self.waitForMessages(1, *args, **kwargs)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")
        return messages[0]

    def test_net_change_password(self):
        """A Net.change_password call logs one redacted password replace."""
        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"

        net.change_password(newpassword=password,
                            username=USER_NAME,
                            oldpassword=USER_PASS)

        audit = self._waitForSingleMessage(net, dn=dn)["dsdbChange"]
        self.assertEqual("Modify", audit["operation"])
        self.assertFalse(audit["performedAsSystem"])
        # assertTrue(a, b) would treat b as the failure message and always
        # pass, so compare the two DNs explicitly.
        self.assertEqual(dn.lower(), audit["dn"].lower())
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        # We skip the check for self.get_service_description() as this
        # is subject to a race between smbd and the s4 rpc_server code
        # as to which will set the description as it is DCE/RPC over SMB

        self.assertTrue(self.is_guid(audit["transactionId"]))

        attributes = audit["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["clearTextPassword"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertTrue(actions[0]["redacted"])
        self.assertEqual("replace", actions[0]["action"])

    def test_net_set_password(self):
        """A Net.set_password call logs one redacted password replace."""
        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"
        domain = lp.get("workgroup")

        net.set_password(newpassword=password,
                         account_name=USER_NAME,
                         domain_name=domain)

        audit = self._waitForSingleMessage(net, dn=dn)["dsdbChange"]
        self.assertEqual("Modify", audit["operation"])
        self.assertFalse(audit["performedAsSystem"])
        self.assertEqual(dn, audit["dn"])
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        # We skip the check for self.get_service_description() as this
        # is subject to a race between smbd and the s4 rpc_server code
        # as to which will set the description as it is DCE/RPC over SMB

        self.assertTrue(self.is_guid(audit["transactionId"]))

        attributes = audit["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["clearTextPassword"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertTrue(actions[0]["redacted"])
        self.assertEqual("replace", actions[0]["action"])

    def test_ldap_change_password(self):
        """An LDAP delete+add of userPassword logs two redacted actions."""
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        new_password = samba.generate_random_password(32, 32)
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "delete: userPassword\n" +
            "userPassword: " + USER_PASS + "\n" +
            "add: userPassword\n" +
            "userPassword: " + new_password + "\n")

        audit = self._waitForSingleMessage()["dsdbChange"]
        self.assertEqual("Modify", audit["operation"])
        self.assertFalse(audit["performedAsSystem"])
        self.assertEqual(dn, audit["dn"])
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")

        attributes = audit["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["userPassword"]["actions"]
        self.assertEqual(2, len(actions))
        self.assertTrue(actions[0]["redacted"])
        self.assertEqual("delete", actions[0]["action"])
        self.assertTrue(actions[1]["redacted"])
        self.assertEqual("add", actions[1]["action"])

    def test_ldap_replace_password(self):
        """An LDAP replace of userPassword logs one redacted replace."""
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        new_password = samba.generate_random_password(32, 32)
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "replace: userPassword\n" +
            "userPassword: " + new_password + "\n")

        audit = self._waitForSingleMessage(dn=dn)["dsdbChange"]
        self.assertEqual("Modify", audit["operation"])
        self.assertFalse(audit["performedAsSystem"])
        self.assertEqual(dn.lower(), audit["dn"].lower())
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")
        self.assertTrue(self.is_guid(audit["transactionId"]))

        attributes = audit["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["userPassword"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertTrue(actions[0]["redacted"])
        self.assertEqual("replace", actions[0]["action"])

    def test_ldap_add_user(self):
        """The user added by setUp is logged as an Add of three attributes."""
        # The setup code adds a user, so we check for the dsdb events
        # generated by it.
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        messages = self.waitForMessages(2, dn=dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(2,
                         len(messages),
                         "Did not receive the expected number of messages")

        # The second of the two messages is the one checked as the Add.
        audit = messages[1]["dsdbChange"]
        self.assertEqual("Add", audit["operation"])
        self.assertFalse(audit["performedAsSystem"])
        self.assertEqual(dn, audit["dn"])
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")
        self.assertTrue(self.is_guid(audit["sessionId"]))
        self.assertTrue(self.is_guid(audit["transactionId"]))

        attributes = audit["attributes"]
        self.assertEqual(3, len(attributes))

        actions = attributes["objectclass"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertEqual("add", actions[0]["action"])
        self.assertEqual(1, len(actions[0]["values"]))
        self.assertEqual("user", actions[0]["values"][0]["value"])

        actions = attributes["sAMAccountName"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertEqual("add", actions[0]["action"])
        self.assertEqual(1, len(actions[0]["values"]))
        self.assertEqual(USER_NAME, actions[0]["values"][0]["value"])

        actions = attributes["userPassword"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertEqual("add", actions[0]["action"])
        self.assertTrue(actions[0]["redacted"])

    def test_samdb_delete_user(self):
        """Deleting the user logs a successful Delete and a commit."""
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        self.ldb.deleteuser(USER_NAME)

        audit = self._waitForSingleMessage(dn=dn)["dsdbChange"]
        self.assertEqual("Delete", audit["operation"])
        self.assertFalse(audit["performedAsSystem"])
        self.assertEqual(dn.lower(), audit["dn"].lower())
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        self.assertEqual(0, audit["statusCode"])
        self.assertEqual("Success", audit["status"])
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")

        transactionId = audit["transactionId"]
        message = self.waitForTransaction(transactionId)
        # waitForTransaction() returns "" when it gives up; fail clearly
        # here rather than with a TypeError on the lookup below.
        self.assertNotEqual("", message,
                            "Timed out waiting for the transaction message")
        audit = message["dsdbTransaction"]
        self.assertEqual("commit", audit["action"])
        self.assertTrue(self.is_guid(audit["transactionId"]))
        self.assertGreater(audit["duration"], 0)

    def test_samdb_delete_non_existent_dn(self):
        """Deleting a missing DN logs a failed Delete and a rollback."""
        DOES_NOT_EXIST = "doesNotExist"
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        dn = "cn=" + DOES_NOT_EXIST + ",cn=users," + self.base_dn
        # Calling self.fail() inside a "try ... except Exception: pass"
        # would have its own AssertionError swallowed, so let assertRaises
        # report a delete that unexpectedly succeeds.
        with self.assertRaises(LdbError, msg="Exception not thrown"):
            self.ldb.delete(dn)

        audit = self._waitForSingleMessage()["dsdbChange"]
        self.assertEqual("Delete", audit["operation"])
        self.assertFalse(audit["performedAsSystem"])
        self.assertEqual(dn.lower(), audit["dn"].lower())
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        self.assertEqual(ERR_NO_SUCH_OBJECT, audit["statusCode"])
        self.assertEqual("No such object", audit["status"])
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")

        transactionId = audit["transactionId"]
        message = self.waitForTransaction(transactionId)
        # waitForTransaction() returns "" when it gives up; fail clearly
        # here rather than with a TypeError on the lookup below.
        self.assertNotEqual("", message,
                            "Timed out waiting for the transaction message")
        audit = message["dsdbTransaction"]
        self.assertEqual("rollback", audit["action"])
        self.assertTrue(self.is_guid(audit["transactionId"]))
        self.assertGreater(audit["duration"], 0)

    def test_create_and_delete_secret_over_lsa(self):
        """Creating and then deleting an LSA secret logs an Add and a Delete.

        Both operations are expected to be flagged as performed as system.
        """
        dn = "cn=Test Secret,CN=System," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())
        lsa_conn = lsa.lsarpc(
            "ncacn_np:%s" % self.server,
            self.get_loadparm(),
            creds)
        lsa_handle = lsa_conn.OpenPolicy2(
            system_name="\\",
            attr=lsa.ObjectAttribute(),
            access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)
        secret_name = lsa.String()
        secret_name.string = "G$Test"
        lsa_conn.CreateSecret(
            handle=lsa_handle,
            name=secret_name,
            access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)

        audit = self._waitForSingleMessage(dn=dn)["dsdbChange"]
        self.assertEqual("Add", audit["operation"])
        self.assertTrue(audit["performedAsSystem"])
        self.assertEqual(dn.lower(), audit["dn"].lower())
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])

        # We skip the check for self.get_service_description() as this
        # is subject to a race between smbd and the s4 rpc_server code
        # as to which will set the description as it is DCE/RPC over SMB

        attributes = audit["attributes"]
        self.assertEqual(2, len(attributes))

        object_class = attributes["objectClass"]
        self.assertEqual(1, len(object_class["actions"]))
        action = object_class["actions"][0]
        self.assertEqual("add", action["action"])
        values = action["values"]
        self.assertEqual(1, len(values))
        self.assertEqual("secret", values[0]["value"])

        cn = attributes["cn"]
        self.assertEqual(1, len(cn["actions"]))
        action = cn["actions"][0]
        self.assertEqual("add", action["action"])
        values = action["values"]
        self.assertEqual(1, len(values))
        self.assertEqual("Test Secret", values[0]["value"])

        #
        # Now delete the secret.
        self.discardMessages()
        h = lsa_conn.OpenSecret(
            handle=lsa_handle,
            name=secret_name,
            access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)

        lsa_conn.DeleteObject(h)

        audit = self._waitForSingleMessage(dn=dn)["dsdbChange"]
        self.assertEqual("Delete", audit["operation"])
        self.assertTrue(audit["performedAsSystem"])
        self.assertEqual(dn.lower(), audit["dn"].lower())
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])

        # We skip the check for self.get_service_description() as this
        # is subject to a race between smbd and the s4 rpc_server code
        # as to which will set the description as it is DCE/RPC over SMB

    def test_modify(self):
        """Add, delete and replace of carLicense values are each logged.

        Every modification is expected to produce one message holding a
        single action that lists the values named in the request.
        """
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        #
        # Add an attribute value
        #
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "add: carLicense\n" +
            "carLicense: license-01\n")

        audit = self._waitForSingleMessage(dn=dn)["dsdbChange"]
        self.assertEqual("Modify", audit["operation"])
        self.assertFalse(audit["performedAsSystem"])
        self.assertEqual(dn, audit["dn"])
        self.assertRegex(audit["remoteAddress"],
                         self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")

        attributes = audit["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["carLicense"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertEqual("add", actions[0]["action"])
        values = actions[0]["values"]
        self.assertEqual(1, len(values))
        self.assertEqual("license-01", values[0]["value"])

        #
        # Add another value to the attribute
        #
        self.discardMessages()
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "add: carLicense\n" +
            "carLicense: license-02\n")

        message = self._waitForSingleMessage(dn=dn)
        attributes = message["dsdbChange"]["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["carLicense"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertEqual("add", actions[0]["action"])
        values = actions[0]["values"]
        self.assertEqual(1, len(values))
        self.assertEqual("license-02", values[0]["value"])

        #
        # Add another two values to the attribute
        #
        self.discardMessages()
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "add: carLicense\n" +
            "carLicense: license-03\n" +
            "carLicense: license-04\n")

        message = self._waitForSingleMessage(dn=dn)
        attributes = message["dsdbChange"]["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["carLicense"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertEqual("add", actions[0]["action"])
        values = actions[0]["values"]
        self.assertEqual(2, len(values))
        self.assertEqual("license-03", values[0]["value"])
        self.assertEqual("license-04", values[1]["value"])

        #
        # Delete two values from the attribute
        #
        self.discardMessages()
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "delete: carLicense\n" +
            "carLicense: license-03\n" +
            "carLicense: license-04\n")

        message = self._waitForSingleMessage(dn=dn)
        attributes = message["dsdbChange"]["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["carLicense"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertEqual("delete", actions[0]["action"])
        values = actions[0]["values"]
        self.assertEqual(2, len(values))
        self.assertEqual("license-03", values[0]["value"])
        self.assertEqual("license-04", values[1]["value"])

        #
        # Replace the attribute with two new values
        #
        self.discardMessages()
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "replace: carLicense\n" +
            "carLicense: license-05\n" +
            "carLicense: license-06\n")

        message = self._waitForSingleMessage(dn=dn)
        attributes = message["dsdbChange"]["attributes"]
        self.assertEqual(1, len(attributes))
        actions = attributes["carLicense"]["actions"]
        self.assertEqual(1, len(actions))
        self.assertEqual("replace", actions[0]["action"])
        values = actions[0]["values"]
        self.assertEqual(2, len(values))
        self.assertEqual("license-05", values[0]["value"])
        self.assertEqual("license-06", values[1]["value"])