ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / smb2symlink.py
blobf9cd4a23cd67b6725c74cb73b890e11f9f22de45
1 # Unix SMB/CIFS implementation.
2 # Copyright Volker Lendecke <vl@samba.org> 2022
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.samba3 import libsmb_samba_internal as libsmb
19 from samba import reparse_symlink
20 from samba import (ntstatus,NTSTATUSError)
21 from samba.dcerpc import security as sec
22 import samba.tests.libsmb
24 class Smb2SymlinkTests(samba.tests.libsmb.LibsmbTests):
def connections(self, smb1share=None, smb2share=None):
    """Open one SMB1 and one SMB2 connection to the test server.

    smb1share and smb2share name the shares to connect to. A share
    that is not passed in is taken from the SMB1_SHARE / SMB2_SHARE
    environment variable, falling back to "nosymlinks_smb1allow" and
    "nosymlinks" respectively.

    The first connection is created with force_smb1=True and has
    smb1_posix() called on it before it is handed back.

    Returns the tuple (smb1, smb2).

    The test is skipped when the SMB1 connect fails with
    NT_STATUS_CONNECTION_RESET; any other NTSTATUSError propagates.
    """
    if not smb1share:
        smb1share = samba.tests.env_get_var_value(
            "SMB1_SHARE", allow_missing=True)
        if not smb1share:
            smb1share = "nosymlinks_smb1allow"

    try:
        smb1 = libsmb.Conn(
            self.server_ip,
            smb1share,
            self.lp,
            self.creds,
            force_smb1=True)
    except NTSTATUSError as e:
        if e.args[0] != ntstatus.NT_STATUS_CONNECTION_RESET:
            raise
        # A reset is tolerated (presumably the server refuses SMB1 --
        # TODO confirm), but there is no usable connection afterwards.
        # Skip explicitly instead of falling through to an
        # UnboundLocalError on "smb1" below.
        self.skipTest(
            f"SMB1 connection to share {smb1share!r} was reset")
    smb1.smb1_posix()

    if not smb2share:
        smb2share = samba.tests.env_get_var_value(
            "SMB2_SHARE", allow_missing=True)
        if not smb2share:
            smb2share = "nosymlinks"

    smb2 = libsmb.Conn(
        self.server_ip,
        smb2share,
        self.lp,
        self.creds)
    return (smb1, smb2)
def create_symlink(self, conn1, conn2, target, symlink,
                   expect_tgt=None):
    """Create a symlink through conn1 and verify it through conn2.

    Any existing object called `symlink` is removed first. If conn1
    speaks a protocol older than SMB2_02 and has POSIX extensions, the
    link is created with smb1_symlink(); otherwise a file is opened
    as a reparse point and a symlink reparse buffer is written to it
    with FSCTL_SET_REPARSE_POINT.

    The reparse point is then fetched over conn2 and must carry the
    tag IO_REPARSE_TAG_SYMLINK and a substitute name equal to
    expect_tgt (which defaults to `target`).
    """

    if expect_tgt is None:
        expect_tgt = target

    self.clean_file(conn1, symlink)
    if (conn1.protocol() < libsmb.PROTOCOL_SMB2_02 and
            conn1.have_posix()):
        conn1.smb1_symlink(target, symlink)
    else:
        # 1 marks the target as relative, 0 as absolute. startswith()
        # also copes with an empty target, which target[0] does not.
        flags = 0 if target.startswith('/') else 1
        syml = conn1.create(
            symlink,
            DesiredAccess=sec.SEC_FILE_READ_ATTRIBUTE |
            sec.SEC_FILE_WRITE_ATTRIBUTE |
            sec.SEC_STD_DELETE,
            FileAttributes=libsmb.FILE_ATTRIBUTE_NORMAL,
            CreateDisposition=libsmb.FILE_OPEN_IF,
            CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT)
        try:
            # Pass the computed flags; they used to be calculated and
            # then ignored in favour of a hard-coded 1.
            b = reparse_symlink.symlink_put(target, target, 0, flags)
            conn1.fsctl(syml, libsmb.FSCTL_SET_REPARSE_POINT, b, 0)
        finally:
            # Do not leak the handle when building or setting the
            # reparse buffer fails.
            conn1.close(syml)

    fd = conn2.create(symlink,
                      DesiredAccess=sec.SEC_FILE_WRITE_ATTRIBUTE,
                      CreateOptions=libsmb.FILE_OPEN_REPARSE_POINT,
                      CreateDisposition=libsmb.FILE_OPEN)
    try:
        blob = conn2.fsctl(fd, libsmb.FSCTL_GET_REPARSE_POINT, b'', 1024)
    finally:
        conn2.close(fd)

    # Only the tag and the substitute name are checked; the remaining
    # three fields of the parsed symlink are ignored.
    (tag, (subst, _, _, _)) = reparse_symlink.get(blob)
    self.assertEqual(tag, "IO_REPARSE_TAG_SYMLINK")
    self.assertEqual(expect_tgt, subst)
def assert_symlink_exception(self, e, expect):
    """Check that `e` is a STOPPED_ON_SYMLINK error with given details.

    e.args[0] must be NT_STATUS_STOPPED_ON_SYMLINK. Every key/value
    pair in `expect` is then compared with the same key looked up in
    e.args[2]. For the "flags" key, the bits in
    libsmb.SYMLINK_TRUST_MASK are cleared on both sides before the
    comparison.
    """
    self.assertEqual(e.args[0], ntstatus.NT_STATUS_STOPPED_ON_SYMLINK)
    for key, want in expect.items():
        have = e.args[2].get(key)
        if key == "flags":
            # Ignore symlink trust flags for now
            keep = ~libsmb.SYMLINK_TRUST_MASK
            want = want & keep
            have = have & keep
        # The key is part of both tuples so a failure names the field.
        self.assertEqual((key, have), (key, want))
def test_symlinkerror_directory(self):
    """Test a symlink in a nonterminal path component

    Creates the symlink "syml" -> "foo", opens "syml\\bar" over SMB2
    and checks the details of the resulting NTSTATUSError.
    """
    (smb1, smb2) = self.connections()
    symlink = "syml"
    target = "foo"
    suffix = "bar"

    # Registered before creation so the link is removed even when a
    # later assertion fails.
    self.addCleanup(self.clean_file, smb1, symlink)
    self.create_symlink(smb1, smb2, target, symlink)

    with self.assertRaises(NTSTATUSError) as e:
        smb2.create_ex(f'{symlink}\\{suffix}')

    self.assert_symlink_exception(
        e.exception,
        {'unparsed_path_length': len(suffix) + 1,
         'substitute_name': target,
         'print_name': target,
         'flags': 0x20000001})
def test_symlinkerror_file(self):
    """Test a simple symlink in a terminal path

    Creates the symlink "syml" -> "foo", opens "syml" itself over
    SMB2 and checks the details of the resulting NTSTATUSError.
    """
    (smb1, smb2) = self.connections()
    symlink = "syml"
    target = "foo"

    # Registered before creation so the link is removed even when a
    # later assertion fails.
    self.addCleanup(self.clean_file, smb1, symlink)
    self.create_symlink(smb1, smb2, target, symlink)

    with self.assertRaises(NTSTATUSError) as e:
        smb2.create_ex(f'{symlink}')

    self.assert_symlink_exception(
        e.exception,
        {'unparsed_path_length': 0,
         'substitute_name': target,
         'print_name': target,
         'flags': 0x20000001})
def test_symlinkerror_absolute_outside_share(self):
    """
    Test symlinks to outside of the share
    We return the contents 1:1
    """
    (smb1, smb2) = self.connections()
    symlink = "syml"

    # Safety net: the per-iteration clean_file() below is skipped when
    # an assertion fails, so also remove the link at test teardown.
    self.addCleanup(self.clean_file, smb1, symlink)

    for target in ["/etc", "//foo/bar", "/"]:

        self.create_symlink(smb1, smb2, target, symlink)

        with self.assertRaises(NTSTATUSError) as e:
            smb2.create_ex(f'{symlink}')

        # Absolute targets are expected back unchanged, with flags 0.
        self.assert_symlink_exception(
            e.exception,
            {'unparsed_path_length': 0,
             'substitute_name': target,
             'print_name': target,
             'flags': 0})

        self.clean_file(smb1, symlink)
def test_symlinkerror_absolute_inshare(self):
    """Test an absolute symlink inside the share

    The link target is the absolute path LOCAL_PATH/nosymlinks/dst;
    both the stored reparse point and the error returned on open are
    expected to carry the share-relative name "dst" instead.
    """
    (smb1, smb2) = self.connections()
    symlink = "syml"

    localpath = samba.tests.env_get_var_value("LOCAL_PATH")
    # NOTE(review): the share root is hard-coded to "nosymlinks" while
    # connections() may pick a different share from SMB2_SHARE --
    # confirm the two cannot diverge.
    shareroot = f'{localpath}/nosymlinks'
    rel_dest = "dst"
    target = f'{shareroot}/{rel_dest}'

    # Registered before creation so the link is removed even when a
    # later assertion fails.
    self.addCleanup(self.clean_file, smb1, symlink)
    self.create_symlink(smb1, smb2, target, symlink, rel_dest)

    with self.assertRaises(NTSTATUSError) as e:
        smb2.create_ex(f'{symlink}')

    self.assert_symlink_exception(
        e.exception,
        {'unparsed_path_length': 0,
         'substitute_name': rel_dest,
         'print_name': rel_dest,
         'flags': 1})
def test_symlink_reparse_data_buffer_parse(self):
    """Test parsing a symlink reparse buffer coming from Windows

    Feeds a fixed 32-byte buffer to reparse_symlink.get() and expects
    the tag IO_REPARSE_TAG_SYMLINK and the tuple ('bar', 'bar', 0, 1).
    """

    buf = (b'\x0c\x00\x00\xa0\x18\x00\x00\x00'
           b'\x06\x00\x06\x00\x00\x00\x06\x00'
           b'\x01\x00\x00\x00\x62\x00\x61\x00'
           b'\x72\x00\x62\x00\x61\x00\x72\x00')

    try:
        (tag, syml) = reparse_symlink.get(buf)
    except Exception:
        # Not a bare "except:", which would also turn KeyboardInterrupt
        # and SystemExit into a test failure.
        self.fail("Could not parse symlink buffer")

    self.assertEqual(tag, "IO_REPARSE_TAG_SYMLINK")
    self.assertEqual(syml, ('bar', 'bar', 0, 1))
def test_bug15505(self):
    """Test an absolute intermediate symlink inside the share

    Builds sub/lnk -> LOCAL_PATH/sub1 and the file sub1/x on the
    "tmp" share, then opens "sub\\lnk\\x" over SMB2. Each object is
    registered for cleanup right after it has been created.
    """
    smb1, smb2 = self.connections(smb1share="tmp", smb2share="tmp")
    localpath = samba.tests.env_get_var_value("LOCAL_PATH")

    # Directory that holds the symlink.
    smb1.mkdir("sub")
    self.addCleanup(self.clean_file, smb1, "sub")

    # Absolute symlink to a directory that is created afterwards.
    link_target = f'{localpath}/sub1'
    self.create_symlink(smb1, smb2, link_target, "sub/lnk")
    self.addCleanup(self.clean_file, smb1, "sub/lnk")

    # The directory the symlink points to, containing one file.
    smb1.mkdir("sub1")
    self.addCleanup(self.clean_file, smb1, "sub1")

    handle = smb1.create("sub1/x", CreateDisposition=libsmb.FILE_CREATE)
    smb1.close(handle)
    self.addCleanup(self.clean_file, smb1, "sub1/x")

    # Opening the file through the intermediate symlink must succeed.
    handle = smb2.create("sub\\lnk\\x")
    smb2.close(handle)
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    import unittest as _unittest

    _unittest.main()