ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / samba_tool / group.py
blobe8c0960849f78f93bc8cf4b63623c1b2d59ec543
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Michael Adam 2012
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import os
19 import time
20 import ldb
21 from samba.tests.samba_tool.base import SambaToolCmdTest
22 from samba import (
23 nttime2unix,
24 dsdb
28 class GroupCmdTestCase(SambaToolCmdTest):
29 """Tests for samba-tool group subcommands"""
30 groups = []
31 samdb = None
33 def setUp(self):
34 super().setUp()
35 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
36 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
37 self.groups = []
38 self.groups.append(self._randomGroup({"name": "testgroup1"}))
39 self.groups.append(self._randomGroup({"name": "testgroup2"}))
40 self.groups.append(self._randomGroup({"name": "testgroup3"}))
41 self.groups.append(self._randomGroup({"name": "testgroup4"}))
42 self.groups.append(self._randomGroup({"name": "testgroup5 (with brackets)"}))
43 self.groups.append(self._randomPosixGroup({"name": "posixgroup1"}))
44 self.groups.append(self._randomPosixGroup({"name": "posixgroup2"}))
45 self.groups.append(self._randomPosixGroup({"name": "posixgroup3"}))
46 self.groups.append(self._randomPosixGroup({"name": "posixgroup4"}))
47 self.groups.append(self._randomPosixGroup({"name": "posixgroup5 (with brackets)"}))
48 self.groups.append(self._randomUnixGroup({"name": "unixgroup1"}))
49 self.groups.append(self._randomUnixGroup({"name": "unixgroup2"}))
50 self.groups.append(self._randomUnixGroup({"name": "unixgroup3"}))
51 self.groups.append(self._randomUnixGroup({"name": "unixgroup4"}))
52 self.groups.append(self._randomUnixGroup({"name": "unixgroup5 (with brackets)"}))
54 # setup the 12 groups and ensure they are correct
55 for group in self.groups:
56 (result, out, err) = group["createGroupFn"](group)
58 self.assertCmdSuccess(result, out, err)
59 self.assertEqual(err, "", "There shouldn't be any error message")
61 if 'unix' in group["name"]:
62 self.assertIn("Modified Group '%s' successfully"
63 % group["name"], out)
64 else:
65 self.assertIn("Added group %s" % group["name"], out)
67 group["checkGroupFn"](group)
69 found = self._find_group(group["name"])
71 self.assertIsNotNone(found)
73 self.assertEqual("%s" % found.get("name"), group["name"])
74 self.assertEqual("%s" % found.get("description"), group["description"])
76 def tearDown(self):
77 super().tearDown()
78 # clean up all the left over groups, just in case
79 for group in self.groups:
80 if self._find_group(group["name"]):
81 self.runsubcmd("group", "delete", group["name"])
83 def test_newgroup(self):
84 """This tests the "group add" and "group delete" commands"""
85 # try to add all the groups again, this should fail
86 for group in self.groups:
87 (result, out, err) = self._create_group(group)
88 self.assertCmdFail(result, "Succeeded to add existing group")
89 self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
91 # try to delete all the groups we just added
92 for group in self.groups:
93 (result, out, err) = self.runsubcmd("group", "delete", group["name"])
94 self.assertCmdSuccess(result, out, err,
95 "Failed to delete group '%s'" % group["name"])
96 found = self._find_group(group["name"])
97 self.assertIsNone(found,
98 "Deleted group '%s' still exists" % group["name"])
100 # test adding groups
101 for group in self.groups:
102 (result, out, err) = self.runsubcmd("group", "add", group["name"],
103 "--description=%s" % group["description"],
104 "-H", "ldap://%s" % os.environ["DC_SERVER"],
105 "-U%s%%%s" % (os.environ["DC_USERNAME"],
106 os.environ["DC_PASSWORD"]))
108 self.assertCmdSuccess(result, out, err)
109 self.assertEqual(err, "", "There shouldn't be any error message")
110 self.assertIn("Added group %s" % group["name"], out)
112 found = self._find_group(group["name"])
114 self.assertEqual("%s" % found.get("samaccountname"),
115 "%s" % group["name"])
117 def test_list(self):
118 (result, out, err) = self.runsubcmd("group", "list",
119 "-H", "ldap://%s" % os.environ["DC_SERVER"],
120 "-U%s%%%s" % (os.environ["DC_USERNAME"],
121 os.environ["DC_PASSWORD"]))
122 self.assertCmdSuccess(result, out, err, "Error running list")
124 search_filter = "(objectClass=group)"
126 grouplist = self.samdb.search(base=self.samdb.domain_dn(),
127 scope=ldb.SCOPE_SUBTREE,
128 expression=search_filter,
129 attrs=["samaccountname"])
131 self.assertTrue(len(grouplist) > 0, "no groups found in samdb")
133 for groupobj in grouplist:
134 name = str(groupobj.get("samaccountname", idx=0))
135 found = self.assertMatch(out, name,
136 "group '%s' not found" % name)
138 def test_list_verbose(self):
139 (result, out, err) = self.runsubcmd("group", "list", "--verbose",
140 "-H", "ldap://%s" % os.environ["DC_SERVER"],
141 "-U%s%%%s" % (os.environ["DC_USERNAME"],
142 os.environ["DC_PASSWORD"]))
143 self.assertCmdSuccess(result, out, err, "Error running list --verbose")
145 # use the output to build a dictionary, where key=group-name,
146 # value=num-members
147 output_memberships = {}
149 # split the output by line, skipping the first 2 header lines
150 group_lines = out.split('\n')[2:-1]
151 for line in group_lines:
152 # split line by column whitespace (but keep the group name together
153 # if it contains spaces)
154 values = line.split(" ")
155 name = values[0]
156 num_members = int(values[-1])
157 output_memberships[name] = num_members
159 # build up a similar dict using an LDAP search
160 search_filter = "(objectClass=group)"
161 grouplist = self.samdb.search(base=self.samdb.domain_dn(),
162 scope=ldb.SCOPE_SUBTREE,
163 expression=search_filter,
164 attrs=["samaccountname", "member"])
165 self.assertTrue(len(grouplist) > 0, "no groups found in samdb")
167 ldap_memberships = {}
168 for groupobj in grouplist:
169 name = str(groupobj.get("samaccountname", idx=0))
170 num_members = len(groupobj.get("member", default=[]))
171 ldap_memberships[name] = num_members
173 # check the command output matches LDAP
174 self.assertTrue(output_memberships == ldap_memberships,
175 "Command output doesn't match LDAP results.\n" +
176 "Command='%s'\nLDAP='%s'" %(output_memberships,
177 ldap_memberships))
179 def test_list_full_dn(self):
180 (result, out, err) = self.runsubcmd("group", "list", "--full-dn",
181 "-H", "ldap://%s" % os.environ["DC_SERVER"],
182 "-U%s%%%s" % (os.environ["DC_USERNAME"],
183 os.environ["DC_PASSWORD"]))
184 self.assertCmdSuccess(result, out, err, "Error running list")
186 search_filter = "(objectClass=group)"
188 grouplist = self.samdb.search(base=self.samdb.domain_dn(),
189 scope=ldb.SCOPE_SUBTREE,
190 expression=search_filter,
191 attrs=[])
193 self.assertTrue(len(grouplist) > 0, "no groups found in samdb")
195 for groupobj in grouplist:
196 name = str(groupobj.get("dn", idx=0))
197 found = self.assertMatch(out, name,
198 "group '%s' not found" % name)
200 def test_list_base_dn(self):
201 base_dn = "CN=Users"
202 (result, out, err) = self.runsubcmd("group", "list", "--base-dn", base_dn,
203 "-H", "ldap://%s" % os.environ["DC_SERVER"],
204 "-U%s%%%s" % (os.environ["DC_USERNAME"],
205 os.environ["DC_PASSWORD"]))
206 self.assertCmdSuccess(result, out, err, "Error running list")
208 search_filter = "(objectClass=group)"
210 grouplist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn),
211 scope=ldb.SCOPE_SUBTREE,
212 expression=search_filter,
213 attrs=["name"])
215 self.assertTrue(len(grouplist) > 0, "no groups found in samdb")
217 for groupobj in grouplist:
218 name = str(groupobj.get("name", idx=0))
219 found = self.assertMatch(out, name,
220 "group '%s' not found" % name)
222 def test_listmembers(self):
223 (result, out, err) = self.runsubcmd("group", "listmembers", "Domain Users",
224 "-H", "ldap://%s" % os.environ["DC_SERVER"],
225 "-U%s%%%s" % (os.environ["DC_USERNAME"],
226 os.environ["DC_PASSWORD"]))
227 self.assertCmdSuccess(result, out, err, "Error running listmembers")
229 search_filter = "(|(primaryGroupID=513)(memberOf=CN=Domain Users,CN=Users,%s))" % self.samdb.domain_dn()
231 grouplist = self.samdb.search(base=self.samdb.domain_dn(),
232 scope=ldb.SCOPE_SUBTREE,
233 expression=search_filter,
234 attrs=["samAccountName"])
236 self.assertTrue(len(grouplist) > 0, "no groups found in samdb")
238 for groupobj in grouplist:
239 name = str(groupobj.get("samAccountName", idx=0))
240 found = self.assertMatch(out, name, "group '%s' not found" % name)
242 def test_listmembers_hide_expired(self):
243 expire_username = "expireUser"
244 expire_user = self._random_user({"name": expire_username})
245 self._create_user(expire_user)
247 (result, out, err) = self.runsubcmd(
248 "group",
249 "listmembers",
250 "Domain Users",
251 "--hide-expired",
252 "-H",
253 "ldap://%s" % os.environ["DC_SERVER"],
254 "-U%s%%%s" % (os.environ["DC_USERNAME"],
255 os.environ["DC_PASSWORD"]))
256 self.assertCmdSuccess(result, out, err, "Error running listmembers")
257 self.assertTrue(expire_username in out,
258 "user '%s' not found" % expire_username)
260 # user will be expired one second ago
261 self.samdb.setexpiry(
262 "(sAMAccountname=%s)" % expire_username,
264 False)
266 (result, out, err) = self.runsubcmd(
267 "group",
268 "listmembers",
269 "Domain Users",
270 "--hide-expired",
271 "-H",
272 "ldap://%s" % os.environ["DC_SERVER"],
273 "-U%s%%%s" % (os.environ["DC_USERNAME"],
274 os.environ["DC_PASSWORD"]))
275 self.assertCmdSuccess(result, out, err, "Error running listmembers")
276 self.assertFalse(expire_username in out,
277 "user '%s' not found" % expire_username)
279 self.samdb.deleteuser(expire_username)
281 def test_listmembers_hide_disabled(self):
282 disable_username = "disableUser"
283 disable_user = self._random_user({"name": disable_username})
284 self._create_user(disable_user)
286 (result, out, err) = self.runsubcmd(
287 "group",
288 "listmembers",
289 "Domain Users",
290 "--hide-disabled",
291 "-H",
292 "ldap://%s" % os.environ["DC_SERVER"],
293 "-U%s%%%s" % (os.environ["DC_USERNAME"],
294 os.environ["DC_PASSWORD"]))
295 self.assertCmdSuccess(result, out, err, "Error running listmembers")
296 self.assertTrue(disable_username in out,
297 "user '%s' not found" % disable_username)
299 self.samdb.disable_account("(sAMAccountname=%s)" % disable_username)
301 (result, out, err) = self.runsubcmd(
302 "group",
303 "listmembers",
304 "Domain Users",
305 "--hide-disabled",
306 "-H",
307 "ldap://%s" % os.environ["DC_SERVER"],
308 "-U%s%%%s" % (os.environ["DC_USERNAME"],
309 os.environ["DC_PASSWORD"]))
310 self.assertCmdSuccess(result, out, err, "Error running listmembers")
311 self.assertFalse(disable_username in out,
312 "user '%s' not found" % disable_username)
314 self.samdb.deleteuser(disable_username)
316 def test_listmembers_full_dn(self):
317 (result, out, err) = self.runsubcmd("group", "listmembers", "Domain Users",
318 "--full-dn",
319 "-H", "ldap://%s" % os.environ["DC_SERVER"],
320 "-U%s%%%s" % (os.environ["DC_USERNAME"],
321 os.environ["DC_PASSWORD"]))
322 self.assertCmdSuccess(result, out, err, "Error running listmembers")
324 search_filter = "(|(primaryGroupID=513)(memberOf=CN=Domain Users,CN=Users,%s))" % self.samdb.domain_dn()
326 grouplist = self.samdb.search(base=self.samdb.domain_dn(),
327 scope=ldb.SCOPE_SUBTREE,
328 expression=search_filter,
329 attrs=["dn"])
331 self.assertTrue(len(grouplist) > 0, "no groups found in samdb")
333 for groupobj in grouplist:
334 name = str(groupobj.get("dn", idx=0))
335 self.assertMatch(out, name, "group '%s' not found" % name)
338 def test_move(self):
339 full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest_grp"))
340 self.addCleanup(self.samdb.delete, full_ou_dn, ["tree_delete:1"])
342 (result, out, err) = self.runsubcmd("ou", "add", full_ou_dn)
343 self.assertCmdSuccess(result, out, err)
344 self.assertEqual(err, "", "There shouldn't be any error message")
345 self.assertIn('Added ou "%s"' % full_ou_dn, out)
347 for group in self.groups:
348 (result, out, err) = self.runsubcmd(
349 "group", "move", group["name"], full_ou_dn)
350 self.assertCmdSuccess(result, out, err, "Error running move")
351 self.assertIn('Moved group "%s" into "%s"' %
352 (group["name"], full_ou_dn), out)
354 # Should fail as groups objects are in OU
355 (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
356 self.assertCmdFail(result)
357 self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
358 "(it has %d children)!") % len(self.groups), err)
360 for group in self.groups:
361 new_dn = "CN=Users,%s" % self.samdb.domain_dn()
362 (result, out, err) = self.runsubcmd(
363 "group", "move", group["name"], new_dn)
364 self.assertCmdSuccess(result, out, err, "Error running move")
365 self.assertIn('Moved group "%s" into "%s"' %
366 (group["name"], new_dn), out)
368 def test_show(self):
369 """Assert that we can show a group correctly."""
370 (result, out, err) = self.runsubcmd("group", "show", "Domain Users",
371 "-H", "ldap://%s" % os.environ["DC_SERVER"],
372 "-U%s%%%s" % (os.environ["DC_USERNAME"],
373 os.environ["DC_PASSWORD"]))
374 self.assertCmdSuccess(result, out, err)
375 self.assertEqual(err, "", "Shouldn't be any error messages")
376 self.assertIn("dn: CN=Domain Users,CN=Users,DC=addom,DC=samba,DC=example,DC=com", out)
378 def test_rename_samaccountname(self):
379 """rename the samaccountname of all groups"""
380 for group in self.groups:
381 new_name = "new_samaccountname_of_" + group["name"]
383 # change samaccountname
384 (result, out, err) = self.runsubcmd("group", "rename", group["name"],
385 "--samaccountname=" + new_name)
386 self.assertCmdSuccess(result, out, err)
387 self.assertEqual(err, "", "Shouldn't be any error messages")
388 self.assertIn('successfully', out)
390 found = self._find_group(new_name)
391 self.assertEqual("%s" % found.get("description"), group["description"])
392 if not "cn" in group or str(group["cn"]) == str(group["name"]):
393 self.assertEqual("%s" % found.get("cn"), new_name)
394 else:
395 self.assertEqual("%s" % found.get("cn"), group["cn"])
397 # trying to remove the samaccountname throws an error
398 (result, out, err) = self.runsubcmd("group", "rename", new_name,
399 "--samaccountname=")
400 self.assertCmdFail(result)
401 self.assertIn('Failed to rename group', err)
402 self.assertIn('delete protected attribute', err)
404 # reset changes
405 (result, out, err) = self.runsubcmd("group", "rename", new_name,
406 "--samaccountname=" + group["name"])
407 self.assertCmdSuccess(result, out, err)
408 if "cn" in group:
409 (result, out, err) = self.runsubcmd("group", "rename", group["name"],
410 "--force-new-cn=%s" % group["cn"])
411 self.assertCmdSuccess(result, out, err)
413 def test_rename_cn_mail(self):
414 """change and remove the cn and mail attributes of all groups"""
415 for group in self.groups:
416 new_mail = "new mail of " + group["name"]
417 new_cn = "new cn of " + group["name"]
419 # change attributes
420 (result, out, err) = self.runsubcmd("group", "rename", group["name"],
421 "--mail-address=" + new_mail,
422 "--force-new-cn=" + new_cn)
423 self.assertCmdSuccess(result, out, err)
424 self.assertEqual(err, "", "Shouldn't be any error messages")
425 self.assertIn('successfully', out)
427 found = self._find_group(group["name"])
428 self.assertEqual("%s" % found.get("mail"), new_mail)
429 self.assertEqual("%s" % found.get("cn"), new_cn)
431 # remove mail
432 (result, out, err) = self.runsubcmd("group", "rename", group["name"],
433 "--mail-address=")
434 self.assertCmdSuccess(result, out, err)
435 self.assertEqual(err, "", "Shouldn't be any error messages")
436 self.assertIn('successfully', out)
438 found = self._find_group(group["name"])
439 self.assertEqual(found.get("mail"), None)
441 # trying to remove cn (throws an error)
442 (result, out, err) = self.runsubcmd("group", "rename", group["name"],
443 "--force-new-cn=")
444 self.assertCmdFail(result)
445 self.assertIn("Failed to rename group", err)
446 self.assertIn("delete protected attribute", err)
448 # reset CN (mail is already empty)
449 (result, out, err) = self.runsubcmd("group", "rename", group["name"],
450 "--reset-cn")
451 self.assertCmdSuccess(result, out, err)
453 def _randomGroup(self, base={}):
454 """create a group with random attribute values, you can specify base
455 attributes"""
456 group = {
457 "name": self.randomName(),
458 "description": self.randomName(count=100),
459 "createGroupFn": self._create_group,
460 "checkGroupFn": self._check_group,
462 group.update(base)
463 return group
465 def _randomPosixGroup(self, base={}):
466 """create a group with random attribute values and additional RFC2307
467 attributes, you can specify base attributes"""
468 group = self._randomGroup({})
469 group.update(base)
470 posixAttributes = {
471 "unixdomain": self.randomName(),
472 "gidNumber": self.randomXid(),
473 "createGroupFn": self._create_posix_group,
474 "checkGroupFn": self._check_posix_group,
476 group.update(posixAttributes)
477 group.update(base)
478 return group
480 def _randomUnixGroup(self, base={}):
481 """create a group with random attribute values and additional RFC2307
482 attributes, you can specify base attributes"""
483 group = self._randomGroup({})
484 group.update(base)
485 posixAttributes = {
486 "gidNumber": self.randomXid(),
487 "createGroupFn": self._create_unix_group,
488 "checkGroupFn": self._check_unix_group,
490 group.update(posixAttributes)
491 group.update(base)
492 return group
494 def _check_group(self, group):
495 """ check if a group from SamDB has the same attributes as
496 its template """
497 found = self._find_group(group["name"])
499 self.assertEqual("%s" % found.get("name"), group["name"])
500 self.assertEqual("%s" % found.get("description"), group["description"])
502 def _check_posix_group(self, group):
503 """ check if a posix_group from SamDB has the same attributes as
504 its template """
505 found = self._find_group(group["name"])
507 self.assertEqual("%s" % found.get("gidNumber"), "%s" %
508 group["gidNumber"])
509 self._check_group(group)
511 def _check_unix_group(self, group):
512 """ check if a unix_group from SamDB has the same attributes as its
513 template """
514 found = self._find_group(group["name"])
516 self.assertEqual("%s" % found.get("gidNumber"), "%s" %
517 group["gidNumber"])
518 self._check_group(group)
520 def _create_group(self, group):
521 return self.runsubcmd("group", "add", group["name"],
522 "--description=%s" % group["description"],
523 "-H", "ldap://%s" % os.environ["DC_SERVER"],
524 "-U%s%%%s" % (os.environ["DC_USERNAME"],
525 os.environ["DC_PASSWORD"]))
527 def _create_posix_group(self, group):
528 """ create a new group with RFC2307 attributes """
529 return self.runsubcmd("group", "add", group["name"],
530 "--description=%s" % group["description"],
531 "--nis-domain=%s" % group["unixdomain"],
532 "--gid-number=%s" % group["gidNumber"],
533 "-H", "ldap://%s" % os.environ["DC_SERVER"],
534 "-U%s%%%s" % (os.environ["DC_USERNAME"],
535 os.environ["DC_PASSWORD"]))
537 def _create_unix_group(self, group):
538 """ Add RFC2307 attributes to a group"""
539 self._create_group(group)
540 return self.runsubcmd("group", "addunixattrs", group["name"],
541 "%s" % group["gidNumber"],
542 "-H", "ldap://%s" % os.environ["DC_SERVER"],
543 "-U%s%%%s" % (os.environ["DC_USERNAME"],
544 os.environ["DC_PASSWORD"]))
546 def _find_group(self, name):
547 search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
548 (ldb.binary_encode(name),
549 "CN=Group,CN=Schema,CN=Configuration",
550 self.samdb.domain_dn()))
551 grouplist = self.samdb.search(base=self.samdb.domain_dn(),
552 scope=ldb.SCOPE_SUBTREE,
553 expression=search_filter)
554 if grouplist:
555 return grouplist[0]
556 else:
557 return None
559 def test_stats(self):
560 (result, out, err) = self.runsubcmd("group", "stats",
561 "-H", "ldap://%s" % os.environ["DC_SERVER"],
562 "-U%s%%%s" % (os.environ["DC_USERNAME"],
563 os.environ["DC_PASSWORD"]))
564 self.assertCmdSuccess(result, out, err, "Error running stats")
566 # sanity-check the command reports 'total groups' correctly
567 search_filter = "(objectClass=group)"
568 grouplist = self.samdb.search(base=self.samdb.domain_dn(),
569 scope=ldb.SCOPE_SUBTREE,
570 expression=search_filter,
571 attrs=[])
573 total_groups = len(grouplist)
574 self.assertTrue("Total groups: {0}".format(total_groups) in out,
575 "Total groups not reported correctly")
577 def _random_user(self, base=None):
579 create a user with random attribute values, you can specify
580 base attributes
582 if base is None:
583 base = {}
584 user = {
585 "name": self.randomName(),
586 "password": self.random_password(16),
587 "surname": self.randomName(),
588 "given-name": self.randomName(),
589 "job-title": self.randomName(),
590 "department": self.randomName(),
591 "company": self.randomName(),
592 "description": self.randomName(count=100),
593 "createUserFn": self._create_user,
595 user.update(base)
596 return user
598 def _create_user(self, user):
599 return self.runsubcmd(
600 "user",
601 "add",
602 user["name"],
603 user["password"],
604 "--surname=%s" % user["surname"],
605 "--given-name=%s" % user["given-name"],
606 "--job-title=%s" % user["job-title"],
607 "--department=%s" % user["department"],
608 "--description=%s" % user["description"],
609 "--company=%s" % user["company"],
610 "-H",
611 "ldap://%s" % os.environ["DC_SERVER"],
612 "-U%s%%%s" % (os.environ["DC_USERNAME"],
613 os.environ["DC_PASSWORD"]))