1 # Tests for SamDb group membership change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """Tests for the SamDb logging of group membership changes.
21 from samba
.dcerpc
.messaging
import MSG_GROUP_LOG
, DSDB_GROUP_EVENT_NAME
22 from samba
.dcerpc
.windows_event_ids
import (
23 EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP
,
24 EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP
26 from samba
.samdb
import SamDB
27 from samba
.auth
import system_session
29 from samba
.tests
.audit_log_base
import AuditLogTestBase
30 from samba
.tests
import delete_force
32 from ldb
import FLAG_MOD_REPLACE
34 USER_NAME
= "grpadttstuser01"
35 USER_PASS
= samba
.generate_random_password(32, 32)
37 SECOND_USER_NAME
= "grpadttstuser02"
38 SECOND_USER_PASS
= samba
.generate_random_password(32, 32)
40 GROUP_NAME_01
= "group-audit-01"
41 GROUP_NAME_02
= "group-audit-02"
44 class GroupAuditTests(AuditLogTestBase
):
47 self
.message_type
= MSG_GROUP_LOG
48 self
.event_type
= DSDB_GROUP_EVENT_NAME
51 self
.server_ip
= os
.environ
["SERVER_IP"]
53 host
= "ldap://%s" % os
.environ
["SERVER"]
54 self
.ldb
= SamDB(url
=host
,
55 session_info
=system_session(),
56 credentials
=self
.get_credentials(),
57 lp
=self
.get_loadparm())
58 self
.server
= os
.environ
["SERVER"]
60 # Gets back the basedn
61 self
.base_dn
= self
.ldb
.domain_dn()
63 # Get the old "dSHeuristics" if it was set
64 dsheuristics
= self
.ldb
.get_dsheuristics()
66 # Set the "dSHeuristics" to activate the correct "userPassword"
68 self
.ldb
.set_dsheuristics("000000001")
70 # Reset the "dSHeuristics" as they were before
71 self
.addCleanup(self
.ldb
.set_dsheuristics
, dsheuristics
)
73 # Get the old "minPwdAge"
74 minPwdAge
= self
.ldb
.get_minPwdAge()
76 # Set it temporarily to "0"
77 self
.ldb
.set_minPwdAge("0")
78 self
.base_dn
= self
.ldb
.domain_dn()
80 # Reset the "minPwdAge" as it was before
81 self
.addCleanup(self
.ldb
.set_minPwdAge
, minPwdAge
)
83 # (Re)adds the test user USER_NAME with password USER_PASS
85 "dn": "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
,
86 "objectclass": "user",
87 "sAMAccountName": USER_NAME
,
88 "userPassword": USER_PASS
90 self
.ldb
.newgroup(GROUP_NAME_01
)
91 self
.ldb
.newgroup(GROUP_NAME_02
)
95 delete_force(self
.ldb
, "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
)
96 self
.ldb
.deletegroup(GROUP_NAME_01
)
97 self
.ldb
.deletegroup(GROUP_NAME_02
)
99 def test_add_and_remove_users_from_group(self
):
102 # Wait for the primary group change for the created user.
104 messages
= self
.waitForMessages(2)
105 print("Received %d messages" % len(messages
))
108 "Did not receive the expected number of messages")
109 audit
= messages
[0]["groupChange"]
111 self
.assertEqual("PrimaryGroup", audit
["action"])
112 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
113 group_dn
= "cn=domain users,cn=users," + self
.base_dn
114 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
115 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
116 self
.assertRegex(audit
["remoteAddress"],
118 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
119 session_id
= self
.get_session()
120 self
.assertEqual(session_id
, audit
["sessionId"])
121 service_description
= self
.get_service_description()
122 self
.assertEqual(service_description
, "LDAP")
124 # Check the Add message for the new user's primary group
125 audit
= messages
[1]["groupChange"]
127 self
.assertEqual("Added", audit
["action"])
128 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
129 group_dn
= "cn=domain users,cn=users," + self
.base_dn
130 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
131 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
132 self
.assertRegex(audit
["remoteAddress"],
134 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
135 session_id
= self
.get_session()
136 self
.assertEqual(session_id
, audit
["sessionId"])
137 self
.assertEqual(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP
,
140 # Add the user to a group
142 self
.discardMessages()
144 self
.ldb
.add_remove_group_members(GROUP_NAME_01
, [USER_NAME
])
145 messages
= self
.waitForMessages(1)
146 print("Received %d messages" % len(messages
))
149 "Did not receive the expected number of messages")
150 audit
= messages
[0]["groupChange"]
152 self
.assertEqual("Added", audit
["action"])
153 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
154 group_dn
= "cn=" + GROUP_NAME_01
+ ",cn=users," + self
.base_dn
155 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
156 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
157 self
.assertRegex(audit
["remoteAddress"],
159 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
160 session_id
= self
.get_session()
161 self
.assertEqual(session_id
, audit
["sessionId"])
162 service_description
= self
.get_service_description()
163 self
.assertEqual(service_description
, "LDAP")
166 # Add the user to another group
168 self
.discardMessages()
169 self
.ldb
.add_remove_group_members(GROUP_NAME_02
, [USER_NAME
])
171 messages
= self
.waitForMessages(1)
172 print("Received %d messages" % len(messages
))
175 "Did not receive the expected number of messages")
176 audit
= messages
[0]["groupChange"]
178 self
.assertEqual("Added", audit
["action"])
179 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
180 group_dn
= "cn=" + GROUP_NAME_02
+ ",cn=users," + self
.base_dn
181 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
182 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
183 self
.assertRegex(audit
["remoteAddress"],
185 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
186 session_id
= self
.get_session()
187 self
.assertEqual(session_id
, audit
["sessionId"])
188 service_description
= self
.get_service_description()
189 self
.assertEqual(service_description
, "LDAP")
192 # Remove the user from a group
194 self
.discardMessages()
195 self
.ldb
.add_remove_group_members(
198 add_members_operation
=False)
199 messages
= self
.waitForMessages(1)
200 print("Received %d messages" % len(messages
))
203 "Did not receive the expected number of messages")
204 audit
= messages
[0]["groupChange"]
206 self
.assertEqual("Removed", audit
["action"])
207 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
208 group_dn
= "cn=" + GROUP_NAME_01
+ ",cn=users," + self
.base_dn
209 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
210 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
211 self
.assertRegex(audit
["remoteAddress"],
213 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
214 session_id
= self
.get_session()
215 self
.assertEqual(session_id
, audit
["sessionId"])
216 service_description
= self
.get_service_description()
217 self
.assertEqual(service_description
, "LDAP")
220 # Re-add the user to a group
222 self
.discardMessages()
223 self
.ldb
.add_remove_group_members(GROUP_NAME_01
, [USER_NAME
])
225 messages
= self
.waitForMessages(1)
226 print("Received %d messages" % len(messages
))
229 "Did not receive the expected number of messages")
230 audit
= messages
[0]["groupChange"]
232 self
.assertEqual("Added", audit
["action"])
233 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
234 group_dn
= "cn=" + GROUP_NAME_01
+ ",cn=users," + self
.base_dn
235 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
236 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
237 self
.assertRegex(audit
["remoteAddress"],
239 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
240 session_id
= self
.get_session()
241 self
.assertEqual(session_id
, audit
["sessionId"])
242 service_description
= self
.get_service_description()
243 self
.assertEqual(service_description
, "LDAP")
245 def test_change_primary_group(self
):
248 # Wait for the primary group change for the created user.
250 messages
= self
.waitForMessages(2)
251 print("Received %d messages" % len(messages
))
254 "Did not receive the expected number of messages")
256 # Check the PrimaryGroup message
257 audit
= messages
[0]["groupChange"]
259 self
.assertEqual("PrimaryGroup", audit
["action"])
260 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
261 group_dn
= "cn=domain users,cn=users," + self
.base_dn
262 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
263 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
264 self
.assertRegex(audit
["remoteAddress"],
266 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
267 session_id
= self
.get_session()
268 self
.assertEqual(session_id
, audit
["sessionId"])
269 service_description
= self
.get_service_description()
270 self
.assertEqual(service_description
, "LDAP")
272 # Check the Add message for the new user's primary group
273 audit
= messages
[1]["groupChange"]
275 self
.assertEqual("Added", audit
["action"])
276 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
277 group_dn
= "cn=domain users,cn=users," + self
.base_dn
278 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
279 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
280 self
.assertRegex(audit
["remoteAddress"],
282 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
283 session_id
= self
.get_session()
284 self
.assertEqual(session_id
, audit
["sessionId"])
285 self
.assertEqual(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP
,
289 # Add the user to a group; the user needs to be a member of a group
290 # before their primary group can be set to that group.
292 self
.discardMessages()
294 self
.ldb
.add_remove_group_members(GROUP_NAME_01
, [USER_NAME
])
295 messages
= self
.waitForMessages(1)
296 print("Received %d messages" % len(messages
))
299 "Did not receive the expected number of messages")
300 audit
= messages
[0]["groupChange"]
302 self
.assertEqual("Added", audit
["action"])
303 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
304 group_dn
= "cn=" + GROUP_NAME_01
+ ",cn=users," + self
.base_dn
305 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
306 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
307 self
.assertRegex(audit
["remoteAddress"],
309 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
310 session_id
= self
.get_session()
311 self
.assertEqual(session_id
, audit
["sessionId"])
312 service_description
= self
.get_service_description()
313 self
.assertEqual(service_description
, "LDAP")
314 self
.assertEqual(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP
,
318 # Change the primary group of a user
320 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
321 group_dn
= "cn=" + GROUP_NAME_01
+ ",cn=users," + self
.base_dn
322 # get the primaryGroupToken of the group
323 res
= self
.ldb
.search(base
=group_dn
, attrs
=["primaryGroupToken"],
324 scope
=ldb
.SCOPE_BASE
)
325 group_id
= res
[0]["primaryGroupToken"]
327 # set primaryGroupID attribute of the user to that group
329 m
.dn
= ldb
.Dn(self
.ldb
, user_dn
)
330 m
["primaryGroupID"] = ldb
.MessageElement(
334 self
.discardMessages()
338 # Wait for the primary group change.
339 # We expect three events: the user removed from the new group,
340 # the user added to their old primary group,
341 # and a new primary group event.
343 messages
= self
.waitForMessages(3)
344 print("Received %d messages" % len(messages
))
347 "Did not receive the expected number of messages")
349 audit
= messages
[0]["groupChange"]
350 self
.assertEqual("Removed", audit
["action"])
351 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
352 group_dn
= "cn=" + GROUP_NAME_01
+ ",cn=users," + self
.base_dn
353 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
354 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
355 self
.assertRegex(audit
["remoteAddress"],
357 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
358 session_id
= self
.get_session()
359 self
.assertEqual(session_id
, audit
["sessionId"])
360 service_description
= self
.get_service_description()
361 self
.assertEqual(service_description
, "LDAP")
362 self
.assertEqual(EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP
,
365 audit
= messages
[1]["groupChange"]
367 self
.assertEqual("Added", audit
["action"])
368 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
369 group_dn
= "cn=domain users,cn=users," + self
.base_dn
370 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
371 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
372 self
.assertRegex(audit
["remoteAddress"],
374 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
375 session_id
= self
.get_session()
376 self
.assertEqual(session_id
, audit
["sessionId"])
377 service_description
= self
.get_service_description()
378 self
.assertEqual(service_description
, "LDAP")
379 self
.assertEqual(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP
,
382 audit
= messages
[2]["groupChange"]
384 self
.assertEqual("PrimaryGroup", audit
["action"])
385 user_dn
= "cn=" + USER_NAME
+ ",cn=users," + self
.base_dn
386 group_dn
= "cn=" + GROUP_NAME_01
+ ",cn=users," + self
.base_dn
387 self
.assertTrue(user_dn
.lower(), audit
["user"].lower())
388 self
.assertTrue(group_dn
.lower(), audit
["group"].lower())
389 self
.assertRegex(audit
["remoteAddress"],
391 self
.assertTrue(self
.is_guid(audit
["sessionId"]))
392 session_id
= self
.get_session()
393 self
.assertEqual(session_id
, audit
["sessionId"])
394 service_description
= self
.get_service_description()
395 self
.assertEqual(service_description
, "LDAP")