1 # -*- coding: utf-8 -*-
2 # Unix SMB/CIFS implementation. Tests for smb manipulation
3 # Copyright (C) David Mulder <dmulder@suse.com> 2018
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from samba
import NTSTATUSError
23 from samba
.ntstatus
import (NT_STATUS_OBJECT_NAME_NOT_FOUND
,
24 NT_STATUS_OBJECT_PATH_NOT_FOUND
)
25 from samba
.samba3
import libsmb_samba_internal
as libsmb
26 from samba
.samba3
import param
as s3param
28 realm
= os
.environ
.get('REALM')
29 domain_dir
= realm
.lower() + '/'
30 test_contents
= 'abcd' * 256
31 utf_contents
= u
'Süßigkeiten Äpfel ' * 128
32 test_literal_bytes_embed_nulls
= b
'\xff\xfe\x14\x61\x00\x00\x62\x63\x64' * 256
33 binary_contents
= b
'\xff\xfe'
34 binary_contents
= binary_contents
+ "Hello cruel world of python3".encode('utf8') * 128
35 test_dir
= os
.path
.join(domain_dir
, 'testing_%d' % random
.randint(0, 0xFFFF))
36 test_file
= os
.path
.join(test_dir
, 'testing').replace('/', '\\')
39 class SMBTests(samba
.tests
.TestCase
):
42 self
.server
= os
.environ
["SERVER"]
43 creds
= self
.insta_creds(template
=self
.get_credentials())
45 # create an SMB connection to the server
46 lp
= s3param
.get_context()
47 lp
.load(os
.getenv("SMB_CONF_PATH"))
48 self
.smb_conn
= libsmb
.Conn(self
.server
, "sysvol", lp
, creds
)
50 self
.smb_conn
.mkdir(test_dir
)
55 self
.smb_conn
.deltree(test_dir
)
60 # check a basic listing returns the items we expect
61 ls
= [f
['name'] for f
in self
.smb_conn
.list(domain_dir
)]
62 self
.assertIn('scripts', ls
,
63 msg
='"scripts" directory not found in sysvol')
64 self
.assertIn('Policies', ls
,
65 msg
='"Policies" directory not found in sysvol')
66 self
.assertNotIn('..', ls
,
67 msg
='Parent (..) found in directory listing')
68 self
.assertNotIn('.', ls
,
69 msg
='Current dir (.) found in directory listing')
71 # using a '*' mask should be the same as using no mask
72 ls_wildcard
= [f
['name'] for f
in self
.smb_conn
.list(domain_dir
, "*")]
73 self
.assertEqual(ls
, ls_wildcard
)
75 # applying a mask should only return items that match that mask
76 ls_pol
= [f
['name'] for f
in self
.smb_conn
.list(domain_dir
, "Pol*")]
77 expected
= ["Policies"]
78 self
.assertEqual(ls_pol
, expected
)
80 # each item in the listing is a has with expected keys
81 expected_keys
= ['attrib', 'mtime', 'name', 'short_name', 'size']
82 for item
in self
.smb_conn
.list(domain_dir
):
83 for key
in expected_keys
:
84 self
.assertIn(key
, item
,
85 msg
="Key '%s' not in listing '%s'" % (key
, item
))
87 def test_deltree(self
):
88 """The smb.deltree API should delete files and sub-dirs"""
89 # create some test sub-dirs
94 for subdir
in ["subdir-X", "subdir-Y", "subdir-Z"]:
95 path
= self
.make_sysvol_path(cur_dir
, subdir
)
96 self
.smb_conn
.mkdir(path
)
100 # create another empty dir just for kicks
101 path
= self
.make_sysvol_path(cur_dir
, "another")
102 self
.smb_conn
.mkdir(path
)
103 empty_dirs
.append(path
)
105 # create some files in these directories
107 for subdir
in dirpaths
:
108 for i
in range(1, 4):
109 contents
= "I'm file {0} in dir {1}!".format(i
, subdir
)
110 path
= self
.make_sysvol_path(subdir
, "file-{0}.txt".format(i
))
111 self
.smb_conn
.savefile(path
, contents
.encode('utf8'))
112 filepaths
.append(path
)
114 # sanity-check these dirs/files exist
115 for subdir
in dirpaths
+ empty_dirs
:
116 self
.assertTrue(self
.smb_conn
.chkpath(subdir
),
117 "Failed to create {0}".format(subdir
))
118 for path
in filepaths
:
119 self
.assertTrue(self
.file_exists(path
),
120 "Failed to create {0}".format(path
))
122 # try using deltree to remove a single empty directory
123 path
= empty_dirs
.pop(0)
124 self
.smb_conn
.deltree(path
)
125 self
.assertFalse(self
.smb_conn
.chkpath(path
),
126 "Failed to delete {0}".format(path
))
128 # try using deltree to remove a single file
129 path
= filepaths
.pop(0)
130 self
.smb_conn
.deltree(path
)
131 self
.assertFalse(self
.file_exists(path
),
132 "Failed to delete {0}".format(path
))
134 # delete the top-level dir
135 self
.smb_conn
.deltree(test_dir
)
137 # now check that all the dirs/files are no longer there
138 for subdir
in dirpaths
+ empty_dirs
:
139 self
.assertFalse(self
.smb_conn
.chkpath(subdir
),
140 "Failed to delete {0}".format(subdir
))
141 for path
in filepaths
:
142 self
.assertFalse(self
.file_exists(path
),
143 "Failed to delete {0}".format(path
))
145 def file_exists(self
, filepath
):
146 """Returns whether a regular file exists (by trying to open it)"""
148 self
.smb_conn
.loadfile(filepath
)
150 except NTSTATUSError
as err
:
151 if (err
.args
[0] == NT_STATUS_OBJECT_NAME_NOT_FOUND
or
152 err
.args
[0] == NT_STATUS_OBJECT_PATH_NOT_FOUND
):
158 def test_unlink(self
):
160 The smb.unlink API should delete file
162 # create the test file
163 self
.assertFalse(self
.file_exists(test_file
))
164 self
.smb_conn
.savefile(test_file
, binary_contents
)
165 self
.assertTrue(self
.file_exists(test_file
))
167 # delete it and check that it's gone
168 self
.smb_conn
.unlink(test_file
)
169 self
.assertFalse(self
.file_exists(test_file
))
171 def test_chkpath(self
):
172 """Tests .chkpath determines whether or not a directory exists"""
174 self
.assertTrue(self
.smb_conn
.chkpath(test_dir
))
176 # should return False for a non-existent directory
177 bad_dir
= self
.make_sysvol_path(test_dir
, 'dont_exist')
178 self
.assertFalse(self
.smb_conn
.chkpath(bad_dir
))
180 # should return False for files (because they're not directories)
181 self
.smb_conn
.savefile(test_file
, binary_contents
)
182 self
.assertFalse(self
.smb_conn
.chkpath(test_file
))
184 # check correct result after creating and then deleting a new dir
185 new_dir
= self
.make_sysvol_path(test_dir
, 'test-new')
186 self
.smb_conn
.mkdir(new_dir
)
187 self
.assertTrue(self
.smb_conn
.chkpath(new_dir
))
188 self
.smb_conn
.rmdir(new_dir
)
189 self
.assertFalse(self
.smb_conn
.chkpath(new_dir
))
191 def test_save_load_text(self
):
193 self
.smb_conn
.savefile(test_file
, test_contents
.encode('utf8'))
195 contents
= self
.smb_conn
.loadfile(test_file
)
196 self
.assertEqual(contents
.decode('utf8'), test_contents
,
197 msg
='contents of test file did not match what was written')
199 # check we can overwrite the file with new contents
200 new_contents
= 'wxyz' * 128
201 self
.smb_conn
.savefile(test_file
, new_contents
.encode('utf8'))
202 contents
= self
.smb_conn
.loadfile(test_file
)
203 self
.assertEqual(contents
.decode('utf8'), new_contents
,
204 msg
='contents of test file did not match what was written')
206 # this will save/load bytes type
207 def test_save_load_string_bytes(self
):
208 self
.smb_conn
.savefile(test_file
, test_literal_bytes_embed_nulls
)
210 contents
= self
.smb_conn
.loadfile(test_file
)
211 self
.assertEqual(contents
, test_literal_bytes_embed_nulls
,
212 msg
='contents of test file did not match what was written')
214 # this will save/load unicode
215 def test_save_load_utfcontents(self
):
216 self
.smb_conn
.savefile(test_file
, utf_contents
.encode('utf8'))
218 contents
= self
.smb_conn
.loadfile(test_file
)
219 self
.assertEqual(contents
.decode('utf8'), utf_contents
,
220 msg
='contents of test file did not match what was written')
222 # this will save/load bytes type
223 def test_save_binary_contents(self
):
224 self
.smb_conn
.savefile(test_file
, binary_contents
)
226 contents
= self
.smb_conn
.loadfile(test_file
)
227 self
.assertEqual(contents
, binary_contents
,
228 msg
='contents of test file did not match what was written')
230 def make_sysvol_path(self
, dirpath
, filename
):
231 # return the dir + filename as a sysvol path
232 return os
.path
.join(dirpath
, filename
).replace('/', '\\')