2 # Unix SMB/CIFS implementation. Tests for smb notify
3 # Copyright (C) Björn Baumbach <bb@samba.org> 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 sys
.path
.insert(0, "bin/python")
22 os
.environ
["PYTHONUNBUFFERED"] = "1"
26 from samba
.tests
import TestCase
27 from samba
import credentials
28 from samba
.ntstatus
import NT_STATUS_NOTIFY_CLEANUP
29 from samba
.samba3
import libsmb_samba_internal
as libsmb
30 from samba
.samba3
import param
as s3param
31 from samba
.dcerpc
import security
33 from samba
import ntacls
35 test_dir
= os
.path
.join('notify_test_%d' % random
.randint(0, 0xFFFF))
37 class SMBNotifyTests(TestCase
):
40 self
.server
= samba
.tests
.env_get_var_value("SERVER")
42 # create an SMB connection to the server
43 self
.lp
= s3param
.get_context()
44 self
.lp
.load(samba
.tests
.env_get_var_value("SMB_CONF_PATH"))
46 self
.share
= samba
.tests
.env_get_var_value("NOTIFY_SHARE")
48 creds
= credentials
.Credentials()
50 creds
.set_username(samba
.tests
.env_get_var_value("USERNAME"))
51 creds
.set_password(samba
.tests
.env_get_var_value("PASSWORD"))
53 strict_checking
= samba
.tests
.env_get_var_value('STRICT_CHECKING', allow_missing
=True)
54 if strict_checking
is None:
56 self
.strict_checking
= bool(int(strict_checking
))
58 self
.smb_conn
= libsmb
.Conn(self
.server
, self
.share
, self
.lp
, creds
)
59 self
.smb_conn_unpriv
= None
62 self
.smb_conn
.deltree(test_dir
)
65 self
.smb_conn
.mkdir(test_dir
)
67 def connect_unpriv(self
):
68 creds_unpriv
= credentials
.Credentials()
69 creds_unpriv
.guess(self
.lp
)
70 creds_unpriv
.set_username(samba
.tests
.env_get_var_value("USERNAME_UNPRIV"))
71 creds_unpriv
.set_password(samba
.tests
.env_get_var_value("PASSWORD_UNPRIV"))
73 self
.smb_conn_unpriv
= libsmb
.Conn(self
.server
, self
.share
, self
.lp
, creds_unpriv
)
78 self
.smb_conn
.deltree(test_dir
)
82 def make_path(self
, dirpath
, filename
):
83 return os
.path
.join(dirpath
, filename
).replace('/', '\\')
85 def test_notify(self
):
86 # setup notification request on the share root
87 root_fnum
= self
.smb_conn
.create(Name
="", ShareAccess
=1)
88 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
90 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
92 # setup notification request on the test_dir
93 test_dir_fnum
= self
.smb_conn
.create(Name
=test_dir
, ShareAccess
=1)
94 test_dir_notify
= self
.smb_conn
.notify(fnum
=test_dir_fnum
,
96 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
99 # make sure we didn't receive any changes yet.
101 changes
= root_notify
.get_changes(wait
=False)
102 self
.assertIsNone(changes
)
103 changes
= test_dir_notify
.get_changes(wait
=False)
104 self
.assertIsNone(changes
)
106 # create a test directory
108 dir_path
= self
.make_path(test_dir
, dir_name
)
109 self
.smb_conn
.mkdir(dir_path
)
111 # check for 'added' notifications
112 changes
= root_notify
.get_changes(wait
=True)
113 self
.assertIsNotNone(changes
)
114 self
.assertEqual(changes
[0]['name'], dir_path
)
115 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
116 self
.assertEqual(len(changes
), 1)
117 changes
= test_dir_notify
.get_changes(wait
=True)
118 self
.assertIsNotNone(changes
)
119 self
.assertEqual(changes
[0]['name'], dir_name
)
120 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
121 self
.assertEqual(len(changes
), 1)
123 # readd notification requests
124 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
126 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
128 test_dir_notify
= self
.smb_conn
.notify(fnum
=test_dir_fnum
,
130 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
133 # make sure we didn't receive any changes yet.
135 changes
= root_notify
.get_changes(wait
=False)
136 self
.assertIsNone(changes
)
137 changes
= test_dir_notify
.get_changes(wait
=False)
138 self
.assertIsNone(changes
)
140 # create subdir and trigger notifications
142 sub_path_rel
= self
.make_path(dir_name
, sub_name
)
143 sub_path_full
= self
.make_path(dir_path
, sub_name
)
144 self
.smb_conn
.mkdir(sub_path_full
)
146 # check for 'added' notifications
147 changes
= root_notify
.get_changes(wait
=True)
148 self
.assertIsNotNone(changes
)
149 self
.assertEqual(changes
[0]['name'], sub_path_full
)
150 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
151 self
.assertEqual(len(changes
), 1)
152 changes
= test_dir_notify
.get_changes(wait
=True)
153 self
.assertIsNotNone(changes
)
154 self
.assertEqual(changes
[0]['name'], sub_path_rel
)
155 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
156 self
.assertEqual(len(changes
), 1)
158 # readd notification requests
159 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
161 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
163 test_dir_notify
= self
.smb_conn
.notify(fnum
=test_dir_fnum
,
165 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
168 # make sure we didn't receive any changes yet.
170 changes
= root_notify
.get_changes(wait
=False)
171 self
.assertIsNone(changes
)
172 changes
= test_dir_notify
.get_changes(wait
=False)
173 self
.assertIsNone(changes
)
175 # remove test dir and trigger notifications
176 self
.smb_conn
.rmdir(sub_path_full
)
178 # check for 'removed' notifications
179 changes
= root_notify
.get_changes(wait
=True)
180 self
.assertIsNotNone(changes
)
181 self
.assertEqual(changes
[0]['name'], sub_path_full
)
182 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
183 self
.assertEqual(len(changes
), 1)
184 changes
= test_dir_notify
.get_changes(wait
=True)
185 self
.assertIsNotNone(changes
)
186 self
.assertEqual(changes
[0]['name'], sub_path_rel
)
187 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
188 self
.assertEqual(len(changes
), 1)
190 # readd notification requests
191 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
193 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
195 test_dir_notify
= self
.smb_conn
.notify(fnum
=test_dir_fnum
,
197 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
200 # make sure we didn't receive any changes yet.
202 changes
= root_notify
.get_changes(wait
=False)
203 self
.assertIsNone(changes
)
204 changes
= test_dir_notify
.get_changes(wait
=False)
205 self
.assertIsNone(changes
)
207 # closing the handle on test_dir will trigger
208 # a NOTIFY_CLEANUP on test_dir_notify and
209 # it also seems to update something on test_dir it self
210 # and post a MODIFIED on root_notify
212 # TODO: find out why windows generates ACTION_MODIFIED
213 # and why Samba doesn't
214 self
.smb_conn
.close(test_dir_fnum
)
216 changes
= test_dir_notify
.get_changes(wait
=True)
218 except samba
.NTSTATUSError
as err
:
219 self
.assertEqual(err
.args
[0], NT_STATUS_NOTIFY_CLEANUP
)
221 changes
= root_notify
.get_changes(wait
=False)
222 if self
.strict_checking
:
223 self
.assertIsNotNone(changes
)
224 if changes
is not None:
225 self
.assertEqual(changes
[0]['name'], test_dir
)
226 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_MODIFIED
)
227 self
.assertEqual(len(changes
), 1)
229 # readd notification request
230 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
232 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
235 # make sure we didn't receive any changes yet.
237 changes
= root_notify
.get_changes(wait
=False)
238 self
.assertIsNone(changes
)
241 self
.smb_conn
.rmdir(dir_path
)
243 # check for 'removed' notifications
244 changes
= root_notify
.get_changes(wait
=True)
245 self
.assertIsNotNone(changes
)
246 self
.assertEqual(changes
[0]['name'], dir_path
)
247 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
248 self
.assertEqual(len(changes
), 1)
250 # readd notification request
251 root_notify
= self
.smb_conn
.notify(fnum
=root_fnum
,
253 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
255 # closing the handle on test_dir will trigger
256 # a NOTIFY_CLEANUP on root_notify
257 self
.smb_conn
.close(root_fnum
)
259 changes
= root_notify
.get_changes(wait
=True)
261 except samba
.NTSTATUSError
as err
:
262 self
.assertEqual(err
.args
[0], NT_STATUS_NOTIFY_CLEANUP
)
265 def _test_notify_privileged_path(self
,
268 self
.connect_unpriv()
270 domain_sid
= security
.dom_sid() # we just use S-0-0
271 smb_helper
= ntacls
.SMBHelper(self
.smb_conn
, domain_sid
)
273 private_name
= "private"
274 private_rel
= self
.make_path(rel_prefix
, private_name
)
275 private_path
= self
.make_path(test_dir
, private_name
)
276 # create a private test directory
277 self
.smb_conn
.mkdir(private_path
)
279 # Get the security descriptor and replace it
280 # with a one that only grants access to SYSTEM and the
282 private_path_sd_old
= smb_helper
.get_acl(private_path
)
283 private_path_sd_new
= security
.descriptor()
284 private_path_sd_new
.type = private_path_sd_old
.type
285 private_path_sd_new
.revision
= private_path_sd_old
.revision
286 private_path_sd_new
= security
.descriptor
.from_sddl("G:BAD:(A;;0x%x;;;%s)(A;;0x%x;;;%s)" % (
287 security
.SEC_RIGHTS_DIR_ALL
,
288 security
.SID_NT_SYSTEM
,
289 security
.SEC_RIGHTS_DIR_ALL
,
290 str(private_path_sd_old
.owner_sid
)),
292 private_path_sd_new
.type |
= security
.SEC_DESC_SELF_RELATIVE
293 private_path_sd_new
.type |
= security
.SEC_DESC_DACL_PROTECTED
294 set_secinfo
= security
.SECINFO_GROUP | security
.SECINFO_DACL | security
.SECINFO_PROTECTED_DACL
295 smb_helper
.set_acl(private_path
, private_path_sd_new
, sinfo
=set_secinfo
)
297 # setup notification request as privileged user
298 monitor_priv_fnum
= self
.smb_conn
.create(Name
=monitor_path
, ShareAccess
=1)
299 notify_priv
= self
.smb_conn
.notify(fnum
=monitor_priv_fnum
,
301 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
304 # setup notification request as unprivileged user
305 monitor_unpriv_fnum
= self
.smb_conn_unpriv
.create(Name
=monitor_path
, ShareAccess
=1)
306 notify_unpriv
= self
.smb_conn_unpriv
.notify(fnum
=monitor_unpriv_fnum
,
308 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
311 # make sure we didn't receive any changes yet.
313 changes
= notify_priv
.get_changes(wait
=False)
314 self
.assertIsNone(changes
)
315 self
.smb_conn_unpriv
.echo()
316 changes
= notify_unpriv
.get_changes(wait
=False)
317 self
.assertIsNone(changes
)
319 # trigger notification in the private dir
320 new_name
= 'test-new'
321 new_rel
= self
.make_path(private_rel
, new_name
)
322 new_path
= self
.make_path(private_path
, new_name
)
323 self
.smb_conn
.mkdir(new_path
)
325 # check that only the privileged user received the changes
326 changes
= notify_priv
.get_changes(wait
=True)
327 self
.assertIsNotNone(changes
)
328 self
.assertEqual(changes
[0]['name'], new_rel
)
329 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_ADDED
)
330 self
.assertEqual(len(changes
), 1)
331 notify_priv
= self
.smb_conn
.notify(fnum
=monitor_priv_fnum
,
333 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
336 # check that the unprivileged user does not receives the changes
337 self
.smb_conn_unpriv
.echo()
338 changes
= notify_unpriv
.get_changes(wait
=False)
339 self
.assertIsNone(changes
)
340 # and there's no additional change for the privileged user
342 changes
= notify_priv
.get_changes(wait
=False)
343 self
.assertIsNone(changes
)
345 # trigger notification in the private dir
346 self
.smb_conn
.rmdir(new_path
)
348 # check that only the privileged user received the changes
349 changes
= notify_priv
.get_changes(wait
=True)
350 self
.assertIsNotNone(changes
)
351 self
.assertEqual(changes
[0]['name'], new_rel
)
352 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
353 self
.assertEqual(len(changes
), 1)
354 notify_priv
= self
.smb_conn
.notify(fnum
=monitor_priv_fnum
,
356 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
359 # check that the unprivileged user does not receives the changes
360 self
.smb_conn_unpriv
.echo()
361 changes
= notify_unpriv
.get_changes(wait
=False)
362 self
.assertIsNone(changes
)
363 # and there's no additional change for the privileged user
365 changes
= notify_priv
.get_changes(wait
=False)
366 self
.assertIsNone(changes
)
368 # trigger notification for both
369 self
.smb_conn
.rmdir(private_path
)
371 # check that both get the notification
372 changes
= notify_unpriv
.get_changes(wait
=True)
373 self
.assertIsNotNone(changes
)
374 self
.assertEqual(changes
[0]['name'], private_rel
)
375 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
376 self
.assertEqual(len(changes
), 1)
377 notify_unpriv
= self
.smb_conn_unpriv
.notify(fnum
=monitor_unpriv_fnum
,
379 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
381 changes
= notify_priv
.get_changes(wait
=True)
382 self
.assertIsNotNone(changes
)
383 self
.assertEqual(changes
[0]['name'], private_rel
)
384 self
.assertEqual(changes
[0]['action'], libsmb
.NOTIFY_ACTION_REMOVED
)
385 self
.assertEqual(len(changes
), 1)
386 notify_priv
= self
.smb_conn
.notify(fnum
=monitor_priv_fnum
,
388 completion_filter
=libsmb
.FILE_NOTIFY_CHANGE_ALL
,
391 # check that the unprivileged user does not receives the changes
392 self
.smb_conn_unpriv
.echo()
393 changes
= notify_unpriv
.get_changes(wait
=False)
394 self
.assertIsNone(changes
)
395 # and there's no additional change for the privileged user
397 changes
= notify_priv
.get_changes(wait
=False)
398 self
.assertIsNone(changes
)
400 # closing the handle on will trigger a NOTIFY_CLEANUP
401 self
.smb_conn_unpriv
.close(monitor_unpriv_fnum
)
403 changes
= notify_unpriv
.get_changes(wait
=True)
405 except samba
.NTSTATUSError
as err
:
406 self
.assertEqual(err
.args
[0], NT_STATUS_NOTIFY_CLEANUP
)
408 # there's no additional change for the privileged user
410 changes
= notify_priv
.get_changes(wait
=False)
411 self
.assertIsNone(changes
)
413 # closing the handle on will trigger a NOTIFY_CLEANUP
414 self
.smb_conn
.close(monitor_priv_fnum
)
416 changes
= notify_priv
.get_changes(wait
=True)
418 except samba
.NTSTATUSError
as err
:
419 self
.assertEqual(err
.args
[0], NT_STATUS_NOTIFY_CLEANUP
)
421 def test_notify_privileged_test(self
):
422 return self
._test
_notify
_privileged
_path
(monitor_path
=test_dir
, rel_prefix
="")
424 def test_notify_privileged_root(self
):
425 return self
._test
_notify
_privileged
_path
(monitor_path
="", rel_prefix
=test_dir
)
427 if __name__
== "__main__":