drsuapi.idl: fix source_dsa spelling
[samba4-gss.git] / python / samba / tests / reparsepoints.py
bloba55686160213a43ccdf9731477b2359f670cfb48
1 # Unix SMB/CIFS implementation.
2 # Copyright Volker Lendecke <vl@samba.org> 2022
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.samba3 import libsmb_samba_internal as libsmb
19 from samba import (ntstatus,NTSTATUSError)
20 from samba.dcerpc import security as sec
21 from samba import reparse_symlink
22 import samba.tests.libsmb
23 import stat
def posix_context(mode):
    """Build the SMB2 POSIX create-context entry for *mode*.

    Returns a ``(tag, payload)`` tuple whose tag is
    ``libsmb.SMB2_CREATE_TAG_POSIX`` and whose payload is *mode* encoded
    as four little-endian bytes.
    """
    tag = libsmb.SMB2_CREATE_TAG_POSIX
    payload = mode.to_bytes(4, 'little')
    return (tag, payload)
28 class ReparsePoints(samba.tests.libsmb.LibsmbTests):
def connection(self, posix=False):
    """Open a libsmb connection to the share under test.

    The share name comes from the SHARENAME test variable and falls back
    to "tmp" when that is missing or empty.  The value of the SMB1 test
    variable is handed to ``libsmb.Conn`` as ``force_smb1`` and *posix*
    is passed through unchanged.
    """
    share_name = samba.tests.env_get_var_value(
        "SHARENAME", allow_missing=True) or "tmp"
    smb1_setting = samba.tests.env_get_var_value("SMB1", allow_missing=True)
    return libsmb.Conn(
        self.server_ip,
        share_name,
        self.lp,
        self.creds,
        posix=posix,
        force_smb1=smb1_setting)
def connection_posix(self):
    """Open a forced-SMB1 connection and switch it to SMB1 POSIX mode.

    The share name comes from the SHARENAME test variable and falls back
    to "posix_share" when that is missing or empty.
    """
    share_name = samba.tests.env_get_var_value(
        "SHARENAME", allow_missing=True) or "posix_share"
    posix_conn = libsmb.Conn(
        self.server_ip,
        share_name,
        self.lp,
        self.creds,
        force_smb1=True)
    posix_conn.smb1_posix()
    return posix_conn
def clean_file(self, conn, filename):
    """Unlink *filename* over *conn*, tolerating "already gone" errors.

    An NTSTATUSError is swallowed when its status is
    NT_STATUS_OBJECT_NAME_NOT_FOUND, NT_STATUS_OBJECT_PATH_NOT_FOUND or
    NT_STATUS_IO_REPARSE_TAG_NOT_HANDLED; any other status is re-raised.
    """
    try:
        conn.unlink(filename)
    except NTSTATUSError as exc:
        status = exc.args[0]
        tolerated = (
            ntstatus.NT_STATUS_OBJECT_NAME_NOT_FOUND,
            ntstatus.NT_STATUS_OBJECT_PATH_NOT_FOUND,
            ntstatus.NT_STATUS_IO_REPARSE_TAG_NOT_HANDLED,
        )
        if status not in tolerated:
            raise
def test_error_not_a_reparse_point(self):
    """GET_REPARSE_POINT on a plain file fails with NOT_A_REPARSE_POINT."""
    conn = self.connection()
    path = 'reparse'
    self.clean_file(conn, path)

    handle = conn.create(
        path,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
        CreateDisposition=libsmb.FILE_CREATE)

    # The freshly created file carries no reparse data, so the query
    # is expected to be rejected.
    with self.assertRaises(NTSTATUSError) as caught:
        conn.fsctl(handle, libsmb.FSCTL_GET_REPARSE_POINT, b'', 1024)

    status = caught.exception.args[0]
    self.assertEqual(status, ntstatus.NT_STATUS_NOT_A_REPARSE_POINT)

    conn.close(handle)
    self.clean_file(conn, path)
def test_create_reparse(self):
    """Exercise the buffer validation done by FSCTL_SET_REPARSE_POINT.

    Checked in order: an empty buffer, buffers of 1..14 bytes, a valid
    buffer that is one byte too long or too short, the exact valid
    buffer, and finally a valid buffer carrying a different tag.
    """
    conn = self.connection()
    path = 'reparse'
    self.clean_file(conn, path)

    handle = conn.create(
        path,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE | sec.SEC_STD_DELETE,
        CreateDisposition=libsmb.FILE_CREATE)
    conn.delete_on_close(handle, 1)

    def failed_set_status(payload):
        # Send payload via SET_REPARSE_POINT, require an NTSTATUSError
        # and hand back the NTSTATUS code it carried.
        with self.assertRaises(NTSTATUSError) as caught:
            conn.fsctl(handle, libsmb.FSCTL_SET_REPARSE_POINT, payload, 0)
        return caught.exception.args[0]

    self.assertEqual(failed_set_status(b''),
                     ntstatus.NT_STATUS_INVALID_BUFFER_SIZE)

    for length in range(1, 15):
        self.assertEqual(failed_set_status(length * b'0'),
                         ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

    # Create a syntactically valid [MS-FSCC] 2.1.2.2 REPARSE_DATA_BUFFER
    valid = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')

    # Show that SET_REPARSE_POINT does exact length checks
    self.assertEqual(failed_set_status(valid + b'0'),
                     ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)
    self.assertEqual(failed_set_status(valid[:-1]),
                     ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

    # Exact length works
    conn.fsctl(handle, libsmb.FSCTL_SET_REPARSE_POINT, valid, 0)

    # We can't overwrite an existing reparse point with a different tag
    other_tag = reparse_symlink.put(0x80000026, 0, b'asdf')
    self.assertEqual(failed_set_status(other_tag),
                     ntstatus.NT_STATUS_IO_REPARSE_TAG_MISMATCH)
def test_query_reparse_tag(self):
    """The attribute-tag info level reports 0, then the tag that was set."""
    conn = self.connection()
    path = 'reparse'
    self.clean_file(conn, path)

    access = (sec.SEC_FILE_READ_ATTRIBUTE |
              sec.SEC_FILE_WRITE_ATTRIBUTE |
              sec.SEC_STD_DELETE)
    handle = conn.create(
        path,
        DesiredAccess=access,
        CreateDisposition=libsmb.FILE_CREATE)
    conn.delete_on_close(handle, 1)

    # Before any reparse data is set the tag reads back as zero.
    before = conn.qfileinfo(handle, libsmb.FSCC_FILE_ATTRIBUTE_TAG_INFORMATION)
    self.assertEqual(before['tag'], 0)

    payload = reparse_symlink.put(0x80000026, 0, b'asdf')
    conn.fsctl(handle, libsmb.FSCTL_SET_REPARSE_POINT, payload, 0)

    after = conn.qfileinfo(handle, libsmb.FSCC_FILE_ATTRIBUTE_TAG_INFORMATION)
    self.assertEqual(after['tag'], 0x80000026)
163 # Show that we can write to a reparse point when opened properly
def test_write_reparse(self):
    """A reparse point opened with FILE_OPEN_REPARSE_POINT accepts writes."""
    conn = self.connection()
    path = 'reparse'
    self.clean_file(conn, path)

    # First handle: create the file and attach the reparse data.
    setup_handle = conn.create(
        path,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
        CreateDisposition=libsmb.FILE_CREATE)
    payload = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')
    conn.fsctl(setup_handle, libsmb.FSCTL_SET_REPARSE_POINT, payload, 0)
    conn.close(setup_handle)

    # Second handle: reopen the reparse point itself for data access.
    handle, create_response, _ = conn.create_ex(
        path,
        DesiredAccess=sec.SEC_FILE_WRITE_DATA | sec.SEC_STD_DELETE,
        CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT,
        CreateDisposition=libsmb.FILE_OPEN)
    reparse_bit = (create_response['file_attributes'] &
                   libsmb.FILE_ATTRIBUTE_REPARSE_POINT)
    self.assertEqual(reparse_bit, libsmb.FILE_ATTRIBUTE_REPARSE_POINT)

    conn.write(handle, b'x', 1)

    conn.delete_on_close(handle, 1)
    conn.close(handle)
def test_query_dir_reparse(self):
    """A directory listing reports the reparse attribute and symlink tag."""
    conn = self.connection()
    path = 'reparse'
    self.clean_file(conn, path)

    handle = conn.create(
        path,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
        CreateDisposition=libsmb.FILE_CREATE)
    symlink_blob = reparse_symlink.symlink_put("y", "y", 0, 0)
    conn.fsctl(handle, libsmb.FSCTL_SET_REPARSE_POINT, symlink_blob, 0)
    conn.close(handle)

    listing = conn.list("", path)
    entry = listing[0]
    self.assertEqual(
        entry["attrib"],
        libsmb.FILE_ATTRIBUTE_REPARSE_POINT | libsmb.FILE_ATTRIBUTE_ARCHIVE)
    self.assertEqual(entry["reparse_tag"], libsmb.IO_REPARSE_TAG_SYMLINK)

    self.clean_file(conn, path)
215 # Show that directories can carry reparse points
def test_create_reparse_directory(self):
    """Show that directories can carry reparse points.

    Creates a directory, sets a reparse point on it, then checks that
    creating a file below it fails with
    NT_STATUS_IO_REPARSE_TAG_NOT_HANDLED and that a listing reports the
    reparse attribute and tag for the directory.

    NT_STATUS_ACCESS_DENIED from the set operation is reported as a test
    failure; any other NTSTATUSError propagates unchanged.
    """
    conn = self.connection()
    dirname = "reparse_dir"
    filename = f'{dirname}\\file.txt'

    self.clean_file(conn, filename)
    self.clean_file(conn, dirname)

    dir_fd = conn.create(
        dirname,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE |
        sec.SEC_STD_DELETE,
        CreateDisposition=libsmb.FILE_CREATE,
        CreateOptions=libsmb.FILE_DIRECTORY_FILE)

    b = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')

    # err is only assigned when the fsctl raises.  Give it a value up
    # front so the check below does not hit UnboundLocalError when the
    # fsctl succeeds.
    err = None
    try:
        conn.fsctl(dir_fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
    except NTSTATUSError as e:
        err = e.args[0]
        if err != ntstatus.NT_STATUS_ACCESS_DENIED:
            raise
    finally:
        # NOTE(review): this closes dir_fd on every path, yet the
        # success path below calls delete_on_close()/close() on it
        # again -- confirm whether this cleanup was meant for the error
        # path only.
        conn.close(dir_fd)
        self.clean_file(conn, dirname)

    if err == ntstatus.NT_STATUS_ACCESS_DENIED:
        # fail() raises, so no further cleanup can run after it; the
        # finally clause above has already closed the handle.
        self.fail("Could not set reparse point on directory")

    with self.assertRaises(NTSTATUSError) as e:
        conn.create(
            filename,
            DesiredAccess=sec.SEC_STD_DELETE,
            CreateDisposition=libsmb.FILE_CREATE)

    self.assertEqual(e.exception.args[0],
                     ntstatus.NT_STATUS_IO_REPARSE_TAG_NOT_HANDLED)

    conn.delete_on_close(dir_fd, 1)
    conn.close(dir_fd)

    dirents = conn.list("", dirname)
    self.assertEqual(
        dirents[0]["attrib"],
        libsmb.FILE_ATTRIBUTE_REPARSE_POINT |
        libsmb.FILE_ATTRIBUTE_DIRECTORY)
    self.assertEqual(dirents[0]["reparse_tag"], 0x80000025)

    self.clean_file(conn, dirname)
271 # Only empty directories can carry reparse points
def test_create_reparse_nonempty_directory(self):
    """Only empty directories can carry reparse points.

    Creates a directory containing one file and expects
    FSCTL_SET_REPARSE_POINT on the directory to fail with
    NT_STATUS_DIRECTORY_NOT_EMPTY.  Both handles are marked
    delete-on-close and closed before the result is judged.
    """
    conn = self.connection()
    dirname = "reparse_dir"
    filename = f'{dirname}\\file.txt'

    self.clean_file(conn, filename)
    self.clean_file(conn, dirname)

    dir_fd = conn.create(
        dirname,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE |
        sec.SEC_STD_DELETE,
        CreateDisposition=libsmb.FILE_CREATE,
        CreateOptions=libsmb.FILE_DIRECTORY_FILE)
    fd = conn.create(
        filename,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE |
        sec.SEC_STD_DELETE,
        CreateDisposition=libsmb.FILE_CREATE)

    b = reparse_symlink.put(0x80000025, 0, b'asdf')

    # err stays None when the server wrongly accepts the request.  That
    # must show up as a test failure below, not as an UnboundLocalError.
    err = None
    try:
        conn.fsctl(dir_fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
    except NTSTATUSError as e:
        err = e.args[0]

    # Clean up before asserting so a failure leaves nothing behind.
    conn.delete_on_close(fd, 1)
    conn.close(fd)
    conn.delete_on_close(dir_fd, 1)
    conn.close(dir_fd)

    if err != ntstatus.NT_STATUS_DIRECTORY_NOT_EMPTY:
        self.fail(f'set_reparse on nonempty directory returned {err}')
309 # Show that reparse point opens respect share modes
def test_reparse_share_modes(self):
    """A second open of a reparse point fails with SHARING_VIOLATION.

    Both opens use FILE_OPEN_REPARSE_POINT and neither passes a
    ShareAccess argument.
    """
    conn = self.connection()
    path = 'reparse'
    self.clean_file(conn, path)

    setup_handle = conn.create(
        path,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
        CreateDisposition=libsmb.FILE_CREATE)
    payload = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')
    conn.fsctl(setup_handle, libsmb.FSCTL_SET_REPARSE_POINT, payload, 0)
    conn.close(setup_handle)

    first_open = conn.create(
        path,
        DesiredAccess=sec.SEC_FILE_READ_DATA | sec.SEC_STD_DELETE,
        CreateDisposition=libsmb.FILE_OPEN,
        CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT)

    # The first handle is still open, so this second open must be refused.
    with self.assertRaises(NTSTATUSError) as caught:
        conn.create(
            path,
            DesiredAccess=sec.SEC_FILE_READ_DATA,
            CreateDisposition=libsmb.FILE_OPEN,
            CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT)

    status = caught.exception.args[0]
    self.assertEqual(status, ntstatus.NT_STATUS_SHARING_VIOLATION)

    conn.delete_on_close(first_open, 1)
    conn.close(first_open)
def test_delete_reparse_point(self):
    """Exercise FSCTL_DELETE_REPARSE_POINT validation and its effect.

    With a reparse point of tag 0x80000025 in place, deletion is
    attempted with: a different tag and no data (tag mismatch), a
    different tag with data, the right tag with data (both invalid
    data), and the right tag with no data (succeeds).  A second delete
    then reports NOT_A_REPARSE_POINT, and a fresh open must no longer
    show FILE_ATTRIBUTE_REPARSE_POINT.
    """
    conn = self.connection()
    filename = 'reparse'
    self.clean_file(conn, filename)

    fd = conn.create(
        filename,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
        CreateDisposition=libsmb.FILE_CREATE)
    b = reparse_symlink.put(0x80000025, 0, b'asdfasdfasdfasdfasdfasdf')
    conn.fsctl(fd, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
    conn.close(fd)

    (fd, cr, _) = conn.create_ex(
        filename,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE | sec.SEC_STD_DELETE,
        CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT,
        CreateDisposition=libsmb.FILE_OPEN)

    self.assertEqual(cr['file_attributes'] &
                     libsmb.FILE_ATTRIBUTE_REPARSE_POINT,
                     libsmb.FILE_ATTRIBUTE_REPARSE_POINT)

    # Wrong tag, empty data.
    b = reparse_symlink.put(0x80000026, 0, b'')
    with self.assertRaises(NTSTATUSError) as e:
        conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)
    self.assertEqual(e.exception.args[0],
                     ntstatus.NT_STATUS_IO_REPARSE_TAG_MISMATCH)

    # Wrong tag, non-empty data.
    b = reparse_symlink.put(0x80000026, 0, b' ')
    with self.assertRaises(NTSTATUSError) as e:
        conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)
    self.assertEqual(e.exception.args[0],
                     ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

    # Right tag, non-empty data.
    b = reparse_symlink.put(0x80000025, 0, b' ')
    with self.assertRaises(NTSTATUSError) as e:
        conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)
    self.assertEqual(e.exception.args[0],
                     ntstatus.NT_STATUS_IO_REPARSE_DATA_INVALID)

    # Right tag, empty data: the delete goes through.
    b = reparse_symlink.put(0x80000025, 0, b'')
    conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)

    # Nothing is left to delete the second time round.
    with self.assertRaises(NTSTATUSError) as e:
        conn.fsctl(fd, libsmb.FSCTL_DELETE_REPARSE_POINT, b, 0)
    self.assertEqual(e.exception.args[0],
                     ntstatus.NT_STATUS_NOT_A_REPARSE_POINT)

    conn.close(fd)

    (fd, cr, _) = conn.create_ex(
        filename,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE | sec.SEC_STD_DELETE,
        CreateDisposition=libsmb.FILE_OPEN)

    # The expected value was lost from the scraped source; after a
    # successful delete the reparse attribute bit has to be clear.
    self.assertEqual(cr['file_attributes'] &
                     libsmb.FILE_ATTRIBUTE_REPARSE_POINT,
                     0)

    conn.delete_on_close(fd, 1)
    conn.close(fd)
def do_test_nfs_reparse(self, filename, filetype, nfstype):
    """Check how a special file made via SMB1 POSIX mknod looks over SMB2.

    *filetype* is OR-ed with mode 0o755 and passed to mknod.  The file is
    then opened over an SMB2 POSIX connection and must report
    IO_REPARSE_TAG_NFS in both info levels queried, a wire mode that
    maps back to *filetype*, and reparse data that decodes to *nfstype*.
    """
    smb2 = self.connection(posix=True)
    smb1 = self.connection_posix()

    self.clean_file(smb2, filename)
    smb1.mknod(filename, filetype | 0o755)

    share_all = (libsmb.FILE_SHARE_READ |
                 libsmb.FILE_SHARE_WRITE |
                 libsmb.FILE_SHARE_DELETE)
    handle, _, _ = smb2.create_ex(
        filename,
        DesiredAccess=sec.SEC_FILE_READ_ATTRIBUTE | sec.SEC_STD_DELETE,
        CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT,
        CreateDisposition=libsmb.FILE_OPEN,
        ShareAccess=share_all,
        CreateContexts=[posix_context(0o600)])
    smb2.delete_on_close(handle, 1)

    tag_info = smb2.qfileinfo(handle, libsmb.FSCC_FILE_ATTRIBUTE_TAG_INFORMATION)
    self.assertEqual(tag_info['tag'], libsmb.IO_REPARSE_TAG_NFS)

    posix_info = smb2.qfileinfo(handle, libsmb.FSCC_FILE_POSIX_INFORMATION)
    self.assertEqual(posix_info['reparse_tag'], libsmb.IO_REPARSE_TAG_NFS)

    # Only the file-type half of the decoded mode is checked.
    file_kind, _perms = self.wire_mode_to_unix(posix_info['perms'])
    self.assertEqual(file_kind, filetype)

    reparse_blob = smb2.fsctl(handle, libsmb.FSCTL_GET_REPARSE_POINT, b'', 1024)
    (decoded_tag,) = reparse_symlink.get(reparse_blob)
    self.assertEqual(decoded_tag, nfstype)
def test_fifo_reparse(self):
    """A FIFO must be reported with the NFS_SPECFILE_FIFO reparse type."""
    self.do_test_nfs_reparse(
        filename='fifo',
        filetype=stat.S_IFIFO,
        nfstype='NFS_SPECFILE_FIFO')
def test_sock_reparse(self):
    """A socket must be reported with the NFS_SPECFILE_SOCK reparse type."""
    self.do_test_nfs_reparse(
        filename='sock',
        filetype=stat.S_IFSOCK,
        nfstype='NFS_SPECFILE_SOCK')
def test_reparsepoint_posix_type(self):
    """A symlink reparse point lists as S_IFLNK at the POSIX info level.

    The file is created over a POSIX connection with a 0o600 POSIX
    create context, given symlink reparse data, and then listed with
    SMB2_FIND_POSIX_INFORMATION.
    """
    conn = self.connection(posix=True)
    path = 'reparse'
    self.clean_file(conn, path)

    handle, _, _ = conn.create_ex(
        path,
        DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
        CreateDisposition=libsmb.FILE_CREATE,
        CreateContexts=[posix_context(0o600)])
    symlink_blob = reparse_symlink.symlink_put("y", "y", 0, 0)
    conn.fsctl(handle, libsmb.FSCTL_SET_REPARSE_POINT, symlink_blob, 0)
    conn.close(handle)

    listing = conn.list(
        "", path, info_level=libsmb.SMB2_FIND_POSIX_INFORMATION)
    entry = listing[0]
    self.assertEqual(
        entry["attrib"],
        libsmb.FILE_ATTRIBUTE_REPARSE_POINT | libsmb.FILE_ATTRIBUTE_ARCHIVE)
    self.assertEqual(entry["reparse_tag"], libsmb.IO_REPARSE_TAG_SYMLINK)

    # Only the file-type half of the decoded mode is checked.
    file_kind, _perms = self.wire_mode_to_unix(entry['perms'])
    self.assertEqual(file_kind, stat.S_IFLNK)

    self.clean_file(conn, path)
if __name__ == '__main__':
    # Run the tests in this module when it is executed as a script.
    from unittest import main as run_unittest_main
    run_unittest_main()