1 # Integration tests for pycredentials
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba
.tests
import TestCase
, delete_force
24 from samba
.auth
import system_session
25 from samba
.credentials
import (
30 from samba
.dcerpc
import lsa
, netlogon
, ntlmssp
, security
, srvsvc
31 from samba
.dcerpc
.netlogon
import (
33 netr_WorkstationInformation
,
34 MSV1_0_ALLOW_MSVCHAPV2
36 from samba
.dcerpc
.misc
import SEC_CHAN_WKSTA
37 from samba
.dsdb
import (
38 UF_WORKSTATION_TRUST_ACCOUNT
,
41 from samba
.ndr
import ndr_pack
, ndr_unpack
42 from samba
.samdb
import SamDB
43 from samba
import NTSTATUSError
, ntstatus
44 from samba
.common
import get_string
45 from samba
.sd_utils
import SDUtils
51 Integration tests for pycredentials
57 class PyCredentialsTests(TestCase
):
62 self
.server
= os
.environ
["SERVER"]
63 self
.domain
= os
.environ
["DOMAIN"]
64 self
.host
= os
.environ
["SERVER_IP"]
65 self
.lp
= self
.get_loadparm()
67 self
.credentials
= self
.get_credentials()
69 self
.session
= system_session()
70 self
.ldb
= SamDB(url
="ldap://%s" % self
.host
,
71 session_info
=self
.session
,
72 credentials
=self
.credentials
,
75 self
.create_machine_account()
76 self
.create_user_account()
80 delete_force(self
.ldb
, self
.machine_dn
)
81 delete_force(self
.ldb
, self
.user_dn
)
83 # Until a successful netlogon connection has been established there will
84 # not be a valid authenticator associated with the credentials
85 # and new_client_authenticator should throw a ValueError
def test_no_netlogon_connection(self):
    """Asking for an authenticator before any netlogon connection fails.

    No netlogon connection is opened by this test, so requesting a new
    client authenticator from the machine credentials is expected to
    raise ValueError.
    """
    with self.assertRaises(ValueError):
        self.machine_creds.new_client_authenticator()
90 # Once a netlogon connection has been established,
91 # new_client_authenticator should return a value
def test_have_netlogon_connection(self):
    """After a netlogon connection is opened an authenticator is available.

    Opens a netlogon connection, then checks that the machine credentials
    hand back a non-None client authenticator.
    """
    # The connection object is not used directly; it is bound to a local
    # so that it stays referenced for the duration of the test.
    connection = self.get_netlogon_connection()
    authenticator = self.machine_creds.new_client_authenticator()
    self.assertIsNotNone(authenticator)
98 # Get an authenticator and use it on a sequence of operations requiring
def test_client_authenticator(self):
    """Run a sequence of authenticated netlogon calls on one connection.

    A fresh (current, subsequent) authenticator pair is fetched before
    every call: one NetrLogonSamLogonWithFlags followed by three
    NetrLogonGetDomainInfo requests.
    """
    connection = self.get_netlogon_connection()

    # Order matters: the calls are issued exactly in this sequence.
    operations = (
        self.do_NetrLogonSamLogonWithFlags,
        self.do_NetrLogonGetDomainInfo,
        self.do_NetrLogonGetDomainInfo,
        self.do_NetrLogonGetDomainInfo,
    )
    for operation in operations:
        current, subsequent = self.get_authenticator()
        operation(connection, current, subsequent)
111 # Test using LogonGetDomainInfo to update dNSHostName to an allowed value.
112 def test_set_dns_hostname_valid(self
):
113 c
= self
.get_netlogon_connection()
114 authenticator
, subsequent
= self
.get_authenticator()
116 domain_hostname
= self
.ldb
.domain_dns_name()
118 new_dns_hostname
= f
'{self.machine_name}.{domain_hostname}'
119 new_dns_hostname
= new_dns_hostname
.encode('utf-8')
121 query
= netr_WorkstationInformation()
122 query
.os_name
= lsa
.String('some OS')
123 query
.dns_hostname
= new_dns_hostname
125 c
.netr_LogonGetDomainInfo(
126 server_name
=self
.server
,
127 computer_name
=self
.user_creds
.get_workstation(),
128 credential
=authenticator
,
129 return_authenticator
=subsequent
,
135 res
= self
.ldb
.search(self
.machine_dn
,
136 scope
=ldb
.SCOPE_BASE
,
137 attrs
=['dNSHostName'])
138 self
.assertEqual(1, len(res
))
140 got_dns_hostname
= res
[0].get('dNSHostName', idx
=0)
141 self
.assertEqual(new_dns_hostname
, got_dns_hostname
)
143 # Test using LogonGetDomainInfo to update dNSHostName to an allowed value,
144 # when we are denied the right to do so.
145 def test_set_dns_hostname_valid_denied(self
):
146 c
= self
.get_netlogon_connection()
147 authenticator
, subsequent
= self
.get_authenticator()
149 res
= self
.ldb
.search(self
.machine_dn
,
150 scope
=ldb
.SCOPE_BASE
,
152 self
.assertEqual(1, len(res
))
154 machine_sid
= ndr_unpack(security
.dom_sid
,
155 res
[0].get('objectSid', idx
=0))
157 sd_utils
= SDUtils(self
.ldb
)
159 # Deny Validated Write and Write Property.
160 mod
= (f
'(OD;;SWWP;{security.GUID_DRS_DNS_HOST_NAME};;'
162 sd_utils
.dacl_add_ace(self
.machine_dn
, mod
)
164 domain_hostname
= self
.ldb
.domain_dns_name()
166 new_dns_hostname
= f
'{self.machine_name}.{domain_hostname}'
167 new_dns_hostname
= new_dns_hostname
.encode('utf-8')
169 query
= netr_WorkstationInformation()
170 query
.os_name
= lsa
.String('some OS')
171 query
.dns_hostname
= new_dns_hostname
173 c
.netr_LogonGetDomainInfo(
174 server_name
=self
.server
,
175 computer_name
=self
.user_creds
.get_workstation(),
176 credential
=authenticator
,
177 return_authenticator
=subsequent
,
183 res
= self
.ldb
.search(self
.machine_dn
,
184 scope
=ldb
.SCOPE_BASE
,
185 attrs
=['dNSHostName'])
186 self
.assertEqual(1, len(res
))
188 got_dns_hostname
= res
[0].get('dNSHostName', idx
=0)
189 self
.assertEqual(new_dns_hostname
, got_dns_hostname
)
191 # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
192 # invalid value, even with Validated Write.
193 def test_set_dns_hostname_invalid_validated_write(self
):
194 c
= self
.get_netlogon_connection()
195 authenticator
, subsequent
= self
.get_authenticator()
197 res
= self
.ldb
.search(self
.machine_dn
,
198 scope
=ldb
.SCOPE_BASE
,
200 self
.assertEqual(1, len(res
))
202 machine_sid
= ndr_unpack(security
.dom_sid
,
203 res
[0].get('objectSid', idx
=0))
205 sd_utils
= SDUtils(self
.ldb
)
207 # Grant Validated Write.
208 mod
= (f
'(OA;;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
210 sd_utils
.dacl_add_ace(self
.machine_dn
, mod
)
212 new_dns_hostname
= b
'invalid'
214 query
= netr_WorkstationInformation()
215 query
.os_name
= lsa
.String('some OS')
216 query
.dns_hostname
= new_dns_hostname
218 c
.netr_LogonGetDomainInfo(
219 server_name
=self
.server
,
220 computer_name
=self
.user_creds
.get_workstation(),
221 credential
=authenticator
,
222 return_authenticator
=subsequent
,
228 res
= self
.ldb
.search(self
.machine_dn
,
229 scope
=ldb
.SCOPE_BASE
,
230 attrs
=['dNSHostName'])
231 self
.assertEqual(1, len(res
))
233 got_dns_hostname
= res
[0].get('dNSHostName', idx
=0)
234 self
.assertIsNone(got_dns_hostname
)
236 # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
237 # invalid value, even with Write Property.
238 def test_set_dns_hostname_invalid_write_property(self
):
239 c
= self
.get_netlogon_connection()
240 authenticator
, subsequent
= self
.get_authenticator()
242 res
= self
.ldb
.search(self
.machine_dn
,
243 scope
=ldb
.SCOPE_BASE
,
245 self
.assertEqual(1, len(res
))
247 machine_sid
= ndr_unpack(security
.dom_sid
,
248 res
[0].get('objectSid', idx
=0))
250 sd_utils
= SDUtils(self
.ldb
)
252 # Grant Write Property.
253 mod
= (f
'(OA;;WP;{security.GUID_DRS_DNS_HOST_NAME};;'
255 sd_utils
.dacl_add_ace(self
.machine_dn
, mod
)
257 new_dns_hostname
= b
'invalid'
259 query
= netr_WorkstationInformation()
260 query
.os_name
= lsa
.String('some OS')
261 query
.dns_hostname
= new_dns_hostname
263 c
.netr_LogonGetDomainInfo(
264 server_name
=self
.server
,
265 computer_name
=self
.user_creds
.get_workstation(),
266 credential
=authenticator
,
267 return_authenticator
=subsequent
,
273 res
= self
.ldb
.search(self
.machine_dn
,
274 scope
=ldb
.SCOPE_BASE
,
275 attrs
=['dNSHostName'])
276 self
.assertEqual(1, len(res
))
278 got_dns_hostname
= res
[0].get('dNSHostName', idx
=0)
279 self
.assertIsNone(got_dns_hostname
)
281 # Show we can't use LogonGetDomainInfo to set the dNSHostName to just the
283 def test_set_dns_hostname_to_machine_name(self
):
284 c
= self
.get_netlogon_connection()
285 authenticator
, subsequent
= self
.get_authenticator()
287 new_dns_hostname
= self
.machine_name
.encode('utf-8')
289 query
= netr_WorkstationInformation()
290 query
.os_name
= lsa
.String('some OS')
291 query
.dns_hostname
= new_dns_hostname
293 c
.netr_LogonGetDomainInfo(
294 server_name
=self
.server
,
295 computer_name
=self
.user_creds
.get_workstation(),
296 credential
=authenticator
,
297 return_authenticator
=subsequent
,
303 res
= self
.ldb
.search(self
.machine_dn
,
304 scope
=ldb
.SCOPE_BASE
,
305 attrs
=['dNSHostName'])
306 self
.assertEqual(1, len(res
))
308 got_dns_hostname
= res
[0].get('dNSHostName', idx
=0)
309 self
.assertIsNone(got_dns_hostname
)
311 # Show we can't use LogonGetDomainInfo to set dNSHostName with an invalid
313 def test_set_dns_hostname_invalid_suffix(self
):
314 c
= self
.get_netlogon_connection()
315 authenticator
, subsequent
= self
.get_authenticator()
317 domain_hostname
= self
.ldb
.domain_dns_name()
319 new_dns_hostname
= f
'{self.machine_name}.foo.{domain_hostname}'
320 new_dns_hostname
= new_dns_hostname
.encode('utf-8')
322 query
= netr_WorkstationInformation()
323 query
.os_name
= lsa
.String('some OS')
324 query
.dns_hostname
= new_dns_hostname
326 c
.netr_LogonGetDomainInfo(
327 server_name
=self
.server
,
328 computer_name
=self
.user_creds
.get_workstation(),
329 credential
=authenticator
,
330 return_authenticator
=subsequent
,
336 res
= self
.ldb
.search(self
.machine_dn
,
337 scope
=ldb
.SCOPE_BASE
,
338 attrs
=['dNSHostName'])
339 self
.assertEqual(1, len(res
))
341 got_dns_hostname
= res
[0].get('dNSHostName', idx
=0)
342 self
.assertIsNone(got_dns_hostname
)
344 # Test that setting the HANDLES_SPN_UPDATE flag inhibits the dNSHostName
345 # update, but other attributes are still updated.
346 def test_set_dns_hostname_with_flag(self
):
347 c
= self
.get_netlogon_connection()
348 authenticator
, subsequent
= self
.get_authenticator()
350 domain_hostname
= self
.ldb
.domain_dns_name()
352 new_dns_hostname
= f
'{self.machine_name}.{domain_hostname}'
353 new_dns_hostname
= new_dns_hostname
.encode('utf-8')
355 operating_system
= 'some OS'
357 query
= netr_WorkstationInformation()
358 query
.os_name
= lsa
.String(operating_system
)
360 query
.dns_hostname
= new_dns_hostname
361 query
.workstation_flags
= netlogon
.NETR_WS_FLAG_HANDLES_SPN_UPDATE
363 c
.netr_LogonGetDomainInfo(
364 server_name
=self
.server
,
365 computer_name
=self
.user_creds
.get_workstation(),
366 credential
=authenticator
,
367 return_authenticator
=subsequent
,
373 res
= self
.ldb
.search(self
.machine_dn
,
374 scope
=ldb
.SCOPE_BASE
,
375 attrs
=['dNSHostName',
377 self
.assertEqual(1, len(res
))
379 got_dns_hostname
= res
[0].get('dNSHostName', idx
=0)
380 self
.assertIsNone(got_dns_hostname
)
382 got_os
= res
[0].get('operatingSystem', idx
=0)
383 self
.assertEqual(operating_system
.encode('utf-8'), got_os
)
385 def test_SamLogonEx(self
):
386 c
= self
.get_netlogon_connection()
388 logon
= samlogon_logon_info(self
.domain
,
392 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
393 validation_level
= netlogon
.NetlogonValidationSamInfo4
397 c
.netr_LogonSamLogonEx(self
.server
,
398 self
.user_creds
.get_workstation(),
403 except NTSTATUSError
as e
:
404 enum
= ctypes
.c_uint32(e
.args
[0]).value
405 if enum
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
406 self
.fail("got wrong password error")
410 def test_SamLogonEx_no_domain(self
):
411 c
= self
.get_netlogon_connection()
413 self
.user_creds
.set_domain('')
415 logon
= samlogon_logon_info(self
.domain
,
419 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
420 validation_level
= netlogon
.NetlogonValidationSamInfo4
424 c
.netr_LogonSamLogonEx(self
.server
,
425 self
.user_creds
.get_workstation(),
430 except NTSTATUSError
as e
:
431 enum
= ctypes
.c_uint32(e
.args
[0]).value
432 if enum
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
433 self
.fail("got wrong password error")
435 self
.fail("got unexpected error" + str(e
))
437 def test_SamLogonExNTLM(self
):
438 c
= self
.get_netlogon_connection()
440 logon
= samlogon_logon_info(self
.domain
,
443 flags
=CLI_CRED_NTLM_AUTH
)
445 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
446 validation_level
= netlogon
.NetlogonValidationSamInfo4
450 c
.netr_LogonSamLogonEx(self
.server
,
451 self
.user_creds
.get_workstation(),
456 except NTSTATUSError
as e
:
457 enum
= ctypes
.c_uint32(e
.args
[0]).value
458 if enum
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
459 self
.fail("got wrong password error")
463 def test_SamLogonExMSCHAPv2(self
):
464 c
= self
.get_netlogon_connection()
466 logon
= samlogon_logon_info(self
.domain
,
469 flags
=CLI_CRED_NTLM_AUTH
)
471 logon
.identity_info
.parameter_control
= MSV1_0_ALLOW_MSVCHAPV2
473 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
474 validation_level
= netlogon
.NetlogonValidationSamInfo4
478 c
.netr_LogonSamLogonEx(self
.server
,
479 self
.user_creds
.get_workstation(),
484 except NTSTATUSError
as e
:
485 enum
= ctypes
.c_uint32(e
.args
[0]).value
486 if enum
== ntstatus
.NT_STATUS_WRONG_PASSWORD
:
487 self
.fail("got wrong password error")
491 # Test Credentials.encrypt_netr_crypt_password
492 # By performing a NetrServerPasswordSet2
493 # And then logging on using the new password.
495 def test_encrypt_netr_password(self
):
496 # Change the password
497 self
.do_Netr_ServerPasswordSet2()
498 # Now use the new password to perform an operation
499 srvsvc
.srvsvc("ncacn_np:%s" % (self
.server
),
503 # Change the current machine account password with a
504 # netr_ServerPasswordSet2 call.
506 def do_Netr_ServerPasswordSet2(self
):
507 c
= self
.get_netlogon_connection()
508 (authenticator
, subsequent
) = self
.get_authenticator()
511 newpass
= samba
.generate_random_password(PWD_LEN
, PWD_LEN
)
512 encoded
= newpass
.encode('utf-16-le')
513 pwd_len
= len(encoded
)
514 filler
= list(os
.urandom(DATA_LEN
- pwd_len
))
515 pwd
= netlogon
.netr_CryptPassword()
517 pwd
.data
= filler
+ list(encoded
)
518 self
.machine_creds
.encrypt_netr_crypt_password(pwd
)
519 c
.netr_ServerPasswordSet2(self
.server
,
520 f
'{self.machine_name}$',
522 self
.machine_creds
.get_workstation(),
526 self
.machine_pass
= newpass
527 self
.machine_creds
.set_password(newpass
)
529 # Establish sealed schannel netlogon connection over TCP/IP
531 def get_netlogon_connection(self
):
532 return netlogon
.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self
.server
,
537 # Create the machine account
538 def create_machine_account(self
):
539 self
.machine_pass
= samba
.generate_random_password(32, 32)
540 self
.machine_name
= MACHINE_NAME
541 self
.machine_dn
= "cn=%s,%s" % (self
.machine_name
, self
.ldb
.domain_dn())
543 # remove the account if it exists, this will happen if a previous test
545 delete_force(self
.ldb
, self
.machine_dn
)
547 utf16pw
= ('"%s"' % get_string(self
.machine_pass
)).encode('utf-16-le')
549 "dn": self
.machine_dn
,
550 "objectclass": "computer",
551 "sAMAccountName": "%s$" % self
.machine_name
,
552 "userAccountControl":
553 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD
),
554 "unicodePwd": utf16pw
})
556 self
.machine_creds
= Credentials()
557 self
.machine_creds
.guess(self
.get_loadparm())
558 self
.machine_creds
.set_secure_channel_type(SEC_CHAN_WKSTA
)
559 self
.machine_creds
.set_kerberos_state(DONT_USE_KERBEROS
)
560 self
.machine_creds
.set_password(self
.machine_pass
)
561 self
.machine_creds
.set_username(self
.machine_name
+ "$")
562 self
.machine_creds
.set_workstation(self
.machine_name
)
565 # Create a test user account
566 def create_user_account(self
):
567 self
.user_pass
= samba
.generate_random_password(32, 32)
568 self
.user_name
= USER_NAME
569 self
.user_dn
= "cn=%s,%s" % (self
.user_name
, self
.ldb
.domain_dn())
571 # remove the account if it exists, this will happen if a previous test
573 delete_force(self
.ldb
, self
.user_dn
)
575 utf16pw
= ('"%s"' % get_string(self
.user_pass
)).encode('utf-16-le')
578 "objectclass": "user",
579 "sAMAccountName": "%s" % self
.user_name
,
580 "userAccountControl": str(UF_NORMAL_ACCOUNT
),
581 "unicodePwd": utf16pw
})
583 self
.user_creds
= Credentials()
584 self
.user_creds
.guess(self
.get_loadparm())
585 self
.user_creds
.set_password(self
.user_pass
)
586 self
.user_creds
.set_username(self
.user_name
)
587 self
.user_creds
.set_workstation(self
.machine_name
)
590 # Get the authenticator from the machine creds.
def get_authenticator(self):
    """Build the authenticator pair for the next netlogon request.

    Asks the machine credentials for a new client authenticator and
    copies its "credential" and "timestamp" entries into a
    netr_Authenticator.

    Returns a tuple (current, subsequent): ``current`` carries the values
    obtained from the machine credentials, ``subsequent`` is a freshly
    constructed netr_Authenticator with nothing set on it.
    """
    client_auth = self.machine_creds.new_client_authenticator()

    request_auth = netr_Authenticator()
    request_auth.cred.data = list(client_auth["credential"])
    request_auth.timestamp = client_auth["timestamp"]

    reply_auth = netr_Authenticator()
    return request_auth, reply_auth
600 def do_NetrLogonSamLogonWithFlags(self
, c
, current
, subsequent
):
601 logon
= samlogon_logon_info(self
.domain
,
605 logon_level
= netlogon
.NetlogonNetworkTransitiveInformation
606 validation_level
= netlogon
.NetlogonValidationSamInfo4
608 c
.netr_LogonSamLogonWithFlags(self
.server
,
609 self
.user_creds
.get_workstation(),
617 def do_NetrLogonGetDomainInfo(self
, c
, current
, subsequent
):
618 query
= netr_WorkstationInformation()
620 c
.netr_LogonGetDomainInfo(self
.server
,
621 self
.user_creds
.get_workstation(),
628 # Build the logon data required by NetrLogonSamLogonWithFlags
631 def samlogon_logon_info(domain_name
, computer_name
, creds
,
632 flags
=CLI_CRED_NTLMv2_AUTH
):
634 target_info_blob
= samlogon_target(domain_name
, computer_name
)
636 challenge
= b
"abcdefgh"
637 # User account under test
638 response
= creds
.get_ntlm_response(flags
=flags
,
640 target_info
=target_info_blob
)
642 logon
= netlogon
.netr_NetworkInfo()
644 logon
.challenge
= list(challenge
)
645 logon
.nt
= netlogon
.netr_ChallengeResponse()
646 logon
.nt
.length
= len(response
["nt_response"])
647 logon
.nt
.data
= list(response
["nt_response"])
648 logon
.identity_info
= netlogon
.netr_IdentityInfo()
650 (username
, domain
) = creds
.get_ntlm_username_domain()
651 logon
.identity_info
.domain_name
.string
= domain
652 logon
.identity_info
.account_name
.string
= username
653 logon
.identity_info
.workstation
.string
= creds
.get_workstation()
658 # Build the samlogon target info.
def samlogon_target(domain_name, computer_name):
    """Return the NDR-packed target info for a samlogon request.

    The packed AV_PAIR_LIST holds three entries, in this order: the
    MsvAvNbDomainName pair carrying ``domain_name``, the
    MsvAvNbComputerName pair carrying ``computer_name``, and an MsvAvEOL
    pair with no value assigned.
    """
    nb_computer = ntlmssp.AV_PAIR()
    nb_computer.AvId = ntlmssp.MsvAvNbComputerName
    nb_computer.Value = computer_name

    nb_domain = ntlmssp.AV_PAIR()
    nb_domain.AvId = ntlmssp.MsvAvNbDomainName
    nb_domain.Value = domain_name

    terminator = ntlmssp.AV_PAIR()
    terminator.AvId = ntlmssp.MsvAvEOL

    pair_list = ntlmssp.AV_PAIR_LIST()
    pair_list.count = 3
    # The domain name precedes the computer name in the packed list.
    pair_list.pair = [nb_domain, nb_computer, terminator]

    return ndr_pack(pair_list)