ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / smb-notify.py
blob4587a0064201a838f9fad817f335b15df06827ab
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation. Tests for smb notify
3 # Copyright (C) Björn Baumbach <bb@samba.org> 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import sys
19 import os
21 sys.path.insert(0, "bin/python")
22 os.environ["PYTHONUNBUFFERED"] = "1"
24 import samba
25 import random
26 from samba.tests import TestCase
27 from samba import credentials
28 from samba.ntstatus import NT_STATUS_NOTIFY_CLEANUP
29 from samba.samba3 import libsmb_samba_internal as libsmb
30 from samba.samba3 import param as s3param
31 from samba.dcerpc import security
33 from samba import ntacls
35 test_dir = os.path.join('notify_test_%d' % random.randint(0, 0xFFFF))
37 class SMBNotifyTests(TestCase):
38 def setUp(self):
39 super().setUp()
40 self.server = samba.tests.env_get_var_value("SERVER")
42 # create an SMB connection to the server
43 self.lp = s3param.get_context()
44 self.lp.load(samba.tests.env_get_var_value("SMB_CONF_PATH"))
46 self.share = samba.tests.env_get_var_value("NOTIFY_SHARE")
48 creds = credentials.Credentials()
49 creds.guess(self.lp)
50 creds.set_username(samba.tests.env_get_var_value("USERNAME"))
51 creds.set_password(samba.tests.env_get_var_value("PASSWORD"))
53 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True)
54 if strict_checking is None:
55 strict_checking = '1'
56 self.strict_checking = bool(int(strict_checking))
58 self.smb_conn = libsmb.Conn(self.server, self.share, self.lp, creds)
59 self.smb_conn_unpriv = None
61 try:
62 self.smb_conn.deltree(test_dir)
63 except:
64 pass
65 self.smb_conn.mkdir(test_dir)
67 def connect_unpriv(self):
68 creds_unpriv = credentials.Credentials()
69 creds_unpriv.guess(self.lp)
70 creds_unpriv.set_username(samba.tests.env_get_var_value("USERNAME_UNPRIV"))
71 creds_unpriv.set_password(samba.tests.env_get_var_value("PASSWORD_UNPRIV"))
73 self.smb_conn_unpriv = libsmb.Conn(self.server, self.share, self.lp, creds_unpriv)
75 def tearDown(self):
76 super().tearDown()
77 try:
78 self.smb_conn.deltree(test_dir)
79 except:
80 pass
82 def make_path(self, dirpath, filename):
83 return os.path.join(dirpath, filename).replace('/', '\\')
85 def test_notify(self):
86 # setup notification request on the share root
87 root_fnum = self.smb_conn.create(Name="", ShareAccess=1)
88 root_notify = self.smb_conn.notify(fnum=root_fnum,
89 buffer_size=0xffff,
90 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
91 recursive=True)
92 # setup notification request on the test_dir
93 test_dir_fnum = self.smb_conn.create(Name=test_dir, ShareAccess=1)
94 test_dir_notify = self.smb_conn.notify(fnum=test_dir_fnum,
95 buffer_size=0xffff,
96 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
97 recursive=True)
99 # make sure we didn't receive any changes yet.
100 self.smb_conn.echo()
101 changes = root_notify.get_changes(wait=False)
102 self.assertIsNone(changes)
103 changes = test_dir_notify.get_changes(wait=False)
104 self.assertIsNone(changes)
106 # create a test directory
107 dir_name = "dir"
108 dir_path = self.make_path(test_dir, dir_name)
109 self.smb_conn.mkdir(dir_path)
111 # check for 'added' notifications
112 changes = root_notify.get_changes(wait=True)
113 self.assertIsNotNone(changes)
114 self.assertEqual(changes[0]['name'], dir_path)
115 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_ADDED)
116 self.assertEqual(len(changes), 1)
117 changes = test_dir_notify.get_changes(wait=True)
118 self.assertIsNotNone(changes)
119 self.assertEqual(changes[0]['name'], dir_name)
120 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_ADDED)
121 self.assertEqual(len(changes), 1)
123 # readd notification requests
124 root_notify = self.smb_conn.notify(fnum=root_fnum,
125 buffer_size=0xffff,
126 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
127 recursive=True)
128 test_dir_notify = self.smb_conn.notify(fnum=test_dir_fnum,
129 buffer_size=0xffff,
130 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
131 recursive=True)
133 # make sure we didn't receive any changes yet.
134 self.smb_conn.echo()
135 changes = root_notify.get_changes(wait=False)
136 self.assertIsNone(changes)
137 changes = test_dir_notify.get_changes(wait=False)
138 self.assertIsNone(changes)
140 # create subdir and trigger notifications
141 sub_name = "subdir"
142 sub_path_rel = self.make_path(dir_name, sub_name)
143 sub_path_full = self.make_path(dir_path, sub_name)
144 self.smb_conn.mkdir(sub_path_full)
146 # check for 'added' notifications
147 changes = root_notify.get_changes(wait=True)
148 self.assertIsNotNone(changes)
149 self.assertEqual(changes[0]['name'], sub_path_full)
150 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_ADDED)
151 self.assertEqual(len(changes), 1)
152 changes = test_dir_notify.get_changes(wait=True)
153 self.assertIsNotNone(changes)
154 self.assertEqual(changes[0]['name'], sub_path_rel)
155 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_ADDED)
156 self.assertEqual(len(changes), 1)
158 # readd notification requests
159 root_notify = self.smb_conn.notify(fnum=root_fnum,
160 buffer_size=0xffff,
161 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
162 recursive=True)
163 test_dir_notify = self.smb_conn.notify(fnum=test_dir_fnum,
164 buffer_size=0xffff,
165 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
166 recursive=True)
168 # make sure we didn't receive any changes yet.
169 self.smb_conn.echo()
170 changes = root_notify.get_changes(wait=False)
171 self.assertIsNone(changes)
172 changes = test_dir_notify.get_changes(wait=False)
173 self.assertIsNone(changes)
175 # remove test dir and trigger notifications
176 self.smb_conn.rmdir(sub_path_full)
178 # check for 'removed' notifications
179 changes = root_notify.get_changes(wait=True)
180 self.assertIsNotNone(changes)
181 self.assertEqual(changes[0]['name'], sub_path_full)
182 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_REMOVED)
183 self.assertEqual(len(changes), 1)
184 changes = test_dir_notify.get_changes(wait=True)
185 self.assertIsNotNone(changes)
186 self.assertEqual(changes[0]['name'], sub_path_rel)
187 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_REMOVED)
188 self.assertEqual(len(changes), 1)
190 # readd notification requests
191 root_notify = self.smb_conn.notify(fnum=root_fnum,
192 buffer_size=0xffff,
193 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
194 recursive=True)
195 test_dir_notify = self.smb_conn.notify(fnum=test_dir_fnum,
196 buffer_size=0xffff,
197 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
198 recursive=True)
200 # make sure we didn't receive any changes yet.
201 self.smb_conn.echo()
202 changes = root_notify.get_changes(wait=False)
203 self.assertIsNone(changes)
204 changes = test_dir_notify.get_changes(wait=False)
205 self.assertIsNone(changes)
207 # closing the handle on test_dir will trigger
208 # a NOTIFY_CLEANUP on test_dir_notify and
209 # it also seems to update something on test_dir it self
210 # and post a MODIFIED on root_notify
212 # TODO: find out why windows generates ACTION_MODIFIED
213 # and why Samba doesn't
214 self.smb_conn.close(test_dir_fnum)
215 try:
216 changes = test_dir_notify.get_changes(wait=True)
217 self.fail()
218 except samba.NTSTATUSError as err:
219 self.assertEqual(err.args[0], NT_STATUS_NOTIFY_CLEANUP)
220 self.smb_conn.echo()
221 changes = root_notify.get_changes(wait=False)
222 if self.strict_checking:
223 self.assertIsNotNone(changes)
224 if changes is not None:
225 self.assertEqual(changes[0]['name'], test_dir)
226 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_MODIFIED)
227 self.assertEqual(len(changes), 1)
229 # readd notification request
230 root_notify = self.smb_conn.notify(fnum=root_fnum,
231 buffer_size=0xffff,
232 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
233 recursive=True)
235 # make sure we didn't receive any changes yet.
236 self.smb_conn.echo()
237 changes = root_notify.get_changes(wait=False)
238 self.assertIsNone(changes)
240 # remove test_dir
241 self.smb_conn.rmdir(dir_path)
243 # check for 'removed' notifications
244 changes = root_notify.get_changes(wait=True)
245 self.assertIsNotNone(changes)
246 self.assertEqual(changes[0]['name'], dir_path)
247 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_REMOVED)
248 self.assertEqual(len(changes), 1)
250 # readd notification request
251 root_notify = self.smb_conn.notify(fnum=root_fnum,
252 buffer_size=0xffff,
253 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
254 recursive=True)
255 # closing the handle on test_dir will trigger
256 # a NOTIFY_CLEANUP on root_notify
257 self.smb_conn.close(root_fnum)
258 try:
259 changes = root_notify.get_changes(wait=True)
260 self.fail()
261 except samba.NTSTATUSError as err:
262 self.assertEqual(err.args[0], NT_STATUS_NOTIFY_CLEANUP)
265 def _test_notify_privileged_path(self,
266 monitor_path=None,
267 rel_prefix=None):
268 self.connect_unpriv()
270 domain_sid = security.dom_sid() # we just use S-0-0
271 smb_helper = ntacls.SMBHelper(self.smb_conn, domain_sid)
273 private_name = "private"
274 private_rel = self.make_path(rel_prefix, private_name)
275 private_path = self.make_path(test_dir, private_name)
276 # create a private test directory
277 self.smb_conn.mkdir(private_path)
279 # Get the security descriptor and replace it
280 # with a one that only grants access to SYSTEM and the
281 # owner.
282 private_path_sd_old = smb_helper.get_acl(private_path)
283 private_path_sd_new = security.descriptor()
284 private_path_sd_new.type = private_path_sd_old.type
285 private_path_sd_new.revision = private_path_sd_old.revision
286 private_path_sd_new = security.descriptor.from_sddl("G:BAD:(A;;0x%x;;;%s)(A;;0x%x;;;%s)" % (
287 security.SEC_RIGHTS_DIR_ALL,
288 security.SID_NT_SYSTEM,
289 security.SEC_RIGHTS_DIR_ALL,
290 str(private_path_sd_old.owner_sid)),
291 domain_sid)
292 private_path_sd_new.type |= security.SEC_DESC_SELF_RELATIVE
293 private_path_sd_new.type |= security.SEC_DESC_DACL_PROTECTED
294 set_secinfo = security.SECINFO_GROUP | security.SECINFO_DACL | security.SECINFO_PROTECTED_DACL
295 smb_helper.set_acl(private_path, private_path_sd_new, sinfo=set_secinfo)
297 # setup notification request as privileged user
298 monitor_priv_fnum = self.smb_conn.create(Name=monitor_path, ShareAccess=1)
299 notify_priv = self.smb_conn.notify(fnum=monitor_priv_fnum,
300 buffer_size=0xffff,
301 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
302 recursive=True)
304 # setup notification request as unprivileged user
305 monitor_unpriv_fnum = self.smb_conn_unpriv.create(Name=monitor_path, ShareAccess=1)
306 notify_unpriv = self.smb_conn_unpriv.notify(fnum=monitor_unpriv_fnum,
307 buffer_size=0xffff,
308 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
309 recursive=True)
311 # make sure we didn't receive any changes yet.
312 self.smb_conn.echo()
313 changes = notify_priv.get_changes(wait=False)
314 self.assertIsNone(changes)
315 self.smb_conn_unpriv.echo()
316 changes = notify_unpriv.get_changes(wait=False)
317 self.assertIsNone(changes)
319 # trigger notification in the private dir
320 new_name = 'test-new'
321 new_rel = self.make_path(private_rel, new_name)
322 new_path = self.make_path(private_path, new_name)
323 self.smb_conn.mkdir(new_path)
325 # check that only the privileged user received the changes
326 changes = notify_priv.get_changes(wait=True)
327 self.assertIsNotNone(changes)
328 self.assertEqual(changes[0]['name'], new_rel)
329 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_ADDED)
330 self.assertEqual(len(changes), 1)
331 notify_priv = self.smb_conn.notify(fnum=monitor_priv_fnum,
332 buffer_size=0xffff,
333 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
334 recursive=True)
336 # check that the unprivileged user does not receives the changes
337 self.smb_conn_unpriv.echo()
338 changes = notify_unpriv.get_changes(wait=False)
339 self.assertIsNone(changes)
340 # and there's no additional change for the privileged user
341 self.smb_conn.echo()
342 changes = notify_priv.get_changes(wait=False)
343 self.assertIsNone(changes)
345 # trigger notification in the private dir
346 self.smb_conn.rmdir(new_path)
348 # check that only the privileged user received the changes
349 changes = notify_priv.get_changes(wait=True)
350 self.assertIsNotNone(changes)
351 self.assertEqual(changes[0]['name'], new_rel)
352 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_REMOVED)
353 self.assertEqual(len(changes), 1)
354 notify_priv = self.smb_conn.notify(fnum=monitor_priv_fnum,
355 buffer_size=0xffff,
356 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
357 recursive=True)
359 # check that the unprivileged user does not receives the changes
360 self.smb_conn_unpriv.echo()
361 changes = notify_unpriv.get_changes(wait=False)
362 self.assertIsNone(changes)
363 # and there's no additional change for the privileged user
364 self.smb_conn.echo()
365 changes = notify_priv.get_changes(wait=False)
366 self.assertIsNone(changes)
368 # trigger notification for both
369 self.smb_conn.rmdir(private_path)
371 # check that both get the notification
372 changes = notify_unpriv.get_changes(wait=True)
373 self.assertIsNotNone(changes)
374 self.assertEqual(changes[0]['name'], private_rel)
375 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_REMOVED)
376 self.assertEqual(len(changes), 1)
377 notify_unpriv = self.smb_conn_unpriv.notify(fnum=monitor_unpriv_fnum,
378 buffer_size=0xffff,
379 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
380 recursive=True)
381 changes = notify_priv.get_changes(wait=True)
382 self.assertIsNotNone(changes)
383 self.assertEqual(changes[0]['name'], private_rel)
384 self.assertEqual(changes[0]['action'], libsmb.NOTIFY_ACTION_REMOVED)
385 self.assertEqual(len(changes), 1)
386 notify_priv = self.smb_conn.notify(fnum=monitor_priv_fnum,
387 buffer_size=0xffff,
388 completion_filter=libsmb.FILE_NOTIFY_CHANGE_ALL,
389 recursive=True)
391 # check that the unprivileged user does not receives the changes
392 self.smb_conn_unpriv.echo()
393 changes = notify_unpriv.get_changes(wait=False)
394 self.assertIsNone(changes)
395 # and there's no additional change for the privileged user
396 self.smb_conn.echo()
397 changes = notify_priv.get_changes(wait=False)
398 self.assertIsNone(changes)
400 # closing the handle on will trigger a NOTIFY_CLEANUP
401 self.smb_conn_unpriv.close(monitor_unpriv_fnum)
402 try:
403 changes = notify_unpriv.get_changes(wait=True)
404 self.fail()
405 except samba.NTSTATUSError as err:
406 self.assertEqual(err.args[0], NT_STATUS_NOTIFY_CLEANUP)
408 # there's no additional change for the privileged user
409 self.smb_conn.echo()
410 changes = notify_priv.get_changes(wait=False)
411 self.assertIsNone(changes)
413 # closing the handle on will trigger a NOTIFY_CLEANUP
414 self.smb_conn.close(monitor_priv_fnum)
415 try:
416 changes = notify_priv.get_changes(wait=True)
417 self.fail()
418 except samba.NTSTATUSError as err:
419 self.assertEqual(err.args[0], NT_STATUS_NOTIFY_CLEANUP)
421 def test_notify_privileged_test(self):
422 return self._test_notify_privileged_path(monitor_path=test_dir, rel_prefix="")
424 def test_notify_privileged_root(self):
425 return self._test_notify_privileged_path(monitor_path="", rel_prefix=test_dir)
427 if __name__ == "__main__":
428 import unittest
429 unittest.main()