1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
# Put the in-tree "bin/python" directory at the front of the module search
# path so that it is consulted before any other location.
sys.path.insert(0, "bin/python")
# Ask Python processes inheriting this environment to run unbuffered.
os.environ["PYTHONUNBUFFERED"] = "1"
30 from collections
import namedtuple
31 from datetime
import datetime
, timezone
33 from functools
import partial
34 from typing
import Dict
, Optional
37 from ldb
import SCOPE_BASE
43 generate_random_password
,
49 from samba
.auth
import system_session
50 from samba
.credentials
import (
56 from samba
.crypto
import des_crypt_blob_16
, md4_hash_blob
57 from samba
.lsa_utils
import OpenPolicyFallback
, CreateTrustedDomainFallback
58 from samba
.dcerpc
import (
72 from samba
.dcerpc
.misc
import (
79 from samba
.domain
.models
import AuthenticationPolicy
, AuthenticationSilo
80 from samba
.drs_utils
import drs_Replicate
, drsuapi_connect
81 from samba
.dsdb
import (
82 DS_DOMAIN_FUNCTION_2000
,
83 DS_DOMAIN_FUNCTION_2008
,
84 DS_GUID_COMPUTERS_CONTAINER
,
85 DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
,
86 DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
,
87 DS_GUID_USERS_CONTAINER
,
88 DSDB_SYNTAX_BINARY_DN
,
89 GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
,
90 GTYPE_SECURITY_GLOBAL_GROUP
,
91 GTYPE_SECURITY_UNIVERSAL_GROUP
,
93 UF_NO_AUTH_DATA_REQUIRED
,
96 UF_PARTIAL_SECRETS_ACCOUNT
,
97 UF_SERVER_TRUST_ACCOUNT
,
98 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
,
99 UF_WORKSTATION_TRUST_ACCOUNT
,
100 UF_SMARTCARD_REQUIRED
,
101 UF_INTERDOMAIN_TRUST_ACCOUNT
,
103 from samba
.join
import DCJoinContext
104 from samba
.ndr
import ndr_pack
, ndr_unpack
105 from samba
.param
import LoadParm
106 from samba
.samdb
import SamDB
, dsdb_Dn
# Short aliases for two of the security.KERB_ENCTYPE_* constants.
rc4_bit = security.KERB_ENCTYPE_RC4_HMAC_MD5
aes256_sk_bit = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
111 import samba
.tests
.krb5
.kcrypto
as kcrypto
112 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
113 from samba
.tests
import TestCaseInTempDir
, delete_force
114 from samba
.tests
.krb5
.raw_testcase
import (
119 from samba
.tests
.krb5
.rfc4120_constants
import (
122 AES256_CTS_HMAC_SHA1_96
,
124 KDC_ERR_PREAUTH_REQUIRED
,
130 KU_ENC_CHALLENGE_CLIENT
,
135 PADATA_ENC_TIMESTAMP
,
136 PADATA_ENCRYPTED_CHALLENGE
,
# Module-wide debugging switches, both off by default.
# Elsewhere in this file they are copied onto test instances as
# do_asn1_print and do_hexdump respectively.
global_asn1_print = False
global_hexdump = False
class GroupType(Enum):
    """Security group kinds, each valued by its GTYPE_SECURITY_* constant."""

    GLOBAL = GTYPE_SECURITY_GLOBAL_GROUP
    DOMAIN_LOCAL = GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
    UNIVERSAL = GTYPE_SECURITY_UNIVERSAL_GROUP
150 # This simple class encapsulates the DN and SID of a Principal.
152 __slots__
= ['dn', 'sid']
154 def __init__(self
, dn
, sid
):
155 if dn
is not None and not isinstance(dn
, ldb
.Dn
):
156 raise AssertionError(f
'expected {dn} to be an ldb.Dn')
162 class KDCBaseTest(TestCaseInTempDir
, RawKerberosTest
):
163 """ Base class for KDC tests.
166 class AccountType(Enum
):
171 MANAGED_SERVICE
= object()
172 GROUP_MANAGED_SERVICE
= object()
183 cls
._drsuapi
_connection
= None
184 cls
._lsarpc
_connection
= None
186 cls
._functional
_level
= None
188 # An identifier to ensure created accounts have unique names. Windows
189 # caches accounts based on usernames, so account names being different
190 # across test runs avoids previous test runs affecting the results.
191 cls
.account_base
= f
'{secrets.token_hex(4)}_'
194 # A list containing DNs of accounts created as part of testing.
197 # A list of tdo_handles of trusts created as part of testing.
200 cls
.account_cache
= {}
201 cls
.policy_cache
= {}
206 cls
.ldb_cleanups
= []
208 cls
._claim
_types
_dn
= None
209 cls
._authn
_policy
_config
_dn
= None
210 cls
._authn
_policies
_dn
= None
211 cls
._authn
_silos
_dn
= None
213 def get_claim_types_dn(self
):
214 samdb
= self
.get_samdb()
216 if self
._claim
_types
_dn
is None:
217 claim_config_dn
= samdb
.get_config_basedn()
219 claim_config_dn
.add_child('CN=Claims Configuration,CN=Services')
221 'dn': claim_config_dn
,
222 'objectClass': 'container',
226 except ldb
.LdbError
as err
:
228 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
231 self
.accounts
.append(str(claim_config_dn
))
233 claim_types_dn
= claim_config_dn
234 claim_types_dn
.add_child('CN=Claim Types')
236 'dn': claim_types_dn
,
237 'objectClass': 'msDS-ClaimTypes',
241 except ldb
.LdbError
as err
:
243 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
246 self
.accounts
.append(str(claim_types_dn
))
248 type(self
)._claim
_types
_dn
= claim_types_dn
250 # Return a copy of the DN.
251 return ldb
.Dn(samdb
, str(self
._claim
_types
_dn
))
253 def get_authn_policy_config_dn(self
):
254 samdb
= self
.get_samdb()
256 if self
._authn
_policy
_config
_dn
is None:
257 authn_policy_config_dn
= samdb
.get_config_basedn()
259 authn_policy_config_dn
.add_child(
260 'CN=AuthN Policy Configuration,CN=Services')
262 'dn': authn_policy_config_dn
,
263 'objectClass': 'container',
264 'description': ('Contains configuration for authentication '
269 except ldb
.LdbError
as err
:
271 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
274 self
.accounts
.append(str(authn_policy_config_dn
))
276 type(self
)._authn
_policy
_config
_dn
= authn_policy_config_dn
278 # Return a copy of the DN.
279 return ldb
.Dn(samdb
, str(self
._authn
_policy
_config
_dn
))
281 def get_authn_policies_dn(self
):
282 samdb
= self
.get_samdb()
284 if self
._authn
_policies
_dn
is None:
285 authn_policies_dn
= self
.get_authn_policy_config_dn()
286 authn_policies_dn
.add_child('CN=AuthN Policies')
288 'dn': authn_policies_dn
,
289 'objectClass': 'msDS-AuthNPolicies',
290 'description': 'Contains authentication policy objects',
294 except ldb
.LdbError
as err
:
296 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
299 self
.accounts
.append(str(authn_policies_dn
))
301 type(self
)._authn
_policies
_dn
= authn_policies_dn
303 # Return a copy of the DN.
304 return ldb
.Dn(samdb
, str(self
._authn
_policies
_dn
))
306 def get_authn_silos_dn(self
):
307 samdb
= self
.get_samdb()
309 if self
._authn
_silos
_dn
is None:
310 authn_silos_dn
= self
.get_authn_policy_config_dn()
311 authn_silos_dn
.add_child('CN=AuthN Silos')
313 'dn': authn_silos_dn
,
314 'objectClass': 'msDS-AuthNPolicySilos',
315 'description': 'Contains authentication policy silo objects',
319 except ldb
.LdbError
as err
:
321 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
324 self
.accounts
.append(str(authn_silos_dn
))
326 type(self
)._authn
_silos
_dn
= authn_silos_dn
328 # Return a copy of the DN.
329 return ldb
.Dn(samdb
, str(self
._authn
_silos
_dn
))
333 return frozenset((k
, v
) for k
, v
in m
.items())
336 # Run any cleanups that may modify accounts prior to deleting those
340 # Clean up any accounts created for single tests.
341 if self
._ldb
is not None:
342 for dn
in reversed(self
.test_accounts
):
343 delete_force(self
._ldb
, dn
)
345 if self
._test
_rodc
_ctx
is not None:
346 self
._test
_rodc
_ctx
.cleanup_old_join(force
=True)
348 # Clean up any trusts created for single tests.
349 if self
._lsarpc
_connection
is not None:
350 lsa_conn
, _
, _
, _
, _
= self
._lsarpc
_connection
351 for tdo_handle
in reversed(self
.test_trusts
):
352 lsa_conn
.DeleteObject(tdo_handle
)
357 def tearDownClass(cls
):
358 # Clean up any accounts created by create_account. This is
359 # done in tearDownClass() rather than tearDown(), so that
360 # accounts need only be created once for permutation tests.
361 if cls
._ldb
is not None:
362 for cleanup
in reversed(cls
.ldb_cleanups
):
364 cls
._ldb
.modify(cleanup
)
368 for dn
in reversed(cls
.accounts
):
369 delete_force(cls
._ldb
, dn
)
371 # Clean up any trusts created by create_trust. This is
372 # done in tearDownClass() rather than tearDown(), so that
373 # trust accounts need only be created once for permutation tests.
374 if cls
._lsarpc
_connection
is not None:
375 lsa_conn
, _
, _
, _
, _
= cls
._lsarpc
_connection
376 for tdo_handle
in reversed(cls
.trusts
):
377 lsa_conn
.DeleteObject(tdo_handle
)
379 if cls
._rodc
_ctx
is not None:
380 cls
._rodc
_ctx
.cleanup_old_join(force
=True)
382 super().tearDownClass()
386 self
.do_asn1_print
= global_asn1_print
387 self
.do_hexdump
= global_hexdump
389 # A list containing DNs of accounts that should be removed when the
390 # current test finishes.
391 self
.test_accounts
= []
392 self
._test
_rodc
_ctx
= None
394 # A list containing tdo_handles of trusts that should be removed when the
395 # current test finishes.
396 self
.test_trusts
= []
398 def get_lp(self
) -> LoadParm
:
400 type(self
)._lp
= self
.get_loadparm()
404 def get_samdb(self
) -> SamDB
:
405 if self
._ldb
is None:
406 creds
= self
.get_admin_creds()
409 session
= system_session()
410 type(self
)._ldb
= SamDB(url
="ldap://%s" % self
.dc_host
,
411 session_info
=session
,
417 def get_rodc_samdb(self
) -> SamDB
:
418 if self
._rodc
_ldb
is None:
419 creds
= self
.get_admin_creds()
422 session
= system_session()
423 type(self
)._rodc
_ldb
= SamDB(url
="ldap://%s" % self
.host
,
424 session_info
=session
,
429 return self
._rodc
_ldb
431 def get_drsuapi_connection(self
):
432 if self
._drsuapi
_connection
is None:
433 admin_creds
= self
.get_admin_creds()
434 samdb
= self
.get_samdb()
435 dns_hostname
= samdb
.host_dns_name()
436 type(self
)._drsuapi
_connection
= drsuapi_connect(dns_hostname
,
441 return self
._drsuapi
_connection
443 def get_lsarpc_connection(self
):
444 def get_lsa_info(conn
, policy_access
):
446 in_revision_info1
= lsa
.revision_info1()
447 in_revision_info1
.revision
= 1
448 in_revision_info1
.supported_features
= (
449 lsa
.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER
452 out_version
, out_revision_info1
, policy
= OpenPolicyFallback(
457 access_mask
=policy_access
460 info
= conn
.QueryInfoPolicy2(policy
, lsa
.LSA_POLICY_INFO_DNS
)
462 return (policy
, out_version
, out_revision_info1
, info
)
464 def lsarpc_connect(server
, lp
, creds
, ip
=None):
466 if lp
.log_level() >= 9:
467 binding_options
+= ",print"
469 # Allow forcing the IP
471 binding_options
+= f
",target_hostname={server}"
472 binding_string
= f
"ncacn_np:{ip}[{binding_options}]"
474 binding_string
= "ncacn_np:%s[%s]" % (server
, binding_options
)
477 conn
= lsa
.lsarpc(binding_string
, lp
, creds
)
478 policy_access
= lsa
.LSA_POLICY_VIEW_LOCAL_INFORMATION
479 policy_access |
= lsa
.LSA_POLICY_TRUST_ADMIN
480 policy_access |
= lsa
.LSA_POLICY_CREATE_SECRET
481 (policy
, out_version
, out_revision_info1
, info
) = \
482 get_lsa_info(conn
, policy_access
)
483 except Exception as e
:
484 raise RuntimeError("LSARPC connection to %s failed: %s" % (server
, e
))
486 return (conn
, policy
, out_version
, out_revision_info1
, info
)
488 if self
._lsarpc
_connection
is None:
489 admin_creds
= self
.get_admin_creds()
490 samdb
= self
.get_samdb()
491 dns_hostname
= samdb
.host_dns_name()
492 type(self
)._lsarpc
_connection
= lsarpc_connect(dns_hostname
,
497 return self
._lsarpc
_connection
499 def get_server_dn(self
, samdb
):
500 server
= samdb
.get_serverName()
502 res
= samdb
.search(base
=server
,
503 scope
=ldb
.SCOPE_BASE
,
504 attrs
=['serverReference'])
505 dn
= ldb
.Dn(samdb
, res
[0]['serverReference'][0].decode('utf8'))
509 def get_mock_rodc_ctx(self
, preserve
=True):
511 rodc_ctx
= self
._rodc
_ctx
513 rodc_ctx
= self
._test
_rodc
_ctx
516 admin_creds
= self
.get_admin_creds()
519 rodc_name
= self
.get_new_username()
520 site_name
= 'Default-First-Site-Name'
522 rodc_ctx
= DCJoinContext(server
=self
.dc_host
,
526 netbios_name
=rodc_name
,
529 self
.create_rodc(rodc_ctx
)
532 # Mark this rodc for deletion in tearDownClass() after all the
533 # tests in this class finish.
534 type(self
)._rodc
_ctx
= rodc_ctx
536 # Mark this rodc for deletion in tearDown() after the current
538 self
._test
_rodc
_ctx
= rodc_ctx
542 def get_domain_functional_level(self
, ldb
=None):
543 if self
._functional
_level
is None:
545 ldb
= self
.get_samdb()
547 res
= ldb
.search(base
='',
549 attrs
=['domainFunctionality'])
551 functional_level
= int(res
[0]['domainFunctionality'][0])
553 functional_level
= DS_DOMAIN_FUNCTION_2000
555 type(self
)._functional
_level
= functional_level
557 return self
._functional
_level
def get_default_enctypes(self, creds):
    """Return the list of encryption types used by default for *creds*.

    AES256 and AES128 (in that order) are included when the domain
    functional level is at least DS_DOMAIN_FUNCTION_2008.  RC4 is added
    last when self.expect_nt_hash is truthy or the credentials report a
    workstation.

    :param creds: credentials object; must not be None.
    :return: a new list of kcrypto.Enctype values.
    """
    self.assertIsNotNone(creds, 'expected client creds to be passed in')

    enctypes = []

    # AES is only supported at functional level 2008 or higher
    if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008:
        enctypes.extend((kcrypto.Enctype.AES256, kcrypto.Enctype.AES128))

    if self.expect_nt_hash or creds.get_workstation():
        enctypes.append(kcrypto.Enctype.RC4)

    return enctypes
576 def create_group(self
, samdb
, name
, ou
=None, gtype
=None):
578 ou
= samdb
.get_wellknown_dn(samdb
.get_default_basedn(),
579 DS_GUID_USERS_CONTAINER
)
581 dn
= f
'CN={name},{ou}'
583 # Remove the group if it exists; this will happen if a previous test
585 delete_force(samdb
, dn
)
587 # Save the group name so it can be deleted in tearDownClass.
588 self
.accounts
.append(dn
)
592 'objectClass': 'group'
594 if gtype
is not None:
595 details
['groupType'] = common
.normalise_int32(gtype
)
def get_dn_from_attribute(self, attribute):
    """Return the ``dn`` of the object get_from_attribute() finds."""
    schema_entry = self.get_from_attribute(attribute)
    return schema_entry.dn
def get_dn_from_class(self, attribute):
    """Return the ``dn`` of the object get_from_class() finds.

    Despite its name, *attribute* is forwarded to get_from_class() as the
    name to look up.
    """
    schema_entry = self.get_from_class(attribute)
    return schema_entry.dn
def get_schema_id_guid_from_attribute(self, attribute):
    """Return the attribute's ``schemaIDGUID`` value as a misc.GUID.

    The first ``schemaIDGUID`` value of the object returned by
    get_from_attribute() is passed to the misc.GUID constructor.
    """
    schema_entry = self.get_from_attribute(attribute)
    raw_guid = schema_entry.get('schemaIDGUID', idx=0)
    return misc.GUID(raw_guid)
def get_from_attribute(self, attribute):
    """Look up *attribute* via get_from_schema() as an 'attributeSchema'."""
    object_class = 'attributeSchema'
    return self.get_from_schema(attribute, object_class)
def get_from_class(self, attribute):
    """Look up *attribute* via get_from_schema() as a 'classSchema'.

    Despite its name, *attribute* is the name forwarded to the schema
    lookup.
    """
    object_class = 'classSchema'
    return self.get_from_schema(attribute, object_class)
616 def get_from_schema(self
, name
, object_class
):
617 samdb
= self
.get_samdb()
618 schema_dn
= samdb
.get_schema_basedn()
620 res
= samdb
.search(base
=schema_dn
,
621 scope
=ldb
.SCOPE_ONELEVEL
,
622 attrs
=['schemaIDGUID'],
623 expression
=(f
'(&(objectClass={object_class})'
624 f
'(lDAPDisplayName={name}))'))
625 self
.assertEqual(1, len(res
),
626 f
'could not locate {name} in {object_class}')
630 def create_authn_silo(self
, *,
633 computer_policy
=None,
636 samdb
= self
.get_samdb()
638 silo_id
= self
.get_new_username()
640 authn_silo_dn
= self
.get_authn_silos_dn()
641 authn_silo_dn
.add_child(f
'CN={silo_id}')
645 'objectClass': 'msDS-AuthNPolicySilo',
650 elif enforced
is False:
653 if members
is not None:
654 details
['msDS-AuthNPolicySiloMembers'] = members
655 if user_policy
is not None:
656 details
['msDS-UserAuthNPolicy'] = str(user_policy
.dn
)
657 if computer_policy
is not None:
658 details
['msDS-ComputerAuthNPolicy'] = str(computer_policy
.dn
)
659 if service_policy
is not None:
660 details
['msDS-ServiceAuthNPolicy'] = str(service_policy
.dn
)
661 if enforced
is not None:
662 details
['msDS-AuthNPolicySiloEnforced'] = enforced
664 # Save the silo DN so it can be deleted in tearDownClass().
665 self
.accounts
.append(str(authn_silo_dn
))
667 # Remove the silo if it exists; this will happen if a previous test run
669 delete_force(samdb
, authn_silo_dn
)
673 return AuthenticationSilo
.get(samdb
, dn
=authn_silo_dn
)
675 def create_authn_silo_claim_id(self
):
676 claim_id
= 'ad://ext/AuthenticationSilo'
679 'msDS-GroupManagedServiceAccount',
681 'msDS-ManagedServiceAccount',
685 self
.create_claim(claim_id
,
688 value_space_restricted
=False,
689 source_type
='Constructed',
690 for_classes
=for_classes
,
691 value_type
=claims
.CLAIM_TYPE_STRING
,
692 # It's OK if the claim type already exists.
697 def create_authn_policy(self
, *,
702 cache_key
= self
.freeze(kwargs
)
704 authn_policy
= self
.policy_cache
.get(cache_key
)
705 if authn_policy
is not None:
708 authn_policy
= self
.create_authn_policy_opts(**kwargs
)
710 self
.policy_cache
[cache_key
] = authn_policy
714 def create_authn_policy_opts(self
, *,
716 strong_ntlm_policy
=None,
717 user_allowed_from
=None,
718 user_allowed_ntlm
=None,
719 user_allowed_to
=None,
720 user_tgt_lifetime
=None,
721 computer_allowed_to
=None,
722 computer_tgt_lifetime
=None,
723 service_allowed_from
=None,
724 service_allowed_ntlm
=None,
725 service_allowed_to
=None,
726 service_tgt_lifetime
=None):
727 samdb
= self
.get_samdb()
729 policy_id
= self
.get_new_username()
731 policy_dn
= self
.get_authn_policies_dn()
732 policy_dn
.add_child(f
'CN={policy_id}')
736 'objectClass': 'msDS-AuthNPolicy',
741 def sd_from_sddl(sddl
):
743 if _domain_sid
is None:
744 _domain_sid
= security
.dom_sid(samdb
.get_domain_sid())
746 return ndr_pack(security
.descriptor
.from_sddl(sddl
, _domain_sid
))
750 elif enforced
is False:
753 if user_allowed_ntlm
is True:
754 user_allowed_ntlm
= 'TRUE'
755 elif user_allowed_ntlm
is False:
756 user_allowed_ntlm
= 'FALSE'
758 if service_allowed_ntlm
is True:
759 service_allowed_ntlm
= 'TRUE'
760 elif service_allowed_ntlm
is False:
761 service_allowed_ntlm
= 'FALSE'
763 if enforced
is not None:
764 details
['msDS-AuthNPolicyEnforced'] = enforced
765 if strong_ntlm_policy
is not None:
766 details
['msDS-StrongNTLMPolicy'] = strong_ntlm_policy
768 if user_allowed_from
is not None:
769 details
['msDS-UserAllowedToAuthenticateFrom'] = sd_from_sddl(
771 if user_allowed_ntlm
is not None:
772 details
['msDS-UserAllowedNTLMNetworkAuthentication'] = (
774 if user_allowed_to
is not None:
775 details
['msDS-UserAllowedToAuthenticateTo'] = sd_from_sddl(
777 if user_tgt_lifetime
is not None:
778 if isinstance(user_tgt_lifetime
, numbers
.Number
):
779 user_tgt_lifetime
= str(int(user_tgt_lifetime
* 10_000_000))
780 details
['msDS-UserTGTLifetime'] = user_tgt_lifetime
782 if computer_allowed_to
is not None:
783 details
['msDS-ComputerAllowedToAuthenticateTo'] = sd_from_sddl(
785 if computer_tgt_lifetime
is not None:
786 if isinstance(computer_tgt_lifetime
, numbers
.Number
):
787 computer_tgt_lifetime
= str(
788 int(computer_tgt_lifetime
* 10_000_000))
789 details
['msDS-ComputerTGTLifetime'] = computer_tgt_lifetime
791 if service_allowed_from
is not None:
792 details
['msDS-ServiceAllowedToAuthenticateFrom'] = sd_from_sddl(
793 service_allowed_from
)
794 if service_allowed_ntlm
is not None:
795 details
['msDS-ServiceAllowedNTLMNetworkAuthentication'] = (
796 service_allowed_ntlm
)
797 if service_allowed_to
is not None:
798 details
['msDS-ServiceAllowedToAuthenticateTo'] = sd_from_sddl(
800 if service_tgt_lifetime
is not None:
801 if isinstance(service_tgt_lifetime
, numbers
.Number
):
802 service_tgt_lifetime
= str(
803 int(service_tgt_lifetime
* 10_000_000))
804 details
['msDS-ServiceTGTLifetime'] = service_tgt_lifetime
806 # Save the policy DN so it can be deleted in tearDownClass().
807 self
.accounts
.append(str(policy_dn
))
809 # Remove the policy if it exists; this will happen if a previous test
811 delete_force(samdb
, policy_dn
)
815 return AuthenticationPolicy
.get(samdb
, dn
=policy_dn
)
817 def create_claim(self
,
822 value_space_restricted
=None,
828 samdb
= self
.get_samdb()
830 claim_dn
= self
.get_claim_types_dn()
831 claim_dn
.add_child(f
'CN={claim_id}')
835 'objectClass': 'msDS-ClaimType',
840 elif enabled
is False:
843 if attribute
is not None:
844 attribute
= str(self
.get_dn_from_attribute(attribute
))
846 if single_valued
is True:
847 single_valued
= 'TRUE'
848 elif single_valued
is False:
849 single_valued
= 'FALSE'
851 if value_space_restricted
is True:
852 value_space_restricted
= 'TRUE'
853 elif value_space_restricted
is False:
854 value_space_restricted
= 'FALSE'
856 if for_classes
is not None:
857 for_classes
= [str(self
.get_dn_from_class(name
))
858 for name
in for_classes
]
860 if isinstance(value_type
, int):
861 value_type
= str(value_type
)
863 if enabled
is not None:
864 details
['Enabled'] = enabled
865 if attribute
is not None:
866 details
['msDS-ClaimAttributeSource'] = attribute
867 if single_valued
is not None:
868 details
['msDS-ClaimIsSingleValued'] = single_valued
869 if value_space_restricted
is not None:
870 details
['msDS-ClaimIsValueSpaceRestricted'] = (
871 value_space_restricted
)
872 if source
is not None:
873 details
['msDS-ClaimSource'] = source
874 if source_type
is not None:
875 details
['msDS-ClaimSourceType'] = source_type
876 if for_classes
is not None:
877 details
['msDS-ClaimTypeAppliesToClass'] = for_classes
878 if value_type
is not None:
879 details
['msDS-ClaimValueType'] = value_type
882 # Remove the claim if it exists; this will happen if a previous
884 delete_force(samdb
, claim_dn
)
888 except ldb
.LdbError
as err
:
890 if num
!= ldb
.ERR_ENTRY_ALREADY_EXISTS
:
892 self
.assertFalse(force
, 'should not fail with force=True')
894 # Save the claim DN so it can be deleted in tearDownClass()
895 self
.accounts
.append(str(claim_dn
))
897 def create_trust(self
, trust_info
,
898 trust_enc_types
=None,
899 trust_incoming_password
=None,
900 trust_outgoing_password
=None,
903 """Create a trust account for testing.
904 The handle of the created trust is added to cls.trusts,
905 which is used by tearDownClass to clean up the created trusts.
906 With preserve=False the handle is added to self.test_trusts,
907 which is used by tearDown to clean up the created trusts.
910 if trust_incoming_password
is None:
911 trust_incoming_password
= generate_random_password(120, 120)
912 trust_incoming_secret
= list(trust_incoming_password
.encode('utf-16-le'))
913 if trust_outgoing_password
is None:
914 trust_outgoing_password
= generate_random_password(120, 120)
915 trust_outgoing_secret
= list(trust_outgoing_password
.encode('utf-16-le'))
917 def generate_AuthInOutBlob(secret
, update_time
):
919 blob
= drsblobs
.trustAuthInOutBlob()
924 clear
= drsblobs
.AuthInfoClear()
925 clear
.size
= len(secret
)
926 clear
.password
= secret
928 info
= drsblobs
.AuthenticationInformation()
929 info
.LastUpdateTime
= unix2nttime(update_time
)
930 info
.AuthType
= lsa
.TRUST_AUTH_TYPE_CLEAR
931 info
.AuthInfo
= clear
933 array
= drsblobs
.AuthenticationInformationArray()
937 blob
= drsblobs
.trustAuthInOutBlob()
943 update_time
= current_unix_time()
944 trust_incoming_blob
= generate_AuthInOutBlob(trust_incoming_secret
,
946 trust_outgoing_blob
= generate_AuthInOutBlob(trust_outgoing_secret
,
949 lsa_conn
, lsa_policy
, lsa_version
, lsa_revision_info1
, local_info
= \
950 self
.get_lsarpc_connection()
953 tdo_handle
= CreateTrustedDomainFallback(lsa_conn
,
956 lsa
.LSA_TRUSTED_DOMAIN_ALL_ACCESS |
957 security
.SEC_STD_DELETE
,
962 except NTSTATUSError
as err
:
964 self
.assertIsNotNone(expect_error
,
965 f
'unexpectedly failed with {status:08X}')
966 self
.assertEqual(expect_error
, status
, 'got wrong status code')
967 return (None, None, None, None)
968 self
.assertIsNone(expect_error
, 'expected error')
970 # Mark this trust for deletion in tearDownClass() after all the
971 # tests in this class finish.
972 self
.trusts
.append(tdo_handle
)
974 # Mark this trust for deletion in tearDown() after the current
976 self
.test_trusts
.append(tdo_handle
)
978 lsa_conn
.SetInformationTrustedDomain(tdo_handle
,
979 lsa
.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES
,
982 samdb
= self
.get_samdb()
984 incoming_account_name
= trust_info
.netbios_name
.string
985 incoming_account_name
+= '$'
986 incoming_nbt_domain
= local_info
.name
.string
987 incoming_dns_domain
= local_info
.dns_domain
.string
989 outgoing_account_name
= local_info
.name
.string
990 outgoing_account_name
+= '$'
991 outgoing_nbt_domain
= trust_info
.netbios_name
.string
992 outgoing_dns_domain
= trust_info
.domain_name
.string
994 tdo_search_filter
= "(&(objectClass=trustedDomain)(name=%s))" % (
996 tdo_res
= samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
997 expression
=tdo_search_filter
,
998 attrs
=['msDS-TrustForestTrustInfo'])
999 self
.assertEqual(len(tdo_res
), 1)
1000 tdo_dn
= tdo_res
[0].dn
1002 acct_search_filter
= "(&(objectClass=user)(sAMAccountName=%s))" % (
1003 incoming_account_name
)
1004 acct_res
= samdb
.search(scope
=ldb
.SCOPE_SUBTREE
,
1005 expression
=acct_search_filter
,
1006 attrs
=['msDS-KeyVersionNumber',
1009 self
.assertEqual(len(acct_res
), 1)
1010 acct_dn
= acct_res
[0].dn
1011 acct_kvno
= int(acct_res
[0]['msDS-KeyVersionNumber'][0])
1012 acct_sid
= acct_res
[0].get('objectSid', idx
=0)
1013 acct_sid
= samdb
.schema_format_value('objectSID', acct_sid
)
1014 acct_sid
= acct_sid
.decode('utf-8')
1015 acct_guid
= acct_res
[0].get('objectGUID', idx
=0)
1016 acct_guid
= samdb
.schema_format_value('objectGUID', acct_guid
)
1017 acct_guid
= acct_guid
.decode('utf-8')
1019 trust_incoming_salt
= "%skrbtgt%s" % (
1020 incoming_dns_domain
.upper(),
1021 outgoing_dns_domain
.upper())
1022 trust_outgoing_salt
= "%skrbtgt%s" % (
1023 outgoing_dns_domain
.upper(),
1024 incoming_dns_domain
.upper())
1025 trust_account_salt
= "%skrbtgt%s" % (
1026 incoming_dns_domain
.upper(),
1027 outgoing_nbt_domain
.upper())
1029 if trust_info
.trust_type
!= lsa
.LSA_TRUST_TYPE_DOWNLEVEL
:
1030 secure_channel_type
= SEC_CHAN_DNS_DOMAIN
1032 secure_channel_type
= SEC_CHAN_DOMAIN
1034 incoming_creds
= KerberosCredentials()
1035 incoming_creds
.guess(self
.get_lp())
1036 incoming_creds
.set_realm(incoming_dns_domain
.upper())
1037 incoming_creds
.set_domain(incoming_nbt_domain
.upper())
1038 incoming_creds
.set_forced_salt(trust_incoming_salt
.encode('utf-8'))
1039 incoming_creds
.set_password(trust_incoming_password
)
1040 incoming_creds
.set_username(incoming_account_name
)
1041 incoming_creds
.set_workstation('')
1042 incoming_creds
.set_secure_channel_type(secure_channel_type
)
1043 incoming_creds
.set_dn(tdo_dn
)
1044 incoming_creds
.set_type(self
.AccountType
.TRUST
)
1045 incoming_creds
.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT
)
1046 self
.creds_set_enctypes(incoming_creds
)
1048 outgoing_creds
= KerberosCredentials()
1049 outgoing_creds
.guess(self
.get_lp())
1050 outgoing_creds
.set_realm(outgoing_dns_domain
.upper())
1051 outgoing_creds
.set_domain(outgoing_nbt_domain
.upper())
1052 outgoing_creds
.set_forced_salt(trust_outgoing_salt
.encode('utf-8'))
1053 outgoing_creds
.set_password(trust_outgoing_password
)
1054 outgoing_creds
.set_username(outgoing_account_name
)
1055 outgoing_creds
.set_workstation('')
1056 outgoing_creds
.set_secure_channel_type(secure_channel_type
)
1057 outgoing_creds
.set_dn(tdo_dn
)
1058 outgoing_creds
.set_type(self
.AccountType
.TRUST
)
1059 outgoing_creds
.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT
)
1060 self
.creds_set_enctypes(outgoing_creds
)
1062 account_creds
= KerberosCredentials()
1063 account_creds
.guess(self
.get_lp())
1064 account_creds
.set_realm(incoming_dns_domain
.upper())
1065 account_creds
.set_domain(incoming_nbt_domain
.upper())
1066 account_creds
.set_forced_salt(trust_account_salt
.encode('utf-8'))
1067 account_creds
.set_password(trust_incoming_password
)
1068 account_creds
.set_username(incoming_account_name
)
1069 account_creds
.set_workstation('TEST-TRUST-DC')
1070 account_creds
.set_secure_channel_type(secure_channel_type
)
1071 account_creds
.set_dn(acct_dn
)
1072 account_creds
.set_type(self
.AccountType
.TRUST
)
1073 account_creds
.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT
)
1074 account_creds
.set_kvno(acct_kvno
)
1075 account_creds
.set_sid(str(acct_sid
))
1076 account_creds
.set_guid(acct_guid
)
1077 if trust_enc_types
is not None:
1078 self
.creds_set_enctypes(account_creds
,
1079 extra_bits
=trust_enc_types
.enc_types
)
1081 self
.creds_set_enctypes(account_creds
)
1083 incoming_creds
.set_trust_outgoing_creds(outgoing_creds
)
1084 incoming_creds
.set_trust_account_creds(account_creds
)
1086 outgoing_creds
.set_trust_incoming_creds(incoming_creds
)
1087 outgoing_creds
.set_trust_account_creds(account_creds
)
1089 account_creds
.set_trust_incoming_creds(incoming_creds
)
1090 account_creds
.set_trust_outgoing_creds(outgoing_creds
)
1092 self
.remember_creds_for_keytab_export(incoming_creds
)
1093 self
.remember_creds_for_keytab_export(outgoing_creds
)
1094 self
.remember_creds_for_keytab_export(account_creds
)
1096 return (tdo_handle
, incoming_creds
, outgoing_creds
, account_creds
)
1098 def create_account(self
, samdb
, name
, account_type
=AccountType
.USER
,
1099 spn
=None, upn
=None, additional_details
=None,
1100 ou
=None, account_control
=0, add_dollar
=None,
1101 expired_password
=False, force_nt4_hash
=False,
1102 export_to_keytab
=True,
1104 """Create an account for testing.
1105 The dn of the created account is added to self.accounts,
1106 which is used by tearDownClass to clean up the created accounts.
1108 if add_dollar
is None and account_type
is not self
.AccountType
.USER
:
1112 if account_type
is self
.AccountType
.COMPUTER
:
1113 guid
= DS_GUID_COMPUTERS_CONTAINER
1114 elif account_type
is self
.AccountType
.MANAGED_SERVICE
or (
1115 account_type
is self
.AccountType
.GROUP_MANAGED_SERVICE
):
1116 guid
= DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
1117 elif account_type
is self
.AccountType
.SERVER
:
1118 guid
= DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
1120 guid
= DS_GUID_USERS_CONTAINER
1122 ou
= samdb
.get_wellknown_dn(samdb
.get_default_basedn(), guid
)
1124 dn
= "CN=%s,%s" % (name
, ou
)
1126 # remove the account if it exists, this will happen if a previous test
1128 delete_force(samdb
, dn
)
1132 secure_schannel_type
= SEC_CHAN_NULL
1133 if account_type
is self
.AccountType
.USER
:
1134 object_class
= "user"
1135 account_control |
= UF_NORMAL_ACCOUNT
1136 elif account_type
is self
.AccountType
.MANAGED_SERVICE
:
1137 object_class
= "msDS-ManagedServiceAccount"
1138 account_control |
= UF_WORKSTATION_TRUST_ACCOUNT
1139 secure_schannel_type
= SEC_CHAN_WKSTA
1140 elif account_type
is self
.AccountType
.GROUP_MANAGED_SERVICE
:
1141 object_class
= "msDS-GroupManagedServiceAccount"
1142 account_control |
= UF_WORKSTATION_TRUST_ACCOUNT
1143 secure_schannel_type
= SEC_CHAN_WKSTA
1145 object_class
= "computer"
1146 if account_type
is self
.AccountType
.COMPUTER
:
1147 account_control |
= UF_WORKSTATION_TRUST_ACCOUNT
1148 secure_schannel_type
= SEC_CHAN_WKSTA
1149 elif account_type
is self
.AccountType
.SERVER
:
1150 account_control |
= UF_SERVER_TRUST_ACCOUNT
1151 secure_schannel_type
= SEC_CHAN_BDC
1157 "objectClass": object_class
,
1158 "sAMAccountName": account_name
,
1159 "userAccountControl": str(account_control
),
1162 if account_type
is self
.AccountType
.GROUP_MANAGED_SERVICE
:
1165 password
= generate_random_password(32, 32)
1166 utf16pw
= ('"%s"' % password
).encode('utf-16-le')
1168 details
['unicodePwd'] = utf16pw
1171 upn
= upn
.format(account
=account_name
)
1173 if isinstance(spn
, str):
1174 spn
= spn
.format(account
=account_name
)
1176 spn
= tuple(s
.format(account
=account_name
) for s
in spn
)
1177 details
["servicePrincipalName"] = spn
1179 details
["userPrincipalName"] = upn
1180 if expired_password
:
1181 details
["pwdLastSet"] = "0"
1182 if additional_details
is not None:
1183 details
.update(additional_details
)
1185 # Mark this account for deletion in tearDownClass() after all the
1186 # tests in this class finish.
1187 self
.accounts
.append(dn
)
1189 # Mark this account for deletion in tearDown() after the current
1190 # test finishes. Because the time complexity of deleting an account
1191 # in Samba scales with the number of accounts, it is faster to
1192 # delete accounts as soon as possible than to keep them around
1193 # until all the tests are finished.
1194 self
.test_accounts
.append(dn
)
1200 admin_creds
= self
.get_admin_creds()
1202 net_ctx
= net
.Net(admin_creds
, lp
, server
=self
.dc_host
)
1203 domain
= samdb
.domain_netbios_name().upper()
1205 password
= generate_random_password(32, 32)
1208 net_ctx
.set_password(newpassword
=password
,
1209 account_name
=account_name
,
1213 except Exception as e
:
1216 creds
= KerberosCredentials()
1217 creds
.guess(self
.get_lp())
1218 creds
.set_realm(samdb
.domain_dns_name().upper())
1219 creds
.set_domain(samdb
.domain_netbios_name().upper())
1220 if password
is not None:
1221 creds
.set_password(password
)
1222 creds
.set_username(account_name
)
1223 if account_type
is self
.AccountType
.USER
:
1224 creds
.set_workstation('')
1226 creds
.set_workstation(name
)
1227 creds
.set_secure_channel_type(secure_schannel_type
)
1228 creds
.set_dn(ldb
.Dn(samdb
, dn
))
1231 creds
.set_type(account_type
)
1232 creds
.set_user_account_control(account_control
)
1234 self
.creds_set_enctypes(creds
)
1236 res
= samdb
.search(base
=dn
,
1237 scope
=ldb
.SCOPE_BASE
,
1238 attrs
=['msDS-KeyVersionNumber',
1242 kvno
= res
[0].get('msDS-KeyVersionNumber', idx
=0)
1243 if kvno
is not None:
1244 self
.assertEqual(int(kvno
), expected_kvno
)
1245 creds
.set_kvno(expected_kvno
)
1247 sid
= res
[0].get('objectSid', idx
=0)
1248 sid
= samdb
.schema_format_value('objectSID', sid
)
1249 sid
= sid
.decode('utf-8')
1251 guid
= res
[0].get('objectGUID', idx
=0)
1252 guid
= samdb
.schema_format_value('objectGUID', guid
)
1253 guid
= guid
.decode('utf-8')
1254 creds
.set_guid(guid
)
1256 if export_to_keytab
:
1257 self
.remember_creds_for_keytab_export(creds
)
1261 def get_security_descriptor(self
, dn
):
1262 samdb
= self
.get_samdb()
1264 sid
= self
.get_objectSid(samdb
, dn
)
1266 owner_sid
= security
.dom_sid(security
.SID_BUILTIN_ADMINISTRATORS
)
1268 ace
= security
.ace()
1269 ace
.access_mask
= security
.SEC_ADS_CONTROL_ACCESS
1271 ace
.trustee
= security
.dom_sid(sid
)
1273 dacl
= security
.acl()
1274 dacl
.revision
= security
.SECURITY_ACL_REVISION_ADS
1278 security_desc
= security
.descriptor()
1279 security_desc
.type |
= security
.SEC_DESC_DACL_PRESENT
1280 security_desc
.owner_sid
= owner_sid
1281 security_desc
.dacl
= dacl
1283 return ndr_pack(security_desc
)
1285 def create_rodc(self
, ctx
):
1286 ctx
.nc_list
= [ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
1287 ctx
.full_nc_list
= [ctx
.base_dn
, ctx
.config_dn
, ctx
.schema_dn
]
1288 ctx
.krbtgt_dn
= f
'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'
1290 ctx
.never_reveal_sid
= [f
'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>',
1291 f
'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
1292 f
'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
1293 f
'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
1294 f
'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>']
1295 ctx
.reveal_sid
= f
'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'
1297 mysid
= ctx
.get_mysid()
1298 admin_dn
= f
'<SID={mysid}>'
1299 ctx
.managedby
= admin_dn
1301 ctx
.userAccountControl
= (UF_WORKSTATION_TRUST_ACCOUNT |
1302 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1303 UF_PARTIAL_SECRETS_ACCOUNT
)
1305 ctx
.connection_dn
= f
'CN=RODC Connection (FRS),{ctx.ntds_dn}'
1306 ctx
.secure_channel_type
= misc
.SEC_CHAN_RODC
1308 ctx
.replica_flags
= (drsuapi
.DRSUAPI_DRS_INIT_SYNC |
1309 drsuapi
.DRSUAPI_DRS_PER_SYNC |
1310 drsuapi
.DRSUAPI_DRS_GET_ANC |
1311 drsuapi
.DRSUAPI_DRS_NEVER_SYNCED |
1312 drsuapi
.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING
)
1313 ctx
.domain_replica_flags
= ctx
.replica_flags | drsuapi
.DRSUAPI_DRS_CRITICAL_ONLY
1315 ctx
.build_nc_lists()
1317 ctx
.cleanup_old_join()
1320 ctx
.join_add_objects()
1322 # cleanup the failed join (checking we still have a live LDB
1323 # connection to the remote DC first)
1324 ctx
.refresh_ldb_connection()
1325 ctx
.cleanup_old_join()
1328 def replicate_account_to_rodc(self
, dn
):
1329 samdb
= self
.get_samdb()
1330 rodc_samdb
= self
.get_rodc_samdb()
1332 repl_val
= f
'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'
1335 msg
.dn
= ldb
.Dn(rodc_samdb
, '')
1336 msg
['replicateSingleObject'] = ldb
.MessageElement(
1338 ldb
.FLAG_MOD_REPLACE
,
1339 'replicateSingleObject')
1342 # Try replication using the replicateSingleObject rootDSE operation.
1344 rodc_samdb
.modify(msg
)
1345 except ldb
.LdbError
as err
:
1346 enum
, estr
= err
.args
1347 self
.assertEqual(enum
, ldb
.ERR_UNWILLING_TO_PERFORM
)
1348 self
.assertIn('rootdse_modify: unknown attribute to change!',
1351 # If that method wasn't supported, we may be in the rodc:local test
1352 # environment, where we can try replicating to the local database.
1356 rodc_creds
= Credentials()
1357 rodc_creds
.guess(lp
)
1358 rodc_creds
.set_machine_account(lp
)
1360 local_samdb
= SamDB(url
=None, session_info
=system_session(),
1361 credentials
=rodc_creds
, lp
=lp
)
1363 destination_dsa_guid
= misc
.GUID(local_samdb
.get_ntds_GUID())
1365 repl
= drs_Replicate(f
'ncacn_ip_tcp:{self.dc_host}[seal]',
1367 local_samdb
, destination_dsa_guid
)
1369 source_dsa_invocation_id
= misc
.GUID(samdb
.invocation_id
)
1372 source_dsa_invocation_id
,
1373 destination_dsa_guid
,
1374 exop
=drsuapi
.DRSUAPI_EXOP_REPL_SECRET
,
1377 def reveal_account_to_mock_rodc(self
, dn
):
1378 samdb
= self
.get_samdb()
1379 rodc_ctx
= self
.get_mock_rodc_ctx()
1383 destination_dsa_guid
=rodc_ctx
.ntds_guid
,
1384 source_dsa_invocation_id
=misc
.GUID(samdb
.invocation_id
))
1386 def check_revealed(self
, dn
, rodc_dn
, revealed
=True):
1387 samdb
= self
.get_samdb()
1389 res
= samdb
.search(base
=rodc_dn
,
1390 scope
=ldb
.SCOPE_BASE
,
1391 attrs
=['msDS-RevealedUsers'])
1393 revealed_users
= res
[0].get('msDS-RevealedUsers')
1394 if revealed_users
is None:
1395 self
.assertFalse(revealed
)
1398 revealed_dns
= set(str(dsdb_Dn(samdb
, str(user
),
1399 syntax_oid
=DSDB_SYNTAX_BINARY_DN
).dn
)
1400 for user
in revealed_users
)
1403 self
.assertIn(str(dn
), revealed_dns
)
1405 self
.assertNotIn(str(dn
), revealed_dns
)
1407 def get_secrets(self
, dn
,
1408 destination_dsa_guid
,
1409 source_dsa_invocation_id
):
1410 bind
, handle
, _
= self
.get_drsuapi_connection()
1412 req
= drsuapi
.DsGetNCChangesRequest8()
1414 req
.destination_dsa_guid
= destination_dsa_guid
1415 req
.source_dsa_invocation_id
= source_dsa_invocation_id
1417 naming_context
= drsuapi
.DsReplicaObjectIdentifier()
1418 naming_context
.dn
= dn
1420 req
.naming_context
= naming_context
1422 hwm
= drsuapi
.DsReplicaHighWaterMark()
1423 hwm
.tmp_highest_usn
= 0
1424 hwm
.reserved_usn
= 0
1427 req
.highwatermark
= hwm
1428 req
.uptodateness_vector
= None
1430 req
.replica_flags
= 0
1432 req
.max_object_count
= 1
1433 req
.max_ndr_size
= 402116
1434 req
.extended_op
= drsuapi
.DRSUAPI_EXOP_REPL_SECRET
1436 attids
= [drsuapi
.DRSUAPI_ATTID_supplementalCredentials
,
1437 drsuapi
.DRSUAPI_ATTID_unicodePwd
,
1438 drsuapi
.DRSUAPI_ATTID_ntPwdHistory
]
1440 partial_attribute_set
= drsuapi
.DsPartialAttributeSet()
1441 partial_attribute_set
.version
= 1
1442 partial_attribute_set
.attids
= attids
1443 partial_attribute_set
.num_attids
= len(attids
)
1445 req
.partial_attribute_set
= partial_attribute_set
1447 req
.partial_attribute_set_ex
= None
1448 req
.mapping_ctr
.num_mappings
= 0
1449 req
.mapping_ctr
.mappings
= None
1451 _
, ctr
= bind
.DsGetNCChanges(handle
, 8, req
)
1453 self
.assertEqual(1, ctr
.object_count
)
1455 identifier
= ctr
.first_object
.object.identifier
1456 attributes
= ctr
.first_object
.object.attribute_ctr
.attributes
1458 self
.assertEqual(dn
, identifier
.dn
)
1460 return bind
, identifier
, attributes
1462 def unpack_supplemental_credentials(
1464 ) -> Dict
[kcrypto
.Enctype
, str]:
1465 spl
= ndr_unpack(drsblobs
.supplementalCredentialsBlob
, blob
)
1467 keys
: Dict
[kcrypto
.Enctype
, str] = {}
1469 for pkg
in spl
.sub
.packages
:
1470 if pkg
.name
== 'Primary:Kerberos-Newer-Keys':
1471 krb5_new_keys_raw
= binascii
.a2b_hex(pkg
.data
)
1472 krb5_new_keys
= ndr_unpack(
1473 drsblobs
.package_PrimaryKerberosBlob
, krb5_new_keys_raw
1475 for key
in krb5_new_keys
.ctr
.keys
:
1476 keytype
= key
.keytype
1477 if keytype
in (kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
):
1478 keys
[keytype
] = key
.value
.hex()
1482 def get_keys(self
, creds
, expected_etypes
=None):
1483 admin_creds
= self
.get_admin_creds()
1484 samdb
= self
.get_samdb()
1488 bind
, identifier
, attributes
= self
.get_secrets(
1490 destination_dsa_guid
=misc
.GUID(samdb
.get_ntds_GUID()),
1491 source_dsa_invocation_id
=misc
.GUID())
1493 rid
= identifier
.sid
.split()[1]
1495 net_ctx
= net
.Net(admin_creds
)
1499 for attr
in attributes
:
1500 if not attr
.value_ctr
.num_values
:
1503 if attr
.attid
== drsuapi
.DRSUAPI_ATTID_supplementalCredentials
:
1504 net_ctx
.replicate_decrypt(bind
, attr
, rid
)
1507 self
.unpack_supplemental_credentials(attr
.value_ctr
.values
[0].blob
)
1509 elif attr
.attid
== drsuapi
.DRSUAPI_ATTID_unicodePwd
:
1510 net_ctx
.replicate_decrypt(bind
, attr
, rid
)
1512 pwd
= attr
.value_ctr
.values
[0].blob
1513 keys
[kcrypto
.Enctype
.RC4
] = pwd
.hex()
1515 if expected_etypes
is None:
1516 expected_etypes
= self
.get_default_enctypes(creds
)
1518 self
.assertCountEqual(expected_etypes
, keys
)
def creds_set_keys(self, creds, keys):
    """Force explicit keys onto a credentials object.

    :param creds: credentials object providing set_forced_key(enctype, key).
    :param keys: mapping of encryption type to key, or None to leave the
        credentials untouched.
    """
    if keys is None:
        # No keys were supplied, so there is nothing to force.
        return

    for etype, forced_key in keys.items():
        creds.set_forced_key(etype, forced_key)
1527 def creds_set_enctypes(self
, creds
,
1530 samdb
= self
.get_samdb()
1532 res
= samdb
.search(creds
.get_dn(),
1533 scope
=ldb
.SCOPE_BASE
,
1534 attrs
=['msDS-SupportedEncryptionTypes'])
1535 supported_enctypes
= res
[0].get('msDS-SupportedEncryptionTypes', idx
=0)
1537 if supported_enctypes
is None:
1538 supported_enctypes
= self
.default_etypes
1539 if supported_enctypes
is None:
1541 supported_enctypes
= lp
.get('kdc default domain supported enctypes')
1542 if supported_enctypes
== 0:
1543 supported_enctypes
= rc4_bit | aes256_sk_bit
1544 supported_enctypes
= int(supported_enctypes
)
1546 if extra_bits
is not None:
1547 # We need to add in implicit or implied encryption types.
1548 supported_enctypes |
= extra_bits
1549 if remove_bits
is not None:
1550 # We also need to remove certain bits, such as the non-encryption
1551 # type bit aes256-sk.
1552 supported_enctypes
&= ~remove_bits
1554 creds
.set_as_supported_enctypes(supported_enctypes
)
1555 creds
.set_tgs_supported_enctypes(supported_enctypes
)
1556 creds
.set_ap_supported_enctypes(supported_enctypes
)
1558 def creds_set_default_enctypes(self
, creds
,
1560 claims_support
=False,
1561 compound_id_support
=False):
1562 default_enctypes
= self
.get_default_enctypes(creds
)
1563 supported_enctypes
= KerberosCredentials
.etypes_to_bits(
1567 supported_enctypes |
= security
.KERB_ENCTYPE_FAST_SUPPORTED
1569 supported_enctypes |
= security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
1570 if compound_id_support
:
1571 supported_enctypes |
= (
1572 security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
)
1574 creds
.set_as_supported_enctypes(supported_enctypes
)
1575 creds
.set_tgs_supported_enctypes(supported_enctypes
)
1576 creds
.set_ap_supported_enctypes(supported_enctypes
)
1578 def add_to_group(self
, account_dn
, group_dn
, group_attr
, expect_attr
=True,
1579 new_group_type
=None):
1580 samdb
= self
.get_samdb()
1583 res
= samdb
.search(base
=group_dn
,
1584 scope
=ldb
.SCOPE_BASE
,
1586 except ldb
.LdbError
as err
:
1588 if num
!= ldb
.ERR_NO_SUCH_OBJECT
:
1594 members
= orig_msg
.get(group_attr
)
1596 self
.assertIsNotNone(members
)
1597 elif members
is None:
1600 members
= map(lambda s
: s
.decode('utf-8'), members
)
1602 # Use a set so we can handle the same group being added twice.
1603 members
= set(members
)
1605 self
.assertNotIsInstance(account_dn
, ldb
.Dn
,
1606 'ldb.MessageElement does not support ldb.Dn')
1607 self
.assertNotIsInstance(account_dn
, bytes
)
1609 if isinstance(account_dn
, str):
1610 members
.add(account_dn
)
1612 members
.update(account_dn
)
1616 if new_group_type
is not None:
1617 msg
['0'] = ldb
.MessageElement(
1618 common
.normalise_int32(new_group_type
),
1619 ldb
.FLAG_MOD_REPLACE
,
1621 msg
['1'] = ldb
.MessageElement(list(members
),
1622 ldb
.FLAG_MOD_REPLACE
,
1624 cleanup
= samdb
.msg_diff(msg
, orig_msg
)
1625 self
.ldb_cleanups
.append(cleanup
)
1630 def remove_from_group(self
, account_dn
, group_dn
):
1631 samdb
= self
.get_samdb()
1633 res
= samdb
.search(base
=group_dn
,
1634 scope
=ldb
.SCOPE_BASE
,
1637 self
.assertIn('member', orig_msg
)
1638 members
= list(orig_msg
['member'])
1640 account_dn
= str(account_dn
).encode('utf-8')
1641 self
.assertIn(account_dn
, members
)
1642 members
.remove(account_dn
)
1646 msg
['member'] = ldb
.MessageElement(members
,
1647 ldb
.FLAG_MOD_REPLACE
,
1650 cleanup
= samdb
.msg_diff(msg
, orig_msg
)
1651 self
.ldb_cleanups
.append(cleanup
)
1656 # Create a new group and return a Principal object representing it.
def create_group_principal(self, samdb, group_type):
    """Create a new group and return a Principal object representing it.

    The group is named via get_new_username() and created through
    create_group() with 'group_type.value' as its gtype.

    :param samdb: database handle the group is created in and looked up on.
    :param group_type: object whose '.value' is passed as the group's gtype.
    :return: Principal built from the group's ldb.Dn and its objectSid.
    """
    group_name = self.get_new_username()
    group_dn = self.create_group(samdb,
                                 group_name,
                                 gtype=group_type.value)
    group_sid = self.get_objectSid(samdb, group_dn)

    # create_group() hands back something ldb.Dn() accepts as a DN string;
    # wrap it so the Principal carries a proper ldb.Dn.
    principal_dn = ldb.Dn(samdb, group_dn)
    return Principal(principal_dn, group_sid)
1664 def set_group_type(self
, samdb
, dn
, gtype
):
1665 group_type
= common
.normalise_int32(gtype
.value
)
1666 msg
= ldb
.Message(dn
)
1667 msg
['groupType'] = ldb
.MessageElement(group_type
,
1668 ldb
.FLAG_MOD_REPLACE
,
1672 def set_primary_group(self
, samdb
, dn
, primary_sid
,
1673 expected_error
=None,
1674 expected_werror
=None):
1675 # Get the RID to be set as our primary group.
1676 primary_rid
= primary_sid
.rsplit('-', 1)[1]
1678 # Find out our current primary group.
1679 res
= samdb
.search(dn
,
1680 scope
=ldb
.SCOPE_BASE
,
1681 attrs
=['primaryGroupId'])
1684 # Prepare to modify the attribute.
1685 msg
= ldb
.Message(dn
)
1686 msg
['primaryGroupId'] = ldb
.MessageElement(str(primary_rid
),
1687 ldb
.FLAG_MOD_REPLACE
,
1690 # We'll remove the primaryGroupId attribute after the test, to avoid
1691 # problems in the teardown if the user outlives the group.
1692 remove_msg
= samdb
.msg_diff(msg
, orig_msg
)
1693 self
.addCleanup(samdb
.modify
, remove_msg
)
1695 # Set primaryGroupId.
1696 if expected_error
is None:
1697 self
.assertIsNone(expected_werror
)
1701 self
.assertIsNotNone(expected_werror
)
1703 with self
.assertRaises(
1705 msg
='expected setting primary group to fail'
1709 error
, estr
= err
.exception
.args
1710 self
.assertEqual(expected_error
, error
)
1711 self
.assertIn(f
'{expected_werror:08X}', estr
)
1713 # Create an arrangement of groups based on a configuration specified in a
1714 # test case. 'user_principal' is a principal representing the user account;
1715 # 'trust_principal', a principal representing the account of a user from
1717 def setup_groups(self
,
1722 groups
= dict(preexisting_groups
)
1724 primary_group_types
= {}
1726 # Create each group and add it to the group mapping.
1727 if group_setup
is not None:
1728 for group_id
, (group_type
, _
) in group_setup
.items():
1729 self
.assertNotIn(group_id
, preexisting_groups
,
1730 "don't specify placeholders")
1731 self
.assertNotIn(group_id
, groups
,
1732 'group ID specified more than once')
1734 if primary_groups
is not None and (
1735 group_id
in primary_groups
.values()):
1736 # Windows disallows setting a domain-local group as a
1737 # primary group, unless we create it as Universal first and
1738 # change it back to Domain-Local later.
1739 primary_group_types
[group_id
] = group_type
1740 group_type
= GroupType
.UNIVERSAL
1742 groups
[group_id
] = self
.create_group_principal(samdb
,
1745 if group_setup
is not None:
1746 # Map a group ID to that group's DN, and generate an
1747 # understandable error message if the mapping fails.
1748 def group_id_to_dn(group_id
):
1750 group
= groups
[group_id
]
1752 self
.fail(f
"included group member '{group_id}', but it is "
1753 f
"not specified in {groups.keys()}")
1755 if group
.dn
is not None:
1756 return str(group
.dn
)
1758 return f
'<SID={group.sid}>'
1760 # Populate each group's members.
1761 for group_id
, (_
, members
) in group_setup
.items():
1762 # Get the group's DN and the mapped DNs of its members.
1763 dn
= groups
[group_id
].dn
1764 principal_members
= map(group_id_to_dn
, members
)
1766 # Add the members to the group.
1767 self
.add_to_group(principal_members
, dn
, 'member',
1770 # Set primary groups.
1771 if primary_groups
is not None:
1772 for user
, primary_group
in primary_groups
.items():
1773 primary_sid
= groups
[primary_group
].sid
1774 self
.set_primary_group(samdb
, user
.dn
, primary_sid
)
1776 # Change the primary groups to their actual group types.
1777 for primary_group
, primary_group_type
in primary_group_types
.items():
1778 self
.set_group_type(samdb
,
1779 groups
[primary_group
].dn
,
1782 # Return the mapping from group IDs to principals.
1785 def map_to_sid(self
, val
, mapping
, domain_sid
):
1786 if isinstance(val
, int):
1787 # If it's an integer, we assume it's a RID, and prefix the domain SID.
1789 self
.assertIsNotNone(domain_sid
)
1790 return f
'{domain_sid}-{val}'
1792 if mapping
is not None and val
in mapping
:
1793 # Or if we have a mapping for it, apply that.
1794 return mapping
[val
].sid
1796 # Otherwise leave it unmodified.
def map_to_dn(self, val, mapping, domain_sid):
    """Map 'val' to a SID and return it as an ldb.Dn of the form <SID=...>.

    'val', 'mapping' and 'domain_sid' are handed unchanged to map_to_sid();
    the resulting SID is embedded in a '<SID=...>' DN string created
    against the database returned by get_samdb().
    """
    # Resolve the SID first, so that a failure in map_to_sid() is raised
    # before a database handle is requested.
    sid = self.map_to_sid(val, mapping, domain_sid)

    samdb = self.get_samdb()
    return ldb.Dn(samdb, f'<SID={sid}>')
1803 # Return SIDs from principal placeholders based on a supplied mapping.
1804 def map_sids(self
, sids
, mapping
, domain_sid
):
1811 if isinstance(entry
, frozenset):
1812 mapped_sids
.add(frozenset(self
.map_sids(entry
,
1816 val
, sid_type
, attrs
= entry
1817 sid
= self
.map_to_sid(val
, mapping
, domain_sid
)
1819 # There's no point expecting the 'Claims Valid' SID to be
1820 # present if we don't support claims. Filter it out to give the
1821 # tests a chance of passing.
1822 if not self
.kdc_claims_support
and (
1823 sid
== security
.SID_CLAIMS_VALID
):
1826 mapped_sids
.add((sid
, sid_type
, attrs
))
1830 def issued_by_rodc(self
, ticket
):
1831 rodc_krbtgt_creds
= self
.get_mock_rodc_krbtgt_creds()
1832 rodc_krbtgt_key
= self
.TicketDecryptionKey_from_creds(
1836 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: rodc_krbtgt_key
,
1839 return self
.modified_ticket(
1841 new_ticket_key
=rodc_krbtgt_key
,
1842 checksum_keys
=checksum_keys
)
1844 def signed_by_rodc(self
, ticket
):
1845 rodc_krbtgt_creds
= self
.get_mock_rodc_krbtgt_creds()
1846 rodc_krbtgt_key
= self
.TicketDecryptionKey_from_creds(
1850 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: rodc_krbtgt_key
,
1853 return self
.modified_ticket(ticket
,
1854 checksum_keys
=checksum_keys
)
1856 # Get a ticket with the SIDs in the PAC replaced with ones we specify. This
1857 # is useful for creating arbitrary tickets that can be used to perform a
1859 def ticket_with_sids(self
,
1868 krbtgt_creds
= self
.get_mock_rodc_krbtgt_creds()
1870 krbtgt_creds
= self
.get_krbtgt_creds()
1871 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
1874 krb5pac
.PAC_TYPE_KDC_CHECKSUM
: krbtgt_key
1877 modify_pac_fn
= partial(self
.set_pac_sids
,
1879 domain_sid
=domain_sid
,
1881 set_user_flags
=set_user_flags
,
1882 reset_user_flags
=reset_user_flags
)
1884 return self
.modified_ticket(ticket
,
1885 new_ticket_key
=krbtgt_key
,
1886 modify_pac_fn
=modify_pac_fn
,
1887 checksum_keys
=checksum_keys
)
1889 # Replace the SIDs in a PAC with 'new_sids'.
1890 def set_pac_sids(self
,
1897 reset_user_flags
=0):
1898 if domain_sid
is None:
1899 domain_sid
= self
.get_samdb().get_domain_sid()
1905 resource_domain
= None
1909 # Filter our SIDs into three arrays depending on their ultimate
1910 # location in the PAC.
1911 for sid
, sid_type
, attrs
in new_sids
:
1912 if sid_type
is self
.SidType
.BASE_SID
:
1913 if isinstance(sid
, int):
1914 domain
, rid
= domain_sid
, sid
1916 domain
, rid
= sid
.rsplit('-', 1)
1917 self
.assertEqual(domain_sid
, domain
,
1918 f
'base SID {sid} must be in our domain')
1920 base_sid
= samr
.RidWithAttribute()
1921 base_sid
.rid
= int(rid
)
1922 base_sid
.attributes
= attrs
1924 base_sids
.append(base_sid
)
1925 elif sid_type
is self
.SidType
.EXTRA_SID
:
1926 extra_sid
= netlogon
.netr_SidAttr()
1927 extra_sid
.sid
= security
.dom_sid(sid
)
1928 extra_sid
.attributes
= attrs
1930 extra_sids
.append(extra_sid
)
1931 elif sid_type
is self
.SidType
.RESOURCE_SID
:
1932 if isinstance(sid
, int):
1933 domain
, rid
= domain_sid
, sid
1935 domain
, rid
= sid
.rsplit('-', 1)
1936 if resource_domain
is None:
1937 resource_domain
= domain
1939 self
.assertEqual(resource_domain
, domain
,
1940 'resource SIDs must share the same '
1943 resource_sid
= samr
.RidWithAttribute()
1944 resource_sid
.rid
= int(rid
)
1945 resource_sid
.attributes
= attrs
1947 resource_sids
.append(resource_sid
)
1948 elif sid_type
is self
.SidType
.PRIMARY_GID
:
1949 self
.assertIsNone(primary_gid
,
1950 f
'must not specify a second primary GID '
1952 self
.assertIsNone(attrs
, 'cannot specify primary GID attrs')
1954 if isinstance(sid
, int):
1955 domain
, primary_gid
= domain_sid
, sid
1957 domain
, primary_gid
= sid
.rsplit('-', 1)
1958 self
.assertEqual(domain_sid
, domain
,
1959 f
'primary GID {sid} must be in our domain')
1961 self
.fail(f
'invalid SID type {sid_type}')
1963 found_logon_info
= False
1965 pac_buffers
= pac
.buffers
1966 for pac_buffer
in pac_buffers
:
1967 # Find the LOGON_INFO PAC buffer.
1968 if pac_buffer
.type == krb5pac
.PAC_TYPE_LOGON_INFO
:
1969 logon_info
= pac_buffer
.info
.info
1971 # Add Extra SIDs and set the EXTRA_SIDS flag as needed.
1972 logon_info
.info3
.sidcount
= len(extra_sids
)
1974 logon_info
.info3
.sids
= extra_sids
1975 logon_info
.info3
.base
.user_flags |
= (
1976 netlogon
.NETLOGON_EXTRA_SIDS
)
1978 logon_info
.info3
.sids
= None
1979 logon_info
.info3
.base
.user_flags
&= ~
(
1980 netlogon
.NETLOGON_EXTRA_SIDS
)
1983 logon_info
.info3
.base
.groups
.count
= len(base_sids
)
1985 logon_info
.info3
.base
.groups
.rids
= base_sids
1987 logon_info
.info3
.base
.groups
.rids
= None
1989 logon_info
.info3
.base
.domain_sid
= security
.dom_sid(domain_sid
)
1990 if user_rid
is not None:
1991 logon_info
.info3
.base
.rid
= int(user_rid
)
1993 if primary_gid
is not None:
1994 logon_info
.info3
.base
.primary_gid
= int(primary_gid
)
1996 # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed.
1997 logon_info
.resource_groups
.groups
.count
= len(resource_sids
)
1999 resource_domain
= security
.dom_sid(resource_domain
)
2000 logon_info
.resource_groups
.domain_sid
= resource_domain
2001 logon_info
.resource_groups
.groups
.rids
= resource_sids
2002 logon_info
.info3
.base
.user_flags |
= (
2003 netlogon
.NETLOGON_RESOURCE_GROUPS
)
2005 logon_info
.resource_groups
.domain_sid
= None
2006 logon_info
.resource_groups
.groups
.rids
= None
2007 logon_info
.info3
.base
.user_flags
&= ~
(
2008 netlogon
.NETLOGON_RESOURCE_GROUPS
)
2010 logon_info
.info3
.base
.user_flags |
= set_user_flags
2011 logon_info
.info3
.base
.user_flags
&= ~reset_user_flags
2013 found_logon_info
= True
2015 # Also replace the user's SID in the UPN DNS buffer.
2016 elif pac_buffer
.type == krb5pac
.PAC_TYPE_UPN_DNS_INFO
:
2017 upn_dns_info_ex
= pac_buffer
.info
.ex
2019 if user_rid
is not None:
2020 upn_dns_info_ex
.objectsid
= security
.dom_sid(
2021 f
'{domain_sid}-{user_rid}')
2023 # But don't replace the user's SID in the Requester SID buffer, or
2024 # we'll get a SID mismatch.
2026 self
.assertTrue(found_logon_info
, 'no LOGON_INFO PAC buffer')
2028 pac
.buffers
= pac_buffers
2032 # Replace the device SIDs in a PAC with 'new_sids'.
2033 def set_pac_device_sids(self
,
2039 if domain_sid
is None:
2040 domain_sid
= self
.get_samdb().get_domain_sid()
2048 # Filter our SIDs into three arrays depending on their ultimate
2049 # location in the PAC.
2050 for entry
in new_sids
:
2051 if isinstance(entry
, frozenset):
2052 resource_domain
= None
2055 for sid
, sid_type
, attrs
in entry
:
2056 self
.assertIs(sid_type
, self
.SidType
.RESOURCE_SID
,
2057 'only resource SIDs may be specified in this way')
2059 if isinstance(sid
, int):
2060 domain
, rid
= domain_sid
, sid
2062 domain
, rid
= sid
.rsplit('-', 1)
2063 if resource_domain
is None:
2064 resource_domain
= domain
2066 self
.assertEqual(resource_domain
, domain
,
2067 'resource SIDs must share the same '
2070 resource_sid
= samr
.RidWithAttribute()
2071 resource_sid
.rid
= int(rid
)
2072 resource_sid
.attributes
= attrs
2074 domain_sids
.append(resource_sid
)
2076 membership
= krb5pac
.PAC_DOMAIN_GROUP_MEMBERSHIP()
2077 if resource_domain
is not None:
2078 membership
.domain_sid
= security
.dom_sid(resource_domain
)
2079 membership
.groups
.rids
= domain_sids
2080 membership
.groups
.count
= len(domain_sids
)
2082 resource_sids
.append(membership
)
2084 sid
, sid_type
, attrs
= entry
2085 if sid_type
is self
.SidType
.BASE_SID
:
2086 if isinstance(sid
, int):
2087 domain
, rid
= domain_sid
, sid
2089 domain
, rid
= sid
.rsplit('-', 1)
2090 self
.assertEqual(domain_sid
, domain
,
2091 f
'base SID {sid} must be in our domain')
2093 base_sid
= samr
.RidWithAttribute()
2094 base_sid
.rid
= int(rid
)
2095 base_sid
.attributes
= attrs
2097 base_sids
.append(base_sid
)
2098 elif sid_type
is self
.SidType
.EXTRA_SID
:
2099 extra_sid
= netlogon
.netr_SidAttr()
2100 extra_sid
.sid
= security
.dom_sid(sid
)
2101 extra_sid
.attributes
= attrs
2103 extra_sids
.append(extra_sid
)
2104 elif sid_type
is self
.SidType
.RESOURCE_SID
:
2105 self
.fail('specify resource groups in frozenset(s)')
2106 elif sid_type
is self
.SidType
.PRIMARY_GID
:
2107 self
.assertIsNone(primary_gid
,
2108 f
'must not specify a second primary GID '
2110 self
.assertIsNone(attrs
, 'cannot specify primary GID attrs')
2112 if isinstance(sid
, int):
2113 domain
, primary_gid
= domain_sid
, sid
2115 domain
, primary_gid
= sid
.rsplit('-', 1)
2116 self
.assertEqual(domain_sid
, domain
,
2117 f
'primary GID {sid} must be in our domain')
2119 self
.fail(f
'invalid SID type {sid_type}')
2121 pac_buffers
= pac
.buffers
2122 for pac_buffer
in pac_buffers
:
2123 # Find the DEVICE_INFO PAC buffer.
2124 if pac_buffer
.type == krb5pac
.PAC_TYPE_DEVICE_INFO
:
2125 logon_info
= pac_buffer
.info
.info
2128 logon_info
= krb5pac
.PAC_DEVICE_INFO()
2130 logon_info_ctr
= krb5pac
.PAC_DEVICE_INFO_CTR()
2131 logon_info_ctr
.info
= logon_info
2133 pac_buffer
= krb5pac
.PAC_BUFFER()
2134 pac_buffer
.type = krb5pac
.PAC_TYPE_DEVICE_INFO
2135 pac_buffer
.info
= logon_info_ctr
2137 pac_buffers
.append(pac_buffer
)
2139 logon_info
.domain_sid
= security
.dom_sid(domain_sid
)
2140 logon_info
.rid
= int(user_rid
)
2142 self
.assertIsNotNone(primary_gid
, 'please specify the primary GID')
2143 logon_info
.primary_gid
= int(primary_gid
)
2147 logon_info
.groups
.rids
= base_sids
2149 logon_info
.groups
.rids
= None
2150 logon_info
.groups
.count
= len(base_sids
)
2154 logon_info
.sids
= extra_sids
2156 logon_info
.sids
= None
2157 logon_info
.sid_count
= len(extra_sids
)
2159 # Add Resource SIDs.
2161 logon_info
.domain_groups
= resource_sids
2163 logon_info
.domain_groups
= None
2164 logon_info
.domain_group_count
= len(resource_sids
)
2166 pac
.buffers
= pac_buffers
2167 pac
.num_buffers
= len(pac_buffers
)
2171 def set_pac_claims(self
, pac
, *, client_claims
=None, device_claims
=None, claim_ids
=None):
2172 if claim_ids
is None:
2175 if client_claims
is not None:
2176 self
.assertIsNone(device_claims
,
2177 'don’t specify both client and device claims')
2178 pac_claims
= client_claims
2179 pac_buffer_type
= krb5pac
.PAC_TYPE_CLIENT_CLAIMS_INFO
2181 self
.assertIsNotNone(device_claims
,
2182 'please specify client or device claims')
2183 pac_claims
= device_claims
2184 pac_buffer_type
= krb5pac
.PAC_TYPE_DEVICE_CLAIMS_INFO
2186 claim_value_types
= {
2187 claims
.CLAIM_TYPE_INT64
: claims
.CLAIM_INT64
,
2188 claims
.CLAIM_TYPE_UINT64
: claims
.CLAIM_UINT64
,
2189 claims
.CLAIM_TYPE_STRING
: claims
.CLAIM_STRING
,
2190 claims
.CLAIM_TYPE_BOOLEAN
: claims
.CLAIM_UINT64
,
2195 for pac_claim_array
in pac_claims
:
2196 pac_claim_source_type
, pac_claim_entries
= (
2201 for pac_claim_entry
in pac_claim_entries
:
2202 pac_claim_id
, pac_claim_type
, pac_claim_values
= (
2205 claim_values_type
= claim_value_types
.get(
2206 pac_claim_type
, claims
.CLAIM_STRING
)
2208 claim_values_enum
= claim_values_type()
2209 claim_values_enum
.values
= pac_claim_values
2210 claim_values_enum
.value_count
= len(
2213 claim_entry
= claims
.CLAIM_ENTRY()
2215 claim_entry
.id = pac_claim_id
.format_map(
2217 except KeyError as err
:
2219 f
'unknown claim name(s) '
2220 f
'in ‘{pac_claim_id}’'
2222 claim_entry
.type = pac_claim_type
2223 claim_entry
.values
= claim_values_enum
2225 claim_entries
.append(claim_entry
)
2227 claims_array
= claims
.CLAIMS_ARRAY()
2228 claims_array
.claims_source_type
= pac_claim_source_type
2229 claims_array
.claim_entries
= claim_entries
2230 claims_array
.claims_count
= len(claim_entries
)
2232 claims_arrays
.append(claims_array
)
2234 claims_set
= claims
.CLAIMS_SET()
2235 claims_set
.claims_arrays
= claims_arrays
2236 claims_set
.claims_array_count
= len(claims_arrays
)
2238 claims_ctr
= claims
.CLAIMS_SET_CTR()
2239 claims_ctr
.claims
= claims_set
2241 claims_ndr
= claims
.CLAIMS_SET_NDR()
2242 claims_ndr
.claims
= claims_ctr
2244 metadata
= claims
.CLAIMS_SET_METADATA()
2245 metadata
.claims_set
= claims_ndr
2246 metadata
.compression_format
= (
2247 claims
.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF
)
2249 metadata_ctr
= claims
.CLAIMS_SET_METADATA_CTR()
2250 metadata_ctr
.metadata
= metadata
2252 metadata_ndr
= claims
.CLAIMS_SET_METADATA_NDR()
2253 metadata_ndr
.claims
= metadata_ctr
2255 pac_buffers
= pac
.buffers
2256 for pac_buffer
in pac_buffers
:
2257 if pac_buffer
.type == pac_buffer_type
:
2260 pac_buffer
= krb5pac
.PAC_BUFFER()
2261 pac_buffer
.type = pac_buffer_type
2262 pac_buffer
.info
= krb5pac
.DATA_BLOB_REM()
2264 pac_buffers
.append(pac_buffer
)
2266 pac_buffer
.info
.remaining
= ndr_pack(metadata_ndr
)
2268 pac
.buffers
= pac_buffers
2269 pac
.num_buffers
= len(pac_buffers
)
2273 def add_extra_pac_buffers(self
, pac
, *, buffers
=None):
2277 pac_buffers
= pac
.buffers
2278 for pac_buffer_type
in buffers
:
2279 info
= krb5pac
.DATA_BLOB_REM()
2280 # Having an empty PAC buffer will trigger an assertion failure in
2281 # the MIT KDC’s k5_pac_locate_buffer(), so we need at least one byte.
2283 info
.remaining
= b
'0'
2285 pac_buffer
= krb5pac
.PAC_BUFFER()
2286 pac_buffer
.type = pac_buffer_type
2287 pac_buffer
.info
= info
2289 pac_buffers
.append(pac_buffer
)
2291 pac
.buffers
= pac_buffers
2292 pac
.num_buffers
= len(pac_buffers
)
2296 def get_cached_creds(self
, *,
2297 account_type
: AccountType
,
2298 opts
: Optional
[dict]=None,
2299 samdb
: Optional
[SamDB
]=None,
2300 use_cache
=True) -> KerberosCredentials
:
2305 'name_prefix': None,
2306 'name_suffix': None,
2310 'additional_details': None,
2311 'allowed_replication': False,
2312 'allowed_replication_mock': False,
2313 'denied_replication': False,
2314 'denied_replication_mock': False,
2315 'revealed_to_rodc': False,
2316 'revealed_to_mock_rodc': False,
2317 'no_auth_data_required': False,
2318 'expired_password': False,
2319 'supported_enctypes': None,
2320 'not_delegated': False,
2321 'delegation_to_spn': None,
2322 'delegation_from_dn': None,
2323 'trusted_to_auth_for_delegation': False,
2324 'fast_support': False,
2325 'claims_support': False,
2326 'compound_id_support': False,
2327 'sid_compression_support': True,
2329 'kerberos_enabled': True,
2330 'secure_channel_type': None,
2332 'force_nt4_hash': False,
2333 'assigned_policy': None,
2334 'assigned_silo': None,
2335 'logon_hours': None,
2336 'smartcard_required': False,
2341 'account_type': account_type
,
2347 self
.assertIsNone(samdb
)
2348 cache_key
= tuple(sorted(account_opts
.items()))
2349 creds
= self
.account_cache
.get(cache_key
)
2350 if creds
is not None:
2353 creds
= self
.create_account_opts(samdb
, use_cache
, **account_opts
)
2355 self
.account_cache
[cache_key
] = creds
2359 def create_account_opts(self
,
2360 samdb
: Optional
[SamDB
],
2370 allowed_replication
,
2371 allowed_replication_mock
,
2373 denied_replication_mock
,
2375 revealed_to_mock_rodc
,
2376 no_auth_data_required
,
2382 trusted_to_auth_for_delegation
,
2385 compound_id_support
,
2386 sid_compression_support
,
2389 secure_channel_type
,
2397 if account_type
is self
.AccountType
.USER
:
2398 self
.assertIsNone(delegation_to_spn
)
2399 self
.assertIsNone(delegation_from_dn
)
2400 self
.assertFalse(trusted_to_auth_for_delegation
)
2402 self
.assertFalse(not_delegated
)
2405 samdb
= self
.get_samdb()
2407 user_name
= self
.get_new_username()
2408 if name_prefix
is not None:
2409 user_name
= name_prefix
+ user_name
2410 if name_suffix
is not None:
2411 user_name
+= name_suffix
2413 user_account_control
= 0
2414 if trusted_to_auth_for_delegation
:
2415 user_account_control |
= UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
2417 user_account_control |
= UF_NOT_DELEGATED
2418 if no_auth_data_required
:
2419 user_account_control |
= UF_NO_AUTH_DATA_REQUIRED
2420 if smartcard_required
:
2421 user_account_control |
= UF_SMARTCARD_REQUIRED
2423 user_account_control |
= UF_ACCOUNTDISABLE
2425 if additional_details
:
2426 details
= {k
: v
for k
, v
in additional_details
}
2430 enctypes
= supported_enctypes
2432 enctypes
= enctypes
or 0
2433 enctypes |
= security
.KERB_ENCTYPE_FAST_SUPPORTED
2435 enctypes
= enctypes
or 0
2436 enctypes |
= security
.KERB_ENCTYPE_CLAIMS_SUPPORTED
2437 if compound_id_support
:
2438 enctypes
= enctypes
or 0
2439 enctypes |
= security
.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2440 if sid_compression_support
is False:
2441 enctypes
= enctypes
or 0
2442 enctypes |
= security
.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED
2444 if enctypes
is not None:
2445 details
['msDS-SupportedEncryptionTypes'] = str(enctypes
)
2447 if delegation_to_spn
:
2448 details
['msDS-AllowedToDelegateTo'] = delegation_to_spn
2450 if delegation_from_dn
:
2451 if isinstance(delegation_from_dn
, str):
2452 delegation_from_dn
= self
.get_security_descriptor(
2454 details
['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
2457 if spn
is None and account_type
is not self
.AccountType
.USER
:
2458 spn
= 'host/' + user_name
2460 if assigned_policy
is not None:
2461 details
['msDS-AssignedAuthNPolicy'] = assigned_policy
2463 if assigned_silo
is not None:
2464 details
['msDS-AssignedAuthNPolicySilo'] = assigned_silo
2466 if logon_hours
is not None:
2467 details
['logonHours'] = logon_hours
2469 creds
, dn
= self
.create_account(samdb
, user_name
,
2470 account_type
=account_type
,
2473 additional_details
=details
,
2474 account_control
=user_account_control
,
2475 add_dollar
=add_dollar
,
2476 force_nt4_hash
=force_nt4_hash
,
2477 expired_password
=expired_password
,
2478 export_to_keytab
=False, # explicit below
2481 expected_etypes
= None
2483 # We don't force fetching the keys other than the NT hash as
2484 # how the server stores the unused KDC keys for the
2485 # smartcard_required case is not important and makes unrelated
2486 # tests break because of differences between Samba and
2489 # The NT hash is different, as it is returned to the client in
2490 # the PAC so is visible in the network behaviour.
2492 expected_etypes
= {kcrypto
.Enctype
.RC4
}
2493 keys
= self
.get_keys(creds
, expected_etypes
=expected_etypes
)
2494 self
.creds_set_keys(creds
, keys
)
2496 # Handle secret replication to the RODC.
2498 if allowed_replication
or revealed_to_rodc
:
2499 rodc_samdb
= self
.get_rodc_samdb()
2500 rodc_dn
= self
.get_server_dn(rodc_samdb
)
2502 # Allow replicating this account's secrets if requested, or allow
2503 # it only temporarily if we're about to replicate them.
2504 allowed_cleanup
= self
.add_to_group(
2506 'msDS-RevealOnDemandGroup')
2508 if revealed_to_rodc
:
2509 # Replicate this account's secrets to the RODC.
2510 self
.replicate_account_to_rodc(dn
)
2512 if not allowed_replication
:
2513 # If we don't want replicating secrets to be allowed for this
2514 # account, disable it again.
2515 samdb
.modify(allowed_cleanup
)
2517 self
.check_revealed(dn
,
2519 revealed
=revealed_to_rodc
)
2521 if denied_replication
:
2522 rodc_samdb
= self
.get_rodc_samdb()
2523 rodc_dn
= self
.get_server_dn(rodc_samdb
)
2525 # Deny replicating this account's secrets to the RODC.
2526 self
.add_to_group(dn
, rodc_dn
, 'msDS-NeverRevealGroup')
2528 # Handle secret replication to the mock RODC.
2530 if allowed_replication_mock
or revealed_to_mock_rodc
:
2531 # Allow replicating this account's secrets if requested, or allow
2532 # it only temporarily if we want to add the account to the mock
2533 # RODC's msDS-RevealedUsers.
2534 rodc_ctx
= self
.get_mock_rodc_ctx()
2535 mock_rodc_dn
= ldb
.Dn(samdb
, rodc_ctx
.acct_dn
)
2537 allowed_mock_cleanup
= self
.add_to_group(
2539 'msDS-RevealOnDemandGroup')
2541 if revealed_to_mock_rodc
:
2542 # Request replicating this account's secrets to the mock RODC,
2543 # which updates msDS-RevealedUsers.
2544 self
.reveal_account_to_mock_rodc(dn
)
2546 if not allowed_replication_mock
:
2547 # If we don't want replicating secrets to be allowed for this
2548 # account, disable it again.
2549 samdb
.modify(allowed_mock_cleanup
)
2551 self
.check_revealed(dn
,
2553 revealed
=revealed_to_mock_rodc
)
2555 if denied_replication_mock
:
2556 # Deny replicating this account's secrets to the mock RODC.
2557 rodc_ctx
= self
.get_mock_rodc_ctx()
2558 mock_rodc_dn
= ldb
.Dn(samdb
, rodc_ctx
.acct_dn
)
2560 self
.add_to_group(dn
, mock_rodc_dn
, 'msDS-NeverRevealGroup')
2562 if member_of
is not None:
2563 for group_dn
in member_of
:
2564 self
.add_to_group(dn
, ldb
.Dn(samdb
, group_dn
), 'member',
2567 if kerberos_enabled
:
2568 creds
.set_kerberos_state(MUST_USE_KERBEROS
)
2570 creds
.set_kerberos_state(DONT_USE_KERBEROS
)
2572 if secure_channel_type
is not None:
2573 creds
.set_secure_channel_type(secure_channel_type
)
2575 self
.remember_creds_for_keytab_export(creds
)
def get_new_username(self):
    """Return a new account name and advance the account counter.

    The name is ``self.account_base`` followed by the current value of
    ``self.account_id``.

    :return: the generated account name (a str).
    """
    user_name = self.account_base + str(self.account_id)
    # The counter is bumped on the class rather than on the instance, so
    # the next name handed out through this class uses the next number.
    type(self).account_id += 1
    return user_name
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials registered under the 'CLIENT' prefix.

    The lookup is delegated to ``self._get_krb5_creds``.  A callback that
    obtains a cached account of type ``AccountType.USER`` is supplied as
    ``fallback_creds_fn``.
    NOTE(review): presumably the fallback is only invoked when no CLIENT
    credentials are configured -- confirm against _get_krb5_creds().

    :param allow_missing_password: forwarded to ``_get_krb5_creds``.
    :param allow_missing_keys: forwarded to ``_get_krb5_creds``.
    :return: whatever ``_get_krb5_creds`` returns.
    """
    def create_client_account():
        return self.get_cached_creds(account_type=self.AccountType.USER)

    c = self._get_krb5_creds(prefix='CLIENT',
                             allow_missing_password=allow_missing_password,
                             allow_missing_keys=allow_missing_keys,
                             fallback_creds_fn=create_client_account)
    return c
def get_mach_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the credentials registered under the 'MAC' prefix.

    The lookup is delegated to ``self._get_krb5_creds``.  The fallback
    callback obtains a cached ``AccountType.COMPUTER`` account with FAST,
    claims and compound identity support enabled and with the RC4 and
    AES256-SK encryption type bits requested.

    :param allow_missing_password: forwarded to ``_get_krb5_creds``.
    :param allow_missing_keys: forwarded to ``_get_krb5_creds``.
    :return: whatever ``_get_krb5_creds`` returns.
    """
    def create_mach_account():
        return self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts={
                'fast_support': True,
                'claims_support': True,
                'compound_id_support': True,
                'supported_enctypes': (
                    security.KERB_ENCTYPE_RC4_HMAC_MD5 |
                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
                ),
            })

    c = self._get_krb5_creds(prefix='MAC',
                             allow_missing_password=allow_missing_password,
                             allow_missing_keys=allow_missing_keys,
                             fallback_creds_fn=create_mach_account)
    return c
def get_service_creds(self,
                      allow_missing_password=False,
                      allow_missing_keys=True):
    """Return the credentials registered under the 'SERVICE' prefix.

    The lookup is delegated to ``self._get_krb5_creds``.  The fallback
    callback obtains a cached ``AccountType.COMPUTER`` account that is
    trusted to authenticate for delegation, has FAST, claims and compound
    identity support enabled, and requests the RC4 and AES256-SK
    encryption type bits.

    :param allow_missing_password: forwarded to ``_get_krb5_creds``.
    :param allow_missing_keys: forwarded to ``_get_krb5_creds``.
    :return: whatever ``_get_krb5_creds`` returns.
    """
    def create_service_account():
        return self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts={
                'trusted_to_auth_for_delegation': True,
                'fast_support': True,
                'claims_support': True,
                'compound_id_support': True,
                'supported_enctypes': (
                    security.KERB_ENCTYPE_RC4_HMAC_MD5 |
                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
                ),
            })

    c = self._get_krb5_creds(prefix='SERVICE',
                             allow_missing_password=allow_missing_password,
                             allow_missing_keys=allow_missing_keys,
                             fallback_creds_fn=create_service_account)
    return c
2642 def get_rodc_krbtgt_creds(self
,
2644 require_strongest_key
=False):
2645 if require_strongest_key
:
2646 self
.assertTrue(require_keys
)
2648 def download_rodc_krbtgt_creds():
2649 samdb
= self
.get_samdb()
2650 rodc_samdb
= self
.get_rodc_samdb()
2652 rodc_dn
= self
.get_server_dn(rodc_samdb
)
2654 res
= samdb
.search(rodc_dn
,
2655 scope
=ldb
.SCOPE_BASE
,
2656 attrs
=['msDS-KrbTgtLink'])
2657 krbtgt_dn
= res
[0]['msDS-KrbTgtLink'][0]
2659 res
= samdb
.search(krbtgt_dn
,
2660 scope
=ldb
.SCOPE_BASE
,
2661 attrs
=['sAMAccountName',
2662 'msDS-KeyVersionNumber',
2663 'msDS-SecondaryKrbTgtNumber'])
2664 krbtgt_dn
= res
[0].dn
2665 username
= str(res
[0]['sAMAccountName'])
2667 creds
= KerberosCredentials()
2668 creds
.set_domain(self
.env_get_var('DOMAIN', 'RODC_KRBTGT'))
2669 creds
.set_realm(self
.env_get_var('REALM', 'RODC_KRBTGT'))
2670 creds
.set_username(username
)
2672 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2673 krbtgt_number
= int(res
[0]['msDS-SecondaryKrbTgtNumber'][0])
2675 rodc_kvno
= krbtgt_number
<< 16 | kvno
2676 creds
.set_kvno(rodc_kvno
)
2677 creds
.set_dn(krbtgt_dn
)
2679 keys
= self
.get_keys(creds
)
2680 self
.creds_set_keys(creds
, keys
)
2682 # The RODC krbtgt account should support the default enctypes,
2683 # although it might not have the msDS-SupportedEncryptionTypes
2685 self
.creds_set_default_enctypes(
2687 fast_support
=self
.kdc_fast_support
,
2688 claims_support
=self
.kdc_claims_support
,
2689 compound_id_support
=self
.kdc_compound_id_support
)
2691 if type(self
).export_existing_creds
:
2692 self
.remember_creds_for_keytab_export(creds
)
2696 c
= self
._get
_krb
5_creds
(prefix
='RODC_KRBTGT',
2697 allow_missing_password
=True,
2698 allow_missing_keys
=not require_keys
,
2699 require_strongest_key
=require_strongest_key
,
2700 fallback_creds_fn
=download_rodc_krbtgt_creds
)
2703 def get_mock_rodc_krbtgt_creds(self
,
2705 require_strongest_key
=False,
2707 if require_strongest_key
:
2708 self
.assertTrue(require_keys
)
2710 def create_rodc_krbtgt_account():
2711 samdb
= self
.get_samdb()
2713 rodc_ctx
= self
.get_mock_rodc_ctx(preserve
=preserve
)
2715 krbtgt_dn
= rodc_ctx
.new_krbtgt_dn
2717 res
= samdb
.search(base
=ldb
.Dn(samdb
, krbtgt_dn
),
2718 scope
=ldb
.SCOPE_BASE
,
2719 attrs
=['msDS-KeyVersionNumber',
2720 'msDS-SecondaryKrbTgtNumber'])
2722 username
= str(rodc_ctx
.krbtgt_name
)
2724 krbtgt_creds
= KerberosCredentials()
2725 krbtgt_creds
.set_domain(self
.env_get_var('DOMAIN', 'RODC_KRBTGT'))
2726 krbtgt_creds
.set_realm(self
.env_get_var('REALM', 'RODC_KRBTGT'))
2727 krbtgt_creds
.set_username(username
)
2729 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2730 krbtgt_number
= int(res
[0]['msDS-SecondaryKrbTgtNumber'][0])
2732 rodc_kvno
= krbtgt_number
<< 16 | kvno
2733 krbtgt_creds
.set_kvno(rodc_kvno
)
2734 krbtgt_creds
.set_dn(dn
)
2736 krbtgt_keys
= self
.get_keys(krbtgt_creds
)
2737 self
.creds_set_keys(krbtgt_creds
, krbtgt_keys
)
2739 self
.remember_creds_for_keytab_export(krbtgt_creds
)
2741 acct_res
= samdb
.search(base
=rodc_ctx
.acct_dn
,
2742 scope
=ldb
.SCOPE_BASE
,
2743 attrs
=['msDS-KeyVersionNumber',
2747 computer_creds
= KerberosCredentials()
2748 computer_creds
.set_domain(krbtgt_creds
.get_domain())
2749 computer_creds
.set_realm(krbtgt_creds
.get_realm())
2750 computer_creds
.set_username(rodc_ctx
.samname
)
2751 computer_creds
.set_password(rodc_ctx
.acct_pass
)
2752 computer_creds
.set_workstation(rodc_ctx
.myname
)
2753 computer_creds
.set_secure_channel_type(misc
.SEC_CHAN_RODC
)
2754 computer_creds
.set_dn(acct_res
[0].dn
)
2755 computer_creds
.set_type(self
.AccountType
.RODC
)
2756 computer_creds
.set_user_account_control(rodc_ctx
.userAccountControl
)
2758 computer_kvno
= int(acct_res
[0]['msDS-KeyVersionNumber'][0])
2759 computer_creds
.set_kvno(computer_kvno
)
2761 sid
= acct_res
[0].get('objectSid', idx
=0)
2762 sid
= samdb
.schema_format_value('objectSID', sid
)
2763 sid
= sid
.decode('utf-8')
2764 computer_creds
.set_sid(sid
)
2765 guid
= acct_res
[0].get('objectGUID', idx
=0)
2766 guid
= samdb
.schema_format_value('objectGUID', guid
)
2767 guid
= guid
.decode('utf-8')
2768 computer_creds
.set_guid(guid
)
2770 computer_keys
= self
.get_keys(computer_creds
)
2771 # we just let wireshark see the keys via drsuapi
2772 # but we don't force them on computer_creds
2773 # as computer_creds.set_password() could be
2774 # used by the caller...
2775 # self.creds_set_keys(computer_creds, computer_keys)
2777 self
.remember_creds_for_keytab_export(computer_creds
)
2779 if self
.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008
:
2780 extra_bits
= (security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2781 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
)
2784 remove_bits
= (security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK |
2785 security
.KERB_ENCTYPE_RC4_HMAC_MD5
)
2786 self
.creds_set_enctypes(krbtgt_creds
,
2787 extra_bits
=extra_bits
,
2788 remove_bits
=remove_bits
)
2789 self
.creds_set_enctypes(computer_creds
,
2790 extra_bits
=extra_bits
,
2791 remove_bits
=remove_bits
)
2793 krbtgt_creds
.set_rodc_computer_creds(computer_creds
)
2798 return create_rodc_krbtgt_account()
2800 c
= self
._get
_krb
5_creds
(prefix
='MOCK_RODC_KRBTGT',
2801 allow_missing_password
=True,
2802 allow_missing_keys
=not require_keys
,
2803 require_strongest_key
=require_strongest_key
,
2804 fallback_creds_fn
=create_rodc_krbtgt_account
)
2807 def get_krbtgt_creds(self
,
2809 require_strongest_key
=False):
2810 if require_strongest_key
:
2811 self
.assertTrue(require_keys
)
2813 def download_krbtgt_creds():
2814 samdb
= self
.get_samdb()
2816 krbtgt_rid
= security
.DOMAIN_RID_KRBTGT
2817 krbtgt_sid
= '%s-%d' % (samdb
.get_domain_sid(), krbtgt_rid
)
2819 res
= samdb
.search(base
='<SID=%s>' % krbtgt_sid
,
2820 scope
=ldb
.SCOPE_BASE
,
2821 attrs
=['sAMAccountName',
2822 'msDS-KeyVersionNumber'])
2824 username
= str(res
[0]['sAMAccountName'])
2826 creds
= KerberosCredentials()
2827 creds
.set_domain(self
.env_get_var('DOMAIN', 'KRBTGT'))
2828 creds
.set_realm(self
.env_get_var('REALM', 'KRBTGT'))
2829 creds
.set_username(username
)
2831 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2832 creds
.set_kvno(kvno
)
2835 keys
= self
.get_keys(creds
)
2836 self
.creds_set_keys(creds
, keys
)
2838 # The krbtgt account should support the default enctypes, although
2839 # it might not (on Samba) have the msDS-SupportedEncryptionTypes
2841 self
.creds_set_default_enctypes(
2843 fast_support
=self
.kdc_fast_support
,
2844 claims_support
=self
.kdc_claims_support
,
2845 compound_id_support
=self
.kdc_compound_id_support
)
2847 if type(self
).export_existing_creds
:
2848 self
.remember_creds_for_keytab_export(creds
)
2852 c
= self
._get
_krb
5_creds
(prefix
='KRBTGT',
2853 default_username
='krbtgt',
2854 allow_missing_password
=True,
2855 allow_missing_keys
=not require_keys
,
2856 require_strongest_key
=require_strongest_key
,
2857 fallback_creds_fn
=download_krbtgt_creds
)
2860 def get_dc_creds(self
,
2862 require_strongest_key
=False):
2863 if require_strongest_key
:
2864 self
.assertTrue(require_keys
)
2866 def download_dc_creds():
2867 samdb
= self
.get_samdb()
2870 dc_sid
= '%s-%d' % (samdb
.get_domain_sid(), dc_rid
)
2872 res
= samdb
.search(base
='<SID=%s>' % dc_sid
,
2873 scope
=ldb
.SCOPE_BASE
,
2874 attrs
=['sAMAccountName',
2875 'msDS-KeyVersionNumber'])
2877 username
= str(res
[0]['sAMAccountName'])
2879 creds
= KerberosCredentials()
2880 creds
.set_domain(self
.env_get_var('DOMAIN', 'DC'))
2881 creds
.set_realm(self
.env_get_var('REALM', 'DC'))
2882 creds
.set_username(username
)
2884 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2885 creds
.set_kvno(kvno
)
2886 creds
.set_workstation(username
[:-1])
2889 keys
= self
.get_keys(creds
)
2890 self
.creds_set_keys(creds
, keys
)
2892 if self
.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008
:
2893 extra_bits
= (security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2894 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
)
2897 remove_bits
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2898 self
.creds_set_enctypes(creds
,
2899 extra_bits
=extra_bits
,
2900 remove_bits
=remove_bits
)
2902 if type(self
).export_existing_creds
:
2903 self
.remember_creds_for_keytab_export(creds
)
2907 c
= self
._get
_krb
5_creds
(prefix
='DC',
2908 allow_missing_password
=True,
2909 allow_missing_keys
=not require_keys
,
2910 require_strongest_key
=require_strongest_key
,
2911 fallback_creds_fn
=download_dc_creds
)
2914 def get_server_creds(self
,
2916 require_strongest_key
=False):
2917 if require_strongest_key
:
2918 self
.assertTrue(require_keys
)
2920 def download_server_creds():
2921 samdb
= self
.get_samdb()
2923 res
= samdb
.search(base
=samdb
.get_default_basedn(),
2924 expression
=(f
'(|(sAMAccountName={self.host}*)'
2925 f
'(dNSHostName={self.host}))'),
2926 scope
=ldb
.SCOPE_SUBTREE
,
2927 attrs
=['sAMAccountName',
2928 'msDS-KeyVersionNumber'])
2929 self
.assertEqual(1, len(res
))
2931 username
= str(res
[0]['sAMAccountName'])
2933 creds
= KerberosCredentials()
2934 creds
.set_domain(self
.env_get_var('DOMAIN', 'SERVER'))
2935 creds
.set_realm(self
.env_get_var('REALM', 'SERVER'))
2936 creds
.set_username(username
)
2938 kvno
= int(res
[0]['msDS-KeyVersionNumber'][0])
2939 creds
.set_kvno(kvno
)
2942 keys
= self
.get_keys(creds
)
2943 self
.creds_set_keys(creds
, keys
)
2945 if self
.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008
:
2946 extra_bits
= (security
.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2947 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
)
2950 remove_bits
= security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2951 self
.creds_set_enctypes(creds
,
2952 extra_bits
=extra_bits
,
2953 remove_bits
=remove_bits
)
2955 if type(self
).export_existing_creds
:
2956 self
.remember_creds_for_keytab_export(creds
)
2960 c
= self
._get
_krb
5_creds
(prefix
='SERVER',
2961 allow_missing_password
=True,
2962 allow_missing_keys
=not require_keys
,
2963 require_strongest_key
=require_strongest_key
,
2964 fallback_creds_fn
=download_server_creds
)
2967 # Get the credentials and server principal name of either the krbtgt, or a
2968 # specially created account, with resource SID compression either supported
2970 def get_target(self
,
2976 self
.assertIsNone(compound_id
,
2977 "it's no good specifying compound id support "
2979 self
.assertIsNone(compression
,
2980 "it's no good specifying compression support "
2982 self
.assertFalse(extra_enctypes
,
2983 "it's no good specifying extra enctypes "
2985 creds
= self
.get_krbtgt_creds()
2986 sname
= self
.get_krbtgt_sname()
2988 creds
= self
.get_cached_creds(
2989 account_type
=self
.AccountType
.COMPUTER
,
2991 'supported_enctypes':
2992 security
.KERB_ENCTYPE_RC4_HMAC_MD5 |
2993 security
.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 |
2995 'compound_id_support': compound_id
,
2996 'sid_compression_support': compression
,
2998 target_name
= creds
.get_username()
3000 if target_name
[-1] == '$':
3001 target_name
= target_name
[:-1]
3002 sname
= self
.PrincipalName_create(
3003 name_type
=NT_PRINCIPAL
,
3004 names
=['host', target_name
])
3008 def as_req(self
, cname
, sname
, realm
, etypes
, padata
=None, kdc_options
=0):
3009 """Send a Kerberos AS_REQ, returns the undecoded response
3012 till
= self
.get_KerberosTime(offset
=36000)
3014 req
= self
.AS_REQ_create(padata
=padata
,
3015 kdc_options
=str(kdc_options
),
3025 additional_tickets
=None)
3026 rep
= self
.send_recv_transaction(req
)
3029 def get_as_rep_key(self
, creds
, rep
):
3030 """Extract the session key from an AS-REP
3032 rep_padata
= self
.der_decode(
3034 asn1Spec
=krb5_asn1
.METHOD_DATA())
3036 for pa
in rep_padata
:
3037 if pa
['padata-type'] == PADATA_ETYPE_INFO2
:
3038 padata_value
= pa
['padata-value']
3041 self
.fail('expected to find ETYPE-INFO2')
3043 etype_info2
= self
.der_decode(
3044 padata_value
, asn1Spec
=krb5_asn1
.ETYPE_INFO2())
3046 key
= self
.PasswordKey_from_etype_info2(creds
, etype_info2
[0],
def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
    """Generate the encrypted-timestamp pa_data element for an AS-REQ.

    The key is obtained from ``self.get_as_rep_key(creds, rep)`` and the
    element itself is built by ``get_enc_timestamp_pa_data_from_key``.

    :param creds: the credentials passed on to ``get_as_rep_key``.
    :param rep: the KDC reply passed on to ``get_as_rep_key``.
    :param skew: offset applied to the timestamp (forwarded unchanged).
    :return: the pa_data element.
    """
    key = self.get_as_rep_key(creds, rep)

    return self.get_enc_timestamp_pa_data_from_key(key, skew=skew)
def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
    """Build a PADATA_ENC_TIMESTAMP element encrypted with *key*.

    :param key: the key handed to ``EncryptedData_create``.
    :param skew: offset passed to ``get_KerberosTimeWithUsec``.
    :return: the element produced by ``PA_DATA_create``.
    """
    (patime, pausec) = self.get_KerberosTimeWithUsec(offset=skew)

    # Encode the timestamp as PA-ENC-TS-ENC.
    ts_enc = self.PA_ENC_TS_ENC_create(patime, pausec)
    ts_enc = self.der_encode(ts_enc, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    # Encrypt it with the timestamp key usage and encode the result.
    enc_data = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, ts_enc)
    enc_data = self.der_encode(enc_data, asn1Spec=krb5_asn1.EncryptedData())

    # Wrap the encoded EncryptedData in a PA-DATA element.
    padata = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, enc_data)

    return padata
def get_challenge_pa_data(self, client_challenge_key, skew=0):
    """Build a PADATA_ENCRYPTED_CHALLENGE element.

    The steps mirror get_enc_timestamp_pa_data_from_key(), but the
    timestamp is encrypted with *client_challenge_key* under the
    KU_ENC_CHALLENGE_CLIENT key usage.

    :param client_challenge_key: the key handed to ``EncryptedData_create``.
    :param skew: offset passed to ``get_KerberosTimeWithUsec``.
    :return: the element produced by ``PA_DATA_create``.
    """
    patime, pausec = self.get_KerberosTimeWithUsec(offset=skew)

    # Encode the timestamp as PA-ENC-TS-ENC.
    ts_enc = self.PA_ENC_TS_ENC_create(patime, pausec)
    ts_enc = self.der_encode(ts_enc,
                             asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    # Encrypt it with the client challenge key usage and encode the result.
    enc_data = self.EncryptedData_create(client_challenge_key,
                                         KU_ENC_CHALLENGE_CLIENT,
                                         ts_enc)
    enc_data = self.der_encode(enc_data,
                               asn1Spec=krb5_asn1.EncryptedData())

    # Wrap the encoded EncryptedData in a PA-DATA element.
    padata = self.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE,
                                 enc_data)

    return padata
3087 def get_as_rep_enc_data(self
, key
, rep
):
3088 """ Decrypt and Decode the encrypted data in an AS-REP
3090 enc_part
= key
.decrypt(KU_AS_REP_ENC_PART
, rep
['enc-part']['cipher'])
3091 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
3092 # application tag 26
3094 enc_part
= self
.der_decode(
3095 enc_part
, asn1Spec
=krb5_asn1
.EncASRepPart())
3097 enc_part
= self
.der_decode(
3098 enc_part
, asn1Spec
=krb5_asn1
.EncTGSRepPart())
def check_pre_authentication(self, rep):
    """Check that the KDC response was "pre-authentication required".

    Delegates to check_error_rep() with KDC_ERR_PREAUTH_REQUIRED as the
    expected error code.
    """
    self.check_error_rep(rep, KDC_ERR_PREAUTH_REQUIRED)
def check_as_reply(self, rep):
    """Check that the KDC response is an AS-REP with the expected values.

    Delegates to check_reply() with KRB_AS_REP as the message type.  That
    method verifies the message type, the protocol version number, the
    ticket version number and, when present, that the enc-part kvno is
    not an RODC kvno.
    """
    self.check_reply(rep, msg_type=KRB_AS_REP)
def check_tgs_reply(self, rep):
    """Check that the KDC response is a TGS-REP with the expected values.

    Delegates to check_reply() with KRB_TGS_REP as the message type.  That
    method verifies the message type, the protocol version number, the
    ticket version number and, when present, that the enc-part kvno is
    not an RODC kvno.
    """
    self.check_reply(rep, msg_type=KRB_TGS_REP)
def check_reply(self, rep, msg_type):
    """Run the checks shared by check_as_reply() and check_tgs_reply().

    :param rep: the decoded KDC reply; indexed by 'msg-type', 'pvno',
        'ticket' and 'enc-part'.
    :param msg_type: the message type the reply is required to have.
    :raises AssertionError: (via the assert* helpers) if any check fails.
    """

    # Should have a reply, and it should be of the requested message type.
    self.assertIsNotNone(rep)
    self.assertEqual(rep['msg-type'], msg_type, "rep = {%s}" % rep)

    # Protocol version number should be 5
    pvno = int(rep['pvno'])
    self.assertEqual(5, pvno, "rep = {%s}" % rep)

    # The ticket version number should be 5
    tkt_vno = int(rep['ticket']['tkt-vno'])
    self.assertEqual(5, tkt_vno, "rep = {%s}" % rep)

    # Check that the kvno is not an RODC kvno
    # MIT kerberos does not provide the kvno, so we treat it as optional.
    # This is tested in compatability_test.py
    if 'kvno' in rep['enc-part']:
        kvno = int(rep['enc-part']['kvno'])
        # If the high order bits are set this is an RODC kvno.
        self.assertEqual(0, kvno & 0xFFFF0000, "rep = {%s}" % rep)
def check_error_rep(self, rep, expected):
    """Check that the reply is an error message, with the expected
    error-code specified.

    :param rep: the decoded KDC reply; indexed by 'msg-type' and
        'error-code'.
    :param expected: either a single error code, or a container of
        acceptable error codes.
    :raises AssertionError: (via the assert* helpers) if any check fails.
    """
    self.assertIsNotNone(rep)
    self.assertEqual(rep['msg-type'], KRB_ERROR, "rep = {%s}" % rep)
    if isinstance(expected, collections.abc.Container):
        # Any one of several error codes is acceptable.
        self.assertIn(rep['error-code'], expected, "rep = {%s}" % rep)
    else:
        self.assertEqual(rep['error-code'], expected, "rep = {%s}" % rep)
3162 def tgs_req(self
, cname
, sname
, realm
, ticket
, key
, etypes
,
3163 expected_error_mode
=0, padata
=None, kdc_options
=0,
3164 to_rodc
=False, creds
=None, service_creds
=None, expect_pac
=True,
3165 expect_edata
=None, expected_flags
=None, unexpected_flags
=None):
3166 """Send a TGS-REQ, returns the response and the decrypted and
3170 subkey
= self
.RandomKey(key
.etype
)
3172 (ctime
, cusec
) = self
.get_KerberosTimeWithUsec()
3174 tgt
= KerberosTicketCreds(ticket
,
3179 if service_creds
is not None:
3180 decryption_key
= self
.TicketDecryptionKey_from_creds(
3182 expected_supported_etypes
= service_creds
.tgs_supported_enctypes
3184 decryption_key
= None
3185 expected_supported_etypes
= None
3187 if not expected_error_mode
:
3188 check_error_fn
= None
3189 check_rep_fn
= self
.generic_check_kdc_rep
3191 check_error_fn
= self
.generic_check_kdc_error
3194 def generate_padata(_kdc_exchange_dict
,
3198 return padata
, req_body
3200 kdc_exchange_dict
= self
.tgs_exchange_dict(
3202 expected_crealm
=realm
,
3203 expected_cname
=cname
,
3204 expected_srealm
=realm
,
3205 expected_sname
=sname
,
3206 expected_error_mode
=expected_error_mode
,
3207 expected_flags
=expected_flags
,
3208 unexpected_flags
=unexpected_flags
,
3209 expected_supported_etypes
=expected_supported_etypes
,
3210 check_error_fn
=check_error_fn
,
3211 check_rep_fn
=check_rep_fn
,
3212 check_kdc_private_fn
=self
.generic_check_kdc_private
,
3213 ticket_decryption_key
=decryption_key
,
3214 generate_padata_fn
=generate_padata
if padata
is not None else None,
3216 authenticator_subkey
=subkey
,
3217 kdc_options
=str(kdc_options
),
3218 expect_edata
=expect_edata
,
3219 expect_pac
=expect_pac
,
3222 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
3228 if expected_error_mode
:
3231 ticket_creds
= kdc_exchange_dict
['rep_ticket_creds']
3232 enc_part
= ticket_creds
.encpart_private
3234 return rep
, enc_part
3236 def get_service_ticket(self
, tgt
, target_creds
, service
='host',
3238 target_name
=None, till
=None, rc4_support
=True,
3239 to_rodc
=False, kdc_options
=None,
3240 expected_flags
=None, unexpected_flags
=None,
3241 expected_groups
=None,
3242 unexpected_groups
=None,
3243 expect_client_claims
=None,
3244 expect_device_claims
=None,
3245 expected_client_claims
=None,
3246 unexpected_client_claims
=None,
3247 expected_device_claims
=None,
3248 unexpected_device_claims
=None,
3249 pac_request
=True, expect_pac
=True,
3250 expect_requester_sid
=None,
3251 expect_pac_attrs
=None,
3252 expect_pac_attrs_pac_request
=None,
3253 expect_krbtgt_referral
=False,
3255 user_name
= tgt
.cname
['name-string'][0]
3256 ticket_sname
= tgt
.sname
3257 if target_name
is None:
3258 target_name
= target_creds
.get_username()[:-1]
3260 self
.assertIsNone(sname
, 'supplied both target name and sname')
3261 cache_key
= (user_name
, target_name
, service
, to_rodc
, kdc_options
,
3262 pac_request
, str(expected_flags
), str(unexpected_flags
),
3266 str(expected_groups
),
3267 str(unexpected_groups
),
3268 expect_client_claims
, expect_device_claims
,
3269 str(expected_client_claims
),
3270 str(unexpected_client_claims
),
3271 str(expected_device_claims
),
3272 str(unexpected_device_claims
),
3274 expect_requester_sid
,
3276 expect_pac_attrs_pac_request
)
3279 ticket
= self
.tkt_cache
.get(cache_key
)
3281 if ticket
is not None:
3284 etype
= (AES256_CTS_HMAC_SHA1_96
, ARCFOUR_HMAC_MD5
)
3286 if kdc_options
is None:
3288 kdc_options
= str(krb5_asn1
.KDCOptions(kdc_options
))
3291 sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
3292 names
=[service
, target_name
])
3294 srealm
= target_creds
.get_realm()
3296 authenticator_subkey
= self
.RandomKey(kcrypto
.Enctype
.AES256
)
3298 decryption_key
= self
.TicketDecryptionKey_from_creds(target_creds
)
3300 kdc_exchange_dict
= self
.tgs_exchange_dict(
3301 expected_crealm
=tgt
.crealm
,
3302 expected_cname
=tgt
.cname
,
3303 expected_srealm
=srealm
,
3304 expected_sname
=sname
,
3305 expected_supported_etypes
=target_creds
.tgs_supported_enctypes
,
3306 expected_flags
=expected_flags
,
3307 unexpected_flags
=unexpected_flags
,
3308 expected_groups
=expected_groups
,
3309 unexpected_groups
=unexpected_groups
,
3310 expect_client_claims
=expect_client_claims
,
3311 expect_device_claims
=expect_device_claims
,
3312 expected_client_claims
=expected_client_claims
,
3313 unexpected_client_claims
=unexpected_client_claims
,
3314 expected_device_claims
=expected_device_claims
,
3315 unexpected_device_claims
=unexpected_device_claims
,
3316 ticket_decryption_key
=decryption_key
,
3317 expect_ticket_kvno
=(not expect_krbtgt_referral
),
3318 check_rep_fn
=self
.generic_check_kdc_rep
,
3319 check_kdc_private_fn
=self
.generic_check_kdc_private
,
3321 authenticator_subkey
=authenticator_subkey
,
3322 kdc_options
=kdc_options
,
3323 pac_request
=pac_request
,
3324 expect_pac
=expect_pac
,
3325 expect_requester_sid
=expect_requester_sid
,
3326 expect_pac_attrs
=expect_pac_attrs
,
3327 expect_pac_attrs_pac_request
=expect_pac_attrs_pac_request
,
3328 rc4_support
=rc4_support
,
3331 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
3337 self
.check_tgs_reply(rep
)
3339 service_ticket_creds
= kdc_exchange_dict
['rep_ticket_creds']
3342 krbtgt_creds
= self
.get_rodc_krbtgt_creds()
3344 krbtgt_creds
= self
.get_krbtgt_creds()
3345 krbtgt_key
= self
.TicketDecryptionKey_from_creds(krbtgt_creds
)
3347 is_tgs_princ
= self
.is_tgs_principal(sname
)
3348 expect_ticket_checksum
= (self
.tkt_sig_support
3349 and not is_tgs_princ
)
3350 expect_full_checksum
= (self
.full_sig_support
3351 and not is_tgs_princ
)
3352 self
.verify_ticket(service_ticket_creds
, krbtgt_key
,
3353 service_ticket
=True, expect_pac
=expect_pac
,
3354 expect_ticket_checksum
=expect_ticket_checksum
,
3355 expect_full_checksum
=expect_full_checksum
)
3357 self
.tkt_cache
[cache_key
] = service_ticket_creds
3359 return service_ticket_creds
3361 def get_tgt(self
, creds
, to_rodc
=False, kdc_options
=None,
3362 client_account
=None, client_name_type
=NT_PRINCIPAL
,
3363 target_creds
=None, ticket_etype
=None,
3364 expected_flags
=None, unexpected_flags
=None,
3365 expected_account_name
=None, expected_upn_name
=None,
3366 expected_cname
=None,
3368 sname
=None, realm
=None,
3369 expected_groups
=None,
3370 unexpected_groups
=None,
3371 pac_request
=True, expect_pac
=True,
3372 expect_pac_attrs
=None, expect_pac_attrs_pac_request
=None,
3374 expect_requester_sid
=None,
3377 expect_client_claims
=None, expect_device_claims
=None,
3378 expected_client_claims
=None, unexpected_client_claims
=None,
3379 expected_device_claims
=None, unexpected_device_claims
=None,
3381 if client_account
is not None:
3382 user_name
= client_account
3384 user_name
= creds
.get_username()
3386 cache_key
= (user_name
, to_rodc
, kdc_options
, pac_request
, pac_options
,
3389 str(expected_flags
), str(unexpected_flags
),
3390 expected_account_name
, expected_upn_name
, expected_sid
,
3391 str(sname
), str(realm
),
3392 str(expected_groups
),
3393 str(unexpected_groups
),
3394 str(expected_cname
),
3397 expect_pac
, expect_pac_attrs
,
3398 expect_pac_attrs_pac_request
, expect_requester_sid
,
3399 expect_client_claims
, expect_device_claims
,
3400 str(expected_client_claims
),
3401 str(unexpected_client_claims
),
3402 str(expected_device_claims
),
3403 str(unexpected_device_claims
))
3406 tgt
= self
.tkt_cache
.get(cache_key
)
3412 realm
= creds
.get_realm()
3414 salt
= creds
.get_salt()
3416 etype
= self
.get_default_enctypes(creds
)
3417 cname
= self
.PrincipalName_create(name_type
=client_name_type
,
3418 names
=user_name
.split('/'))
3420 sname
= self
.PrincipalName_create(name_type
=NT_SRV_INST
,
3421 names
=['krbtgt', realm
])
3422 expected_sname
= self
.PrincipalName_create(
3423 name_type
=NT_SRV_INST
, names
=['krbtgt', realm
.upper()])
3425 expected_sname
= sname
3427 if expected_cname
is None:
3428 expected_cname
= cname
3430 till
= self
.get_KerberosTime(offset
=36000)
3432 if target_creds
is not None:
3433 krbtgt_creds
= target_creds
3435 krbtgt_creds
= self
.get_rodc_krbtgt_creds()
3437 krbtgt_creds
= self
.get_krbtgt_creds()
3438 ticket_decryption_key
= (
3439 self
.TicketDecryptionKey_from_creds(krbtgt_creds
,
3440 etype
=ticket_etype
))
3442 expected_etypes
= krbtgt_creds
.tgs_supported_enctypes
3444 if kdc_options
is None:
3445 kdc_options
= ('forwardable,'
3449 kdc_options
= krb5_asn1
.KDCOptions(kdc_options
)
3451 if pac_options
is None:
3452 pac_options
= '1' # supports claims
3454 rep
, kdc_exchange_dict
= self
._test
_as
_exchange
(
3460 expected_error_mode
=KDC_ERR_PREAUTH_REQUIRED
,
3461 expected_crealm
=realm
,
3462 expected_cname
=expected_cname
,
3463 expected_srealm
=realm
,
3464 expected_sname
=sname
,
3465 expected_account_name
=expected_account_name
,
3466 expected_upn_name
=expected_upn_name
,
3467 expected_sid
=expected_sid
,
3468 expected_groups
=expected_groups
,
3469 unexpected_groups
=unexpected_groups
,
3471 expected_flags
=expected_flags
,
3472 unexpected_flags
=unexpected_flags
,
3473 expected_supported_etypes
=expected_etypes
,
3476 kdc_options
=kdc_options
,
3478 ticket_decryption_key
=ticket_decryption_key
,
3479 pac_request
=pac_request
,
3480 pac_options
=pac_options
,
3481 expect_pac
=expect_pac
,
3482 expect_pac_attrs
=expect_pac_attrs
,
3483 expect_pac_attrs_pac_request
=expect_pac_attrs_pac_request
,
3484 expect_requester_sid
=expect_requester_sid
,
3485 rc4_support
=rc4_support
,
3486 expect_edata
=expect_edata
,
3487 expect_client_claims
=expect_client_claims
,
3488 expect_device_claims
=expect_device_claims
,
3489 expected_client_claims
=expected_client_claims
,
3490 unexpected_client_claims
=unexpected_client_claims
,
3491 expected_device_claims
=expected_device_claims
,
3492 unexpected_device_claims
=unexpected_device_claims
,
3494 self
.check_pre_authentication(rep
)
3496 etype_info2
= kdc_exchange_dict
['preauth_etype_info2']
3498 preauth_key
= self
.PasswordKey_from_etype_info2(creds
,
3502 ts_enc_padata
= self
.get_enc_timestamp_pa_data_from_key(preauth_key
)
3504 padata
= [ts_enc_padata
]
3506 expected_realm
= realm
.upper()
3508 rep
, kdc_exchange_dict
= self
._test
_as
_exchange
(
3514 expected_error_mode
=0,
3515 expected_crealm
=expected_realm
,
3516 expected_cname
=expected_cname
,
3517 expected_srealm
=expected_realm
,
3518 expected_sname
=expected_sname
,
3519 expected_account_name
=expected_account_name
,
3520 expected_upn_name
=expected_upn_name
,
3521 expected_sid
=expected_sid
,
3522 expected_groups
=expected_groups
,
3523 unexpected_groups
=unexpected_groups
,
3525 expected_flags
=expected_flags
,
3526 unexpected_flags
=unexpected_flags
,
3527 expected_supported_etypes
=expected_etypes
,
3530 kdc_options
=kdc_options
,
3531 preauth_key
=preauth_key
,
3532 ticket_decryption_key
=ticket_decryption_key
,
3533 pac_request
=pac_request
,
3534 pac_options
=pac_options
,
3535 expect_pac
=expect_pac
,
3536 expect_pac_attrs
=expect_pac_attrs
,
3537 expect_pac_attrs_pac_request
=expect_pac_attrs_pac_request
,
3538 expect_requester_sid
=expect_requester_sid
,
3539 rc4_support
=rc4_support
,
3540 expect_edata
=expect_edata
,
3541 expect_client_claims
=expect_client_claims
,
3542 expect_device_claims
=expect_device_claims
,
3543 expected_client_claims
=expected_client_claims
,
3544 unexpected_client_claims
=unexpected_client_claims
,
3545 expected_device_claims
=expected_device_claims
,
3546 unexpected_device_claims
=unexpected_device_claims
,
3548 self
.check_as_reply(rep
)
3550 ticket_creds
= kdc_exchange_dict
['rep_ticket_creds']
3552 self
.tkt_cache
[cache_key
] = ticket_creds
3556 def _make_tgs_request(self
, client_creds
, service_creds
, tgt
,
3557 client_account
=None,
3558 client_name_type
=NT_PRINCIPAL
,
3560 pac_request
=None, expect_pac
=True,
3562 expected_cname
=None,
3563 expected_account_name
=None,
3564 expected_upn_name
=None,
3566 if client_account
is None:
3567 client_account
= client_creds
.get_username()
3568 cname
= self
.PrincipalName_create(name_type
=client_name_type
,
3569 names
=client_account
.split('/'))
3571 service_account
= service_creds
.get_username()
3572 sname
= self
.PrincipalName_create(name_type
=NT_PRINCIPAL
,
3573 names
=[service_account
])
3575 realm
= service_creds
.get_realm()
3577 expected_crealm
= realm
3578 if expected_cname
is None:
3579 expected_cname
= cname
3580 expected_srealm
= realm
3581 expected_sname
= sname
3583 expected_supported_etypes
= service_creds
.tgs_supported_enctypes
3585 etypes
= (AES256_CTS_HMAC_SHA1_96
, ARCFOUR_HMAC_MD5
)
3587 if kdc_options
is None:
3588 kdc_options
= 'canonicalize'
3589 kdc_options
= str(krb5_asn1
.KDCOptions(kdc_options
))
3591 target_decryption_key
= self
.TicketDecryptionKey_from_creds(
3594 authenticator_subkey
= self
.RandomKey(kcrypto
.Enctype
.AES256
)
3597 expected_error_mode
= expect_error
3598 if expected_error_mode
is True:
3599 expected_error_mode
= KDC_ERR_TGT_REVOKED
3600 check_error_fn
= self
.generic_check_kdc_error
3603 expected_error_mode
= 0
3604 check_error_fn
= None
3605 check_rep_fn
= self
.generic_check_kdc_rep
3607 kdc_exchange_dict
= self
.tgs_exchange_dict(
3608 expected_crealm
=expected_crealm
,
3609 expected_cname
=expected_cname
,
3610 expected_srealm
=expected_srealm
,
3611 expected_sname
=expected_sname
,
3612 expected_account_name
=expected_account_name
,
3613 expected_upn_name
=expected_upn_name
,
3614 expected_sid
=expected_sid
,
3615 expected_supported_etypes
=expected_supported_etypes
,
3616 ticket_decryption_key
=target_decryption_key
,
3617 check_error_fn
=check_error_fn
,
3618 check_rep_fn
=check_rep_fn
,
3619 check_kdc_private_fn
=self
.generic_check_kdc_private
,
3620 expected_error_mode
=expected_error_mode
,
3622 authenticator_subkey
=authenticator_subkey
,
3623 kdc_options
=kdc_options
,
3624 pac_request
=pac_request
,
3625 expect_pac
=expect_pac
,
3628 rep
= self
._generic
_kdc
_exchange
(kdc_exchange_dict
,
3634 self
.check_error_rep(rep
, expected_error_mode
)
3638 self
.check_reply(rep
, KRB_TGS_REP
)
3640 return kdc_exchange_dict
['rep_ticket_creds']
# Named tuple to contain values of interest when the PAC is decoded.
# namedtuple() requires the type name as its first argument; the second
# argument lists the fields, which are positional in the order given.
PacData = namedtuple(
    "PacData",
    "account_name account_sid logon_name upn domain_name")
def get_pac_data(self, authorization_data):
    """Decode the PAC element contained in the authorization-data element.

    :param authorization_data: iterable of authorization-data entries, each
        indexable by 'ad-type' and 'ad-data'.
    :return: a PacData tuple (account_name, account_sid, logon_name, upn,
        domain_name). A field stays None when the PAC buffer that would
        supply it is not present.
    """
    # Start from None so the return below is well defined even when no PAC
    # (or only some of its buffers) is found.
    account_name = None
    user_sid = None
    logon_name = None
    upn = None
    domain_name = None

    # The PAC data will be wrapped in an AD_IF_RELEVANT element
    ad_if_relevant_elements = (
        x for x in authorization_data if x['ad-type'] == AD_IF_RELEVANT)
    for dt in ad_if_relevant_elements:
        buf = self.der_decode(
            dt['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())
        # The PAC data is further wrapped in a AD_WIN2K_PAC element
        for ad in (x for x in buf if x['ad-type'] == AD_WIN2K_PAC):
            pb = ndr_unpack(krb5pac.PAC_DATA, ad['ad-data'])
            for pac in pb.buffers:
                if pac.type == krb5pac.PAC_TYPE_LOGON_INFO:
                    base = pac.info.info.info3.base
                    account_name = base.account_name
                    # The account SID is the domain SID with the RID
                    # appended.
                    user_sid = str(base.domain_sid) + "-" + str(base.rid)
                elif pac.type == krb5pac.PAC_TYPE_LOGON_NAME:
                    logon_name = pac.info.account_name
                elif pac.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
                    upn = pac.info.upn_name
                    domain_name = pac.info.dns_domain_name

    return self.PacData(
        account_name,
        user_sid,
        logon_name,
        upn,
        domain_name)
def decode_service_ticket(self, creds, ticket):
    """Decrypt and decode a service ticket.

    :param creds: credentials from which the ticket decryption key is
        derived.
    :param ticket: decoded ticket, indexable by 'enc-part'.
    :return: the decoded EncTicketPart.
    """
    enc_part = ticket['enc-part']

    # Ask for a key of the encryption type recorded in the ticket's
    # encrypted part.
    # NOTE(review): confirm that the second parameter of
    # TicketDecryptionKey_from_creds is the etype.
    key = self.TicketDecryptionKey_from_creds(creds,
                                              enc_part['etype'])

    # Only check the key version number when the key carries one.
    if key.kvno is not None:
        self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    plaintext = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_ticket_part = self.der_decode(
        plaintext, asn1Spec=krb5_asn1.EncTicketPart())
    return enc_ticket_part
def modify_ticket_flag(self, enc_part, flag, value):
    """Set or clear one flag in a ticket's encrypted part.

    :param enc_part: mapping whose 'flags' entry is a string of '0'/'1'
        characters; it is updated in place.
    :param flag: flag specification accepted by krb5_asn1.TicketFlags.
    :param value: True to set the flag, False to clear it (must be a bool).
    :return: the same enc_part object, after modification.
    """
    self.assertIsInstance(value, bool)

    # The flag's bit position is the index of the last element of its
    # TicketFlags encoding.
    flag = krb5_asn1.TicketFlags(flag)
    pos = len(tuple(flag)) - 1

    flags = enc_part['flags']
    # pos == len(flags) is allowed: the slices below then append one bit.
    self.assertLessEqual(pos, len(flags))

    new_flags = flags[:pos] + str(int(value)) + flags[pos + 1:]
    enc_part['flags'] = new_flags

    return enc_part
def get_objectSid(self, samdb, dn):
    """Get the objectSID for a DN.

    Note: performs an Ldb query.

    :param samdb: database connection to search.
    :param dn: DN of the object to look up.
    :return: the objectSID as formatted by samdb.schema_format_value(),
        decoded to str.
    """
    res = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
    # assertEqual reports the actual number of results on failure.
    self.assertEqual(1, len(res), "did not get objectSid for %s" % dn)
    sid = samdb.schema_format_value("objectSID", res[0]["objectSID"][0])
    return sid.decode('utf8')
def add_attribute(self, samdb, dn_str, name, value):
    """Add value(s) to an attribute of an object, using FLAG_MOD_ADD.

    :param samdb: database connection to modify.
    :param dn_str: DN of the object, as a string.
    :param name: attribute name.
    :param value: a single value, or a list of values.
    """
    # MessageElement is given a list of values; wrap a lone value.
    if isinstance(value, list):
        values = value
    else:
        values = [value]
    flag = ldb.FLAG_MOD_ADD

    dn = ldb.Dn(samdb, dn_str)
    msg = ldb.Message(dn)
    msg[name] = ldb.MessageElement(values, flag, name)
    # Building the message changes nothing until it is submitted.
    samdb.modify(msg)
def modify_attribute(self, samdb, dn_str, name, value):
    """Replace the value(s) of an attribute, using FLAG_MOD_REPLACE.

    :param samdb: database connection to modify.
    :param dn_str: DN of the object, as a string.
    :param name: attribute name.
    :param value: a single value, or a list of values.
    """
    # MessageElement is given a list of values; wrap a lone value.
    if isinstance(value, list):
        values = value
    else:
        values = [value]
    flag = ldb.FLAG_MOD_REPLACE

    dn = ldb.Dn(samdb, dn_str)
    msg = ldb.Message(dn)
    msg[name] = ldb.MessageElement(values, flag, name)
    # Building the message changes nothing until it is submitted.
    samdb.modify(msg)
def remove_attribute(self, samdb, dn_str, name):
    """Delete an attribute from an object, using FLAG_MOD_DELETE.

    :param samdb: database connection to modify.
    :param dn_str: DN of the object, as a string.
    :param name: attribute name.
    """
    flag = ldb.FLAG_MOD_DELETE

    dn = ldb.Dn(samdb, dn_str)
    msg = ldb.Message(dn)
    # An empty value list is sent with the delete flag.
    msg[name] = ldb.MessageElement([], flag, name)
    # Building the message changes nothing until it is submitted.
    samdb.modify(msg)
3757 def create_ccache(self
, cname
, ticket
, enc_part
):
3758 """ Lay out a version 4 on-disk credentials cache, to be read using the
3762 field
= krb5ccache
.DELTATIME_TAG()
3763 field
.kdc_sec_offset
= 0
3764 field
.kdc_usec_offset
= 0
3766 v4tag
= krb5ccache
.V4TAG()
3770 v4tags
= krb5ccache
.V4TAGS()
3772 v4tags
.further_tags
= b
''
3774 optional_header
= krb5ccache
.V4HEADER()
3775 optional_header
.v4tags
= v4tags
3777 cname_string
= cname
['name-string']
3779 cprincipal
= krb5ccache
.PRINCIPAL()
3780 cprincipal
.name_type
= cname
['name-type']
3781 cprincipal
.component_count
= len(cname_string
)
3782 cprincipal
.realm
= ticket
['realm']
3783 cprincipal
.components
= cname_string
3785 sname
= ticket
['sname']
3786 sname_string
= sname
['name-string']
3788 sprincipal
= krb5ccache
.PRINCIPAL()
3789 sprincipal
.name_type
= sname
['name-type']
3790 sprincipal
.component_count
= len(sname_string
)
3791 sprincipal
.realm
= ticket
['realm']
3792 sprincipal
.components
= sname_string
3794 key
= self
.EncryptionKey_import(enc_part
['key'])
3796 key_data
= key
.export_obj()
3797 keyblock
= krb5ccache
.KEYBLOCK()
3798 keyblock
.enctype
= key_data
['keytype']
3799 keyblock
.data
= key_data
['keyvalue']
3801 addresses
= krb5ccache
.ADDRESSES()
3805 authdata
= krb5ccache
.AUTHDATA()
3809 # Re-encode the ticket, since it was decoded by another layer.
3810 ticket_data
= self
.der_encode(ticket
, asn1Spec
=krb5_asn1
.Ticket())
3812 authtime
= enc_part
['authtime']
3813 starttime
= enc_part
.get('starttime', authtime
)
3814 endtime
= enc_part
['endtime']
3816 cred
= krb5ccache
.CREDENTIAL()
3817 cred
.client
= cprincipal
3818 cred
.server
= sprincipal
3819 cred
.keyblock
= keyblock
3820 cred
.authtime
= self
.get_EpochFromKerberosTime(authtime
)
3821 cred
.starttime
= self
.get_EpochFromKerberosTime(starttime
)
3822 cred
.endtime
= self
.get_EpochFromKerberosTime(endtime
)
3824 # Account for clock skew of up to five minutes.
3825 self
.assertLess(cred
.authtime
- 5 * 60,
3826 datetime
.now(timezone
.utc
).timestamp(),
3827 "Ticket not yet valid - clocks may be out of sync.")
3828 self
.assertLess(cred
.starttime
- 5 * 60,
3829 datetime
.now(timezone
.utc
).timestamp(),
3830 "Ticket not yet valid - clocks may be out of sync.")
3831 self
.assertGreater(cred
.endtime
- 60 * 60,
3832 datetime
.now(timezone
.utc
).timestamp(),
3833 "Ticket already expired/about to expire - "
3834 "clocks may be out of sync.")
3836 cred
.renew_till
= cred
.endtime
3838 cred
.ticket_flags
= int(enc_part
['flags'], 2)
3839 cred
.addresses
= addresses
3840 cred
.authdata
= authdata
3841 cred
.ticket
= ticket_data
3842 cred
.second_ticket
= b
''
3844 ccache
= krb5ccache
.CCACHE()
3847 ccache
.optional_header
= optional_header
3848 ccache
.principal
= cprincipal
3851 # Serialise the credentials cache structure.
3852 result
= ndr_pack(ccache
)
3854 # Create a temporary file and write the credentials.
3855 cachefile
= tempfile
.NamedTemporaryFile(dir=self
.tempdir
, delete
=False)
3856 cachefile
.write(result
)
def create_ccache_with_ticket(self, user_credentials, ticket, pac=True):
    """Place the ticket into a newly created credentials cache file.

    :param user_credentials: credentials supplying the user name and realm.
    :param ticket: ticket credentials object exposing .ticket and
        .encpart_private.
    :param pac: when False, the ticket is first rewritten with
        self.modified_ticket(..., exclude_pac=True).
    :return: tuple (creds, cachefile) of a Credentials object referring to
        the cache, and the cache file itself.
    """
    user_name = user_credentials.get_username()
    realm = user_credentials.get_realm()

    # NOTE(review): the client principal is built from the user name as a
    # single component — confirm for names that contain '/'.
    cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[user_name])

    # Honour the caller's choice: only strip the PAC when asked to.
    if not pac:
        ticket = self.modified_ticket(ticket, exclude_pac=True)

    # Write the ticket into a credentials cache file that can be ingested
    # by the main credentials code.
    cachefile = self.create_ccache(cname, ticket.ticket,
                                   ticket.encpart_private)

    # Create a credentials object to reference the credentials cache.
    creds = Credentials()
    creds.set_kerberos_state(MUST_USE_KERBEROS)
    creds.set_username(user_name, SPECIFIED)
    creds.set_realm(realm)
    creds.set_named_ccache(cachefile.name, SPECIFIED, self.get_lp())

    # Return the credentials along with the cache file.
    return (creds, cachefile)
def create_ccache_with_user(self, user_credentials, mach_credentials,
                            service="host", target_name=None, pac=True):
    """Obtain a service ticket authorising the user and place it into a
    newly created credentials cache file.

    :param user_credentials: credentials of the user to get a TGT for.
    :param mach_credentials: credentials of the target of the service
        ticket.
    :param service: service name passed on to get_service_ticket().
    :param target_name: target name passed on to get_service_ticket().
    :param pac: passed on to create_ccache_with_ticket().
    :return: whatever create_ccache_with_ticket() returns.
    """
    tgt = self.get_tgt(user_credentials)

    # Forward the caller's service and target rather than relying on the
    # callee's defaults.
    ticket = self.get_service_ticket(tgt, mach_credentials,
                                     service=service,
                                     target_name=target_name)

    return self.create_ccache_with_ticket(user_credentials, ticket,
                                          pac=pac)
3902 # Test credentials by connecting to the DC through LDAP.
3903 def _connect(self
, creds
, simple_bind
, expect_error
=None):
3904 samdb
= self
.get_samdb()
3908 url
= f
'ldaps://{samdb.host_dns_name()}'
3909 creds
.set_bind_dn(str(dn
))
3911 url
= f
'ldap://{samdb.host_dns_name()}'
3912 creds
.set_bind_dn(None)
3914 ldap
= SamDB(url
=url
,
3917 except ldb
.LdbError
as err
:
3918 self
.assertIsNotNone(expect_error
, 'got unexpected error')
3919 num
, estr
= err
.args
3920 if num
!= ldb
.ERR_INVALID_CREDENTIALS
:
3923 self
.assertIn(expect_error
, estr
)
3927 self
.assertIsNone(expect_error
, 'expected to get an error')
3929 res
= ldap
.search('',
3930 scope
=ldb
.SCOPE_BASE
,
3931 attrs
=['tokenGroups'])
3932 self
.assertEqual(1, len(res
))
3934 sid
= creds
.get_sid()
3936 token_groups
= res
[0].get('tokenGroups', idx
=0)
3937 token_sid
= ndr_unpack(security
.dom_sid
, token_groups
)
3939 self
.assertEqual(sid
, str(token_sid
))
3941 # Test the two SAMR password change methods implemented in Samba. If the
3942 # user is protected, we should get an ACCOUNT_RESTRICTION error indicating
3943 # that the password change is not allowed.
3944 def _test_samr_change_password(self
, creds
, expect_error
,
3945 connect_error
=None):
3946 samdb
= self
.get_samdb()
3947 server_name
= samdb
.host_dns_name()
3949 conn
= samr
.samr(f
'ncacn_np:{server_name}[seal,smb2]',
3952 except NTSTATUSError
as err
:
3953 self
.assertIsNotNone(connect_error
,
3954 'connection unexpectedly failed')
3955 self
.assertIsNone(expect_error
, 'don’t specify both errors')
3958 self
.assertEqual(num
, connect_error
)
3962 self
.assertIsNone(connect_error
, 'expected connection to fail')
3965 nt_hash
= creds
.get_nt_hash()
3967 # Generate a new UTF-16 password.
3968 new_password_str
= generate_random_password(32, 32)
3969 new_password
= new_password_str
.encode('utf-16le')
3971 # Generate the MD4 hash of the password.
3972 new_password_md4
= md4_hash_blob(new_password
)
3974 # Prefix the password with padding so it is 512 bytes long.
3975 new_password_len
= len(new_password
)
3976 remaining_len
= 512 - new_password_len
3977 new_password
= bytes(remaining_len
) + new_password
3979 # Append the 32-bit length of the password.
3980 new_password
+= int.to_bytes(new_password_len
,
3984 # Create a key from the MD4 hash of the new password.
3985 key
= new_password_md4
[:14]
3987 # Encrypt the old NT hash with DES to obtain the verifier.
3988 verifier
= des_crypt_blob_16(nt_hash
, key
)
3990 server
= lsa
.String()
3991 server
.string
= server_name
3993 account
= lsa
.String()
3994 account
.string
= creds
.get_username()
3996 nt_verifier
= samr
.Password()
3997 nt_verifier
.hash = list(verifier
)
3999 nt_password
= samr
.CryptPassword()
4000 nt_password
.data
= list(arcfour_encrypt(nt_hash
, new_password
))
4002 if not self
.expect_nt_hash
:
4003 expect_error
= ntstatus
.NT_STATUS_NTLM_BLOCKED
4006 conn
.ChangePasswordUser2(server
=server
,
4008 nt_password
=nt_password
,
4009 nt_verifier
=nt_verifier
,
4013 except NTSTATUSError
as err
:
4015 self
.assertIsNotNone(expect_error
,
4016 f
'unexpectedly failed with {num:08X}')
4017 self
.assertEqual(num
, expect_error
)
4019 self
.assertIsNone(expect_error
, 'expected to fail')
4021 creds
.set_password(new_password_str
)
4024 nt_hash
= creds
.get_nt_hash()
4026 # Generate a new UTF-16 password.
4027 new_password
= generate_random_password(32, 32)
4028 new_password
= new_password
.encode('utf-16le')
4030 # Generate the MD4 hash of the password.
4031 new_password_md4
= md4_hash_blob(new_password
)
4033 # Prefix the password with padding so it is 512 bytes long.
4034 new_password_len
= len(new_password
)
4035 remaining_len
= 512 - new_password_len
4036 new_password
= bytes(remaining_len
) + new_password
4038 # Append the 32-bit length of the password.
4039 new_password
+= int.to_bytes(new_password_len
,
4043 # Create a key from the MD4 hash of the new password.
4044 key
= new_password_md4
[:14]
4046 # Encrypt the old NT hash with DES to obtain the verifier.
4047 verifier
= des_crypt_blob_16(nt_hash
, key
)
4049 nt_verifier
.hash = list(verifier
)
4051 nt_password
.data
= list(arcfour_encrypt(nt_hash
, new_password
))
4054 conn
.ChangePasswordUser3(server
=server
,
4056 nt_password
=nt_password
,
4057 nt_verifier
=nt_verifier
,
4062 except NTSTATUSError
as err
:
4063 self
.assertIsNotNone(expect_error
, 'unexpectedly failed')
4066 self
.assertEqual(num
, expect_error
)
4068 self
.assertIsNone(expect_error
, 'expected to fail')
4070 # Test SamLogon. Authentication should succeed for non-protected accounts,
4071 # and fail for protected accounts.
4072 def _test_samlogon(self
, creds
, logon_type
, expect_error
=None,
4073 validation_level
=netlogon
.NetlogonValidationSamInfo2
,
4074 domain_joined_mach_creds
=None):
4075 samdb
= self
.get_samdb()
4077 if domain_joined_mach_creds
is None:
4078 domain_joined_mach_creds
= self
.get_cached_creds(
4079 account_type
=self
.AccountType
.COMPUTER
,
4080 opts
={'secure_channel_type': misc
.SEC_CHAN_WKSTA
})
4082 dc_server
= samdb
.host_dns_name()
4083 username
, domain
= creds
.get_ntlm_username_domain()
4084 workstation
= domain_joined_mach_creds
.get_username()
4086 # Calling this initializes netlogon_creds on mach_creds, as is required
4087 # before calling mach_creds.encrypt_netr_PasswordInfo().
4088 conn
= netlogon
.netlogon(f
'ncacn_ip_tcp:{dc_server}[schannel,seal]',
4090 domain_joined_mach_creds
)
4091 (auth_type
, auth_level
) = conn
.auth_info()
4093 if logon_type
== netlogon
.NetlogonInteractiveInformation
:
4094 logon
= netlogon
.netr_PasswordInfo()
4096 lm_pass
= samr
.Password()
4097 lm_pass
.hash = [0] * 16
4099 nt_pass
= samr
.Password()
4100 nt_pass
.hash = list(creds
.get_nt_hash())
4102 logon
.lmpassword
= lm_pass
4103 logon
.ntpassword
= nt_pass
4105 domain_joined_mach_creds
.encrypt_netr_PasswordInfo(info
=logon
,
4106 auth_type
=auth_type
,
4107 auth_level
=auth_level
)
4109 elif logon_type
== netlogon
.NetlogonNetworkInformation
:
4110 computername
= ntlmssp
.AV_PAIR()
4111 computername
.AvId
= ntlmssp
.MsvAvNbComputerName
4112 computername
.Value
= workstation
4114 domainname
= ntlmssp
.AV_PAIR()
4115 domainname
.AvId
= ntlmssp
.MsvAvNbDomainName
4116 domainname
.Value
= domain
4118 eol
= ntlmssp
.AV_PAIR()
4119 eol
.AvId
= ntlmssp
.MsvAvEOL
4121 target_info
= ntlmssp
.AV_PAIR_LIST()
4122 target_info
.count
= 3
4123 target_info
.pair
= [domainname
, computername
, eol
]
4125 target_info_blob
= ndr_pack(target_info
)
4127 challenge
= b
'abcdefgh'
4128 response
= creds
.get_ntlm_response(flags
=0,
4129 challenge
=challenge
,
4130 target_info
=target_info_blob
)
4132 logon
= netlogon
.netr_NetworkInfo()
4134 logon
.challenge
= list(challenge
)
4135 logon
.nt
= netlogon
.netr_ChallengeResponse()
4136 logon
.nt
.length
= len(response
['nt_response'])
4137 logon
.nt
.data
= list(response
['nt_response'])
4140 self
.fail(f
'unknown logon type {logon_type}')
4142 identity_info
= netlogon
.netr_IdentityInfo()
4143 identity_info
.domain_name
.string
= domain
4144 identity_info
.account_name
.string
= username
4145 identity_info
.parameter_control
= (
4146 netlogon
.MSV1_0_ALLOW_SERVER_TRUST_ACCOUNT
) |
(
4147 netlogon
.MSV1_0_ALLOW_WORKSTATION_TRUST_ACCOUNT
)
4148 identity_info
.workstation
.string
= workstation
4150 logon
.identity_info
= identity_info
4156 if not expect_error
and not self
.expect_nt_hash
:
4157 expect_error
= ntstatus
.NT_STATUS_NTLM_BLOCKED
4160 (validation
, authoritative
, flags
) = (
4161 conn
.netr_LogonSamLogonEx(dc_server
,
4162 domain_joined_mach_creds
.get_workstation(),
4167 except NTSTATUSError
as err
:
4168 status
, _
= err
.args
4169 self
.assertIsNotNone(expect_error
,
4170 f
'unexpectedly failed with {status:08X}')
4171 self
.assertEqual(expect_error
, status
, 'got wrong status code')
4173 self
.assertIsNone(expect_error
, 'expected error')
4175 self
.assertEqual(1, authoritative
)
4176 self
.assertEqual(0, flags
)
4180 def check_ticket_times(self
,
4183 expected_renew_life
=None,
4185 ticket
= ticket_creds
.ticket_private
4187 authtime
= ticket
['authtime']
4188 starttime
= ticket
.get('starttime', authtime
)
4189 endtime
= ticket
['endtime']
4190 renew_till
= ticket
.get('renew-till', None)
4192 starttime
= self
.get_EpochFromKerberosTime(starttime
)
4194 if expected_life
is not None:
4195 actual_end
= self
.get_EpochFromKerberosTime(
4196 endtime
.decode('ascii'))
4197 actual_lifetime
= actual_end
- starttime
4199 self
.assertAlmostEqual(expected_life
, actual_lifetime
, delta
=delta
)
4201 if renew_till
is None:
4202 self
.assertIsNone(expected_renew_life
)
4204 if expected_renew_life
is not None:
4205 actual_renew_till
= self
.get_EpochFromKerberosTime(
4206 renew_till
.decode('ascii'))
4207 actual_renew_life
= actual_renew_till
- starttime
4209 self
.assertAlmostEqual(expected_renew_life
, actual_renew_life
, delta
=delta
)