ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / auth_log.py
blob0307ed0d86dbbc3a1ec971106a3b55caf6d32344
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Tests for the Auth and AuthZ logging.
20 """
22 import sys
24 sys.path.insert(0, 'bin/python')
26 import samba.tests
27 from samba.dcerpc import srvsvc, dnsserver
28 import os
29 from samba.samba3 import libsmb_samba_internal as libsmb
30 from samba.samba3 import param as s3param
31 from samba.samdb import SamDB
32 import samba.tests.auth_log_base
33 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
34 from samba import NTSTATUSError
35 from subprocess import call
36 from ldb import LdbError
37 from samba.dcerpc.windows_event_ids import (
38 EVT_ID_SUCCESSFUL_LOGON,
39 EVT_ID_UNSUCCESSFUL_LOGON,
40 EVT_LOGON_NETWORK,
41 EVT_LOGON_INTERACTIVE,
42 EVT_LOGON_NETWORK_CLEAR_TEXT
44 import re
47 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
def setUp(self):
    """Run the base-class set-up, then remember the client address.

    The address is read from the CLIENT_IP environment variable; a
    missing variable raises KeyError.
    """
    super().setUp()
    client_ip = os.environ["CLIENT_IP"]
    self.remoteAddress = client_ip
def smb_connection(self, creds, use_spnego="yes", ntlmv2_auth="yes",
                   force_smb1=False):
    """Open a libsmb connection to the "sysvol" share on self.server.

    creds        -- credentials used for the connection
    use_spnego   -- value written to "client use spnego"
    ntlmv2_auth  -- value written to "client ntlmv2 auth"
    force_smb1   -- passed straight through to libsmb.Conn

    Returns the libsmb.Conn object.
    """
    # The SMB bindings rely on an s3 loadparm, so the configuration file
    # of the test loadparm is loaded into the s3 context.
    test_lp = self.get_loadparm()
    s3_ctx = s3param.get_context()
    s3_ctx.load(test_lp.configfile)

    # Let the test case skip SPNEGO or fall back to NTLMv1.
    for option, value in (("client use spnego", use_spnego),
                          ("client ntlmv2 auth", ntlmv2_auth)):
        s3_ctx.set(option, value)

    return libsmb.Conn(self.server,
                       "sysvol",
                       lp=s3_ctx,
                       creds=creds,
                       force_smb1=force_smb1)
67 def _test_rpc_ncacn_np(self, authTypes, creds, service,
68 binding, protection, checkFunction):
69 def isLastExpectedMessage(msg):
70 return (msg["type"] == "Authorization" and
71 (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
72 msg["Authorization"]["serviceDescription"] == service) and
73 msg["Authorization"]["authType"] == authTypes[0] and
74 msg["Authorization"]["transportProtection"] == protection)
76 if binding:
77 binding = "[%s]" % binding
79 if service == "dnsserver":
80 x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
81 self.get_loadparm(),
82 creds)
83 elif service == "srvsvc":
84 x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
85 self.get_loadparm(),
86 creds)
88 # The connection is passed to ensure the server
89 # messaging context stays up until all the messages have been received.
90 messages = self.waitForMessages(isLastExpectedMessage, x)
91 checkFunction(messages, authTypes, service, binding, protection)
93 def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
94 # Turn "[foo,bar]" into a list ("foo", "bar") to test
95 # lambda x: x removes anything that evaluates to False,
96 # including empty strings, so we handle "" as well
97 binding_list = \
98 list(filter(lambda x: x, re.compile(r'[\[,\]]').split(binding)))
100 # Handle explicit smb2, smb1 or auto negotiation
101 if "smb2" in binding_list:
102 self.assertEqual(serviceDescription, "SMB2")
103 elif "smb1" in binding_list:
104 self.assertEqual(serviceDescription, "SMB")
105 else:
106 self.assertIn(serviceDescription, ["SMB", "SMB2"])
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
                            binding, protection):
    """Validate the messages logged for an NTLM ncacn_np connection.

    One message is expected per entry in authTypes.  The first two
    messages (Authentication, then Authorization on the SMB transport)
    are always checked; a third Authentication message is checked only
    when four messages are expected.
    """
    message_count = len(authTypes)
    self.assertEqual(message_count,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: a successful network-logon Authentication.
    first = messages[0]
    self.assertEqual("Authentication", first["type"])
    self.assertEqual("NT_STATUS_OK", first["Authentication"]["status"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     first["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     first["Authentication"]["logonType"])
    self._assert_ncacn_np_serviceDescription(
        binding, first["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     first["Authentication"]["authDescription"])

    # Message 2: the Authorization, with "SMB" transport protection.
    second = messages[1]
    self.assertEqual("Authorization", second["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, second["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[2], second["Authorization"]["authType"])
    self.assertEqual("SMB", second["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(second["Authorization"]["sessionId"]))

    if message_count != 4:
        return

    # Message 3 (only when four are expected): an Authentication whose
    # service description is either "DCE/RPC" or the service name.
    third = messages[2]
    self.assertEqual("Authentication", third["type"])
    self.assertEqual("NT_STATUS_OK", third["Authentication"]["status"])
    description = third["Authentication"]["serviceDescription"]
    self.assertTrue(description == "DCE/RPC" or description == service)

    self.assertEqual(authTypes[3],
                     third["Authentication"]["authDescription"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     third["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     third["Authentication"]["logonType"])
def rpc_ncacn_np_krb5_check(self, messages, authTypes, service,
                            binding, protection):
    """Validate the messages logged for a Kerberos ncacn_np connection.

    One message is expected per entry in authTypes.  The first three
    are checked: two "Kerberos KDC" Authentication messages (a failure
    followed by a success) and then the Authorization on the SMB
    transport.
    """
    message_count = len(authTypes)
    self.assertEqual(message_count,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: a failed KDC Authentication.  This is almost certainly
    # the attempt over UDP, which probably gets "response too big".
    udp_attempt = messages[0]
    self.assertEqual("Authentication", udp_attempt["type"])
    self.assertEqual("NT_STATUS_PROTOCOL_UNREACHABLE",  # RESPONSE_TOO_BIG
                     udp_attempt["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     udp_attempt["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     udp_attempt["Authentication"]["authDescription"])
    self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
                     udp_attempt["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     udp_attempt["Authentication"]["logonType"])

    # Message 2: the successful KDC Authentication, i.e. the TCP retry
    # made in response to the "too big" answer above.
    tcp_retry = messages[1]
    self.assertEqual("Authentication", tcp_retry["type"])
    self.assertEqual("NT_STATUS_OK", tcp_retry["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     tcp_retry["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     tcp_retry["Authentication"]["authDescription"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     tcp_retry["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     tcp_retry["Authentication"]["logonType"])

    # Message 3: the Authorization, with "SMB" transport protection.
    authorization = messages[2]
    self.assertEqual("Authorization", authorization["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, authorization["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[3],
                     authorization["Authorization"]["authType"])
    self.assertEqual("SMB",
                     authorization["Authorization"]["transportProtection"])
    self.assertTrue(
        self.is_guid(authorization["Authorization"]["sessionId"]))
def test_rpc_ncacn_np_ntlm_dns_sign(self):
    """dnsserver over ncacn_np, NTLM only, "sign" binding."""
    ntlm_creds = self.insta_creds(template=self.get_credentials(),
                                  kerberos_state=DONT_USE_KERBEROS)
    expected_auth = ["NTLMSSP"] * 4
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=ntlm_creds,
                            service="dnsserver",
                            binding="sign",
                            protection="SIGN",
                            checkFunction=self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_srv_sign(self):
    """srvsvc over ncacn_np, NTLM only, "sign" binding."""
    ntlm_creds = self.insta_creds(template=self.get_credentials(),
                                  kerberos_state=DONT_USE_KERBEROS)
    expected_auth = ["NTLMSSP"] * 4
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=ntlm_creds,
                            service="srvsvc",
                            binding="sign",
                            protection="SIGN",
                            checkFunction=self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_dns(self):
    """dnsserver over ncacn_np, NTLM only, no binding options."""
    ntlm_creds = self.insta_creds(template=self.get_credentials(),
                                  kerberos_state=DONT_USE_KERBEROS)
    expected_auth = ["ncacn_np", "NTLMSSP", "NTLMSSP"]
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=ntlm_creds,
                            service="dnsserver",
                            binding="",
                            protection="SMB",
                            checkFunction=self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_srv(self):
    """srvsvc over ncacn_np, NTLM only, no binding options."""
    ntlm_creds = self.insta_creds(template=self.get_credentials(),
                                  kerberos_state=DONT_USE_KERBEROS)
    expected_auth = ["ncacn_np", "NTLMSSP", "NTLMSSP"]
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=ntlm_creds,
                            service="srvsvc",
                            binding="",
                            protection="SMB",
                            checkFunction=self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_krb_dns_sign(self):
    """dnsserver over ncacn_np, Kerberos required, "sign" binding."""
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["krb5",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication",
                     "krb5"]
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=krb_creds,
                            service="dnsserver",
                            binding="sign",
                            protection="SIGN",
                            checkFunction=self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_srv_sign(self):
    """srvsvc over ncacn_np, Kerberos required, "sign" binding."""
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["krb5",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication",
                     "krb5"]
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=krb_creds,
                            service="srvsvc",
                            binding="sign",
                            protection="SIGN",
                            checkFunction=self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_dns(self):
    """dnsserver over ncacn_np, Kerberos required, no binding options."""
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["ncacn_np",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication",
                     "krb5"]
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=krb_creds,
                            service="dnsserver",
                            binding="",
                            protection="SMB",
                            checkFunction=self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_dns_smb2(self):
    """dnsserver over ncacn_np, Kerberos required, explicit "smb2" binding."""
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["ncacn_np",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication",
                     "krb5"]
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=krb_creds,
                            service="dnsserver",
                            binding="smb2",
                            protection="SMB",
                            checkFunction=self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_srv(self):
    """srvsvc over ncacn_np, Kerberos required, no binding options."""
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["ncacn_np",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication",
                     "krb5"]
    self._test_rpc_ncacn_np(authTypes=expected_auth,
                            creds=krb_creds,
                            service="srvsvc",
                            binding="",
                            protection="SMB",
                            checkFunction=self.rpc_ncacn_np_krb5_check)
299 def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
300 binding, protection, checkFunction):
301 def isLastExpectedMessage(msg):
302 return (msg["type"] == "Authorization" and
303 msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
304 msg["Authorization"]["authType"] == authTypes[0] and
305 msg["Authorization"]["transportProtection"] == protection)
307 if binding:
308 binding = "[%s]" % binding
310 if service == "dnsserver":
311 conn = dnsserver.dnsserver(
312 "ncacn_ip_tcp:%s%s" % (self.server, binding),
313 self.get_loadparm(),
314 creds)
315 elif service == "srvsvc":
316 conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
317 self.get_loadparm(),
318 creds)
320 messages = self.waitForMessages(isLastExpectedMessage, conn)
321 checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
                                binding, protection):
    """Validate the messages logged for an NTLM ncacn_ip_tcp connection.

    One message is expected per entry in authTypes.  The first is the
    unprotected "DCE/RPC" Authorization and the second the successful
    "DCE/RPC" Authentication.
    """
    message_count = len(authTypes)
    self.assertEqual(message_count,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: the Authorization, with no transport protection.
    authorization = messages[0]
    self.assertEqual("Authorization", authorization["type"])
    self.assertEqual("DCE/RPC",
                     authorization["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     authorization["Authorization"]["authType"])
    self.assertEqual("NONE",
                     authorization["Authorization"]["transportProtection"])
    self.assertTrue(
        self.is_guid(authorization["Authorization"]["sessionId"]))

    # Message 2: the successful network-logon Authentication.
    authentication = messages[1]
    self.assertEqual("Authentication", authentication["type"])
    self.assertEqual("NT_STATUS_OK",
                     authentication["Authentication"]["status"])
    self.assertEqual("DCE/RPC",
                     authentication["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     authentication["Authentication"]["authDescription"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     authentication["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     authentication["Authentication"]["logonType"])
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
                                binding, protection):
    """Validate the messages logged for a Kerberos ncacn_ip_tcp connection.

    One message is expected per entry in authTypes.  The first three are
    checked: the unprotected "DCE/RPC" Authorization, then two
    "Kerberos KDC" Authentication messages (a failure followed by a
    success).

    authTypes[1] is the Authorization authType, authTypes[2] the
    authDescription of the failed Authentication and authTypes[3] that
    of the successful one.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message; it should be an Authorization.
    msg = messages[0]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
    self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    # Check the second message; it should be a failed KDC Authentication.
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_PROTOCOL_UNREACHABLE",  # RESPONSE_TOO_BIG
                     msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_UNSUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])

    # Check the third message; it should be the successful KDC
    # Authentication.  It is compared against its own authTypes entry:
    # the previous code re-used authTypes[2] here, which left
    # authTypes[3] unchecked.
    msg = messages[2]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[3],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
    """dnsserver over ncacn_ip_tcp, NTLM only, "sign" binding."""
    ntlm_creds = self.insta_creds(template=self.get_credentials(),
                                  kerberos_state=DONT_USE_KERBEROS)
    expected_auth = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    self._test_rpc_ncacn_ip_tcp(
        authTypes=expected_auth,
        creds=ntlm_creds,
        service="dnsserver",
        binding="sign",
        protection="SIGN",
        checkFunction=self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
    """dnsserver over ncacn_ip_tcp, Kerberos required, "sign" binding."""
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["krb5",
                     "ncacn_ip_tcp",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication"]
    self._test_rpc_ncacn_ip_tcp(
        authTypes=expected_auth,
        creds=krb_creds,
        service="dnsserver",
        binding="sign",
        protection="SIGN",
        checkFunction=self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
    """dnsserver over ncacn_ip_tcp, NTLM only, no binding options.

    The final Authorization is still expected to report "SIGN".
    """
    ntlm_creds = self.insta_creds(template=self.get_credentials(),
                                  kerberos_state=DONT_USE_KERBEROS)
    expected_auth = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    self._test_rpc_ncacn_ip_tcp(
        authTypes=expected_auth,
        creds=ntlm_creds,
        service="dnsserver",
        binding="",
        protection="SIGN",
        checkFunction=self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns(self):
    """dnsserver over ncacn_ip_tcp, Kerberos required, no binding options.

    The final Authorization is still expected to report "SIGN".
    """
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["krb5",
                     "ncacn_ip_tcp",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication"]
    self._test_rpc_ncacn_ip_tcp(
        authTypes=expected_auth,
        creds=krb_creds,
        service="dnsserver",
        binding="",
        protection="SIGN",
        checkFunction=self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
    """dnsserver over ncacn_ip_tcp, NTLM only, "connect" binding."""
    ntlm_creds = self.insta_creds(template=self.get_credentials(),
                                  kerberos_state=DONT_USE_KERBEROS)
    expected_auth = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    self._test_rpc_ncacn_ip_tcp(
        authTypes=expected_auth,
        creds=ntlm_creds,
        service="dnsserver",
        binding="connect",
        protection="NONE",
        checkFunction=self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
    """dnsserver over ncacn_ip_tcp, Kerberos required, "connect" binding."""
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["krb5",
                     "ncacn_ip_tcp",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication"]
    self._test_rpc_ncacn_ip_tcp(
        authTypes=expected_auth,
        creds=krb_creds,
        service="dnsserver",
        binding="connect",
        protection="NONE",
        checkFunction=self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
    """dnsserver over ncacn_ip_tcp, NTLM only, "seal" binding."""
    ntlm_creds = self.insta_creds(template=self.get_credentials(),
                                  kerberos_state=DONT_USE_KERBEROS)
    expected_auth = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    self._test_rpc_ncacn_ip_tcp(
        authTypes=expected_auth,
        creds=ntlm_creds,
        service="dnsserver",
        binding="seal",
        protection="SEAL",
        checkFunction=self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
    """dnsserver over ncacn_ip_tcp, Kerberos required, "seal" binding."""
    krb_creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=MUST_USE_KERBEROS)
    expected_auth = ["krb5",
                     "ncacn_ip_tcp",
                     "ENC-TS Pre-authentication",
                     "ENC-TS Pre-authentication"]
    self._test_rpc_ncacn_ip_tcp(
        authTypes=expected_auth,
        creds=krb_creds,
        service="dnsserver",
        binding="seal",
        protection="SEAL",
        checkFunction=self.rpc_ncacn_ip_tcp_krb5_check)
def test_ldap(self):
    """LDAP connection to the host named by SERVER, using krb5.

    Three messages are expected; the two "Kerberos KDC" Authentication
    messages that precede the final LDAP Authorization are checked.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "SEAL" and
                authz["authType"] == "krb5")

    ldap_url = "ldap://%s" % os.environ["SERVER"]
    self.samdb = SamDB(url=ldap_url,
                       lp=self.get_loadparm(),
                       credentials=self.get_credentials())

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: the failed KDC Authentication.
    failed = messages[0]
    self.assertEqual("Authentication", failed["type"])
    self.assertEqual("NT_STATUS_PROTOCOL_UNREACHABLE",  # RESPONSE_TOO_BIG
                     failed["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     failed["Authentication"]["serviceDescription"])
    self.assertEqual("ENC-TS Pre-authentication",
                     failed["Authentication"]["authDescription"])
    self.assertTrue(failed["Authentication"]["duration"] > 0)
    self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
                     failed["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     failed["Authentication"]["logonType"])

    # Message 2: the successful KDC Authentication.
    succeeded = messages[1]
    self.assertEqual("Authentication", succeeded["type"])
    self.assertEqual("NT_STATUS_OK", succeeded["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     succeeded["Authentication"]["serviceDescription"])
    self.assertEqual("ENC-TS Pre-authentication",
                     succeeded["Authentication"]["authDescription"])
    self.assertTrue(succeeded["Authentication"]["duration"] > 0)
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     succeeded["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     succeeded["Authentication"]["logonType"])
def test_ldap_ntlm(self):
    """LDAP connection to the address in SERVER_IP, expecting NTLMSSP.

    Two messages are expected; the LDAP Authentication that precedes
    the final Authorization is checked.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "SEAL" and
                authz["authType"] == "NTLMSSP")

    ldap_url = "ldap://%s" % os.environ["SERVER_IP"]
    self.samdb = SamDB(url=ldap_url,
                       lp=self.get_loadparm(),
                       credentials=self.get_credentials())

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: the successful NTLMSSP Authentication.
    authentication = messages[0]
    self.assertEqual("Authentication", authentication["type"])
    self.assertEqual("NT_STATUS_OK",
                     authentication["Authentication"]["status"])
    self.assertEqual("LDAP",
                     authentication["Authentication"]["serviceDescription"])
    self.assertEqual("NTLMSSP",
                     authentication["Authentication"]["authDescription"])
    self.assertTrue(authentication["Authentication"]["duration"] > 0)
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     authentication["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     authentication["Authentication"]["logonType"])
def test_ldap_simple_bind(self):
    """Simple bind over ldaps with a DOMAIN\\user bind DN.

    Two messages are expected; the Authentication that precedes the
    final Authorization is checked.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "TLS" and
                authz["authType"] == "simple bind")

    creds = self.insta_creds(template=self.get_credentials())
    bind_dn = "%s\\%s" % (creds.get_domain(), creds.get_username())
    creds.set_bind_dn(bind_dn)

    ldaps_url = "ldaps://%s" % os.environ["SERVER"]
    self.samdb = SamDB(url=ldaps_url,
                       lp=self.get_loadparm(),
                       credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: the successful clear-text network logon.
    authentication = messages[0]
    self.assertEqual("Authentication", authentication["type"])
    self.assertEqual("NT_STATUS_OK",
                     authentication["Authentication"]["status"])
    self.assertEqual("LDAP",
                     authentication["Authentication"]["serviceDescription"])
    self.assertEqual("simple bind/TLS",
                     authentication["Authentication"]["authDescription"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     authentication["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK_CLEAR_TEXT,
                     authentication["Authentication"]["logonType"])
def test_ldap_simple_bind_bad_password(self):
    """Simple bind over ldaps with a wrong password.

    The connection must fail with LdbError, and exactly one
    Authentication message reporting NT_STATUS_WRONG_PASSWORD is
    expected.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_WRONG_PASSWORD" and
                authn["authDescription"] == "simple bind/TLS" and
                authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                authn["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT)

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_password("badPassword")
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
                                  creds.get_username()))

    # assertRaises replaces the manual "thrown" flag and reports a
    # clearer failure when the bind unexpectedly succeeds.
    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_ldap_simple_bind_bad_user(self):
    """Simple bind over ldaps with the bind DN DOMAIN\\badUser.

    The connection must fail with LdbError, and exactly one
    Authentication message reporting NT_STATUS_NO_SUCH_USER is expected.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "simple bind/TLS" and
                authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                authn["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT)

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))

    # assertRaises replaces the manual "thrown" flag and reports a
    # clearer failure when the bind unexpectedly succeeds.
    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_ldap_simple_bind_unparseable_user(self):
    """Simple bind over ldaps with the bind DN DOMAIN\\abdcef.

    The connection must fail with LdbError, and exactly one
    Authentication message reporting NT_STATUS_NO_SUCH_USER is expected.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "simple bind/TLS" and
                authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                authn["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT)

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))

    # assertRaises replaces the manual "thrown" flag and reports a
    # clearer failure when the bind unexpectedly succeeds.
    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
674 # Note: as this test does not expect any messages it will
675 # time out in the call to self.waitForMessages.
676 # This is expected, but it will slow this test.
def test_ldap_anonymous_access_bind_only(self):
    """Anonymous bind over ldaps; no log message is expected at all."""
    # An anonymous bind should not be logged, so the predicate accepts
    # any message: receiving one at all makes the count check fail.
    def isLastExpectedMessage(msg):
        return True

    anonymous = self.insta_creds(template=self.get_credentials())
    anonymous.set_anonymous()

    ldaps_url = "ldaps://%s" % os.environ["SERVER"]
    self.samdb = SamDB(url=ldaps_url,
                       lp=self.get_loadparm(),
                       credentials=anonymous)

    received = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(0,
                     len(received),
                     "Did not receive the expected number of messages")
def test_ldap_anonymous_access(self):
    """Anonymous search over ldaps.

    The search must raise LdbError, and exactly one "no bind"
    Authorization message for "ANONYMOUS LOGON" is expected.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "TLS" and
                authz["account"] == "ANONYMOUS LOGON" and
                authz["authType"] == "no bind")

    anonymous = self.insta_creds(template=self.get_credentials())
    anonymous.set_anonymous()

    ldaps_url = "ldaps://%s" % os.environ["SERVER"]
    self.samdb = SamDB(url=ldaps_url,
                       lp=self.get_loadparm(),
                       credentials=anonymous)

    try:
        self.samdb.search(base=self.samdb.domain_dn())
    except LdbError:
        pass
    else:
        self.fail("Expected an LdbError exception")

    received = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(received),
                     "Did not receive the expected number of messages")
def test_smb(self):
    """SMB connection with the default credentials, expecting krb5.

    Three messages are expected; the two "Kerberos KDC" Authentication
    messages that precede the final Authorization are checked.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return ("SMB" in authz["serviceDescription"] and
                authz["authType"] == "krb5" and
                authz["transportProtection"] == "SMB")

    self.smb_connection(self.insta_creds(template=self.get_credentials()))

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: the failed KDC Authentication.
    failed = messages[0]
    self.assertEqual("Authentication", failed["type"])
    self.assertEqual("NT_STATUS_PROTOCOL_UNREACHABLE",  # RESPONSE_TOO_BIG
                     failed["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     failed["Authentication"]["serviceDescription"])
    self.assertEqual("ENC-TS Pre-authentication",
                     failed["Authentication"]["authDescription"])
    self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
                     failed["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     failed["Authentication"]["logonType"])

    # Message 2: the successful KDC Authentication.
    succeeded = messages[1]
    self.assertEqual("Authentication", succeeded["type"])
    self.assertEqual("NT_STATUS_OK", succeeded["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     succeeded["Authentication"]["serviceDescription"])
    self.assertEqual("ENC-TS Pre-authentication",
                     succeeded["Authentication"]["authDescription"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     succeeded["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     succeeded["Authentication"]["logonType"])
def test_smb_bad_password(self):
    """Kerberos-only SMB connection with a wrong password.

    The connection must fail with NTSTATUSError, and exactly one
    "Kerberos KDC" Authentication message reporting
    NT_STATUS_WRONG_PASSWORD is expected.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "Kerberos KDC" and
                authn["status"] == "NT_STATUS_WRONG_PASSWORD" and
                authn["authDescription"] == "ENC-TS Pre-authentication")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_kerberos_state(MUST_USE_KERBEROS)
    creds.set_password("badPassword")

    # assertRaises replaces the manual "thrown" flag and reports a
    # clearer failure when the connection unexpectedly succeeds.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_bad_user(self):
    """Kerberos-only SMB connection with the username "badUser".

    The connection must fail with NTSTATUSError, and exactly one
    "Kerberos KDC" AS-REQ Authentication message reporting
    NT_STATUS_NO_SUCH_USER is expected.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "Kerberos KDC" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "AS-REQ" and
                authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                authn["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_kerberos_state(MUST_USE_KERBEROS)
    creds.set_username("badUser")

    # assertRaises replaces the manual "thrown" flag and reports a
    # clearer failure when the connection unexpectedly succeeds.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb1_anonymous(self):
    """Anonymous smbclient connection to IPC$ with "-mNT1".

    Three messages are expected; the two Authentication messages that
    precede the final "ANONYMOUS LOGON" Authorization are checked.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "SMB" and
                authz["authType"] == "NTLMSSP" and
                authz["account"] == "ANONYMOUS LOGON" and
                authz["transportProtection"] == "SMB")

    share = "//%s/IPC$" % os.environ["SERVER"]
    call(["bin/smbclient", share, "-N", "-mNT1", "-c quit"])

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: a failed password-less Authentication.
    rejected = messages[0]
    self.assertEqual("Authentication", rejected["type"])
    self.assertEqual("NT_STATUS_NO_SUCH_USER",
                     rejected["Authentication"]["status"])
    self.assertEqual("SMB",
                     rejected["Authentication"]["serviceDescription"])
    self.assertEqual("NTLMSSP",
                     rejected["Authentication"]["authDescription"])
    self.assertEqual("No-Password",
                     rejected["Authentication"]["passwordType"])
    self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
                     rejected["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     rejected["Authentication"]["logonType"])

    # Message 2: the successful Authentication as ANONYMOUS LOGON.
    accepted = messages[1]
    self.assertEqual("Authentication", accepted["type"])
    self.assertEqual("NT_STATUS_OK",
                     accepted["Authentication"]["status"])
    self.assertEqual("SMB",
                     accepted["Authentication"]["serviceDescription"])
    self.assertEqual("NTLMSSP",
                     accepted["Authentication"]["authDescription"])
    self.assertEqual("No-Password",
                     accepted["Authentication"]["passwordType"])
    self.assertEqual("ANONYMOUS LOGON",
                     accepted["Authentication"]["becameAccount"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     accepted["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     accepted["Authentication"]["logonType"])
def test_smb2_anonymous(self):
    """Anonymous smbclient connection to IPC$ with "-mSMB3".

    Three messages are expected; the two Authentication messages that
    precede the final "ANONYMOUS LOGON" Authorization are checked.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "SMB2" and
                authz["authType"] == "NTLMSSP" and
                authz["account"] == "ANONYMOUS LOGON" and
                authz["transportProtection"] == "SMB")

    share = "//%s/IPC$" % os.environ["SERVER"]
    call(["bin/smbclient", share, "-N", "-mSMB3", "-c quit"])

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 1: a failed password-less Authentication.
    rejected = messages[0]
    self.assertEqual("Authentication", rejected["type"])
    self.assertEqual("NT_STATUS_NO_SUCH_USER",
                     rejected["Authentication"]["status"])
    self.assertEqual("SMB2",
                     rejected["Authentication"]["serviceDescription"])
    self.assertEqual("NTLMSSP",
                     rejected["Authentication"]["authDescription"])
    self.assertEqual("No-Password",
                     rejected["Authentication"]["passwordType"])
    self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
                     rejected["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     rejected["Authentication"]["logonType"])

    # Message 2: the successful Authentication as ANONYMOUS LOGON.
    accepted = messages[1]
    self.assertEqual("Authentication", accepted["type"])
    self.assertEqual("NT_STATUS_OK",
                     accepted["Authentication"]["status"])
    self.assertEqual("SMB2",
                     accepted["Authentication"]["serviceDescription"])
    self.assertEqual("NTLMSSP",
                     accepted["Authentication"]["authDescription"])
    self.assertEqual("No-Password",
                     accepted["Authentication"]["passwordType"])
    self.assertEqual("ANONYMOUS LOGON",
                     accepted["Authentication"]["becameAccount"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     accepted["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     accepted["Authentication"]["logonType"])
def test_smb_no_krb_spnego(self):
    """Check the events logged for an SMB logon with Kerberos disabled.

    Connects through self.smb_connection() with its default settings and
    credentials that have kerberos_state=DONT_USE_KERBEROS, waits for the
    NTLMSSP Authorization record, and verifies the Authentication record
    logged before it.
    """
    def isLastExpectedMessage(msg):
        # Stop collecting at the NTLMSSP Authorization for an SMB service.
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return ("SMB" in authz["serviceDescription"] and
                authz["authType"] == "NTLMSSP" and
                authz["transportProtection"] == "SMB")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 0: the successful NTLMv2 Authentication.
    first = messages[0]
    self.assertEqual("Authentication", first["type"])
    details = first["Authentication"]
    self.assertEqual("NT_STATUS_OK", details["status"])
    # Either SMB dialect family is accepted here.
    self.assertIn(details["serviceDescription"], ["SMB", "SMB2"])
    remaining = (
        ("authDescription", "NTLMSSP"),
        ("passwordType", "NTLMv2"),
        ("eventId", EVT_ID_SUCCESSFUL_LOGON),
        ("logonType", EVT_LOGON_NETWORK),
    )
    for field, expected in remaining:
        self.assertEqual(expected, details[field])
def test_smb_no_krb_spnego_bad_password(self):
    """Check the event logged for an SMB NTLMSSP logon with a bad password.

    The connection attempt must raise NTSTATUSError, and exactly one
    message is expected: an NTLMv2 Authentication record with status
    NT_STATUS_WRONG_PASSWORD.
    """
    def isLastExpectedMessage(msg):
        # Stop collecting at the failed NTLMv2 Authentication record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return ("SMB" in auth["serviceDescription"] and
                auth["authDescription"] == "NTLMSSP" and
                auth["passwordType"] == "NTLMv2" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password("badPassword")

    # assertRaises fails the test if the connection unexpectedly succeeds.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_spnego_bad_user(self):
    """Check the event logged for an SMB NTLMSSP logon by an unknown user.

    The connection attempt must raise NTSTATUSError, and exactly one
    message is expected: an NTLMv2 Authentication record with status
    NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        # Stop collecting at the failed NTLMv2 Authentication record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return ("SMB" in auth["serviceDescription"] and
                auth["authDescription"] == "NTLMSSP" and
                auth["passwordType"] == "NTLMv2" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username("badUser")

    # assertRaises fails the test if the connection unexpectedly succeeds.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2(self):
    """Check the events logged for a bare-NTLM (NTLMv1) SMB1 logon.

    Connects with force_smb1=True, ntlmv2_auth="no" and use_spnego="no",
    waits for the bare-NTLM Authorization record, and verifies the
    Authentication record logged before it.
    """
    def isLastExpectedMessage(msg):
        # Stop collecting at the bare-NTLM Authorization over SMB.
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "SMB" and
                authz["authType"] == "bare-NTLM" and
                authz["transportProtection"] == "SMB")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    self.smb_connection(creds,
                        force_smb1=True,
                        ntlmv2_auth="no",
                        use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Message 0: the successful NTLMv1 Authentication.
    first = messages[0]
    self.assertEqual("Authentication", first["type"])
    details = first["Authentication"]
    successful_logon = (
        ("status", "NT_STATUS_OK"),
        ("serviceDescription", "SMB"),
        ("authDescription", "bare-NTLM"),
        ("passwordType", "NTLMv1"),
        ("eventId", EVT_ID_SUCCESSFUL_LOGON),
        ("logonType", EVT_LOGON_NETWORK),
    )
    for field, expected in successful_logon:
        self.assertEqual(expected, details[field])
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
    """Check the event logged for a bare-NTLM SMB1 logon with a bad password.

    The connection attempt (force_smb1=True, ntlmv2_auth="no",
    use_spnego="no") must raise NTSTATUSError, and exactly one message is
    expected: an NTLMv1 Authentication record with status
    NT_STATUS_WRONG_PASSWORD.
    """
    def isLastExpectedMessage(msg):
        # Stop collecting at the failed NTLMv1 Authentication record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SMB" and
                auth["authDescription"] == "bare-NTLM" and
                auth["passwordType"] == "NTLMv1" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password("badPassword")

    # assertRaises fails the test if the connection unexpectedly succeeds.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds,
                            force_smb1=True,
                            ntlmv2_auth="no",
                            use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
    """Check the event logged for a bare-NTLM SMB1 logon by an unknown user.

    The connection attempt (force_smb1=True, ntlmv2_auth="no",
    use_spnego="no") must raise NTSTATUSError, and exactly one message is
    expected: an NTLMv1 Authentication record with status
    NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        # Stop collecting at the failed NTLMv1 Authentication record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SMB" and
                auth["authDescription"] == "bare-NTLM" and
                auth["passwordType"] == "NTLMv1" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username("badUser")

    # assertRaises fails the test if the connection unexpectedly succeeds.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds,
                            force_smb1=True,
                            ntlmv2_auth="no",
                            use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_samlogon_interactive(self):
    """Check the events logged for a successful interactive SamLogon.

    Issues rpcclient's ``samlogon`` command with logon-type argument 1,
    using USERNAME and PASSWORD from the environment, and waits for the
    matching successful "interactive" SamLogon Authentication record.
    After self.remove_netlogon_messages() has filtered the list, either
    4 or 5 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the successful interactive SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_password(self):
    """Check the events logged for an interactive SamLogon, bad password.

    Issues rpcclient's ``samlogon`` command with logon-type argument 1,
    using USERNAME from the environment and the password "badPassword",
    and waits for the "interactive" SamLogon Authentication record with
    status NT_STATUS_WRONG_PASSWORD. After
    self.remove_netlogon_messages() has filtered the list, either 4 or 5
    messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the failed interactive SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_user(self):
    """Check the events logged for an interactive SamLogon, unknown user.

    Issues rpcclient's ``samlogon`` command with logon-type argument 1,
    using the user name "badUser" and PASSWORD from the environment, and
    waits for the "interactive" SamLogon Authentication record with
    status NT_STATUS_NO_SUCH_USER. After self.remove_netlogon_messages()
    has filtered the list, either 4 or 5 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the failed interactive SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network(self):
    """Check the events logged for a successful network SamLogon.

    Issues rpcclient's ``samlogon`` command with logon-type argument 2,
    using USERNAME and PASSWORD from the environment, and waits for the
    matching successful "network" SamLogon Authentication record. After
    self.remove_netlogon_messages() has filtered the list, either 4 or 5
    messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the successful network SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_password(self):
    """Check the events logged for a network SamLogon with a bad password.

    Issues rpcclient's ``samlogon`` command with logon-type argument 2,
    using USERNAME from the environment and the password "badPassword",
    and waits for the "network" SamLogon Authentication record with
    status NT_STATUS_WRONG_PASSWORD. After
    self.remove_netlogon_messages() has filtered the list, either 4 or 5
    messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the failed network SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_user(self):
    """Check the events logged for a network SamLogon by an unknown user.

    Issues rpcclient's ``samlogon`` command with logon-type argument 2,
    using the user name "badUser" and PASSWORD from the environment, and
    waits for the "network" SamLogon Authentication record with status
    NT_STATUS_NO_SUCH_USER. After self.remove_netlogon_messages() has
    filtered the list, either 4 or 5 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the failed network SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap(self):
    """Check the events logged for a successful MSCHAPv2 network SamLogon.

    Issues rpcclient's ``samlogon`` command with logon-type argument 2
    and a trailing 0x00010000 argument, using USERNAME and PASSWORD from
    the environment, and waits for the successful "network" SamLogon
    Authentication record whose passwordType is MSCHAPv2. After
    self.remove_netlogon_messages() has filtered the list, either 4 or 5
    messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the successful MSCHAPv2 SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    # NOTE(review): 0x00010000 is presumably the logon parameter flag that
    # selects MSCHAPv2 -- confirm against rpcclient's samlogon usage.
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_password(self):
    """Check the events logged for an MSCHAPv2 SamLogon with a bad password.

    Issues rpcclient's ``samlogon`` command with logon-type argument 2
    and a trailing 0x00010000 argument, using USERNAME from the
    environment and the password "badPassword", and waits for the
    "network" SamLogon Authentication record with passwordType MSCHAPv2
    and status NT_STATUS_WRONG_PASSWORD. After
    self.remove_netlogon_messages() has filtered the list, either 4 or 5
    messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the failed MSCHAPv2 SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    # NOTE(review): 0x00010000 is presumably the logon parameter flag that
    # selects MSCHAPv2 -- confirm against rpcclient's samlogon usage.
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_user(self):
    """Check the events logged for an MSCHAPv2 SamLogon by an unknown user.

    Issues rpcclient's ``samlogon`` command with logon-type argument 2
    and a trailing 0x00010000 argument, using the user name "badUser"
    and PASSWORD from the environment, and waits for the "network"
    SamLogon Authentication record with passwordType MSCHAPv2 and status
    NT_STATUS_NO_SUCH_USER. After self.remove_netlogon_messages() has
    filtered the list, either 4 or 5 messages are accepted.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the failed MSCHAPv2 SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    # NOTE(review): 0x00010000 is presumably the logon parameter flag that
    # selects MSCHAPv2 -- confirm against rpcclient's samlogon usage.
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_schannel_seal(self):
    """Check the events logged for a SamLogon issued after ``schannel``.

    Runs rpcclient with the command string ``schannel;samlogon ...``,
    using USERNAME and PASSWORD from the environment, and waits for the
    successful "network" SamLogon Authentication record. After
    self.remove_netlogon_messages() has filtered the list, either 4 or 5
    messages are accepted, and the second-to-last one must be a DCE/RPC
    schannel Authorization with SEAL transport protection and a GUID
    session id.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the successful network SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")

    # The second-to-last message should be the schannel Authorization.
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual("schannel", msg["Authorization"]["authType"])
    self.assertEqual("SEAL", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
1443 # Signed logons get promoted to sealed, this test ensures that
1444 # this behaviour is not removed accidentally
def test_samlogon_schannel_sign(self):
    """Check the events logged for a SamLogon issued after ``schannelsign``.

    Runs rpcclient with the command string ``schannelsign;samlogon ...``,
    using USERNAME and PASSWORD from the environment, and waits for the
    successful "network" SamLogon Authentication record. After
    self.remove_netlogon_messages() has filtered the list, either 4 or 5
    messages are accepted, and the second-to-last one must be a DCE/RPC
    schannel Authorization whose transport protection is SEAL, even
    though signing was requested, with a GUID session id.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Stop collecting at the successful network SamLogon record.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannelsign;samlogon %s %s %s" % (
        user, password, workstation)

    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")

    # The second-to-last message should be the schannel Authorization;
    # SEAL is expected here although the client asked for signing.
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual("schannel", msg["Authorization"]["authType"])
    self.assertEqual("SEAL", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    import unittest
    unittest.main()