2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
6 sys
.path
.insert(0, "bin/python")
9 from samba
.dcerpc
import drsuapi
, misc
10 from samba
.ndr
import ndr_unpack
, ndr_pack
class LATestException(Exception):
    """Exception type declared for the linked-attribute DRS tests.

    NOTE(review): nothing in the visible part of this file raises or
    catches it -- confirm whether it is still needed.
    """
19 class LATests(drs_base
.DrsBaseTestCase
):
22 super(LATests
, self
).setUp()
23 # DrsBaseTestCase sets up self.ldb_dc1, self.ldb_dc2
24 # we're only using one
25 self
.samdb
= self
.ldb_dc1
27 self
.base_dn
= self
.samdb
.domain_dn()
28 self
.ou
= "OU=la,%s" % self
.base_dn
31 self
.samdb
.delete(self
.ou
, ['tree_delete:1'])
32 except ldb
.LdbError
as e
:
34 self
.samdb
.add({'objectclass': 'organizationalUnit',
37 self
.dc_guid
= self
.samdb
.get_invocation_id()
38 self
.drs
, self
.drs_handle
= self
._ds
_bind
(self
.dnsname_dc1
)
41 super(LATests
, self
).tearDown()
43 self
.samdb
.delete(self
.ou
, ['tree_delete:1'])
44 except ldb
.LdbError
as e
:
def delete_user(self, user):
    """Delete *user* from the directory and stop tracking it.

    ``user`` is indexed with ``'dn'``; that DN is deleted through
    ``self.samdb`` and the entry is then dropped from ``self.users``.
    """
    user_dn = user['dn']
    self.samdb.delete(user_dn)

    # Forget the first tracked entry that compares equal to ``user``.
    tracked_at = self.users.index(user)
    del self.users[tracked_at]
51 def add_object(self
, cn
, objectclass
):
52 dn
= "CN=%s,%s" % (cn
, self
.ou
)
53 self
.samdb
.add({'cn': cn
,
54 'objectclass': objectclass
,
59 def add_objects(self
, n
, objectclass
, prefix
=None):
64 dns
.append(self
.add_object("%s%d" % (prefix
, i
+ 1),
68 def add_linked_attribute(self
, src
, dest
, attr
='member'):
70 m
.dn
= ldb
.Dn(self
.samdb
, src
)
71 m
[attr
] = ldb
.MessageElement(dest
, ldb
.FLAG_MOD_ADD
, attr
)
74 def remove_linked_attribute(self
, src
, dest
, attr
='member'):
76 m
.dn
= ldb
.Dn(self
.samdb
, src
)
77 m
[attr
] = ldb
.MessageElement(dest
, ldb
.FLAG_MOD_DELETE
, attr
)
80 def attr_search(self
, obj
, expected
, attr
, scope
=ldb
.SCOPE_BASE
):
82 req8
= self
._exop
_req
8(dest_dsa
=None,
83 invocation_id
=self
.dc_guid
,
85 exop
=drsuapi
.DRSUAPI_EXOP_REPL_OBJ
)
87 level
, ctr
= self
.drs
.DsGetNCChanges(self
.drs_handle
, 8, req8
)
88 expected_attid
= getattr(drsuapi
, 'DRSUAPI_ATTID_' + attr
)
91 for link
in ctr
.linked_attributes
:
92 if link
.attid
== expected_attid
:
93 unpacked
= ndr_unpack(drsuapi
.DsReplicaObjectIdentifier3
,
95 active
= link
.flags
& drsuapi
.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
96 links
.append((str(unpacked
.dn
), bool(active
)))
100 def assert_forward_links(self
, obj
, expected
, attr
='member'):
101 results
= self
.attr_search(obj
, expected
, attr
)
102 self
.assertEqual(len(results
), len(expected
))
105 self
.assertTrue(k
in expected
)
106 self
.assertEqual(expected
[k
], v
, "%s active flag should be %d, not %d" %
def get_object_guid(self, dn):
    """Return the objectGUID of the entry at *dn* as a string.

    Runs a base-scope search on ``self.samdb`` requesting only
    ``objectGUID``, wraps the first value of the first result in
    ``misc.GUID`` and returns its ``str()`` form.
    """
    matches = self.samdb.search(dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=['objectGUID'])
    raw_guid = matches[0]['objectGUID'][0]
    guid = misc.GUID(raw_guid)
    return str(guid)
def test_links_all_delete_group(self):
    """Check forward links after a group holding links is deleted.

    Two users and two groups are created; g1 links to u1 and g2 links to
    both users.  After g2 is deleted, g1 must still report exactly one
    active link (to u1) and the deleted g2 object must report no links.
    """
    u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
    g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
    # Capture the GUID before deletion: it is the handle used to find the
    # object again afterwards.
    g2guid = self.get_object_guid(g2)

    self.add_linked_attribute(g1, u1)
    self.add_linked_attribute(g2, u1)
    self.add_linked_attribute(g2, u2)

    self.samdb.delete(g2)
    self.assert_forward_links(g1, {u1: True})

    res = self.samdb.search('<GUID=%s>' % g2guid,
                            scope=ldb.SCOPE_BASE,
                            controls=['show_deleted:1'])
    # Address the deleted group through the DN of the entry the GUID
    # search returned (presumably the object no longer answers at its
    # original DN once deleted).
    # NOTE(review): assumes the search yields at least one entry.
    new_dn = res[0].dn
    self.assert_forward_links(new_dn, {})
132 def test_la_links_delete_link(self
):
133 u1
, u2
= self
.add_objects(2, 'user', 'u_del_link')
134 g1
, g2
= self
.add_objects(2, 'group', 'g_del_link')
136 self
.add_linked_attribute(g1
, u1
)
137 self
.add_linked_attribute(g2
, u1
)
138 self
.add_linked_attribute(g2
, u2
)
140 self
.remove_linked_attribute(g2
, u1
)
142 self
.assert_forward_links(g1
, {u1
: True})
143 self
.assert_forward_links(g2
, {u1
: False, u2
: True})
145 self
.add_linked_attribute(g2
, u1
)
146 self
.remove_linked_attribute(g2
, u2
)
147 self
.assert_forward_links(g2
, {u1
: True, u2
: False})
148 self
.remove_linked_attribute(g2
, u1
)
149 self
.assert_forward_links(g2
, {u1
: False, u2
: False})
151 def test_la_links_delete_user(self
):
152 u1
, u2
= self
.add_objects(2, 'user', 'u_del_user')
153 g1
, g2
= self
.add_objects(2, 'group', 'g_del_user')
155 self
.add_linked_attribute(g1
, u1
)
156 self
.add_linked_attribute(g2
, u1
)
157 self
.add_linked_attribute(g2
, u2
)
159 self
.samdb
.delete(u1
)
161 self
.assert_forward_links(g1
, {})
162 self
.assert_forward_links(g2
, {u2
: True})