ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / auth_log_winbind.py
blob1445eff698436e459bb75b0fd0d648efa6053106
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """
19 auth logging tests that exercise winbind
20 """
22 import json
23 import os
24 import time
26 from samba.auth import system_session
27 from samba.credentials import Credentials
28 from samba.common import get_string, get_bytes
29 from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
30 from samba.dsdb import UF_NORMAL_ACCOUNT
31 from samba.messaging import Messaging
32 from samba.param import LoadParm
33 from samba.samdb import SamDB
34 from samba.tests import delete_force, BlackboxProcessError, BlackboxTestCase
35 from samba.tests.auth_log_base import AuthLogTestBase
37 USER_NAME = "WBALU"
40 class AuthLogTestsWinbind(AuthLogTestBase, BlackboxTestCase):
43 # Helper function to watch for authentication messages on the
44 # Domain Controller.
def dc_watcher(self):
    """Fork a helper process that collects SamLogon events from the DC.

    The child loads the Domain Controller's configuration (path taken
    from the ``DC_SERVERCONFFILE`` environment variable), registers for
    authentication log messages and polls for about one second.  It then
    writes every SamLogon authentication message it saw to a pipe, one
    JSON document per line, or the single line ``None`` when it saw none.
    If the child fails it writes nothing, so the reader sees end of file.

    Returns:
        In the parent, the read end of the pipe (a file descriptor).
        The child never returns; it always ends in ``os._exit``.
    """
    (r1, w1) = os.pipe()
    pid = os.fork()
    if pid != 0:
        # Parent: only the child writes, so drop our copy of the write
        # end rather than leaking one descriptor per call.
        os.close(w1)
        # Return the read end of the pipe to the caller.
        return r1

    # Child process from here on.  Whatever happens it must terminate via
    # os._exit(); letting an exception propagate would unwind into the
    # forked copy of the test runner and run the remaining tests twice.
    exit_code = 1
    try:
        # The child only writes.
        os.close(r1)

        # Load the lp context for the Domain Controller, rather than the
        # member server.
        config_file = os.environ["DC_SERVERCONFFILE"]
        lp_ctx = LoadParm()
        lp_ctx.load(config_file)

        # Is the message a SamLogon authentication?
        def is_sam_logon(m):
            if m is None:
                return False
            msg = json.loads(m)
            return (
                msg["type"] == "Authentication" and
                msg["Authentication"]["serviceDescription"] == "SamLogon")

        # Handler function for received authentication messages.
        def message_handler(context, msgType, src, message):
            # Print the message to help debugging the tests.
            # as it's a JSON message it does not look like a sub-unit
            # message.
            print(message)
            self.dc_msgs.append(message)

        # Set up a messaging context to listen for authentication events
        # on the domain controller.
        msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
        msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
        msg_handler_and_context = (message_handler, None)
        msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)

        # Wait for the SamLogon message.
        # As there could be other SamLogon's in progress we need to
        # collect all the SamLogons and let the caller match them to the
        # session.
        self.dc_msgs = []
        start_time = time.time()
        while (time.time() - start_time < 1):
            msg_ctx.loop_once(0.1)

        # Only interested in SamLogon messages, filter out the rest
        msgs = list(filter(is_sam_logon, self.dc_msgs))
        if msgs:
            # Send everything in ONE write: the reader performs a single
            # os.read(), and with one write per message a reader that is
            # already blocked could wake up holding only the first line.
            payload = "".join(m + "\n" for m in msgs)
        else:
            payload = "None\n"
        os.write(w1, get_bytes(payload))
        os.close(w1)

        msg_ctx.deregister(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
        msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
        exit_code = 0
    except Exception:
        # Report the failure; the parent sees it as an empty pipe.
        import traceback
        traceback.print_exc()
    finally:
        os._exit(exit_code)
107 # Remove any DCE/RPC ncacn_np messages
108 # these only get triggered once per session, and stripping them out
109 # avoids ordering dependencies in the tests
def filter_messages(self, messages):
    """Return *messages* without DCE/RPC ``ncacn_np`` Authorization events.

    A message is dropped only when its ``type`` is ``Authorization`` and
    its ``Authorization`` record has ``serviceDescription`` ``DCE/RPC``
    and ``authType`` ``ncacn_np``.  Every other message is kept, in the
    original order.
    """
    def is_named_pipe_authorization(entry):
        # The checks run in this order so that messages of other types
        # are never asked for an "Authorization" record.
        if entry["type"] != "Authorization":
            return False
        record = entry["Authorization"]
        if record["serviceDescription"] != "DCE/RPC":
            return False
        return record["authType"] == "ncacn_np"

    return [
        entry for entry in messages
        if not is_named_pipe_authorization(entry)
    ]
def setUp(self):
    """Prepare the fixture: environment, DC connection and test account.

    Reads ``DOMAIN``, ``SERVER`` and ``DC_SERVER`` from the environment,
    opens a SamDB connection to the DC over LDAP and creates the test
    user account through it.
    """
    super().setUp()

    environment = os.environ
    self.domain = environment["DOMAIN"]
    self.host = environment["SERVER"]
    self.dc = environment["DC_SERVER"]

    self.lp = self.get_loadparm()
    self.credentials = self.get_credentials()
    self.session = system_session()

    # The account is created on the DC, so connect to it by name.
    dc_url = "ldap://{0}".format(self.dc)
    self.ldb = SamDB(url=dc_url,
                     session_info=self.session,
                     credentials=self.credentials,
                     lp=self.lp)
    self.create_user_account()

    self.remoteAddress = ''
def tearDown(self):
    """Run the base class tear-down, then remove the test user account.

    The account is deleted even when the base class tear-down raises, so
    a failing tear-down does not leave the user behind on the DC.
    """
    try:
        super().tearDown()
    finally:
        delete_force(self.ldb, self.user_dn)
145 # Create a test user account
def create_user_account(self):
    """Create the test user on the DC and build credentials for it.

    Sets ``user_pass``, ``user_name``, ``user_dn`` and ``user_creds`` on
    the test case.  ``self.server`` is read here but not assigned in this
    class; presumably a base class provides it (verify).
    """
    self.user_pass = self.random_password()
    self.user_name = USER_NAME
    self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())

    # remove the account if it exists, this will happen if a previous test
    # run failed
    delete_force(self.ldb, self.user_dn)

    # unicodePwd takes the password wrapped in double quotes and encoded
    # as UTF-16-LE.
    quoted_password = '"%s"' % get_string(self.user_pass)
    encoded_password = quoted_password.encode('utf-16-le')
    account = {
        "dn": self.user_dn,
        "objectclass": "user",
        "sAMAccountName": "%s" % self.user_name,
        "userAccountControl": str(UF_NORMAL_ACCOUNT),
        "unicodePwd": encoded_password,
    }
    self.ldb.add(account)

    self.user_creds = creds = Credentials()
    creds.guess(self.get_loadparm())
    creds.set_password(self.user_pass)
    creds.set_username(self.user_name)
    creds.set_workstation(self.server)
170 # Check that the domain server received a SamLogon request for the
171 # current logon.
def check_domain_server_authentication(self, pipe, logon_id, description):
    """Check that the DC logged a SamLogon for the current logon.

    Args:
        pipe: read end of the pipe returned by ``dc_watcher``.  It is
            read once and then closed by this method.
        logon_id: the ``logonId`` the DC's message must carry.
        description: the expected ``authDescription`` of that message.

    The pipe carries one JSON document per line, or the single line
    ``None`` when the watcher saw no SamLogon message.  The test fails
    when no line matches *logon_id*.
    """
    try:
        # A single read: the writer may keep its end open, so waiting
        # for end of file could block forever.  Output longer than
        # 8192 bytes is therefore truncated.
        raw = os.read(pipe, 8192)
    finally:
        # The descriptor is used only once; do not leak it.
        os.close(pipe)

    # Each line ends in "\n", so splitting leaves an empty tail; drop it
    # together with the "None" placeholder instead of feeding either to
    # the JSON parser.
    lines = [
        line for line in raw.decode("utf-8").splitlines()
        if line.strip() and line.strip() != "None"
    ]
    if not lines:
        self.fail("No Domain server authentication message")

    #
    # Look for the SamLogon request matching logon_id
    msg = None
    for line in lines:
        candidate = json.loads(line)
        if logon_id == candidate["Authentication"]["logonId"]:
            msg = candidate
            break

    if msg is None:
        self.fail("No Domain server authentication message")

    #
    # Validate that message contains the expected data
    #
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual(logon_id, msg["Authentication"]["logonId"])
    self.assertEqual("SamLogon",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(description,
                     msg["Authentication"]["authDescription"])
def test_ntlm_auth(self):
    """Authenticate with ntlm_auth and check the resulting log messages.

    Expects exactly one winbind Authentication message on the member
    server and a matching "interactive" SamLogon message on the DC.
    """
    def is_final_message(entry):
        # The winbind Authentication record whose description starts
        # with "PAM_AUTH, ntlm_auth" ends the wait.
        if entry["type"] != "Authentication":
            return False
        record = entry["Authentication"]
        if record["serviceDescription"] != "winbind":
            return False
        text = record["authDescription"]
        return text is not None and text.startswith("PAM_AUTH, ntlm_auth")

    # Start watching the DC before triggering the logon.
    pipe = self.dc_watcher()
    command = "{0} --username={1} --password={2}".format(
        "bin/ntlm_auth",
        self.credentials.get_username(),
        self.credentials.get_password())
    self.check_run(command, msg="ntlm_auth failed")

    received = self.filter_messages(self.waitForMessages(is_final_message))
    self.assertEqual(1,
                     len(received),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = received[0]
    self.assertEqual("Authentication", msg["type"])
    auth = msg["Authentication"]
    self.assertTrue(
        auth["authDescription"].startswith("PAM_AUTH, ntlm_auth,"))
    self.assertEqual("winbind", auth["serviceDescription"])
    self.assertEqual("Plaintext", auth["passwordType"])
    # Logon type should be NetworkCleartext
    self.assertEqual(8, auth["logonType"])
    # Event code should be Successful logon
    self.assertEqual(4624, auth["eventId"])
    self.assertEqual("unix:", auth["remoteAddress"])
    self.assertEqual("unix:", auth["localAddress"])
    self.assertEqual(self.domain, auth["clientDomain"])
    self.assertEqual("NT_STATUS_OK", auth["status"])
    self.assertEqual(self.credentials.get_username(),
                     auth["clientAccount"])
    self.assertEqual(self.credentials.get_domain(),
                     auth["clientDomain"])
    self.assertTrue(auth["workstation"] is None)

    #
    # Now check the Domain server authentication message
    #
    self.check_domain_server_authentication(
        pipe, auth["logonId"], "interactive")
def test_wbinfo(self):
    """Authenticate with ``wbinfo -a`` and check the log messages.

    Expects three winbind Authentication messages on the member server
    (PASSDB, PAM_AUTH, NTLM_AUTH) and a matching "network" SamLogon
    message on the DC.
    """
    def is_final_message(entry):
        # The winbind Authentication record whose description starts
        # with "NTLM_AUTH, wbinfo" ends the wait.
        if entry["type"] != "Authentication":
            return False
        record = entry["Authentication"]
        if record["serviceDescription"] != "winbind":
            return False
        text = record["authDescription"]
        return text is not None and text.startswith("NTLM_AUTH, wbinfo")

    # Start watching the DC before triggering the logon.
    pipe = self.dc_watcher()
    command = "{0} -a {1}%{2}".format(
        "bin/wbinfo",
        self.credentials.get_username(),
        self.credentials.get_password())
    try:
        self.check_run(command, msg="ntlm_auth failed")
    except BlackboxProcessError:
        # A failing exit status is ignored here; the log messages are
        # validated below instead.
        pass

    received = self.filter_messages(self.waitForMessages(is_final_message))
    self.assertEqual(3,
                     len(received),
                     "Did not receive the expected number of messages")

    # The 1st message should be an Authentication against the local
    # password database
    msg = received[0]
    self.assertEqual("Authentication", msg["type"])
    auth = msg["Authentication"]
    self.assertTrue(auth["authDescription"].startswith("PASSDB, wbinfo,"))
    self.assertEqual("winbind", auth["serviceDescription"])
    # Logon type should be Interactive
    self.assertEqual(2, auth["logonType"])
    # Event code should be Unsuccessful logon
    self.assertEqual(4625, auth["eventId"])
    self.assertEqual("unix:", auth["remoteAddress"])
    self.assertEqual("unix:", auth["localAddress"])
    self.assertEqual('', auth["clientDomain"])
    # This is what the existing winbind implementation returns.
    self.assertEqual("NT_STATUS_NO_SUCH_USER", auth["status"])
    self.assertEqual("NTLMv2", auth["passwordType"])
    self.assertEqual(self.credentials.get_username(),
                     auth["clientAccount"])
    self.assertEqual("", auth["clientDomain"])

    passdb_logon_id = auth["logonId"]

    # The 2nd message should be a PAM_AUTH with the same logon id as the
    # 1st message
    msg = received[1]
    self.assertEqual("Authentication", msg["type"])
    auth = msg["Authentication"]
    self.assertTrue(auth["authDescription"].startswith("PAM_AUTH"))
    self.assertEqual("winbind", auth["serviceDescription"])
    self.assertEqual(passdb_logon_id, auth["logonId"])
    # Logon type should be NetworkCleartext
    self.assertEqual(8, auth["logonType"])
    # Event code should be Unsuccessful logon
    self.assertEqual(4625, auth["eventId"])
    self.assertEqual("unix:", auth["remoteAddress"])
    self.assertEqual("unix:", auth["localAddress"])
    self.assertEqual('', auth["clientDomain"])
    # This is what the existing winbind implementation returns.
    self.assertEqual("NT_STATUS_NO_SUCH_USER", auth["status"])
    self.assertEqual(self.credentials.get_username(),
                     auth["clientAccount"])
    self.assertEqual("", auth["clientDomain"])

    # The 3rd message should be an NTLM_AUTH
    msg = received[2]
    self.assertEqual("Authentication", msg["type"])
    auth = msg["Authentication"]
    self.assertTrue(
        auth["authDescription"].startswith("NTLM_AUTH, wbinfo,"))
    self.assertEqual("winbind", auth["serviceDescription"])
    # Logon type should be Network
    self.assertEqual(3, auth["logonType"])
    self.assertEqual("NT_STATUS_OK", auth["status"])
    # Event code should be successful logon
    self.assertEqual(4624, auth["eventId"])
    self.assertEqual("NTLMv2", auth["passwordType"])
    self.assertEqual("unix:", auth["remoteAddress"])
    self.assertEqual("unix:", auth["localAddress"])
    self.assertEqual(self.credentials.get_username(),
                     auth["clientAccount"])
    self.assertEqual(self.credentials.get_domain(),
                     auth["clientDomain"])

    #
    # Now check the Domain server authentication message
    #
    self.check_domain_server_authentication(
        pipe, auth["logonId"], "network")
def test_wbinfo_ntlmv1(self):
    """Authenticate with ``wbinfo --ntlmv1 -a`` and check the messages.

    Expects three winbind Authentication messages on the member server
    (PASSDB, PAM_AUTH, NTLM_AUTH using NTLMv1) and a matching "network"
    SamLogon message on the DC.
    """
    def is_final_message(entry):
        # The winbind Authentication record whose description starts
        # with "NTLM_AUTH, wbinfo" ends the wait.
        if entry["type"] != "Authentication":
            return False
        record = entry["Authentication"]
        if record["serviceDescription"] != "winbind":
            return False
        text = record["authDescription"]
        return text is not None and text.startswith("NTLM_AUTH, wbinfo")

    # Start watching the DC before triggering the logon.
    pipe = self.dc_watcher()
    command = "{0} --ntlmv1 -a {1}%{2}".format(
        "bin/wbinfo",
        self.credentials.get_username(),
        self.credentials.get_password())
    try:
        self.check_run(command, msg="ntlm_auth failed")
    except BlackboxProcessError:
        # A failing exit status is ignored here; the log messages are
        # validated below instead.
        pass

    received = self.filter_messages(self.waitForMessages(is_final_message))
    self.assertEqual(3,
                     len(received),
                     "Did not receive the expected number of messages")

    # The 1st message should be an Authentication against the local
    # password database
    msg = received[0]
    self.assertEqual("Authentication", msg["type"])
    auth = msg["Authentication"]
    self.assertTrue(auth["authDescription"].startswith("PASSDB, wbinfo,"))
    self.assertEqual("winbind", auth["serviceDescription"])
    # Logon type should be Interactive
    self.assertEqual(2, auth["logonType"])
    # Event code should be Unsuccessful logon
    self.assertEqual(4625, auth["eventId"])
    self.assertEqual("unix:", auth["remoteAddress"])
    self.assertEqual("unix:", auth["localAddress"])
    self.assertEqual('', auth["clientDomain"])
    # This is what the existing winbind implementation returns.
    self.assertEqual("NT_STATUS_NO_SUCH_USER", auth["status"])
    self.assertEqual("NTLMv2", auth["passwordType"])
    self.assertEqual(self.credentials.get_username(),
                     auth["clientAccount"])
    self.assertEqual("", auth["clientDomain"])

    passdb_logon_id = auth["logonId"]

    # The 2nd message should be a PAM_AUTH with the same logon id as the
    # 1st message
    msg = received[1]
    self.assertEqual("Authentication", msg["type"])
    auth = msg["Authentication"]
    self.assertTrue(auth["authDescription"].startswith("PAM_AUTH"))
    self.assertEqual("winbind", auth["serviceDescription"])
    self.assertEqual(passdb_logon_id, auth["logonId"])
    self.assertEqual("Plaintext", auth["passwordType"])
    # Logon type should be NetworkCleartext
    self.assertEqual(8, auth["logonType"])
    # Event code should be Unsuccessful logon
    self.assertEqual(4625, auth["eventId"])
    self.assertEqual("unix:", auth["remoteAddress"])
    self.assertEqual("unix:", auth["localAddress"])
    self.assertEqual('', auth["clientDomain"])
    # This is what the existing winbind implementation returns.
    self.assertEqual("NT_STATUS_NO_SUCH_USER", auth["status"])
    self.assertEqual(self.credentials.get_username(),
                     auth["clientAccount"])
    self.assertEqual("", auth["clientDomain"])

    # The 3rd message should be an NTLM_AUTH
    msg = received[2]
    self.assertEqual("Authentication", msg["type"])
    auth = msg["Authentication"]
    self.assertTrue(
        auth["authDescription"].startswith("NTLM_AUTH, wbinfo,"))
    self.assertEqual("winbind", auth["serviceDescription"])
    self.assertEqual("NTLMv1", auth["passwordType"])
    # Logon type should be Network
    self.assertEqual(3, auth["logonType"])
    self.assertEqual("NT_STATUS_OK", auth["status"])
    # Event code should be successful logon
    self.assertEqual(4624, auth["eventId"])
    self.assertEqual("unix:", auth["remoteAddress"])
    self.assertEqual("unix:", auth["localAddress"])
    self.assertEqual(self.credentials.get_username(),
                     auth["clientAccount"])
    self.assertEqual(self.credentials.get_domain(),
                     auth["clientDomain"])

    #
    # Now check the Domain server authentication message
    #
    self.check_domain_server_authentication(
        pipe, auth["logonId"], "network")