ctdb-scripts: Support storing statd-callout state in cluster filesystem
[samba4-gss.git] / source4 / torture / drs / python / linked_attributes_drs.py
blob93ad3130f037bda082913a095e40adce63b2115a
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 import sys
6 sys.path.insert(0, "bin/python")
7 import ldb
9 from samba.dcerpc import drsuapi, misc
10 from samba.ndr import ndr_unpack, ndr_pack
12 import drs_base
class LATestException(Exception):
    """Exception type reserved for failures specific to these tests."""
class LATests(drs_base.DrsBaseTestCase):
    """Check linked-attribute state as reported through DsGetNCChanges.

    Each test creates users and groups under a scratch OU, adds or removes
    links between them (or deletes one end of a link), and then compares
    the linked attributes returned by a DRSUAPI_EXOP_REPL_OBJ request
    with the expected set of targets and their ACTIVE flags.
    """

    def setUp(self):
        """Bind to the first DC and create an empty scratch OU."""
        super(LATests, self).setUp()
        # DrsBaseTestCase sets up self.ldb_dc1, self.ldb_dc2
        # we're only using one
        self.samdb = self.ldb_dc1

        self.base_dn = self.samdb.domain_dn()
        self.ou = "OU=la,%s" % self.base_dn

        # Clear anything left behind under the OU before re-creating it.
        self._delete_test_ou()
        self.samdb.add({'objectclass': 'organizationalUnit',
                        'dn': self.ou})

        self.dc_guid = self.samdb.get_invocation_id()
        self.drs, self.drs_handle = self._ds_bind(self.dnsname_dc1)

    def tearDown(self):
        """Run the base-class teardown, then remove the scratch OU."""
        super(LATests, self).tearDown()
        self._delete_test_ou()

    def _delete_test_ou(self):
        """Tree-delete the scratch OU, ignoring any LDB error.

        This is deliberately best-effort: every ldb.LdbError is swallowed
        so that cleanup never fails a test by itself.
        """
        try:
            self.samdb.delete(self.ou, ['tree_delete:1'])
        except ldb.LdbError:
            pass

    def delete_user(self, user):
        """Delete the object at ``user['dn']`` and forget it locally.

        :param user: mapping with a 'dn' key naming the object to delete.
        :raises ValueError: if a ``self.users`` list exists but does not
            contain ``user``.
        """
        self.samdb.delete(user['dn'])
        # Nothing in this class populates self.users, so only update the
        # bookkeeping list when something else has provided one.
        users = getattr(self, 'users', None)
        if users is not None:
            users.remove(user)

    def add_object(self, cn, objectclass):
        """Create one object directly under the scratch OU.

        :param cn: common name of the new object.
        :param objectclass: value for its objectclass attribute.
        :return: the DN string of the new object.
        """
        dn = "CN=%s,%s" % (cn, self.ou)
        self.samdb.add({'cn': cn,
                        'objectclass': objectclass,
                        'dn': dn})
        return dn

    def add_objects(self, n, objectclass, prefix=None):
        """Create ``n`` objects named ``<prefix>1`` .. ``<prefix>n``.

        :param n: number of objects to create.
        :param objectclass: objectclass of every created object.
        :param prefix: name prefix; defaults to ``objectclass``.
        :return: list of the DN strings, in creation order.
        """
        if prefix is None:
            prefix = objectclass
        return [self.add_object("%s%d" % (prefix, i), objectclass)
                for i in range(1, n + 1)]

    def _modify_link(self, src, dest, attr, flag):
        """Apply one modification of ``attr`` on ``src`` with value ``dest``.

        ``flag`` is the ldb modification flag (FLAG_MOD_ADD/FLAG_MOD_DELETE).
        """
        m = ldb.Message()
        m.dn = ldb.Dn(self.samdb, src)
        m[attr] = ldb.MessageElement(dest, flag, attr)
        self.samdb.modify(m)

    def add_linked_attribute(self, src, dest, attr='member'):
        """Add ``dest`` as a value of ``attr`` on the object ``src``."""
        self._modify_link(src, dest, attr, ldb.FLAG_MOD_ADD)

    def remove_linked_attribute(self, src, dest, attr='member'):
        """Remove ``dest`` from the values of ``attr`` on the object ``src``."""
        self._modify_link(src, dest, attr, ldb.FLAG_MOD_DELETE)

    def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE):
        """Fetch the links of ``obj`` for ``attr`` via DsGetNCChanges.

        :param obj: DN of the object to replicate (passed as nc_dn_str).
        :param expected: unused; kept so existing callers keep working.
        :param attr: attribute name; the attid compared against is
            ``drsuapi.DRSUAPI_ATTID_<attr>``.
        :param scope: unused; kept so existing callers keep working.
        :return: list of ``(target_dn, active)`` tuples, where ``active``
            is True when the link has the ACTIVE flag set.
        """
        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=self.dc_guid,
                               nc_dn_str=obj,
                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)

        _level, ctr = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
        expected_attid = getattr(drsuapi, 'DRSUAPI_ATTID_' + attr)
        active_flag = drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE

        links = []
        for link in ctr.linked_attributes:
            # Links of other attributes on the same object are ignored.
            if link.attid != expected_attid:
                continue
            target = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
                                link.value.blob)
            links.append((str(target.dn), bool(link.flags & active_flag)))
        return links

    def assert_forward_links(self, obj, expected, attr='member'):
        """Assert that ``obj`` has exactly the links described by ``expected``.

        :param obj: DN of the object whose links are checked.
        :param expected: dict mapping each target DN to its expected
            ACTIVE flag (True or False).
        :param attr: linked attribute name, 'member' by default.
        """
        results = self.attr_search(obj, expected, attr)
        self.assertEqual(len(results), len(expected))

        for dn, active in results:
            self.assertIn(dn, expected)
            self.assertEqual(expected[dn], active,
                             "%s active flag should be %d, not %d" %
                             (dn, expected[dn], active))

    def get_object_guid(self, dn):
        """Return the objectGUID of ``dn`` as a string."""
        res = self.samdb.search(dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=['objectGUID'])
        return str(misc.GUID(res[0]['objectGUID'][0]))

    def test_links_all_delete_group(self):
        # Deleting a group must leave the other group's link untouched
        # and leave no links reported on the deleted group itself.
        u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
        g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
        g2guid = self.get_object_guid(g2)

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.samdb.delete(g2)
        self.assert_forward_links(g1, {u1: True})

        # The deleted group is looked up by GUID (with show_deleted) to
        # obtain its DN after the delete.
        res = self.samdb.search('<GUID=%s>' % g2guid,
                                scope=ldb.SCOPE_BASE,
                                controls=['show_deleted:1'])
        new_dn = res[0].dn
        self.assert_forward_links(new_dn, {})

    def test_la_links_delete_link(self):
        # A removed link is expected to be reported with its ACTIVE flag
        # cleared rather than to disappear; re-adding it sets the flag.
        u1, u2 = self.add_objects(2, 'user', 'u_del_link')
        g1, g2 = self.add_objects(2, 'group', 'g_del_link')

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.remove_linked_attribute(g2, u1)

        self.assert_forward_links(g1, {u1: True})
        self.assert_forward_links(g2, {u1: False, u2: True})

        self.add_linked_attribute(g2, u1)
        self.remove_linked_attribute(g2, u2)
        self.assert_forward_links(g2, {u1: True, u2: False})
        self.remove_linked_attribute(g2, u1)
        self.assert_forward_links(g2, {u1: False, u2: False})

    def test_la_links_delete_user(self):
        # Deleting a user must remove every link that points at it while
        # leaving links to other users in place.
        u1, u2 = self.add_objects(2, 'user', 'u_del_user')
        g1, g2 = self.add_objects(2, 'group', 'g_del_user')

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.samdb.delete(u1)

        self.assert_forward_links(g1, {})
        self.assert_forward_links(g2, {u2: True})