ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / ntlm_auth_base.py
blob3f3850bb378e01eb3b14e2794c4bee75c0600ec5
1 # Unix SMB/CIFS implementation.
2 # A test for the ntlm_auth tool
3 # Copyright (C) Kai Blin <kai@samba.org> 2008
4 # Copyright (C) Samuel Cabrero <scabrero@suse.de> 2018
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Test ntlm_auth
20 This test program will start ntlm_auth with the given command line switches and
21 see if it will get the expected results.
22 """
24 import os
25 import samba
26 import subprocess
27 from samba.tests import BlackboxTestCase
class NTLMAuthTestCase(BlackboxTestCase):
    """Base class for blackbox tests that drive the ntlm_auth tool.

    run_helper() starts two ntlm_auth processes, one acting as client and
    one acting as server, and relays the helper-protocol lines between
    them until the exchange is accepted or rejected.
    """

    def setUp(self):
        """Locate the ntlm_auth binary and load the test configuration.

        The binary is looked up in the directory named by the BINDIR
        environment variable, falling back to "./bin".
        """
        super().setUp()
        bindir = os.path.normpath(os.getenv("BINDIR", "./bin"))
        self.ntlm_auth_path = os.path.join(bindir, 'ntlm_auth')
        self.lp = samba.tests.env_loadparm()
        self.winbind_separator = self.lp.get('winbind separator')

    def readLine(self, text_stream):
        """Read one line from text_stream and return it without the newline.

        Raises Exception if readline() returned no newline-terminated
        line (for example because the stream reached end of file).
        """
        buf = text_stream.readline()
        newline = buf.find('\n')
        if newline == -1:
            raise Exception("Failed to read line")
        return buf[:newline]

    def writeLine(self, text_stream, buf):
        """Write buf followed by a newline to text_stream."""
        # A single write() hands the line and its terminator to the stream
        # together instead of as two separate writes.
        text_stream.write(buf + "\n")

    @staticmethod
    def _start_helper(args):
        """Start a helper process with unbuffered text-mode stdin/stdout pipes."""
        return subprocess.Popen(args,
                                stdout=subprocess.PIPE,
                                stdin=subprocess.PIPE,
                                bufsize=0,
                                universal_newlines=True)

    @staticmethod
    def _kill_helper(proc):
        """Kill a helper process and reap it."""
        proc.kill()
        proc.wait()

    @staticmethod
    def _close_pipes(proc):
        """Close the stdin and stdout pipes of a helper process."""
        for pipe in (proc.stdin, proc.stdout):
            if pipe is None:
                continue
            try:
                pipe.close()
            except OSError:
                # Best-effort cleanup only: this runs from a 'finally'
                # clause, so an error here (e.g. a broken pipe) must not
                # replace the exception or return value of the exchange.
                pass

    def _ntlmssp_client_exchange(self, client_proc, server_proc, accept_prefix):
        """Relay the exchange started by an "ntlmssp-client-1" client.

        Returns True if the final server reply starts with accept_prefix,
        False otherwise. Unexpected intermediate replies fail the test.
        """
        self.writeLine(client_proc.stdin, "YR")
        buf = self.readLine(client_proc.stdout)
        self.assertTrue(buf.startswith("YR "))

        self.writeLine(server_proc.stdin, buf)
        buf = self.readLine(server_proc.stdout)
        self.assertTrue(buf.startswith("TT "))

        self.writeLine(client_proc.stdin, buf)
        buf = self.readLine(client_proc.stdout)
        self.assertTrue(buf.startswith("AF "))

        # Client sends 'AF <base64 blob>' but server
        # expects 'KK <base64 blob>'
        buf = buf.replace("AF", "KK", 1)

        self.writeLine(server_proc.stdin, buf)
        buf = self.readLine(server_proc.stdout)
        return buf.startswith(accept_prefix)

    def _spnego_client_exchange(self, client_proc, server_proc):
        """Relay the exchange between "gss-spnego-client" and "gss-spnego".

        Lines are passed back and forth until the server answers "NA * "
        (returns False) or either side answers with an "AF" reply
        (returns True). Any other unexpected reply fails the test.
        """
        self.writeLine(server_proc.stdin, "YR")
        buf = self.readLine(server_proc.stdout)

        while True:
            if buf.startswith("NA * "):
                return False

            self.assertTrue(buf.startswith(("AF ", "TT ")))

            self.writeLine(client_proc.stdin, buf)
            buf = self.readLine(client_proc.stdout)

            if buf.startswith("AF"):
                return True

            self.assertTrue(buf.startswith(("AF ", "KK ", "TT ")))

            self.writeLine(server_proc.stdin, buf)
            buf = self.readLine(server_proc.stdout)

            if buf.startswith("AF * "):
                return True

    def _check_gk_gf(self, proc):
        """Send the "GK" and "GF" requests to proc and check each reply.

        Every reply must start with the request code followed by a space.
        """
        for request in ("GK", "GF"):
            self.writeLine(proc.stdin, request)
            buf = self.readLine(proc.stdout)
            self.assertTrue(buf.startswith(request + " "))

    def run_helper(self,
                   client_username=None,
                   client_password=None,
                   client_domain=None,
                   client_use_cached_creds=False,
                   client_use_global_krb5_ccache=False,
                   server_username=None,
                   server_password=None,
                   server_domain=None,
                   client_helper="ntlmssp-client-1",
                   server_helper="squid-2.5-ntlmssp",
                   server_use_winbind=False,
                   require_membership=None,
                   target_hostname=None,
                   target_service=None):
        """Run a client and a server ntlm_auth helper against each other.

        client_username, client_password, client_domain: passed to the
            client as --username, --password and --domain.
        client_use_cached_creds: pass --use-cached-creds to the client;
            requires client_username and forbids client_password.
        client_use_global_krb5_ccache: no client credentials are passed;
            client_username and client_password must both be None.
        server_username, server_password, server_domain: passed to the
            server as --username, --password and --domain; all three are
            required unless server_use_winbind is set.
        client_helper, server_helper: values for --helper-protocol.
            Supported pairs are "ntlmssp-client-1" with
            "squid-2.5-ntlmssp" or "gss-spnego", and "gss-spnego-client"
            with "gss-spnego".
        server_use_winbind: do not pass credentials to the server.
        require_membership: passed to the server as
            --require-membership-of; only allowed with server_use_winbind.
        target_hostname, target_service: passed to the client as
            --target-hostname and --target-service.

        Returns True if the exchange was accepted, False otherwise.

        Raises Exception for an invalid combination of parameters. Fails
        the test if the binary is not executable, the helper pair is not
        supported, a helper sends an unexpected reply or a helper exits
        with a non-zero status.
        """
        self.assertTrue(os.access(self.ntlm_auth_path, os.X_OK))

        # Client helper args
        client_args = [self.ntlm_auth_path,
                       "--helper-protocol=%s" % client_helper]
        if client_username:
            client_args.append("--username=%s" % client_username)
        if client_domain:
            client_args.append("--domain=%s" % client_domain)
        if client_use_cached_creds:
            if client_username is None:
                raise Exception("client_username required")
            if client_password is not None:
                raise Exception("client_password not allowed")
            client_args.append("--use-cached-creds")
        elif client_use_global_krb5_ccache:
            if client_username is not None:
                raise Exception("client_username not allowed")
            if client_password is not None:
                raise Exception("client_password not allowed")
        else:
            if client_username is None:
                raise Exception("client_username required")
            if client_password is None:
                raise Exception("client_password required")
            client_args.append("--password=%s" % client_password)
        if target_service:
            client_args.append("--target-service=%s" % target_service)
        if target_hostname:
            client_args.append("--target-hostname=%s" % target_hostname)
        client_args.append("--configfile=%s" % self.lp.configfile)

        # Server helper args
        server_args = [self.ntlm_auth_path,
                       "--helper-protocol=%s" % server_helper,
                       "--configfile=%s" % self.lp.configfile]
        if not server_use_winbind:
            if server_username is None or server_password is None or server_domain is None:
                raise Exception("Server credentials required if not using winbind")
            server_args.append("--username=%s" % server_username)
            server_args.append("--password=%s" % server_password)
            server_args.append("--domain=%s" % server_domain)
            if require_membership is not None:
                raise Exception("Server must be using winbind for require-membership-of")
        elif require_membership is not None:
            server_args.append("--require-membership-of=%s" % require_membership)

        # Run helpers
        result = False
        server_proc = self._start_helper(server_args)
        try:
            client_proc = self._start_helper(client_args)
        except BaseException:
            # Do not leave the server helper running (and its pipes open)
            # when the client helper could not be started.
            self._kill_helper(server_proc)
            self._close_pipes(server_proc)
            raise

        try:
            if client_helper == "ntlmssp-client-1" and server_helper == "squid-2.5-ntlmssp":
                result = self._ntlmssp_client_exchange(client_proc, server_proc, "AF ")
            elif client_helper == "ntlmssp-client-1" and server_helper == "gss-spnego":
                result = self._ntlmssp_client_exchange(client_proc, server_proc, "AF * ")
            elif client_helper == "gss-spnego-client" and server_helper == "gss-spnego":
                result = self._spnego_client_exchange(client_proc, server_proc)
            else:
                self.fail("Helper protocols not handled")

            if result is True and client_helper == "ntlmssp-client-1":
                self._check_gk_gf(client_proc)

            if result is True and server_helper == "squid-2.5-ntlmssp":
                self._check_gk_gf(server_proc)

            # Closing stdin lets each helper see end of input and exit.
            client_proc.stdin.close()
            client_proc.wait()
            self.assertEqual(client_proc.returncode, 0)

            server_proc.stdin.close()
            server_proc.wait()
            self.assertEqual(server_proc.returncode, 0)

            return result
        except BaseException:
            # Catch everything (including KeyboardInterrupt) so that no
            # helper is left running; the exception is always re-raised.
            self._kill_helper(client_proc)
            self._kill_helper(server_proc)
            raise
        finally:
            # Both helpers have been waited for at this point; release
            # the pipe file descriptors instead of leaving them to the
            # garbage collector.
            self._close_pipes(client_proc)
            self._close_pipes(server_proc)