drsuapi.idl: fix source_dsa spelling
[samba4-gss.git] / python / samba / tests / smb.py
blobb1c415b041e0a0fc218c74ac3ad067eb9664fd5b
1 # -*- coding: utf-8 -*-
2 # Unix SMB/CIFS implementation. Tests for smb manipulation
3 # Copyright (C) David Mulder <dmulder@suse.com> 2018
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 import samba
19 import os
20 import random
21 import sys
22 from samba import NTSTATUSError
23 from samba.ntstatus import (NT_STATUS_OBJECT_NAME_NOT_FOUND,
24 NT_STATUS_OBJECT_PATH_NOT_FOUND)
25 from samba.samba3 import libsmb_samba_internal as libsmb
26 from samba.samba3 import param as s3param
28 realm = os.environ.get('REALM')
29 domain_dir = realm.lower() + '/'
30 test_contents = 'abcd' * 256
31 utf_contents = u'Süßigkeiten Äpfel ' * 128
32 test_literal_bytes_embed_nulls = b'\xff\xfe\x14\x61\x00\x00\x62\x63\x64' * 256
33 binary_contents = b'\xff\xfe'
34 binary_contents = binary_contents + "Hello cruel world of python3".encode('utf8') * 128
35 test_dir = os.path.join(domain_dir, 'testing_%d' % random.randint(0, 0xFFFF))
36 test_file = os.path.join(test_dir, 'testing').replace('/', '\\')
39 class SMBTests(samba.tests.TestCase):
40 def setUp(self):
41 super().setUp()
42 self.server = os.environ["SERVER"]
43 creds = self.insta_creds(template=self.get_credentials())
45 # create an SMB connection to the server
46 lp = s3param.get_context()
47 lp.load(os.getenv("SMB_CONF_PATH"))
48 self.smb_conn = libsmb.Conn(self.server, "sysvol", lp, creds)
50 self.smb_conn.mkdir(test_dir)
52 def tearDown(self):
53 super().tearDown()
54 try:
55 self.smb_conn.deltree(test_dir)
56 except:
57 pass
59 def test_list(self):
60 # check a basic listing returns the items we expect
61 ls = [f['name'] for f in self.smb_conn.list(domain_dir)]
62 self.assertIn('scripts', ls,
63 msg='"scripts" directory not found in sysvol')
64 self.assertIn('Policies', ls,
65 msg='"Policies" directory not found in sysvol')
66 self.assertNotIn('..', ls,
67 msg='Parent (..) found in directory listing')
68 self.assertNotIn('.', ls,
69 msg='Current dir (.) found in directory listing')
71 # using a '*' mask should be the same as using no mask
72 ls_wildcard = [f['name'] for f in self.smb_conn.list(domain_dir, "*")]
73 self.assertEqual(ls, ls_wildcard)
75 # applying a mask should only return items that match that mask
76 ls_pol = [f['name'] for f in self.smb_conn.list(domain_dir, "Pol*")]
77 expected = ["Policies"]
78 self.assertEqual(ls_pol, expected)
80 # each item in the listing is a has with expected keys
81 expected_keys = ['attrib', 'mtime', 'name', 'short_name', 'size']
82 for item in self.smb_conn.list(domain_dir):
83 for key in expected_keys:
84 self.assertIn(key, item,
85 msg="Key '%s' not in listing '%s'" % (key, item))
87 def test_deltree(self):
88 """The smb.deltree API should delete files and sub-dirs"""
89 # create some test sub-dirs
90 dirpaths = []
91 empty_dirs = []
92 cur_dir = test_dir
94 for subdir in ["subdir-X", "subdir-Y", "subdir-Z"]:
95 path = self.make_sysvol_path(cur_dir, subdir)
96 self.smb_conn.mkdir(path)
97 dirpaths.append(path)
98 cur_dir = path
100 # create another empty dir just for kicks
101 path = self.make_sysvol_path(cur_dir, "another")
102 self.smb_conn.mkdir(path)
103 empty_dirs.append(path)
105 # create some files in these directories
106 filepaths = []
107 for subdir in dirpaths:
108 for i in range(1, 4):
109 contents = "I'm file {0} in dir {1}!".format(i, subdir)
110 path = self.make_sysvol_path(subdir, "file-{0}.txt".format(i))
111 self.smb_conn.savefile(path, contents.encode('utf8'))
112 filepaths.append(path)
114 # sanity-check these dirs/files exist
115 for subdir in dirpaths + empty_dirs:
116 self.assertTrue(self.smb_conn.chkpath(subdir),
117 "Failed to create {0}".format(subdir))
118 for path in filepaths:
119 self.assertTrue(self.file_exists(path),
120 "Failed to create {0}".format(path))
122 # try using deltree to remove a single empty directory
123 path = empty_dirs.pop(0)
124 self.smb_conn.deltree(path)
125 self.assertFalse(self.smb_conn.chkpath(path),
126 "Failed to delete {0}".format(path))
128 # try using deltree to remove a single file
129 path = filepaths.pop(0)
130 self.smb_conn.deltree(path)
131 self.assertFalse(self.file_exists(path),
132 "Failed to delete {0}".format(path))
134 # delete the top-level dir
135 self.smb_conn.deltree(test_dir)
137 # now check that all the dirs/files are no longer there
138 for subdir in dirpaths + empty_dirs:
139 self.assertFalse(self.smb_conn.chkpath(subdir),
140 "Failed to delete {0}".format(subdir))
141 for path in filepaths:
142 self.assertFalse(self.file_exists(path),
143 "Failed to delete {0}".format(path))
145 def file_exists(self, filepath):
146 """Returns whether a regular file exists (by trying to open it)"""
147 try:
148 self.smb_conn.loadfile(filepath)
149 exists = True
150 except NTSTATUSError as err:
151 if (err.args[0] == NT_STATUS_OBJECT_NAME_NOT_FOUND or
152 err.args[0] == NT_STATUS_OBJECT_PATH_NOT_FOUND):
153 exists = False
154 else:
155 raise err
156 return exists
158 def test_unlink(self):
160 The smb.unlink API should delete file
162 # create the test file
163 self.assertFalse(self.file_exists(test_file))
164 self.smb_conn.savefile(test_file, binary_contents)
165 self.assertTrue(self.file_exists(test_file))
167 # delete it and check that it's gone
168 self.smb_conn.unlink(test_file)
169 self.assertFalse(self.file_exists(test_file))
171 def test_chkpath(self):
172 """Tests .chkpath determines whether or not a directory exists"""
174 self.assertTrue(self.smb_conn.chkpath(test_dir))
176 # should return False for a non-existent directory
177 bad_dir = self.make_sysvol_path(test_dir, 'dont_exist')
178 self.assertFalse(self.smb_conn.chkpath(bad_dir))
180 # should return False for files (because they're not directories)
181 self.smb_conn.savefile(test_file, binary_contents)
182 self.assertFalse(self.smb_conn.chkpath(test_file))
184 # check correct result after creating and then deleting a new dir
185 new_dir = self.make_sysvol_path(test_dir, 'test-new')
186 self.smb_conn.mkdir(new_dir)
187 self.assertTrue(self.smb_conn.chkpath(new_dir))
188 self.smb_conn.rmdir(new_dir)
189 self.assertFalse(self.smb_conn.chkpath(new_dir))
191 def test_save_load_text(self):
193 self.smb_conn.savefile(test_file, test_contents.encode('utf8'))
195 contents = self.smb_conn.loadfile(test_file)
196 self.assertEqual(contents.decode('utf8'), test_contents,
197 msg='contents of test file did not match what was written')
199 # check we can overwrite the file with new contents
200 new_contents = 'wxyz' * 128
201 self.smb_conn.savefile(test_file, new_contents.encode('utf8'))
202 contents = self.smb_conn.loadfile(test_file)
203 self.assertEqual(contents.decode('utf8'), new_contents,
204 msg='contents of test file did not match what was written')
206 # this will save/load bytes type
207 def test_save_load_string_bytes(self):
208 self.smb_conn.savefile(test_file, test_literal_bytes_embed_nulls)
210 contents = self.smb_conn.loadfile(test_file)
211 self.assertEqual(contents, test_literal_bytes_embed_nulls,
212 msg='contents of test file did not match what was written')
214 # this will save/load unicode
215 def test_save_load_utfcontents(self):
216 self.smb_conn.savefile(test_file, utf_contents.encode('utf8'))
218 contents = self.smb_conn.loadfile(test_file)
219 self.assertEqual(contents.decode('utf8'), utf_contents,
220 msg='contents of test file did not match what was written')
222 # this will save/load bytes type
223 def test_save_binary_contents(self):
224 self.smb_conn.savefile(test_file, binary_contents)
226 contents = self.smb_conn.loadfile(test_file)
227 self.assertEqual(contents, binary_contents,
228 msg='contents of test file did not match what was written')
230 def make_sysvol_path(self, dirpath, filename):
231 # return the dir + filename as a sysvol path
232 return os.path.join(dirpath, filename).replace('/', '\\')