ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / krb5 / kdc_base_test.py
blob093b6712e8eedc910db4c593f14dfe827052bb0d
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import os
20 import sys
22 sys.path.insert(0, "bin/python")
23 os.environ["PYTHONUNBUFFERED"] = "1"
25 import binascii
26 import collections
27 import numbers
28 import secrets
29 import tempfile
30 from collections import namedtuple
31 from datetime import datetime, timezone
32 from enum import Enum
33 from functools import partial
34 from typing import Dict, Optional
36 import ldb
37 from ldb import SCOPE_BASE
39 from samba import (
40 NTSTATUSError,
41 arcfour_encrypt,
42 common,
43 generate_random_password,
44 net,
45 ntstatus,
47 from samba.auth import system_session
48 from samba.credentials import (
49 DONT_USE_KERBEROS,
50 MUST_USE_KERBEROS,
51 SPECIFIED,
52 Credentials,
54 from samba.crypto import des_crypt_blob_16, md4_hash_blob
55 from samba.dcerpc import (
56 claims,
57 dcerpc,
58 drsblobs,
59 drsuapi,
60 krb5ccache,
61 krb5pac,
62 lsa,
63 misc,
64 netlogon,
65 ntlmssp,
66 samr,
67 security,
69 from samba.dcerpc.misc import SEC_CHAN_BDC, SEC_CHAN_NULL, SEC_CHAN_WKSTA
70 from samba.domain.models import AuthenticationPolicy, AuthenticationSilo
71 from samba.drs_utils import drs_Replicate, drsuapi_connect
72 from samba.dsdb import (
73 DS_DOMAIN_FUNCTION_2000,
74 DS_DOMAIN_FUNCTION_2008,
75 DS_GUID_COMPUTERS_CONTAINER,
76 DS_GUID_DOMAIN_CONTROLLERS_CONTAINER,
77 DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER,
78 DS_GUID_USERS_CONTAINER,
79 DSDB_SYNTAX_BINARY_DN,
80 GTYPE_SECURITY_DOMAIN_LOCAL_GROUP,
81 GTYPE_SECURITY_GLOBAL_GROUP,
82 GTYPE_SECURITY_UNIVERSAL_GROUP,
83 UF_ACCOUNTDISABLE,
84 UF_NO_AUTH_DATA_REQUIRED,
85 UF_NORMAL_ACCOUNT,
86 UF_NOT_DELEGATED,
87 UF_PARTIAL_SECRETS_ACCOUNT,
88 UF_SERVER_TRUST_ACCOUNT,
89 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION,
90 UF_WORKSTATION_TRUST_ACCOUNT,
91 UF_SMARTCARD_REQUIRED
93 from samba.join import DCJoinContext
94 from samba.ndr import ndr_pack, ndr_unpack
95 from samba.param import LoadParm
96 from samba.samdb import SamDB, dsdb_Dn
98 rc4_bit = security.KERB_ENCTYPE_RC4_HMAC_MD5
99 aes256_sk_bit = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
101 import samba.tests.krb5.kcrypto as kcrypto
102 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
103 from samba.tests import TestCaseInTempDir, delete_force
104 from samba.tests.krb5.raw_testcase import (
105 KerberosCredentials,
106 KerberosTicketCreds,
107 RawKerberosTest,
109 from samba.tests.krb5.rfc4120_constants import (
110 AD_IF_RELEVANT,
111 AD_WIN2K_PAC,
112 AES256_CTS_HMAC_SHA1_96,
113 ARCFOUR_HMAC_MD5,
114 KDC_ERR_PREAUTH_REQUIRED,
115 KDC_ERR_TGT_REVOKED,
116 KRB_AS_REP,
117 KRB_ERROR,
118 KRB_TGS_REP,
119 KU_AS_REP_ENC_PART,
120 KU_ENC_CHALLENGE_CLIENT,
121 KU_PA_ENC_TIMESTAMP,
122 KU_TICKET,
123 NT_PRINCIPAL,
124 NT_SRV_INST,
125 PADATA_ENC_TIMESTAMP,
126 PADATA_ENCRYPTED_CHALLENGE,
127 PADATA_ETYPE_INFO2,
# Module-wide debugging switches.  KDCBaseTest.setUp() copies them onto each
# test instance as do_asn1_print and do_hexdump respectively.
global_asn1_print = False
global_hexdump = False
class GroupType(Enum):
    """Security group kinds, each valued by its GTYPE_SECURITY_* constant."""

    GLOBAL = GTYPE_SECURITY_GLOBAL_GROUP
    DOMAIN_LOCAL = GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
    UNIVERSAL = GTYPE_SECURITY_UNIVERSAL_GROUP
# A small value holder pairing the DN of a Principal with its SID.
class Principal:
    """Hold the DN and SID of a principal.

    *dn* must be None or an ldb.Dn; anything else raises AssertionError.
    *sid* is stored without any checking.
    """

    __slots__ = ['dn', 'sid']

    def __init__(self, dn, sid):
        dn_is_acceptable = dn is None or isinstance(dn, ldb.Dn)
        if not dn_is_acceptable:
            raise AssertionError(f'expected {dn} to be an ldb.Dn')

        self.sid = sid
        self.dn = dn
152 class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
153 """ Base class for KDC tests.
class AccountType(Enum):
    """The kinds of account that the test helpers distinguish.

    Every member is given its own object() as value, so no two members
    compare equal and none is an alias of another.
    """

    USER = object()
    COMPUTER = object()
    SERVER = object()
    RODC = object()
    MANAGED_SERVICE = object()
    GROUP_MANAGED_SERVICE = object()
@classmethod
def setUpClass(cls):
    """Initialise the class-level handles, caches and cleanup lists."""
    super().setUpClass()

    # Lazily created connections and configuration; filled in on first use
    # by the corresponding get_*() helpers.
    cls._lp = None
    cls._ldb = None
    cls._rodc_ldb = None
    cls._drsuapi_connection = None
    cls._functional_level = None
    cls._rodc_ctx = None

    # An identifier to ensure created accounts have unique names. Windows
    # caches accounts based on usernames, so account names being different
    # across test runs avoids previous test runs affecting the results.
    random_prefix = secrets.token_hex(4)
    cls.account_base = f'{random_prefix}_'
    cls.account_id = 0

    # A list containing DNs of accounts created as part of testing.
    cls.accounts = []

    # Caches shared by all the tests of the class.
    cls.account_cache = {}
    cls.policy_cache = {}
    cls.tkt_cache = {}

    # Messages that tearDownClass() passes to modify() before deleting the
    # accounts.
    cls.ldb_cleanups = []

    # Lazily created container DNs.
    cls._claim_types_dn = None
    cls._authn_policy_config_dn = None
    cls._authn_policies_dn = None
    cls._authn_silos_dn = None
def get_claim_types_dn(self):
    """Return a copy of the DN of the 'CN=Claim Types' container.

    On first use, 'CN=Claims Configuration,CN=Services' and its
    'CN=Claim Types' child are added beneath the configuration base DN.
    An "entry already exists" error is ignored; any other LdbError
    propagates.  Entries this call actually created are recorded in
    self.accounts, and the final DN is cached on the class.
    """
    samdb = self.get_samdb()

    def add_if_missing(dn, object_class):
        # Add the entry, remembering it only if it did not exist before.
        try:
            samdb.add({
                'dn': dn,
                'objectClass': object_class,
            })
        except ldb.LdbError as err:
            num, _ = err.args
            if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
                raise
        else:
            self.accounts.append(str(dn))

    if self._claim_types_dn is None:
        container_dn = samdb.get_config_basedn()
        container_dn.add_child('CN=Claims Configuration,CN=Services')
        add_if_missing(container_dn, 'container')

        # The same Dn object is extended in place to name the child.
        container_dn.add_child('CN=Claim Types')
        add_if_missing(container_dn, 'msDS-ClaimTypes')

        type(self)._claim_types_dn = container_dn

    # Return a copy of the DN rather than the cached object.
    return ldb.Dn(samdb, str(self._claim_types_dn))
def get_authn_policy_config_dn(self):
    """Return a copy of the 'AuthN Policy Configuration' container DN.

    On first use the container is added beneath the configuration base DN
    ('CN=AuthN Policy Configuration,CN=Services').  An "entry already
    exists" error is ignored; a container created here is recorded in
    self.accounts.  The DN is cached on the class.
    """
    samdb = self.get_samdb()

    if self._authn_policy_config_dn is None:
        config_dn = samdb.get_config_basedn()
        config_dn.add_child('CN=AuthN Policy Configuration,CN=Services')

        container = {
            'dn': config_dn,
            'objectClass': 'container',
            'description': ('Contains configuration for authentication '
                            'policy'),
        }
        try:
            samdb.add(container)
        except ldb.LdbError as err:
            num, _ = err.args
            if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
                raise
        else:
            # Only remember the container if this call created it.
            self.accounts.append(str(config_dn))

        type(self)._authn_policy_config_dn = config_dn

    # Return a copy of the DN rather than the cached object.
    return ldb.Dn(samdb, str(self._authn_policy_config_dn))
def get_authn_policies_dn(self):
    """Return a copy of the DN of the 'CN=AuthN Policies' container.

    On first use the container is added beneath the DN returned by
    get_authn_policy_config_dn().  An "entry already exists" error is
    ignored; a container created here is recorded in self.accounts.  The
    DN is cached on the class.
    """
    samdb = self.get_samdb()

    if self._authn_policies_dn is None:
        policies_dn = self.get_authn_policy_config_dn()
        policies_dn.add_child('CN=AuthN Policies')

        container = {
            'dn': policies_dn,
            'objectClass': 'msDS-AuthNPolicies',
            'description': 'Contains authentication policy objects',
        }
        try:
            samdb.add(container)
        except ldb.LdbError as err:
            num, _ = err.args
            if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
                raise
        else:
            # Only remember the container if this call created it.
            self.accounts.append(str(policies_dn))

        type(self)._authn_policies_dn = policies_dn

    # Return a copy of the DN rather than the cached object.
    return ldb.Dn(samdb, str(self._authn_policies_dn))
def get_authn_silos_dn(self):
    """Return a copy of the DN of the 'CN=AuthN Silos' container.

    On first use the container is added beneath the DN returned by
    get_authn_policy_config_dn().  An "entry already exists" error is
    ignored; a container created here is recorded in self.accounts.  The
    DN is cached on the class.
    """
    samdb = self.get_samdb()

    if self._authn_silos_dn is None:
        silos_dn = self.get_authn_policy_config_dn()
        silos_dn.add_child('CN=AuthN Silos')

        container = {
            'dn': silos_dn,
            'objectClass': 'msDS-AuthNPolicySilos',
            'description': 'Contains authentication policy silo objects',
        }
        try:
            samdb.add(container)
        except ldb.LdbError as err:
            num, _ = err.args
            if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
                raise
        else:
            # Only remember the container if this call created it.
            self.accounts.append(str(silos_dn))

        type(self)._authn_silos_dn = silos_dn

    # Return a copy of the DN rather than the cached object.
    return ldb.Dn(samdb, str(self._authn_silos_dn))
316 @staticmethod
317 def freeze(m):
318 return frozenset((k, v) for k, v in m.items())
def tearDown(self):
    """Run pending cleanups, then delete the accounts of this one test."""
    # Run any cleanups that may modify accounts prior to deleting those
    # accounts.
    self.doCleanups()

    # Clean up any accounts created for single tests, newest first.  This
    # is skipped when no connection was ever opened.
    samdb = self._ldb
    if samdb is not None:
        for account_dn in reversed(self.test_accounts):
            delete_force(samdb, account_dn)

    super().tearDown()
@classmethod
def tearDownClass(cls):
    """Undo the class-wide state: LDB cleanups, accounts, mock RODC join."""
    # Clean up any accounts created by create_account. This is
    # done in tearDownClass() rather than tearDown(), so that
    # accounts need only be created once for permutation tests.
    samdb = cls._ldb
    if samdb is not None:
        for cleanup_msg in reversed(cls.ldb_cleanups):
            try:
                samdb.modify(cleanup_msg)
            except ldb.LdbError:
                # Best effort: a failed cleanup must not stop the rest.
                pass

        for account_dn in reversed(cls.accounts):
            delete_force(samdb, account_dn)

    rodc_ctx = cls._rodc_ctx
    if rodc_ctx is not None:
        rodc_ctx.cleanup_old_join(force=True)

    super().tearDownClass()
def setUp(self):
    """Prepare per-test state: debug switches and the account list."""
    super().setUp()

    # Take the debugging switches from the module-level defaults.
    self.do_hexdump = global_hexdump
    self.do_asn1_print = global_asn1_print

    # A list containing DNs of accounts that should be removed when the
    # current test finishes.
    self.test_accounts = []
def get_lp(self) -> LoadParm:
    """Return the LoadParm, obtaining it via get_loadparm() on first use.

    The value is stored on the class, so it is shared between tests.
    """
    if self._lp is not None:
        return self._lp

    type(self)._lp = self.get_loadparm()
    return self._lp
def get_samdb(self) -> SamDB:
    """Return the SamDB connection to self.dc_host, opening it on first use.

    The connection uses the admin credentials over LDAP and is stored on
    the class, so it is shared between tests.
    """
    if self._ldb is not None:
        return self._ldb

    creds = self.get_admin_creds()
    lp = self.get_lp()
    session = system_session()

    url = "ldap://%s" % self.dc_host
    type(self)._ldb = SamDB(url=url,
                            session_info=session,
                            credentials=creds,
                            lp=lp)

    return self._ldb
def get_rodc_samdb(self) -> SamDB:
    """Return the SamDB connection to self.host, opening it on first use.

    The connection is created with am_rodc=True using the admin
    credentials, and is stored on the class so it is shared between tests.
    """
    if self._rodc_ldb is not None:
        return self._rodc_ldb

    creds = self.get_admin_creds()
    lp = self.get_lp()
    session = system_session()

    url = "ldap://%s" % self.host
    type(self)._rodc_ldb = SamDB(url=url,
                                 session_info=session,
                                 credentials=creds,
                                 lp=lp,
                                 am_rodc=True)

    return self._rodc_ldb
def get_drsuapi_connection(self):
    """Return the result of drsuapi_connect(), connecting on first use.

    The connection is made with the admin credentials to the DNS host name
    reported by the SamDB, using self.dc_host as the IP.  It is stored on
    the class, so it is shared between tests.
    """
    if self._drsuapi_connection is not None:
        return self._drsuapi_connection

    admin_creds = self.get_admin_creds()
    samdb = self.get_samdb()
    dns_hostname = samdb.host_dns_name()
    lp = self.get_lp()

    type(self)._drsuapi_connection = drsuapi_connect(dns_hostname,
                                                     lp,
                                                     admin_creds,
                                                     ip=self.dc_host)

    return self._drsuapi_connection
def get_server_dn(self, samdb):
    """Return, as an ldb.Dn, the serverReference of *samdb*'s server object.

    The server object is the one named by samdb.get_serverName().
    """
    server_name = samdb.get_serverName()

    res = samdb.search(base=server_name,
                       scope=ldb.SCOPE_BASE,
                       attrs=['serverReference'])

    reference = res[0]['serverReference'][0].decode('utf8')
    return ldb.Dn(samdb, reference)
def get_mock_rodc_ctx(self):
    """Return the DCJoinContext of the mock RODC, creating it on first use.

    The RODC gets a freshly generated name in the site
    'Default-First-Site-Name' and is set up through create_rodc().  The
    context is stored on the class, so it is shared between tests.
    """
    if self._rodc_ctx is not None:
        return self._rodc_ctx

    admin_creds = self.get_admin_creds()
    lp = self.get_lp()

    rodc_name = self.get_new_username()
    site_name = 'Default-First-Site-Name'

    ctx = DCJoinContext(server=self.dc_host,
                        creds=admin_creds,
                        lp=lp,
                        site=site_name,
                        netbios_name=rodc_name,
                        targetdir=None,
                        domain=None)
    self.create_rodc(ctx)

    # Only cache the context once the join objects have been created.
    type(self)._rodc_ctx = ctx

    return self._rodc_ctx
def get_domain_functional_level(self, ldb=None):
    """Return the domain functional level, looking it up on first use.

    The 'domainFunctionality' attribute is read from the entry with the
    empty base DN of *ldb* (the default SamDB when *ldb* is None).  If the
    attribute is absent, DS_DOMAIN_FUNCTION_2000 is used.  The result is
    cached on the class.

    Note that the *ldb* parameter hides the ldb module inside this method.
    """
    if self._functional_level is not None:
        return self._functional_level

    if ldb is None:
        ldb = self.get_samdb()

    res = ldb.search(base='',
                     scope=SCOPE_BASE,
                     attrs=['domainFunctionality'])
    try:
        level = int(res[0]['domainFunctionality'][0])
    except KeyError:
        level = DS_DOMAIN_FUNCTION_2000

    type(self)._functional_level = level

    return self._functional_level
def get_default_enctypes(self, creds):
    """Return the list of encryption types expected by default for *creds*.

    AES256 and AES128 are included when the domain functional level is at
    least 2008; RC4 is included when self.expect_nt_hash is set or the
    credentials have a workstation.
    """
    self.assertIsNotNone(creds, 'expected client creds to be passed in')

    functional_level = self.get_domain_functional_level()

    enctypes = []

    # AES is only supported at functional level 2008 or higher
    if functional_level >= DS_DOMAIN_FUNCTION_2008:
        enctypes.extend((kcrypto.Enctype.AES256,
                         kcrypto.Enctype.AES128))

    if self.expect_nt_hash or creds.get_workstation():
        enctypes.append(kcrypto.Enctype.RC4)

    return enctypes
def create_group(self, samdb, name, ou=None, gtype=None):
    """Create the group 'CN=<name>,<ou>' and return its DN as a string.

    *ou* defaults to the well-known users container of the default base
    DN.  When *gtype* is given it is stored as 'groupType' after being
    passed through common.normalise_int32().  Any existing entry with the
    same DN is deleted first, and the DN is recorded in self.accounts.
    """
    if ou is None:
        base_dn = samdb.get_default_basedn()
        ou = samdb.get_wellknown_dn(base_dn, DS_GUID_USERS_CONTAINER)

    group_dn = f'CN={name},{ou}'

    # Remove the group if it exists; this will happen if a previous test
    # run failed.
    delete_force(samdb, group_dn)

    # Save the group name so it can be deleted in tearDownClass.
    self.accounts.append(group_dn)

    group = {
        'dn': group_dn,
        'objectClass': 'group'
    }
    if gtype is not None:
        group['groupType'] = common.normalise_int32(gtype)
    samdb.add(group)

    return group_dn
def get_dn_from_attribute(self, attribute):
    """Return the .dn of the entry get_from_attribute() finds."""
    entry = self.get_from_attribute(attribute)
    return entry.dn
def get_dn_from_class(self, attribute):
    """Return the .dn of the entry get_from_class() finds.

    Despite its name, *attribute* is forwarded to get_from_class() as the
    name to look for.
    """
    entry = self.get_from_class(attribute)
    return entry.dn
def get_schema_id_guid_from_attribute(self, attribute):
    """Return the schemaIDGUID of the named attribute as a misc.GUID."""
    entry = self.get_from_attribute(attribute)
    raw_guid = entry.get('schemaIDGUID', idx=0)
    return misc.GUID(raw_guid)
def get_from_attribute(self, attribute):
    """Return the 'attributeSchema' schema entry named *attribute*."""
    object_class = 'attributeSchema'
    return self.get_from_schema(attribute, object_class)
def get_from_class(self, attribute):
    """Return the 'classSchema' schema entry named *attribute*."""
    object_class = 'classSchema'
    return self.get_from_schema(attribute, object_class)
def get_from_schema(self, name, object_class):
    """Return the single schema entry matching *name* and *object_class*.

    The search covers the direct children of the schema base DN for an
    entry whose objectClass is *object_class* and whose lDAPDisplayName is
    *name*, requesting only 'schemaIDGUID'.  The test fails unless exactly
    one entry matches.
    """
    samdb = self.get_samdb()
    schema_dn = samdb.get_schema_basedn()

    search_filter = (f'(&(objectClass={object_class})'
                     f'(lDAPDisplayName={name}))')
    res = samdb.search(base=schema_dn,
                       scope=ldb.SCOPE_ONELEVEL,
                       attrs=['schemaIDGUID'],
                       expression=search_filter)
    self.assertEqual(1, len(res),
                     f'could not locate {name} in {object_class}')

    return res[0]
def create_authn_silo(self, *,
                      members=None,
                      user_policy=None,
                      computer_policy=None,
                      service_policy=None,
                      enforced=None):
    """Create an authentication silo with a newly generated name.

    Every argument that is not None sets the matching msDS-* attribute:
    the policy arguments contribute the string form of their .dn, and a
    boolean *enforced* is written as 'TRUE' or 'FALSE'.  Any existing
    entry with the same DN is deleted first, and the DN is recorded in
    self.accounts.

    Returns the result of AuthenticationSilo.get() for the new DN.
    """
    samdb = self.get_samdb()

    silo_id = self.get_new_username()

    silo_dn = self.get_authn_silos_dn()
    silo_dn.add_child(f'CN={silo_id}')

    details = {
        'dn': silo_dn,
        'objectClass': 'msDS-AuthNPolicySilo',
    }

    if isinstance(enforced, bool):
        enforced = 'TRUE' if enforced else 'FALSE'

    def policy_dn(policy):
        # Policies are referenced by the string form of their DN.
        return None if policy is None else str(policy.dn)

    optional_attributes = (
        ('msDS-AuthNPolicySiloMembers', members),
        ('msDS-UserAuthNPolicy', policy_dn(user_policy)),
        ('msDS-ComputerAuthNPolicy', policy_dn(computer_policy)),
        ('msDS-ServiceAuthNPolicy', policy_dn(service_policy)),
        ('msDS-AuthNPolicySiloEnforced', enforced),
    )
    for attribute, value in optional_attributes:
        if value is not None:
            details[attribute] = value

    # Save the silo DN so it can be deleted in tearDownClass().
    self.accounts.append(str(silo_dn))

    # Remove the silo if it exists; this will happen if a previous test run
    # failed.
    delete_force(samdb, silo_dn)

    samdb.add(details)

    return AuthenticationSilo.get(samdb, dn=silo_dn)
def create_authn_silo_claim_id(self):
    """Ensure the authentication silo claim type exists; return its ID.

    The claim 'ad://ext/AuthenticationSilo' is created through
    create_claim() as an enabled, single-valued, constructed string claim
    applying to the user, computer and both managed service account
    classes.
    """
    claim_id = 'ad://ext/AuthenticationSilo'

    applies_to = [
        'msDS-GroupManagedServiceAccount',
        'user',
        'msDS-ManagedServiceAccount',
        'computer',
    ]

    claim_options = dict(
        enabled=True,
        single_valued=True,
        value_space_restricted=False,
        source_type='Constructed',
        for_classes=applies_to,
        value_type=claims.CLAIM_TYPE_STRING,
        # It's OK if the claim type already exists.
        force=False,
    )
    self.create_claim(claim_id, **claim_options)

    return claim_id
def create_authn_policy(self, *,
                        use_cache=True,
                        **kwargs):
    """Return an authentication policy created from *kwargs*.

    With *use_cache* set, a policy previously created with the same
    keyword arguments is returned from self.policy_cache when available,
    and a newly created one is stored there.  Otherwise a new policy is
    always created via create_authn_policy_opts() and the cache is left
    untouched.
    """
    if not use_cache:
        return self.create_authn_policy_opts(**kwargs)

    cache_key = self.freeze(kwargs)

    cached_policy = self.policy_cache.get(cache_key)
    if cached_policy is not None:
        return cached_policy

    new_policy = self.create_authn_policy_opts(**kwargs)
    self.policy_cache[cache_key] = new_policy

    return new_policy
def create_authn_policy_opts(self, *,
                             enforced=None,
                             strong_ntlm_policy=None,
                             user_allowed_from=None,
                             user_allowed_ntlm=None,
                             user_allowed_to=None,
                             user_tgt_lifetime=None,
                             computer_allowed_to=None,
                             computer_tgt_lifetime=None,
                             service_allowed_from=None,
                             service_allowed_ntlm=None,
                             service_allowed_to=None,
                             service_tgt_lifetime=None):
    """Create an authentication policy with a newly generated name.

    Every argument that is not None sets the matching msDS-* attribute:

    * boolean values of *enforced* and the *_allowed_ntlm arguments are
      written as 'TRUE' or 'FALSE';
    * the *_allowed_from and *_allowed_to arguments are SDDL strings,
      converted to packed security descriptors relative to the domain SID;
    * numeric *_tgt_lifetime values are multiplied by 10,000,000,
      truncated to an integer and written as a string.

    Other values are stored unchanged.  Any existing entry with the same
    DN is deleted first, and the DN is recorded in self.accounts.

    Returns the result of AuthenticationPolicy.get() for the new DN.
    """
    samdb = self.get_samdb()

    policy_id = self.get_new_username()

    policy_dn = self.get_authn_policies_dn()
    policy_dn.add_child(f'CN={policy_id}')

    details = {
        'dn': policy_dn,
        'objectClass': 'msDS-AuthNPolicy',
    }

    # The domain SID is only fetched if an SDDL string needs converting.
    cached_domain_sid = None

    def sd_from_sddl(sddl):
        nonlocal cached_domain_sid
        if cached_domain_sid is None:
            cached_domain_sid = security.dom_sid(samdb.get_domain_sid())

        descriptor = security.descriptor.from_sddl(sddl, cached_domain_sid)
        return ndr_pack(descriptor)

    def ldap_bool(value):
        if isinstance(value, bool):
            return 'TRUE' if value else 'FALSE'
        return value

    def scaled_lifetime(value):
        if isinstance(value, numbers.Number):
            return str(int(value * 10_000_000))
        return value

    def unchanged(value):
        return value

    # (attribute name, supplied value, conversion), in the order in which
    # the attributes are to be added.
    attribute_table = (
        ('msDS-AuthNPolicyEnforced', enforced, ldap_bool),
        ('msDS-StrongNTLMPolicy', strong_ntlm_policy, unchanged),

        ('msDS-UserAllowedToAuthenticateFrom',
         user_allowed_from, sd_from_sddl),
        ('msDS-UserAllowedNTLMNetworkAuthentication',
         user_allowed_ntlm, ldap_bool),
        ('msDS-UserAllowedToAuthenticateTo',
         user_allowed_to, sd_from_sddl),
        ('msDS-UserTGTLifetime', user_tgt_lifetime, scaled_lifetime),

        ('msDS-ComputerAllowedToAuthenticateTo',
         computer_allowed_to, sd_from_sddl),
        ('msDS-ComputerTGTLifetime',
         computer_tgt_lifetime, scaled_lifetime),

        ('msDS-ServiceAllowedToAuthenticateFrom',
         service_allowed_from, sd_from_sddl),
        ('msDS-ServiceAllowedNTLMNetworkAuthentication',
         service_allowed_ntlm, ldap_bool),
        ('msDS-ServiceAllowedToAuthenticateTo',
         service_allowed_to, sd_from_sddl),
        ('msDS-ServiceTGTLifetime', service_tgt_lifetime, scaled_lifetime),
    )
    for attribute, value, convert in attribute_table:
        if value is not None:
            details[attribute] = convert(value)

    # Save the policy DN so it can be deleted in tearDownClass().
    self.accounts.append(str(policy_dn))

    # Remove the policy if it exists; this will happen if a previous test
    # run failed.
    delete_force(samdb, policy_dn)

    samdb.add(details)

    return AuthenticationPolicy.get(samdb, dn=policy_dn)
def create_claim(self,
                 claim_id,
                 enabled=None,
                 attribute=None,
                 single_valued=None,
                 value_space_restricted=None,
                 source=None,
                 source_type=None,
                 for_classes=None,
                 value_type=None,
                 force=True):
    """Create the claim type 'CN=<claim_id>' in the claim types container.

    Every optional argument that is not None sets the matching attribute:
    booleans are written as 'TRUE' or 'FALSE', *attribute* and the names
    in *for_classes* are replaced by the DNs of their schema entries, and
    an integer *value_type* is written as a string.

    With *force* set, an existing claim type of the same DN is deleted
    first, and an "entry already exists" error fails the test.  With
    *force* unset such an error is accepted silently.  The DN is recorded
    in self.accounts only if the claim type was actually added.
    """
    samdb = self.get_samdb()

    claim_dn = self.get_claim_types_dn()
    claim_dn.add_child(f'CN={claim_id}')

    details = {
        'dn': claim_dn,
        'objectClass': 'msDS-ClaimType',
    }

    def ldap_bool(value):
        if isinstance(value, bool):
            return 'TRUE' if value else 'FALSE'
        return value

    # Convert the arguments to the form in which they are stored.
    enabled = ldap_bool(enabled)

    if attribute is not None:
        attribute = str(self.get_dn_from_attribute(attribute))

    single_valued = ldap_bool(single_valued)
    value_space_restricted = ldap_bool(value_space_restricted)

    if for_classes is not None:
        for_classes = [str(self.get_dn_from_class(class_name))
                       for class_name in for_classes]

    if isinstance(value_type, int):
        value_type = str(value_type)

    optional_attributes = (
        ('Enabled', enabled),
        ('msDS-ClaimAttributeSource', attribute),
        ('msDS-ClaimIsSingleValued', single_valued),
        ('msDS-ClaimIsValueSpaceRestricted', value_space_restricted),
        ('msDS-ClaimSource', source),
        ('msDS-ClaimSourceType', source_type),
        ('msDS-ClaimTypeAppliesToClass', for_classes),
        ('msDS-ClaimValueType', value_type),
    )
    for name, value in optional_attributes:
        if value is not None:
            details[name] = value

    if force:
        # Remove the claim if it exists; this will happen if a previous
        # test run failed
        delete_force(samdb, claim_dn)

    try:
        samdb.add(details)
    except ldb.LdbError as err:
        num, _ = err.args
        if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
            raise
        self.assertFalse(force, 'should not fail with force=True')
    else:
        # Save the claim DN so it can be deleted in tearDownClass()
        self.accounts.append(str(claim_dn))
def create_account(self, samdb, name, account_type=AccountType.USER,
                   spn=None, upn=None, additional_details=None,
                   ou=None, account_control=0, add_dollar=None,
                   expired_password=False, force_nt4_hash=False,
                   preserve=True):
    """Create an account for testing and return (creds, dn).

    The account is added as 'CN=<name>,<ou>'; *ou* defaults to the
    well-known container matching *account_type*.  A '$' is appended to
    the account name when *add_dollar* is true, which is the default for
    everything but user accounts.  *spn* and *upn* may contain the
    placeholder '{account}', which is replaced by the account name.
    Entries of *additional_details* override the generated attributes.

    With *preserve* set the DN is added to self.accounts, which is used by
    tearDownClass to clean up the created accounts; otherwise it is added
    to self.test_accounts and removed in tearDown().

    With *force_nt4_hash* set the password is changed once more through
    net.Net.set_password(force_samr_18=True), so a key version number of
    2 rather than 1 is expected.

    The returned KerberosCredentials carry the password (none for group
    managed service accounts), names, DN, SID, GUID, account type, account
    control flags and expected key version number.
    """
    account_types = self.AccountType

    if add_dollar is None and account_type is not account_types.USER:
        add_dollar = True

    if ou is None:
        if account_type is account_types.COMPUTER:
            container_guid = DS_GUID_COMPUTERS_CONTAINER
        elif (account_type is account_types.MANAGED_SERVICE
              or account_type is account_types.GROUP_MANAGED_SERVICE):
            container_guid = DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
        elif account_type is account_types.SERVER:
            container_guid = DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
        else:
            container_guid = DS_GUID_USERS_CONTAINER

        ou = samdb.get_wellknown_dn(samdb.get_default_basedn(),
                                    container_guid)

    dn = "CN=%s,%s" % (name, ou)

    # remove the account if it exists, this will happen if a previous test
    # run failed
    delete_force(samdb, dn)

    account_name = name
    if add_dollar:
        account_name += '$'

    # Pick the object class, account flag and secure channel type that go
    # with the account type.
    secure_schannel_type = SEC_CHAN_NULL
    if account_type is account_types.USER:
        object_class = "user"
        account_control |= UF_NORMAL_ACCOUNT
    elif account_type is account_types.MANAGED_SERVICE:
        object_class = "msDS-ManagedServiceAccount"
        account_control |= UF_WORKSTATION_TRUST_ACCOUNT
        secure_schannel_type = SEC_CHAN_WKSTA
    elif account_type is account_types.GROUP_MANAGED_SERVICE:
        object_class = "msDS-GroupManagedServiceAccount"
        account_control |= UF_WORKSTATION_TRUST_ACCOUNT
        secure_schannel_type = SEC_CHAN_WKSTA
    else:
        object_class = "computer"
        if account_type is account_types.COMPUTER:
            account_control |= UF_WORKSTATION_TRUST_ACCOUNT
            secure_schannel_type = SEC_CHAN_WKSTA
        elif account_type is account_types.SERVER:
            account_control |= UF_SERVER_TRUST_ACCOUNT
            secure_schannel_type = SEC_CHAN_BDC
        else:
            # No other account type can be created here.
            self.fail()

    details = {
        "dn": dn,
        "objectClass": object_class,
        "sAMAccountName": account_name,
        "userAccountControl": str(account_control),
    }

    if account_type is account_types.GROUP_MANAGED_SERVICE:
        # No password is set on group managed service accounts here.
        password = None
    else:
        password = generate_random_password(32, 32)
        quoted_password = '"%s"' % password
        details['unicodePwd'] = quoted_password.encode('utf-16-le')

    if upn is not None:
        upn = upn.format(account=account_name)
    if spn is not None:
        if isinstance(spn, str):
            spn = spn.format(account=account_name)
        else:
            spn = tuple(s.format(account=account_name) for s in spn)
        details["servicePrincipalName"] = spn
    if upn is not None:
        details["userPrincipalName"] = upn
    if expired_password:
        details["pwdLastSet"] = "0"
    if additional_details is not None:
        details.update(additional_details)

    if preserve:
        # Mark this account for deletion in tearDownClass() after all the
        # tests in this class finish.
        self.accounts.append(dn)
    else:
        # Mark this account for deletion in tearDown() after the current
        # test finishes. Because the time complexity of deleting an account
        # in Samba scales with the number of accounts, it is faster to
        # delete accounts as soon as possible than to keep them around
        # until all the tests are finished.
        self.test_accounts.append(dn)
    samdb.add(details)

    expected_kvno = 1

    if force_nt4_hash:
        admin_creds = self.get_admin_creds()
        lp = self.get_lp()
        net_ctx = net.Net(admin_creds, lp, server=self.dc_host)
        domain = samdb.domain_netbios_name().upper()

        password = generate_random_password(32, 32)

        try:
            net_ctx.set_password(newpassword=password,
                                 account_name=account_name,
                                 domain_name=domain,
                                 force_samr_18=True)
            # The extra password change bumps the key version number.
            expected_kvno += 1
        except Exception as e:
            self.fail(e)

    creds = KerberosCredentials()
    creds.guess(self.get_lp())
    creds.set_realm(samdb.domain_dns_name().upper())
    creds.set_domain(samdb.domain_netbios_name().upper())
    if password is not None:
        creds.set_password(password)
    creds.set_username(account_name)
    if account_type is account_types.USER:
        creds.set_workstation('')
    else:
        creds.set_workstation(name)
    creds.set_secure_channel_type(secure_schannel_type)
    creds.set_dn(ldb.Dn(samdb, dn))
    creds.set_upn(upn)
    creds.set_spn(spn)
    creds.set_type(account_type)
    creds.set_user_account_control(account_control)

    self.creds_set_enctypes(creds)

    res = samdb.search(base=dn,
                       scope=ldb.SCOPE_BASE,
                       attrs=['msDS-KeyVersionNumber',
                              'objectSid',
                              'objectGUID'])
    account = res[0]

    # The key version number is checked only if the server reported one.
    kvno = account.get('msDS-KeyVersionNumber', idx=0)
    if kvno is not None:
        self.assertEqual(int(kvno), expected_kvno)
    creds.set_kvno(expected_kvno)

    sid = account.get('objectSid', idx=0)
    sid = samdb.schema_format_value('objectSID', sid)
    creds.set_sid(sid.decode('utf-8'))

    guid = account.get('objectGUID', idx=0)
    guid = samdb.schema_format_value('objectGUID', guid)
    creds.set_guid(guid.decode('utf-8'))

    return (creds, dn)
def get_security_descriptor(self, dn):
    """Return a packed security descriptor naming the object at *dn*.

    The descriptor is owned by the builtin Administrators SID and its DACL
    holds a single ACE whose trustee is the objectSid of *dn* and whose
    access mask is SEC_ADS_CONTROL_ACCESS.
    """
    samdb = self.get_samdb()

    trustee_sid = self.get_objectSid(samdb, dn)

    owner_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)

    ace = security.ace()
    ace.access_mask = security.SEC_ADS_CONTROL_ACCESS
    ace.trustee = security.dom_sid(trustee_sid)

    dacl = security.acl()
    dacl.revision = security.SECURITY_ACL_REVISION_ADS
    dacl.aces = [ace]
    dacl.num_aces = 1

    descriptor = security.descriptor()
    descriptor.type |= security.SEC_DESC_DACL_PRESENT
    descriptor.owner_sid = owner_sid
    descriptor.dacl = dacl

    return ndr_pack(descriptor)
def create_rodc(self, ctx):
    """Configure the join context *ctx* as an RODC and add its objects.

    Any previous join is cleaned up before the objects are added.  If
    adding the objects fails, the LDB connection is refreshed, the partial
    join is cleaned up and the exception is re-raised.
    """
    ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
    ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
    ctx.krbtgt_dn = f'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'

    deny_rid = security.DOMAIN_RID_RODC_DENY
    allow_rid = security.DOMAIN_RID_RODC_ALLOW
    ctx.never_reveal_sid = [
        f'<SID={ctx.domsid}-{deny_rid}>',
        f'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
        f'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
        f'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
        f'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>',
    ]
    ctx.reveal_sid = f'<SID={ctx.domsid}-{allow_rid}>'

    # The RODC is managed by the account performing the join.
    mysid = ctx.get_mysid()
    ctx.managedby = f'<SID={mysid}>'

    ctx.userAccountControl = (UF_WORKSTATION_TRUST_ACCOUNT |
                              UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
                              UF_PARTIAL_SECRETS_ACCOUNT)

    ctx.connection_dn = f'CN=RODC Connection (FRS),{ctx.ntds_dn}'
    ctx.secure_channel_type = misc.SEC_CHAN_RODC
    ctx.RODC = True
    ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
                         drsuapi.DRSUAPI_DRS_PER_SYNC |
                         drsuapi.DRSUAPI_DRS_GET_ANC |
                         drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
                         drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
    ctx.domain_replica_flags = (ctx.replica_flags |
                                drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)

    ctx.build_nc_lists()

    ctx.cleanup_old_join()

    try:
        ctx.join_add_objects()
    except Exception:
        # cleanup the failed join (checking we still have a live LDB
        # connection to the remote DC first)
        ctx.refresh_ldb_connection()
        ctx.cleanup_old_join()
        raise
def replicate_account_to_rodc(self, dn):
    """Replicate the secrets of the account *dn* to the RODC.

    The replicateSingleObject rootDSE operation is tried first on the RODC
    connection.  If it fails, the error must be "unwilling to perform"
    with the 'unknown attribute to change' message; in that case the
    secrets are replicated into the local database with drs_Replicate,
    using the machine account credentials.
    """
    samdb = self.get_samdb()
    rodc_samdb = self.get_rodc_samdb()

    repl_val = f'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'

    msg = ldb.Message()
    msg.dn = ldb.Dn(rodc_samdb, '')
    msg['replicateSingleObject'] = ldb.MessageElement(
        repl_val,
        ldb.FLAG_MOD_REPLACE,
        'replicateSingleObject')

    try:
        # Try replication using the replicateSingleObject rootDSE
        # operation.
        rodc_samdb.modify(msg)
    except ldb.LdbError as err:
        enum, estr = err.args
        self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
        self.assertIn('rootdse_modify: unknown attribute to change!',
                      estr)

        # If that method wasn't supported, we may be in the rodc:local test
        # environment, where we can try replicating to the local database.

        lp = self.get_lp()

        rodc_creds = Credentials()
        rodc_creds.guess(lp)
        rodc_creds.set_machine_account(lp)

        local_samdb = SamDB(url=None, session_info=system_session(),
                            credentials=rodc_creds, lp=lp)

        destination_dsa_guid = misc.GUID(local_samdb.get_ntds_GUID())

        binding = f'ncacn_ip_tcp:{self.dc_host}[seal]'
        repl = drs_Replicate(binding,
                             lp, rodc_creds,
                             local_samdb, destination_dsa_guid)

        source_dsa_invocation_id = misc.GUID(samdb.invocation_id)

        repl.replicate(dn,
                       source_dsa_invocation_id,
                       destination_dsa_guid,
                       exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
                       rodc=True)
def reveal_account_to_mock_rodc(self, dn):
    """Request the secrets of *dn* on behalf of the mock RODC.

    get_secrets() is called with the NTDS GUID of the mock RODC as the
    destination DSA and the invocation ID of the SamDB as the source.
    """
    samdb = self.get_samdb()
    rodc_ctx = self.get_mock_rodc_ctx()

    destination = rodc_ctx.ntds_guid
    source = misc.GUID(samdb.invocation_id)

    self.get_secrets(
        dn,
        destination_dsa_guid=destination,
        source_dsa_invocation_id=source)
def check_revealed(self, dn, rodc_dn, revealed=True):
    """Assert whether *dn* is listed in msDS-RevealedUsers of *rodc_dn*.

    With *revealed* true the DN must be present; otherwise it must be
    absent.  A missing msDS-RevealedUsers attribute counts as absent.
    """
    samdb = self.get_samdb()

    res = samdb.search(base=rodc_dn,
                       scope=ldb.SCOPE_BASE,
                       attrs=['msDS-RevealedUsers'])

    revealed_users = res[0].get('msDS-RevealedUsers')
    if revealed_users is None:
        self.assertFalse(revealed)
        return

    # Each value is a binary DN; only its DN part is compared.
    revealed_dns = {
        str(dsdb_Dn(samdb, str(user),
                    syntax_oid=DSDB_SYNTAX_BINARY_DN).dn)
        for user in revealed_users
    }

    check = self.assertIn if revealed else self.assertNotIn
    check(str(dn), revealed_dns)
def get_secrets(self, dn,
                destination_dsa_guid,
                source_dsa_invocation_id):
    """Fetch the secret attributes of *dn* with DsGetNCChanges.

    A level 8 request with the extended operation
    DRSUAPI_EXOP_REPL_SECRET is sent for the single object *dn*, asking
    for supplementalCredentials, unicodePwd and ntPwdHistory.  The reply
    must contain exactly one object whose DN equals *dn*.

    Returns the tuple (bind, identifier, attributes): the DRSUAPI binding,
    the identifier of the returned object and its attribute list.
    """
    bind, handle, _ = self.get_drsuapi_connection()

    req = drsuapi.DsGetNCChangesRequest8()

    req.destination_dsa_guid = destination_dsa_guid
    req.source_dsa_invocation_id = source_dsa_invocation_id

    # The object to fetch.
    naming_context = drsuapi.DsReplicaObjectIdentifier()
    naming_context.dn = dn
    req.naming_context = naming_context

    # Start from the very beginning: all USNs are zero.
    hwm = drsuapi.DsReplicaHighWaterMark()
    hwm.tmp_highest_usn = 0
    hwm.reserved_usn = 0
    hwm.highest_usn = 0
    req.highwatermark = hwm
    req.uptodateness_vector = None

    req.replica_flags = 0

    req.max_object_count = 1
    req.max_ndr_size = 402116
    req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET

    # Only the secret attributes are requested.
    attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
              drsuapi.DRSUAPI_ATTID_unicodePwd,
              drsuapi.DRSUAPI_ATTID_ntPwdHistory]

    attribute_set = drsuapi.DsPartialAttributeSet()
    attribute_set.version = 1
    attribute_set.attids = attids
    attribute_set.num_attids = len(attids)
    req.partial_attribute_set = attribute_set

    req.partial_attribute_set_ex = None
    req.mapping_ctr.num_mappings = 0
    req.mapping_ctr.mappings = None

    _, ctr = bind.DsGetNCChanges(handle, 8, req)

    self.assertEqual(1, ctr.object_count)

    returned_object = ctr.first_object.object
    identifier = returned_object.identifier
    attributes = returned_object.attribute_ctr.attributes

    self.assertEqual(dn, identifier.dn)

    return bind, identifier, attributes
def unpack_supplemental_credentials(
    self, blob: bytes
) -> Dict[kcrypto.Enctype, str]:
    """Extract the AES keys from a supplementalCredentials blob.

    Only the 'Primary:Kerberos-Newer-Keys' package is examined, and only
    AES256 and AES128 keys are returned.

    Returns a mapping from encryption type to the hex-encoded key.
    """
    wanted_enctypes = (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128)

    spl = ndr_unpack(drsblobs.supplementalCredentialsBlob, blob)

    keys: Dict[kcrypto.Enctype, str] = {}

    for pkg in spl.sub.packages:
        if pkg.name != 'Primary:Kerberos-Newer-Keys':
            continue

        # The package data is hex-encoded NDR.
        krb5_new_keys = ndr_unpack(drsblobs.package_PrimaryKerberosBlob,
                                   binascii.a2b_hex(pkg.data))

        for key in krb5_new_keys.ctr.keys:
            if key.keytype in wanted_enctypes:
                keys[key.keytype] = key.value.hex()

    return keys
def get_keys(self, creds, expected_etypes=None):
    """Retrieve the Kerberos keys of the account described by 'creds'.

    The account's secrets are fetched with get_secrets() and decrypted.
    AES keys are taken from supplementalCredentials and the RC4 key from
    unicodePwd.

    The encryption types found must match 'expected_etypes'; if that is
    None, the default encryption types for 'creds' are expected instead.

    Returns a mapping from encryption type to hex-encoded key.
    """
    admin_creds = self.get_admin_creds()
    samdb = self.get_samdb()

    dn = creds.get_dn()

    bind, identifier, attributes = self.get_secrets(
        str(dn),
        destination_dsa_guid=misc.GUID(samdb.get_ntds_GUID()),
        source_dsa_invocation_id=misc.GUID())

    # The RID is needed to decrypt the replicated attributes.
    rid = identifier.sid.split()[1]

    net_ctx = net.Net(admin_creds)

    keys = {}

    for attr in attributes:
        # Skip attributes that came back without any values.
        if not attr.value_ctr.num_values:
            continue

        attid = attr.attid
        if attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
            net_ctx.replicate_decrypt(bind, attr, rid)

            blob = attr.value_ctr.values[0].blob
            keys.update(self.unpack_supplemental_credentials(blob))
        elif attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
            net_ctx.replicate_decrypt(bind, attr, rid)

            nt_hash = attr.value_ctr.values[0].blob
            keys[kcrypto.Enctype.RC4] = nt_hash.hex()

    if expected_etypes is None:
        expected_etypes = self.get_default_enctypes(creds)

    self.assertCountEqual(expected_etypes, keys)

    return keys
def creds_set_keys(self, creds, keys):
    """Force every key in 'keys' onto 'creds'.

    'keys' maps an encryption type to a key; passing None does nothing.
    """
    if keys is None:
        return

    for enctype, key in keys.items():
        creds.set_forced_key(enctype, key)
def creds_set_enctypes(self, creds,
                       extra_bits=None,
                       remove_bits=None):
    """Set the supported enctypes of 'creds' from the account's settings.

    The value is taken from the account's msDS-SupportedEncryptionTypes
    attribute, falling back first to self.default_etypes and then to the
    'kdc default domain supported enctypes' configuration parameter.
    'extra_bits' are then added and 'remove_bits' cleared before the
    result is applied as the AS, TGS and AP supported enctypes.
    """
    samdb = self.get_samdb()

    res = samdb.search(creds.get_dn(),
                       scope=ldb.SCOPE_BASE,
                       attrs=['msDS-SupportedEncryptionTypes'])
    enctype_bits = res[0].get('msDS-SupportedEncryptionTypes', idx=0)

    if enctype_bits is None:
        enctype_bits = self.default_etypes
        if enctype_bits is None:
            lp = self.get_lp()
            enctype_bits = lp.get('kdc default domain supported enctypes')
            if enctype_bits == 0:
                enctype_bits = rc4_bit | aes256_sk_bit
    enctype_bits = int(enctype_bits)

    if extra_bits is not None:
        # We need to add in implicit or implied encryption types.
        enctype_bits |= extra_bits
    if remove_bits is not None:
        # We also need to remove certain bits, such as the non-encryption
        # type bit aes256-sk.
        enctype_bits &= ~remove_bits

    creds.set_as_supported_enctypes(enctype_bits)
    creds.set_tgs_supported_enctypes(enctype_bits)
    creds.set_ap_supported_enctypes(enctype_bits)
def creds_set_default_enctypes(self, creds,
                               fast_support=False,
                               claims_support=False,
                               compound_id_support=False):
    """Set the supported enctypes of 'creds' to the defaults.

    The default encryption types for 'creds' are converted to a bit mask,
    to which the FAST, claims and compound identity support bits are
    added as requested. The result is applied as the AS, TGS and AP
    supported enctypes.
    """
    bits = KerberosCredentials.etypes_to_bits(
        self.get_default_enctypes(creds))

    optional_bits = (
        (fast_support, security.KERB_ENCTYPE_FAST_SUPPORTED),
        (claims_support, security.KERB_ENCTYPE_CLAIMS_SUPPORTED),
        (compound_id_support,
         security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED),
    )
    for wanted, bit in optional_bits:
        if wanted:
            bits |= bit

    creds.set_as_supported_enctypes(bits)
    creds.set_tgs_supported_enctypes(bits)
    creds.set_ap_supported_enctypes(bits)
def add_to_group(self, account_dn, group_dn, group_attr, expect_attr=True,
                 new_group_type=None):
    """Add one or more accounts to 'group_attr' of the object 'group_dn'.

    'account_dn' is either a single DN string or an iterable of DN
    strings; ldb.Dn and bytes objects are rejected. If 'expect_attr' is
    true, the attribute must already exist on the group. If
    'new_group_type' is given, the group's groupType is replaced in the
    same modification.

    The message that undoes the change is appended to self.ldb_cleanups
    and returned.
    """
    samdb = self.get_samdb()

    try:
        res = samdb.search(base=group_dn,
                           scope=ldb.SCOPE_BASE,
                           attrs=[group_attr])
    except ldb.LdbError as err:
        num, _ = err.args
        if num != ldb.ERR_NO_SUCH_OBJECT:
            raise

        self.fail(err)

    orig_msg = res[0]
    members = orig_msg.get(group_attr)
    if expect_attr:
        self.assertIsNotNone(members)

    # Use a set so we can handle the same group being added twice. The
    # existing values are bytes, whereas the DNs we are given are strings,
    # so decode the existing values regardless of 'expect_attr' so that
    # duplicates actually compare equal.
    if members is None:
        members = set()
    else:
        members = {member.decode('utf-8') for member in members}

    self.assertNotIsInstance(account_dn, ldb.Dn,
                             'ldb.MessageElement does not support ldb.Dn')
    self.assertNotIsInstance(account_dn, bytes)

    if isinstance(account_dn, str):
        members.add(account_dn)
    else:
        members.update(account_dn)

    msg = ldb.Message()
    msg.dn = group_dn
    if new_group_type is not None:
        msg['0'] = ldb.MessageElement(
            common.normalise_int32(new_group_type),
            ldb.FLAG_MOD_REPLACE,
            'groupType')
    msg['1'] = ldb.MessageElement(list(members),
                                  ldb.FLAG_MOD_REPLACE,
                                  group_attr)

    # Work out how to restore the original state before changing anything.
    cleanup = samdb.msg_diff(msg, orig_msg)
    self.ldb_cleanups.append(cleanup)
    samdb.modify(msg)

    return cleanup
def remove_from_group(self, account_dn, group_dn):
    """Remove 'account_dn' from the 'member' attribute of 'group_dn'.

    The account must currently be a member. The message that undoes the
    change is appended to self.ldb_cleanups and returned.
    """
    samdb = self.get_samdb()

    res = samdb.search(base=group_dn,
                       scope=ldb.SCOPE_BASE,
                       attrs=['member'])
    orig_msg = res[0]
    self.assertIn('member', orig_msg)

    # The stored values are bytes, so compare against the encoded DN.
    remaining = list(orig_msg['member'])
    encoded_dn = str(account_dn).encode('utf-8')
    self.assertIn(encoded_dn, remaining)
    remaining.remove(encoded_dn)

    msg = ldb.Message()
    msg.dn = group_dn
    msg['member'] = ldb.MessageElement(remaining,
                                       ldb.FLAG_MOD_REPLACE,
                                       'member')

    cleanup = samdb.msg_diff(msg, orig_msg)
    self.ldb_cleanups.append(cleanup)
    samdb.modify(msg)

    return cleanup
def create_group_principal(self, samdb, group_type):
    """Create a new group and return a Principal object representing it.

    The group is given a freshly generated name and the group type
    'group_type.value'. The returned Principal holds the group's DN and
    its objectSid.
    """
    name = self.get_new_username()
    group_dn = self.create_group(samdb, name, gtype=group_type.value)
    group_sid = self.get_objectSid(samdb, group_dn)

    return Principal(ldb.Dn(samdb, group_dn), group_sid)
def set_group_type(self, samdb, dn, gtype):
    """Replace the groupType attribute of 'dn' with 'gtype.value'."""
    msg = ldb.Message(dn)
    msg['groupType'] = ldb.MessageElement(
        common.normalise_int32(gtype.value),
        ldb.FLAG_MOD_REPLACE,
        'groupType')
    samdb.modify(msg)
def set_primary_group(self, samdb, dn, primary_sid,
                      expected_error=None,
                      expected_werror=None):
    """Set the primary group of 'dn' to the group with SID 'primary_sid'.

    A cleanup is registered to restore the original primaryGroupId after
    the test. If 'expected_error' is given, the modification must fail
    with that LDB error code and 'expected_werror' (formatted as eight
    upper-case hex digits) must appear in the error string;
    'expected_error' and 'expected_werror' must be given together.
    """
    # Get the RID to be set as our primary group.
    primary_rid = primary_sid.rsplit('-', 1)[1]

    # Find out our current primary group.
    res = samdb.search(dn,
                       scope=ldb.SCOPE_BASE,
                       attrs=['primaryGroupId'])
    orig_msg = res[0]

    # Prepare to modify the attribute.
    msg = ldb.Message(dn)
    msg['primaryGroupId'] = ldb.MessageElement(str(primary_rid),
                                               ldb.FLAG_MOD_REPLACE,
                                               'primaryGroupId')

    # We'll remove the primaryGroupId attribute after the test, to avoid
    # problems in the teardown if the user outlives the group.
    remove_msg = samdb.msg_diff(msg, orig_msg)
    self.addCleanup(samdb.modify, remove_msg)

    # Set primaryGroupId; the simple case is that we expect success.
    if expected_error is None:
        self.assertIsNone(expected_werror)

        samdb.modify(msg)
        return

    self.assertIsNotNone(expected_werror)

    with self.assertRaises(
            ldb.LdbError,
            msg='expected setting primary group to fail'
    ) as err:
        samdb.modify(msg)

    error, estr = err.exception.args
    self.assertEqual(expected_error, error)
    self.assertIn(f'{expected_werror:08X}', estr)
def setup_groups(self,
                 samdb,
                 preexisting_groups,
                 group_setup,
                 primary_groups):
    """Create an arrangement of groups specified by a test case.

    'preexisting_groups' maps group IDs to principals that already exist.
    'group_setup', if not None, maps each new group ID to a tuple of
    (group type, member group IDs). 'primary_groups', if not None, maps a
    user principal to the ID of the group to be set as its primary group.

    Returns the mapping from group IDs to principals, including the
    pre-existing ones.
    """
    groups = dict(preexisting_groups)

    # The real types of groups that we initially create as Universal.
    primary_group_types = {}

    # Map a group ID to that group's DN, and generate an understandable
    # error message if the mapping fails.
    def group_id_to_dn(group_id):
        try:
            group = groups[group_id]
        except KeyError:
            self.fail(f"included group member '{group_id}', but it is "
                      f"not specified in {groups.keys()}")

        if group.dn is None:
            return f'<SID={group.sid}>'

        return str(group.dn)

    if group_setup is not None:
        # Create each group and add it to the group mapping.
        for group_id, (group_type, _) in group_setup.items():
            self.assertNotIn(group_id, preexisting_groups,
                             "don't specify placeholders")
            self.assertNotIn(group_id, groups,
                             'group ID specified more than once')

            is_primary = primary_groups is not None and (
                group_id in primary_groups.values())
            if is_primary:
                # Windows disallows setting a domain-local group as a
                # primary group, unless we create it as Universal first and
                # change it back to Domain-Local later.
                primary_group_types[group_id] = group_type
                group_type = GroupType.UNIVERSAL

            groups[group_id] = self.create_group_principal(samdb,
                                                           group_type)

        # Populate each group's members.
        for group_id, (_, members) in group_setup.items():
            # Get the group's DN and the mapped DNs of its members.
            group_dn = groups[group_id].dn
            member_dns = map(group_id_to_dn, members)

            # Add the members to the group.
            self.add_to_group(member_dns, group_dn, 'member',
                              expect_attr=False)

    # Set primary groups.
    if primary_groups is not None:
        for user, primary_group in primary_groups.items():
            self.set_primary_group(samdb,
                                   user.dn,
                                   groups[primary_group].sid)

    # Change the primary groups to their actual group types.
    for group_id, real_group_type in primary_group_types.items():
        self.set_group_type(samdb, groups[group_id].dn, real_group_type)

    # Return the mapping from group IDs to principals.
    return groups
def map_to_sid(self, val, mapping, domain_sid):
    """Translate a SID placeholder into a SID.

    An integer is treated as a RID and is prefixed with 'domain_sid',
    which must not be None. A value found in 'mapping' is replaced by the
    'sid' of the principal it maps to. Anything else is returned
    unmodified.
    """
    if not isinstance(val, int):
        if mapping is None or val not in mapping:
            # Nothing to translate; leave it unmodified.
            return val

        # We have a mapping for it, so apply that.
        return mapping[val].sid

    # If it's an integer, we assume it's a RID, and prefix the domain SID.
    self.assertIsNotNone(domain_sid)
    return f'{domain_sid}-{val}'
def map_to_dn(self, val, mapping, domain_sid):
    """Translate a SID placeholder into an ldb.Dn of the form <SID=...>.

    The placeholder is first resolved with map_to_sid().
    """
    sid = self.map_to_sid(val, mapping, domain_sid)
    samdb = self.get_samdb()
    return ldb.Dn(samdb, f'<SID={sid}>')
def map_sids(self, sids, mapping, domain_sid):
    """Return SIDs from principal placeholders based on a supplied mapping.

    'sids' is an iterable whose entries are either (value, SID type,
    attributes) tuples or frozensets of such entries, which are mapped
    recursively. Returns a set of mapped entries, or None if 'sids' is
    None.
    """
    if sids is None:
        return None

    mapped_sids = set()

    for entry in sids:
        if isinstance(entry, frozenset):
            nested = self.map_sids(entry, mapping, domain_sid)
            mapped_sids.add(frozenset(nested))
            continue

        val, sid_type, attrs = entry
        sid = self.map_to_sid(val, mapping, domain_sid)

        # There's no point expecting the 'Claims Valid' SID to be present
        # if we don't support claims. Filter it out to give the tests a
        # chance of passing.
        if self.kdc_claims_support or sid != security.SID_CLAIMS_VALID:
            mapped_sids.add((sid, sid_type, attrs))

    return mapped_sids
def issued_by_rodc(self, ticket):
    """Return 'ticket' modified to look as if the mock RODC issued it.

    The ticket is re-encrypted with the mock RODC's krbtgt key, which is
    also used for the PAC's KDC checksum.
    """
    rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(
        self.get_mock_rodc_krbtgt_creds())

    return self.modified_ticket(
        ticket,
        new_ticket_key=rodc_krbtgt_key,
        checksum_keys={
            krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
        })
def signed_by_rodc(self, ticket):
    """Return 'ticket' with its PAC KDC checksum signed by the mock RODC.

    Unlike issued_by_rodc(), no new ticket key is supplied; only the key
    for the PAC's KDC checksum is replaced.
    """
    rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(
        self.get_mock_rodc_krbtgt_creds())

    return self.modified_ticket(
        ticket,
        checksum_keys={
            krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
        })
def ticket_with_sids(self,
                     ticket,
                     new_sids,
                     domain_sid,
                     user_rid,
                     set_user_flags=0,
                     reset_user_flags=0,
                     from_rodc=False):
    """Get a ticket with the SIDs in the PAC replaced by 'new_sids'.

    This is useful for creating arbitrary tickets that can be used to
    perform a TGS-REQ. The PAC is rewritten with set_pac_sids(), and the
    ticket is encrypted and its KDC checksum signed with the krbtgt key,
    or with the mock RODC's krbtgt key if 'from_rodc' is true.
    """
    if from_rodc:
        get_krbtgt_creds = self.get_mock_rodc_krbtgt_creds
    else:
        get_krbtgt_creds = self.get_krbtgt_creds
    krbtgt_key = self.TicketDecryptionKey_from_creds(get_krbtgt_creds())

    checksum_keys = {
        krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
    }

    # Bind everything but the PAC itself, which modified_ticket() supplies.
    modify_pac_fn = partial(self.set_pac_sids,
                            new_sids=new_sids,
                            domain_sid=domain_sid,
                            user_rid=user_rid,
                            set_user_flags=set_user_flags,
                            reset_user_flags=reset_user_flags)

    return self.modified_ticket(ticket,
                                new_ticket_key=krbtgt_key,
                                modify_pac_fn=modify_pac_fn,
                                checksum_keys=checksum_keys)
def set_pac_sids(self,
                 pac,
                 *,
                 new_sids,
                 domain_sid=None,
                 user_rid=None,
                 set_user_flags=0,
                 reset_user_flags=0):
    """Replace the SIDs in a PAC with 'new_sids'.

    'new_sids' is an iterable of (SID, SID type, attributes) tuples, where
    a SID given as an integer is taken to be a RID in 'domain_sid' (which
    defaults to our domain SID). The LOGON_INFO buffer, which must be
    present, has its base, extra and resource SIDs, primary GID, domain
    SID and user flags rewritten; the user's RID is replaced there and in
    the UPN_DNS_INFO buffer if 'user_rid' is given.

    Returns the modified PAC.
    """
    if domain_sid is None:
        domain_sid = self.get_samdb().get_domain_sid()

    # Split a SID into its domain and RID; a bare integer is a RID in our
    # own domain.
    def split_sid(sid):
        if isinstance(sid, int):
            return domain_sid, sid
        return sid.rsplit('-', 1)

    def rid_with_attrs(rid, attrs):
        entry = samr.RidWithAttribute()
        entry.rid = int(rid)
        entry.attributes = attrs
        return entry

    base_sids = []
    extra_sids = []
    resource_sids = []

    resource_domain = None

    primary_gid = None

    # Filter our SIDs into three arrays depending on their ultimate
    # location in the PAC.
    for sid, sid_type, attrs in new_sids:
        if sid_type is self.SidType.BASE_SID:
            domain, rid = split_sid(sid)
            self.assertEqual(domain_sid, domain,
                             f'base SID {sid} must be in our domain')

            base_sids.append(rid_with_attrs(rid, attrs))
        elif sid_type is self.SidType.EXTRA_SID:
            extra_sid = netlogon.netr_SidAttr()
            extra_sid.sid = security.dom_sid(sid)
            extra_sid.attributes = attrs

            extra_sids.append(extra_sid)
        elif sid_type is self.SidType.RESOURCE_SID:
            domain, rid = split_sid(sid)
            if resource_domain is None:
                resource_domain = domain
            else:
                self.assertEqual(resource_domain, domain,
                                 'resource SIDs must share the same '
                                 'domain')

            resource_sids.append(rid_with_attrs(rid, attrs))
        elif sid_type is self.SidType.PRIMARY_GID:
            self.assertIsNone(primary_gid,
                              f'must not specify a second primary GID '
                              f'{sid}')
            self.assertIsNone(attrs, 'cannot specify primary GID attrs')

            domain, primary_gid = split_sid(sid)
            self.assertEqual(domain_sid, domain,
                             f'primary GID {sid} must be in our domain')
        else:
            self.fail(f'invalid SID type {sid_type}')

    found_logon_info = False

    pac_buffers = pac.buffers
    for pac_buffer in pac_buffers:
        # Find the LOGON_INFO PAC buffer.
        if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
            logon_info = pac_buffer.info.info
            info3 = logon_info.info3
            base = info3.base

            # Add Extra SIDs and set the EXTRA_SIDS flag as needed.
            info3.sidcount = len(extra_sids)
            if extra_sids:
                info3.sids = extra_sids
                base.user_flags |= netlogon.NETLOGON_EXTRA_SIDS
            else:
                info3.sids = None
                base.user_flags &= ~netlogon.NETLOGON_EXTRA_SIDS

            # Add Base SIDs.
            base.groups.count = len(base_sids)
            base.groups.rids = base_sids if base_sids else None

            base.domain_sid = security.dom_sid(domain_sid)
            if user_rid is not None:
                base.rid = int(user_rid)

            if primary_gid is not None:
                base.primary_gid = int(primary_gid)

            # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed.
            resource_groups = logon_info.resource_groups
            resource_groups.groups.count = len(resource_sids)
            if resource_sids:
                resource_domain = security.dom_sid(resource_domain)
                resource_groups.domain_sid = resource_domain
                resource_groups.groups.rids = resource_sids
                base.user_flags |= netlogon.NETLOGON_RESOURCE_GROUPS
            else:
                resource_groups.domain_sid = None
                resource_groups.groups.rids = None
                base.user_flags &= ~netlogon.NETLOGON_RESOURCE_GROUPS

            # Finally apply the caller's explicit flag changes.
            base.user_flags |= set_user_flags
            base.user_flags &= ~reset_user_flags

            found_logon_info = True

        # Also replace the user's SID in the UPN DNS buffer.
        elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
            upn_dns_info_ex = pac_buffer.info.ex

            if user_rid is not None:
                upn_dns_info_ex.objectsid = security.dom_sid(
                    f'{domain_sid}-{user_rid}')

        # But don't replace the user's SID in the Requester SID buffer, or
        # we'll get a SID mismatch.

    self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer')

    pac.buffers = pac_buffers

    return pac
def set_pac_device_sids(self,
                        pac,
                        *,
                        new_sids,
                        domain_sid=None,
                        user_rid):
    """Replace the device SIDs in a PAC with 'new_sids'.

    Each entry of 'new_sids' is either a (SID, SID type, attributes) tuple
    or a frozenset of resource SID tuples; every frozenset becomes one
    domain group membership. A SID given as an integer is taken to be a
    RID in 'domain_sid' (which defaults to our domain SID). A primary GID
    must be specified. The DEVICE_INFO buffer is updated if present, and
    appended to the PAC otherwise.

    Returns the modified PAC.
    """
    if domain_sid is None:
        domain_sid = self.get_samdb().get_domain_sid()

    # Split a SID into its domain and RID; a bare integer is a RID in our
    # own domain.
    def split_sid(sid):
        if isinstance(sid, int):
            return domain_sid, sid
        return sid.rsplit('-', 1)

    def rid_with_attrs(rid, attrs):
        entry = samr.RidWithAttribute()
        entry.rid = int(rid)
        entry.attributes = attrs
        return entry

    base_sids = []
    extra_sids = []
    resource_sids = []

    primary_gid = None

    # Filter our SIDs into three arrays depending on their ultimate
    # location in the PAC.
    for entry in new_sids:
        if isinstance(entry, frozenset):
            resource_domain = None
            domain_sids = []

            for sid, sid_type, attrs in entry:
                self.assertIs(sid_type, self.SidType.RESOURCE_SID,
                              'only resource SIDs may be specified in this way')

                domain, rid = split_sid(sid)
                if resource_domain is None:
                    resource_domain = domain
                else:
                    self.assertEqual(resource_domain, domain,
                                     'resource SIDs must share the same '
                                     'domain')

                domain_sids.append(rid_with_attrs(rid, attrs))

            membership = krb5pac.PAC_DOMAIN_GROUP_MEMBERSHIP()
            if resource_domain is not None:
                membership.domain_sid = security.dom_sid(resource_domain)
            membership.groups.rids = domain_sids
            membership.groups.count = len(domain_sids)

            resource_sids.append(membership)
            continue

        sid, sid_type, attrs = entry
        if sid_type is self.SidType.BASE_SID:
            domain, rid = split_sid(sid)
            self.assertEqual(domain_sid, domain,
                             f'base SID {sid} must be in our domain')

            base_sids.append(rid_with_attrs(rid, attrs))
        elif sid_type is self.SidType.EXTRA_SID:
            extra_sid = netlogon.netr_SidAttr()
            extra_sid.sid = security.dom_sid(sid)
            extra_sid.attributes = attrs

            extra_sids.append(extra_sid)
        elif sid_type is self.SidType.RESOURCE_SID:
            self.fail('specify resource groups in frozenset(s)')
        elif sid_type is self.SidType.PRIMARY_GID:
            self.assertIsNone(primary_gid,
                              f'must not specify a second primary GID '
                              f'{sid}')
            self.assertIsNone(attrs, 'cannot specify primary GID attrs')

            domain, primary_gid = split_sid(sid)
            self.assertEqual(domain_sid, domain,
                             f'primary GID {sid} must be in our domain')
        else:
            self.fail(f'invalid SID type {sid_type}')

    pac_buffers = pac.buffers

    # Find the DEVICE_INFO PAC buffer.
    device_buffer = next(
        (buf for buf in pac_buffers
         if buf.type == krb5pac.PAC_TYPE_DEVICE_INFO),
        None)
    if device_buffer is not None:
        logon_info = device_buffer.info.info
    else:
        # There isn't one yet, so create it and add it to the PAC.
        logon_info = krb5pac.PAC_DEVICE_INFO()

        logon_info_ctr = krb5pac.PAC_DEVICE_INFO_CTR()
        logon_info_ctr.info = logon_info

        device_buffer = krb5pac.PAC_BUFFER()
        device_buffer.type = krb5pac.PAC_TYPE_DEVICE_INFO
        device_buffer.info = logon_info_ctr

        pac_buffers.append(device_buffer)

    logon_info.domain_sid = security.dom_sid(domain_sid)
    logon_info.rid = int(user_rid)

    self.assertIsNotNone(primary_gid, 'please specify the primary GID')
    logon_info.primary_gid = int(primary_gid)

    # Add Base SIDs.
    logon_info.groups.rids = base_sids if base_sids else None
    logon_info.groups.count = len(base_sids)

    # Add Extra SIDs.
    logon_info.sids = extra_sids if extra_sids else None
    logon_info.sid_count = len(extra_sids)

    # Add Resource SIDs.
    logon_info.domain_groups = resource_sids if resource_sids else None
    logon_info.domain_group_count = len(resource_sids)

    pac.buffers = pac_buffers
    pac.num_buffers = len(pac_buffers)

    return pac
def set_pac_claims(self, pac, *, client_claims=None, device_claims=None, claim_ids=None):
    """Set the client or device claims in a PAC.

    Exactly one of 'client_claims' and 'device_claims' must be given. It
    is an iterable of (claims source type, claim entries) pairs, where
    each claim entry is an (ID, type, values) tuple. Claim IDs are
    formatted with the 'claim_ids' mapping; an ID naming an unknown key
    raises RuntimeError.

    The matching claims buffer is replaced if present, and appended to
    the PAC otherwise. Returns the modified PAC.
    """
    if claim_ids is None:
        claim_ids = {}

    if client_claims is not None:
        self.assertIsNone(device_claims,
                          'don’t specify both client and device claims')
        pac_claims = client_claims
        pac_buffer_type = krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO
    else:
        self.assertIsNotNone(device_claims,
                             'please specify client or device claims')
        pac_claims = device_claims
        pac_buffer_type = krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO

    # The container class for the values of each claim type. Booleans are
    # carried in a UINT64 container; unknown types fall back to strings.
    claim_value_types = {
        claims.CLAIM_TYPE_INT64: claims.CLAIM_INT64,
        claims.CLAIM_TYPE_UINT64: claims.CLAIM_UINT64,
        claims.CLAIM_TYPE_STRING: claims.CLAIM_STRING,
        claims.CLAIM_TYPE_BOOLEAN: claims.CLAIM_UINT64,
    }

    def make_claim_entry(pac_claim_entry):
        pac_claim_id, pac_claim_type, pac_claim_values = pac_claim_entry

        claim_values_type = claim_value_types.get(
            pac_claim_type, claims.CLAIM_STRING)

        claim_values_enum = claim_values_type()
        claim_values_enum.values = pac_claim_values
        claim_values_enum.value_count = len(pac_claim_values)

        claim_entry = claims.CLAIM_ENTRY()
        try:
            claim_entry.id = pac_claim_id.format_map(claim_ids)
        except KeyError as err:
            raise RuntimeError(
                f'unknown claim name(s) '
                f'in ‘{pac_claim_id}’'
            ) from err
        claim_entry.type = pac_claim_type
        claim_entry.values = claim_values_enum

        return claim_entry

    claims_arrays = []

    for pac_claim_source_type, pac_claim_entries in pac_claims:
        claim_entries = [make_claim_entry(pac_claim_entry)
                         for pac_claim_entry in pac_claim_entries]

        claims_array = claims.CLAIMS_ARRAY()
        claims_array.claims_source_type = pac_claim_source_type
        claims_array.claim_entries = claim_entries
        claims_array.claims_count = len(claim_entries)

        claims_arrays.append(claims_array)

    # Wrap the claims arrays in the successive container layers.
    claims_set = claims.CLAIMS_SET()
    claims_set.claims_arrays = claims_arrays
    claims_set.claims_array_count = len(claims_arrays)

    claims_ctr = claims.CLAIMS_SET_CTR()
    claims_ctr.claims = claims_set

    claims_ndr = claims.CLAIMS_SET_NDR()
    claims_ndr.claims = claims_ctr

    metadata = claims.CLAIMS_SET_METADATA()
    metadata.claims_set = claims_ndr
    metadata.compression_format = (
        claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF)

    metadata_ctr = claims.CLAIMS_SET_METADATA_CTR()
    metadata_ctr.metadata = metadata

    metadata_ndr = claims.CLAIMS_SET_METADATA_NDR()
    metadata_ndr.claims = metadata_ctr

    pac_buffers = pac.buffers

    # Reuse the existing claims buffer of this type, or add a new one.
    claims_buffer = next(
        (buf for buf in pac_buffers if buf.type == pac_buffer_type),
        None)
    if claims_buffer is None:
        claims_buffer = krb5pac.PAC_BUFFER()
        claims_buffer.type = pac_buffer_type
        claims_buffer.info = krb5pac.DATA_BLOB_REM()

        pac_buffers.append(claims_buffer)

    claims_buffer.info.remaining = ndr_pack(metadata_ndr)

    pac.buffers = pac_buffers
    pac.num_buffers = len(pac_buffers)

    return pac
def add_extra_pac_buffers(self, pac, *, buffers=None):
    """Append a one-byte PAC buffer for each type listed in 'buffers'.

    'buffers' is an iterable of PAC buffer types; None means no buffers
    are added. Returns the modified PAC.
    """
    buffer_types = [] if buffers is None else buffers

    def make_buffer(pac_buffer_type):
        info = krb5pac.DATA_BLOB_REM()
        # Having an empty PAC buffer will trigger an assertion failure in
        # the MIT KDC’s k5_pac_locate_buffer(), so we need at least one
        # byte.
        info.remaining = b'0'

        pac_buffer = krb5pac.PAC_BUFFER()
        pac_buffer.type = pac_buffer_type
        pac_buffer.info = info

        return pac_buffer

    pac_buffers = pac.buffers
    for pac_buffer_type in buffer_types:
        pac_buffers.append(make_buffer(pac_buffer_type))

    pac.buffers = pac_buffers
    pac.num_buffers = len(pac_buffers)

    return pac
def get_cached_creds(self, *,
                     account_type: AccountType,
                     opts: Optional[dict]=None,
                     samdb: Optional[SamDB]=None,
                     use_cache=True) -> KerberosCredentials:
    """Return credentials for an account with the requested options.

    'opts' overrides the default account options listed below. If
    'use_cache' is true, 'samdb' must be None, and an account previously
    created with identical options is returned from self.account_cache if
    there is one. Otherwise a new account is created with
    create_account_opts() and, when caching, remembered for next time.
    """
    if opts is None:
        opts = {}

    opts_default = {
        'name_prefix': None,
        'name_suffix': None,
        'add_dollar': None,
        'upn': None,
        'spn': None,
        'additional_details': None,
        'allowed_replication': False,
        'allowed_replication_mock': False,
        'denied_replication': False,
        'denied_replication_mock': False,
        'revealed_to_rodc': False,
        'revealed_to_mock_rodc': False,
        'no_auth_data_required': False,
        'expired_password': False,
        'supported_enctypes': None,
        'not_delegated': False,
        'delegation_to_spn': None,
        'delegation_from_dn': None,
        'trusted_to_auth_for_delegation': False,
        'fast_support': False,
        'claims_support': False,
        'compound_id_support': False,
        'sid_compression_support': True,
        'member_of': None,
        'kerberos_enabled': True,
        'secure_channel_type': None,
        'id': None,
        'force_nt4_hash': False,
        'assigned_policy': None,
        'assigned_silo': None,
        'logon_hours': None,
        'smartcard_required': False,
        'enabled': True,
    }

    # Later updates take precedence, so explicit options win over defaults.
    account_opts = {'account_type': account_type}
    account_opts.update(opts_default)
    account_opts.update(opts)

    if not use_cache:
        return self.create_account_opts(samdb, use_cache, **account_opts)

    self.assertIsNone(samdb)

    # The full set of options identifies the account in the cache.
    cache_key = tuple(sorted(account_opts.items()))
    cached_creds = self.account_cache.get(cache_key)
    if cached_creds is not None:
        return cached_creds

    creds = self.create_account_opts(samdb, use_cache, **account_opts)
    self.account_cache[cache_key] = creds

    return creds
2049 def create_account_opts(self,
2050 samdb: Optional[SamDB],
2051 use_cache,
2053 account_type,
2054 name_prefix,
2055 name_suffix,
2056 add_dollar,
2057 upn,
2058 spn,
2059 additional_details,
2060 allowed_replication,
2061 allowed_replication_mock,
2062 denied_replication,
2063 denied_replication_mock,
2064 revealed_to_rodc,
2065 revealed_to_mock_rodc,
2066 no_auth_data_required,
2067 expired_password,
2068 supported_enctypes,
2069 not_delegated,
2070 delegation_to_spn,
2071 delegation_from_dn,
2072 trusted_to_auth_for_delegation,
2073 fast_support,
2074 claims_support,
2075 compound_id_support,
2076 sid_compression_support,
2077 member_of,
2078 kerberos_enabled,
2079 secure_channel_type,
2081 force_nt4_hash,
2082 assigned_policy,
2083 assigned_silo,
2084 logon_hours,
2085 smartcard_required,
2086 enabled):
2087 if account_type is self.AccountType.USER:
2088 self.assertIsNone(delegation_to_spn)
2089 self.assertIsNone(delegation_from_dn)
2090 self.assertFalse(trusted_to_auth_for_delegation)
2091 else:
2092 self.assertFalse(not_delegated)
2094 if samdb is None:
2095 samdb = self.get_samdb()
2097 user_name = self.get_new_username()
2098 if name_prefix is not None:
2099 user_name = name_prefix + user_name
2100 if name_suffix is not None:
2101 user_name += name_suffix
2103 user_account_control = 0
2104 if trusted_to_auth_for_delegation:
2105 user_account_control |= UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
2106 if not_delegated:
2107 user_account_control |= UF_NOT_DELEGATED
2108 if no_auth_data_required:
2109 user_account_control |= UF_NO_AUTH_DATA_REQUIRED
2110 if smartcard_required:
2111 user_account_control |= UF_SMARTCARD_REQUIRED
2112 if not enabled:
2113 user_account_control |= UF_ACCOUNTDISABLE
2115 if additional_details:
2116 details = {k: v for k, v in additional_details}
2117 else:
2118 details = {}
2120 enctypes = supported_enctypes
2121 if fast_support:
2122 enctypes = enctypes or 0
2123 enctypes |= security.KERB_ENCTYPE_FAST_SUPPORTED
2124 if claims_support:
2125 enctypes = enctypes or 0
2126 enctypes |= security.KERB_ENCTYPE_CLAIMS_SUPPORTED
2127 if compound_id_support:
2128 enctypes = enctypes or 0
2129 enctypes |= security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2130 if sid_compression_support is False:
2131 enctypes = enctypes or 0
2132 enctypes |= security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED
2134 if enctypes is not None:
2135 details['msDS-SupportedEncryptionTypes'] = str(enctypes)
2137 if delegation_to_spn:
2138 details['msDS-AllowedToDelegateTo'] = delegation_to_spn
2140 if delegation_from_dn:
2141 if isinstance(delegation_from_dn, str):
2142 delegation_from_dn = self.get_security_descriptor(
2143 delegation_from_dn)
2144 details['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
2145 delegation_from_dn)
2147 if spn is None and account_type is not self.AccountType.USER:
2148 spn = 'host/' + user_name
2150 if assigned_policy is not None:
2151 details['msDS-AssignedAuthNPolicy'] = assigned_policy
2153 if assigned_silo is not None:
2154 details['msDS-AssignedAuthNPolicySilo'] = assigned_silo
2156 if logon_hours is not None:
2157 details['logonHours'] = logon_hours
2159 creds, dn = self.create_account(samdb, user_name,
2160 account_type=account_type,
2161 upn=upn,
2162 spn=spn,
2163 additional_details=details,
2164 account_control=user_account_control,
2165 add_dollar=add_dollar,
2166 force_nt4_hash=force_nt4_hash,
2167 expired_password=expired_password,
2168 preserve=use_cache)
2170 expected_etypes = None
2172 # We don't force fetching the keys other than the NT hash as
2173 # how the server stores the unused KDC keys for the
2174 # smartcard_required case is not important and makes unrelated
2175 # tests break because of differences between Samba and
2176 # Windows.
2178 # The NT hash is different, as it is returned to the client in
2179 # the PAC so is visible in the network behaviour.
2180 if force_nt4_hash:
2181 expected_etypes = {kcrypto.Enctype.RC4}
2182 keys = self.get_keys(creds, expected_etypes=expected_etypes)
2183 self.creds_set_keys(creds, keys)
2185 # Handle secret replication to the RODC.
2187 if allowed_replication or revealed_to_rodc:
2188 rodc_samdb = self.get_rodc_samdb()
2189 rodc_dn = self.get_server_dn(rodc_samdb)
2191 # Allow replicating this account's secrets if requested, or allow
2192 # it only temporarily if we're about to replicate them.
2193 allowed_cleanup = self.add_to_group(
2194 dn, rodc_dn,
2195 'msDS-RevealOnDemandGroup')
2197 if revealed_to_rodc:
2198 # Replicate this account's secrets to the RODC.
2199 self.replicate_account_to_rodc(dn)
2201 if not allowed_replication:
2202 # If we don't want replicating secrets to be allowed for this
2203 # account, disable it again.
2204 samdb.modify(allowed_cleanup)
2206 self.check_revealed(dn,
2207 rodc_dn,
2208 revealed=revealed_to_rodc)
2210 if denied_replication:
2211 rodc_samdb = self.get_rodc_samdb()
2212 rodc_dn = self.get_server_dn(rodc_samdb)
2214 # Deny replicating this account's secrets to the RODC.
2215 self.add_to_group(dn, rodc_dn, 'msDS-NeverRevealGroup')
2217 # Handle secret replication to the mock RODC.
2219 if allowed_replication_mock or revealed_to_mock_rodc:
2220 # Allow replicating this account's secrets if requested, or allow
2221 # it only temporarily if we want to add the account to the mock
2222 # RODC's msDS-RevealedUsers.
2223 rodc_ctx = self.get_mock_rodc_ctx()
2224 mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2226 allowed_mock_cleanup = self.add_to_group(
2227 dn, mock_rodc_dn,
2228 'msDS-RevealOnDemandGroup')
2230 if revealed_to_mock_rodc:
2231 # Request replicating this account's secrets to the mock RODC,
2232 # which updates msDS-RevealedUsers.
2233 self.reveal_account_to_mock_rodc(dn)
2235 if not allowed_replication_mock:
2236 # If we don't want replicating secrets to be allowed for this
2237 # account, disable it again.
2238 samdb.modify(allowed_mock_cleanup)
2240 self.check_revealed(dn,
2241 mock_rodc_dn,
2242 revealed=revealed_to_mock_rodc)
2244 if denied_replication_mock:
2245 # Deny replicating this account's secrets to the mock RODC.
2246 rodc_ctx = self.get_mock_rodc_ctx()
2247 mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2249 self.add_to_group(dn, mock_rodc_dn, 'msDS-NeverRevealGroup')
2251 if member_of is not None:
2252 for group_dn in member_of:
2253 self.add_to_group(dn, ldb.Dn(samdb, group_dn), 'member',
2254 expect_attr=False)
2256 if kerberos_enabled:
2257 creds.set_kerberos_state(MUST_USE_KERBEROS)
2258 else:
2259 creds.set_kerberos_state(DONT_USE_KERBEROS)
2261 if secure_channel_type is not None:
2262 creds.set_secure_channel_type(secure_channel_type)
2264 return creds
def get_new_username(self):
    """Return a new account name and advance the account counter.

    The name is self.account_base followed by the current value of
    self.account_id.  The counter is then incremented on the class
    (type(self)) rather than on the instance.
    """
    cls = type(self)
    new_name = self.account_base + str(self.account_id)
    cls.account_id += 1
    return new_name
def get_client_creds(self,
                     allow_missing_password=False,
                     allow_missing_keys=True):
    """Return the credentials registered under the 'CLIENT' prefix.

    The lookup is delegated to self._get_krb5_creds().  A callback that
    obtains a cached USER account via self.get_cached_creds() is passed
    as fallback_creds_fn.
    """
    def make_user_account():
        return self.get_cached_creds(account_type=self.AccountType.USER)

    return self._get_krb5_creds(
        prefix='CLIENT',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=make_user_account)
def get_mach_creds(self,
                   allow_missing_password=False,
                   allow_missing_keys=True):
    """Return the credentials registered under the 'MAC' prefix.

    The lookup is delegated to self._get_krb5_creds().  The fallback
    callback requests a cached COMPUTER account with FAST, claims and
    compound identity support enabled and with the RC4 and AES256-SK
    encryption type bits set.
    """
    def make_computer_account():
        enctypes = (security.KERB_ENCTYPE_RC4_HMAC_MD5 |
                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK)
        account_opts = {
            'fast_support': True,
            'claims_support': True,
            'compound_id_support': True,
            'supported_enctypes': enctypes,
        }
        return self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts=account_opts)

    return self._get_krb5_creds(
        prefix='MAC',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=make_computer_account)
def get_service_creds(self,
                      allow_missing_password=False,
                      allow_missing_keys=True):
    """Return the credentials registered under the 'SERVICE' prefix.

    The lookup is delegated to self._get_krb5_creds().  The fallback
    callback requests a cached COMPUTER account that is trusted to
    authenticate for delegation, has FAST, claims and compound identity
    support enabled, and has the RC4 and AES256-SK encryption type bits
    set.
    """
    def make_service_account():
        enctypes = (security.KERB_ENCTYPE_RC4_HMAC_MD5 |
                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK)
        account_opts = {
            'trusted_to_auth_for_delegation': True,
            'fast_support': True,
            'claims_support': True,
            'compound_id_support': True,
            'supported_enctypes': enctypes,
        }
        return self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts=account_opts)

    return self._get_krb5_creds(
        prefix='SERVICE',
        allow_missing_password=allow_missing_password,
        allow_missing_keys=allow_missing_keys,
        fallback_creds_fn=make_service_account)
def get_rodc_krbtgt_creds(self,
                          require_keys=True,
                          require_strongest_key=False):
    """Return the credentials registered under the 'RODC_KRBTGT' prefix.

    require_strongest_key may only be set together with require_keys.
    The lookup is delegated to self._get_krb5_creds(); the fallback
    callback reads the account named by the RODC server object's
    msDS-KrbTgtLink attribute from the directory and builds
    KerberosCredentials from it.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fetch_rodc_krbtgt():
        samdb = self.get_samdb()
        rodc_samdb = self.get_rodc_samdb()
        server_dn = self.get_server_dn(rodc_samdb)

        # Follow the link from the RODC's server object to its krbtgt
        # account.
        link_res = samdb.search(server_dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=['msDS-KrbTgtLink'])
        linked_dn = link_res[0]['msDS-KrbTgtLink'][0]

        wanted = ['sAMAccountName',
                  'msDS-KeyVersionNumber',
                  'msDS-SecondaryKrbTgtNumber']
        acct_res = samdb.search(linked_dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=wanted)
        account = acct_res[0]
        account_dn = account.dn
        account_name = str(account['sAMAccountName'])

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
        creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
        creds.set_username(account_name)

        kvno = int(account['msDS-KeyVersionNumber'][0])
        krbtgt_number = int(account['msDS-SecondaryKrbTgtNumber'][0])

        # The secondary krbtgt number occupies the upper 16 bits of the
        # kvno set on the credentials.
        creds.set_kvno((krbtgt_number << 16) | kvno)
        creds.set_dn(account_dn)

        self.creds_set_keys(creds, self.get_keys(creds))

        # The RODC krbtgt account should support the default enctypes,
        # although it might not have the msDS-SupportedEncryptionTypes
        # attribute.
        self.creds_set_default_enctypes(
            creds,
            fast_support=self.kdc_fast_support,
            claims_support=self.kdc_claims_support,
            compound_id_support=self.kdc_compound_id_support)

        return creds

    return self._get_krb5_creds(
        prefix='RODC_KRBTGT',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fetch_rodc_krbtgt)
def get_mock_rodc_krbtgt_creds(self,
                               require_keys=True,
                               require_strongest_key=False):
    """Return the credentials registered under 'MOCK_RODC_KRBTGT'.

    require_strongest_key may only be set together with require_keys.
    The lookup is delegated to self._get_krb5_creds(); the fallback
    callback reads the mock RODC's krbtgt account (as named by
    self.get_mock_rodc_ctx()) from the directory and builds
    KerberosCredentials from it.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fetch_mock_rodc_krbtgt():
        samdb = self.get_samdb()
        mock_ctx = self.get_mock_rodc_ctx()

        search_base = ldb.Dn(samdb, mock_ctx.new_krbtgt_dn)
        res = samdb.search(base=search_base,
                           scope=ldb.SCOPE_BASE,
                           attrs=['msDS-KeyVersionNumber',
                                  'msDS-SecondaryKrbTgtNumber'])
        account = res[0]
        account_dn = account.dn
        account_name = str(mock_ctx.krbtgt_name)

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
        creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
        creds.set_username(account_name)

        kvno = int(account['msDS-KeyVersionNumber'][0])
        krbtgt_number = int(account['msDS-SecondaryKrbTgtNumber'][0])

        # The secondary krbtgt number occupies the upper 16 bits of the
        # kvno set on the credentials.
        creds.set_kvno((krbtgt_number << 16) | kvno)
        creds.set_dn(account_dn)

        self.creds_set_keys(creds, self.get_keys(creds))

        # The AES bits are only added from the 2008 functional level on.
        aes_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)
        level = self.get_domain_functional_level()
        self.creds_set_enctypes(
            creds,
            extra_bits=aes_bits if level >= DS_DOMAIN_FUNCTION_2008 else 0,
            remove_bits=(security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK |
                         security.KERB_ENCTYPE_RC4_HMAC_MD5))

        return creds

    return self._get_krb5_creds(
        prefix='MOCK_RODC_KRBTGT',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fetch_mock_rodc_krbtgt)
def get_krbtgt_creds(self,
                     require_keys=True,
                     require_strongest_key=False):
    """Return the credentials registered under the 'KRBTGT' prefix.

    require_strongest_key may only be set together with require_keys.
    The lookup is delegated to self._get_krb5_creds() with a default
    username of 'krbtgt'; the fallback callback looks the account up by
    SID (domain SID plus security.DOMAIN_RID_KRBTGT) and builds
    KerberosCredentials from it.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fetch_krbtgt():
        samdb = self.get_samdb()

        rid = security.DOMAIN_RID_KRBTGT
        sid = '%s-%d' % (samdb.get_domain_sid(), rid)

        res = samdb.search(base='<SID=%s>' % sid,
                           scope=ldb.SCOPE_BASE,
                           attrs=['sAMAccountName',
                                  'msDS-KeyVersionNumber'])
        account = res[0]
        account_dn = account.dn
        account_name = str(account['sAMAccountName'])

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'KRBTGT'))
        creds.set_realm(self.env_get_var('REALM', 'KRBTGT'))
        creds.set_username(account_name)

        creds.set_kvno(int(account['msDS-KeyVersionNumber'][0]))
        creds.set_dn(account_dn)

        self.creds_set_keys(creds, self.get_keys(creds))

        # The krbtgt account should support the default enctypes, although
        # it might not (on Samba) have the msDS-SupportedEncryptionTypes
        # attribute.
        self.creds_set_default_enctypes(
            creds,
            fast_support=self.kdc_fast_support,
            claims_support=self.kdc_claims_support,
            compound_id_support=self.kdc_compound_id_support)

        return creds

    return self._get_krb5_creds(
        prefix='KRBTGT',
        default_username='krbtgt',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fetch_krbtgt)
def get_dc_creds(self,
                 require_keys=True,
                 require_strongest_key=False):
    """Return the credentials registered under the 'DC' prefix.

    require_strongest_key may only be set together with require_keys.
    The lookup is delegated to self._get_krb5_creds(); the fallback
    callback looks up the account with RID 1000 in the domain and builds
    KerberosCredentials from it.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fetch_dc():
        samdb = self.get_samdb()

        rid = 1000
        sid = '%s-%d' % (samdb.get_domain_sid(), rid)

        res = samdb.search(base='<SID=%s>' % sid,
                           scope=ldb.SCOPE_BASE,
                           attrs=['sAMAccountName',
                                  'msDS-KeyVersionNumber'])
        account = res[0]
        account_dn = account.dn
        account_name = str(account['sAMAccountName'])

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'DC'))
        creds.set_realm(self.env_get_var('REALM', 'DC'))
        creds.set_username(account_name)

        creds.set_kvno(int(account['msDS-KeyVersionNumber'][0]))
        # The workstation name is the account name minus its final
        # character.
        creds.set_workstation(account_name[:-1])
        creds.set_dn(account_dn)

        self.creds_set_keys(creds, self.get_keys(creds))

        # The AES bits are only added from the 2008 functional level on.
        aes_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)
        level = self.get_domain_functional_level()
        self.creds_set_enctypes(
            creds,
            extra_bits=aes_bits if level >= DS_DOMAIN_FUNCTION_2008 else 0,
            remove_bits=security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK)

        return creds

    return self._get_krb5_creds(
        prefix='DC',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fetch_dc)
def get_server_creds(self,
                     require_keys=True,
                     require_strongest_key=False):
    """Return the credentials registered under the 'SERVER' prefix.

    require_strongest_key may only be set together with require_keys.
    The lookup is delegated to self._get_krb5_creds(); the fallback
    callback searches the directory for the single account whose
    sAMAccountName starts with self.host or whose dNSHostName equals it,
    and builds KerberosCredentials from that entry.
    """
    if require_strongest_key:
        self.assertTrue(require_keys)

    def fetch_server():
        samdb = self.get_samdb()

        host_filter = (f'(|(sAMAccountName={self.host}*)'
                       f'(dNSHostName={self.host}))')
        res = samdb.search(base=samdb.get_default_basedn(),
                           expression=host_filter,
                           scope=ldb.SCOPE_SUBTREE,
                           attrs=['sAMAccountName',
                                  'msDS-KeyVersionNumber'])
        # Exactly one account is expected to match.
        self.assertEqual(1, len(res))
        account = res[0]
        account_dn = account.dn
        account_name = str(account['sAMAccountName'])

        creds = KerberosCredentials()
        creds.set_domain(self.env_get_var('DOMAIN', 'SERVER'))
        creds.set_realm(self.env_get_var('REALM', 'SERVER'))
        creds.set_username(account_name)

        creds.set_kvno(int(account['msDS-KeyVersionNumber'][0]))
        creds.set_dn(account_dn)

        self.creds_set_keys(creds, self.get_keys(creds))

        # The AES bits are only added from the 2008 functional level on.
        aes_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)
        level = self.get_domain_functional_level()
        self.creds_set_enctypes(
            creds,
            extra_bits=aes_bits if level >= DS_DOMAIN_FUNCTION_2008 else 0,
            remove_bits=security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK)

        return creds

    return self._get_krb5_creds(
        prefix='SERVER',
        allow_missing_password=True,
        allow_missing_keys=not require_keys,
        require_strongest_key=require_strongest_key,
        fallback_creds_fn=fetch_server)
def get_target(self,
               to_krbtgt, *,
               compound_id=None,
               compression=None,
               extra_enctypes=0):
    """Return (creds, sname) for the krbtgt or for a computer account.

    With to_krbtgt set, the krbtgt credentials and the krbtgt service
    principal name are returned; none of the keyword options may be
    given in that case.

    Otherwise a cached COMPUTER account is obtained that supports RC4
    and AES256 (plus any extra_enctypes), with compound identity support
    and resource SID compression support as requested.  The returned
    sname is 'host/<account name>', with a trailing '$' removed from the
    account name.
    """
    if to_krbtgt:
        self.assertIsNone(compound_id,
                          "it's no good specifying compound id support "
                          "for the krbtgt")
        self.assertIsNone(compression,
                          "it's no good specifying compression support "
                          "for the krbtgt")
        self.assertFalse(extra_enctypes,
                         "it's no good specifying extra enctypes "
                         "for the krbtgt")
        return self.get_krbtgt_creds(), self.get_krbtgt_sname()

    enctypes = (security.KERB_ENCTYPE_RC4_HMAC_MD5 |
                security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 |
                extra_enctypes)
    account_opts = {
        'supported_enctypes': enctypes,
        'compound_id_support': compound_id,
        'sid_compression_support': compression,
    }
    creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts=account_opts)

    host = creds.get_username()
    if host[-1] == '$':
        host = host[:-1]

    sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=['host', host])
    return creds, sname
def as_req(self, cname, sname, realm, etypes, padata=None, kdc_options=0):
    """Send a Kerberos AS-REQ and return the undecoded response.

    The request asks for a ticket valid until the time returned by
    self.get_KerberosTime(offset=36000), uses a fixed nonce of
    0x7fffffff and passes kdc_options converted to a string.
    """
    expiry = self.get_KerberosTime(offset=36000)

    request = self.AS_REQ_create(
        padata=padata,
        kdc_options=str(kdc_options),
        cname=cname,
        realm=realm,
        sname=sname,
        from_time=None,
        till_time=expiry,
        renew_time=None,
        nonce=0x7fffffff,
        etypes=etypes,
        addresses=None,
        additional_tickets=None)

    return self.send_recv_transaction(request)
def get_as_rep_key(self, creds, rep):
    """Derive the password key advertised by a KDC reply's e-data.

    The reply's 'e-data' is decoded as METHOD-DATA and searched for the
    first PA-ETYPE-INFO2 element; the test fails if there is none.  The
    first ETYPE-INFO2 entry is then used, together with the credentials
    and their kvno, to build the key via
    self.PasswordKey_from_etype_info2().
    """
    method_data = self.der_decode(
        rep['e-data'],
        asn1Spec=krb5_asn1.METHOD_DATA())

    candidates = (entry['padata-value']
                  for entry in method_data
                  if entry['padata-type'] == PADATA_ETYPE_INFO2)
    not_found = object()
    encoded_info = next(candidates, not_found)
    if encoded_info is not_found:
        self.fail('expected to find ETYPE-INFO2')

    etype_info2 = self.der_decode(
        encoded_info, asn1Spec=krb5_asn1.ETYPE_INFO2())

    return self.PasswordKey_from_etype_info2(creds,
                                             etype_info2[0],
                                             creds.get_kvno())
def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
    """Build the encrypted-timestamp PA-DATA element for an AS-REQ.

    The key is obtained from self.get_as_rep_key(creds, rep) and handed,
    with the requested skew, to
    self.get_enc_timestamp_pa_data_from_key().
    """
    preauth_key = self.get_as_rep_key(creds, rep)
    return self.get_enc_timestamp_pa_data_from_key(preauth_key, skew=skew)
def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
    """Build a PA-ENC-TIMESTAMP PA-DATA element encrypted with key.

    The timestamp comes from self.get_KerberosTimeWithUsec(offset=skew).
    It is DER-encoded as PA-ENC-TS-ENC, encrypted with key usage
    KU_PA_ENC_TIMESTAMP, DER-encoded again as EncryptedData and finally
    wrapped in a PA-DATA of type PADATA_ENC_TIMESTAMP.
    """
    timestamp, usec = self.get_KerberosTimeWithUsec(offset=skew)

    ts_enc = self.PA_ENC_TS_ENC_create(timestamp, usec)
    ts_enc_der = self.der_encode(ts_enc,
                                 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    encrypted = self.EncryptedData_create(key,
                                          KU_PA_ENC_TIMESTAMP,
                                          ts_enc_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    return self.PA_DATA_create(PADATA_ENC_TIMESTAMP, encrypted_der)
def get_challenge_pa_data(self, client_challenge_key, skew=0):
    """Build a PA-ENCRYPTED-CHALLENGE PA-DATA element.

    The timestamp comes from self.get_KerberosTimeWithUsec(offset=skew).
    It is DER-encoded as PA-ENC-TS-ENC, encrypted with
    client_challenge_key under key usage KU_ENC_CHALLENGE_CLIENT,
    DER-encoded again as EncryptedData and finally wrapped in a PA-DATA
    of type PADATA_ENCRYPTED_CHALLENGE.
    """
    timestamp, usec = self.get_KerberosTimeWithUsec(offset=skew)

    ts_enc = self.PA_ENC_TS_ENC_create(timestamp, usec)
    ts_enc_der = self.der_encode(ts_enc,
                                 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    encrypted = self.EncryptedData_create(client_challenge_key,
                                          KU_ENC_CHALLENGE_CLIENT,
                                          ts_enc_der)
    encrypted_der = self.der_encode(encrypted,
                                    asn1Spec=krb5_asn1.EncryptedData())

    return self.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE, encrypted_der)
def get_as_rep_enc_data(self, key, rep):
    """Decrypt and decode the encrypted part of an AS-REP.

    The cipher text in rep['enc-part'] is decrypted with key under key
    usage KU_AS_REP_ENC_PART.  The plain text is decoded as EncASRepPart;
    if that raises, it is decoded as EncTGSRepPart instead.
    """
    plaintext = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])

    # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
    # application tag 26
    try:
        return self.der_decode(
            plaintext, asn1Spec=krb5_asn1.EncASRepPart())
    except Exception:
        return self.der_decode(
            plaintext, asn1Spec=krb5_asn1.EncTGSRepPart())
def check_pre_authentication(self, rep):
    """Check that the KDC replied with 'pre-authentication required'.

    Delegates to self.check_error_rep() with KDC_ERR_PREAUTH_REQUIRED as
    the expected error code.
    """
    self.check_error_rep(rep, expected=KDC_ERR_PREAUTH_REQUIRED)
def check_as_reply(self, rep):
    """Check that the KDC response is a well-formed AS-REP.

    Delegates to self.check_reply() with KRB_AS_REP as the expected
    message type, which validates:
        msg-type
        pvno
        tkt-vno
        kvno
    """
    self.check_reply(rep, KRB_AS_REP)
def check_tgs_reply(self, rep):
    """Check that the KDC response is a well-formed TGS-REP.

    Delegates to self.check_reply() with KRB_TGS_REP as the expected
    message type, which validates:
        msg-type
        pvno
        tkt-vno
        kvno
    """
    self.check_reply(rep, KRB_TGS_REP)
def check_reply(self, rep, msg_type):
    """Check the version fields and kvno of a KDC reply.

    Asserts that rep is present and has the given msg_type, that both
    the protocol version and the ticket version are 5, and, when the
    encrypted part carries a kvno, that its upper 16 bits are clear.
    """
    # There must be a reply, and it must be of the expected message type.
    self.assertIsNotNone(rep)
    context = "rep = {%s}" % rep
    self.assertEqual(rep['msg-type'], msg_type, context)

    # Protocol version number should be 5
    self.assertEqual(5, int(rep['pvno']), context)

    # The ticket version number should be 5
    self.assertEqual(5, int(rep['ticket']['tkt-vno']), context)

    # Check that the kvno is not an RODC kvno
    # MIT kerberos does not provide the kvno, so we treat it as optional.
    # This is tested in compatability_test.py
    enc_part = rep['enc-part']
    if 'kvno' in enc_part:
        # If the high order bits are set this is an RODC kvno.
        high_bits = int(enc_part['kvno']) & 0xFFFF0000
        self.assertEqual(0, high_bits, context)
def check_error_rep(self, rep, expected):
    """Check that the reply is a KRB-ERROR with the expected error code.

    expected is either a single error code or a container of acceptable
    error codes; in the latter case the reply's code must be a member.
    """
    self.assertIsNotNone(rep)
    context = "rep = {%s}" % rep
    self.assertEqual(rep['msg-type'], KRB_ERROR, context)

    if not isinstance(expected, collections.abc.Container):
        self.assertEqual(rep['error-code'], expected, context)
    else:
        self.assertIn(rep['error-code'], expected, context)
def tgs_req(self, cname, sname, realm, ticket, key, etypes,
            expected_error_mode=0, padata=None, kdc_options=0,
            to_rodc=False, creds=None, service_creds=None, expect_pac=True,
            expect_edata=None, expected_flags=None, unexpected_flags=None):
    """Send a TGS-REQ; return the response and the decrypted enc-part.

    The supplied ticket and key are wrapped in KerberosTicketCreds and
    used as the TGT, with a random authenticator subkey of the key's
    enctype.  When service_creds is given, its ticket decryption key and
    supported enctypes are used to check the reply.  If
    expected_error_mode is set the exchange is checked as an error and
    the returned enc-part is None; otherwise it is the private enc-part
    of the ticket credentials produced by the exchange.
    """
    subkey = self.RandomKey(key.etype)

    # The current time is fetched here as in the original code, although
    # neither value is used below.
    (_ctime, _cusec) = self.get_KerberosTimeWithUsec()

    tgt = KerberosTicketCreds(ticket,
                              key,
                              crealm=realm,
                              cname=cname)

    if service_creds is None:
        decryption_key = None
        expected_supported_etypes = None
    else:
        decryption_key = self.TicketDecryptionKey_from_creds(
            service_creds)
        expected_supported_etypes = service_creds.tgs_supported_enctypes

    # Exactly one of the two reply checkers is installed.
    if expected_error_mode:
        check_error_fn = self.generic_check_kdc_error
        check_rep_fn = None
    else:
        check_error_fn = None
        check_rep_fn = self.generic_check_kdc_rep

    def generate_padata(_kdc_exchange_dict,
                        _callback_dict,
                        req_body):
        # Hand back the caller's padata and the request body unchanged.
        return padata, req_body

    generate_padata_fn = None if padata is None else generate_padata

    kdc_exchange_dict = self.tgs_exchange_dict(
        creds=creds,
        expected_crealm=realm,
        expected_cname=cname,
        expected_srealm=realm,
        expected_sname=sname,
        expected_error_mode=expected_error_mode,
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        expected_supported_etypes=expected_supported_etypes,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_kdc_private_fn=self.generic_check_kdc_private,
        ticket_decryption_key=decryption_key,
        generate_padata_fn=generate_padata_fn,
        tgt=tgt,
        authenticator_subkey=subkey,
        kdc_options=str(kdc_options),
        expect_edata=expect_edata,
        expect_pac=expect_pac,
        to_rodc=to_rodc)

    rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                     cname=None,
                                     realm=realm,
                                     sname=sname,
                                     etypes=etypes)

    if expected_error_mode:
        return rep, None

    ticket_creds = kdc_exchange_dict['rep_ticket_creds']
    return rep, ticket_creds.encpart_private
def get_service_ticket(self, tgt, target_creds, service='host',
                       sname=None,
                       target_name=None, till=None, rc4_support=True,
                       to_rodc=False, kdc_options=None,
                       expected_flags=None, unexpected_flags=None,
                       expected_groups=None,
                       unexpected_groups=None,
                       expect_client_claims=None,
                       expect_device_claims=None,
                       expected_client_claims=None,
                       unexpected_client_claims=None,
                       expected_device_claims=None,
                       unexpected_device_claims=None,
                       pac_request=True, expect_pac=True,
                       expect_requester_sid=None,
                       expect_pac_attrs=None,
                       expect_pac_attrs_pac_request=None,
                       fresh=False):
    """Obtain (or fetch from the cache) a service ticket for a target.

    A TGS exchange is performed with the given TGT.  The resulting
    ticket is verified against the krbtgt key (the RODC krbtgt when
    to_rodc is set) and stored in self.tkt_cache under a key built from
    the request parameters.  Unless fresh is set, a ticket already
    cached under the same key is returned without contacting the KDC.

    target_name and sname are mutually exclusive; when target_name is
    not given it is the target account name minus its final character.
    """
    user_name = tgt.cname['name-string'][0]
    ticket_sname = tgt.sname
    if target_name is None:
        target_name = target_creds.get_username()[:-1]
    else:
        self.assertIsNone(sname, 'supplied both target name and sname')

    cache_key = (user_name, target_name, service, to_rodc, kdc_options,
                 pac_request, str(expected_flags), str(unexpected_flags),
                 till, rc4_support,
                 str(ticket_sname),
                 str(sname),
                 str(expected_groups),
                 str(unexpected_groups),
                 expect_client_claims, expect_device_claims,
                 str(expected_client_claims),
                 str(unexpected_client_claims),
                 str(expected_device_claims),
                 str(unexpected_device_claims),
                 expect_pac,
                 expect_requester_sid,
                 expect_pac_attrs,
                 expect_pac_attrs_pac_request)

    if not fresh:
        cached = self.tkt_cache.get(cache_key)
        if cached is not None:
            return cached

    etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)

    # The cache key above uses the caller's kdc_options; only the request
    # sees the normalised string form.
    if kdc_options is None:
        kdc_options = '0'
    kdc_options = str(krb5_asn1.KDCOptions(kdc_options))

    if sname is None:
        sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                          names=[service, target_name])

    srealm = target_creds.get_realm()
    subkey = self.RandomKey(kcrypto.Enctype.AES256)
    decryption_key = self.TicketDecryptionKey_from_creds(target_creds)

    kdc_exchange_dict = self.tgs_exchange_dict(
        expected_crealm=tgt.crealm,
        expected_cname=tgt.cname,
        expected_srealm=srealm,
        expected_sname=sname,
        expected_supported_etypes=target_creds.tgs_supported_enctypes,
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        expected_groups=expected_groups,
        unexpected_groups=unexpected_groups,
        expect_client_claims=expect_client_claims,
        expect_device_claims=expect_device_claims,
        expected_client_claims=expected_client_claims,
        unexpected_client_claims=unexpected_client_claims,
        expected_device_claims=expected_device_claims,
        unexpected_device_claims=unexpected_device_claims,
        ticket_decryption_key=decryption_key,
        check_rep_fn=self.generic_check_kdc_rep,
        check_kdc_private_fn=self.generic_check_kdc_private,
        tgt=tgt,
        authenticator_subkey=subkey,
        kdc_options=kdc_options,
        pac_request=pac_request,
        expect_pac=expect_pac,
        expect_requester_sid=expect_requester_sid,
        expect_pac_attrs=expect_pac_attrs,
        expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
        rc4_support=rc4_support,
        to_rodc=to_rodc)

    rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                     cname=None,
                                     realm=srealm,
                                     sname=sname,
                                     till_time=till,
                                     etypes=etypes)
    self.check_tgs_reply(rep)

    service_ticket_creds = kdc_exchange_dict['rep_ticket_creds']

    krbtgt_creds = (self.get_rodc_krbtgt_creds() if to_rodc
                    else self.get_krbtgt_creds())
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    # Ticket and full checksums are not expected on tickets issued for
    # a TGS principal.
    is_tgs_princ = self.is_tgs_principal(sname)
    expect_ticket_checksum = self.tkt_sig_support and not is_tgs_princ
    expect_full_checksum = self.full_sig_support and not is_tgs_princ
    self.verify_ticket(service_ticket_creds, krbtgt_key,
                       service_ticket=True, expect_pac=expect_pac,
                       expect_ticket_checksum=expect_ticket_checksum,
                       expect_full_checksum=expect_full_checksum)

    self.tkt_cache[cache_key] = service_ticket_creds

    return service_ticket_creds
def get_tgt(self, creds, to_rodc=False, kdc_options=None,
            client_account=None, client_name_type=NT_PRINCIPAL,
            target_creds=None, ticket_etype=None,
            expected_flags=None, unexpected_flags=None,
            expected_account_name=None, expected_upn_name=None,
            expected_cname=None,
            expected_sid=None,
            sname=None, realm=None,
            expected_groups=None,
            unexpected_groups=None,
            pac_request=True, expect_pac=True,
            expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
            pac_options=None,
            expect_requester_sid=None,
            rc4_support=True,
            expect_edata=None,
            expect_client_claims=None, expect_device_claims=None,
            expected_client_claims=None, unexpected_client_claims=None,
            expected_device_claims=None, unexpected_device_claims=None,
            fresh=False):
    """Obtain (or fetch from the cache) a TGT for the given credentials.

    Two AS exchanges are performed: the first, without pre-authentication
    data, is expected to fail with KDC_ERR_PREAUTH_REQUIRED; the second
    carries an encrypted timestamp built from the key derived from the
    first reply's ETYPE-INFO2 and is expected to succeed.  The resulting
    ticket credentials are stored in self.tkt_cache under a key built
    from the request parameters.  Unless fresh is set, a ticket already
    cached under the same key is returned without contacting the KDC.
    """
    user_name = (creds.get_username() if client_account is None
                 else client_account)

    cache_key = (user_name, to_rodc, kdc_options, pac_request, pac_options,
                 client_name_type,
                 ticket_etype,
                 str(expected_flags), str(unexpected_flags),
                 expected_account_name, expected_upn_name, expected_sid,
                 str(sname), str(realm),
                 str(expected_groups),
                 str(unexpected_groups),
                 str(expected_cname),
                 rc4_support,
                 expect_edata,
                 expect_pac, expect_pac_attrs,
                 expect_pac_attrs_pac_request, expect_requester_sid,
                 expect_client_claims, expect_device_claims,
                 str(expected_client_claims),
                 str(unexpected_client_claims),
                 str(expected_device_claims),
                 str(unexpected_device_claims))

    if not fresh:
        cached = self.tkt_cache.get(cache_key)
        if cached is not None:
            return cached

    if realm is None:
        realm = creds.get_realm()

    salt = creds.get_salt()

    etype = self.get_default_enctypes(creds)
    cname = self.PrincipalName_create(name_type=client_name_type,
                                      names=user_name.split('/'))
    if sname is None:
        sname = self.PrincipalName_create(name_type=NT_SRV_INST,
                                          names=['krbtgt', realm])
        # The successful reply is expected to name the upper-cased realm.
        expected_sname = self.PrincipalName_create(
            name_type=NT_SRV_INST, names=['krbtgt', realm.upper()])
    else:
        expected_sname = sname

    if expected_cname is None:
        expected_cname = cname

    till = self.get_KerberosTime(offset=36000)

    if target_creds is not None:
        krbtgt_creds = target_creds
    elif to_rodc:
        krbtgt_creds = self.get_rodc_krbtgt_creds()
    else:
        krbtgt_creds = self.get_krbtgt_creds()
    ticket_decryption_key = (
        self.TicketDecryptionKey_from_creds(krbtgt_creds,
                                            etype=ticket_etype))

    expected_etypes = krbtgt_creds.tgs_supported_enctypes

    if kdc_options is None:
        kdc_options = ('forwardable,'
                       'renewable,'
                       'canonicalize,'
                       'renewable-ok')
    kdc_options = krb5_asn1.KDCOptions(kdc_options)

    if pac_options is None:
        pac_options = '1'  # supports claims

    # Arguments that are identical for both AS exchanges.
    shared_args = dict(
        creds=creds,
        cname=cname,
        realm=realm,
        sname=sname,
        till=till,
        expected_cname=expected_cname,
        expected_account_name=expected_account_name,
        expected_upn_name=expected_upn_name,
        expected_sid=expected_sid,
        expected_groups=expected_groups,
        unexpected_groups=unexpected_groups,
        expected_salt=salt,
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        expected_supported_etypes=expected_etypes,
        etypes=etype,
        kdc_options=kdc_options,
        ticket_decryption_key=ticket_decryption_key,
        pac_request=pac_request,
        pac_options=pac_options,
        expect_pac=expect_pac,
        expect_pac_attrs=expect_pac_attrs,
        expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
        expect_requester_sid=expect_requester_sid,
        rc4_support=rc4_support,
        expect_edata=expect_edata,
        expect_client_claims=expect_client_claims,
        expect_device_claims=expect_device_claims,
        expected_client_claims=expected_client_claims,
        unexpected_client_claims=unexpected_client_claims,
        expected_device_claims=expected_device_claims,
        unexpected_device_claims=unexpected_device_claims,
        to_rodc=to_rodc)

    # First exchange: no pre-authentication data, so the KDC is expected
    # to answer with KDC_ERR_PREAUTH_REQUIRED.
    rep, kdc_exchange_dict = self._test_as_exchange(
        expected_error_mode=KDC_ERR_PREAUTH_REQUIRED,
        expected_crealm=realm,
        expected_srealm=realm,
        expected_sname=sname,
        padata=None,
        preauth_key=None,
        **shared_args)
    self.check_pre_authentication(rep)

    etype_info2 = kdc_exchange_dict['preauth_etype_info2']

    preauth_key = self.PasswordKey_from_etype_info2(creds,
                                                    etype_info2[0],
                                                    creds.get_kvno())

    ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)

    padata = [ts_enc_padata]

    expected_realm = realm.upper()

    # Second exchange: with the encrypted timestamp, success is expected.
    rep, kdc_exchange_dict = self._test_as_exchange(
        expected_error_mode=0,
        expected_crealm=expected_realm,
        expected_srealm=expected_realm,
        expected_sname=expected_sname,
        padata=padata,
        preauth_key=preauth_key,
        **shared_args)
    self.check_as_reply(rep)

    ticket_creds = kdc_exchange_dict['rep_ticket_creds']

    self.tkt_cache[cache_key] = ticket_creds

    return ticket_creds
def _make_tgs_request(self, client_creds, service_creds, tgt,
                      client_account=None,
                      client_name_type=NT_PRINCIPAL,
                      kdc_options=None,
                      pac_request=None, expect_pac=True,
                      expect_error=False,
                      expected_cname=None,
                      expected_account_name=None,
                      expected_upn_name=None,
                      expected_sid=None):
    """Perform a TGS exchange for the service account and check it.

    The request targets the principal named after the service account,
    in the service's realm, using the supplied TGT.

    expect_error may be False (a TGS-REP is expected), True (an error of
    KDC_ERR_TGT_REVOKED is expected) or a specific expected error value.
    Returns the ticket credentials produced by the exchange on success,
    or None when an error was expected.
    """
    if client_account is None:
        client_account = client_creds.get_username()
    cname = self.PrincipalName_create(name_type=client_name_type,
                                      names=client_account.split('/'))

    service_account = service_creds.get_username()
    sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                      names=[service_account])

    realm = service_creds.get_realm()

    if expected_cname is None:
        expected_cname = cname

    expected_supported_etypes = service_creds.tgs_supported_enctypes

    etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)

    if kdc_options is None:
        kdc_options = 'canonicalize'
    kdc_options = str(krb5_asn1.KDCOptions(kdc_options))

    target_decryption_key = self.TicketDecryptionKey_from_creds(
        service_creds)

    authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)

    if not expect_error:
        expected_error_mode = 0
        check_error_fn = None
        check_rep_fn = self.generic_check_kdc_rep
    else:
        # A bare True stands for the default expected error.
        expected_error_mode = (KDC_ERR_TGT_REVOKED if expect_error is True
                               else expect_error)
        check_error_fn = self.generic_check_kdc_error
        check_rep_fn = None

    kdc_exchange_dict = self.tgs_exchange_dict(
        expected_crealm=realm,
        expected_cname=expected_cname,
        expected_srealm=realm,
        expected_sname=sname,
        expected_account_name=expected_account_name,
        expected_upn_name=expected_upn_name,
        expected_sid=expected_sid,
        expected_supported_etypes=expected_supported_etypes,
        ticket_decryption_key=target_decryption_key,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=expected_error_mode,
        tgt=tgt,
        authenticator_subkey=authenticator_subkey,
        kdc_options=kdc_options,
        pac_request=pac_request,
        expect_pac=expect_pac,
        expect_edata=False)

    rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                     cname=cname,
                                     realm=realm,
                                     sname=sname,
                                     etypes=etypes)
    if expect_error:
        self.check_error_rep(rep, expected_error_mode)
        return None

    self.check_reply(rep, KRB_TGS_REP)
    return kdc_exchange_dict['rep_ticket_creds']
# Record type for the values of interest pulled out of a decoded PAC.
PacData = namedtuple(
    "PacData",
    ["account_name", "account_sid", "logon_name", "upn", "domain_name"])
def get_pac_data(self, authorization_data):
    """Decode the PAC element contained in the authorization-data element.

    Walks every AD_IF_RELEVANT entry of *authorization_data*, DER-decodes
    it, and NDR-unpacks each AD_WIN2K_PAC element found inside. Returns a
    PacData tuple; a field stays None when no matching PAC buffer was
    seen. If several buffers of one type occur, the last one wins.
    """
    account_name = user_sid = logon_name = upn = domain_name = None

    for element in authorization_data:
        # The PAC data will be wrapped in an AD_IF_RELEVANT element
        if element['ad-type'] != AD_IF_RELEVANT:
            continue
        relevant = self.der_decode(
            element['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())

        for inner in relevant:
            # The PAC data is further wrapped in a AD_WIN2K_PAC element
            if inner['ad-type'] != AD_WIN2K_PAC:
                continue
            pac_data = ndr_unpack(krb5pac.PAC_DATA, inner['ad-data'])

            for pac_buffer in pac_data.buffers:
                buffer_type = pac_buffer.type
                if buffer_type == krb5pac.PAC_TYPE_LOGON_INFO:
                    base = pac_buffer.info.info.info3.base
                    account_name = base.account_name
                    # The user SID is the domain SID plus the RID.
                    user_sid = f"{base.domain_sid}-{base.rid}"
                elif buffer_type == krb5pac.PAC_TYPE_LOGON_NAME:
                    logon_name = pac_buffer.info.account_name
                elif buffer_type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
                    upn = pac_buffer.info.upn_name
                    domain_name = pac_buffer.info.dns_domain_name

    return self.PacData(
        account_name, user_sid, logon_name, upn, domain_name)
def decode_service_ticket(self, creds, ticket):
    """Decrypt and decode a service ticket.

    The decryption key is derived from *creds* for the etype of the
    ticket's 'enc-part'. When that key carries a kvno, the ticket's kvno
    is asserted to match. Returns the decoded EncTicketPart.
    """
    encrypted = ticket['enc-part']

    key = self.TicketDecryptionKey_from_creds(creds, encrypted['etype'])
    if key.kvno is not None:
        self.assertElementKVNO(encrypted, 'kvno', key.kvno)

    plaintext = key.decrypt(KU_TICKET, encrypted['cipher'])
    return self.der_decode(plaintext, asn1Spec=krb5_asn1.EncTicketPart())
def modify_ticket_flag(self, enc_part, flag, value):
    """Set one flag bit in enc_part['flags'] to *value* and return enc_part.

    *value* must be a bool. The bit position is the last index of the bit
    tuple produced by krb5_asn1.TicketFlags(flag). *enc_part* is modified
    in place.
    """
    self.assertIsInstance(value, bool)

    index = len(tuple(krb5_asn1.TicketFlags(flag))) - 1

    current = enc_part['flags']
    # Equality is accepted: the bit is then appended to the flag string
    # rather than replacing an existing position.
    self.assertLessEqual(index, len(current))

    bit = str(int(value))
    enc_part['flags'] = current[:index] + bit + current[index + 1:]

    return enc_part
def get_objectSid(self, samdb, dn):
    """Get the objectSID for a DN, as a string.

    Note: performs an Ldb query. Asserts that exactly one entry matched.
    """
    result = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
    found_exactly_one = len(result) == 1
    self.assertTrue(found_exactly_one, "did not get objectSid for %s" % dn)

    raw_sid = result[0]["objectSID"][0]
    return samdb.schema_format_value("objectSID", raw_sid).decode('utf8')
def add_attribute(self, samdb, dn_str, name, value):
    """Add *value* to attribute *name* of the object at *dn_str*.

    *value* may be a single value or a list of values.
    """
    values = value if isinstance(value, list) else [value]

    msg = ldb.Message(ldb.Dn(samdb, dn_str))
    msg[name] = ldb.MessageElement(values, ldb.FLAG_MOD_ADD, name)
    samdb.modify(msg)
def modify_attribute(self, samdb, dn_str, name, value):
    """Replace attribute *name* of the object at *dn_str* with *value*.

    *value* may be a single value or a list of values.
    """
    values = value if isinstance(value, list) else [value]

    msg = ldb.Message(ldb.Dn(samdb, dn_str))
    msg[name] = ldb.MessageElement(values, ldb.FLAG_MOD_REPLACE, name)
    samdb.modify(msg)
def remove_attribute(self, samdb, dn_str, name):
    """Delete attribute *name* from the object at *dn_str*."""
    msg = ldb.Message(ldb.Dn(samdb, dn_str))
    # An empty value list with FLAG_MOD_DELETE is sent for the attribute.
    msg[name] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, name)
    samdb.modify(msg)
def create_ccache(self, cname, ticket, enc_part):
    """Lay out a version 4 on-disk credentials cache, to be read using the
    FILE: protocol.

    cname    -- client principal; 'name-type' and 'name-string' are read.
    ticket   -- decoded ticket; supplies 'realm' and 'sname', and is
                re-encoded into the cache.
    enc_part -- the ticket's private part; 'key', 'authtime', 'endtime',
                'flags' and (optionally) 'starttime' are read.

    Asserts that the ticket times are plausible relative to the current
    time. Returns the (already closed) NamedTemporaryFile object; the
    file is created in self.tempdir with delete=False, so it stays on
    disk and callers can use its .name.
    """
    field = krb5ccache.DELTATIME_TAG()
    field.kdc_sec_offset = 0
    field.kdc_usec_offset = 0

    v4tag = krb5ccache.V4TAG()
    v4tag.tag = 1
    v4tag.field = field

    v4tags = krb5ccache.V4TAGS()
    v4tags.tag = v4tag
    v4tags.further_tags = b''

    optional_header = krb5ccache.V4HEADER()
    optional_header.v4tags = v4tags

    cname_string = cname['name-string']

    cprincipal = krb5ccache.PRINCIPAL()
    cprincipal.name_type = cname['name-type']
    cprincipal.component_count = len(cname_string)
    cprincipal.realm = ticket['realm']
    cprincipal.components = cname_string

    sname = ticket['sname']
    sname_string = sname['name-string']

    sprincipal = krb5ccache.PRINCIPAL()
    sprincipal.name_type = sname['name-type']
    sprincipal.component_count = len(sname_string)
    sprincipal.realm = ticket['realm']
    sprincipal.components = sname_string

    key = self.EncryptionKey_import(enc_part['key'])

    key_data = key.export_obj()
    keyblock = krb5ccache.KEYBLOCK()
    keyblock.enctype = key_data['keytype']
    keyblock.data = key_data['keyvalue']

    addresses = krb5ccache.ADDRESSES()
    addresses.count = 0
    addresses.data = []

    authdata = krb5ccache.AUTHDATA()
    authdata.count = 0
    authdata.data = []

    # Re-encode the ticket, since it was decoded by another layer.
    ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())

    authtime = enc_part['authtime']
    # A ticket without an explicit start time starts at its auth time.
    starttime = enc_part.get('starttime', authtime)
    endtime = enc_part['endtime']

    cred = krb5ccache.CREDENTIAL()
    cred.client = cprincipal
    cred.server = sprincipal
    cred.keyblock = keyblock
    cred.authtime = self.get_EpochFromKerberosTime(authtime)
    cred.starttime = self.get_EpochFromKerberosTime(starttime)
    cred.endtime = self.get_EpochFromKerberosTime(endtime)

    # Sample the clock once so all three checks use the same instant.
    now = datetime.now(timezone.utc).timestamp()

    # Account for clock skew of up to five minutes.
    self.assertLess(cred.authtime - 5 * 60,
                    now,
                    "Ticket not yet valid - clocks may be out of sync.")
    self.assertLess(cred.starttime - 5 * 60,
                    now,
                    "Ticket not yet valid - clocks may be out of sync.")
    # The ticket must remain valid for more than an hour from now.
    self.assertGreater(cred.endtime - 60 * 60,
                       now,
                       "Ticket already expired/about to expire - "
                       "clocks may be out of sync.")

    cred.renew_till = cred.endtime
    cred.is_skey = 0
    # 'flags' is a string of binary digits.
    cred.ticket_flags = int(enc_part['flags'], 2)
    cred.addresses = addresses
    cred.authdata = authdata
    cred.ticket = ticket_data
    cred.second_ticket = b''

    ccache = krb5ccache.CCACHE()
    ccache.pvno = 5
    ccache.version = 4
    ccache.optional_header = optional_header
    ccache.principal = cprincipal
    ccache.cred = cred

    # Serialise the credentials cache structure.
    result = ndr_pack(ccache)

    # Create a temporary file and write the credentials. The context
    # manager guarantees the handle is closed even if the write fails;
    # delete=False keeps the file itself on disk.
    with tempfile.NamedTemporaryFile(dir=self.tempdir,
                                     delete=False) as cachefile:
        cachefile.write(result)

    return cachefile
def create_ccache_with_ticket(self, user_credentials, ticket, pac=True):
    """Place *ticket* into a newly created credentials cache file.

    When *pac* is false the ticket is first rewritten without its PAC.
    Returns a (Credentials, cachefile) pair, where the Credentials object
    is configured to require Kerberos and to use the new cache file.
    """
    user_name = user_credentials.get_username()
    realm = user_credentials.get_realm()

    client_principal = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                                 names=[user_name])

    if not pac:
        ticket = self.modified_ticket(ticket, exclude_pac=True)

    # Write the ticket into a credentials cache file that can be ingested
    # by the main credentials code.
    cachefile = self.create_ccache(
        client_principal, ticket.ticket, ticket.encpart_private)

    # Build a credentials object that refers to the cache file.
    ccache_creds = Credentials()
    ccache_creds.set_kerberos_state(MUST_USE_KERBEROS)
    ccache_creds.set_username(user_name, SPECIFIED)
    ccache_creds.set_realm(realm)
    ccache_creds.set_named_ccache(cachefile.name, SPECIFIED, self.get_lp())

    # Hand back the credentials together with the cache file.
    return ccache_creds, cachefile
def create_ccache_with_user(self, user_credentials, mach_credentials,
                            service="host", target_name=None, pac=True):
    """Obtain a service ticket authorising the user and place it into a
    newly created credentials cache file.

    A TGT is fetched for *user_credentials* and used to get a ticket for
    *service* on *mach_credentials*. Returns whatever
    create_ccache_with_ticket() returns for that ticket.
    """
    service_ticket = self.get_service_ticket(
        self.get_tgt(user_credentials),
        mach_credentials,
        service=service,
        target_name=target_name)

    return self.create_ccache_with_ticket(
        user_credentials, service_ticket, pac=pac)
# Test credentials by connecting to the DC through LDAP.
def _connect(self, creds, simple_bind, expect_error=None):
    """Connect to the DC over LDAP with *creds* and verify the outcome.

    With *simple_bind* the connection uses ldaps:// and the bind DN is
    set to the credentials' DN; otherwise ldap:// is used and the bind DN
    is cleared.

    If *expect_error* is given, the connection must fail with
    ERR_INVALID_CREDENTIALS and the error string must contain it (any
    other LDB error is re-raised). Otherwise the connection must succeed
    and the first tokenGroups value of the rootDSE must equal the
    credentials' SID.
    """
    samdb = self.get_samdb()
    dn = creds.get_dn()
    host = samdb.host_dns_name()

    if simple_bind:
        url = f'ldaps://{host}'
        bind_dn = str(dn)
    else:
        url = f'ldap://{host}'
        bind_dn = None
    creds.set_bind_dn(bind_dn)

    try:
        connection = SamDB(url=url,
                           credentials=creds,
                           lp=self.get_lp())
    except ldb.LdbError as err:
        self.assertIsNotNone(expect_error, 'got unexpected error')
        code, message = err.args
        if code != ldb.ERR_INVALID_CREDENTIALS:
            raise

        self.assertIn(expect_error, message)
        return

    # Reaching this point means the connection succeeded.
    self.assertIsNone(expect_error, 'expected to get an error')

    res = connection.search('',
                            scope=ldb.SCOPE_BASE,
                            attrs=['tokenGroups'])
    self.assertEqual(1, len(res))

    expected_sid = creds.get_sid()

    first_group = res[0].get('tokenGroups', idx=0)
    actual_sid = ndr_unpack(security.dom_sid, first_group)

    self.assertEqual(expected_sid, str(actual_sid))
# Test the two SAMR password change methods implemented in Samba. If the
# user is protected, we should get an ACCOUNT_RESTRICTION error indicating
# that the password change is not allowed.
def _test_samr_change_password(self, creds, expect_error,
                               connect_error=None):
    """Try ChangePasswordUser2, then ChangePasswordUser3, for *creds*.

    creds         -- credentials of the account whose password changes.
                     After each successful change the new password is
                     stored back into this object.
    expect_error  -- NTSTATUS code both RPCs are expected to fail with,
                     or None if they should succeed. Overridden with
                     NT_STATUS_NTLM_BLOCKED when self.expect_nt_hash is
                     false.
    connect_error -- NTSTATUS code the SAMR connection itself is expected
                     to fail with, or None. Must not be combined with
                     expect_error.
    """

    def encrypt_new_password(nt_hash, password_str):
        """Return (CryptPassword data, verifier hash) as lists of ints."""
        # Encode the new password as UTF-16.
        password = password_str.encode('utf-16le')

        # Generate the MD4 hash of the password.
        password_md4 = md4_hash_blob(password)

        # Prefix the password with padding so it is 512 bytes long, then
        # append the 32-bit length of the password.
        password_len = len(password)
        blob = bytes(512 - password_len) + password
        blob += int.to_bytes(password_len,
                             length=4,
                             byteorder='little')

        # Use the first 14 bytes of the new password's MD4 hash as the key
        # to DES-encrypt the old NT hash, giving the verifier.
        verifier = des_crypt_blob_16(nt_hash, password_md4[:14])

        return list(arcfour_encrypt(nt_hash, blob)), list(verifier)

    samdb = self.get_samdb()
    server_name = samdb.host_dns_name()
    try:
        conn = samr.samr(f'ncacn_np:{server_name}[seal,smb2]',
                         self.get_lp(),
                         creds)
    except NTSTATUSError as err:
        self.assertIsNotNone(connect_error,
                             'connection unexpectedly failed')
        self.assertIsNone(expect_error, 'don’t specify both errors')

        num, _ = err.args
        self.assertEqual(num, connect_error)

        return
    else:
        self.assertIsNone(connect_error, 'expected connection to fail')

    server = lsa.String()
    server.string = server_name

    account = lsa.String()
    account.string = creds.get_username()

    nt_verifier = samr.Password()
    nt_password = samr.CryptPassword()

    # First method: ChangePasswordUser2.
    nt_hash = creds.get_nt_hash()
    new_password_str = generate_random_password(32, 32)
    nt_password.data, nt_verifier.hash = encrypt_new_password(
        nt_hash, new_password_str)

    if not self.expect_nt_hash:
        expect_error = ntstatus.NT_STATUS_NTLM_BLOCKED

    try:
        conn.ChangePasswordUser2(server=server,
                                 account=account,
                                 nt_password=nt_password,
                                 nt_verifier=nt_verifier,
                                 lm_change=False,
                                 lm_password=None,
                                 lm_verifier=None)
    except NTSTATUSError as err:
        num, _ = err.args
        self.assertIsNotNone(expect_error,
                             f'unexpectedly failed with {num:08X}')
        self.assertEqual(num, expect_error)
    else:
        self.assertIsNone(expect_error, 'expected to fail')

        # The next change must be derived from the password now in force.
        creds.set_password(new_password_str)

    # Second method: ChangePasswordUser3, using the (possibly updated)
    # NT hash and a fresh random password.
    nt_hash = creds.get_nt_hash()
    new_password_str = generate_random_password(32, 32)
    nt_password.data, nt_verifier.hash = encrypt_new_password(
        nt_hash, new_password_str)

    try:
        conn.ChangePasswordUser3(server=server,
                                 account=account,
                                 nt_password=nt_password,
                                 nt_verifier=nt_verifier,
                                 lm_change=False,
                                 lm_password=None,
                                 lm_verifier=None,
                                 password3=None)
    except NTSTATUSError as err:
        self.assertIsNotNone(expect_error, 'unexpectedly failed')

        num, _ = err.args
        self.assertEqual(num, expect_error)
    else:
        self.assertIsNone(expect_error, 'expected to fail')

        # Keep the credentials in sync with the account, as is done after
        # the first change, so callers are not left with a stale password.
        creds.set_password(new_password_str)
# Test SamLogon. Authentication should succeed for non-protected accounts,
# and fail for protected accounts.
def _test_samlogon(self, creds, logon_type, expect_error=None,
                   validation_level=netlogon.NetlogonValidationSamInfo2,
                   domain_joined_mach_creds=None):
    """Perform a netr_LogonSamLogonEx for *creds* and check the outcome.

    creds       -- credentials of the account being logged on.
    logon_type  -- NetlogonInteractiveInformation or
                   NetlogonNetworkInformation; anything else fails the
                   test.
    expect_error -- NTSTATUS code the logon should fail with, or None.
                   When unset and self.expect_nt_hash is false,
                   NT_STATUS_NTLM_BLOCKED is expected.
    validation_level -- validation level passed through to the RPC.
    domain_joined_mach_creds -- machine credentials for the secure
                   channel; a cached workstation account is used when
                   omitted.

    Returns the validation data on success, or None if the logon failed
    as expected.
    """
    samdb = self.get_samdb()

    mach_creds = domain_joined_mach_creds
    if mach_creds is None:
        mach_creds = self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts={'secure_channel_type': misc.SEC_CHAN_WKSTA})

    dc_server = samdb.host_dns_name()
    username, domain = creds.get_ntlm_username_domain()
    workstation = mach_creds.get_username()

    # Calling this initializes netlogon_creds on mach_creds, as is required
    # before calling mach_creds.encrypt_netr_PasswordInfo().
    conn = netlogon.netlogon(f'ncacn_ip_tcp:{dc_server}[schannel,seal]',
                             self.get_lp(),
                             mach_creds)
    auth_type, auth_level = conn.auth_info()

    if logon_type == netlogon.NetlogonInteractiveInformation:
        # Interactive logon: send the password hashes themselves, with an
        # all-zero LM hash.
        lm_pass = samr.Password()
        lm_pass.hash = [0] * 16

        nt_pass = samr.Password()
        nt_pass.hash = list(creds.get_nt_hash())

        logon = netlogon.netr_PasswordInfo()
        logon.lmpassword = lm_pass
        logon.ntpassword = nt_pass

        mach_creds.encrypt_netr_PasswordInfo(info=logon,
                                             auth_type=auth_type,
                                             auth_level=auth_level)

    elif logon_type == netlogon.NetlogonNetworkInformation:
        # Network logon: send an NTLM response to a fixed challenge.
        domainname = ntlmssp.AV_PAIR()
        domainname.AvId = ntlmssp.MsvAvNbDomainName
        domainname.Value = domain

        computername = ntlmssp.AV_PAIR()
        computername.AvId = ntlmssp.MsvAvNbComputerName
        computername.Value = workstation

        eol = ntlmssp.AV_PAIR()
        eol.AvId = ntlmssp.MsvAvEOL

        target_info = ntlmssp.AV_PAIR_LIST()
        target_info.count = 3
        target_info.pair = [domainname, computername, eol]

        challenge = b'abcdefgh'
        response = creds.get_ntlm_response(
            flags=0,
            challenge=challenge,
            target_info=ndr_pack(target_info))
        nt_response = response['nt_response']

        logon = netlogon.netr_NetworkInfo()
        logon.challenge = list(challenge)
        logon.nt = netlogon.netr_ChallengeResponse()
        logon.nt.length = len(nt_response)
        logon.nt.data = list(nt_response)

    else:
        self.fail(f'unknown logon type {logon_type}')

    identity_info = netlogon.netr_IdentityInfo()
    identity_info.domain_name.string = domain
    identity_info.account_name.string = username
    identity_info.parameter_control = (
        netlogon.MSV1_0_ALLOW_SERVER_TRUST_ACCOUNT
        | netlogon.MSV1_0_ALLOW_WORKSTATION_TRUST_ACCOUNT)
    identity_info.workstation.string = workstation

    logon.identity_info = identity_info

    netr_flags = 0
    validation = None

    # With no explicit expectation, a missing NT hash means NTLM logons
    # are expected to be blocked.
    if not (expect_error or self.expect_nt_hash):
        expect_error = ntstatus.NT_STATUS_NTLM_BLOCKED

    try:
        validation, authoritative, flags = conn.netr_LogonSamLogonEx(
            dc_server,
            mach_creds.get_workstation(),
            logon_type,
            logon,
            validation_level,
            netr_flags)
    except NTSTATUSError as err:
        status, _ = err.args
        self.assertIsNotNone(expect_error,
                             f'unexpectedly failed with {status:08X}')
        self.assertEqual(expect_error, status, 'got wrong status code')
    else:
        self.assertIsNone(expect_error, 'expected error')

        self.assertEqual(1, authoritative)
        self.assertEqual(0, flags)

    return validation
def check_ticket_times(self,
                       ticket_creds,
                       expected_life=None,
                       expected_renew_life=None,
                       delta=0):
    """Check the lifetime and renewable lifetime of a ticket.

    Times are read from ticket_creds.ticket_private and converted with
    get_EpochFromKerberosTime(). Both lifetimes are measured from the
    ticket's start time, which falls back to its auth time.

    expected_life       -- expected endtime - starttime, or None to skip.
    expected_renew_life -- expected renew-till - starttime, or None to
                           skip. Must be None if the ticket has no
                           renew-till.
    delta               -- tolerance allowed in both comparisons.
    """
    enc_ticket = ticket_creds.ticket_private

    authtime = enc_ticket['authtime']
    starttime = enc_ticket.get('starttime', authtime)
    endtime = enc_ticket['endtime']
    renew_till = enc_ticket.get('renew-till', None)

    start = self.get_EpochFromKerberosTime(starttime)

    if expected_life is not None:
        end = self.get_EpochFromKerberosTime(endtime.decode('ascii'))
        self.assertAlmostEqual(expected_life, end - start, delta=delta)

    if renew_till is None:
        # A non-renewable ticket cannot have an expected renew lifetime.
        self.assertIsNone(expected_renew_life)
    elif expected_renew_life is not None:
        renew_end = self.get_EpochFromKerberosTime(
            renew_till.decode('ascii'))
        self.assertAlmostEqual(expected_renew_life,
                               renew_end - start,
                               delta=delta)