1 # -*- coding: utf-8 -*-
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
23 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN dsdb_schema_attributes
31 from ldb
import SCOPE_BASE
, LdbError
34 class SchemaAttributesTestCase(samba
.tests
.TestCase
):
39 self
.lp
= samba
.tests
.env_loadparm()
40 self
.samdb
= samba
.tests
.connect_samdb(self
.lp
.samdb_url())
43 res
= self
.samdb
.search(base
="", expression
="", scope
=SCOPE_BASE
, attrs
=["*"])
44 self
.assertEqual(len(res
), 1)
45 self
.schema_dn
= res
[0]["schemaNamingContext"][0]
46 self
.base_dn
= res
[0]["defaultNamingContext"][0]
47 self
.forest_level
= int(res
[0]["forestFunctionality"][0])
49 def _ldap_schemaUpdateNow(self
):
56 self
.samdb
.modify_ldif(ldif
)
58 def _make_obj_names(self
, prefix
):
59 obj_name
= prefix
+ time
.strftime("%s", time
.gmtime())
60 obj_ldap_name
= obj_name
.replace("-", "")
61 obj_dn
= "CN=%s,%s" % (obj_name
, self
.schema_dn
)
62 return (obj_name
, obj_ldap_name
, obj_dn
)
64 def _make_attr_ldif(self
, attr_name
, attr_dn
, sub_oid
, extra
=None):
66 dn: """ + attr_dn
+ """
68 objectClass: attributeSchema
69 adminDescription: """ + attr_name
+ """
70 adminDisplayName: """ + attr_name
+ """
71 cn: """ + attr_name
+ """
72 attributeId: 1.3.6.1.4.1.7165.4.6.1.8.%d.""" % sub_oid
+ str(random
.randint(1, 100000)) + """
73 attributeSyntax: 2.5.5.12
85 def test_AddIndexedAttribute(self
):
86 # create names for an attribute to add
87 (attr_name
, attr_ldap_name
, attr_dn
) = self
._make
_obj
_names
("schemaAttributes-IdxAttr-")
88 ldif
= self
._make
_attr
_ldif
(attr_name
, attr_dn
, 1,
89 "searchFlags: %d" % samba
.dsdb
.SEARCH_FLAG_ATTINDEX
)
91 # add the new attribute
92 self
.samdb
.add_ldif(ldif
)
93 self
._ldap
_schemaUpdateNow
()
97 attr_res
= self
.samdb
.search(base
="@ATTRIBUTES", scope
=ldb
.SCOPE_BASE
)
99 self
.assertIn(attr_ldap_name
, attr_res
[0])
100 self
.assertEqual(len(attr_res
[0][attr_ldap_name
]), 1)
101 self
.assertEqual(str(attr_res
[0][attr_ldap_name
][0]), "CASE_INSENSITIVE")
105 idx_res
= self
.samdb
.search(base
="@INDEXLIST", scope
=ldb
.SCOPE_BASE
)
107 self
.assertIn(attr_ldap_name
, [str(x
) for x
in idx_res
[0]["@IDXATTR"]])
109 def test_AddUnIndexedAttribute(self
):
110 # create names for an attribute to add
111 (attr_name
, attr_ldap_name
, attr_dn
) = self
._make
_obj
_names
("schemaAttributes-UnIdxAttr-")
112 ldif
= self
._make
_attr
_ldif
(attr_name
, attr_dn
, 2)
114 # add the new attribute
115 self
.samdb
.add_ldif(ldif
)
116 self
._ldap
_schemaUpdateNow
()
120 attr_res
= self
.samdb
.search(base
="@ATTRIBUTES", scope
=ldb
.SCOPE_BASE
)
122 self
.assertIn(attr_ldap_name
, attr_res
[0])
123 self
.assertEqual(len(attr_res
[0][attr_ldap_name
]), 1)
124 self
.assertEqual(str(attr_res
[0][attr_ldap_name
][0]), "CASE_INSENSITIVE")
128 idx_res
= self
.samdb
.search(base
="@INDEXLIST", scope
=ldb
.SCOPE_BASE
)
130 self
.assertNotIn(attr_ldap_name
, [str(x
) for x
in idx_res
[0]["@IDXATTR"]])
132 def test_AddTwoIndexedAttributes(self
):
133 # create names for an attribute to add
134 (attr_name
, attr_ldap_name
, attr_dn
) = self
._make
_obj
_names
("schemaAttributes-2IdxAttr-")
135 ldif
= self
._make
_attr
_ldif
(attr_name
, attr_dn
, 3,
136 "searchFlags: %d" % samba
.dsdb
.SEARCH_FLAG_ATTINDEX
)
138 # add the new attribute
139 self
.samdb
.add_ldif(ldif
)
140 self
._ldap
_schemaUpdateNow
()
142 # create names for an attribute to add
143 (attr_name2
, attr_ldap_name2
, attr_dn2
) = self
._make
_obj
_names
("schemaAttributes-Attr-")
144 ldif
= self
._make
_attr
_ldif
(attr_name2
, attr_dn2
, 4,
145 "searchFlags: %d" % samba
.dsdb
.SEARCH_FLAG_ATTINDEX
)
147 # add the new attribute
148 self
.samdb
.add_ldif(ldif
)
149 self
._ldap
_schemaUpdateNow
()
153 attr_res
= self
.samdb
.search(base
="@ATTRIBUTES", scope
=ldb
.SCOPE_BASE
)
155 self
.assertIn(attr_ldap_name
, attr_res
[0])
156 self
.assertEqual(len(attr_res
[0][attr_ldap_name
]), 1)
157 self
.assertEqual(str(attr_res
[0][attr_ldap_name
][0]), "CASE_INSENSITIVE")
159 self
.assertIn(attr_ldap_name2
, attr_res
[0])
160 self
.assertEqual(len(attr_res
[0][attr_ldap_name2
]), 1)
161 self
.assertEqual(str(attr_res
[0][attr_ldap_name2
][0]), "CASE_INSENSITIVE")
165 idx_res
= self
.samdb
.search(base
="@INDEXLIST", scope
=ldb
.SCOPE_BASE
)
167 self
.assertIn(attr_ldap_name
, [str(x
) for x
in idx_res
[0]["@IDXATTR"]])
168 self
.assertIn(attr_ldap_name2
, [str(x
) for x
in idx_res
[0]["@IDXATTR"]])
170 def test_modify_at_attributes(self
):
171 m
= {"dn": "@ATTRIBUTES",
172 "@TEST_EXTRA": ["HIDDEN"]
175 msg
= ldb
.Message
.from_dict(self
.samdb
, m
, ldb
.FLAG_MOD_ADD
)
176 self
.samdb
.modify(msg
)
178 res
= self
.samdb
.search(base
="@ATTRIBUTES", scope
=ldb
.SCOPE_BASE
,
179 attrs
=["@TEST_EXTRA"])
180 self
.assertEqual(len(res
), 1)
181 self
.assertEqual(str(res
[0].dn
), "@ATTRIBUTES")
182 self
.assertEqual(len(res
[0]), 1)
183 self
.assertTrue("@TEST_EXTRA" in res
[0])
184 self
.assertEqual(len(res
[0]["@TEST_EXTRA"]), 1)
185 self
.assertEqual(str(res
[0]["@TEST_EXTRA"][0]), "HIDDEN")
187 samdb2
= samba
.tests
.connect_samdb(self
.lp
.samdb_url())
189 # We now only update the @ATTRIBUTES when a transaction happens
190 # rather than making a read of the DB do writes.
192 # This avoids locking issues and is more expected
194 samdb2
.transaction_start()
195 samdb2
.transaction_commit()
197 res
= self
.samdb
.search(base
="@ATTRIBUTES", scope
=ldb
.SCOPE_BASE
,
198 attrs
=["@TEST_EXTRA"])
199 self
.assertEqual(len(res
), 1)
200 self
.assertEqual(str(res
[0].dn
), "@ATTRIBUTES")
201 self
.assertEqual(len(res
[0]), 0)
202 self
.assertFalse("@TEST_EXTRA" in res
[0])
204 def test_modify_at_indexlist(self
):
205 m
= {"dn": "@INDEXLIST",
209 msg
= ldb
.Message
.from_dict(self
.samdb
, m
, ldb
.FLAG_MOD_ADD
)
210 self
.samdb
.modify(msg
)
212 res
= self
.samdb
.search(base
="@INDEXLIST", scope
=ldb
.SCOPE_BASE
,
213 attrs
=["@TEST_EXTRA"])
214 self
.assertEqual(len(res
), 1)
215 self
.assertEqual(str(res
[0].dn
), "@INDEXLIST")
216 self
.assertEqual(len(res
[0]), 1)
217 self
.assertTrue("@TEST_EXTRA" in res
[0])
218 self
.assertEqual(len(res
[0]["@TEST_EXTRA"]), 1)
219 self
.assertEqual(str(res
[0]["@TEST_EXTRA"][0]), "1")
221 samdb2
= samba
.tests
.connect_samdb(self
.lp
.samdb_url())
223 # We now only update the @INDEXLIST when a transaction happens
224 # rather than making a read of the DB do writes.
226 # This avoids locking issues and is more expected
228 samdb2
.transaction_start()
229 samdb2
.transaction_commit()
231 res
= self
.samdb
.search(base
="@INDEXLIST", scope
=ldb
.SCOPE_BASE
,
232 attrs
=["@TEST_EXTRA"])
233 self
.assertEqual(len(res
), 1)
234 self
.assertEqual(str(res
[0].dn
), "@INDEXLIST")
235 self
.assertEqual(len(res
[0]), 0)
236 self
.assertFalse("@TEST_EXTRA" in res
[0])
238 def test_modify_fail_of_at_indexlist(self
):
239 m
= {"dn": "@INDEXLIST",
240 "@TEST_NOT_EXTRA": ["1"]
243 msg
= ldb
.Message
.from_dict(self
.samdb
, m
, ldb
.FLAG_MOD_DELETE
)
245 self
.samdb
.modify(msg
)
246 self
.fail("modify of @INDEXLIST with a failed constraint should fail")
247 except LdbError
as err
:
249 self
.assertEqual(enum
, ldb
.ERR_NO_SUCH_ATTRIBUTE
)