2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Tests for the Auth and AuthZ logging.
24 sys
.path
.insert(0, 'bin/python')
27 from samba
.dcerpc
import srvsvc
, dnsserver
29 from samba
.samba3
import libsmb_samba_internal
as libsmb
30 from samba
.samba3
import param
as s3param
31 from samba
.samdb
import SamDB
32 import samba
.tests
.auth_log_base
33 from samba
.credentials
import DONT_USE_KERBEROS
, MUST_USE_KERBEROS
34 from samba
import NTSTATUSError
35 from subprocess
import call
36 from ldb
import LdbError
37 from samba
.dcerpc
.windows_event_ids
import (
38 EVT_ID_SUCCESSFUL_LOGON
,
39 EVT_ID_UNSUCCESSFUL_LOGON
,
41 EVT_LOGON_INTERACTIVE
,
42 EVT_LOGON_NETWORK_CLEAR_TEXT
47 class AuthLogTests(samba
.tests
.auth_log_base
.AuthLogTestBase
):
51 self
.remoteAddress
= os
.environ
["CLIENT_IP"]
def smb_connection(self, creds, use_spnego="yes", ntlmv2_auth="yes",
                   force_smb1=False):
    """Open an SMB connection to the "sysvol" share on self.server.

    creds        -- credentials handed to libsmb.Conn.
    use_spnego   -- value written to the "client use spnego" option.
    ntlmv2_auth  -- value written to the "client ntlmv2 auth" option.
    force_smb1   -- forwarded unchanged to libsmb.Conn.

    Returns the libsmb.Conn object.
    """
    # NOTE(review): the source line declaring force_smb1 was lost; the
    # parameter must exist (it is forwarded below and callers pass only
    # creds), but the False default is an assumption -- confirm upstream.

    # the SMB bindings rely on having a s3 loadparm
    lp = self.get_loadparm()
    s3_lp = s3param.get_context()
    s3_lp.load(lp.configfile)

    # Allow the testcase to skip SPNEGO or use NTLMv1
    s3_lp.set("client use spnego", use_spnego)
    s3_lp.set("client ntlmv2 auth", ntlmv2_auth)

    return libsmb.Conn(self.server, "sysvol", lp=s3_lp, creds=creds,
                       force_smb1=force_smb1)
67 def _test_rpc_ncacn_np(self
, authTypes
, creds
, service
,
68 binding
, protection
, checkFunction
):
69 def isLastExpectedMessage(msg
):
70 return (msg
["type"] == "Authorization" and
71 (msg
["Authorization"]["serviceDescription"] == "DCE/RPC" or
72 msg
["Authorization"]["serviceDescription"] == service
) and
73 msg
["Authorization"]["authType"] == authTypes
[0] and
74 msg
["Authorization"]["transportProtection"] == protection
)
77 binding
= "[%s]" % binding
79 if service
== "dnsserver":
80 x
= dnsserver
.dnsserver("ncacn_np:%s%s" % (self
.server
, binding
),
83 elif service
== "srvsvc":
84 x
= srvsvc
.srvsvc("ncacn_np:%s%s" % (self
.server
, binding
),
88 # The connection is passed to ensure the server
89 # messaging context stays up until all the messages have been received.
90 messages
= self
.waitForMessages(isLastExpectedMessage
, x
)
91 checkFunction(messages
, authTypes
, service
, binding
, protection
)
def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
    """Assert serviceDescription agrees with the SMB dialect in binding.

    binding            -- binding-option string such as "[smb2,sign]";
                          may be "" when no options were given.
    serviceDescription -- the value taken from a log message.

    "smb2" in the options requires "SMB2", "smb1" requires "SMB";
    otherwise either value is accepted.
    """
    # Turn "[foo,bar]" into the list ["foo", "bar"]. Splitting on the
    # brackets and commas leaves empty strings behind (and "" splits to
    # [""]), so drop everything falsy.
    binding_list = [opt for opt in re.split(r'[\[,\]]', binding) if opt]

    # Handle explicit smb2, smb1 or auto negotiation
    if "smb2" in binding_list:
        self.assertEqual(serviceDescription, "SMB2")
    elif "smb1" in binding_list:
        self.assertEqual(serviceDescription, "SMB")
    else:
        self.assertIn(serviceDescription, ["SMB", "SMB2"])
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
                            binding, protection):
    """Check the log messages produced by an NTLM ncacn_np RPC connection.

    messages   -- the received messages, in order; each is a dict with a
                  "type" key and a sub-dict stored under that type name.
    authTypes  -- expected values; its length is the expected message
                  count. [1] is the authDescription of the first
                  Authentication, [2] the authType of the Authorization,
                  [3] the authDescription of the optional third message.
    service    -- accepted as a serviceDescription of the third message.
    binding    -- binding-option string used to validate the SMB dialect.
    protection -- not read here; part of the shared check signature.
    """
    # NOTE(review): the `msg = messages[i]` selectors were lost from the
    # source; the indices follow the "first/second/third message"
    # comments -- confirm against upstream.
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
    self._assert_ncacn_np_serviceDescription(
        binding, msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     msg["Authentication"]["authDescription"])

    # Check the second message it should be an Authorization
    msg = messages[1]
    self.assertEqual("Authorization", msg["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[2], msg["Authorization"]["authType"])
    self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    # Check the third message it should be an Authentication
    # if we are expecting 4 messages
    if expected_messages == 4:
        msg = messages[2]
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
        # Either the generic "DCE/RPC" description or the service name
        # is acceptable; assertIn reports the offending value on failure.
        self.assertIn(msg["Authentication"]["serviceDescription"],
                      ("DCE/RPC", service))

        self.assertEqual(authTypes[3],
                         msg["Authentication"]["authDescription"])
        self.assertEqual(
            EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
        self.assertEqual(
            EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def rpc_ncacn_np_krb5_check(self, messages, authTypes, service,
                            binding, protection):
    """Check the log messages produced by a Kerberos ncacn_np connection.

    messages   -- the received messages, in order; each is a dict with a
                  "type" key and a sub-dict stored under that type name.
    authTypes  -- expected values; its length is the expected message
                  count. [1] and [2] are the authDescription of the two
                  KDC Authentications, [3] the authType of the
                  Authorization.
    service    -- not read here; part of the shared check signature.
    binding    -- binding-option string used to validate the SMB dialect.
    protection -- not read here; part of the shared check signature.
    """
    # NOTE(review): the parameter list and the `msg = messages[i]`
    # selectors were lost from the source. The parameters mirror the
    # checkFunction(messages, authTypes, service, binding, protection)
    # call; the indices follow the comments -- confirm against upstream.
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    # This is almost certainly Authentication over UDP, and is probably
    # returning message too big,
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_PROTOCOL_UNREACHABLE",  # RESPONSE_TOO_BIG
                     msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_UNSUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])

    # Check the second message it should be an Authentication
    # This is the TCP Authentication in response to the message too big
    # response to the UDP Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])

    # Check the third message it should be an Authorization
    msg = messages[2]
    self.assertEqual("Authorization", msg["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[3], msg["Authorization"]["authType"])
    self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
211 def test_rpc_ncacn_np_ntlm_dns_sign(self
):
212 creds
= self
.insta_creds(template
=self
.get_credentials(),
213 kerberos_state
=DONT_USE_KERBEROS
)
214 self
._test
_rpc
_ncacn
_np
(["NTLMSSP",
218 creds
, "dnsserver", "sign", "SIGN",
219 self
.rpc_ncacn_np_ntlm_check
)
221 def test_rpc_ncacn_np_ntlm_srv_sign(self
):
222 creds
= self
.insta_creds(template
=self
.get_credentials(),
223 kerberos_state
=DONT_USE_KERBEROS
)
224 self
._test
_rpc
_ncacn
_np
(["NTLMSSP",
228 creds
, "srvsvc", "sign", "SIGN",
229 self
.rpc_ncacn_np_ntlm_check
)
231 def test_rpc_ncacn_np_ntlm_dns(self
):
232 creds
= self
.insta_creds(template
=self
.get_credentials(),
233 kerberos_state
=DONT_USE_KERBEROS
)
234 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
237 creds
, "dnsserver", "", "SMB",
238 self
.rpc_ncacn_np_ntlm_check
)
240 def test_rpc_ncacn_np_ntlm_srv(self
):
241 creds
= self
.insta_creds(template
=self
.get_credentials(),
242 kerberos_state
=DONT_USE_KERBEROS
)
243 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
246 creds
, "srvsvc", "", "SMB",
247 self
.rpc_ncacn_np_ntlm_check
)
249 def test_rpc_ncacn_np_krb_dns_sign(self
):
250 creds
= self
.insta_creds(template
=self
.get_credentials(),
251 kerberos_state
=MUST_USE_KERBEROS
)
252 self
._test
_rpc
_ncacn
_np
(["krb5",
253 "ENC-TS Pre-authentication",
254 "ENC-TS Pre-authentication",
256 creds
, "dnsserver", "sign", "SIGN",
257 self
.rpc_ncacn_np_krb5_check
)
259 def test_rpc_ncacn_np_krb_srv_sign(self
):
260 creds
= self
.insta_creds(template
=self
.get_credentials(),
261 kerberos_state
=MUST_USE_KERBEROS
)
262 self
._test
_rpc
_ncacn
_np
(["krb5",
263 "ENC-TS Pre-authentication",
264 "ENC-TS Pre-authentication",
266 creds
, "srvsvc", "sign", "SIGN",
267 self
.rpc_ncacn_np_krb5_check
)
269 def test_rpc_ncacn_np_krb_dns(self
):
270 creds
= self
.insta_creds(template
=self
.get_credentials(),
271 kerberos_state
=MUST_USE_KERBEROS
)
272 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
273 "ENC-TS Pre-authentication",
274 "ENC-TS Pre-authentication",
276 creds
, "dnsserver", "", "SMB",
277 self
.rpc_ncacn_np_krb5_check
)
279 def test_rpc_ncacn_np_krb_dns_smb2(self
):
280 creds
= self
.insta_creds(template
=self
.get_credentials(),
281 kerberos_state
=MUST_USE_KERBEROS
)
282 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
283 "ENC-TS Pre-authentication",
284 "ENC-TS Pre-authentication",
286 creds
, "dnsserver", "smb2", "SMB",
287 self
.rpc_ncacn_np_krb5_check
)
289 def test_rpc_ncacn_np_krb_srv(self
):
290 creds
= self
.insta_creds(template
=self
.get_credentials(),
291 kerberos_state
=MUST_USE_KERBEROS
)
292 self
._test
_rpc
_ncacn
_np
(["ncacn_np",
293 "ENC-TS Pre-authentication",
294 "ENC-TS Pre-authentication",
296 creds
, "srvsvc", "", "SMB",
297 self
.rpc_ncacn_np_krb5_check
)
299 def _test_rpc_ncacn_ip_tcp(self
, authTypes
, creds
, service
,
300 binding
, protection
, checkFunction
):
301 def isLastExpectedMessage(msg
):
302 return (msg
["type"] == "Authorization" and
303 msg
["Authorization"]["serviceDescription"] == "DCE/RPC" and
304 msg
["Authorization"]["authType"] == authTypes
[0] and
305 msg
["Authorization"]["transportProtection"] == protection
)
308 binding
= "[%s]" % binding
310 if service
== "dnsserver":
311 conn
= dnsserver
.dnsserver(
312 "ncacn_ip_tcp:%s%s" % (self
.server
, binding
),
315 elif service
== "srvsvc":
316 conn
= srvsvc
.srvsvc("ncacn_ip_tcp:%s%s" % (self
.server
, binding
),
320 messages
= self
.waitForMessages(isLastExpectedMessage
, conn
)
321 checkFunction(messages
, authTypes
, service
, binding
, protection
)
def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the log messages produced by an NTLM ncacn_ip_tcp connection.

    messages   -- the received messages, in order; each is a dict with a
                  "type" key and a sub-dict stored under that type name.
    authTypes  -- expected values; its length is the expected message
                  count. [1] is the authType of the first (unprotected)
                  Authorization, [2] the authDescription of the
                  Authentication that follows.
    service, binding, protection -- not read here; part of the shared
                  check signature.
    """
    # NOTE(review): the `msg = messages[i]` selectors were lost from the
    # source; the indices follow the "first/second message" comments --
    # confirm against upstream.
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authorization
    msg = messages[0]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
    self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    # Check the second message it should be an Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("DCE/RPC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the log messages produced by a Kerberos ncacn_ip_tcp connection.

    messages   -- the received messages, in order; each is a dict with a
                  "type" key and a sub-dict stored under that type name.
    authTypes  -- expected values; its length is the expected message
                  count. [1] is the authType of the first (unprotected)
                  Authorization, [2] and [3] the authDescription of the
                  two KDC Authentications that follow.
    service, binding, protection -- not read here; part of the shared
                  check signature.
    """
    # NOTE(review): the `msg = messages[i]` selectors were lost from the
    # source; the indices follow the "first/second/third message"
    # comments -- confirm against upstream.
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authorization
    msg = messages[0]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
    self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    # Check the second message it should be an Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_PROTOCOL_UNREACHABLE",  # RESPONSE_TOO_BIG
                     msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_UNSUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])

    # Check the third message it should be an Authentication
    msg = messages[2]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    # Message n is paired with authTypes[n + 1]; comparing against
    # authTypes[2] again would leave the last entry unchecked.
    self.assertEqual(authTypes[3],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
397 def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self
):
398 creds
= self
.insta_creds(template
=self
.get_credentials(),
399 kerberos_state
=DONT_USE_KERBEROS
)
400 self
._test
_rpc
_ncacn
_ip
_tcp
(["NTLMSSP",
403 creds
, "dnsserver", "sign", "SIGN",
404 self
.rpc_ncacn_ip_tcp_ntlm_check
)
406 def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self
):
407 creds
= self
.insta_creds(template
=self
.get_credentials(),
408 kerberos_state
=MUST_USE_KERBEROS
)
409 self
._test
_rpc
_ncacn
_ip
_tcp
(["krb5",
411 "ENC-TS Pre-authentication",
412 "ENC-TS Pre-authentication"],
413 creds
, "dnsserver", "sign", "SIGN",
414 self
.rpc_ncacn_ip_tcp_krb5_check
)
416 def test_rpc_ncacn_ip_tcp_ntlm_dns(self
):
417 creds
= self
.insta_creds(template
=self
.get_credentials(),
418 kerberos_state
=DONT_USE_KERBEROS
)
419 self
._test
_rpc
_ncacn
_ip
_tcp
(["NTLMSSP",
422 creds
, "dnsserver", "", "SIGN",
423 self
.rpc_ncacn_ip_tcp_ntlm_check
)
425 def test_rpc_ncacn_ip_tcp_krb5_dns(self
):
426 creds
= self
.insta_creds(template
=self
.get_credentials(),
427 kerberos_state
=MUST_USE_KERBEROS
)
428 self
._test
_rpc
_ncacn
_ip
_tcp
(["krb5",
430 "ENC-TS Pre-authentication",
431 "ENC-TS Pre-authentication"],
432 creds
, "dnsserver", "", "SIGN",
433 self
.rpc_ncacn_ip_tcp_krb5_check
)
435 def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self
):
436 creds
= self
.insta_creds(template
=self
.get_credentials(),
437 kerberos_state
=DONT_USE_KERBEROS
)
438 self
._test
_rpc
_ncacn
_ip
_tcp
(["NTLMSSP",
441 creds
, "dnsserver", "connect", "NONE",
442 self
.rpc_ncacn_ip_tcp_ntlm_check
)
444 def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self
):
445 creds
= self
.insta_creds(template
=self
.get_credentials(),
446 kerberos_state
=MUST_USE_KERBEROS
)
447 self
._test
_rpc
_ncacn
_ip
_tcp
(["krb5",
449 "ENC-TS Pre-authentication",
450 "ENC-TS Pre-authentication"],
451 creds
, "dnsserver", "connect", "NONE",
452 self
.rpc_ncacn_ip_tcp_krb5_check
)
454 def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self
):
455 creds
= self
.insta_creds(template
=self
.get_credentials(),
456 kerberos_state
=DONT_USE_KERBEROS
)
457 self
._test
_rpc
_ncacn
_ip
_tcp
(["NTLMSSP",
460 creds
, "dnsserver", "seal", "SEAL",
461 self
.rpc_ncacn_ip_tcp_ntlm_check
)
463 def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self
):
464 creds
= self
.insta_creds(template
=self
.get_credentials(),
465 kerberos_state
=MUST_USE_KERBEROS
)
466 self
._test
_rpc
_ncacn
_ip
_tcp
(["krb5",
468 "ENC-TS Pre-authentication",
469 "ENC-TS Pre-authentication"],
470 creds
, "dnsserver", "seal", "SEAL",
471 self
.rpc_ncacn_ip_tcp_krb5_check
)
475 def isLastExpectedMessage(msg
):
476 return (msg
["type"] == "Authorization" and
477 msg
["Authorization"]["serviceDescription"] == "LDAP" and
478 msg
["Authorization"]["transportProtection"] == "SEAL" and
479 msg
["Authorization"]["authType"] == "krb5")
481 self
.samdb
= SamDB(url
="ldap://%s" % os
.environ
["SERVER"],
482 lp
=self
.get_loadparm(),
483 credentials
=self
.get_credentials())
485 messages
= self
.waitForMessages(isLastExpectedMessage
)
488 "Did not receive the expected number of messages")
490 # Check the first message it should be an Authentication
492 self
.assertEqual("Authentication", msg
["type"])
493 self
.assertEqual("NT_STATUS_PROTOCOL_UNREACHABLE", # RESPONSE_TOO_BIG
494 msg
["Authentication"]["status"])
495 self
.assertEqual("Kerberos KDC",
496 msg
["Authentication"]["serviceDescription"])
497 self
.assertEqual("ENC-TS Pre-authentication",
498 msg
["Authentication"]["authDescription"])
499 self
.assertTrue(msg
["Authentication"]["duration"] > 0)
501 EVT_ID_UNSUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
503 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
505 # Check the second message it should be an Authentication
507 self
.assertEqual("Authentication", msg
["type"])
508 self
.assertEqual("NT_STATUS_OK", msg
["Authentication"]["status"])
509 self
.assertEqual("Kerberos KDC",
510 msg
["Authentication"]["serviceDescription"])
511 self
.assertEqual("ENC-TS Pre-authentication",
512 msg
["Authentication"]["authDescription"])
513 self
.assertTrue(msg
["Authentication"]["duration"] > 0)
515 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
517 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
519 def test_ldap_ntlm(self
):
521 def isLastExpectedMessage(msg
):
522 return (msg
["type"] == "Authorization" and
523 msg
["Authorization"]["serviceDescription"] == "LDAP" and
524 msg
["Authorization"]["transportProtection"] == "SEAL" and
525 msg
["Authorization"]["authType"] == "NTLMSSP")
527 self
.samdb
= SamDB(url
="ldap://%s" % os
.environ
["SERVER_IP"],
528 lp
=self
.get_loadparm(),
529 credentials
=self
.get_credentials())
531 messages
= self
.waitForMessages(isLastExpectedMessage
)
534 "Did not receive the expected number of messages")
535 # Check the first message it should be an Authentication
537 self
.assertEqual("Authentication", msg
["type"])
538 self
.assertEqual("NT_STATUS_OK", msg
["Authentication"]["status"])
539 self
.assertEqual("LDAP",
540 msg
["Authentication"]["serviceDescription"])
541 self
.assertEqual("NTLMSSP", msg
["Authentication"]["authDescription"])
542 self
.assertTrue(msg
["Authentication"]["duration"] > 0)
544 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
546 EVT_LOGON_NETWORK
, msg
["Authentication"]["logonType"])
548 def test_ldap_simple_bind(self
):
549 def isLastExpectedMessage(msg
):
550 return (msg
["type"] == "Authorization" and
551 msg
["Authorization"]["serviceDescription"] == "LDAP" and
552 msg
["Authorization"]["transportProtection"] == "TLS" and
553 msg
["Authorization"]["authType"] == "simple bind")
555 creds
= self
.insta_creds(template
=self
.get_credentials())
556 creds
.set_bind_dn("%s\\%s" % (creds
.get_domain(),
557 creds
.get_username()))
559 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
560 lp
=self
.get_loadparm(),
563 messages
= self
.waitForMessages(isLastExpectedMessage
)
566 "Did not receive the expected number of messages")
568 # Check the first message it should be an Authentication
570 self
.assertEqual("Authentication", msg
["type"])
571 self
.assertEqual("NT_STATUS_OK", msg
["Authentication"]["status"])
572 self
.assertEqual("LDAP",
573 msg
["Authentication"]["serviceDescription"])
574 self
.assertEqual("simple bind/TLS",
575 msg
["Authentication"]["authDescription"])
577 EVT_ID_SUCCESSFUL_LOGON
, msg
["Authentication"]["eventId"])
579 EVT_LOGON_NETWORK_CLEAR_TEXT
, msg
["Authentication"]["logonType"])
581 def test_ldap_simple_bind_bad_password(self
):
582 def isLastExpectedMessage(msg
):
583 return (msg
["type"] == "Authentication" and
584 msg
["Authentication"]["serviceDescription"] == "LDAP" and
585 (msg
["Authentication"]["status"] ==
586 "NT_STATUS_WRONG_PASSWORD") and
587 (msg
["Authentication"]["authDescription"] ==
588 "simple bind/TLS") and
589 (msg
["Authentication"]["eventId"] ==
590 EVT_ID_UNSUCCESSFUL_LOGON
) and
591 (msg
["Authentication"]["logonType"] ==
592 EVT_LOGON_NETWORK_CLEAR_TEXT
))
594 creds
= self
.insta_creds(template
=self
.get_credentials())
595 creds
.set_password("badPassword")
596 creds
.set_bind_dn("%s\\%s" % (creds
.get_domain(),
597 creds
.get_username()))
601 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
602 lp
=self
.get_loadparm(),
606 self
.assertEqual(thrown
, True)
608 messages
= self
.waitForMessages(isLastExpectedMessage
)
611 "Did not receive the expected number of messages")
613 def test_ldap_simple_bind_bad_user(self
):
614 def isLastExpectedMessage(msg
):
615 return (msg
["type"] == "Authentication" and
616 msg
["Authentication"]["serviceDescription"] == "LDAP" and
617 (msg
["Authentication"]["status"] ==
618 "NT_STATUS_NO_SUCH_USER") and
619 (msg
["Authentication"]["authDescription"] ==
620 "simple bind/TLS") and
621 (msg
["Authentication"]["eventId"] ==
622 EVT_ID_UNSUCCESSFUL_LOGON
) and
623 (msg
["Authentication"]["logonType"] ==
624 EVT_LOGON_NETWORK_CLEAR_TEXT
))
626 creds
= self
.insta_creds(template
=self
.get_credentials())
627 creds
.set_bind_dn("%s\\%s" % (creds
.get_domain(), "badUser"))
631 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
632 lp
=self
.get_loadparm(),
636 self
.assertEqual(thrown
, True)
638 messages
= self
.waitForMessages(isLastExpectedMessage
)
641 "Did not receive the expected number of messages")
643 def test_ldap_simple_bind_unparseable_user(self
):
644 def isLastExpectedMessage(msg
):
645 return (msg
["type"] == "Authentication" and
646 msg
["Authentication"]["serviceDescription"] == "LDAP" and
647 (msg
["Authentication"]["status"] ==
648 "NT_STATUS_NO_SUCH_USER") and
649 (msg
["Authentication"]["authDescription"] ==
650 "simple bind/TLS") and
651 (msg
["Authentication"]["eventId"] ==
652 EVT_ID_UNSUCCESSFUL_LOGON
) and
653 (msg
["Authentication"]["logonType"] ==
654 EVT_LOGON_NETWORK_CLEAR_TEXT
))
656 creds
= self
.insta_creds(template
=self
.get_credentials())
657 creds
.set_bind_dn("%s\\%s" % (creds
.get_domain(), "abdcef"))
661 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
662 lp
=self
.get_loadparm(),
666 self
.assertEqual(thrown
, True)
668 messages
= self
.waitForMessages(isLastExpectedMessage
)
671 "Did not receive the expected number of messages")
674 # Note: as this test does not expect any messages it will
675 # time out in the call to self.waitForMessages.
676 # This is expected, but it will slow this test.
677 def test_ldap_anonymous_access_bind_only(self
):
678 # Should be no logging for anonymous bind
679 # so receiving any message indicates a failure.
680 def isLastExpectedMessage(msg
):
683 creds
= self
.insta_creds(template
=self
.get_credentials())
684 creds
.set_anonymous()
686 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
687 lp
=self
.get_loadparm(),
690 messages
= self
.waitForMessages(isLastExpectedMessage
)
693 "Did not receive the expected number of messages")
695 def test_ldap_anonymous_access(self
):
696 def isLastExpectedMessage(msg
):
697 return (msg
["type"] == "Authorization" and
698 msg
["Authorization"]["serviceDescription"] == "LDAP" and
699 msg
["Authorization"]["transportProtection"] == "TLS" and
700 msg
["Authorization"]["account"] == "ANONYMOUS LOGON" and
701 msg
["Authorization"]["authType"] == "no bind")
703 creds
= self
.insta_creds(template
=self
.get_credentials())
704 creds
.set_anonymous()
706 self
.samdb
= SamDB(url
="ldaps://%s" % os
.environ
["SERVER"],
707 lp
=self
.get_loadparm(),
711 self
.samdb
.search(base
=self
.samdb
.domain_dn())
712 self
.fail("Expected an LdbError exception")
716 messages
= self
.waitForMessages(isLastExpectedMessage
)
719 "Did not receive the expected number of messages")
722 def isLastExpectedMessage(msg
):
723 return (msg
["type"] == "Authorization" and
724 "SMB" in msg
["Authorization"]["serviceDescription"] and
725 msg
["Authorization"]["authType"] == "krb5" and
726 msg
["Authorization"]["transportProtection"] == "SMB")
728 creds
= self
.insta_creds(template
=self
.get_credentials())
729 self
.smb_connection(creds
)
731 messages
= self
.waitForMessages(isLastExpectedMessage
)
734 "Did not receive the expected number of messages")
735 # Check the first message it should be an Authentication
737 self
.assertEqual("Authentication", msg
["type"])
738 self
.assertEqual("NT_STATUS_PROTOCOL_UNREACHABLE", # RESPONSE_TOO_BIG
739 msg
["Authentication"]["status"])
740 self
.assertEqual("Kerberos KDC",
741 msg
["Authentication"]["serviceDescription"])
742 self
.assertEqual("ENC-TS Pre-authentication",
743 msg
["Authentication"]["authDescription"])
744 self
.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON
,
745 msg
["Authentication"]["eventId"])
746 self
.assertEqual(EVT_LOGON_NETWORK
,
747 msg
["Authentication"]["logonType"])
749 # Check the second message it should be an Authentication
751 self
.assertEqual("Authentication", msg
["type"])
752 self
.assertEqual("NT_STATUS_OK", msg
["Authentication"]["status"])
753 self
.assertEqual("Kerberos KDC",
754 msg
["Authentication"]["serviceDescription"])
755 self
.assertEqual("ENC-TS Pre-authentication",
756 msg
["Authentication"]["authDescription"])
757 self
.assertEqual(EVT_ID_SUCCESSFUL_LOGON
,
758 msg
["Authentication"]["eventId"])
759 self
.assertEqual(EVT_LOGON_NETWORK
,
760 msg
["Authentication"]["logonType"])
762 def test_smb_bad_password(self
):
763 def isLastExpectedMessage(msg
):
764 return (msg
["type"] == "Authentication" and
765 (msg
["Authentication"]["serviceDescription"] ==
767 (msg
["Authentication"]["status"] ==
768 "NT_STATUS_WRONG_PASSWORD") and
769 (msg
["Authentication"]["authDescription"] ==
770 "ENC-TS Pre-authentication"))
772 creds
= self
.insta_creds(template
=self
.get_credentials())
773 creds
.set_kerberos_state(MUST_USE_KERBEROS
)
774 creds
.set_password("badPassword")
778 self
.smb_connection(creds
)
779 except NTSTATUSError
:
781 self
.assertEqual(thrown
, True)
783 messages
= self
.waitForMessages(isLastExpectedMessage
)
786 "Did not receive the expected number of messages")
788 def test_smb_bad_user(self
):
789 def isLastExpectedMessage(msg
):
790 return (msg
["type"] == "Authentication" and
791 (msg
["Authentication"]["serviceDescription"] ==
793 (msg
["Authentication"]["status"] ==
794 "NT_STATUS_NO_SUCH_USER") and
795 (msg
["Authentication"]["authDescription"] ==
797 (msg
["Authentication"]["eventId"] ==
798 EVT_ID_UNSUCCESSFUL_LOGON
) and
799 (msg
["Authentication"]["logonType"] ==
802 creds
= self
.insta_creds(template
=self
.get_credentials())
803 creds
.set_kerberos_state(MUST_USE_KERBEROS
)
804 creds
.set_username("badUser")
808 self
.smb_connection(creds
)
809 except NTSTATUSError
:
811 self
.assertEqual(thrown
, True)
813 messages
= self
.waitForMessages(isLastExpectedMessage
)
816 "Did not receive the expected number of messages")
818 def test_smb1_anonymous(self
):
819 def isLastExpectedMessage(msg
):
820 return (msg
["type"] == "Authorization" and
821 msg
["Authorization"]["serviceDescription"] == "SMB" and
822 msg
["Authorization"]["authType"] == "NTLMSSP" and
823 msg
["Authorization"]["account"] == "ANONYMOUS LOGON" and
824 msg
["Authorization"]["transportProtection"] == "SMB")
826 server
= os
.environ
["SERVER"]
828 path
= "//%s/IPC$" % server
830 call(["bin/smbclient", path
, auth
, "-mNT1", "-c quit"])
832 messages
= self
.waitForMessages(isLastExpectedMessage
)
835 "Did not receive the expected number of messages")
837 # Check the first message it should be an Authentication
839 self
.assertEqual("Authentication", msg
["type"])
840 self
.assertEqual("NT_STATUS_NO_SUCH_USER",
841 msg
["Authentication"]["status"])
842 self
.assertEqual("SMB",
843 msg
["Authentication"]["serviceDescription"])
844 self
.assertEqual("NTLMSSP",
845 msg
["Authentication"]["authDescription"])
846 self
.assertEqual("No-Password",
847 msg
["Authentication"]["passwordType"])
848 self
.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON
,
849 msg
["Authentication"]["eventId"])
850 self
.assertEqual(EVT_LOGON_NETWORK
,
851 msg
["Authentication"]["logonType"])
853 # Check the second message it should be an Authentication
855 self
.assertEqual("Authentication", msg
["type"])
856 self
.assertEqual("NT_STATUS_OK",
857 msg
["Authentication"]["status"])
858 self
.assertEqual("SMB",
859 msg
["Authentication"]["serviceDescription"])
860 self
.assertEqual("NTLMSSP",
861 msg
["Authentication"]["authDescription"])
862 self
.assertEqual("No-Password",
863 msg
["Authentication"]["passwordType"])
864 self
.assertEqual("ANONYMOUS LOGON",
865 msg
["Authentication"]["becameAccount"])
866 self
.assertEqual(EVT_ID_SUCCESSFUL_LOGON
,
867 msg
["Authentication"]["eventId"])
868 self
.assertEqual(EVT_LOGON_NETWORK
,
869 msg
["Authentication"]["logonType"])
871 def test_smb2_anonymous(self
):
872 def isLastExpectedMessage(msg
):
873 return (msg
["type"] == "Authorization" and
874 msg
["Authorization"]["serviceDescription"] == "SMB2" and
875 msg
["Authorization"]["authType"] == "NTLMSSP" and
876 msg
["Authorization"]["account"] == "ANONYMOUS LOGON" and
877 msg
["Authorization"]["transportProtection"] == "SMB")
879 server
= os
.environ
["SERVER"]
881 path
= "//%s/IPC$" % server
883 call(["bin/smbclient", path
, auth
, "-mSMB3", "-c quit"])
885 messages
= self
.waitForMessages(isLastExpectedMessage
)
888 "Did not receive the expected number of messages")
890 # Check the first message it should be an Authentication
892 self
.assertEqual("Authentication", msg
["type"])
893 self
.assertEqual("NT_STATUS_NO_SUCH_USER",
894 msg
["Authentication"]["status"])
895 self
.assertEqual("SMB2",
896 msg
["Authentication"]["serviceDescription"])
897 self
.assertEqual("NTLMSSP",
898 msg
["Authentication"]["authDescription"])
899 self
.assertEqual("No-Password",
900 msg
["Authentication"]["passwordType"])
901 self
.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON
,
902 msg
["Authentication"]["eventId"])
903 self
.assertEqual(EVT_LOGON_NETWORK
,
904 msg
["Authentication"]["logonType"])
906 # Check the second message it should be an Authentication
908 self
.assertEqual("Authentication", msg
["type"])
909 self
.assertEqual("NT_STATUS_OK",
910 msg
["Authentication"]["status"])
911 self
.assertEqual("SMB2",
912 msg
["Authentication"]["serviceDescription"])
913 self
.assertEqual("NTLMSSP",
914 msg
["Authentication"]["authDescription"])
915 self
.assertEqual("No-Password",
916 msg
["Authentication"]["passwordType"])
917 self
.assertEqual("ANONYMOUS LOGON",
918 msg
["Authentication"]["becameAccount"])
919 self
.assertEqual(EVT_ID_SUCCESSFUL_LOGON
,
920 msg
["Authentication"]["eventId"])
921 self
.assertEqual(EVT_LOGON_NETWORK
,
922 msg
["Authentication"]["logonType"])
def test_smb_no_krb_spnego(self):
    """Check the log messages for an NTLMSSP SMB logon without Kerberos.

    Connects with the default credentials and Kerberos disabled, waits for
    the NTLMSSP Authorization message of the SMB session, and verifies that
    the first message collected is a successful NTLMv2 Authentication.
    """
    # NOTE(review): some lines of this test were absent from the reviewed
    # text; the expected message count, the message index and the list of
    # accepted service descriptions were restored during reformatting --
    # confirm against the canonical file.

    def isLastExpectedMessage(msg):
        # Collection stops at the NTLMSSP Authorization for the SMB session.
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return ("SMB" in authz["serviceDescription"] and
                authz["authType"] == "NTLMSSP" and
                authz["transportProtection"] == "SMB")

    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first message should be the Authentication event.
    first = messages[0]
    self.assertEqual("Authentication", first["type"])
    authn = first["Authentication"]
    self.assertEqual("NT_STATUS_OK", authn["status"])
    self.assertIn(authn["serviceDescription"], ["SMB", "SMB2"])
    self.assertEqual("NTLMSSP", authn["authDescription"])
    self.assertEqual("NTLMv2", authn["passwordType"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def test_smb_no_krb_spnego_bad_password(self):
    """Check that a wrong password over NTLMSSP is logged as a failure.

    Connects with Kerberos disabled and the password replaced by
    "badPassword", expects the connection attempt to raise NTSTATUSError,
    and waits for an NTLMv2 Authentication message whose status is
    NT_STATUS_WRONG_PASSWORD.
    """
    # NOTE(review): some lines of this test were absent from the reviewed
    # text; the expected logon type and the expected message count were
    # restored during reformatting -- confirm against the canonical file.

    def isLastExpectedMessage(msg):
        # Collection stops at the failed NTLMv2 Authentication event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return ("SMB" in auth["serviceDescription"] and
                auth["authDescription"] == "NTLMSSP" and
                auth["passwordType"] == "NTLMv2" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password("badPassword")

    # assertRaises replaces the manual "thrown" flag: the test still fails
    # when no NTSTATUSError is raised, and any other exception propagates.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_spnego_bad_user(self):
    """Check that an unknown user over NTLMSSP is logged as a failure.

    Connects with Kerberos disabled and the user name replaced by
    "badUser", expects the connection attempt to raise NTSTATUSError, and
    waits for an NTLMv2 Authentication message whose status is
    NT_STATUS_NO_SUCH_USER.
    """
    # NOTE(review): some lines of this test were absent from the reviewed
    # text; the expected logon type and the expected message count were
    # restored during reformatting -- confirm against the canonical file.

    def isLastExpectedMessage(msg):
        # Collection stops at the failed NTLMv2 Authentication event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return ("SMB" in auth["serviceDescription"] and
                auth["authDescription"] == "NTLMSSP" and
                auth["passwordType"] == "NTLMv2" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username("badUser")

    # assertRaises replaces the manual "thrown" flag: the test still fails
    # when no NTSTATUSError is raised, and any other exception propagates.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2(self):
    """Check the log messages for a bare-NTLM (NTLMv1) SMB logon.

    Connects with Kerberos disabled, waits for the bare-NTLM Authorization
    message for the "SMB" service, and verifies that the first message
    collected is a successful NTLMv1 Authentication.
    """
    # NOTE(review): some lines of this test were absent from the reviewed
    # text; the keyword arguments passed to smb_connection, the expected
    # message count and the message index were restored during
    # reformatting -- confirm against the canonical file.

    def isLastExpectedMessage(msg):
        # Collection stops at the bare-NTLM Authorization for the session.
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "SMB" and
                authz["authType"] == "bare-NTLM" and
                authz["transportProtection"] == "SMB")

    template = self.get_credentials()
    creds = self.insta_creds(template=template,
                             kerberos_state=DONT_USE_KERBEROS)
    self.smb_connection(creds,
                        force_smb1=True,
                        ntlmv2_auth="no",
                        use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first message should be the Authentication event.
    first = messages[0]
    self.assertEqual("Authentication", first["type"])
    authn = first["Authentication"]
    self.assertEqual("NT_STATUS_OK", authn["status"])
    self.assertEqual("SMB", authn["serviceDescription"])
    self.assertEqual("bare-NTLM", authn["authDescription"])
    self.assertEqual("NTLMv1", authn["passwordType"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON, authn["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK, authn["logonType"])
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
    """Check that a wrong password over bare NTLM is logged as a failure.

    Connects with Kerberos disabled and the password replaced by
    "badPassword", expects the connection attempt to raise NTSTATUSError,
    and waits for an NTLMv1 Authentication message whose status is
    NT_STATUS_WRONG_PASSWORD.
    """
    # NOTE(review): some lines of this test were absent from the reviewed
    # text; the keyword arguments passed to smb_connection, the expected
    # logon type and the expected message count were restored during
    # reformatting -- confirm against the canonical file.

    def isLastExpectedMessage(msg):
        # Collection stops at the failed NTLMv1 Authentication event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SMB" and
                auth["authDescription"] == "bare-NTLM" and
                auth["passwordType"] == "NTLMv1" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password("badPassword")

    # assertRaises replaces the manual "thrown" flag: the test still fails
    # when no NTSTATUSError is raised, and any other exception propagates.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds,
                            force_smb1=True,
                            ntlmv2_auth="no",
                            use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
    """Check that an unknown user over bare NTLM is logged as a failure.

    Connects with Kerberos disabled and the user name replaced by
    "badUser", expects the connection attempt to raise NTSTATUSError, and
    waits for an NTLMv1 Authentication message whose status is
    NT_STATUS_NO_SUCH_USER.
    """
    # NOTE(review): some lines of this test were absent from the reviewed
    # text; the keyword arguments passed to smb_connection, the expected
    # logon type and the expected message count were restored during
    # reformatting -- confirm against the canonical file.

    def isLastExpectedMessage(msg):
        # Collection stops at the failed NTLMv1 Authentication event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SMB" and
                auth["authDescription"] == "bare-NTLM" and
                auth["passwordType"] == "NTLMv1" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username("badUser")

    # assertRaises replaces the manual "thrown" flag: the test still fails
    # when no NTSTATUSError is raised, and any other exception propagates.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds,
                            force_smb1=True,
                            ntlmv2_auth="no",
                            use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_samlogon_interactive(self):
    """Check that an interactive samlogon with valid credentials is logged.

    Runs ``bin/rpcclient`` against the host named by the SERVER environment
    variable with a ``samlogon`` command built from USERNAME and PASSWORD,
    then waits for a successful Authentication message that carries the
    interactive logon type and the workstation name used here.
    """
    # NOTE(review): the service and auth description literals were absent
    # from the reviewed text and were restored during reformatting --
    # confirm against the canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the successful interactive logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_password(self):
    """Check that an interactive samlogon with a wrong password is logged.

    Runs ``bin/rpcclient`` against SERVER with a ``samlogon`` command that
    uses USERNAME and the literal password "badPassword", then waits for an
    Authentication message with status NT_STATUS_WRONG_PASSWORD and the
    interactive logon type.
    """
    # NOTE(review): the service and auth description literals were absent
    # from the reviewed text and were restored during reformatting --
    # confirm against the canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the failed interactive logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_user(self):
    """Check that an interactive samlogon for an unknown user is logged.

    Runs ``bin/rpcclient`` against SERVER with a ``samlogon`` command that
    uses a non-existent user name and PASSWORD, then waits for an
    Authentication message with status NT_STATUS_NO_SUCH_USER and the
    interactive logon type.
    """
    # NOTE(review): the service and auth description literals and the
    # assignment of the bad user name were absent from the reviewed text
    # and were restored during reformatting -- confirm against the
    # canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the failed interactive logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network(self):
    """Check that a network samlogon with valid credentials is logged.

    Runs ``bin/rpcclient`` against SERVER with a ``samlogon`` command built
    from USERNAME and PASSWORD (final argument 2), then waits for a
    successful Authentication message whose auth description is "network".
    """
    # NOTE(review): the service description literal and the expected logon
    # type were absent from the reviewed text and were restored during
    # reformatting -- confirm against the canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the successful network logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_password(self):
    """Check that a network samlogon with a wrong password is logged.

    Runs ``bin/rpcclient`` against SERVER with a ``samlogon`` command that
    uses USERNAME and the literal password "badPassword" (final argument
    2), then waits for a "network" Authentication message with status
    NT_STATUS_WRONG_PASSWORD.
    """
    # NOTE(review): the service description literal and the expected logon
    # type were absent from the reviewed text and were restored during
    # reformatting -- confirm against the canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the failed network logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_user(self):
    """Check that a network samlogon for an unknown user is logged.

    Runs ``bin/rpcclient`` against SERVER with a ``samlogon`` command that
    uses a non-existent user name and PASSWORD (final argument 2), then
    waits for a "network" Authentication message with status
    NT_STATUS_NO_SUCH_USER.
    """
    # NOTE(review): the service description literal, the expected logon
    # type and the assignment of the bad user name were absent from the
    # reviewed text and were restored during reformatting -- confirm
    # against the canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the failed network logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap(self):
    """Check that an MSCHAPv2 network samlogon with valid credentials is logged.

    Runs ``bin/rpcclient`` against SERVER with a ``samlogon`` command built
    from USERNAME and PASSWORD and the extra argument 0x00010000, then
    waits for a successful "network" Authentication message whose password
    type is "MSCHAPv2".
    """
    # NOTE(review): the service description literal and the expected logon
    # type were absent from the reviewed text and were restored during
    # reformatting -- confirm against the canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the successful MSCHAPv2 logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_password(self):
    """Check that an MSCHAPv2 network samlogon with a wrong password is logged.

    Runs ``bin/rpcclient`` against SERVER with a ``samlogon`` command that
    uses USERNAME, the literal password "badPassword" and the extra
    argument 0x00010000, then waits for a "network" MSCHAPv2
    Authentication message with status NT_STATUS_WRONG_PASSWORD.
    """
    # NOTE(review): the service description literal and the expected logon
    # type were absent from the reviewed text and were restored during
    # reformatting -- confirm against the canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the failed MSCHAPv2 logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_user(self):
    """Check that an MSCHAPv2 network samlogon for an unknown user is logged.

    Runs ``bin/rpcclient`` against SERVER with a ``samlogon`` command that
    uses a non-existent user name, PASSWORD and the extra argument
    0x00010000, then waits for a "network" MSCHAPv2 Authentication message
    with status NT_STATUS_NO_SUCH_USER.
    """
    # NOTE(review): the service description literal, the expected logon
    # type and the assignment of the bad user name were absent from the
    # reviewed text and were restored during reformatting -- confirm
    # against the canonical file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the failed MSCHAPv2 logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_schannel_seal(self):
    """Check the log messages for a samlogon issued over schannel.

    Runs ``bin/rpcclient`` against SERVER with the command
    ``schannel;samlogon ...`` built from USERNAME and PASSWORD, waits for a
    successful "network" Authentication message, and then verifies that
    the second to last message is a DCE/RPC schannel Authorization with
    SEAL transport protection and a GUID session id.
    """
    # NOTE(review): the service description literal, the expected logon
    # type and the message index were absent from the reviewed text and
    # were restored during reformatting -- confirm against the canonical
    # file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the successful network logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")

    # The second to last message should be the schannel Authorization.
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self.assertEqual("DCE/RPC", authz["serviceDescription"])
    self.assertEqual("schannel", authz["authType"])
    self.assertEqual("SEAL", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))
# Signed logons get promoted to sealed; this test ensures that
# this behaviour is not removed accidentally.
def test_samlogon_schannel_sign(self):
    """Check the log messages for a samlogon issued over signed schannel.

    Runs ``bin/rpcclient`` against SERVER with the command
    ``schannelsign;samlogon ...`` built from USERNAME and PASSWORD, waits
    for a successful "network" Authentication message, and then verifies
    that the second to last message is a DCE/RPC schannel Authorization
    that still reports SEAL transport protection and a GUID session id.
    """
    # NOTE(review): the service description literal, the expected logon
    # type and the message index were absent from the reviewed text and
    # were restored during reformatting -- confirm against the canonical
    # file.
    workstation = "AuthLogTests"
    expected_workstation = r"\\%s" % workstation

    def isLastExpectedMessage(msg):
        # Collection stops at the successful network logon event.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == expected_workstation and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannelsign;samlogon %s %s %s" % (
        user, password, workstation)

    # The exit status of rpcclient is not checked; only the log is examined.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count on failure, unlike a bare boolean.
    self.assertIn(len(messages), (4, 5),
                  "Did not receive the expected number of messages")

    # The second to last message should be the schannel Authorization.
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self.assertEqual("DCE/RPC", authz["serviceDescription"])
    self.assertEqual("schannel", authz["authType"])
    self.assertEqual("SEAL", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))
1487 if __name__
== '__main__':