1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 sys
.path
.insert(0, "bin/python")
23 os
.environ
["PYTHONUNBUFFERED"] = "1"
30 from collections
import namedtuple
31 from datetime
import datetime
, timezone
33 from functools
import partial
34 from typing
import Dict
, Optional
37 from ldb
import SCOPE_BASE
43 generate_random_password
,
47 from samba
.auth
import system_session
48 from samba
.credentials
import (
54 from samba
.crypto
import des_crypt_blob_16
, md4_hash_blob
55 from samba
.dcerpc
import (
69 from samba
.dcerpc
.misc
import SEC_CHAN_BDC
, SEC_CHAN_NULL
, SEC_CHAN_WKSTA
70 from samba
.domain
.models
import AuthenticationPolicy
, AuthenticationSilo
71 from samba
.drs_utils
import drs_Replicate
, drsuapi_connect
72 from samba
.dsdb
import (
73 DS_DOMAIN_FUNCTION_2000
,
74 DS_DOMAIN_FUNCTION_2008
,
75 DS_GUID_COMPUTERS_CONTAINER
,
76 DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
,
77 DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
,
78 DS_GUID_USERS_CONTAINER
,
79 DSDB_SYNTAX_BINARY_DN
,
80 GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
,
81 GTYPE_SECURITY_GLOBAL_GROUP
,
82 GTYPE_SECURITY_UNIVERSAL_GROUP
,
84 UF_NO_AUTH_DATA_REQUIRED
,
87 UF_PARTIAL_SECRETS_ACCOUNT
,
88 UF_SERVER_TRUST_ACCOUNT
,
89 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
,
90 UF_WORKSTATION_TRUST_ACCOUNT
,
93 from samba
.join
import DCJoinContext
94 from samba
.ndr
import ndr_pack
, ndr_unpack
95 from samba
.param
import LoadParm
96 from samba
.samdb
import SamDB
, dsdb_Dn
98 rc4_bit
= security
.KERB_ENCTYPE_RC4_HMAC_MD5
99 aes256_sk_bit
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
101 import samba
.tests
.krb5
.kcrypto
as kcrypto
102 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
103 from samba
.tests
import TestCaseInTempDir
, delete_force
104 from samba
.tests
.krb5
.raw_testcase
import (
109 from samba
.tests
.krb5
.rfc4120_constants
import (
112 AES256_CTS_HMAC_SHA1_96
,
114 KDC_ERR_PREAUTH_REQUIRED
,
120 KU_ENC_CHALLENGE_CLIENT
,
125 PADATA_ENC_TIMESTAMP
,
126 PADATA_ENCRYPTED_CHALLENGE
,
130 global_asn1_print
= False
131 global_hexdump
= False
class GroupType(Enum):
    """Security group scopes.

    Each member's value is the corresponding GTYPE_SECURITY_* constant
    imported from samba.dsdb.
    """

    # Global security group.
    GLOBAL = GTYPE_SECURITY_GLOBAL_GROUP
    # Domain-local security group.
    DOMAIN_LOCAL = GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
    # Universal security group.
    UNIVERSAL = GTYPE_SECURITY_UNIVERSAL_GROUP
140 # This simple class encapsulates the DN and SID of a Principal.
142 __slots__
= ['dn', 'sid']
144 def __init__(self
, dn
, sid
):
145 if dn
is not None and not isinstance(dn
, ldb
.Dn
):
146 raise AssertionError(f
'expected {dn} to be an ldb.Dn')
152 class KDCBaseTest(TestCaseInTempDir
, RawKerberosTest
):
153 """ Base class for KDC tests.
156 class AccountType(Enum
):
161 MANAGED_SERVICE
= object()
162 GROUP_MANAGED_SERVICE
= object()
172 cls
._drsuapi
_connection
= None
174 cls
._functional
_level
= None
176 # An identifier to ensure created accounts have unique names. Windows
177 # caches accounts based on usernames, so account names being different
178 # across test runs avoids previous test runs affecting the results.
179 cls
.account_base
= f
'{secrets.token_hex(4)}_'
182 # A list containing DNs of accounts created as part of testing.
185 cls
.account_cache
= {}
186 cls
.policy_cache
= {}
191 cls
.ldb_cleanups
= []
193 cls
._claim
_types
_dn
= None
194 cls
._authn
_policy
_config
_dn
= None
195 cls
._authn
_policies
_dn
= None
196 cls
._authn
_silos
_dn
= None
198 def get_claim_types_dn(self
):
199 samdb
= self
.get_samdb()
201 if self
._claim
_types
_dn
is None:
202 claim_config_dn
= samdb
.get_config_basedn()
204 claim_config_dn
.add_child('CN=Claims Configuration,CN=Services')
206 'dn': claim_config_dn
,
207 'objectClass': 'container',
211 except ldb
.LdbError
as err
:
213 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
216 self
.accounts
.append(str(claim_config_dn
))
218 claim_types_dn
= claim_config_dn
219 claim_types_dn
.add_child('CN=Claim Types')
221 'dn': claim_types_dn
,
222 'objectClass': 'msDS-ClaimTypes',
226 except ldb
.LdbError
as err
:
228 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
231 self
.accounts
.append(str(claim_types_dn
))
233 type(self
)._claim
_types
_dn
= claim_types_dn
235 # Return a copy of the DN.
236 return ldb
.Dn(samdb
, str(self
._claim
_types
_dn
))
238 def get_authn_policy_config_dn(self
):
239 samdb
= self
.get_samdb()
241 if self
._authn
_policy
_config
_dn
is None:
242 authn_policy_config_dn
= samdb
.get_config_basedn()
244 authn_policy_config_dn
.add_child(
245 'CN=AuthN Policy Configuration,CN=Services')
247 'dn': authn_policy_config_dn
,
248 'objectClass': 'container',
249 'description': ('Contains configuration for authentication '
254 except ldb
.LdbError
as err
:
256 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
259 self
.accounts
.append(str(authn_policy_config_dn
))
261 type(self
)._authn
_policy
_config
_dn
= authn_policy_config_dn
263 # Return a copy of the DN.
264 return ldb
.Dn(samdb
, str(self
._authn
_policy
_config
_dn
))
266 def get_authn_policies_dn(self
):
267 samdb
= self
.get_samdb()
269 if self
._authn
_policies
_dn
is None:
270 authn_policies_dn
= self
.get_authn_policy_config_dn()
271 authn_policies_dn
.add_child('CN=AuthN Policies')
273 'dn': authn_policies_dn
,
274 'objectClass': 'msDS-AuthNPolicies',
275 'description': 'Contains authentication policy objects',
279 except ldb
.LdbError
as err
:
281 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
284 self
.accounts
.append(str(authn_policies_dn
))
286 type(self
)._authn
_policies
_dn
= authn_policies_dn
288 # Return a copy of the DN.
289 return ldb
.Dn(samdb
, str(self
._authn
_policies
_dn
))
291 def get_authn_silos_dn(self
):
292 samdb
= self
.get_samdb()
294 if self
._authn
_silos
_dn
is None:
295 authn_silos_dn
= self
.get_authn_policy_config_dn()
296 authn_silos_dn
.add_child('CN=AuthN Silos')
298 'dn': authn_silos_dn
,
299 'objectClass': 'msDS-AuthNPolicySilos',
300 'description': 'Contains authentication policy silo objects',
304 except ldb
.LdbError
as err
:
306 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
309 self
.accounts
.append(str(authn_silos_dn
))
311 type(self
)._authn
_silos
_dn
= authn_silos_dn
313 # Return a copy of the DN.
314 return ldb
.Dn(samdb
, str(self
._authn
_silos
_dn
))
318 return frozenset((k
, v
) for k
, v
in m
.items())
321 # Run any cleanups that may modify accounts prior to deleting those
325 # Clean up any accounts created for single tests.
326 if self
._ldb
is not None:
327 for dn
in reversed(self
.test_accounts
):
328 delete_force(self
._ldb
, dn
)
333 def tearDownClass(cls
):
334 # Clean up any accounts created by create_account. This is
335 # done in tearDownClass() rather than tearDown(), so that
336 # accounts need only be created once for permutation tests.
337 if cls
._ldb
is not None:
338 for cleanup
in reversed(cls
.ldb_cleanups
):
340 cls
._ldb
.modify(cleanup
)
344 for dn
in reversed(cls
.accounts
):
345 delete_force(cls
._ldb
, dn
)
347 if cls
._rodc
_ctx
is not None:
348 cls
._rodc
_ctx
.cleanup_old_join(force
=True)
350 super().tearDownClass()
354 self
.do_asn1_print
= global_asn1_print
355 self
.do_hexdump
= global_hexdump
357 # A list containing DNs of accounts that should be removed when the
358 # current test finishes.
359 self
.test_accounts
= []
361 def get_lp(self
) -> LoadParm
:
363 type(self
)._lp
= self
.get_loadparm()
367 def get_samdb(self
) -> SamDB
:
368 if self
._ldb
is None:
369 creds
= self
.get_admin_creds()
372 session
= system_session()
373 type(self
)._ldb
= SamDB(url
="ldap://%s" % self
.dc_host
,
374 session_info
=session
,
380 def get_rodc_samdb(self
) -> SamDB
:
381 if self
._rodc
_ldb
is None:
382 creds
= self
.get_admin_creds()
385 session
= system_session()
386 type(self
)._rodc
_ldb
= SamDB(url
="ldap://%s" % self
.host
,
387 session_info
=session
,
392 return self
._rodc
_ldb
394 def get_drsuapi_connection(self
):
395 if self
._drsuapi
_connection
is None:
396 admin_creds
= self
.get_admin_creds()
397 samdb
= self
.get_samdb()
398 dns_hostname
= samdb
.host_dns_name()
399 type(self
)._drsuapi
_connection
= drsuapi_connect(dns_hostname
,
404 return self
._drsuapi
_connection
406 def get_server_dn(self
, samdb
):
407 server
= samdb
.get_serverName()
409 res
= samdb
.search(base
=server
,
410 scope
=ldb
.SCOPE_BASE
,
411 attrs
=['serverReference'])
412 dn
= ldb
.Dn(samdb
, res
[0]['serverReference'][0].decode('utf8'))
416 def get_mock_rodc_ctx(self
):
417 if self
._rodc
_ctx
is None:
418 admin_creds
= self
.get_admin_creds()
421 rodc_name
= self
.get_new_username()
422 site_name
= 'Default-First-Site-Name'
424 rodc_ctx
= DCJoinContext(server
=self
.dc_host
,
428 netbios_name
=rodc_name
,
431 self
.create_rodc(rodc_ctx
)
433 type(self
)._rodc
_ctx
= rodc_ctx
435 return self
._rodc
_ctx
437 def get_domain_functional_level(self
, ldb
=None):
438 if self
._functional
_level
is None:
440 ldb
= self
.get_samdb()
442 res
= ldb
.search(base
='',
444 attrs
=['domainFunctionality'])
446 functional_level
= int(res
[0]['domainFunctionality'][0])
448 functional_level
= DS_DOMAIN_FUNCTION_2000
450 type(self
)._functional
_level
= functional_level
452 return self
._functional
_level
def get_default_enctypes(self, creds):
    """Return the list of encryption types expected by default for creds.

    The AES types are included only when the domain functional level is
    at least DS_DOMAIN_FUNCTION_2008. RC4 is appended when
    self.expect_nt_hash is set or creds.get_workstation() returns a
    truthy value.

    :param creds: the credentials being examined; must not be None.
    :return: a new list of kcrypto.Enctype values, AES256 and AES128
        first (when present), followed by RC4 (when present).
    """
    self.assertIsNotNone(creds, 'expected client creds to be passed in')

    # AES is only supported at functional level 2008 or higher.
    aes_supported = (
        self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008)
    enctypes = (
        [kcrypto.Enctype.AES256, kcrypto.Enctype.AES128]
        if aes_supported else [])

    if self.expect_nt_hash or creds.get_workstation():
        enctypes += [kcrypto.Enctype.RC4]

    return enctypes
471 def create_group(self
, samdb
, name
, ou
=None, gtype
=None):
473 ou
= samdb
.get_wellknown_dn(samdb
.get_default_basedn(),
474 DS_GUID_USERS_CONTAINER
)
476 dn
= f
'CN={name},{ou}'
478 # Remove the group if it exists; this will happen if a previous test
480 delete_force(samdb
, dn
)
482 # Save the group name so it can be deleted in tearDownClass.
483 self
.accounts
.append(dn
)
487 'objectClass': 'group'
489 if gtype
is not None:
490 details
['groupType'] = common
.normalise_int32(gtype
)
def get_dn_from_attribute(self, attribute):
    """Return the ``dn`` of the object that get_from_attribute() yields
    for *attribute*.

    :param attribute: the name passed through to get_from_attribute().
    """
    schema_entry = self.get_from_attribute(attribute)
    return schema_entry.dn
def get_dn_from_class(self, attribute):
    """Return the ``dn`` of the object that get_from_class() yields for
    *attribute*.

    :param attribute: the name passed through to get_from_class().
    """
    schema_entry = self.get_from_class(attribute)
    return schema_entry.dn
def get_schema_id_guid_from_attribute(self, attribute):
    """Return the schemaIDGUID of *attribute* wrapped in a misc.GUID.

    The first 'schemaIDGUID' value of the object returned by
    get_from_attribute() is handed to misc.GUID().

    :param attribute: the name passed through to get_from_attribute().
    """
    schema_entry = self.get_from_attribute(attribute)
    raw_guid = schema_entry.get('schemaIDGUID', idx=0)
    return misc.GUID(raw_guid)
def get_from_attribute(self, attribute):
    """Look up *attribute* via get_from_schema() using the
    'attributeSchema' object class, and return whatever that call returns.
    """
    object_class = 'attributeSchema'
    return self.get_from_schema(attribute, object_class)
def get_from_class(self, attribute):
    """Look up *attribute* via get_from_schema() using the 'classSchema'
    object class, and return whatever that call returns.
    """
    object_class = 'classSchema'
    return self.get_from_schema(attribute, object_class)
511 def get_from_schema(self
, name
, object_class
):
512 samdb
= self
.get_samdb()
513 schema_dn
= samdb
.get_schema_basedn()
515 res
= samdb
.search(base
=schema_dn
,
516 scope
=ldb
.SCOPE_ONELEVEL
,
517 attrs
=['schemaIDGUID'],
518 expression
=(f
'(&(objectClass={object_class})'
519 f
'(lDAPDisplayName={name}))'))
520 self
.assertEqual(1, len(res
),
521 f
'could not locate {name} in {object_class}')
525 def create_authn_silo(self
, *,
528 computer_policy
=None,
531 samdb
= self
.get_samdb()
533 silo_id
= self
.get_new_username()
535 authn_silo_dn
= self
.get_authn_silos_dn()
536 authn_silo_dn
.add_child(f
'CN={silo_id}')
540 'objectClass': 'msDS-AuthNPolicySilo',
545 elif enforced
is False:
548 if members
is not None:
549 details
['msDS-AuthNPolicySiloMembers'] = members
550 if user_policy
is not None:
551 details
['msDS-UserAuthNPolicy'] = str(user_policy
.dn
)
552 if computer_policy
is not None:
553 details
['msDS-ComputerAuthNPolicy'] = str(computer_policy
.dn
)
554 if service_policy
is not None:
555 details
['msDS-ServiceAuthNPolicy'] = str(service_policy
.dn
)
556 if enforced
is not None:
557 details
['msDS-AuthNPolicySiloEnforced'] = enforced
559 # Save the silo DN so it can be deleted in tearDownClass().
560 self
.accounts
.append(str(authn_silo_dn
))
562 # Remove the silo if it exists; this will happen if a previous test run
564 delete_force(samdb
, authn_silo_dn
)
568 return AuthenticationSilo
.get(samdb
, dn
=authn_silo_dn
)
570 def create_authn_silo_claim_id(self
):
571 claim_id
= 'ad://ext/AuthenticationSilo'
574 'msDS-GroupManagedServiceAccount',
576 'msDS-ManagedServiceAccount',
580 self
.create_claim(claim_id
,
583 value_space_restricted
=False,
584 source_type
='Constructed',
585 for_classes
=for_classes
,
586 value_type
=claims
.CLAIM_TYPE_STRING
,
587 # It's OK if the claim type already exists.
592 def create_authn_policy(self
, *,
597 cache_key
= self
.freeze(kwargs
)
599 authn_policy
= self
.policy_cache
.get(cache_key
)
600 if authn_policy
is not None:
603 authn_policy
= self
.create_authn_policy_opts(**kwargs
)
605 self
.policy_cache
[cache_key
] = authn_policy
609 def create_authn_policy_opts(self
, *,
611 strong_ntlm_policy
=None,
612 user_allowed_from
=None,
613 user_allowed_ntlm
=None,
614 user_allowed_to
=None,
615 user_tgt_lifetime
=None,
616 computer_allowed_to
=None,
617 computer_tgt_lifetime
=None,
618 service_allowed_from
=None,
619 service_allowed_ntlm
=None,
620 service_allowed_to
=None,
621 service_tgt_lifetime
=None):
622 samdb
= self
.get_samdb()
624 policy_id
= self
.get_new_username()
626 policy_dn
= self
.get_authn_policies_dn()
627 policy_dn
.add_child(f
'CN={policy_id}')
631 'objectClass': 'msDS-AuthNPolicy',
636 def sd_from_sddl(sddl
):
638 if _domain_sid
is None:
639 _domain_sid
= security
.dom_sid(samdb
.get_domain_sid())
641 return ndr_pack(security
.descriptor
.from_sddl(sddl
, _domain_sid
))
645 elif enforced
is False:
648 if user_allowed_ntlm
is True:
649 user_allowed_ntlm
= 'TRUE'
650 elif user_allowed_ntlm
is False:
651 user_allowed_ntlm
= 'FALSE'
653 if service_allowed_ntlm
is True:
654 service_allowed_ntlm
= 'TRUE'
655 elif service_allowed_ntlm
is False:
656 service_allowed_ntlm
= 'FALSE'
658 if enforced
is not None:
659 details
['msDS-AuthNPolicyEnforced'] = enforced
660 if strong_ntlm_policy
is not None:
661 details
['msDS-StrongNTLMPolicy'] = strong_ntlm_policy
663 if user_allowed_from
is not None:
664 details
['msDS-UserAllowedToAuthenticateFrom'] = sd_from_sddl(
666 if user_allowed_ntlm
is not None:
667 details
['msDS-UserAllowedNTLMNetworkAuthentication'] = (
669 if user_allowed_to
is not None:
670 details
['msDS-UserAllowedToAuthenticateTo'] = sd_from_sddl(
672 if user_tgt_lifetime
is not None:
673 if isinstance(user_tgt_lifetime
, numbers
.Number
):
674 user_tgt_lifetime
= str(int(user_tgt_lifetime
* 10_000_000))
675 details
['msDS-UserTGTLifetime'] = user_tgt_lifetime
677 if computer_allowed_to
is not None:
678 details
['msDS-ComputerAllowedToAuthenticateTo'] = sd_from_sddl(
680 if computer_tgt_lifetime
is not None:
681 if isinstance(computer_tgt_lifetime
, numbers
.Number
):
682 computer_tgt_lifetime
= str(
683 int(computer_tgt_lifetime
* 10_000_000))
684 details
['msDS-ComputerTGTLifetime'] = computer_tgt_lifetime
686 if service_allowed_from
is not None:
687 details
['msDS-ServiceAllowedToAuthenticateFrom'] = sd_from_sddl(
688 service_allowed_from
)
689 if service_allowed_ntlm
is not None:
690 details
['msDS-ServiceAllowedNTLMNetworkAuthentication'] = (
691 service_allowed_ntlm
)
692 if service_allowed_to
is not None:
693 details
['msDS-ServiceAllowedToAuthenticateTo'] = sd_from_sddl(
695 if service_tgt_lifetime
is not None:
696 if isinstance(service_tgt_lifetime
, numbers
.Number
):
697 service_tgt_lifetime
= str(
698 int(service_tgt_lifetime
* 10_000_000))
699 details
['msDS-ServiceTGTLifetime'] = service_tgt_lifetime
701 # Save the policy DN so it can be deleted in tearDownClass().
702 self
.accounts
.append(str(policy_dn
))
704 # Remove the policy if it exists; this will happen if a previous test
706 delete_force(samdb
, policy_dn
)
710 return AuthenticationPolicy
.get(samdb
, dn
=policy_dn
)
712 def create_claim(self
,
717 value_space_restricted
=None,
723 samdb
= self
.get_samdb()
725 claim_dn
= self
.get_claim_types_dn()
726 claim_dn
.add_child(f
'CN={claim_id}')
730 'objectClass': 'msDS-ClaimType',
735 elif enabled
is False:
738 if attribute
is not None:
739 attribute
= str(self
.get_dn_from_attribute(attribute
))
741 if single_valued
is True:
742 single_valued
= 'TRUE'
743 elif single_valued
is False:
744 single_valued
= 'FALSE'
746 if value_space_restricted
is True:
747 value_space_restricted
= 'TRUE'
748 elif value_space_restricted
is False:
749 value_space_restricted
= 'FALSE'
751 if for_classes
is not None:
752 for_classes
= [str(self
.get_dn_from_class(name
))
753 for name
in for_classes
]
755 if isinstance(value_type
, int):
756 value_type
= str(value_type
)
758 if enabled
is not None:
759 details
['Enabled'] = enabled
760 if attribute
is not None:
761 details
['msDS-ClaimAttributeSource'] = attribute
762 if single_valued
is not None:
763 details
['msDS-ClaimIsSingleValued'] = single_valued
764 if value_space_restricted
is not None:
765 details
['msDS-ClaimIsValueSpaceRestricted'] = (
766 value_space_restricted
)
767 if source
is not None:
768 details
['msDS-ClaimSource'] = source
769 if source_type
is not None:
770 details
['msDS-ClaimSourceType'] = source_type
771 if for_classes
is not None:
772 details
['msDS-ClaimTypeAppliesToClass'] = for_classes
773 if value_type
is not None:
774 details
['msDS-ClaimValueType'] = value_type
777 # Remove the claim if it exists; this will happen if a previous
779 delete_force(samdb
, claim_dn
)
783 except ldb
.LdbError
as err
:
785 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
787 self
.assertFalse(force
, 'should not fail with force=True')
789 # Save the claim DN so it can be deleted in tearDownClass()
790 self
.accounts
.append(str(claim_dn
))
792 def create_account(self
, samdb
, name
, account_type
=AccountType
.USER
,
793 spn
=None, upn
=None, additional_details
=None,
794 ou
=None, account_control
=0, add_dollar
=None,
795 expired_password
=False, force_nt4_hash
=False,
797 """Create an account for testing.
798 The dn of the created account is added to self.accounts,
799 which is used by tearDownClass to clean up the created accounts.
801 if add_dollar
is None and account_type
is not self
.AccountType
.USER
:
805 if account_type
is self
.AccountType
.COMPUTER
:
806 guid
= DS_GUID_COMPUTERS_CONTAINER
807 elif account_type
is self
.AccountType
.MANAGED_SERVICE
or (
808 account_type
is self
.AccountType
.GROUP_MANAGED_SERVICE
):
809 guid
= DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
810 elif account_type
is self
.AccountType
.SERVER
:
811 guid
= DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
813 guid
= DS_GUID_USERS_CONTAINER
815 ou
= samdb
.get_wellknown_dn(samdb
.get_default_basedn(), guid
)
817 dn
= "CN=%s,%s" % (name
, ou
)
819 # remove the account if it exists, this will happen if a previous test
821 delete_force(samdb
, dn
)
825 secure_schannel_type
= SEC_CHAN_NULL
826 if account_type
is self
.AccountType
.USER
:
827 object_class
= "user"
828 account_control |
= UF_NORMAL_ACCOUNT
829 elif account_type
is self
.AccountType
.MANAGED_SERVICE
:
830 object_class
= "msDS-ManagedServiceAccount"
831 account_control |
= UF_WORKSTATION_TRUST_ACCOUNT
832 secure_schannel_type
= SEC_CHAN_WKSTA
833 elif account_type
is self
.AccountType
.GROUP_MANAGED_SERVICE
:
834 object_class
= "msDS-GroupManagedServiceAccount"
835 account_control |
= UF_WORKSTATION_TRUST_ACCOUNT
836 secure_schannel_type
= SEC_CHAN_WKSTA
838 object_class
= "computer"
839 if account_type
is self
.AccountType
.COMPUTER
:
840 account_control |
= UF_WORKSTATION_TRUST_ACCOUNT
841 secure_schannel_type
= SEC_CHAN_WKSTA
842 elif account_type
is self
.AccountType
.SERVER
:
843 account_control |
= UF_SERVER_TRUST_ACCOUNT
844 secure_schannel_type
= SEC_CHAN_BDC
850 "objectClass": object_class
,
851 "sAMAccountName": account_name
,
852 "userAccountControl": str(account_control
),
855 if account_type
is self
.AccountType
.GROUP_MANAGED_SERVICE
:
858 password
= generate_random_password(32, 32)
859 utf16pw
= ('"%s"' % password
).encode('utf-16-le')
861 details
['unicodePwd'] = utf16pw
864 upn
= upn
.format(account
=account_name
)
866 if isinstance(spn
, str):
867 spn
= spn
.format(account
=account_name
)
869 spn
= tuple(s
.format(account
=account_name
) for s
in spn
)
870 details
["servicePrincipalName"] = spn
872 details
["userPrincipalName"] = upn
874 details
["pwdLastSet"] = "0"
875 if additional_details
is not None:
876 details
.update(additional_details
)
878 # Mark this account for deletion in tearDownClass() after all the
879 # tests in this class finish.
880 self
.accounts
.append(dn
)
882 # Mark this account for deletion in tearDown() after the current
883 # test finishes. Because the time complexity of deleting an account
884 # in Samba scales with the number of accounts, it is faster to
885 # delete accounts as soon as possible than to keep them around
886 # until all the tests are finished.
887 self
.test_accounts
.append(dn
)
893 admin_creds
= self
.get_admin_creds()
895 net_ctx
= net
.Net(admin_creds
, lp
, server
=self
.dc_host
)
896 domain
= samdb
.domain_netbios_name().upper()
898 password
= generate_random_password(32, 32)
901 net_ctx
.set_password(newpassword
=password
,
902 account_name
=account_name
,
906 except Exception as e
:
909 creds
= KerberosCredentials()
910 creds
.guess(self
.get_lp())
911 creds
.set_realm(samdb
.domain_dns_name().upper())
912 creds
.set_domain(samdb
.domain_netbios_name().upper())
913 if password
is not None:
914 creds
.set_password(password
)
915 creds
.set_username(account_name
)
916 if account_type
is self
.AccountType
.USER
:
917 creds
.set_workstation('')
919 creds
.set_workstation(name
)
920 creds
.set_secure_channel_type(secure_schannel_type
)
921 creds
.set_dn(ldb
.Dn(samdb
, dn
))
924 creds
.set_type(account_type
)
925 creds
.set_user_account_control(account_control
)
927 self
.creds_set_enctypes(creds
)
929 res
= samdb
.search(base
=dn
,
930 scope
=ldb
.SCOPE_BASE
,
931 attrs
=['msDS-KeyVersionNumber',
935 kvno
= res
[0].get('msDS-KeyVersionNumber', idx
=0)
937 self
.assertEqual(int(kvno
), expected_kvno
)
938 creds
.set_kvno(expected_kvno
)
940 sid
= res
[0].get('objectSid', idx
=0)
941 sid
= samdb
.schema_format_value('objectSID', sid
)
942 sid
= sid
.decode('utf-8')
944 guid
= res
[0].get('objectGUID', idx
=0)
945 guid
= samdb
.schema_format_value('objectGUID', guid
)
946 guid
= guid
.decode('utf-8')
951 def get_security_descriptor(self
, dn
):
952 samdb
= self
.get_samdb()
954 sid
= self
.get_objectSid(samdb
, dn
)
956 owner_sid
= security
.dom_sid(security
.SID_BUILTIN_ADMINISTRATORS
)
959 ace
.access_mask
= security
.SEC_ADS_CONTROL_ACCESS
961 ace
.trustee
= security
.dom_sid(sid
)
963 dacl
= security
.acl()
964 dacl
.revision
= security
.SECURITY_ACL_REVISION_ADS
968 security_desc
= security
.descriptor()
969 security_desc
.type |
= security
.SEC_DESC_DACL_PRESENT
970 security_desc
.owner_sid
= owner_sid
971 security_desc
.dacl
= dacl
973 return ndr_pack(security_desc
)
975 def create_rodc(self
, ctx
):
976 ctx
.nc_list
= [ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
977 ctx
.full_nc_list
= [ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
978 ctx
.krbtgt_dn
= f
'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'
980 ctx
.never_reveal_sid
= [f
'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>',
981 f
'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
982 f
'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
983 f
'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
984 f
'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>']
985 ctx
.reveal_sid
= f
'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'
987 mysid
= ctx
.get_mysid()
988 admin_dn
= f
'<SID={mysid}>'
989 ctx
.managedby
= admin_dn
991 ctx
.userAccountControl
= (UF_WORKSTATION_TRUST_ACCOUNT |
992 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
993 UF_PARTIAL_SECRETS_ACCOUNT
)
995 ctx
.connection_dn
= f
'CN=RODC Connection (FRS),{ctx.ntds_dn}'
996 ctx
.secure_channel_type
= misc
.SEC_CHAN_RODC
998 ctx
.replica_flags
= (drsuapi
.DRSUAPI_DRS_INIT_SYNC |
999 drsuapi
.DRSUAPI_DRS_PER_SYNC |
1000 drsuapi
.DRSUAPI_DRS_GET_ANC |
1001 drsuapi
.DRSUAPI_DRS_NEVER_SYNCED |
1002 drsuapi
.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING
)
1003 ctx
.domain_replica_flags
= ctx
.replica_flags | drsuapi
.DRSUAPI_DRS_CRITICAL_ONLY
1005 ctx
.build_nc_lists()
1007 ctx
.cleanup_old_join()
1010 ctx
.join_add_objects()
1012 # cleanup the failed join (checking we still have a live LDB
1013 # connection to the remote DC first)
1014 ctx
.refresh_ldb_connection()
1015 ctx
.cleanup_old_join()
1018 def replicate_account_to_rodc(self
, dn
):
1019 samdb
= self
.get_samdb()
1020 rodc_samdb
= self
.get_rodc_samdb()
1022 repl_val
= f
'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'
1025 msg
.dn
= ldb
.Dn(rodc_samdb
, '')
1026 msg
['replicateSingleObject'] = ldb
.MessageElement(
1028 ldb
.FLAG_MOD_REPLACE
,
1029 'replicateSingleObject')
1032 # Try replication using the replicateSingleObject rootDSE
1034 rodc_samdb
.modify(msg
)
1035 except ldb
.LdbError
as err
:
1036 enum
, estr
= err
.args
1037 self
.assertEqual(enum
, ldb
.ERR_UNWILLING_TO_PERFORM
)
1038 self
.assertIn('rootdse_modify: unknown attribute to change!',
1041 # If that method wasn't supported, we may be in the rodc:local test
1042 # environment, where we can try replicating to the local database.
1046 rodc_creds
= Credentials()
1047 rodc_creds
.guess(lp
)
1048 rodc_creds
.set_machine_account(lp
)
1050 local_samdb
= SamDB(url
=None, session_info
=system_session(),
1051 credentials
=rodc_creds
, lp
=lp
)
1053 destination_dsa_guid
= misc
.GUID(local_samdb
.get_ntds_GUID())
1055 repl
= drs_Replicate(f
'ncacn_ip_tcp:{self.dc_host}[seal]',
1057 local_samdb
, destination_dsa_guid
)
1059 source_dsa_invocation_id
= misc
.GUID(samdb
.invocation_id
)
1062 source_dsa_invocation_id
,
1063 destination_dsa_guid
,
1064 exop
=drsuapi
.DRSUAPI_EXOP_REPL_SECRET
,
1067 def reveal_account_to_mock_rodc(self
, dn
):
1068 samdb
= self
.get_samdb()
1069 rodc_ctx
= self
.get_mock_rodc_ctx()
1073 destination_dsa_guid
=rodc_ctx
.ntds_guid
,
1074 source_dsa_invocation_id
=misc
.GUID(samdb
.invocation_id
))
1076 def check_revealed(self
, dn
, rodc_dn
, revealed
=True):
1077 samdb
= self
.get_samdb()
1079 res
= samdb
.search(base
=rodc_dn
,
1080 scope
=ldb
.SCOPE_BASE
,
1081 attrs
=['msDS-RevealedUsers'])
1083 revealed_users
= res
[0].get('msDS-RevealedUsers')
1084 if revealed_users
is None:
1085 self
.assertFalse(revealed
)
1088 revealed_dns
= set(str(dsdb_Dn(samdb
, str(user
),
1089 syntax_oid
=DSDB_SYNTAX_BINARY_DN
).dn
)
1090 for user
in revealed_users
)
1093 self
.assertIn(str(dn
), revealed_dns
)
1095 self
.assertNotIn(str(dn
), revealed_dns
)
1097 def get_secrets(self
, dn
,
1098 destination_dsa_guid
,
1099 source_dsa_invocation_id
):
1100 bind
, handle
, _
= self
.get_drsuapi_connection()
1102 req
= drsuapi
.DsGetNCChangesRequest8()
1104 req
.destination_dsa_guid
= destination_dsa_guid
1105 req
.source_dsa_invocation_id
= source_dsa_invocation_id
1107 naming_context
= drsuapi
.DsReplicaObjectIdentifier()
1108 naming_context
.dn
= dn
1110 req
.naming_context
= naming_context
1112 hwm
= drsuapi
.DsReplicaHighWaterMark()
1113 hwm
.tmp_highest_usn
= 0
1114 hwm
.reserved_usn
= 0
1117 req
.highwatermark
= hwm
1118 req
.uptodateness_vector
= None
1120 req
.replica_flags
= 0
1122 req
.max_object_count
= 1
1123 req
.max_ndr_size
= 402116
1124 req
.extended_op
= drsuapi
.DRSUAPI_EXOP_REPL_SECRET
1126 attids
= [drsuapi
.DRSUAPI_ATTID_supplementalCredentials
,
1127 drsuapi
.DRSUAPI_ATTID_unicodePwd
,
1128 drsuapi
.DRSUAPI_ATTID_ntPwdHistory
]
1130 partial_attribute_set
= drsuapi
.DsPartialAttributeSet()
1131 partial_attribute_set
.version
= 1
1132 partial_attribute_set
.attids
= attids
1133 partial_attribute_set
.num_attids
= len(attids
)
1135 req
.partial_attribute_set
= partial_attribute_set
1137 req
.partial_attribute_set_ex
= None
1138 req
.mapping_ctr
.num_mappings
= 0
1139 req
.mapping_ctr
.mappings
= None
1141 _
, ctr
= bind
.DsGetNCChanges(handle
, 8, req
)
1143 self
.assertEqual(1, ctr
.object_count
)
1145 identifier
= ctr
.first_object
.object.identifier
1146 attributes
= ctr
.first_object
.object.attribute_ctr
.attributes
1148 self
.assertEqual(dn
, identifier
.dn
)
1150 return bind
, identifier
, attributes
1152 def unpack_supplemental_credentials(
1154 ) -> Dict
[kcrypto
.Enctype
, str]:
1155 spl
= ndr_unpack(drsblobs
.supplementalCredentialsBlob
, blob
)
1157 keys
: Dict
[kcrypto
.Enctype
, str] = {}
1159 for pkg
in spl
.sub
.packages
:
1160 if pkg
.name
== 'Primary:Kerberos-Newer-Keys':
1161 krb5_new_keys_raw
= binascii
.a2b_hex(pkg
.data
)
1162 krb5_new_keys
= ndr_unpack(
1163 drsblobs
.package_PrimaryKerberosBlob
, krb5_new_keys_raw
1165 for key
in krb5_new_keys
.ctr
.keys
:
1166 keytype
= key
.keytype
1167 if keytype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
1168 keys
[keytype
] = key
.value
.hex()
1172 def get_keys(self
, creds
, expected_etypes
=None):
1173 admin_creds
= self
.get_admin_creds()
1174 samdb
= self
.get_samdb()
1178 bind
, identifier
, attributes
= self
.get_secrets(
1180 destination_dsa_guid
=misc
.GUID(samdb
.get_ntds_GUID()),
1181 source_dsa_invocation_id
=misc
.GUID())
1183 rid
= identifier
.sid
.split()[1]
1185 net_ctx
= net
.Net(admin_creds
)
1189 for attr
in attributes
:
1190 if not attr
.value_ctr
.num_values
:
1193 if attr
.attid
== drsuapi
.DRSUAPI_ATTID_supplementalCredentials
:
1194 net_ctx
.replicate_decrypt(bind
, attr
, rid
)
1197 self
.unpack_supplemental_credentials(attr
.value_ctr
.values
[0].blob
)
1199 elif attr
.attid
== drsuapi
.DRSUAPI_ATTID_unicodePwd
:
1200 net_ctx
.replicate_decrypt(bind
, attr
, rid
)
1202 pwd
= attr
.value_ctr
.values
[0].blob
1203 keys
[kcrypto
.Enctype
.RC4
] = pwd
.hex()
1205 if expected_etypes
is None:
1206 expected_etypes
= self
.get_default_enctypes(creds
)
1208 self
.assertCountEqual(expected_etypes
, keys
)
def creds_set_keys(self, creds, keys):
    """Call creds.set_forced_key() once for every entry in *keys*.

    :param creds: object providing set_forced_key(enctype, key).
    :param keys: mapping of encryption type to key, or None, in which
        case creds is left untouched.
    """
    if keys is None:
        return

    for etype, forced_key in keys.items():
        creds.set_forced_key(etype, forced_key)
1217 def creds_set_enctypes(self
, creds
,
1220 samdb
= self
.get_samdb()
1222 res
= samdb
.search(creds
.get_dn(),
1223 scope
=ldb
.SCOPE_BASE
,
1224 attrs
=['msDS-SupportedEncryptionTypes'])
1225 supported_enctypes
= res
[0].get('msDS-SupportedEncryptionTypes', idx
=0)
1227 if supported_enctypes
is None:
1228 supported_enctypes
= self
.default_etypes
1229 if supported_enctypes
is None:
1231 supported_enctypes
= lp
.get('kdc default domain supported enctypes')
1232 if supported_enctypes
== 0:
1233 supported_enctypes
= rc4_bit | aes256_sk_bit
1234 supported_enctypes
= int(supported_enctypes
)
1236 if extra_bits
is not None:
1237 # We need to add in implicit or implied encryption types.
1238 supported_enctypes |
= extra_bits
1239 if remove_bits
is not None:
1240 # We also need to remove certain bits, such as the non-encryption
1241 # type bit aes256-sk.
1242 supported_enctypes
&= ~remove_bits
1244 creds
.set_as_supported_enctypes(supported_enctypes
)
1245 creds
.set_tgs_supported_enctypes(supported_enctypes
)
1246 creds
.set_ap_supported_enctypes(supported_enctypes
)
1248 def creds_set_default_enctypes(self
, creds
,
1250 claims_support
=False,
1251 compound_id_support
=False):
1252 default_enctypes
= self
.get_default_enctypes(creds
)
1253 supported_enctypes
= KerberosCredentials
.etypes_to_bits(
1257 supported_enctypes |
= security
.KERB_ENCTYPE_FAST_SUPPORTED
1259 supported_enctypes |
= security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
1260 if compound_id_support
:
1261 supported_enctypes |
= (
1262 security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
)
1264 creds
.set_as_supported_enctypes(supported_enctypes
)
1265 creds
.set_tgs_supported_enctypes(supported_enctypes
)
1266 creds
.set_ap_supported_enctypes(supported_enctypes
)
1268 def add_to_group(self
, account_dn
, group_dn
, group_attr
, expect_attr
=True,
1269 new_group_type
=None):
1270 samdb
= self
.get_samdb()
1273 res
= samdb
.search(base
=group_dn
,
1274 scope
=ldb
.SCOPE_BASE
,
1276 except ldb
.LdbError
as err
:
1278 if num
!= ldb
.ERR_NO_SUCH_OBJECT
:
1284 members
= orig_msg
.get(group_attr
)
1286 self
.assertIsNotNone(members
)
1287 elif members
is None:
1290 members
= map(lambda s
: s
.decode('utf-8'), members
)
1292 # Use a set so we can handle the same group being added twice.
1293 members
= set(members
)
1295 self
.assertNotIsInstance(account_dn
, ldb
.Dn
,
1296 'ldb.MessageElement does not support ldb.Dn')
1297 self
.assertNotIsInstance(account_dn
, bytes
)
1299 if isinstance(account_dn
, str):
1300 members
.add(account_dn
)
1302 members
.update(account_dn
)
1306 if new_group_type
is not None:
1307 msg
['0'] = ldb
.MessageElement(
1308 common
.normalise_int32(new_group_type
),
1309 ldb
.FLAG_MOD_REPLACE
,
1311 msg
['1'] = ldb
.MessageElement(list(members
),
1312 ldb
.FLAG_MOD_REPLACE
,
1314 cleanup
= samdb
.msg_diff(msg
, orig_msg
)
1315 self
.ldb_cleanups
.append(cleanup
)
1320 def remove_from_group(self
, account_dn
, group_dn
):
1321 samdb
= self
.get_samdb()
1323 res
= samdb
.search(base
=group_dn
,
1324 scope
=ldb
.SCOPE_BASE
,
1327 self
.assertIn('member', orig_msg
)
1328 members
= list(orig_msg
['member'])
1330 account_dn
= str(account_dn
).encode('utf-8')
1331 self
.assertIn(account_dn
, members
)
1332 members
.remove(account_dn
)
1336 msg
['member'] = ldb
.MessageElement(members
,
1337 ldb
.FLAG_MOD_REPLACE
,
1340 cleanup
= samdb
.msg_diff(msg
, orig_msg
)
1341 self
.ldb_cleanups
.append(cleanup
)
# Create a new group and return a Principal object representing it. The group
# is given a freshly generated account name and the type 'group_type.value';
# the Principal carries the group's DN (as an ldb.Dn) and its objectSid.
def create_group_principal(self, samdb, group_type):
    group_name = self.get_new_username()
    group_dn = self.create_group(samdb, group_name, gtype=group_type.value)

    # Look the SID up from the directory rather than deriving it ourselves.
    group_sid = self.get_objectSid(samdb, group_dn)
    principal_dn = ldb.Dn(samdb, group_dn)

    return Principal(principal_dn, group_sid)
1354 def set_group_type(self
, samdb
, dn
, gtype
):
1355 group_type
= common
.normalise_int32(gtype
.value
)
1356 msg
= ldb
.Message(dn
)
1357 msg
['groupType'] = ldb
.MessageElement(group_type
,
1358 ldb
.FLAG_MOD_REPLACE
,
1362 def set_primary_group(self
, samdb
, dn
, primary_sid
,
1363 expected_error
=None,
1364 expected_werror
=None):
1365 # Get the RID to be set as our primary group.
1366 primary_rid
= primary_sid
.rsplit('-', 1)[1]
1368 # Find out our current primary group.
1369 res
= samdb
.search(dn
,
1370 scope
=ldb
.SCOPE_BASE
,
1371 attrs
=['primaryGroupId'])
1374 # Prepare to modify the attribute.
1375 msg
= ldb
.Message(dn
)
1376 msg
['primaryGroupId'] = ldb
.MessageElement(str(primary_rid
),
1377 ldb
.FLAG_MOD_REPLACE
,
1380 # We'll remove the primaryGroupId attribute after the test, to avoid
1381 # problems in the teardown if the user outlives the group.
1382 remove_msg
= samdb
.msg_diff(msg
, orig_msg
)
1383 self
.addCleanup(samdb
.modify
, remove_msg
)
1385 # Set primaryGroupId.
1386 if expected_error
is None:
1387 self
.assertIsNone(expected_werror
)
1391 self
.assertIsNotNone(expected_werror
)
1393 with self
.assertRaises(
1395 msg
='expected setting primary group to fail'
1399 error
, estr
= err
.exception
.args
1400 self
.assertEqual(expected_error
, error
)
1401 self
.assertIn(f
'{expected_werror:08X}', estr
)
1403 # Create an arrangement of groups based on a configuration specified in a
1404 # test case. 'user_principal' is a principal representing the user account;
1405 # 'trust_principal', a principal representing the account of a user from
1407 def setup_groups(self
,
1412 groups
= dict(preexisting_groups
)
1414 primary_group_types
= {}
1416 # Create each group and add it to the group mapping.
1417 if group_setup
is not None:
1418 for group_id
, (group_type
, _
) in group_setup
.items():
1419 self
.assertNotIn(group_id
, preexisting_groups
,
1420 "don't specify placeholders")
1421 self
.assertNotIn(group_id
, groups
,
1422 'group ID specified more than once')
1424 if primary_groups
is not None and (
1425 group_id
in primary_groups
.values()):
1426 # Windows disallows setting a domain-local group as a
1427 # primary group, unless we create it as Universal first and
1428 # change it back to Domain-Local later.
1429 primary_group_types
[group_id
] = group_type
1430 group_type
= GroupType
.UNIVERSAL
1432 groups
[group_id
] = self
.create_group_principal(samdb
,
1435 if group_setup
is not None:
1436 # Map a group ID to that group's DN, and generate an
1437 # understandable error message if the mapping fails.
1438 def group_id_to_dn(group_id
):
1440 group
= groups
[group_id
]
1442 self
.fail(f
"included group member '{group_id}', but it is "
1443 f
"not specified in {groups.keys()}")
1445 if group
.dn
is not None:
1446 return str(group
.dn
)
1448 return f
'<SID={group.sid}>'
1450 # Populate each group's members.
1451 for group_id
, (_
, members
) in group_setup
.items():
1452 # Get the group's DN and the mapped DNs of its members.
1453 dn
= groups
[group_id
].dn
1454 principal_members
= map(group_id_to_dn
, members
)
1456 # Add the members to the group.
1457 self
.add_to_group(principal_members
, dn
, 'member',
1460 # Set primary groups.
1461 if primary_groups
is not None:
1462 for user
, primary_group
in primary_groups
.items():
1463 primary_sid
= groups
[primary_group
].sid
1464 self
.set_primary_group(samdb
, user
.dn
, primary_sid
)
1466 # Change the primary groups to their actual group types.
1467 for primary_group
, primary_group_type
in primary_group_types
.items():
1468 self
.set_group_type(samdb
,
1469 groups
[primary_group
].dn
,
1472 # Return the mapping from group IDs to principals.
# Translate a placeholder value into a SID.
#
# 'val' may be an integer RID (made absolute by prefixing 'domain_sid', which
# must then not be None), a key of 'mapping' (replaced by the '.sid' of the
# principal it maps to), or anything else (returned exactly as given).
# 'mapping' may be None if there are no placeholders to resolve.
def map_to_sid(self, val, mapping, domain_sid):
    if isinstance(val, int):
        # If it's an integer, we assume it's a RID, and prefix the domain
        # SID.
        self.assertIsNotNone(domain_sid)
        return f'{domain_sid}-{val}'

    if mapping is not None and val in mapping:
        # Or if we have a mapping for it, apply that.
        return mapping[val].sid

    # Otherwise leave it unmodified. Without this return the caller would
    # receive None rather than the value it passed in.
    return val
# Resolve 'val' with map_to_sid() and wrap the result in an ldb.Dn of the
# form '<SID=...>', created against the database from get_samdb().
def map_to_dn(self, val, mapping, domain_sid):
    sid = self.map_to_sid(val, mapping, domain_sid)

    samdb = self.get_samdb()
    sid_dn = f'<SID={sid}>'
    return ldb.Dn(samdb, sid_dn)
1493 # Return SIDs from principal placeholders based on a supplied mapping.
1494 def map_sids(self
, sids
, mapping
, domain_sid
):
1501 if isinstance(entry
, frozenset):
1502 mapped_sids
.add(frozenset(self
.map_sids(entry
,
1506 val
, sid_type
, attrs
= entry
1507 sid
= self
.map_to_sid(val
, mapping
, domain_sid
)
1509 # There's no point expecting the 'Claims Valid' SID to be
1510 # present if we don't support claims. Filter it out to give the
1511 # tests a chance of passing.
1512 if not self
.kdc_claims_support
and (
1513 sid
== security
.SID_CLAIMS_VALID
):
1516 mapped_sids
.add((sid
, sid_type
, attrs
))
1520 def issued_by_rodc(self
, ticket
):
1521 rodc_krbtgt_creds
= self
.get_mock_rodc_krbtgt_creds()
1522 rodc_krbtgt_key
= self
.TicketDecryptionKey_from_creds(
1526 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: rodc_krbtgt_key
,
1529 return self
.modified_ticket(
1531 new_ticket_key
=rodc_krbtgt_key
,
1532 checksum_keys
=checksum_keys
)
1534 def signed_by_rodc(self
, ticket
):
1535 rodc_krbtgt_creds
= self
.get_mock_rodc_krbtgt_creds()
1536 rodc_krbtgt_key
= self
.TicketDecryptionKey_from_creds(
1540 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: rodc_krbtgt_key
,
1543 return self
.modified_ticket(ticket
,
1544 checksum_keys
=checksum_keys
)
1546 # Get a ticket with the SIDs in the PAC replaced with ones we specify. This
1547 # is useful for creating arbitrary tickets that can be used to perform a
1549 def ticket_with_sids(self
,
1558 krbtgt_creds
= self
.get_mock_rodc_krbtgt_creds()
1560 krbtgt_creds
= self
.get_krbtgt_creds()
1561 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
1564 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
1567 modify_pac_fn
= partial(self
.set_pac_sids
,
1569 domain_sid
=domain_sid
,
1571 set_user_flags
=set_user_flags
,
1572 reset_user_flags
=reset_user_flags
)
1574 return self
.modified_ticket(ticket
,
1575 new_ticket_key
=krbtgt_key
,
1576 modify_pac_fn
=modify_pac_fn
,
1577 checksum_keys
=checksum_keys
)
1579 # Replace the SIDs in a PAC with 'new_sids'.
1580 def set_pac_sids(self
,
1587 reset_user_flags
=0):
1588 if domain_sid
is None:
1589 domain_sid
= self
.get_samdb().get_domain_sid()
1595 resource_domain
= None
1599 # Filter our SIDs into three arrays depending on their ultimate
1600 # location in the PAC.
1601 for sid
, sid_type
, attrs
in new_sids
:
1602 if sid_type
is self
.SidType
.BASE_SID
:
1603 if isinstance(sid
, int):
1604 domain
, rid
= domain_sid
, sid
1606 domain
, rid
= sid
.rsplit('-', 1)
1607 self
.assertEqual(domain_sid
, domain
,
1608 f
'base SID {sid} must be in our domain')
1610 base_sid
= samr
.RidWithAttribute()
1611 base_sid
.rid
= int(rid
)
1612 base_sid
.attributes
= attrs
1614 base_sids
.append(base_sid
)
1615 elif sid_type
is self
.SidType
.EXTRA_SID
:
1616 extra_sid
= netlogon
.netr_SidAttr()
1617 extra_sid
.sid
= security
.dom_sid(sid
)
1618 extra_sid
.attributes
= attrs
1620 extra_sids
.append(extra_sid
)
1621 elif sid_type
is self
.SidType
.RESOURCE_SID
:
1622 if isinstance(sid
, int):
1623 domain
, rid
= domain_sid
, sid
1625 domain
, rid
= sid
.rsplit('-', 1)
1626 if resource_domain
is None:
1627 resource_domain
= domain
1629 self
.assertEqual(resource_domain
, domain
,
1630 'resource SIDs must share the same '
1633 resource_sid
= samr
.RidWithAttribute()
1634 resource_sid
.rid
= int(rid
)
1635 resource_sid
.attributes
= attrs
1637 resource_sids
.append(resource_sid
)
1638 elif sid_type
is self
.SidType
.PRIMARY_GID
:
1639 self
.assertIsNone(primary_gid
,
1640 f
'must not specify a second primary GID '
1642 self
.assertIsNone(attrs
, 'cannot specify primary GID attrs')
1644 if isinstance(sid
, int):
1645 domain
, primary_gid
= domain_sid
, sid
1647 domain
, primary_gid
= sid
.rsplit('-', 1)
1648 self
.assertEqual(domain_sid
, domain
,
1649 f
'primary GID {sid} must be in our domain')
1651 self
.fail(f
'invalid SID type {sid_type}')
1653 found_logon_info
= False
1655 pac_buffers
= pac
.buffers
1656 for pac_buffer
in pac_buffers
:
1657 # Find the LOGON_INFO PAC buffer.
1658 if pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_INFO
:
1659 logon_info
= pac_buffer
.info
.info
1661 # Add Extra SIDs and set the EXTRA_SIDS flag as needed.
1662 logon_info
.info3
.sidcount
= len(extra_sids
)
1664 logon_info
.info3
.sids
= extra_sids
1665 logon_info
.info3
.base
.user_flags |
= (
1666 netlogon
.NETLOGON_EXTRA_SIDS
)
1668 logon_info
.info3
.sids
= None
1669 logon_info
.info3
.base
.user_flags
&= ~
(
1670 netlogon
.NETLOGON_EXTRA_SIDS
)
1673 logon_info
.info3
.base
.groups
.count
= len(base_sids
)
1675 logon_info
.info3
.base
.groups
.rids
= base_sids
1677 logon_info
.info3
.base
.groups
.rids
= None
1679 logon_info
.info3
.base
.domain_sid
= security
.dom_sid(domain_sid
)
1680 if user_rid
is not None:
1681 logon_info
.info3
.base
.rid
= int(user_rid
)
1683 if primary_gid
is not None:
1684 logon_info
.info3
.base
.primary_gid
= int(primary_gid
)
1686 # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed.
1687 logon_info
.resource_groups
.groups
.count
= len(resource_sids
)
1689 resource_domain
= security
.dom_sid(resource_domain
)
1690 logon_info
.resource_groups
.domain_sid
= resource_domain
1691 logon_info
.resource_groups
.groups
.rids
= resource_sids
1692 logon_info
.info3
.base
.user_flags |
= (
1693 netlogon
.NETLOGON_RESOURCE_GROUPS
)
1695 logon_info
.resource_groups
.domain_sid
= None
1696 logon_info
.resource_groups
.groups
.rids
= None
1697 logon_info
.info3
.base
.user_flags
&= ~
(
1698 netlogon
.NETLOGON_RESOURCE_GROUPS
)
1700 logon_info
.info3
.base
.user_flags |
= set_user_flags
1701 logon_info
.info3
.base
.user_flags
&= ~reset_user_flags
1703 found_logon_info
= True
1705 # Also replace the user's SID in the UPN DNS buffer.
1706 elif pac_buffer
.type == krb5pac
.PAC_TYPE_UPN_DNS_INFO
:
1707 upn_dns_info_ex
= pac_buffer
.info
.ex
1709 if user_rid
is not None:
1710 upn_dns_info_ex
.objectsid
= security
.dom_sid(
1711 f
'{domain_sid}-{user_rid}')
1713 # But don't replace the user's SID in the Requester SID buffer, or
1714 # we'll get a SID mismatch.
1716 self
.assertTrue(found_logon_info
, 'no LOGON_INFO PAC buffer')
1718 pac
.buffers
= pac_buffers
1722 # Replace the device SIDs in a PAC with 'new_sids'.
1723 def set_pac_device_sids(self
,
1729 if domain_sid
is None:
1730 domain_sid
= self
.get_samdb().get_domain_sid()
1738 # Filter our SIDs into three arrays depending on their ultimate
1739 # location in the PAC.
1740 for entry
in new_sids
:
1741 if isinstance(entry
, frozenset):
1742 resource_domain
= None
1745 for sid
, sid_type
, attrs
in entry
:
1746 self
.assertIs(sid_type
, self
.SidType
.RESOURCE_SID
,
1747 'only resource SIDs may be specified in this way')
1749 if isinstance(sid
, int):
1750 domain
, rid
= domain_sid
, sid
1752 domain
, rid
= sid
.rsplit('-', 1)
1753 if resource_domain
is None:
1754 resource_domain
= domain
1756 self
.assertEqual(resource_domain
, domain
,
1757 'resource SIDs must share the same '
1760 resource_sid
= samr
.RidWithAttribute()
1761 resource_sid
.rid
= int(rid
)
1762 resource_sid
.attributes
= attrs
1764 domain_sids
.append(resource_sid
)
1766 membership
= krb5pac
.PAC_DOMAIN_GROUP_MEMBERSHIP()
1767 if resource_domain
is not None:
1768 membership
.domain_sid
= security
.dom_sid(resource_domain
)
1769 membership
.groups
.rids
= domain_sids
1770 membership
.groups
.count
= len(domain_sids
)
1772 resource_sids
.append(membership
)
1774 sid
, sid_type
, attrs
= entry
1775 if sid_type
is self
.SidType
.BASE_SID
:
1776 if isinstance(sid
, int):
1777 domain
, rid
= domain_sid
, sid
1779 domain
, rid
= sid
.rsplit('-', 1)
1780 self
.assertEqual(domain_sid
, domain
,
1781 f
'base SID {sid} must be in our domain')
1783 base_sid
= samr
.RidWithAttribute()
1784 base_sid
.rid
= int(rid
)
1785 base_sid
.attributes
= attrs
1787 base_sids
.append(base_sid
)
1788 elif sid_type
is self
.SidType
.EXTRA_SID
:
1789 extra_sid
= netlogon
.netr_SidAttr()
1790 extra_sid
.sid
= security
.dom_sid(sid
)
1791 extra_sid
.attributes
= attrs
1793 extra_sids
.append(extra_sid
)
1794 elif sid_type
is self
.SidType
.RESOURCE_SID
:
1795 self
.fail('specify resource groups in frozenset(s)')
1796 elif sid_type
is self
.SidType
.PRIMARY_GID
:
1797 self
.assertIsNone(primary_gid
,
1798 f
'must not specify a second primary GID '
1800 self
.assertIsNone(attrs
, 'cannot specify primary GID attrs')
1802 if isinstance(sid
, int):
1803 domain
, primary_gid
= domain_sid
, sid
1805 domain
, primary_gid
= sid
.rsplit('-', 1)
1806 self
.assertEqual(domain_sid
, domain
,
1807 f
'primary GID {sid} must be in our domain')
1809 self
.fail(f
'invalid SID type {sid_type}')
1811 pac_buffers
= pac
.buffers
1812 for pac_buffer
in pac_buffers
:
1813 # Find the DEVICE_INFO PAC buffer.
1814 if pac_buffer
.type == krb5pac
.PAC_TYPE_DEVICE_INFO
:
1815 logon_info
= pac_buffer
.info
.info
1818 logon_info
= krb5pac
.PAC_DEVICE_INFO()
1820 logon_info_ctr
= krb5pac
.PAC_DEVICE_INFO_CTR()
1821 logon_info_ctr
.info
= logon_info
1823 pac_buffer
= krb5pac
.PAC_BUFFER()
1824 pac_buffer
.type = krb5pac
.PAC_TYPE_DEVICE_INFO
1825 pac_buffer
.info
= logon_info_ctr
1827 pac_buffers
.append(pac_buffer
)
1829 logon_info
.domain_sid
= security
.dom_sid(domain_sid
)
1830 logon_info
.rid
= int(user_rid
)
1832 self
.assertIsNotNone(primary_gid
, 'please specify the primary GID')
1833 logon_info
.primary_gid
= int(primary_gid
)
1837 logon_info
.groups
.rids
= base_sids
1839 logon_info
.groups
.rids
= None
1840 logon_info
.groups
.count
= len(base_sids
)
1844 logon_info
.sids
= extra_sids
1846 logon_info
.sids
= None
1847 logon_info
.sid_count
= len(extra_sids
)
1849 # Add Resource SIDs.
1851 logon_info
.domain_groups
= resource_sids
1853 logon_info
.domain_groups
= None
1854 logon_info
.domain_group_count
= len(resource_sids
)
1856 pac
.buffers
= pac_buffers
1857 pac
.num_buffers
= len(pac_buffers
)
1861 def set_pac_claims(self
, pac
, *, client_claims
=None, device_claims
=None, claim_ids
=None):
1862 if claim_ids
is None:
1865 if client_claims
is not None:
1866 self
.assertIsNone(device_claims
,
1867 'don’t specify both client and device claims')
1868 pac_claims
= client_claims
1869 pac_buffer_type
= krb5pac
.PAC_TYPE_CLIENT_CLAIMS_INFO
1871 self
.assertIsNotNone(device_claims
,
1872 'please specify client or device claims')
1873 pac_claims
= device_claims
1874 pac_buffer_type
= krb5pac
.PAC_TYPE_DEVICE_CLAIMS_INFO
1876 claim_value_types
= {
1877 claims
.CLAIM_TYPE_INT64
: claims
.CLAIM_INT64
,
1878 claims
.CLAIM_TYPE_UINT64
: claims
.CLAIM_UINT64
,
1879 claims
.CLAIM_TYPE_STRING
: claims
.CLAIM_STRING
,
1880 claims
.CLAIM_TYPE_BOOLEAN
: claims
.CLAIM_UINT64
,
1885 for pac_claim_array
in pac_claims
:
1886 pac_claim_source_type
, pac_claim_entries
= (
1891 for pac_claim_entry
in pac_claim_entries
:
1892 pac_claim_id
, pac_claim_type
, pac_claim_values
= (
1895 claim_values_type
= claim_value_types
.get(
1896 pac_claim_type
, claims
.CLAIM_STRING
)
1898 claim_values_enum
= claim_values_type()
1899 claim_values_enum
.values
= pac_claim_values
1900 claim_values_enum
.value_count
= len(
1903 claim_entry
= claims
.CLAIM_ENTRY()
1905 claim_entry
.id = pac_claim_id
.format_map(
1907 except KeyError as err
:
1909 f
'unknown claim name(s) '
1910 f
'in ‘{pac_claim_id}’'
1912 claim_entry
.type = pac_claim_type
1913 claim_entry
.values
= claim_values_enum
1915 claim_entries
.append(claim_entry
)
1917 claims_array
= claims
.CLAIMS_ARRAY()
1918 claims_array
.claims_source_type
= pac_claim_source_type
1919 claims_array
.claim_entries
= claim_entries
1920 claims_array
.claims_count
= len(claim_entries
)
1922 claims_arrays
.append(claims_array
)
1924 claims_set
= claims
.CLAIMS_SET()
1925 claims_set
.claims_arrays
= claims_arrays
1926 claims_set
.claims_array_count
= len(claims_arrays
)
1928 claims_ctr
= claims
.CLAIMS_SET_CTR()
1929 claims_ctr
.claims
= claims_set
1931 claims_ndr
= claims
.CLAIMS_SET_NDR()
1932 claims_ndr
.claims
= claims_ctr
1934 metadata
= claims
.CLAIMS_SET_METADATA()
1935 metadata
.claims_set
= claims_ndr
1936 metadata
.compression_format
= (
1937 claims
.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF
)
1939 metadata_ctr
= claims
.CLAIMS_SET_METADATA_CTR()
1940 metadata_ctr
.metadata
= metadata
1942 metadata_ndr
= claims
.CLAIMS_SET_METADATA_NDR()
1943 metadata_ndr
.claims
= metadata_ctr
1945 pac_buffers
= pac
.buffers
1946 for pac_buffer
in pac_buffers
:
1947 if pac_buffer
.type == pac_buffer_type
:
1950 pac_buffer
= krb5pac
.PAC_BUFFER()
1951 pac_buffer
.type = pac_buffer_type
1952 pac_buffer
.info
= krb5pac
.DATA_BLOB_REM()
1954 pac_buffers
.append(pac_buffer
)
1956 pac_buffer
.info
.remaining
= ndr_pack(metadata_ndr
)
1958 pac
.buffers
= pac_buffers
1959 pac
.num_buffers
= len(pac_buffers
)
1963 def add_extra_pac_buffers(self
, pac
, *, buffers
=None):
1967 pac_buffers
= pac
.buffers
1968 for pac_buffer_type
in buffers
:
1969 info
= krb5pac
.DATA_BLOB_REM()
1970 # Having an empty PAC buffer will trigger an assertion failure in
1971 # the MIT KDC’s k5_pac_locate_buffer(), so we need at least one
1973 info
.remaining
= b
'0'
1975 pac_buffer
= krb5pac
.PAC_BUFFER()
1976 pac_buffer
.type = pac_buffer_type
1977 pac_buffer
.info
= info
1979 pac_buffers
.append(pac_buffer
)
1981 pac
.buffers
= pac_buffers
1982 pac
.num_buffers
= len(pac_buffers
)
1986 def get_cached_creds(self
, *,
1987 account_type
: AccountType
,
1988 opts
: Optional
[dict]=None,
1989 samdb
: Optional
[SamDB
]=None,
1990 use_cache
=True) -> KerberosCredentials
:
1995 'name_prefix': None,
1996 'name_suffix': None,
2000 'additional_details': None,
2001 'allowed_replication': False,
2002 'allowed_replication_mock': False,
2003 'denied_replication': False,
2004 'denied_replication_mock': False,
2005 'revealed_to_rodc': False,
2006 'revealed_to_mock_rodc': False,
2007 'no_auth_data_required': False,
2008 'expired_password': False,
2009 'supported_enctypes': None,
2010 'not_delegated': False,
2011 'delegation_to_spn': None,
2012 'delegation_from_dn': None,
2013 'trusted_to_auth_for_delegation': False,
2014 'fast_support': False,
2015 'claims_support': False,
2016 'compound_id_support': False,
2017 'sid_compression_support': True,
2019 'kerberos_enabled': True,
2020 'secure_channel_type': None,
2022 'force_nt4_hash': False,
2023 'assigned_policy': None,
2024 'assigned_silo': None,
2025 'logon_hours': None,
2026 'smartcard_required': False,
2031 'account_type': account_type
,
2037 self
.assertIsNone(samdb
)
2038 cache_key
= tuple(sorted(account_opts
.items()))
2039 creds
= self
.account_cache
.get(cache_key
)
2040 if creds
is not None:
2043 creds
= self
.create_account_opts(samdb
, use_cache
, **account_opts
)
2045 self
.account_cache
[cache_key
] = creds
2049 def create_account_opts(self
,
2050 samdb
: Optional
[SamDB
],
2060 allowed_replication
,
2061 allowed_replication_mock
,
2063 denied_replication_mock
,
2065 revealed_to_mock_rodc
,
2066 no_auth_data_required
,
2072 trusted_to_auth_for_delegation
,
2075 compound_id_support
,
2076 sid_compression_support
,
2079 secure_channel_type
,
2087 if account_type
is self
.AccountType
.USER
:
2088 self
.assertIsNone(delegation_to_spn
)
2089 self
.assertIsNone(delegation_from_dn
)
2090 self
.assertFalse(trusted_to_auth_for_delegation
)
2092 self
.assertFalse(not_delegated
)
2095 samdb
= self
.get_samdb()
2097 user_name
= self
.get_new_username()
2098 if name_prefix
is not None:
2099 user_name
= name_prefix
+ user_name
2100 if name_suffix
is not None:
2101 user_name
+= name_suffix
2103 user_account_control
= 0
2104 if trusted_to_auth_for_delegation
:
2105 user_account_control |
= UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
2107 user_account_control |
= UF_NOT_DELEGATED
2108 if no_auth_data_required
:
2109 user_account_control |
= UF_NO_AUTH_DATA_REQUIRED
2110 if smartcard_required
:
2111 user_account_control |
= UF_SMARTCARD_REQUIRED
2113 user_account_control |
= UF_ACCOUNTDISABLE
2115 if additional_details
:
2116 details
= {k
: v
for k
, v
in additional_details
}
2120 enctypes
= supported_enctypes
2122 enctypes
= enctypes
or 0
2123 enctypes |
= security
.KERB_ENCTYPE_FAST_SUPPORTED
2125 enctypes
= enctypes
or 0
2126 enctypes |
= security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
2127 if compound_id_support
:
2128 enctypes
= enctypes
or 0
2129 enctypes |
= security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2130 if sid_compression_support
is False:
2131 enctypes
= enctypes
or 0
2132 enctypes |
= security
.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED
2134 if enctypes
is not None:
2135 details
['msDS-SupportedEncryptionTypes'] = str(enctypes
)
2137 if delegation_to_spn
:
2138 details
['msDS-AllowedToDelegateTo'] = delegation_to_spn
2140 if delegation_from_dn
:
2141 if isinstance(delegation_from_dn
, str):
2142 delegation_from_dn
= self
.get_security_descriptor(
2144 details
['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
2147 if spn
is None and account_type
is not self
.AccountType
.USER
:
2148 spn
= 'host/' + user_name
2150 if assigned_policy
is not None:
2151 details
['msDS-AssignedAuthNPolicy'] = assigned_policy
2153 if assigned_silo
is not None:
2154 details
['msDS-AssignedAuthNPolicySilo'] = assigned_silo
2156 if logon_hours
is not None:
2157 details
['logonHours'] = logon_hours
2159 creds
, dn
= self
.create_account(samdb
, user_name
,
2160 account_type
=account_type
,
2163 additional_details
=details
,
2164 account_control
=user_account_control
,
2165 add_dollar
=add_dollar
,
2166 force_nt4_hash
=force_nt4_hash
,
2167 expired_password
=expired_password
,
2170 expected_etypes
= None
2172 # We don't force fetching the keys other than the NT hash as
2173 # how the server stores the unused KDC keys for the
2174 # smartcard_required case is not important and makes unrelated
2175 # tests break because of differences between Samba and
2178 # The NT hash is different, as it is returned to the client in
2179 # the PAC so is visible in the network behaviour.
2181 expected_etypes
= {kcrypto
.Enctype
.RC4
}
2182 keys
= self
.get_keys(creds
, expected_etypes
=expected_etypes
)
2183 self
.creds_set_keys(creds
, keys
)
2185 # Handle secret replication to the RODC.
2187 if allowed_replication
or revealed_to_rodc
:
2188 rodc_samdb
= self
.get_rodc_samdb()
2189 rodc_dn
= self
.get_server_dn(rodc_samdb
)
2191 # Allow replicating this account's secrets if requested, or allow
2192 # it only temporarily if we're about to replicate them.
2193 allowed_cleanup
= self
.add_to_group(
2195 'msDS-RevealOnDemandGroup')
2197 if revealed_to_rodc
:
2198 # Replicate this account's secrets to the RODC.
2199 self
.replicate_account_to_rodc(dn
)
2201 if not allowed_replication
:
2202 # If we don't want replicating secrets to be allowed for this
2203 # account, disable it again.
2204 samdb
.modify(allowed_cleanup
)
2206 self
.check_revealed(dn
,
2208 revealed
=revealed_to_rodc
)
2210 if denied_replication
:
2211 rodc_samdb
= self
.get_rodc_samdb()
2212 rodc_dn
= self
.get_server_dn(rodc_samdb
)
2214 # Deny replicating this account's secrets to the RODC.
2215 self
.add_to_group(dn
, rodc_dn
, 'msDS-NeverRevealGroup')
2217 # Handle secret replication to the mock RODC.
2219 if allowed_replication_mock
or revealed_to_mock_rodc
:
2220 # Allow replicating this account's secrets if requested, or allow
2221 # it only temporarily if we want to add the account to the mock
2222 # RODC's msDS-RevealedUsers.
2223 rodc_ctx
= self
.get_mock_rodc_ctx()
2224 mock_rodc_dn
= ldb
.Dn(samdb
, rodc_ctx
.acct_dn
)
2226 allowed_mock_cleanup
= self
.add_to_group(
2228 'msDS-RevealOnDemandGroup')
2230 if revealed_to_mock_rodc
:
2231 # Request replicating this account's secrets to the mock RODC,
2232 # which updates msDS-RevealedUsers.
2233 self
.reveal_account_to_mock_rodc(dn
)
2235 if not allowed_replication_mock
:
2236 # If we don't want replicating secrets to be allowed for this
2237 # account, disable it again.
2238 samdb
.modify(allowed_mock_cleanup
)
2240 self
.check_revealed(dn
,
2242 revealed
=revealed_to_mock_rodc
)
2244 if denied_replication_mock
:
2245 # Deny replicating this account's secrets to the mock RODC.
2246 rodc_ctx
= self
.get_mock_rodc_ctx()
2247 mock_rodc_dn
= ldb
.Dn(samdb
, rodc_ctx
.acct_dn
)
2249 self
.add_to_group(dn
, mock_rodc_dn
, 'msDS-NeverRevealGroup')
2251 if member_of
is not None:
2252 for group_dn
in member_of
:
2253 self
.add_to_group(dn
, ldb
.Dn(samdb
, group_dn
), 'member',
2256 if kerberos_enabled
:
2257 creds
.set_kerberos_state(MUST_USE_KERBEROS
)
2259 creds
.set_kerberos_state(DONT_USE_KERBEROS
)
2261 if secure_channel_type
is not None:
2262 creds
.set_secure_channel_type(secure_channel_type
)
# Generate and return a new account name, made by appending the current value
# of 'account_id' to 'account_base'.
#
# The counter is incremented on the class (type(self)) rather than on the
# instance, so successive calls yield different names even when they are made
# through different instances of the same class.
def get_new_username(self):
    user_name = self.account_base + str(self.account_id)

    # Advance the class-level counter so that the next name differs.
    type(self).account_id += 1

    # Callers use the generated name, so it must be handed back.
    return user_name
# Return the credentials of the client account.
#
# The lookup is delegated to _get_krb5_creds() with the 'CLIENT' prefix;
# 'allow_missing_password' and 'allow_missing_keys' are passed straight
# through. A fallback is supplied that obtains a cached user account via
# get_cached_creds(); when that fallback is invoked is decided by
# _get_krb5_creds().
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    def create_client_account():
        return self.get_cached_creds(account_type=self.AccountType.USER)

    # Hand the credentials back to the caller rather than discarding them.
    return self._get_krb5_creds(
        prefix='CLIENT',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=create_client_account)
2284 def get_mach_creds(self
,
2285 allow_missing_password
=False,
2286 allow_missing_keys
=True):
2287 def create_mach_account():
2288 return self
.get_cached_creds(
2289 account_type
=self
.AccountType
.COMPUTER
,
2291 'fast_support': True,
2292 'claims_support': True,
2293 'compound_id_support': True,
2294 'supported_enctypes': (
2295 security
.KERB_ENCTYPE_RC4_HMAC_MD5 |
2296 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2300 c
= self
._get
_krb
5_creds
(prefix
='MAC',
2301 allow_missing_password
=allow_missing_password
,
2302 allow_missing_keys
=allow_missing_keys
,
2303 fallback_creds_fn
=create_mach_account
)
2306 def get_service_creds(self
,
2307 allow_missing_password
=False,
2308 allow_missing_keys
=True):
2309 def create_service_account():
2310 return self
.get_cached_creds(
2311 account_type
=self
.AccountType
.COMPUTER
,
2313 'trusted_to_auth_for_delegation': True,
2314 'fast_support': True,
2315 'claims_support': True,
2316 'compound_id_support': True,
2317 'supported_enctypes': (
2318 security
.KERB_ENCTYPE_RC4_HMAC_MD5 |
2319 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2323 c
= self
._get
_krb
5_creds
(prefix
='SERVICE',
2324 allow_missing_password
=allow_missing_password
,
2325 allow_missing_keys
=allow_missing_keys
,
2326 fallback_creds_fn
=create_service_account
)
2329 def get_rodc_krbtgt_creds(self
,
2331 require_strongest_key
=False):
2332 if require_strongest_key
:
2333 self
.assertTrue(require_keys
)
2335 def download_rodc_krbtgt_creds():
2336 samdb
= self
.get_samdb()
2337 rodc_samdb
= self
.get_rodc_samdb()
2339 rodc_dn
= self
.get_server_dn(rodc_samdb
)
2341 res
= samdb
.search(rodc_dn
,
2342 scope
=ldb
.SCOPE_BASE
,
2343 attrs
=['msDS-KrbTgtLink'])
2344 krbtgt_dn
= res
[0]['msDS-KrbTgtLink'][0]
2346 res
= samdb
.search(krbtgt_dn
,
2347 scope
=ldb
.SCOPE_BASE
,
2348 attrs
=['sAMAccountName',
2349 'msDS-KeyVersionNumber',
2350 'msDS-SecondaryKrbTgtNumber'])
2351 krbtgt_dn
= res
[0].dn
2352 username
= str(res
[0]['sAMAccountName'])
2354 creds
= KerberosCredentials()
2355 creds
.set_domain(self
.env_get_var('DOMAIN', 'RODC_KRBTGT'))
2356 creds
.set_realm(self
.env_get_var('REALM', 'RODC_KRBTGT'))
2357 creds
.set_username(username
)
2359 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2360 krbtgt_number
= int(res
[0]['msDS-SecondaryKrbTgtNumber'][0])
2362 rodc_kvno
= krbtgt_number
<< 16 | kvno
2363 creds
.set_kvno(rodc_kvno
)
2364 creds
.set_dn(krbtgt_dn
)
2366 keys
= self
.get_keys(creds
)
2367 self
.creds_set_keys(creds
, keys
)
2369 # The RODC krbtgt account should support the default enctypes,
2370 # although it might not have the msDS-SupportedEncryptionTypes
2372 self
.creds_set_default_enctypes(
2374 fast_support
=self
.kdc_fast_support
,
2375 claims_support
=self
.kdc_claims_support
,
2376 compound_id_support
=self
.kdc_compound_id_support
)
2380 c
= self
._get
_krb
5_creds
(prefix
='RODC_KRBTGT',
2381 allow_missing_password
=True,
2382 allow_missing_keys
=not require_keys
,
2383 require_strongest_key
=require_strongest_key
,
2384 fallback_creds_fn
=download_rodc_krbtgt_creds
)
2387 def get_mock_rodc_krbtgt_creds(self
,
2389 require_strongest_key
=False):
2390 if require_strongest_key
:
2391 self
.assertTrue(require_keys
)
2393 def create_rodc_krbtgt_account():
2394 samdb
= self
.get_samdb()
2396 rodc_ctx
= self
.get_mock_rodc_ctx()
2398 krbtgt_dn
= rodc_ctx
.new_krbtgt_dn
2400 res
= samdb
.search(base
=ldb
.Dn(samdb
, krbtgt_dn
),
2401 scope
=ldb
.SCOPE_BASE
,
2402 attrs
=['msDS-KeyVersionNumber',
2403 'msDS-SecondaryKrbTgtNumber'])
2405 username
= str(rodc_ctx
.krbtgt_name
)
2407 creds
= KerberosCredentials()
2408 creds
.set_domain(self
.env_get_var('DOMAIN', 'RODC_KRBTGT'))
2409 creds
.set_realm(self
.env_get_var('REALM', 'RODC_KRBTGT'))
2410 creds
.set_username(username
)
2412 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2413 krbtgt_number
= int(res
[0]['msDS-SecondaryKrbTgtNumber'][0])
2415 rodc_kvno
= krbtgt_number
<< 16 | kvno
2416 creds
.set_kvno(rodc_kvno
)
2419 keys
= self
.get_keys(creds
)
2420 self
.creds_set_keys(creds
, keys
)
2422 if self
.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008
:
2423 extra_bits
= (security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2424 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
)
2427 remove_bits
= (security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK |
2428 security
.KERB_ENCTYPE_RC4_HMAC_MD5
)
2429 self
.creds_set_enctypes(creds
,
2430 extra_bits
=extra_bits
,
2431 remove_bits
=remove_bits
)
2435 c
= self
._get
_krb
5_creds
(prefix
='MOCK_RODC_KRBTGT',
2436 allow_missing_password
=True,
2437 allow_missing_keys
=not require_keys
,
2438 require_strongest_key
=require_strongest_key
,
2439 fallback_creds_fn
=create_rodc_krbtgt_account
)
2442 def get_krbtgt_creds(self
,
2444 require_strongest_key
=False):
2445 if require_strongest_key
:
2446 self
.assertTrue(require_keys
)
2448 def download_krbtgt_creds():
2449 samdb
= self
.get_samdb()
2451 krbtgt_rid
= security
.DOMAIN_RID_KRBTGT
2452 krbtgt_sid
= '%s-%d' % (samdb
.get_domain_sid(), krbtgt_rid
)
2454 res
= samdb
.search(base
='<SID=%s>' % krbtgt_sid
,
2455 scope
=ldb
.SCOPE_BASE
,
2456 attrs
=['sAMAccountName',
2457 'msDS-KeyVersionNumber'])
2459 username
= str(res
[0]['sAMAccountName'])
2461 creds
= KerberosCredentials()
2462 creds
.set_domain(self
.env_get_var('DOMAIN', 'KRBTGT'))
2463 creds
.set_realm(self
.env_get_var('REALM', 'KRBTGT'))
2464 creds
.set_username(username
)
2466 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2467 creds
.set_kvno(kvno
)
2470 keys
= self
.get_keys(creds
)
2471 self
.creds_set_keys(creds
, keys
)
2473 # The krbtgt account should support the default enctypes, although
2474 # it might not (on Samba) have the msDS-SupportedEncryptionTypes
2476 self
.creds_set_default_enctypes(
2478 fast_support
=self
.kdc_fast_support
,
2479 claims_support
=self
.kdc_claims_support
,
2480 compound_id_support
=self
.kdc_compound_id_support
)
2484 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
2485 default_username
='krbtgt',
2486 allow_missing_password
=True,
2487 allow_missing_keys
=not require_keys
,
2488 require_strongest_key
=require_strongest_key
,
2489 fallback_creds_fn
=download_krbtgt_creds
)
2492 def get_dc_creds(self
,
2494 require_strongest_key
=False):
2495 if require_strongest_key
:
2496 self
.assertTrue(require_keys
)
2498 def download_dc_creds():
2499 samdb
= self
.get_samdb()
2502 dc_sid
= '%s-%d' % (samdb
.get_domain_sid(), dc_rid
)
2504 res
= samdb
.search(base
='<SID=%s>' % dc_sid
,
2505 scope
=ldb
.SCOPE_BASE
,
2506 attrs
=['sAMAccountName',
2507 'msDS-KeyVersionNumber'])
2509 username
= str(res
[0]['sAMAccountName'])
2511 creds
= KerberosCredentials()
2512 creds
.set_domain(self
.env_get_var('DOMAIN', 'DC'))
2513 creds
.set_realm(self
.env_get_var('REALM', 'DC'))
2514 creds
.set_username(username
)
2516 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2517 creds
.set_kvno(kvno
)
2518 creds
.set_workstation(username
[:-1])
2521 keys
= self
.get_keys(creds
)
2522 self
.creds_set_keys(creds
, keys
)
2524 if self
.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008
:
2525 extra_bits
= (security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2526 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
)
2529 remove_bits
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2530 self
.creds_set_enctypes(creds
,
2531 extra_bits
=extra_bits
,
2532 remove_bits
=remove_bits
)
2536 c
= self
._get
_krb
5_creds
(prefix
='DC',
2537 allow_missing_password
=True,
2538 allow_missing_keys
=not require_keys
,
2539 require_strongest_key
=require_strongest_key
,
2540 fallback_creds_fn
=download_dc_creds
)
2543 def get_server_creds(self
,
2545 require_strongest_key
=False):
2546 if require_strongest_key
:
2547 self
.assertTrue(require_keys
)
2549 def download_server_creds():
2550 samdb
= self
.get_samdb()
2552 res
= samdb
.search(base
=samdb
.get_default_basedn(),
2553 expression
=(f
'(|(sAMAccountName={self.host}*)'
2554 f
'(dNSHostName={self.host}))'),
2555 scope
=ldb
.SCOPE_SUBTREE
,
2556 attrs
=['sAMAccountName',
2557 'msDS-KeyVersionNumber'])
2558 self
.assertEqual(1, len(res
))
2560 username
= str(res
[0]['sAMAccountName'])
2562 creds
= KerberosCredentials()
2563 creds
.set_domain(self
.env_get_var('DOMAIN', 'SERVER'))
2564 creds
.set_realm(self
.env_get_var('REALM', 'SERVER'))
2565 creds
.set_username(username
)
2567 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2568 creds
.set_kvno(kvno
)
2571 keys
= self
.get_keys(creds
)
2572 self
.creds_set_keys(creds
, keys
)
2574 if self
.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008
:
2575 extra_bits
= (security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2576 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
)
2579 remove_bits
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2580 self
.creds_set_enctypes(creds
,
2581 extra_bits
=extra_bits
,
2582 remove_bits
=remove_bits
)
2586 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
2587 allow_missing_password
=True,
2588 allow_missing_keys
=not require_keys
,
2589 require_strongest_key
=require_strongest_key
,
2590 fallback_creds_fn
=download_server_creds
)
2593 # Get the credentials and server principal name of either the krbtgt, or a
2594 # specially created account, with resource SID compression either supported
2596 def get_target(self
,
2602 self
.assertIsNone(compound_id
,
2603 "it's no good specifying compound id support "
2605 self
.assertIsNone(compression
,
2606 "it's no good specifying compression support "
2608 self
.assertFalse(extra_enctypes
,
2609 "it's no good specifying extra enctypes "
2611 creds
= self
.get_krbtgt_creds()
2612 sname
= self
.get_krbtgt_sname()
2614 creds
= self
.get_cached_creds(
2615 account_type
=self
.AccountType
.COMPUTER
,
2617 'supported_enctypes':
2618 security
.KERB_ENCTYPE_RC4_HMAC_MD5 |
2619 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 |
2621 'compound_id_support': compound_id
,
2622 'sid_compression_support': compression
,
2624 target_name
= creds
.get_username()
2626 if target_name
[-1] == '$':
2627 target_name
= target_name
[:-1]
2628 sname
= self
.PrincipalName_create(
2629 name_type
=NT_PRINCIPAL
,
2630 names
=['host', target_name
])
2634 def as_req(self
, cname
, sname
, realm
, etypes
, padata
=None, kdc_options
=0):
2635 """Send a Kerberos AS_REQ, returns the undecoded response
2638 till
= self
.get_KerberosTime(offset
=36000)
2640 req
= self
.AS_REQ_create(padata
=padata
,
2641 kdc_options
=str(kdc_options
),
2651 additional_tickets
=None)
2652 rep
= self
.send_recv_transaction(req
)
2655 def get_as_rep_key(self
, creds
, rep
):
2656 """Extract the session key from an AS-REP
2658 rep_padata
= self
.der_decode(
2660 asn1Spec
=krb5_asn1
.METHOD_DATA())
2662 for pa
in rep_padata
:
2663 if pa
['padata-type'] == PADATA_ETYPE_INFO2
:
2664 padata_value
= pa
['padata-value']
2667 self
.fail('expected to find ETYPE-INFO2')
2669 etype_info2
= self
.der_decode(
2670 padata_value
, asn1Spec
=krb5_asn1
.ETYPE_INFO2())
2672 key
= self
.PasswordKey_from_etype_info2(creds
, etype_info2
[0],
def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
    """Generate the encrypted-timestamp pa_data element for an AS-REQ.

    The key is obtained from ``get_as_rep_key(creds, rep)`` and is then
    passed, together with ``skew``, to
    ``get_enc_timestamp_pa_data_from_key()``, whose result is returned.
    """
    return self.get_enc_timestamp_pa_data_from_key(
        self.get_as_rep_key(creds, rep),
        skew=skew)
def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
    """Build an encrypted-timestamp PA-DATA element using ``key``.

    The time returned by ``get_KerberosTimeWithUsec(offset=skew)`` is
    placed in a PA-ENC-TS-ENC structure and DER-encoded.  That blob is
    encrypted under ``key`` with the KU_PA_ENC_TIMESTAMP key usage, the
    resulting EncryptedData is DER-encoded in turn, and the final bytes
    are wrapped in a PA-DATA of type PADATA_ENC_TIMESTAMP, which is
    returned to the caller.
    """
    timestamp, usec = self.get_KerberosTimeWithUsec(offset=skew)

    # Plaintext timestamp structure, DER-encoded.
    ts_enc = self.PA_ENC_TS_ENC_create(timestamp, usec)
    ts_enc_der = self.der_encode(ts_enc,
                                 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    # Encrypt the encoded timestamp and encode the EncryptedData wrapper.
    encrypted = self.EncryptedData_create(key,
                                          KU_PA_ENC_TIMESTAMP,
                                          ts_enc_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    return self.PA_DATA_create(PADATA_ENC_TIMESTAMP, encrypted_der)
2696 def get_challenge_pa_data(self
, client_challenge_key
, skew
=0):
2697 patime
, pausec
= self
.get_KerberosTimeWithUsec(offset
=skew
)
2698 padata
= self
.PA_ENC_TS_ENC_create(patime
, pausec
)
2699 padata
= self
.der_encode(padata
,
2700 asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
2702 padata
= self
.EncryptedData_create(client_challenge_key
,
2703 KU_ENC_CHALLENGE_CLIENT
,
2705 padata
= self
.der_encode(padata
,
2706 asn1Spec
=krb5_asn1
.EncryptedData())
2708 padata
= self
.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE
,
2713 def get_as_rep_enc_data(self
, key
, rep
):
2714 """ Decrypt and Decode the encrypted data in an AS-REP
2716 enc_part
= key
.decrypt(KU_AS_REP_ENC_PART
, rep
['enc-part']['cipher'])
2717 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2718 # application tag 26
2720 enc_part
= self
.der_decode(
2721 enc_part
, asn1Spec
=krb5_asn1
.EncASRepPart())
2723 enc_part
= self
.der_decode(
2724 enc_part
, asn1Spec
=krb5_asn1
.EncTGSRepPart())
def check_pre_authentication(self, rep):
    """Check that the KDC response was "pre-authentication required".

    Delegates to ``check_error_rep()`` with KDC_ERR_PREAUTH_REQUIRED as
    the expected error code.
    """
    self.check_error_rep(rep, expected=KDC_ERR_PREAUTH_REQUIRED)
def check_as_reply(self, rep):
    """Check that the KDC response is an AS-REP with the expected values.

    Hands ``rep`` to ``check_reply()`` with KRB_AS_REP as the required
    message type; all individual checks are performed there.
    """
    self.check_reply(rep, KRB_AS_REP)
def check_tgs_reply(self, rep):
    """Check that the KDC response is a TGS-REP with the expected values.

    Hands ``rep`` to ``check_reply()`` with KRB_TGS_REP as the required
    message type; all individual checks are performed there.
    """
    self.check_reply(rep, KRB_TGS_REP)
def check_reply(self, rep, msg_type):
    """Run the basic sanity checks on a successful KDC reply.

    Asserts that ``rep`` is present and has the message type
    ``msg_type``, that the protocol version and the ticket version are
    both 5, and that a kvno on the encrypted part, if there is one, is
    not an RODC kvno.
    """
    # There must be a reply at all.
    self.assertIsNotNone(rep)

    # Diagnostic text attached to every assertion below.
    dump = "rep = {%s}" % rep

    # The reply must be of the message type the caller expects.
    self.assertEqual(rep['msg-type'], msg_type, dump)

    # Protocol version number should be 5
    self.assertEqual(5, int(rep['pvno']), dump)

    # The ticket version number should be 5
    self.assertEqual(5, int(rep['ticket']['tkt-vno']), dump)

    # MIT kerberos does not provide the kvno, so we treat it as optional.
    # This is tested in compatability_test.py
    enc_part = rep['enc-part']
    if 'kvno' not in enc_part:
        return

    # If the high order bits are set this is an RODC kvno.
    self.assertEqual(0, int(enc_part['kvno']) & 0xFFFF0000, dump)
def check_error_rep(self, rep, expected):
    """Check that the reply is an error message with an expected code.

    ``expected`` is either a single error code, which the reply's
    error-code must equal, or a container of acceptable error codes,
    of which the reply's error-code must be a member.
    """
    self.assertIsNotNone(rep)

    # Diagnostic text attached to the assertions below.
    dump = "rep = {%s}" % rep
    self.assertEqual(rep['msg-type'], KRB_ERROR, dump)

    error_code = rep['error-code']
    if not isinstance(expected, collections.abc.Container):
        self.assertEqual(error_code, expected, dump)
    else:
        self.assertIn(error_code, expected, dump)
2788 def tgs_req(self
, cname
, sname
, realm
, ticket
, key
, etypes
,
2789 expected_error_mode
=0, padata
=None, kdc_options
=0,
2790 to_rodc
=False, creds
=None, service_creds
=None, expect_pac
=True,
2791 expect_edata
=None, expected_flags
=None, unexpected_flags
=None):
2792 """Send a TGS-REQ, returns the response and the decrypted and
2796 subkey
= self
.RandomKey(key
.etype
)
2798 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
2800 tgt
= KerberosTicketCreds(ticket
,
2805 if service_creds
is not None:
2806 decryption_key
= self
.TicketDecryptionKey_from_creds(
2808 expected_supported_etypes
= service_creds
.tgs_supported_enctypes
2810 decryption_key
= None
2811 expected_supported_etypes
= None
2813 if not expected_error_mode
:
2814 check_error_fn
= None
2815 check_rep_fn
= self
.generic_check_kdc_rep
2817 check_error_fn
= self
.generic_check_kdc_error
2820 def generate_padata(_kdc_exchange_dict
,
2824 return padata
, req_body
2826 kdc_exchange_dict
= self
.tgs_exchange_dict(
2828 expected_crealm
=realm
,
2829 expected_cname
=cname
,
2830 expected_srealm
=realm
,
2831 expected_sname
=sname
,
2832 expected_error_mode
=expected_error_mode
,
2833 expected_flags
=expected_flags
,
2834 unexpected_flags
=unexpected_flags
,
2835 expected_supported_etypes
=expected_supported_etypes
,
2836 check_error_fn
=check_error_fn
,
2837 check_rep_fn
=check_rep_fn
,
2838 check_kdc_private_fn
=self
.generic_check_kdc_private
,
2839 ticket_decryption_key
=decryption_key
,
2840 generate_padata_fn
=generate_padata
if padata
is not None else None,
2842 authenticator_subkey
=subkey
,
2843 kdc_options
=str(kdc_options
),
2844 expect_edata
=expect_edata
,
2845 expect_pac
=expect_pac
,
2848 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
2854 if expected_error_mode
:
2857 ticket_creds
= kdc_exchange_dict
['rep_ticket_creds']
2858 enc_part
= ticket_creds
.encpart_private
2860 return rep
, enc_part
2862 def get_service_ticket(self
, tgt
, target_creds
, service
='host',
2864 target_name
=None, till
=None, rc4_support
=True,
2865 to_rodc
=False, kdc_options
=None,
2866 expected_flags
=None, unexpected_flags
=None,
2867 expected_groups
=None,
2868 unexpected_groups
=None,
2869 expect_client_claims
=None,
2870 expect_device_claims
=None,
2871 expected_client_claims
=None,
2872 unexpected_client_claims
=None,
2873 expected_device_claims
=None,
2874 unexpected_device_claims
=None,
2875 pac_request
=True, expect_pac
=True,
2876 expect_requester_sid
=None,
2877 expect_pac_attrs
=None,
2878 expect_pac_attrs_pac_request
=None,
2880 user_name
= tgt
.cname
['name-string'][0]
2881 ticket_sname
= tgt
.sname
2882 if target_name
is None:
2883 target_name
= target_creds
.get_username()[:-1]
2885 self
.assertIsNone(sname
, 'supplied both target name and sname')
2886 cache_key
= (user_name
, target_name
, service
, to_rodc
, kdc_options
,
2887 pac_request
, str(expected_flags
), str(unexpected_flags
),
2891 str(expected_groups
),
2892 str(unexpected_groups
),
2893 expect_client_claims
, expect_device_claims
,
2894 str(expected_client_claims
),
2895 str(unexpected_client_claims
),
2896 str(expected_device_claims
),
2897 str(unexpected_device_claims
),
2899 expect_requester_sid
,
2901 expect_pac_attrs_pac_request
)
2904 ticket
= self
.tkt_cache
.get(cache_key
)
2906 if ticket
is not None:
2909 etype
= (AES256_CTS_HMAC_SHA1_96
, ARCFOUR_HMAC_MD5
)
2911 if kdc_options
is None:
2913 kdc_options
= str(krb5_asn1
.KDCOptions(kdc_options
))
2916 sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
2917 names
=[service
, target_name
])
2919 srealm
= target_creds
.get_realm()
2921 authenticator_subkey
= self
.RandomKey(kcrypto
.Enctype
.AES256
)
2923 decryption_key
= self
.TicketDecryptionKey_from_creds(target_creds
)
2925 kdc_exchange_dict
= self
.tgs_exchange_dict(
2926 expected_crealm
=tgt
.crealm
,
2927 expected_cname
=tgt
.cname
,
2928 expected_srealm
=srealm
,
2929 expected_sname
=sname
,
2930 expected_supported_etypes
=target_creds
.tgs_supported_enctypes
,
2931 expected_flags
=expected_flags
,
2932 unexpected_flags
=unexpected_flags
,
2933 expected_groups
=expected_groups
,
2934 unexpected_groups
=unexpected_groups
,
2935 expect_client_claims
=expect_client_claims
,
2936 expect_device_claims
=expect_device_claims
,
2937 expected_client_claims
=expected_client_claims
,
2938 unexpected_client_claims
=unexpected_client_claims
,
2939 expected_device_claims
=expected_device_claims
,
2940 unexpected_device_claims
=unexpected_device_claims
,
2941 ticket_decryption_key
=decryption_key
,
2942 check_rep_fn
=self
.generic_check_kdc_rep
,
2943 check_kdc_private_fn
=self
.generic_check_kdc_private
,
2945 authenticator_subkey
=authenticator_subkey
,
2946 kdc_options
=kdc_options
,
2947 pac_request
=pac_request
,
2948 expect_pac
=expect_pac
,
2949 expect_requester_sid
=expect_requester_sid
,
2950 expect_pac_attrs
=expect_pac_attrs
,
2951 expect_pac_attrs_pac_request
=expect_pac_attrs_pac_request
,
2952 rc4_support
=rc4_support
,
2955 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
2961 self
.check_tgs_reply(rep
)
2963 service_ticket_creds
= kdc_exchange_dict
['rep_ticket_creds']
2966 krbtgt_creds
= self
.get_rodc_krbtgt_creds()
2968 krbtgt_creds
= self
.get_krbtgt_creds()
2969 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
2971 is_tgs_princ
= self
.is_tgs_principal(sname
)
2972 expect_ticket_checksum
= (self
.tkt_sig_support
2973 and not is_tgs_princ
)
2974 expect_full_checksum
= (self
.full_sig_support
2975 and not is_tgs_princ
)
2976 self
.verify_ticket(service_ticket_creds
, krbtgt_key
,
2977 service_ticket
=True, expect_pac
=expect_pac
,
2978 expect_ticket_checksum
=expect_ticket_checksum
,
2979 expect_full_checksum
=expect_full_checksum
)
2981 self
.tkt_cache
[cache_key
] = service_ticket_creds
2983 return service_ticket_creds
2985 def get_tgt(self
, creds
, to_rodc
=False, kdc_options
=None,
2986 client_account
=None, client_name_type
=NT_PRINCIPAL
,
2987 target_creds
=None, ticket_etype
=None,
2988 expected_flags
=None, unexpected_flags
=None,
2989 expected_account_name
=None, expected_upn_name
=None,
2990 expected_cname
=None,
2992 sname
=None, realm
=None,
2993 expected_groups
=None,
2994 unexpected_groups
=None,
2995 pac_request
=True, expect_pac
=True,
2996 expect_pac_attrs
=None, expect_pac_attrs_pac_request
=None,
2998 expect_requester_sid
=None,
3001 expect_client_claims
=None, expect_device_claims
=None,
3002 expected_client_claims
=None, unexpected_client_claims
=None,
3003 expected_device_claims
=None, unexpected_device_claims
=None,
3005 if client_account
is not None:
3006 user_name
= client_account
3008 user_name
= creds
.get_username()
3010 cache_key
= (user_name
, to_rodc
, kdc_options
, pac_request
, pac_options
,
3013 str(expected_flags
), str(unexpected_flags
),
3014 expected_account_name
, expected_upn_name
, expected_sid
,
3015 str(sname
), str(realm
),
3016 str(expected_groups
),
3017 str(unexpected_groups
),
3018 str(expected_cname
),
3021 expect_pac
, expect_pac_attrs
,
3022 expect_pac_attrs_pac_request
, expect_requester_sid
,
3023 expect_client_claims
, expect_device_claims
,
3024 str(expected_client_claims
),
3025 str(unexpected_client_claims
),
3026 str(expected_device_claims
),
3027 str(unexpected_device_claims
))
3030 tgt
= self
.tkt_cache
.get(cache_key
)
3036 realm
= creds
.get_realm()
3038 salt
= creds
.get_salt()
3040 etype
= self
.get_default_enctypes(creds
)
3041 cname
= self
.PrincipalName_create(name_type
=client_name_type
,
3042 names
=user_name
.split('/'))
3044 sname
= self
.PrincipalName_create(name_type
=NT_SRV_INST
,
3045 names
=['krbtgt', realm
])
3046 expected_sname
= self
.PrincipalName_create(
3047 name_type
=NT_SRV_INST
, names
=['krbtgt', realm
.upper()])
3049 expected_sname
= sname
3051 if expected_cname
is None:
3052 expected_cname
= cname
3054 till
= self
.get_KerberosTime(offset
=36000)
3056 if target_creds
is not None:
3057 krbtgt_creds
= target_creds
3059 krbtgt_creds
= self
.get_rodc_krbtgt_creds()
3061 krbtgt_creds
= self
.get_krbtgt_creds()
3062 ticket_decryption_key
= (
3063 self
.TicketDecryptionKey_from_creds(krbtgt_creds
,
3064 etype
=ticket_etype
))
3066 expected_etypes
= krbtgt_creds
.tgs_supported_enctypes
3068 if kdc_options
is None:
3069 kdc_options
= ('forwardable,'
3073 kdc_options
= krb5_asn1
.KDCOptions(kdc_options
)
3075 if pac_options
is None:
3076 pac_options
= '1' # supports claims
3078 rep
, kdc_exchange_dict
= self
._test
_as
_exchange
(
3084 expected_error_mode
=KDC_ERR_PREAUTH_REQUIRED
,
3085 expected_crealm
=realm
,
3086 expected_cname
=expected_cname
,
3087 expected_srealm
=realm
,
3088 expected_sname
=sname
,
3089 expected_account_name
=expected_account_name
,
3090 expected_upn_name
=expected_upn_name
,
3091 expected_sid
=expected_sid
,
3092 expected_groups
=expected_groups
,
3093 unexpected_groups
=unexpected_groups
,
3095 expected_flags
=expected_flags
,
3096 unexpected_flags
=unexpected_flags
,
3097 expected_supported_etypes
=expected_etypes
,
3100 kdc_options
=kdc_options
,
3102 ticket_decryption_key
=ticket_decryption_key
,
3103 pac_request
=pac_request
,
3104 pac_options
=pac_options
,
3105 expect_pac
=expect_pac
,
3106 expect_pac_attrs
=expect_pac_attrs
,
3107 expect_pac_attrs_pac_request
=expect_pac_attrs_pac_request
,
3108 expect_requester_sid
=expect_requester_sid
,
3109 rc4_support
=rc4_support
,
3110 expect_edata
=expect_edata
,
3111 expect_client_claims
=expect_client_claims
,
3112 expect_device_claims
=expect_device_claims
,
3113 expected_client_claims
=expected_client_claims
,
3114 unexpected_client_claims
=unexpected_client_claims
,
3115 expected_device_claims
=expected_device_claims
,
3116 unexpected_device_claims
=unexpected_device_claims
,
3118 self
.check_pre_authentication(rep
)
3120 etype_info2
= kdc_exchange_dict
['preauth_etype_info2']
3122 preauth_key
= self
.PasswordKey_from_etype_info2(creds
,
3126 ts_enc_padata
= self
.get_enc_timestamp_pa_data_from_key(preauth_key
)
3128 padata
= [ts_enc_padata
]
3130 expected_realm
= realm
.upper()
3132 rep
, kdc_exchange_dict
= self
._test
_as
_exchange
(
3138 expected_error_mode
=0,
3139 expected_crealm
=expected_realm
,
3140 expected_cname
=expected_cname
,
3141 expected_srealm
=expected_realm
,
3142 expected_sname
=expected_sname
,
3143 expected_account_name
=expected_account_name
,
3144 expected_upn_name
=expected_upn_name
,
3145 expected_sid
=expected_sid
,
3146 expected_groups
=expected_groups
,
3147 unexpected_groups
=unexpected_groups
,
3149 expected_flags
=expected_flags
,
3150 unexpected_flags
=unexpected_flags
,
3151 expected_supported_etypes
=expected_etypes
,
3154 kdc_options
=kdc_options
,
3155 preauth_key
=preauth_key
,
3156 ticket_decryption_key
=ticket_decryption_key
,
3157 pac_request
=pac_request
,
3158 pac_options
=pac_options
,
3159 expect_pac
=expect_pac
,
3160 expect_pac_attrs
=expect_pac_attrs
,
3161 expect_pac_attrs_pac_request
=expect_pac_attrs_pac_request
,
3162 expect_requester_sid
=expect_requester_sid
,
3163 rc4_support
=rc4_support
,
3164 expect_edata
=expect_edata
,
3165 expect_client_claims
=expect_client_claims
,
3166 expect_device_claims
=expect_device_claims
,
3167 expected_client_claims
=expected_client_claims
,
3168 unexpected_client_claims
=unexpected_client_claims
,
3169 expected_device_claims
=expected_device_claims
,
3170 unexpected_device_claims
=unexpected_device_claims
,
3172 self
.check_as_reply(rep
)
3174 ticket_creds
= kdc_exchange_dict
['rep_ticket_creds']
3176 self
.tkt_cache
[cache_key
] = ticket_creds
3180 def _make_tgs_request(self
, client_creds
, service_creds
, tgt
,
3181 client_account
=None,
3182 client_name_type
=NT_PRINCIPAL
,
3184 pac_request
=None, expect_pac
=True,
3186 expected_cname
=None,
3187 expected_account_name
=None,
3188 expected_upn_name
=None,
3190 if client_account
is None:
3191 client_account
= client_creds
.get_username()
3192 cname
= self
.PrincipalName_create(name_type
=client_name_type
,
3193 names
=client_account
.split('/'))
3195 service_account
= service_creds
.get_username()
3196 sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
3197 names
=[service_account
])
3199 realm
= service_creds
.get_realm()
3201 expected_crealm
= realm
3202 if expected_cname
is None:
3203 expected_cname
= cname
3204 expected_srealm
= realm
3205 expected_sname
= sname
3207 expected_supported_etypes
= service_creds
.tgs_supported_enctypes
3209 etypes
= (AES256_CTS_HMAC_SHA1_96
, ARCFOUR_HMAC_MD5
)
3211 if kdc_options
is None:
3212 kdc_options
= 'canonicalize'
3213 kdc_options
= str(krb5_asn1
.KDCOptions(kdc_options
))
3215 target_decryption_key
= self
.TicketDecryptionKey_from_creds(
3218 authenticator_subkey
= self
.RandomKey(kcrypto
.Enctype
.AES256
)
3221 expected_error_mode
= expect_error
3222 if expected_error_mode
is True:
3223 expected_error_mode
= KDC_ERR_TGT_REVOKED
3224 check_error_fn
= self
.generic_check_kdc_error
3227 expected_error_mode
= 0
3228 check_error_fn
= None
3229 check_rep_fn
= self
.generic_check_kdc_rep
3231 kdc_exchange_dict
= self
.tgs_exchange_dict(
3232 expected_crealm
=expected_crealm
,
3233 expected_cname
=expected_cname
,
3234 expected_srealm
=expected_srealm
,
3235 expected_sname
=expected_sname
,
3236 expected_account_name
=expected_account_name
,
3237 expected_upn_name
=expected_upn_name
,
3238 expected_sid
=expected_sid
,
3239 expected_supported_etypes
=expected_supported_etypes
,
3240 ticket_decryption_key
=target_decryption_key
,
3241 check_error_fn
=check_error_fn
,
3242 check_rep_fn
=check_rep_fn
,
3243 check_kdc_private_fn
=self
.generic_check_kdc_private
,
3244 expected_error_mode
=expected_error_mode
,
3246 authenticator_subkey
=authenticator_subkey
,
3247 kdc_options
=kdc_options
,
3248 pac_request
=pac_request
,
3249 expect_pac
=expect_pac
,
3252 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
3258 self
.check_error_rep(rep
, expected_error_mode
)
3262 self
.check_reply(rep
, KRB_TGS_REP
)
3264 return kdc_exchange_dict
['rep_ticket_creds']
3266 # Named tuple to contain values of interest when the PAC is decoded.
3267 PacData
= namedtuple(
3269 "account_name account_sid logon_name upn domain_name")
3271 def get_pac_data(self
, authorization_data
):
3272 """Decode the PAC element contained in the authorization-data element
3280 # The PAC data will be wrapped in an AD_IF_RELEVANT element
3281 ad_if_relevant_elements
= (
3282 x
for x
in authorization_data
if x
['ad-type'] == AD_IF_RELEVANT
)
3283 for dt
in ad_if_relevant_elements
:
3284 buf
= self
.der_decode(
3285 dt
['ad-data'], asn1Spec
=krb5_asn1
.AD_IF_RELEVANT())
3286 # The PAC data is further wrapped in a AD_WIN2K_PAC element
3287 for ad
in (x
for x
in buf
if x
['ad-type'] == AD_WIN2K_PAC
):
3288 pb
= ndr_unpack(krb5pac
.PAC_DATA
, ad
['ad-data'])
3289 for pac
in pb
.buffers
:
3290 if pac
.type == krb5pac
.PAC_TYPE_LOGON_INFO
:
3292 pac
.info
.info
.info3
.base
.account_name
)
3294 str(pac
.info
.info
.info3
.base
.domain_sid
)
3295 + "-" + str(pac
.info
.info
.info3
.base
.rid
))
3296 elif pac
.type == krb5pac
.PAC_TYPE_LOGON_NAME
:
3297 logon_name
= pac
.info
.account_name
3298 elif pac
.type == krb5pac
.PAC_TYPE_UPN_DNS_INFO
:
3299 upn
= pac
.info
.upn_name
3300 domain_name
= pac
.info
.dns_domain_name
3302 return self
.PacData(
3309 def decode_service_ticket(self
, creds
, ticket
):
3310 """Decrypt and decode a service ticket
3313 enc_part
= ticket
['enc-part']
3315 key
= self
.TicketDecryptionKey_from_creds(creds
,
3318 if key
.kvno
is not None:
3319 self
.assertElementKVNO(enc_part
, 'kvno', key
.kvno
)
3321 enc_part
= key
.decrypt(KU_TICKET
, enc_part
['cipher'])
3322 enc_ticket_part
= self
.der_decode(
3323 enc_part
, asn1Spec
=krb5_asn1
.EncTicketPart())
3324 return enc_ticket_part
3326 def modify_ticket_flag(self
, enc_part
, flag
, value
):
3327 self
.assertIsInstance(value
, bool)
3329 flag
= krb5_asn1
.TicketFlags(flag
)
3330 pos
= len(tuple(flag
)) - 1
3332 flags
= enc_part
['flags']
3333 self
.assertLessEqual(pos
, len(flags
))
3335 new_flags
= flags
[:pos
] + str(int(value
)) + flags
[pos
+ 1:]
3336 enc_part
['flags'] = new_flags
def get_objectSid(self, samdb, dn):
    """Get the objectSID for a DN, as a string.

    Note: performs an Ldb query.  Asserts that the base-scope search
    for ``dn`` yields exactly one result.
    """
    matches = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
    self.assertTrue(len(matches) == 1,
                    "did not get objectSid for %s" % dn)

    raw_sid = matches[0]["objectSID"][0]
    return samdb.schema_format_value("objectSID", raw_sid).decode('utf8')
3349 def add_attribute(self
, samdb
, dn_str
, name
, value
):
3350 if isinstance(value
, list):
3354 flag
= ldb
.FLAG_MOD_ADD
3356 dn
= ldb
.Dn(samdb
, dn_str
)
3357 msg
= ldb
.Message(dn
)
3358 msg
[name
] = ldb
.MessageElement(values
, flag
, name
)
3361 def modify_attribute(self
, samdb
, dn_str
, name
, value
):
3362 if isinstance(value
, list):
3366 flag
= ldb
.FLAG_MOD_REPLACE
3368 dn
= ldb
.Dn(samdb
, dn_str
)
3369 msg
= ldb
.Message(dn
)
3370 msg
[name
] = ldb
.MessageElement(values
, flag
, name
)
3373 def remove_attribute(self
, samdb
, dn_str
, name
):
3374 flag
= ldb
.FLAG_MOD_DELETE
3376 dn
= ldb
.Dn(samdb
, dn_str
)
3377 msg
= ldb
.Message(dn
)
3378 msg
[name
] = ldb
.MessageElement([], flag
, name
)
3381 def create_ccache(self
, cname
, ticket
, enc_part
):
3382 """ Lay out a version 4 on-disk credentials cache, to be read using the
3386 field
= krb5ccache
.DELTATIME_TAG()
3387 field
.kdc_sec_offset
= 0
3388 field
.kdc_usec_offset
= 0
3390 v4tag
= krb5ccache
.V4TAG()
3394 v4tags
= krb5ccache
.V4TAGS()
3396 v4tags
.further_tags
= b
''
3398 optional_header
= krb5ccache
.V4HEADER()
3399 optional_header
.v4tags
= v4tags
3401 cname_string
= cname
['name-string']
3403 cprincipal
= krb5ccache
.PRINCIPAL()
3404 cprincipal
.name_type
= cname
['name-type']
3405 cprincipal
.component_count
= len(cname_string
)
3406 cprincipal
.realm
= ticket
['realm']
3407 cprincipal
.components
= cname_string
3409 sname
= ticket
['sname']
3410 sname_string
= sname
['name-string']
3412 sprincipal
= krb5ccache
.PRINCIPAL()
3413 sprincipal
.name_type
= sname
['name-type']
3414 sprincipal
.component_count
= len(sname_string
)
3415 sprincipal
.realm
= ticket
['realm']
3416 sprincipal
.components
= sname_string
3418 key
= self
.EncryptionKey_import(enc_part
['key'])
3420 key_data
= key
.export_obj()
3421 keyblock
= krb5ccache
.KEYBLOCK()
3422 keyblock
.enctype
= key_data
['keytype']
3423 keyblock
.data
= key_data
['keyvalue']
3425 addresses
= krb5ccache
.ADDRESSES()
3429 authdata
= krb5ccache
.AUTHDATA()
3433 # Re-encode the ticket, since it was decoded by another layer.
3434 ticket_data
= self
.der_encode(ticket
, asn1Spec
=krb5_asn1
.Ticket())
3436 authtime
= enc_part
['authtime']
3437 starttime
= enc_part
.get('starttime', authtime
)
3438 endtime
= enc_part
['endtime']
3440 cred
= krb5ccache
.CREDENTIAL()
3441 cred
.client
= cprincipal
3442 cred
.server
= sprincipal
3443 cred
.keyblock
= keyblock
3444 cred
.authtime
= self
.get_EpochFromKerberosTime(authtime
)
3445 cred
.starttime
= self
.get_EpochFromKerberosTime(starttime
)
3446 cred
.endtime
= self
.get_EpochFromKerberosTime(endtime
)
3448 # Account for clock skew of up to five minutes.
3449 self
.assertLess(cred
.authtime
- 5 * 60,
3450 datetime
.now(timezone
.utc
).timestamp(),
3451 "Ticket not yet valid - clocks may be out of sync.")
3452 self
.assertLess(cred
.starttime
- 5 * 60,
3453 datetime
.now(timezone
.utc
).timestamp(),
3454 "Ticket not yet valid - clocks may be out of sync.")
3455 self
.assertGreater(cred
.endtime
- 60 * 60,
3456 datetime
.now(timezone
.utc
).timestamp(),
3457 "Ticket already expired/about to expire - "
3458 "clocks may be out of sync.")
3460 cred
.renew_till
= cred
.endtime
3462 cred
.ticket_flags
= int(enc_part
['flags'], 2)
3463 cred
.addresses
= addresses
3464 cred
.authdata
= authdata
3465 cred
.ticket
= ticket_data
3466 cred
.second_ticket
= b
''
3468 ccache
= krb5ccache
.CCACHE()
3471 ccache
.optional_header
= optional_header
3472 ccache
.principal
= cprincipal
3475 # Serialise the credentials cache structure.
3476 result
= ndr_pack(ccache
)
3478 # Create a temporary file and write the credentials.
3479 cachefile
= tempfile
.NamedTemporaryFile(dir=self
.tempdir
, delete
=False)
3480 cachefile
.write(result
)
3485 def create_ccache_with_ticket(self
, user_credentials
, ticket
, pac
=True):
3486 # Place the ticket into a newly created credentials cache file.
3488 user_name
= user_credentials
.get_username()
3489 realm
= user_credentials
.get_realm()
3491 cname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
3495 ticket
= self
.modified_ticket(ticket
, exclude_pac
=True)
3497 # Write the ticket into a credentials cache file that can be ingested
3498 # by the main credentials code.
3499 cachefile
= self
.create_ccache(cname
, ticket
.ticket
,
3500 ticket
.encpart_private
)
3502 # Create a credentials object to reference the credentials cache.
3503 creds
= Credentials()
3504 creds
.set_kerberos_state(MUST_USE_KERBEROS
)
3505 creds
.set_username(user_name
, SPECIFIED
)
3506 creds
.set_realm(realm
)
3507 creds
.set_named_ccache(cachefile
.name
, SPECIFIED
, self
.get_lp())
3509 # Return the credentials along with the cache file.
3510 return (creds
, cachefile
)
3512 def create_ccache_with_user(self
, user_credentials
, mach_credentials
,
3513 service
="host", target_name
=None, pac
=True):
3514 # Obtain a service ticket authorising the user and place it into a
3515 # newly created credentials cache file.
3517 tgt
= self
.get_tgt(user_credentials
)
3519 ticket
= self
.get_service_ticket(tgt
, mach_credentials
,
3521 target_name
=target_name
)
3523 return self
.create_ccache_with_ticket(user_credentials
, ticket
,
3526 # Test credentials by connecting to the DC through LDAP.
def _connect(self, creds, simple_bind, expect_error=None):
    """Check *creds* by opening an LDAP connection to the DC.

    With *simple_bind* set, the connection goes to ldaps:// with the bind
    DN set on the credentials; otherwise it goes to ldap:// with the bind
    DN cleared.

    If *expect_error* is given, the connection must fail with
    ERR_INVALID_CREDENTIALS and an error string containing it; any other
    LDB error is re-raised.  If it is None, the connection must succeed,
    and the first tokenGroups value read from the rootDSE must equal the
    SID reported by the credentials.
    """
    # NOTE(review): the reviewed text lacked the lines holding
    # `dn = creds.get_dn()`, the if/else and try/else keywords, the
    # remaining SamDB() arguments, and the `raise`/`return` statements;
    # they are restored from context -- confirm.
    samdb = self.get_samdb()
    dn = creds.get_dn()

    if simple_bind:
        url = f'ldaps://{samdb.host_dns_name()}'
        creds.set_bind_dn(str(dn))
    else:
        url = f'ldap://{samdb.host_dns_name()}'
        creds.set_bind_dn(None)

    try:
        ldap = SamDB(url=url,
                     credentials=creds,
                     lp=self.get_lp())
    except ldb.LdbError as err:
        self.assertIsNotNone(expect_error, 'got unexpected error')
        num, estr = err.args
        if num != ldb.ERR_INVALID_CREDENTIALS:
            raise

        self.assertIn(expect_error, estr)

        return
    else:
        self.assertIsNone(expect_error, 'expected to get an error')

    res = ldap.search('',
                      scope=ldb.SCOPE_BASE,
                      attrs=['tokenGroups'])
    self.assertEqual(1, len(res))

    sid = creds.get_sid()

    token_groups = res[0].get('tokenGroups', idx=0)
    token_sid = ndr_unpack(security.dom_sid, token_groups)

    self.assertEqual(sid, str(token_sid))
3565 # Test the two SAMR password change methods implemented in Samba. If the
3566 # user is protected, we should get an ACCOUNT_RESTRICTION error indicating
3567 # that the password change is not allowed.
def _test_samr_change_password(self, creds, expect_error,
                               connect_error=None):
    """Exercise ChangePasswordUser2 and ChangePasswordUser3 over SAMR.

    *connect_error*, if given, is the NT status the initial SAMR
    connection must fail with; in that case *expect_error* must be None
    and nothing further is attempted.

    Otherwise both password change calls are issued in turn.  Each must
    fail with *expect_error*, or succeed if it is None.  When
    self.expect_nt_hash is false, *expect_error* is replaced with
    NT_STATUS_NTLM_BLOCKED before the calls are made.
    """
    # NOTE(review): the reviewed text lacked the try/else keywords, the
    # trailing samr.samr() arguments, two `num, _ = err.args` lines, the
    # `return`, the int.to_bytes() length/byteorder arguments and the
    # account/lm_*/password3 RPC arguments; they are restored from context
    # -- confirm.

    def build_change_blobs(nt_hash, password_str):
        """Return (verifier, encrypted_password) for *password_str*.

        Both values are lists of ints ready to be assigned to the
        samr.Password.hash and samr.CryptPassword.data fields.
        """
        # Encode the new password as UTF-16.
        new_password = password_str.encode('utf-16le')

        # Generate the MD4 hash of the password.
        new_password_md4 = md4_hash_blob(new_password)

        # Prefix the password with padding so it is 512 bytes long.
        new_password_len = len(new_password)
        remaining_len = 512 - new_password_len
        new_password = bytes(remaining_len) + new_password

        # Append the 32-bit length of the password.
        new_password += int.to_bytes(new_password_len,
                                     length=4,
                                     byteorder='little')

        # Create a key from the MD4 hash of the new password.
        key = new_password_md4[:14]

        # Encrypt the old NT hash with DES to obtain the verifier.
        verifier = des_crypt_blob_16(nt_hash, key)

        return list(verifier), list(arcfour_encrypt(nt_hash, new_password))

    samdb = self.get_samdb()
    server_name = samdb.host_dns_name()

    try:
        conn = samr.samr(f'ncacn_np:{server_name}[seal,smb2]',
                         self.get_lp(),
                         creds)
    except NTSTATUSError as err:
        self.assertIsNotNone(connect_error,
                             'connection unexpectedly failed')
        self.assertIsNone(expect_error, 'don’t specify both errors')

        num, _ = err.args
        self.assertEqual(num, connect_error)

        return
    else:
        self.assertIsNone(connect_error, 'expected connection to fail')

    # First round: ChangePasswordUser2, keyed on the current NT hash.
    nt_hash = creds.get_nt_hash()
    new_password_str = generate_random_password(32, 32)
    verifier, encrypted = build_change_blobs(nt_hash, new_password_str)

    server = lsa.String()
    server.string = server_name

    account = lsa.String()
    account.string = creds.get_username()

    nt_verifier = samr.Password()
    nt_verifier.hash = verifier

    nt_password = samr.CryptPassword()
    nt_password.data = encrypted

    if not self.expect_nt_hash:
        expect_error = ntstatus.NT_STATUS_NTLM_BLOCKED

    try:
        conn.ChangePasswordUser2(server=server,
                                 account=account,
                                 nt_password=nt_password,
                                 nt_verifier=nt_verifier,
                                 lm_change=False,
                                 lm_password=None,
                                 lm_verifier=None)
    except NTSTATUSError as err:
        num, _ = err.args
        self.assertIsNotNone(expect_error,
                             f'unexpectedly failed with {num:08X}')
        self.assertEqual(num, expect_error)
    else:
        self.assertIsNone(expect_error, 'expected to fail')

    # The credentials are updated whether or not the change was accepted,
    # so the second round is keyed on the hash of the new password.
    creds.set_password(new_password_str)

    # Second round: ChangePasswordUser3, reusing the request structures.
    nt_hash = creds.get_nt_hash()
    verifier, encrypted = build_change_blobs(
        nt_hash, generate_random_password(32, 32))

    nt_verifier.hash = verifier
    nt_password.data = encrypted

    try:
        conn.ChangePasswordUser3(server=server,
                                 account=account,
                                 nt_password=nt_password,
                                 nt_verifier=nt_verifier,
                                 lm_change=False,
                                 lm_password=None,
                                 lm_verifier=None,
                                 password3=None)
    except NTSTATUSError as err:
        self.assertIsNotNone(expect_error, 'unexpectedly failed')

        num, _ = err.args
        self.assertEqual(num, expect_error)
    else:
        self.assertIsNone(expect_error, 'expected to fail')
3694 # Test SamLogon. Authentication should succeed for non-protected accounts,
3695 # and fail for protected accounts.
def _test_samlogon(self, creds, logon_type, expect_error=None,
                   validation_level=netlogon.NetlogonValidationSamInfo2,
                   domain_joined_mach_creds=None):
    """Attempt a netr_LogonSamLogonEx for *creds* and check the outcome.

    *logon_type* selects how the logon information is built:
    NetlogonInteractiveInformation sends the encrypted NT hash, while
    NetlogonNetworkInformation sends an NTLM response to a fixed
    challenge.  Any other value fails the test.

    The call goes over a schannel connection authenticated as
    *domain_joined_mach_creds*; if that is None, a cached computer
    account with a workstation secure channel is used.

    If *expect_error* is unset and self.expect_nt_hash is false,
    NT_STATUS_NTLM_BLOCKED is expected.  On success the call must be
    authoritative and return zero flags.

    Returns the validation data from the call, or None if it failed.
    """
    # NOTE(review): the reviewed text lacked the else/try keywords, the
    # trailing netlogon() and netr_LogonSamLogonEx() arguments, the
    # `netr_flags`/`validation` initialisers and the final return; they
    # are restored from context -- confirm.
    samdb = self.get_samdb()

    if domain_joined_mach_creds is None:
        domain_joined_mach_creds = self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts={'secure_channel_type': misc.SEC_CHAN_WKSTA})

    dc_server = samdb.host_dns_name()
    username, domain = creds.get_ntlm_username_domain()
    workstation = domain_joined_mach_creds.get_username()

    # Calling this initializes netlogon_creds on mach_creds, as is required
    # before calling mach_creds.encrypt_netr_PasswordInfo().
    conn = netlogon.netlogon(f'ncacn_ip_tcp:{dc_server}[schannel,seal]',
                             self.get_lp(),
                             domain_joined_mach_creds)
    (auth_type, auth_level) = conn.auth_info()

    if logon_type == netlogon.NetlogonInteractiveInformation:
        logon = netlogon.netr_PasswordInfo()

        # No LM hash is supplied; send sixteen zero bytes in its place.
        lm_pass = samr.Password()
        lm_pass.hash = [0] * 16

        nt_pass = samr.Password()
        nt_pass.hash = list(creds.get_nt_hash())

        logon.lmpassword = lm_pass
        logon.ntpassword = nt_pass

        domain_joined_mach_creds.encrypt_netr_PasswordInfo(
            info=logon,
            auth_type=auth_type,
            auth_level=auth_level)

    elif logon_type == netlogon.NetlogonNetworkInformation:
        computername = ntlmssp.AV_PAIR()
        computername.AvId = ntlmssp.MsvAvNbComputerName
        computername.Value = workstation

        domainname = ntlmssp.AV_PAIR()
        domainname.AvId = ntlmssp.MsvAvNbDomainName
        domainname.Value = domain

        eol = ntlmssp.AV_PAIR()
        eol.AvId = ntlmssp.MsvAvEOL

        target_info = ntlmssp.AV_PAIR_LIST()
        target_info.count = 3
        target_info.pair = [domainname, computername, eol]

        target_info_blob = ndr_pack(target_info)

        challenge = b'abcdefgh'
        response = creds.get_ntlm_response(flags=0,
                                           challenge=challenge,
                                           target_info=target_info_blob)

        logon = netlogon.netr_NetworkInfo()

        logon.challenge = list(challenge)
        logon.nt = netlogon.netr_ChallengeResponse()
        logon.nt.length = len(response['nt_response'])
        logon.nt.data = list(response['nt_response'])

    else:
        self.fail(f'unknown logon type {logon_type}')

    identity_info = netlogon.netr_IdentityInfo()
    identity_info.domain_name.string = domain
    identity_info.account_name.string = username
    identity_info.parameter_control = (
        netlogon.MSV1_0_ALLOW_SERVER_TRUST_ACCOUNT
        | netlogon.MSV1_0_ALLOW_WORKSTATION_TRUST_ACCOUNT)
    identity_info.workstation.string = workstation

    logon.identity_info = identity_info

    netr_flags = 0

    # Stays None if the logon call raises.
    validation = None

    if not expect_error and not self.expect_nt_hash:
        expect_error = ntstatus.NT_STATUS_NTLM_BLOCKED

    try:
        (validation, authoritative, flags) = (
            conn.netr_LogonSamLogonEx(
                dc_server,
                domain_joined_mach_creds.get_workstation(),
                logon_type,
                logon,
                validation_level,
                netr_flags))
    except NTSTATUSError as err:
        status, _ = err.args
        self.assertIsNotNone(expect_error,
                             f'unexpectedly failed with {status:08X}')
        self.assertEqual(expect_error, status, 'got wrong status code')
    else:
        self.assertIsNone(expect_error, 'expected error')

        self.assertEqual(1, authoritative)
        self.assertEqual(0, flags)

    return validation
def check_ticket_times(self,
                       ticket_creds,
                       expected_life=None,
                       expected_renew_life=None,
                       delta=0):
    """Check the lifetime and renewable lifetime of a ticket.

    Times are read from ticket_creds.ticket_private.  Both lifetimes are
    measured from the ticket's starttime, falling back to its authtime
    when no starttime is present, and are compared with
    assertAlmostEqual() using *delta* as the tolerance.

    A check is skipped when its expected value is None, except that a
    ticket with no renew-till time requires *expected_renew_life* to be
    None.
    """
    # NOTE(review): the `ticket_creds`, `expected_life` and `delta`
    # parameters and the `else:` below were absent from the reviewed text
    # and are restored from the names the body uses; the `delta=0` default
    # in particular is an assumption -- confirm.
    ticket = ticket_creds.ticket_private

    authtime = ticket['authtime']
    starttime = ticket.get('starttime', authtime)
    endtime = ticket['endtime']
    renew_till = ticket.get('renew-till', None)

    starttime = self.get_EpochFromKerberosTime(starttime)

    if expected_life is not None:
        actual_end = self.get_EpochFromKerberosTime(
            endtime.decode('ascii'))
        actual_lifetime = actual_end - starttime

        self.assertAlmostEqual(expected_life, actual_lifetime,
                               delta=delta)

    if renew_till is None:
        self.assertIsNone(expected_renew_life)
    else:
        if expected_renew_life is not None:
            actual_renew_till = self.get_EpochFromKerberosTime(
                renew_till.decode('ascii'))
            actual_renew_life = actual_renew_till - starttime

            self.assertAlmostEqual(expected_renew_life, actual_renew_life,
                                   delta=delta)