ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / py_credentials.py
blob7d473b19a39c9a936659edcd0872c0bf57e74aec
1 # Integration tests for pycredentials
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.tests import TestCase, delete_force
19 import os
21 import ldb
23 import samba
24 from samba.auth import system_session
25 from samba.credentials import (
26 Credentials,
27 CLI_CRED_NTLMv2_AUTH,
28 CLI_CRED_NTLM_AUTH,
29 DONT_USE_KERBEROS)
30 from samba.dcerpc import lsa, netlogon, ntlmssp, security, srvsvc
31 from samba.dcerpc.netlogon import (
32 netr_Authenticator,
33 netr_WorkstationInformation,
34 MSV1_0_ALLOW_MSVCHAPV2
36 from samba.dcerpc.misc import SEC_CHAN_WKSTA
37 from samba.dsdb import (
38 UF_WORKSTATION_TRUST_ACCOUNT,
39 UF_PASSWD_NOTREQD,
40 UF_NORMAL_ACCOUNT)
41 from samba.ndr import ndr_pack, ndr_unpack
42 from samba.samdb import SamDB
43 from samba import NTSTATUSError, ntstatus
44 from samba.common import get_string
45 from samba.sd_utils import SDUtils
47 import ctypes
50 """
51 Integration tests for pycredentials
52 """
54 MACHINE_NAME = "PCTM"
55 USER_NAME = "PCTU"
57 class PyCredentialsTests(TestCase):
59 def setUp(self):
60 super().setUp()
62 self.server = os.environ["SERVER"]
63 self.domain = os.environ["DOMAIN"]
64 self.host = os.environ["SERVER_IP"]
65 self.lp = self.get_loadparm()
67 self.credentials = self.get_credentials()
69 self.session = system_session()
70 self.ldb = SamDB(url="ldap://%s" % self.host,
71 session_info=self.session,
72 credentials=self.credentials,
73 lp=self.lp)
75 self.create_machine_account()
76 self.create_user_account()
78 def tearDown(self):
79 super().tearDown()
80 delete_force(self.ldb, self.machine_dn)
81 delete_force(self.ldb, self.user_dn)
83 # Until a successful netlogon connection has been established there will
84 # not be a valid authenticator associated with the credentials
85 # and new_client_authenticator should throw a ValueError
86 def test_no_netlogon_connection(self):
87 self.assertRaises(ValueError,
88 self.machine_creds.new_client_authenticator)
90 # Once a netlogon connection has been established,
91 # new_client_authenticator should return a value
93 def test_have_netlogon_connection(self):
94 c = self.get_netlogon_connection()
95 a = self.machine_creds.new_client_authenticator()
96 self.assertIsNotNone(a)
98 # Get an authenticator and use it on a sequence of operations requiring
99 # an authenticator
100 def test_client_authenticator(self):
101 c = self.get_netlogon_connection()
102 (authenticator, subsequent) = self.get_authenticator()
103 self.do_NetrLogonSamLogonWithFlags(c, authenticator, subsequent)
104 (authenticator, subsequent) = self.get_authenticator()
105 self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
106 (authenticator, subsequent) = self.get_authenticator()
107 self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
108 (authenticator, subsequent) = self.get_authenticator()
109 self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
111 # Test using LogonGetDomainInfo to update dNSHostName to an allowed value.
112 def test_set_dns_hostname_valid(self):
113 c = self.get_netlogon_connection()
114 authenticator, subsequent = self.get_authenticator()
116 domain_hostname = self.ldb.domain_dns_name()
118 new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
119 new_dns_hostname = new_dns_hostname.encode('utf-8')
121 query = netr_WorkstationInformation()
122 query.os_name = lsa.String('some OS')
123 query.dns_hostname = new_dns_hostname
125 c.netr_LogonGetDomainInfo(
126 server_name=self.server,
127 computer_name=self.user_creds.get_workstation(),
128 credential=authenticator,
129 return_authenticator=subsequent,
130 level=1,
131 query=query)
133 # Check the result.
135 res = self.ldb.search(self.machine_dn,
136 scope=ldb.SCOPE_BASE,
137 attrs=['dNSHostName'])
138 self.assertEqual(1, len(res))
140 got_dns_hostname = res[0].get('dNSHostName', idx=0)
141 self.assertEqual(new_dns_hostname, got_dns_hostname)
143 # Test using LogonGetDomainInfo to update dNSHostName to an allowed value,
144 # when we are denied the right to do so.
145 def test_set_dns_hostname_valid_denied(self):
146 c = self.get_netlogon_connection()
147 authenticator, subsequent = self.get_authenticator()
149 res = self.ldb.search(self.machine_dn,
150 scope=ldb.SCOPE_BASE,
151 attrs=['objectSid'])
152 self.assertEqual(1, len(res))
154 machine_sid = ndr_unpack(security.dom_sid,
155 res[0].get('objectSid', idx=0))
157 sd_utils = SDUtils(self.ldb)
159 # Deny Validated Write and Write Property.
160 mod = (f'(OD;;SWWP;{security.GUID_DRS_DNS_HOST_NAME};;'
161 f'{machine_sid})')
162 sd_utils.dacl_add_ace(self.machine_dn, mod)
164 domain_hostname = self.ldb.domain_dns_name()
166 new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
167 new_dns_hostname = new_dns_hostname.encode('utf-8')
169 query = netr_WorkstationInformation()
170 query.os_name = lsa.String('some OS')
171 query.dns_hostname = new_dns_hostname
173 c.netr_LogonGetDomainInfo(
174 server_name=self.server,
175 computer_name=self.user_creds.get_workstation(),
176 credential=authenticator,
177 return_authenticator=subsequent,
178 level=1,
179 query=query)
181 # Check the result.
183 res = self.ldb.search(self.machine_dn,
184 scope=ldb.SCOPE_BASE,
185 attrs=['dNSHostName'])
186 self.assertEqual(1, len(res))
188 got_dns_hostname = res[0].get('dNSHostName', idx=0)
189 self.assertEqual(new_dns_hostname, got_dns_hostname)
191 # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
192 # invalid value, even with Validated Write.
193 def test_set_dns_hostname_invalid_validated_write(self):
194 c = self.get_netlogon_connection()
195 authenticator, subsequent = self.get_authenticator()
197 res = self.ldb.search(self.machine_dn,
198 scope=ldb.SCOPE_BASE,
199 attrs=['objectSid'])
200 self.assertEqual(1, len(res))
202 machine_sid = ndr_unpack(security.dom_sid,
203 res[0].get('objectSid', idx=0))
205 sd_utils = SDUtils(self.ldb)
207 # Grant Validated Write.
208 mod = (f'(OA;;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
209 f'{machine_sid})')
210 sd_utils.dacl_add_ace(self.machine_dn, mod)
212 new_dns_hostname = b'invalid'
214 query = netr_WorkstationInformation()
215 query.os_name = lsa.String('some OS')
216 query.dns_hostname = new_dns_hostname
218 c.netr_LogonGetDomainInfo(
219 server_name=self.server,
220 computer_name=self.user_creds.get_workstation(),
221 credential=authenticator,
222 return_authenticator=subsequent,
223 level=1,
224 query=query)
226 # Check the result.
228 res = self.ldb.search(self.machine_dn,
229 scope=ldb.SCOPE_BASE,
230 attrs=['dNSHostName'])
231 self.assertEqual(1, len(res))
233 got_dns_hostname = res[0].get('dNSHostName', idx=0)
234 self.assertIsNone(got_dns_hostname)
236 # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
237 # invalid value, even with Write Property.
238 def test_set_dns_hostname_invalid_write_property(self):
239 c = self.get_netlogon_connection()
240 authenticator, subsequent = self.get_authenticator()
242 res = self.ldb.search(self.machine_dn,
243 scope=ldb.SCOPE_BASE,
244 attrs=['objectSid'])
245 self.assertEqual(1, len(res))
247 machine_sid = ndr_unpack(security.dom_sid,
248 res[0].get('objectSid', idx=0))
250 sd_utils = SDUtils(self.ldb)
252 # Grant Write Property.
253 mod = (f'(OA;;WP;{security.GUID_DRS_DNS_HOST_NAME};;'
254 f'{machine_sid})')
255 sd_utils.dacl_add_ace(self.machine_dn, mod)
257 new_dns_hostname = b'invalid'
259 query = netr_WorkstationInformation()
260 query.os_name = lsa.String('some OS')
261 query.dns_hostname = new_dns_hostname
263 c.netr_LogonGetDomainInfo(
264 server_name=self.server,
265 computer_name=self.user_creds.get_workstation(),
266 credential=authenticator,
267 return_authenticator=subsequent,
268 level=1,
269 query=query)
271 # Check the result.
273 res = self.ldb.search(self.machine_dn,
274 scope=ldb.SCOPE_BASE,
275 attrs=['dNSHostName'])
276 self.assertEqual(1, len(res))
278 got_dns_hostname = res[0].get('dNSHostName', idx=0)
279 self.assertIsNone(got_dns_hostname)
281 # Show we can't use LogonGetDomainInfo to set the dNSHostName to just the
282 # machine name.
283 def test_set_dns_hostname_to_machine_name(self):
284 c = self.get_netlogon_connection()
285 authenticator, subsequent = self.get_authenticator()
287 new_dns_hostname = self.machine_name.encode('utf-8')
289 query = netr_WorkstationInformation()
290 query.os_name = lsa.String('some OS')
291 query.dns_hostname = new_dns_hostname
293 c.netr_LogonGetDomainInfo(
294 server_name=self.server,
295 computer_name=self.user_creds.get_workstation(),
296 credential=authenticator,
297 return_authenticator=subsequent,
298 level=1,
299 query=query)
301 # Check the result.
303 res = self.ldb.search(self.machine_dn,
304 scope=ldb.SCOPE_BASE,
305 attrs=['dNSHostName'])
306 self.assertEqual(1, len(res))
308 got_dns_hostname = res[0].get('dNSHostName', idx=0)
309 self.assertIsNone(got_dns_hostname)
311 # Show we can't use LogonGetDomainInfo to set dNSHostName with an invalid
312 # suffix.
313 def test_set_dns_hostname_invalid_suffix(self):
314 c = self.get_netlogon_connection()
315 authenticator, subsequent = self.get_authenticator()
317 domain_hostname = self.ldb.domain_dns_name()
319 new_dns_hostname = f'{self.machine_name}.foo.{domain_hostname}'
320 new_dns_hostname = new_dns_hostname.encode('utf-8')
322 query = netr_WorkstationInformation()
323 query.os_name = lsa.String('some OS')
324 query.dns_hostname = new_dns_hostname
326 c.netr_LogonGetDomainInfo(
327 server_name=self.server,
328 computer_name=self.user_creds.get_workstation(),
329 credential=authenticator,
330 return_authenticator=subsequent,
331 level=1,
332 query=query)
334 # Check the result.
336 res = self.ldb.search(self.machine_dn,
337 scope=ldb.SCOPE_BASE,
338 attrs=['dNSHostName'])
339 self.assertEqual(1, len(res))
341 got_dns_hostname = res[0].get('dNSHostName', idx=0)
342 self.assertIsNone(got_dns_hostname)
344 # Test that setting the HANDLES_SPN_UPDATE flag inhibits the dNSHostName
345 # update, but other attributes are still updated.
346 def test_set_dns_hostname_with_flag(self):
347 c = self.get_netlogon_connection()
348 authenticator, subsequent = self.get_authenticator()
350 domain_hostname = self.ldb.domain_dns_name()
352 new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
353 new_dns_hostname = new_dns_hostname.encode('utf-8')
355 operating_system = 'some OS'
357 query = netr_WorkstationInformation()
358 query.os_name = lsa.String(operating_system)
360 query.dns_hostname = new_dns_hostname
361 query.workstation_flags = netlogon.NETR_WS_FLAG_HANDLES_SPN_UPDATE
363 c.netr_LogonGetDomainInfo(
364 server_name=self.server,
365 computer_name=self.user_creds.get_workstation(),
366 credential=authenticator,
367 return_authenticator=subsequent,
368 level=1,
369 query=query)
371 # Check the result.
373 res = self.ldb.search(self.machine_dn,
374 scope=ldb.SCOPE_BASE,
375 attrs=['dNSHostName',
376 'operatingSystem'])
377 self.assertEqual(1, len(res))
379 got_dns_hostname = res[0].get('dNSHostName', idx=0)
380 self.assertIsNone(got_dns_hostname)
382 got_os = res[0].get('operatingSystem', idx=0)
383 self.assertEqual(operating_system.encode('utf-8'), got_os)
385 def test_SamLogonEx(self):
386 c = self.get_netlogon_connection()
388 logon = samlogon_logon_info(self.domain,
389 self.machine_name,
390 self.user_creds)
392 logon_level = netlogon.NetlogonNetworkTransitiveInformation
393 validation_level = netlogon.NetlogonValidationSamInfo4
394 netr_flags = 0
396 try:
397 c.netr_LogonSamLogonEx(self.server,
398 self.user_creds.get_workstation(),
399 logon_level,
400 logon,
401 validation_level,
402 netr_flags)
403 except NTSTATUSError as e:
404 enum = ctypes.c_uint32(e.args[0]).value
405 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
406 self.fail("got wrong password error")
407 else:
408 raise
410 def test_SamLogonEx_no_domain(self):
411 c = self.get_netlogon_connection()
413 self.user_creds.set_domain('')
415 logon = samlogon_logon_info(self.domain,
416 self.machine_name,
417 self.user_creds)
419 logon_level = netlogon.NetlogonNetworkTransitiveInformation
420 validation_level = netlogon.NetlogonValidationSamInfo4
421 netr_flags = 0
423 try:
424 c.netr_LogonSamLogonEx(self.server,
425 self.user_creds.get_workstation(),
426 logon_level,
427 logon,
428 validation_level,
429 netr_flags)
430 except NTSTATUSError as e:
431 enum = ctypes.c_uint32(e.args[0]).value
432 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
433 self.fail("got wrong password error")
434 else:
435 self.fail("got unexpected error" + str(e))
437 def test_SamLogonExNTLM(self):
438 c = self.get_netlogon_connection()
440 logon = samlogon_logon_info(self.domain,
441 self.machine_name,
442 self.user_creds,
443 flags=CLI_CRED_NTLM_AUTH)
445 logon_level = netlogon.NetlogonNetworkTransitiveInformation
446 validation_level = netlogon.NetlogonValidationSamInfo4
447 netr_flags = 0
449 try:
450 c.netr_LogonSamLogonEx(self.server,
451 self.user_creds.get_workstation(),
452 logon_level,
453 logon,
454 validation_level,
455 netr_flags)
456 except NTSTATUSError as e:
457 enum = ctypes.c_uint32(e.args[0]).value
458 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
459 self.fail("got wrong password error")
460 else:
461 raise
463 def test_SamLogonExMSCHAPv2(self):
464 c = self.get_netlogon_connection()
466 logon = samlogon_logon_info(self.domain,
467 self.machine_name,
468 self.user_creds,
469 flags=CLI_CRED_NTLM_AUTH)
471 logon.identity_info.parameter_control = MSV1_0_ALLOW_MSVCHAPV2
473 logon_level = netlogon.NetlogonNetworkTransitiveInformation
474 validation_level = netlogon.NetlogonValidationSamInfo4
475 netr_flags = 0
477 try:
478 c.netr_LogonSamLogonEx(self.server,
479 self.user_creds.get_workstation(),
480 logon_level,
481 logon,
482 validation_level,
483 netr_flags)
484 except NTSTATUSError as e:
485 enum = ctypes.c_uint32(e.args[0]).value
486 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
487 self.fail("got wrong password error")
488 else:
489 raise
491 # Test Credentials.encrypt_netr_crypt_password
492 # By performing a NetrServerPasswordSet2
493 # And the logging on using the new password.
495 def test_encrypt_netr_password(self):
496 # Change the password
497 self.do_Netr_ServerPasswordSet2()
498 # Now use the new password to perform an operation
499 srvsvc.srvsvc("ncacn_np:%s" % (self.server),
500 self.lp,
501 self.machine_creds)
503 # Change the current machine account password with a
504 # netr_ServerPasswordSet2 call.
506 def do_Netr_ServerPasswordSet2(self):
507 c = self.get_netlogon_connection()
508 (authenticator, subsequent) = self.get_authenticator()
509 PWD_LEN = 32
510 DATA_LEN = 512
511 newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
512 encoded = newpass.encode('utf-16-le')
513 pwd_len = len(encoded)
514 filler = list(os.urandom(DATA_LEN - pwd_len))
515 pwd = netlogon.netr_CryptPassword()
516 pwd.length = pwd_len
517 pwd.data = filler + list(encoded)
518 self.machine_creds.encrypt_netr_crypt_password(pwd)
519 c.netr_ServerPasswordSet2(self.server,
520 f'{self.machine_name}$',
521 SEC_CHAN_WKSTA,
522 self.machine_creds.get_workstation(),
523 authenticator,
524 pwd)
526 self.machine_pass = newpass
527 self.machine_creds.set_password(newpass)
529 # Establish sealed schannel netlogon connection over TCP/IP
531 def get_netlogon_connection(self):
532 return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
533 self.lp,
534 self.machine_creds)
537 # Create the machine account
538 def create_machine_account(self):
539 self.machine_pass = samba.generate_random_password(32, 32)
540 self.machine_name = MACHINE_NAME
541 self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn())
543 # remove the account if it exists, this will happen if a previous test
544 # run failed
545 delete_force(self.ldb, self.machine_dn)
547 utf16pw = ('"%s"' % get_string(self.machine_pass)).encode('utf-16-le')
548 self.ldb.add({
549 "dn": self.machine_dn,
550 "objectclass": "computer",
551 "sAMAccountName": "%s$" % self.machine_name,
552 "userAccountControl":
553 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
554 "unicodePwd": utf16pw})
556 self.machine_creds = Credentials()
557 self.machine_creds.guess(self.get_loadparm())
558 self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
559 self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
560 self.machine_creds.set_password(self.machine_pass)
561 self.machine_creds.set_username(self.machine_name + "$")
562 self.machine_creds.set_workstation(self.machine_name)
565 # Create a test user account
566 def create_user_account(self):
567 self.user_pass = samba.generate_random_password(32, 32)
568 self.user_name = USER_NAME
569 self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
571 # remove the account if it exists, this will happen if a previous test
572 # run failed
573 delete_force(self.ldb, self.user_dn)
575 utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
576 self.ldb.add({
577 "dn": self.user_dn,
578 "objectclass": "user",
579 "sAMAccountName": "%s" % self.user_name,
580 "userAccountControl": str(UF_NORMAL_ACCOUNT),
581 "unicodePwd": utf16pw})
583 self.user_creds = Credentials()
584 self.user_creds.guess(self.get_loadparm())
585 self.user_creds.set_password(self.user_pass)
586 self.user_creds.set_username(self.user_name)
587 self.user_creds.set_workstation(self.machine_name)
590 # Get the authenticator from the machine creds.
591 def get_authenticator(self):
592 auth = self.machine_creds.new_client_authenticator()
593 current = netr_Authenticator()
594 current.cred.data = list(auth["credential"])
595 current.timestamp = auth["timestamp"]
597 subsequent = netr_Authenticator()
598 return (current, subsequent)
600 def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
601 logon = samlogon_logon_info(self.domain,
602 self.machine_name,
603 self.user_creds)
605 logon_level = netlogon.NetlogonNetworkTransitiveInformation
606 validation_level = netlogon.NetlogonValidationSamInfo4
607 netr_flags = 0
608 c.netr_LogonSamLogonWithFlags(self.server,
609 self.user_creds.get_workstation(),
610 current,
611 subsequent,
612 logon_level,
613 logon,
614 validation_level,
615 netr_flags)
617 def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
618 query = netr_WorkstationInformation()
620 c.netr_LogonGetDomainInfo(self.server,
621 self.user_creds.get_workstation(),
622 current,
623 subsequent,
625 query)
628 # Build the logon data required by NetrLogonSamLogonWithFlags
631 def samlogon_logon_info(domain_name, computer_name, creds,
632 flags=CLI_CRED_NTLMv2_AUTH):
634 target_info_blob = samlogon_target(domain_name, computer_name)
636 challenge = b"abcdefgh"
637 # User account under test
638 response = creds.get_ntlm_response(flags=flags,
639 challenge=challenge,
640 target_info=target_info_blob)
642 logon = netlogon.netr_NetworkInfo()
644 logon.challenge = list(challenge)
645 logon.nt = netlogon.netr_ChallengeResponse()
646 logon.nt.length = len(response["nt_response"])
647 logon.nt.data = list(response["nt_response"])
648 logon.identity_info = netlogon.netr_IdentityInfo()
650 (username, domain) = creds.get_ntlm_username_domain()
651 logon.identity_info.domain_name.string = domain
652 logon.identity_info.account_name.string = username
653 logon.identity_info.workstation.string = creds.get_workstation()
655 return logon
658 # Build the samlogon target info.
661 def samlogon_target(domain_name, computer_name):
662 target_info = ntlmssp.AV_PAIR_LIST()
663 target_info.count = 3
664 computername = ntlmssp.AV_PAIR()
665 computername.AvId = ntlmssp.MsvAvNbComputerName
666 computername.Value = computer_name
668 domainname = ntlmssp.AV_PAIR()
669 domainname.AvId = ntlmssp.MsvAvNbDomainName
670 domainname.Value = domain_name
672 eol = ntlmssp.AV_PAIR()
673 eol.AvId = ntlmssp.MsvAvEOL
674 target_info.pair = [domainname, computername, eol]
676 return ndr_pack(target_info)