1 # Unix SMB/CIFS implementation.
2 # A test for the ntlm_auth tool
3 # Copyright (C) Kai Blin <kai@samba.org> 2008
4 # Copyright (C) Samuel Cabrero <scabrero@suse.de> 2018
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 This test program will start ntlm_auth with the given command line switches and
21 see if it will get the expected results.
27 from samba
.tests
import BlackboxTestCase
29 class NTLMAuthTestCase(BlackboxTestCase
):
33 bindir
= os
.path
.normpath(os
.getenv("BINDIR", "./bin"))
34 self
.ntlm_auth_path
= os
.path
.join(bindir
, 'ntlm_auth')
35 self
.lp
= samba
.tests
.env_loadparm()
36 self
.winbind_separator
= self
.lp
.get('winbind separator')
def readLine(self, text_stream):
    """Read one newline-terminated line from text_stream.

    The trailing newline is stripped, so the returned text can be
    inspected with startswith() and handed straight to writeLine(),
    which appends its own newline.

    Args:
        text_stream: A text-mode stream providing readline().

    Returns:
        The line that was read, without its terminating newline.

    Raises:
        Exception: If readline() returned text containing no newline,
            e.g. because the stream reached end-of-file.
    """
    buf = text_stream.readline()
    newline = buf.find('\n')
    # Only a missing terminator is an error; a complete line must be
    # returned to the caller rather than unconditionally raising.
    if newline == -1:
        raise Exception("Failed to read line")
    return buf[:newline]
def writeLine(self, text_stream, buf):
    """Write buf to text_stream as one newline-terminated line.

    The line is emitted with a single write() and then flushed, so it
    is pushed to the underlying stream immediately even when
    text_stream buffers its output.

    Args:
        text_stream: A text-mode stream providing write() and flush().
        buf: The line to send, without a trailing newline.
    """
    text_stream.write(buf + "\n")
    # Do not rely on the caller having opened the stream unbuffered:
    # a reader waiting for this line would otherwise block forever.
    text_stream.flush()
53 client_use_cached_creds
=False,
54 client_use_global_krb5_ccache
=False,
58 client_helper
="ntlmssp-client-1",
59 server_helper
="squid-2.5-ntlmssp",
60 server_use_winbind
=False,
61 require_membership
=None,
64 self
.assertTrue(os
.access(self
.ntlm_auth_path
, os
.X_OK
))
68 client_args
.append(self
.ntlm_auth_path
)
69 client_args
.append("--helper-protocol=%s" % client_helper
)
71 client_args
.append("--username=%s" % client_username
)
73 client_args
.append("--domain=%s" % client_domain
)
74 if client_use_cached_creds
:
75 if client_username
is None:
76 raise Exception("client_username required")
77 if client_password
is not None:
78 raise Exception("client_password not allowed")
79 client_args
.append("--use-cached-creds")
80 elif client_use_global_krb5_ccache
:
81 if client_username
is not None:
82 raise Exception("client_username not allowed")
83 if client_password
is not None:
84 raise Exception("client_password not allowed")
86 if client_username
is None:
87 raise Exception("client_username required")
88 if client_password
is None:
89 raise Exception("client_password required")
90 client_args
.append("--password=%s" % client_password
)
92 client_args
.append("--target-service=%s" % target_service
)
94 client_args
.append("--target-hostname=%s" % target_hostname
)
95 client_args
.append("--configfile=%s" % self
.lp
.configfile
)
99 server_args
.append(self
.ntlm_auth_path
)
100 server_args
.append("--helper-protocol=%s" % server_helper
)
101 server_args
.append("--configfile=%s" % self
.lp
.configfile
)
102 if not server_use_winbind
:
103 if server_username
is None or server_password
is None or server_domain
is None:
104 raise Exception("Server credentials required if not using winbind")
105 server_args
.append("--username=%s" % server_username
)
106 server_args
.append("--password=%s" % server_password
)
107 server_args
.append("--domain=%s" % server_domain
)
108 if require_membership
is not None:
109 raise Exception("Server must be using winbind for require-membership-of")
111 if require_membership
is not None:
112 server_args
.append("--require-membership-of=%s" % require_membership
)
116 server_proc
= subprocess
.Popen(server_args
, stdout
=subprocess
.PIPE
, stdin
=subprocess
.PIPE
, bufsize
=0, universal_newlines
=True)
117 client_proc
= subprocess
.Popen(client_args
, stdout
=subprocess
.PIPE
, stdin
=subprocess
.PIPE
, bufsize
=0, universal_newlines
=True)
120 if client_helper
== "ntlmssp-client-1" and server_helper
== "squid-2.5-ntlmssp":
121 self
.writeLine(client_proc
.stdin
, "YR")
122 buf
= self
.readLine(client_proc
.stdout
)
123 self
.assertTrue(buf
.startswith("YR "))
125 self
.writeLine(server_proc
.stdin
, buf
)
126 buf
= self
.readLine(server_proc
.stdout
)
127 self
.assertTrue(buf
.startswith("TT "))
129 self
.writeLine(client_proc
.stdin
, buf
)
130 buf
= self
.readLine(client_proc
.stdout
)
131 self
.assertTrue(buf
.startswith("AF "))
133 # Client sends 'AF <base64 blob>' but server
134 # expects 'KK <base64 blob>'
135 buf
= buf
.replace("AF", "KK", 1)
137 self
.writeLine(server_proc
.stdin
, buf
)
138 buf
= self
.readLine(server_proc
.stdout
)
139 result
= buf
.startswith("AF ")
140 elif client_helper
== "ntlmssp-client-1" and server_helper
== "gss-spnego":
141 self
.writeLine(client_proc
.stdin
, "YR")
142 buf
= self
.readLine(client_proc
.stdout
)
143 self
.assertTrue(buf
.startswith("YR "))
145 self
.writeLine(server_proc
.stdin
, buf
)
146 buf
= self
.readLine(server_proc
.stdout
)
147 self
.assertTrue(buf
.startswith("TT "))
149 self
.writeLine(client_proc
.stdin
, buf
)
150 buf
= self
.readLine(client_proc
.stdout
)
151 self
.assertTrue(buf
.startswith("AF "))
# Client sends 'AF <base64 blob>' but server expects 'KK <base64 blob>'
154 buf
= buf
.replace("AF", "KK", 1)
156 self
.writeLine(server_proc
.stdin
, buf
)
157 buf
= self
.readLine(server_proc
.stdout
)
158 result
= buf
.startswith("AF * ")
159 elif client_helper
== "gss-spnego-client" and server_helper
== "gss-spnego":
160 self
.writeLine(server_proc
.stdin
, "YR")
161 buf
= self
.readLine(server_proc
.stdout
)
164 if (buf
.startswith("NA * ")):
168 self
.assertTrue(buf
.startswith("AF ") or buf
.startswith("TT "))
170 self
.writeLine(client_proc
.stdin
, buf
)
171 buf
= self
.readLine(client_proc
.stdout
)
173 if buf
.startswith("AF"):
177 self
.assertTrue(buf
.startswith("AF ") or buf
.startswith("KK ") or buf
.startswith("TT "))
179 self
.writeLine(server_proc
.stdin
, buf
)
180 buf
= self
.readLine(server_proc
.stdout
)
182 if buf
.startswith("AF * "):
186 self
.fail("Helper protocols not handled")
188 if result
is True and client_helper
== "ntlmssp-client-1":
189 self
.writeLine(client_proc
.stdin
, "GK")
190 buf
= self
.readLine(client_proc
.stdout
)
191 self
.assertTrue(buf
.startswith("GK "))
193 self
.writeLine(client_proc
.stdin
, "GF")
194 buf
= self
.readLine(client_proc
.stdout
)
195 self
.assertTrue(buf
.startswith("GF "))
197 if result
is True and server_helper
== "squid-2.5-ntlmssp":
198 self
.writeLine(server_proc
.stdin
, "GK")
199 buf
= self
.readLine(server_proc
.stdout
)
200 self
.assertTrue(buf
.startswith("GK "))
202 self
.writeLine(server_proc
.stdin
, "GF")
203 buf
= self
.readLine(server_proc
.stdout
)
204 self
.assertTrue(buf
.startswith("GF "))
206 client_proc
.stdin
.close()
208 self
.assertEqual(client_proc
.returncode
, 0)
210 server_proc
.stdin
.close()
212 self
.assertEqual(server_proc
.returncode
, 0)