1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 auth logging tests that exercise winbind
26 from samba
.auth
import system_session
27 from samba
.credentials
import Credentials
28 from samba
.common
import get_string
, get_bytes
29 from samba
.dcerpc
.messaging
import AUTH_EVENT_NAME
, MSG_AUTH_LOG
30 from samba
.dsdb
import UF_NORMAL_ACCOUNT
31 from samba
.messaging
import Messaging
32 from samba
.param
import LoadParm
33 from samba
.samdb
import SamDB
34 from samba
.tests
import delete_force
, BlackboxProcessError
, BlackboxTestCase
35 from samba
.tests
.auth_log_base
import AuthLogTestBase
40 class AuthLogTestsWinbind(AuthLogTestBase
, BlackboxTestCase
):
43 # Helper function to watch for authentication messages on the
51 # Parent process return the result socket to the caller.
54 # Load the lp context for the Domain Controller, rather than the
56 config_file
= os
.environ
["DC_SERVERCONFFILE"]
58 lp_ctx
.load(config_file
)
61 # Is the message a SamLogon authentication?
67 msg
["type"] == "Authentication" and
68 msg
["Authentication"]["serviceDescription"] == "SamLogon")
71 # Handler function for received authentication messages.
72 def message_handler(context
, msgType
, src
, message
):
73 # Print the message to help debugging the tests.
74 # as it's a JSON message it does not look like a sub-unit message.
76 self
.dc_msgs
.append(message
)
78 # Set up a messaging context to listen for authentication events on
79 # the domain controller.
80 msg_ctx
= Messaging((1,), lp_ctx
=lp_ctx
)
81 msg_ctx
.irpc_add_name(AUTH_EVENT_NAME
)
82 msg_handler_and_context
= (message_handler
, None)
83 msg_ctx
.register(msg_handler_and_context
, msg_type
=MSG_AUTH_LOG
)
85 # Wait for the SamLogon message.
86 # As there could be other SamLogon's in progress we need to collect
87 # all the SamLogons and let the caller match them to the session.
89 start_time
= time
.time()
90 while (time
.time() - start_time
< 1):
91 msg_ctx
.loop_once(0.1)
93 # Only interested in SamLogon messages, filter out the rest
94 msgs
= list(filter(is_sam_logon
, self
.dc_msgs
))
97 os
.write(w1
, get_bytes(m
+"\n"))
99 os
.write(w1
, get_bytes("None\n"))
102 msg_ctx
.deregister(msg_handler_and_context
, msg_type
=MSG_AUTH_LOG
)
103 msg_ctx
.irpc_remove_name(AUTH_EVENT_NAME
)
107 # Remove any DCE/RPC ncacn_np messages
108 # these only get triggered once per session, and stripping them out
109 # avoids ordering dependencies in the tests
111 def filter_messages(self
, messages
):
113 if (msg
["type"] == "Authorization" and
114 msg
["Authorization"]["serviceDescription"] == "DCE/RPC" and
115 msg
["Authorization"]["authType"] == "ncacn_np"):
120 return list(filter(keep
, messages
))
124 self
.domain
= os
.environ
["DOMAIN"]
125 self
.host
= os
.environ
["SERVER"]
126 self
.dc
= os
.environ
["DC_SERVER"]
127 self
.lp
= self
.get_loadparm()
128 self
.credentials
= self
.get_credentials()
129 self
.session
= system_session()
132 url
="ldap://{0}".format(self
.dc
),
133 session_info
=self
.session
,
134 credentials
=self
.credentials
,
136 self
.create_user_account()
138 self
.remoteAddress
= ''
142 delete_force(self
.ldb
, self
.user_dn
)
145 # Create a test user account
146 def create_user_account(self
):
147 self
.user_pass
= self
.random_password()
148 self
.user_name
= USER_NAME
149 self
.user_dn
= "cn=%s,%s" % (self
.user_name
, self
.ldb
.domain_dn())
151 # remove the account if it exists, this will happen if a previous test
153 delete_force(self
.ldb
, self
.user_dn
)
155 utf16pw
= ('"%s"' % get_string(self
.user_pass
)).encode('utf-16-le')
158 "objectclass": "user",
159 "sAMAccountName": "%s" % self
.user_name
,
160 "userAccountControl": str(UF_NORMAL_ACCOUNT
),
161 "unicodePwd": utf16pw
})
163 self
.user_creds
= Credentials()
164 self
.user_creds
.guess(self
.get_loadparm())
165 self
.user_creds
.set_password(self
.user_pass
)
166 self
.user_creds
.set_username(self
.user_name
)
167 self
.user_creds
.set_workstation(self
.server
)
170 # Check that the domain server received a SamLogon request for the
173 def check_domain_server_authentication(self
, pipe
, logon_id
, description
):
175 messages
= os
.read(pipe
, 8192)
176 messages
= get_string(messages
)
177 if len(messages
) == 0 or messages
== "None":
178 self
.fail("No Domain server authentication message")
181 # Look for the SamLogon request matching logon_id
183 for message
in messages
.split("\n"):
184 msg
= json
.loads(get_string(message
))
185 if logon_id
== msg
["Authentication"]["logonId"]:
190 self
.fail("No Domain server authentication message")
193 # Validate that message contains the expected data
195 self
.assertEqual("Authentication", msg
["type"])
196 self
.assertEqual(logon_id
, msg
["Authentication"]["logonId"])
197 self
.assertEqual("SamLogon",
198 msg
["Authentication"]["serviceDescription"])
199 self
.assertEqual(description
,
200 msg
["Authentication"]["authDescription"])
202 def test_ntlm_auth(self
):
204 def isLastExpectedMessage(msg
):
205 DESC
= "PAM_AUTH, ntlm_auth"
207 msg
["type"] == "Authentication" and
208 msg
["Authentication"]["serviceDescription"] == "winbind" and
209 msg
["Authentication"]["authDescription"] is not None and
210 msg
["Authentication"]["authDescription"].startswith(DESC
))
212 pipe
= self
.dc_watcher()
213 COMMAND
= "bin/ntlm_auth"
214 self
.check_run("{0} --username={1} --password={2}".format(
216 self
.credentials
.get_username(),
217 self
.credentials
.get_password()),
218 msg
="ntlm_auth failed")
220 messages
= self
.waitForMessages(isLastExpectedMessage
)
221 messages
= self
.filter_messages(messages
)
222 expected_messages
= 1
223 self
.assertEqual(expected_messages
,
225 "Did not receive the expected number of messages")
227 # Check the first message it should be an Authentication
229 self
.assertEqual("Authentication", msg
["type"])
231 msg
["Authentication"]["authDescription"].startswith(
232 "PAM_AUTH, ntlm_auth,"))
233 self
.assertEqual("winbind",
234 msg
["Authentication"]["serviceDescription"])
235 self
.assertEqual("Plaintext", msg
["Authentication"]["passwordType"])
236 # Logon type should be NetworkCleartext
237 self
.assertEqual(8, msg
["Authentication"]["logonType"])
238 # Event code should be Successful logon
239 self
.assertEqual(4624, msg
["Authentication"]["eventId"])
240 self
.assertEqual("unix:", msg
["Authentication"]["remoteAddress"])
241 self
.assertEqual("unix:", msg
["Authentication"]["localAddress"])
242 self
.assertEqual(self
.domain
, msg
["Authentication"]["clientDomain"])
243 self
.assertEqual("NT_STATUS_OK", msg
["Authentication"]["status"])
244 self
.assertEqual(self
.credentials
.get_username(),
245 msg
["Authentication"]["clientAccount"])
246 self
.assertEqual(self
.credentials
.get_domain(),
247 msg
["Authentication"]["clientDomain"])
248 self
.assertTrue(msg
["Authentication"]["workstation"] is None)
250 logon_id
= msg
["Authentication"]["logonId"]
253 # Now check the Domain server authentication message
255 self
.check_domain_server_authentication(pipe
, logon_id
, "interactive")
257 def test_wbinfo(self
):
258 def isLastExpectedMessage(msg
):
259 DESC
= "NTLM_AUTH, wbinfo"
261 msg
["type"] == "Authentication" and
262 msg
["Authentication"]["serviceDescription"] == "winbind" and
263 msg
["Authentication"]["authDescription"] is not None and
264 msg
["Authentication"]["authDescription"].startswith(DESC
))
266 pipe
= self
.dc_watcher()
267 COMMAND
= "bin/wbinfo"
269 self
.check_run("{0} -a {1}%{2}".format(
271 self
.credentials
.get_username(),
272 self
.credentials
.get_password()),
273 msg
="ntlm_auth failed")
274 except BlackboxProcessError
:
277 messages
= self
.waitForMessages(isLastExpectedMessage
)
278 messages
= self
.filter_messages(messages
)
279 expected_messages
= 3
280 self
.assertEqual(expected_messages
,
282 "Did not receive the expected number of messages")
284 # The 1st message should be an Authentication against the local
287 self
.assertEqual("Authentication", msg
["type"])
288 self
.assertTrue(msg
["Authentication"]["authDescription"].startswith(
290 self
.assertEqual("winbind",
291 msg
["Authentication"]["serviceDescription"])
292 # Logon type should be Interactive
293 self
.assertEqual(2, msg
["Authentication"]["logonType"])
294 # Event code should be Unsuccessful logon
295 self
.assertEqual(4625, msg
["Authentication"]["eventId"])
296 self
.assertEqual("unix:", msg
["Authentication"]["remoteAddress"])
297 self
.assertEqual("unix:", msg
["Authentication"]["localAddress"])
298 self
.assertEqual('', msg
["Authentication"]["clientDomain"])
299 # This is what the existing winbind implementation returns.
300 self
.assertEqual("NT_STATUS_NO_SUCH_USER",
301 msg
["Authentication"]["status"])
302 self
.assertEqual("NTLMv2", msg
["Authentication"]["passwordType"])
303 self
.assertEqual(self
.credentials
.get_username(),
304 msg
["Authentication"]["clientAccount"])
305 self
.assertEqual("", msg
["Authentication"]["clientDomain"])
307 logon_id
= msg
["Authentication"]["logonId"]
309 # The 2nd message should be a PAM_AUTH with the same logon id as the
312 self
.assertEqual("Authentication", msg
["type"])
313 self
.assertTrue(msg
["Authentication"]["authDescription"].startswith(
315 self
.assertEqual("winbind",
316 msg
["Authentication"]["serviceDescription"])
317 self
.assertEqual(logon_id
, msg
["Authentication"]["logonId"])
318 # Logon type should be NetworkCleartext
319 self
.assertEqual(8, msg
["Authentication"]["logonType"])
320 # Event code should be Unsuccessful logon
321 self
.assertEqual(4625, msg
["Authentication"]["eventId"])
322 self
.assertEqual("unix:", msg
["Authentication"]["remoteAddress"])
323 self
.assertEqual("unix:", msg
["Authentication"]["localAddress"])
324 self
.assertEqual('', msg
["Authentication"]["clientDomain"])
325 # This is what the existing winbind implementation returns.
326 self
.assertEqual("NT_STATUS_NO_SUCH_USER",
327 msg
["Authentication"]["status"])
328 self
.assertEqual(self
.credentials
.get_username(),
329 msg
["Authentication"]["clientAccount"])
330 self
.assertEqual("", msg
["Authentication"]["clientDomain"])
332 # The 3rd message should be an NTLM_AUTH
334 self
.assertEqual("Authentication", msg
["type"])
335 self
.assertTrue(msg
["Authentication"]["authDescription"].startswith(
336 "NTLM_AUTH, wbinfo,"))
337 self
.assertEqual("winbind",
338 msg
["Authentication"]["serviceDescription"])
339 # Logon type should be Network
340 self
.assertEqual(3, msg
["Authentication"]["logonType"])
341 self
.assertEqual("NT_STATUS_OK", msg
["Authentication"]["status"])
342 # Event code should be successful logon
343 self
.assertEqual(4624, msg
["Authentication"]["eventId"])
344 self
.assertEqual("NTLMv2", msg
["Authentication"]["passwordType"])
345 self
.assertEqual("unix:", msg
["Authentication"]["remoteAddress"])
346 self
.assertEqual("unix:", msg
["Authentication"]["localAddress"])
347 self
.assertEqual(self
.credentials
.get_username(),
348 msg
["Authentication"]["clientAccount"])
349 self
.assertEqual(self
.credentials
.get_domain(),
350 msg
["Authentication"]["clientDomain"])
352 logon_id
= msg
["Authentication"]["logonId"]
355 # Now check the Domain server authentication message
357 self
.check_domain_server_authentication(pipe
, logon_id
, "network")
359 def test_wbinfo_ntlmv1(self
):
360 def isLastExpectedMessage(msg
):
361 DESC
= "NTLM_AUTH, wbinfo"
363 msg
["type"] == "Authentication" and
364 msg
["Authentication"]["serviceDescription"] == "winbind" and
365 msg
["Authentication"]["authDescription"] is not None and
366 msg
["Authentication"]["authDescription"].startswith(DESC
))
368 pipe
= self
.dc_watcher()
369 COMMAND
= "bin/wbinfo"
371 self
.check_run("{0} --ntlmv1 -a {1}%{2}".format(
373 self
.credentials
.get_username(),
374 self
.credentials
.get_password()),
375 msg
="ntlm_auth failed")
376 except BlackboxProcessError
:
379 messages
= self
.waitForMessages(isLastExpectedMessage
)
380 messages
= self
.filter_messages(messages
)
381 expected_messages
= 3
382 self
.assertEqual(expected_messages
,
384 "Did not receive the expected number of messages")
386 # The 1st message should be an Authentication against the local
389 self
.assertEqual("Authentication", msg
["type"])
390 self
.assertTrue(msg
["Authentication"]["authDescription"].startswith(
392 self
.assertEqual("winbind",
393 msg
["Authentication"]["serviceDescription"])
394 # Logon type should be Interactive
395 self
.assertEqual(2, msg
["Authentication"]["logonType"])
396 # Event code should be Unsuccessful logon
397 self
.assertEqual(4625, msg
["Authentication"]["eventId"])
398 self
.assertEqual("unix:", msg
["Authentication"]["remoteAddress"])
399 self
.assertEqual("unix:", msg
["Authentication"]["localAddress"])
400 self
.assertEqual('', msg
["Authentication"]["clientDomain"])
401 # This is what the existing winbind implementation returns.
402 self
.assertEqual("NT_STATUS_NO_SUCH_USER",
403 msg
["Authentication"]["status"])
404 self
.assertEqual("NTLMv2", msg
["Authentication"]["passwordType"])
405 self
.assertEqual(self
.credentials
.get_username(),
406 msg
["Authentication"]["clientAccount"])
407 self
.assertEqual("", msg
["Authentication"]["clientDomain"])
409 logon_id
= msg
["Authentication"]["logonId"]
411 # The 2nd message should be a PAM_AUTH with the same logon id as the
414 self
.assertEqual("Authentication", msg
["type"])
415 self
.assertTrue(msg
["Authentication"]["authDescription"].startswith(
417 self
.assertEqual("winbind",
418 msg
["Authentication"]["serviceDescription"])
419 self
.assertEqual(logon_id
, msg
["Authentication"]["logonId"])
420 self
.assertEqual("Plaintext", msg
["Authentication"]["passwordType"])
421 # Logon type should be NetworkCleartext
422 self
.assertEqual(8, msg
["Authentication"]["logonType"])
423 # Event code should be Unsuccessful logon
424 self
.assertEqual(4625, msg
["Authentication"]["eventId"])
425 self
.assertEqual("unix:", msg
["Authentication"]["remoteAddress"])
426 self
.assertEqual("unix:", msg
["Authentication"]["localAddress"])
427 self
.assertEqual('', msg
["Authentication"]["clientDomain"])
428 # This is what the existing winbind implementation returns.
429 self
.assertEqual("NT_STATUS_NO_SUCH_USER",
430 msg
["Authentication"]["status"])
431 self
.assertEqual(self
.credentials
.get_username(),
432 msg
["Authentication"]["clientAccount"])
433 self
.assertEqual("", msg
["Authentication"]["clientDomain"])
435 # The 3rd message should be an NTLM_AUTH
437 self
.assertEqual("Authentication", msg
["type"])
438 self
.assertTrue(msg
["Authentication"]["authDescription"].startswith(
439 "NTLM_AUTH, wbinfo,"))
440 self
.assertEqual("winbind",
441 msg
["Authentication"]["serviceDescription"])
442 self
.assertEqual("NTLMv1",
443 msg
["Authentication"]["passwordType"])
444 # Logon type should be Network
445 self
.assertEqual(3, msg
["Authentication"]["logonType"])
446 self
.assertEqual("NT_STATUS_OK", msg
["Authentication"]["status"])
447 # Event code should be successful logon
448 self
.assertEqual(4624, msg
["Authentication"]["eventId"])
449 self
.assertEqual("unix:", msg
["Authentication"]["remoteAddress"])
450 self
.assertEqual("unix:", msg
["Authentication"]["localAddress"])
451 self
.assertEqual(self
.credentials
.get_username(),
452 msg
["Authentication"]["clientAccount"])
453 self
.assertEqual(self
.credentials
.get_domain(),
454 msg
["Authentication"]["clientDomain"])
456 logon_id
= msg
["Authentication"]["logonId"]
458 # Now check the Domain server authentication message
460 self
.check_domain_server_authentication(pipe
, logon_id
, "network")