drsuapi.idl: fix source_dsa spelling
[samba4-gss.git] / python / samba / tests / krb5 / kdc_base_test.py
blob17877aa886378faa27830a806e84f6bec0a304df
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import os
20 import sys
22 sys.path.insert(0, "bin/python")
23 os.environ["PYTHONUNBUFFERED"] = "1"
25 import binascii
26 import collections
27 import numbers
28 import secrets
29 import tempfile
30 from collections import namedtuple
31 from datetime import datetime, timezone
32 from enum import Enum
33 from functools import partial
34 from typing import Dict, Optional
36 import ldb
37 from ldb import SCOPE_BASE
39 from samba import (
40 NTSTATUSError,
41 arcfour_encrypt,
42 common,
43 generate_random_password,
44 net,
45 ntstatus,
46 current_unix_time,
47 unix2nttime,
49 from samba.auth import system_session
50 from samba.credentials import (
51 DONT_USE_KERBEROS,
52 MUST_USE_KERBEROS,
53 SPECIFIED,
54 Credentials,
56 from samba.crypto import des_crypt_blob_16, md4_hash_blob
57 from samba.lsa_utils import OpenPolicyFallback, CreateTrustedDomainFallback
58 from samba.dcerpc import (
59 claims,
60 dcerpc,
61 drsblobs,
62 drsuapi,
63 krb5ccache,
64 krb5pac,
65 lsa,
66 misc,
67 netlogon,
68 ntlmssp,
69 samr,
70 security,
72 from samba.dcerpc.misc import (
73 SEC_CHAN_NULL,
74 SEC_CHAN_BDC,
75 SEC_CHAN_DNS_DOMAIN,
76 SEC_CHAN_DOMAIN,
77 SEC_CHAN_WKSTA,
79 from samba.domain.models import AuthenticationPolicy, AuthenticationSilo
80 from samba.drs_utils import drs_Replicate, drsuapi_connect
81 from samba.dsdb import (
82 DS_DOMAIN_FUNCTION_2000,
83 DS_DOMAIN_FUNCTION_2008,
84 DS_GUID_COMPUTERS_CONTAINER,
85 DS_GUID_DOMAIN_CONTROLLERS_CONTAINER,
86 DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER,
87 DS_GUID_USERS_CONTAINER,
88 DSDB_SYNTAX_BINARY_DN,
89 GTYPE_SECURITY_DOMAIN_LOCAL_GROUP,
90 GTYPE_SECURITY_GLOBAL_GROUP,
91 GTYPE_SECURITY_UNIVERSAL_GROUP,
92 UF_ACCOUNTDISABLE,
93 UF_NO_AUTH_DATA_REQUIRED,
94 UF_NORMAL_ACCOUNT,
95 UF_NOT_DELEGATED,
96 UF_PARTIAL_SECRETS_ACCOUNT,
97 UF_SERVER_TRUST_ACCOUNT,
98 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION,
99 UF_WORKSTATION_TRUST_ACCOUNT,
100 UF_SMARTCARD_REQUIRED,
101 UF_INTERDOMAIN_TRUST_ACCOUNT,
103 from samba.join import DCJoinContext
104 from samba.ndr import ndr_pack, ndr_unpack
105 from samba.param import LoadParm
106 from samba.samdb import SamDB, dsdb_Dn
108 rc4_bit = security.KERB_ENCTYPE_RC4_HMAC_MD5
109 aes256_sk_bit = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
111 import samba.tests.krb5.kcrypto as kcrypto
112 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
113 from samba.tests import TestCaseInTempDir, delete_force
114 from samba.tests.krb5.raw_testcase import (
115 KerberosCredentials,
116 KerberosTicketCreds,
117 RawKerberosTest,
119 from samba.tests.krb5.rfc4120_constants import (
120 AD_IF_RELEVANT,
121 AD_WIN2K_PAC,
122 AES256_CTS_HMAC_SHA1_96,
123 ARCFOUR_HMAC_MD5,
124 KDC_ERR_PREAUTH_REQUIRED,
125 KDC_ERR_TGT_REVOKED,
126 KRB_AS_REP,
127 KRB_ERROR,
128 KRB_TGS_REP,
129 KU_AS_REP_ENC_PART,
130 KU_ENC_CHALLENGE_CLIENT,
131 KU_PA_ENC_TIMESTAMP,
132 KU_TICKET,
133 NT_PRINCIPAL,
134 NT_SRV_INST,
135 PADATA_ENC_TIMESTAMP,
136 PADATA_ENCRYPTED_CHALLENGE,
137 PADATA_ETYPE_INFO2,
140 global_asn1_print = False
141 global_hexdump = False
144 class GroupType(Enum):
145 GLOBAL = GTYPE_SECURITY_GLOBAL_GROUP
146 DOMAIN_LOCAL = GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
147 UNIVERSAL = GTYPE_SECURITY_UNIVERSAL_GROUP
150 # This simple class encapsulates the DN and SID of a Principal.
151 class Principal:
152 __slots__ = ['dn', 'sid']
154 def __init__(self, dn, sid):
155 if dn is not None and not isinstance(dn, ldb.Dn):
156 raise AssertionError(f'expected {dn} to be an ldb.Dn')
158 self.dn = dn
159 self.sid = sid
162 class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
163 """ Base class for KDC tests.
166 class AccountType(Enum):
167 USER = object()
168 COMPUTER = object()
169 SERVER = object()
170 RODC = object()
171 MANAGED_SERVICE = object()
172 GROUP_MANAGED_SERVICE = object()
173 TRUST = object()
175 @classmethod
176 def setUpClass(cls):
177 super().setUpClass()
178 cls._lp = None
180 cls._ldb = None
181 cls._rodc_ldb = None
183 cls._drsuapi_connection = None
184 cls._lsarpc_connection = None
186 cls._functional_level = None
188 # An identifier to ensure created accounts have unique names. Windows
189 # caches accounts based on usernames, so account names being different
190 # across test runs avoids previous test runs affecting the results.
191 cls.account_base = f'{secrets.token_hex(4)}_'
192 cls.account_id = 0
194 # A list containing DNs of accounts created as part of testing.
195 cls.accounts = []
197 # A list of tdo_handles of trusts created as part of testing.
198 cls.trusts = []
200 cls.account_cache = {}
201 cls.policy_cache = {}
202 cls.tkt_cache = {}
204 cls._rodc_ctx = None
206 cls.ldb_cleanups = []
208 cls._claim_types_dn = None
209 cls._authn_policy_config_dn = None
210 cls._authn_policies_dn = None
211 cls._authn_silos_dn = None
213 def get_claim_types_dn(self):
214 samdb = self.get_samdb()
216 if self._claim_types_dn is None:
217 claim_config_dn = samdb.get_config_basedn()
219 claim_config_dn.add_child('CN=Claims Configuration,CN=Services')
220 details = {
221 'dn': claim_config_dn,
222 'objectClass': 'container',
224 try:
225 samdb.add(details)
226 except ldb.LdbError as err:
227 num, _ = err.args
228 if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
229 raise
230 else:
231 self.accounts.append(str(claim_config_dn))
233 claim_types_dn = claim_config_dn
234 claim_types_dn.add_child('CN=Claim Types')
235 details = {
236 'dn': claim_types_dn,
237 'objectClass': 'msDS-ClaimTypes',
239 try:
240 samdb.add(details)
241 except ldb.LdbError as err:
242 num, _ = err.args
243 if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
244 raise
245 else:
246 self.accounts.append(str(claim_types_dn))
248 type(self)._claim_types_dn = claim_types_dn
250 # Return a copy of the DN.
251 return ldb.Dn(samdb, str(self._claim_types_dn))
253 def get_authn_policy_config_dn(self):
254 samdb = self.get_samdb()
256 if self._authn_policy_config_dn is None:
257 authn_policy_config_dn = samdb.get_config_basedn()
259 authn_policy_config_dn.add_child(
260 'CN=AuthN Policy Configuration,CN=Services')
261 details = {
262 'dn': authn_policy_config_dn,
263 'objectClass': 'container',
264 'description': ('Contains configuration for authentication '
265 'policy'),
267 try:
268 samdb.add(details)
269 except ldb.LdbError as err:
270 num, _ = err.args
271 if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
272 raise
273 else:
274 self.accounts.append(str(authn_policy_config_dn))
276 type(self)._authn_policy_config_dn = authn_policy_config_dn
278 # Return a copy of the DN.
279 return ldb.Dn(samdb, str(self._authn_policy_config_dn))
281 def get_authn_policies_dn(self):
282 samdb = self.get_samdb()
284 if self._authn_policies_dn is None:
285 authn_policies_dn = self.get_authn_policy_config_dn()
286 authn_policies_dn.add_child('CN=AuthN Policies')
287 details = {
288 'dn': authn_policies_dn,
289 'objectClass': 'msDS-AuthNPolicies',
290 'description': 'Contains authentication policy objects',
292 try:
293 samdb.add(details)
294 except ldb.LdbError as err:
295 num, _ = err.args
296 if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
297 raise
298 else:
299 self.accounts.append(str(authn_policies_dn))
301 type(self)._authn_policies_dn = authn_policies_dn
303 # Return a copy of the DN.
304 return ldb.Dn(samdb, str(self._authn_policies_dn))
306 def get_authn_silos_dn(self):
307 samdb = self.get_samdb()
309 if self._authn_silos_dn is None:
310 authn_silos_dn = self.get_authn_policy_config_dn()
311 authn_silos_dn.add_child('CN=AuthN Silos')
312 details = {
313 'dn': authn_silos_dn,
314 'objectClass': 'msDS-AuthNPolicySilos',
315 'description': 'Contains authentication policy silo objects',
317 try:
318 samdb.add(details)
319 except ldb.LdbError as err:
320 num, _ = err.args
321 if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
322 raise
323 else:
324 self.accounts.append(str(authn_silos_dn))
326 type(self)._authn_silos_dn = authn_silos_dn
328 # Return a copy of the DN.
329 return ldb.Dn(samdb, str(self._authn_silos_dn))
331 @staticmethod
332 def freeze(m):
333 return frozenset((k, v) for k, v in m.items())
335 def tearDown(self):
336 # Run any cleanups that may modify accounts prior to deleting those
337 # accounts.
338 self.doCleanups()
340 # Clean up any accounts created for single tests.
341 if self._ldb is not None:
342 for dn in reversed(self.test_accounts):
343 delete_force(self._ldb, dn)
345 if self._test_rodc_ctx is not None:
346 self._test_rodc_ctx.cleanup_old_join(force=True)
348 # Clean up any trusts created for single tests.
349 if self._lsarpc_connection is not None:
350 lsa_conn, _, _, _, _ = self._lsarpc_connection
351 for tdo_handle in reversed(self.test_trusts):
352 lsa_conn.DeleteObject(tdo_handle)
354 super().tearDown()
356 @classmethod
357 def tearDownClass(cls):
358 # Clean up any accounts created by create_account. This is
359 # done in tearDownClass() rather than tearDown(), so that
360 # accounts need only be created once for permutation tests.
361 if cls._ldb is not None:
362 for cleanup in reversed(cls.ldb_cleanups):
363 try:
364 cls._ldb.modify(cleanup)
365 except ldb.LdbError:
366 pass
368 for dn in reversed(cls.accounts):
369 delete_force(cls._ldb, dn)
371 # Clean up any trusts created by create_trust. This is
372 # done in tearDownClass() rather than tearDown(), so that
373 # trust accounts need only be created once for permutation tests.
374 if cls._lsarpc_connection is not None:
375 lsa_conn, _, _, _, _ = cls._lsarpc_connection
376 for tdo_handle in reversed(cls.trusts):
377 lsa_conn.DeleteObject(tdo_handle)
379 if cls._rodc_ctx is not None:
380 cls._rodc_ctx.cleanup_old_join(force=True)
382 super().tearDownClass()
384 def setUp(self):
385 super().setUp()
386 self.do_asn1_print = global_asn1_print
387 self.do_hexdump = global_hexdump
389 # A list containing DNs of accounts that should be removed when the
390 # current test finishes.
391 self.test_accounts = []
392 self._test_rodc_ctx = None
394 # A list containing tdo_handles of trusts that should be removed when the
395 # current test finishes.
396 self.test_trusts = []
398 def get_lp(self) -> LoadParm:
399 if self._lp is None:
400 type(self)._lp = self.get_loadparm()
402 return self._lp
404 def get_samdb(self) -> SamDB:
405 if self._ldb is None:
406 creds = self.get_admin_creds()
407 lp = self.get_lp()
409 session = system_session()
410 type(self)._ldb = SamDB(url="ldap://%s" % self.dc_host,
411 session_info=session,
412 credentials=creds,
413 lp=lp)
415 return self._ldb
417 def get_rodc_samdb(self) -> SamDB:
418 if self._rodc_ldb is None:
419 creds = self.get_admin_creds()
420 lp = self.get_lp()
422 session = system_session()
423 type(self)._rodc_ldb = SamDB(url="ldap://%s" % self.host,
424 session_info=session,
425 credentials=creds,
426 lp=lp,
427 am_rodc=True)
429 return self._rodc_ldb
431 def get_drsuapi_connection(self):
432 if self._drsuapi_connection is None:
433 admin_creds = self.get_admin_creds()
434 samdb = self.get_samdb()
435 dns_hostname = samdb.host_dns_name()
436 type(self)._drsuapi_connection = drsuapi_connect(dns_hostname,
437 self.get_lp(),
438 admin_creds,
439 ip=self.dc_host)
441 return self._drsuapi_connection
443 def get_lsarpc_connection(self):
444 def get_lsa_info(conn, policy_access):
445 in_version = 1
446 in_revision_info1 = lsa.revision_info1()
447 in_revision_info1.revision = 1
448 in_revision_info1.supported_features = (
449 lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER
452 out_version, out_revision_info1, policy = OpenPolicyFallback(
453 conn,
454 b''.decode('utf-8'),
455 in_version,
456 in_revision_info1,
457 access_mask=policy_access
460 info = conn.QueryInfoPolicy2(policy, lsa.LSA_POLICY_INFO_DNS)
462 return (policy, out_version, out_revision_info1, info)
464 def lsarpc_connect(server, lp, creds, ip=None):
465 binding_options = ""
466 if lp.log_level() >= 9:
467 binding_options += ",print"
469 # Allow forcing the IP
470 if ip is not None:
471 binding_options += f",target_hostname={server}"
472 binding_string = f"ncacn_np:{ip}[{binding_options}]"
473 else:
474 binding_string = "ncacn_np:%s[%s]" % (server, binding_options)
476 try:
477 conn = lsa.lsarpc(binding_string, lp, creds)
478 policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
479 policy_access |= lsa.LSA_POLICY_TRUST_ADMIN
480 policy_access |= lsa.LSA_POLICY_CREATE_SECRET
481 (policy, out_version, out_revision_info1, info) = \
482 get_lsa_info(conn, policy_access)
483 except Exception as e:
484 raise RuntimeError("LSARPC connection to %s failed: %s" % (server, e))
486 return (conn, policy, out_version, out_revision_info1, info)
488 if self._lsarpc_connection is None:
489 admin_creds = self.get_admin_creds()
490 samdb = self.get_samdb()
491 dns_hostname = samdb.host_dns_name()
492 type(self)._lsarpc_connection = lsarpc_connect(dns_hostname,
493 self.get_lp(),
494 admin_creds,
495 ip=self.dc_host)
497 return self._lsarpc_connection
499 def get_server_dn(self, samdb):
500 server = samdb.get_serverName()
502 res = samdb.search(base=server,
503 scope=ldb.SCOPE_BASE,
504 attrs=['serverReference'])
505 dn = ldb.Dn(samdb, res[0]['serverReference'][0].decode('utf8'))
507 return dn
509 def get_mock_rodc_ctx(self, preserve=True):
510 if preserve:
511 rodc_ctx = self._rodc_ctx
512 else:
513 rodc_ctx = self._test_rodc_ctx
515 if rodc_ctx is None:
516 admin_creds = self.get_admin_creds()
517 lp = self.get_lp()
519 rodc_name = self.get_new_username()
520 site_name = 'Default-First-Site-Name'
522 rodc_ctx = DCJoinContext(server=self.dc_host,
523 creds=admin_creds,
524 lp=lp,
525 site=site_name,
526 netbios_name=rodc_name,
527 targetdir=None,
528 domain=None)
529 self.create_rodc(rodc_ctx)
531 if preserve:
532 # Mark this rodc for deletion in tearDownClass() after all the
533 # tests in this class finish.
534 type(self)._rodc_ctx = rodc_ctx
535 else:
536 # Mark this rodc for deletion in tearDown() after the current
537 # test finishes.
538 self._test_rodc_ctx = rodc_ctx
540 return rodc_ctx
542 def get_domain_functional_level(self, ldb=None):
543 if self._functional_level is None:
544 if ldb is None:
545 ldb = self.get_samdb()
547 res = ldb.search(base='',
548 scope=SCOPE_BASE,
549 attrs=['domainFunctionality'])
550 try:
551 functional_level = int(res[0]['domainFunctionality'][0])
552 except KeyError:
553 functional_level = DS_DOMAIN_FUNCTION_2000
555 type(self)._functional_level = functional_level
557 return self._functional_level
559 def get_default_enctypes(self, creds):
560 self.assertIsNotNone(creds, 'expected client creds to be passed in')
562 functional_level = self.get_domain_functional_level()
564 default_enctypes = []
566 if functional_level >= DS_DOMAIN_FUNCTION_2008:
567 # AES is only supported at functional level 2008 or higher
568 default_enctypes.append(kcrypto.Enctype.AES256)
569 default_enctypes.append(kcrypto.Enctype.AES128)
571 if self.expect_nt_hash or creds.get_workstation():
572 default_enctypes.append(kcrypto.Enctype.RC4)
574 return default_enctypes
576 def create_group(self, samdb, name, ou=None, gtype=None):
577 if ou is None:
578 ou = samdb.get_wellknown_dn(samdb.get_default_basedn(),
579 DS_GUID_USERS_CONTAINER)
581 dn = f'CN={name},{ou}'
583 # Remove the group if it exists; this will happen if a previous test
584 # run failed.
585 delete_force(samdb, dn)
587 # Save the group name so it can be deleted in tearDownClass.
588 self.accounts.append(dn)
590 details = {
591 'dn': dn,
592 'objectClass': 'group'
594 if gtype is not None:
595 details['groupType'] = common.normalise_int32(gtype)
596 samdb.add(details)
598 return dn
600 def get_dn_from_attribute(self, attribute):
601 return self.get_from_attribute(attribute).dn
603 def get_dn_from_class(self, attribute):
604 return self.get_from_class(attribute).dn
606 def get_schema_id_guid_from_attribute(self, attribute):
607 guid = self.get_from_attribute(attribute).get('schemaIDGUID', idx=0)
608 return misc.GUID(guid)
610 def get_from_attribute(self, attribute):
611 return self.get_from_schema(attribute, 'attributeSchema')
613 def get_from_class(self, attribute):
614 return self.get_from_schema(attribute, 'classSchema')
616 def get_from_schema(self, name, object_class):
617 samdb = self.get_samdb()
618 schema_dn = samdb.get_schema_basedn()
620 res = samdb.search(base=schema_dn,
621 scope=ldb.SCOPE_ONELEVEL,
622 attrs=['schemaIDGUID'],
623 expression=(f'(&(objectClass={object_class})'
624 f'(lDAPDisplayName={name}))'))
625 self.assertEqual(1, len(res),
626 f'could not locate {name} in {object_class}')
628 return res[0]
630 def create_authn_silo(self, *,
631 members=None,
632 user_policy=None,
633 computer_policy=None,
634 service_policy=None,
635 enforced=None):
636 samdb = self.get_samdb()
638 silo_id = self.get_new_username()
640 authn_silo_dn = self.get_authn_silos_dn()
641 authn_silo_dn.add_child(f'CN={silo_id}')
643 details = {
644 'dn': authn_silo_dn,
645 'objectClass': 'msDS-AuthNPolicySilo',
648 if enforced is True:
649 enforced = 'TRUE'
650 elif enforced is False:
651 enforced = 'FALSE'
653 if members is not None:
654 details['msDS-AuthNPolicySiloMembers'] = members
655 if user_policy is not None:
656 details['msDS-UserAuthNPolicy'] = str(user_policy.dn)
657 if computer_policy is not None:
658 details['msDS-ComputerAuthNPolicy'] = str(computer_policy.dn)
659 if service_policy is not None:
660 details['msDS-ServiceAuthNPolicy'] = str(service_policy.dn)
661 if enforced is not None:
662 details['msDS-AuthNPolicySiloEnforced'] = enforced
664 # Save the silo DN so it can be deleted in tearDownClass().
665 self.accounts.append(str(authn_silo_dn))
667 # Remove the silo if it exists; this will happen if a previous test run
668 # failed.
669 delete_force(samdb, authn_silo_dn)
671 samdb.add(details)
673 return AuthenticationSilo.get(samdb, dn=authn_silo_dn)
675 def create_authn_silo_claim_id(self):
676 claim_id = 'ad://ext/AuthenticationSilo'
678 for_classes = [
679 'msDS-GroupManagedServiceAccount',
680 'user',
681 'msDS-ManagedServiceAccount',
682 'computer',
685 self.create_claim(claim_id,
686 enabled=True,
687 single_valued=True,
688 value_space_restricted=False,
689 source_type='Constructed',
690 for_classes=for_classes,
691 value_type=claims.CLAIM_TYPE_STRING,
692 # It's OK if the claim type already exists.
693 force=False)
695 return claim_id
697 def create_authn_policy(self, *,
698 use_cache=True,
699 **kwargs):
701 if use_cache:
702 cache_key = self.freeze(kwargs)
704 authn_policy = self.policy_cache.get(cache_key)
705 if authn_policy is not None:
706 return authn_policy
708 authn_policy = self.create_authn_policy_opts(**kwargs)
709 if use_cache:
710 self.policy_cache[cache_key] = authn_policy
712 return authn_policy
714 def create_authn_policy_opts(self, *,
715 enforced=None,
716 strong_ntlm_policy=None,
717 user_allowed_from=None,
718 user_allowed_ntlm=None,
719 user_allowed_to=None,
720 user_tgt_lifetime=None,
721 computer_allowed_to=None,
722 computer_tgt_lifetime=None,
723 service_allowed_from=None,
724 service_allowed_ntlm=None,
725 service_allowed_to=None,
726 service_tgt_lifetime=None):
727 samdb = self.get_samdb()
729 policy_id = self.get_new_username()
731 policy_dn = self.get_authn_policies_dn()
732 policy_dn.add_child(f'CN={policy_id}')
734 details = {
735 'dn': policy_dn,
736 'objectClass': 'msDS-AuthNPolicy',
739 _domain_sid = None
741 def sd_from_sddl(sddl):
742 nonlocal _domain_sid
743 if _domain_sid is None:
744 _domain_sid = security.dom_sid(samdb.get_domain_sid())
746 return ndr_pack(security.descriptor.from_sddl(sddl, _domain_sid))
748 if enforced is True:
749 enforced = 'TRUE'
750 elif enforced is False:
751 enforced = 'FALSE'
753 if user_allowed_ntlm is True:
754 user_allowed_ntlm = 'TRUE'
755 elif user_allowed_ntlm is False:
756 user_allowed_ntlm = 'FALSE'
758 if service_allowed_ntlm is True:
759 service_allowed_ntlm = 'TRUE'
760 elif service_allowed_ntlm is False:
761 service_allowed_ntlm = 'FALSE'
763 if enforced is not None:
764 details['msDS-AuthNPolicyEnforced'] = enforced
765 if strong_ntlm_policy is not None:
766 details['msDS-StrongNTLMPolicy'] = strong_ntlm_policy
768 if user_allowed_from is not None:
769 details['msDS-UserAllowedToAuthenticateFrom'] = sd_from_sddl(
770 user_allowed_from)
771 if user_allowed_ntlm is not None:
772 details['msDS-UserAllowedNTLMNetworkAuthentication'] = (
773 user_allowed_ntlm)
774 if user_allowed_to is not None:
775 details['msDS-UserAllowedToAuthenticateTo'] = sd_from_sddl(
776 user_allowed_to)
777 if user_tgt_lifetime is not None:
778 if isinstance(user_tgt_lifetime, numbers.Number):
779 user_tgt_lifetime = str(int(user_tgt_lifetime * 10_000_000))
780 details['msDS-UserTGTLifetime'] = user_tgt_lifetime
782 if computer_allowed_to is not None:
783 details['msDS-ComputerAllowedToAuthenticateTo'] = sd_from_sddl(
784 computer_allowed_to)
785 if computer_tgt_lifetime is not None:
786 if isinstance(computer_tgt_lifetime, numbers.Number):
787 computer_tgt_lifetime = str(
788 int(computer_tgt_lifetime * 10_000_000))
789 details['msDS-ComputerTGTLifetime'] = computer_tgt_lifetime
791 if service_allowed_from is not None:
792 details['msDS-ServiceAllowedToAuthenticateFrom'] = sd_from_sddl(
793 service_allowed_from)
794 if service_allowed_ntlm is not None:
795 details['msDS-ServiceAllowedNTLMNetworkAuthentication'] = (
796 service_allowed_ntlm)
797 if service_allowed_to is not None:
798 details['msDS-ServiceAllowedToAuthenticateTo'] = sd_from_sddl(
799 service_allowed_to)
800 if service_tgt_lifetime is not None:
801 if isinstance(service_tgt_lifetime, numbers.Number):
802 service_tgt_lifetime = str(
803 int(service_tgt_lifetime * 10_000_000))
804 details['msDS-ServiceTGTLifetime'] = service_tgt_lifetime
806 # Save the policy DN so it can be deleted in tearDownClass().
807 self.accounts.append(str(policy_dn))
809 # Remove the policy if it exists; this will happen if a previous test
810 # run failed.
811 delete_force(samdb, policy_dn)
813 samdb.add(details)
815 return AuthenticationPolicy.get(samdb, dn=policy_dn)
817 def create_claim(self,
818 claim_id,
819 enabled=None,
820 attribute=None,
821 single_valued=None,
822 value_space_restricted=None,
823 source=None,
824 source_type=None,
825 for_classes=None,
826 value_type=None,
827 force=True):
828 samdb = self.get_samdb()
830 claim_dn = self.get_claim_types_dn()
831 claim_dn.add_child(f'CN={claim_id}')
833 details = {
834 'dn': claim_dn,
835 'objectClass': 'msDS-ClaimType',
838 if enabled is True:
839 enabled = 'TRUE'
840 elif enabled is False:
841 enabled = 'FALSE'
843 if attribute is not None:
844 attribute = str(self.get_dn_from_attribute(attribute))
846 if single_valued is True:
847 single_valued = 'TRUE'
848 elif single_valued is False:
849 single_valued = 'FALSE'
851 if value_space_restricted is True:
852 value_space_restricted = 'TRUE'
853 elif value_space_restricted is False:
854 value_space_restricted = 'FALSE'
856 if for_classes is not None:
857 for_classes = [str(self.get_dn_from_class(name))
858 for name in for_classes]
860 if isinstance(value_type, int):
861 value_type = str(value_type)
863 if enabled is not None:
864 details['Enabled'] = enabled
865 if attribute is not None:
866 details['msDS-ClaimAttributeSource'] = attribute
867 if single_valued is not None:
868 details['msDS-ClaimIsSingleValued'] = single_valued
869 if value_space_restricted is not None:
870 details['msDS-ClaimIsValueSpaceRestricted'] = (
871 value_space_restricted)
872 if source is not None:
873 details['msDS-ClaimSource'] = source
874 if source_type is not None:
875 details['msDS-ClaimSourceType'] = source_type
876 if for_classes is not None:
877 details['msDS-ClaimTypeAppliesToClass'] = for_classes
878 if value_type is not None:
879 details['msDS-ClaimValueType'] = value_type
881 if force:
882 # Remove the claim if it exists; this will happen if a previous
883 # test run failed
884 delete_force(samdb, claim_dn)
886 try:
887 samdb.add(details)
888 except ldb.LdbError as err:
889 num, estr = err.args
890 if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
891 raise
892 self.assertFalse(force, 'should not fail with force=True')
893 else:
894 # Save the claim DN so it can be deleted in tearDownClass()
895 self.accounts.append(str(claim_dn))
897 def create_trust(self, trust_info,
898 trust_enc_types=None,
899 trust_incoming_password=None,
900 trust_outgoing_password=None,
901 expect_error=None,
902 preserve=True):
903 """Create an trust account for testing.
904 The handle of the created trust is added to cls.trusts,
905 which is used by tearDownClass to clean up the created trusts.
906 With preserve=False the handle is added to self.test_trusts,
907 which is used by tearDown to clean up the created trusts.
910 if trust_incoming_password is None:
911 trust_incoming_password = generate_random_password(120, 120)
912 trust_incoming_secret = list(trust_incoming_password.encode('utf-16-le'))
913 if trust_outgoing_password is None:
914 trust_outgoing_password = generate_random_password(120, 120)
915 trust_outgoing_secret = list(trust_outgoing_password.encode('utf-16-le'))
917 def generate_AuthInOutBlob(secret, update_time):
918 if secret is None:
919 blob = drsblobs.trustAuthInOutBlob()
920 blob.count = 0
922 return blob
924 clear = drsblobs.AuthInfoClear()
925 clear.size = len(secret)
926 clear.password = secret
928 info = drsblobs.AuthenticationInformation()
929 info.LastUpdateTime = unix2nttime(update_time)
930 info.AuthType = lsa.TRUST_AUTH_TYPE_CLEAR
931 info.AuthInfo = clear
933 array = drsblobs.AuthenticationInformationArray()
934 array.count = 1
935 array.array = [info]
937 blob = drsblobs.trustAuthInOutBlob()
938 blob.count = 1
939 blob.current = array
941 return blob
943 update_time = current_unix_time()
944 trust_incoming_blob = generate_AuthInOutBlob(trust_incoming_secret,
945 update_time)
946 trust_outgoing_blob = generate_AuthInOutBlob(trust_outgoing_secret,
947 update_time)
949 lsa_conn, lsa_policy, lsa_version, lsa_revision_info1, local_info = \
950 self.get_lsarpc_connection()
952 try:
953 tdo_handle = CreateTrustedDomainFallback(lsa_conn,
954 lsa_policy,
955 trust_info,
956 lsa.LSA_TRUSTED_DOMAIN_ALL_ACCESS |
957 security.SEC_STD_DELETE,
958 lsa_version,
959 lsa_revision_info1,
960 trust_incoming_blob,
961 trust_outgoing_blob)
962 except NTSTATUSError as err:
963 status, _ = err.args
964 self.assertIsNotNone(expect_error,
965 f'unexpectedly failed with {status:08X}')
966 self.assertEqual(expect_error, status, 'got wrong status code')
967 return (None, None, None, None)
968 self.assertIsNone(expect_error, 'expected error')
969 if preserve:
970 # Mark this trust for deletion in tearDownClass() after all the
971 # tests in this class finish.
972 self.trusts.append(tdo_handle)
973 else:
974 # Mark this trust for deletion in tearDown() after the current
975 # test finishes.
976 self.test_trusts.append(tdo_handle)
977 if trust_enc_types:
978 lsa_conn.SetInformationTrustedDomain(tdo_handle,
979 lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES,
980 trust_enc_types)
982 samdb = self.get_samdb()
984 incoming_account_name = trust_info.netbios_name.string
985 incoming_account_name += '$'
986 incoming_nbt_domain = local_info.name.string
987 incoming_dns_domain = local_info.dns_domain.string
989 outgoing_account_name = local_info.name.string
990 outgoing_account_name += '$'
991 outgoing_nbt_domain = trust_info.netbios_name.string
992 outgoing_dns_domain = trust_info.domain_name.string
994 tdo_search_filter = "(&(objectClass=trustedDomain)(name=%s))" % (
995 outgoing_dns_domain)
996 tdo_res = samdb.search(scope=ldb.SCOPE_SUBTREE,
997 expression=tdo_search_filter,
998 attrs=['msDS-TrustForestTrustInfo'])
999 self.assertEqual(len(tdo_res), 1)
1000 tdo_dn = tdo_res[0].dn
1002 acct_search_filter = "(&(objectClass=user)(sAMAccountName=%s))" % (
1003 incoming_account_name)
1004 acct_res = samdb.search(scope=ldb.SCOPE_SUBTREE,
1005 expression=acct_search_filter,
1006 attrs=['msDS-KeyVersionNumber',
1007 'objectSid',
1008 'objectGUID'])
1009 self.assertEqual(len(acct_res), 1)
1010 acct_dn = acct_res[0].dn
1011 acct_kvno = int(acct_res[0]['msDS-KeyVersionNumber'][0])
1012 acct_sid = acct_res[0].get('objectSid', idx=0)
1013 acct_sid = samdb.schema_format_value('objectSID', acct_sid)
1014 acct_sid = acct_sid.decode('utf-8')
1015 acct_guid = acct_res[0].get('objectGUID', idx=0)
1016 acct_guid = samdb.schema_format_value('objectGUID', acct_guid)
1017 acct_guid = acct_guid.decode('utf-8')
1019 trust_incoming_salt = "%skrbtgt%s" % (
1020 incoming_dns_domain.upper(),
1021 outgoing_dns_domain.upper())
1022 trust_outgoing_salt = "%skrbtgt%s" % (
1023 outgoing_dns_domain.upper(),
1024 incoming_dns_domain.upper())
1025 trust_account_salt = "%skrbtgt%s" % (
1026 incoming_dns_domain.upper(),
1027 outgoing_nbt_domain.upper())
1029 if trust_info.trust_type != lsa.LSA_TRUST_TYPE_DOWNLEVEL:
1030 secure_channel_type = SEC_CHAN_DNS_DOMAIN
1031 else:
1032 secure_channel_type = SEC_CHAN_DOMAIN
1034 incoming_creds = KerberosCredentials()
1035 incoming_creds.guess(self.get_lp())
1036 incoming_creds.set_realm(incoming_dns_domain.upper())
1037 incoming_creds.set_domain(incoming_nbt_domain.upper())
1038 incoming_creds.set_forced_salt(trust_incoming_salt.encode('utf-8'))
1039 incoming_creds.set_password(trust_incoming_password)
1040 incoming_creds.set_username(incoming_account_name)
1041 incoming_creds.set_workstation('')
1042 incoming_creds.set_secure_channel_type(secure_channel_type)
1043 incoming_creds.set_dn(tdo_dn)
1044 incoming_creds.set_type(self.AccountType.TRUST)
1045 incoming_creds.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT)
1046 self.creds_set_enctypes(incoming_creds)
1048 outgoing_creds = KerberosCredentials()
1049 outgoing_creds.guess(self.get_lp())
1050 outgoing_creds.set_realm(outgoing_dns_domain.upper())
1051 outgoing_creds.set_domain(outgoing_nbt_domain.upper())
1052 outgoing_creds.set_forced_salt(trust_outgoing_salt.encode('utf-8'))
1053 outgoing_creds.set_password(trust_outgoing_password)
1054 outgoing_creds.set_username(outgoing_account_name)
1055 outgoing_creds.set_workstation('')
1056 outgoing_creds.set_secure_channel_type(secure_channel_type)
1057 outgoing_creds.set_dn(tdo_dn)
1058 outgoing_creds.set_type(self.AccountType.TRUST)
1059 outgoing_creds.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT)
1060 self.creds_set_enctypes(outgoing_creds)
1062 account_creds = KerberosCredentials()
1063 account_creds.guess(self.get_lp())
1064 account_creds.set_realm(incoming_dns_domain.upper())
1065 account_creds.set_domain(incoming_nbt_domain.upper())
1066 account_creds.set_forced_salt(trust_account_salt.encode('utf-8'))
1067 account_creds.set_password(trust_incoming_password)
1068 account_creds.set_username(incoming_account_name)
1069 account_creds.set_workstation('TEST-TRUST-DC')
1070 account_creds.set_secure_channel_type(secure_channel_type)
1071 account_creds.set_dn(acct_dn)
1072 account_creds.set_type(self.AccountType.TRUST)
1073 account_creds.set_user_account_control(UF_INTERDOMAIN_TRUST_ACCOUNT)
1074 account_creds.set_kvno(acct_kvno)
1075 account_creds.set_sid(str(acct_sid))
1076 account_creds.set_guid(acct_guid)
1077 if trust_enc_types is not None:
1078 self.creds_set_enctypes(account_creds,
1079 extra_bits=trust_enc_types.enc_types)
1080 else:
1081 self.creds_set_enctypes(account_creds)
1083 incoming_creds.set_trust_outgoing_creds(outgoing_creds)
1084 incoming_creds.set_trust_account_creds(account_creds)
1086 outgoing_creds.set_trust_incoming_creds(incoming_creds)
1087 outgoing_creds.set_trust_account_creds(account_creds)
1089 account_creds.set_trust_incoming_creds(incoming_creds)
1090 account_creds.set_trust_outgoing_creds(outgoing_creds)
1092 self.remember_creds_for_keytab_export(incoming_creds)
1093 self.remember_creds_for_keytab_export(outgoing_creds)
1094 self.remember_creds_for_keytab_export(account_creds)
1096 return (tdo_handle, incoming_creds, outgoing_creds, account_creds)
1098 def create_account(self, samdb, name, account_type=AccountType.USER,
1099 spn=None, upn=None, additional_details=None,
1100 ou=None, account_control=0, add_dollar=None,
1101 expired_password=False, force_nt4_hash=False,
1102 export_to_keytab=True,
1103 preserve=True):
1104 """Create an account for testing.
1105 The dn of the created account is added to self.accounts,
1106 which is used by tearDownClass to clean up the created accounts.
1108 if add_dollar is None and account_type is not self.AccountType.USER:
1109 add_dollar = True
1111 if ou is None:
1112 if account_type is self.AccountType.COMPUTER:
1113 guid = DS_GUID_COMPUTERS_CONTAINER
1114 elif account_type is self.AccountType.MANAGED_SERVICE or (
1115 account_type is self.AccountType.GROUP_MANAGED_SERVICE):
1116 guid = DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
1117 elif account_type is self.AccountType.SERVER:
1118 guid = DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
1119 else:
1120 guid = DS_GUID_USERS_CONTAINER
1122 ou = samdb.get_wellknown_dn(samdb.get_default_basedn(), guid)
1124 dn = "CN=%s,%s" % (name, ou)
1126 # remove the account if it exists, this will happen if a previous test
1127 # run failed
1128 delete_force(samdb, dn)
1129 account_name = name
1130 if add_dollar:
1131 account_name += '$'
1132 secure_schannel_type = SEC_CHAN_NULL
1133 if account_type is self.AccountType.USER:
1134 object_class = "user"
1135 account_control |= UF_NORMAL_ACCOUNT
1136 elif account_type is self.AccountType.MANAGED_SERVICE:
1137 object_class = "msDS-ManagedServiceAccount"
1138 account_control |= UF_WORKSTATION_TRUST_ACCOUNT
1139 secure_schannel_type = SEC_CHAN_WKSTA
1140 elif account_type is self.AccountType.GROUP_MANAGED_SERVICE:
1141 object_class = "msDS-GroupManagedServiceAccount"
1142 account_control |= UF_WORKSTATION_TRUST_ACCOUNT
1143 secure_schannel_type = SEC_CHAN_WKSTA
1144 else:
1145 object_class = "computer"
1146 if account_type is self.AccountType.COMPUTER:
1147 account_control |= UF_WORKSTATION_TRUST_ACCOUNT
1148 secure_schannel_type = SEC_CHAN_WKSTA
1149 elif account_type is self.AccountType.SERVER:
1150 account_control |= UF_SERVER_TRUST_ACCOUNT
1151 secure_schannel_type = SEC_CHAN_BDC
1152 else:
1153 self.fail()
1155 details = {
1156 "dn": dn,
1157 "objectClass": object_class,
1158 "sAMAccountName": account_name,
1159 "userAccountControl": str(account_control),
1162 if account_type is self.AccountType.GROUP_MANAGED_SERVICE:
1163 password = None
1164 else:
1165 password = generate_random_password(32, 32)
1166 utf16pw = ('"%s"' % password).encode('utf-16-le')
1168 details['unicodePwd'] = utf16pw
1170 if upn is not None:
1171 upn = upn.format(account=account_name)
1172 if spn is not None:
1173 if isinstance(spn, str):
1174 spn = spn.format(account=account_name)
1175 else:
1176 spn = tuple(s.format(account=account_name) for s in spn)
1177 details["servicePrincipalName"] = spn
1178 if upn is not None:
1179 details["userPrincipalName"] = upn
1180 if expired_password:
1181 details["pwdLastSet"] = "0"
1182 if additional_details is not None:
1183 details.update(additional_details)
1184 if preserve:
1185 # Mark this account for deletion in tearDownClass() after all the
1186 # tests in this class finish.
1187 self.accounts.append(dn)
1188 else:
1189 # Mark this account for deletion in tearDown() after the current
1190 # test finishes. Because the time complexity of deleting an account
1191 # in Samba scales with the number of accounts, it is faster to
1192 # delete accounts as soon as possible than to keep them around
1193 # until all the tests are finished.
1194 self.test_accounts.append(dn)
1195 samdb.add(details)
1197 expected_kvno = 1
1199 if force_nt4_hash:
1200 admin_creds = self.get_admin_creds()
1201 lp = self.get_lp()
1202 net_ctx = net.Net(admin_creds, lp, server=self.dc_host)
1203 domain = samdb.domain_netbios_name().upper()
1205 password = generate_random_password(32, 32)
1207 try:
1208 net_ctx.set_password(newpassword=password,
1209 account_name=account_name,
1210 domain_name=domain,
1211 force_samr_18=True)
1212 expected_kvno += 1
1213 except Exception as e:
1214 self.fail(e)
1216 creds = KerberosCredentials()
1217 creds.guess(self.get_lp())
1218 creds.set_realm(samdb.domain_dns_name().upper())
1219 creds.set_domain(samdb.domain_netbios_name().upper())
1220 if password is not None:
1221 creds.set_password(password)
1222 creds.set_username(account_name)
1223 if account_type is self.AccountType.USER:
1224 creds.set_workstation('')
1225 else:
1226 creds.set_workstation(name)
1227 creds.set_secure_channel_type(secure_schannel_type)
1228 creds.set_dn(ldb.Dn(samdb, dn))
1229 creds.set_upn(upn)
1230 creds.set_spn(spn)
1231 creds.set_type(account_type)
1232 creds.set_user_account_control(account_control)
1234 self.creds_set_enctypes(creds)
1236 res = samdb.search(base=dn,
1237 scope=ldb.SCOPE_BASE,
1238 attrs=['msDS-KeyVersionNumber',
1239 'objectSid',
1240 'objectGUID'])
1242 kvno = res[0].get('msDS-KeyVersionNumber', idx=0)
1243 if kvno is not None:
1244 self.assertEqual(int(kvno), expected_kvno)
1245 creds.set_kvno(expected_kvno)
1247 sid = res[0].get('objectSid', idx=0)
1248 sid = samdb.schema_format_value('objectSID', sid)
1249 sid = sid.decode('utf-8')
1250 creds.set_sid(sid)
1251 guid = res[0].get('objectGUID', idx=0)
1252 guid = samdb.schema_format_value('objectGUID', guid)
1253 guid = guid.decode('utf-8')
1254 creds.set_guid(guid)
1256 if export_to_keytab:
1257 self.remember_creds_for_keytab_export(creds)
1259 return (creds, dn)
1261 def get_security_descriptor(self, dn):
1262 samdb = self.get_samdb()
1264 sid = self.get_objectSid(samdb, dn)
1266 owner_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
1268 ace = security.ace()
1269 ace.access_mask = security.SEC_ADS_CONTROL_ACCESS
1271 ace.trustee = security.dom_sid(sid)
1273 dacl = security.acl()
1274 dacl.revision = security.SECURITY_ACL_REVISION_ADS
1275 dacl.aces = [ace]
1276 dacl.num_aces = 1
1278 security_desc = security.descriptor()
1279 security_desc.type |= security.SEC_DESC_DACL_PRESENT
1280 security_desc.owner_sid = owner_sid
1281 security_desc.dacl = dacl
1283 return ndr_pack(security_desc)
1285 def create_rodc(self, ctx):
1286 ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
1287 ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
1288 ctx.krbtgt_dn = f'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'
1290 ctx.never_reveal_sid = [f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>',
1291 f'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
1292 f'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
1293 f'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
1294 f'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>']
1295 ctx.reveal_sid = f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'
1297 mysid = ctx.get_mysid()
1298 admin_dn = f'<SID={mysid}>'
1299 ctx.managedby = admin_dn
1301 ctx.userAccountControl = (UF_WORKSTATION_TRUST_ACCOUNT |
1302 UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1303 UF_PARTIAL_SECRETS_ACCOUNT)
1305 ctx.connection_dn = f'CN=RODC Connection (FRS),{ctx.ntds_dn}'
1306 ctx.secure_channel_type = misc.SEC_CHAN_RODC
1307 ctx.RODC = True
1308 ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
1309 drsuapi.DRSUAPI_DRS_PER_SYNC |
1310 drsuapi.DRSUAPI_DRS_GET_ANC |
1311 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
1312 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
1313 ctx.domain_replica_flags = ctx.replica_flags | drsuapi.DRSUAPI_DRS_CRITICAL_ONLY
1315 ctx.build_nc_lists()
1317 ctx.cleanup_old_join()
1319 try:
1320 ctx.join_add_objects()
1321 except Exception:
1322 # cleanup the failed join (checking we still have a live LDB
1323 # connection to the remote DC first)
1324 ctx.refresh_ldb_connection()
1325 ctx.cleanup_old_join()
1326 raise
1328 def replicate_account_to_rodc(self, dn):
1329 samdb = self.get_samdb()
1330 rodc_samdb = self.get_rodc_samdb()
1332 repl_val = f'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'
1334 msg = ldb.Message()
1335 msg.dn = ldb.Dn(rodc_samdb, '')
1336 msg['replicateSingleObject'] = ldb.MessageElement(
1337 repl_val,
1338 ldb.FLAG_MOD_REPLACE,
1339 'replicateSingleObject')
1341 try:
1342 # Try replication using the replicateSingleObject rootDSE
1343 # operation.
1344 rodc_samdb.modify(msg)
1345 except ldb.LdbError as err:
1346 enum, estr = err.args
1347 self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
1348 self.assertIn('rootdse_modify: unknown attribute to change!',
1349 estr)
1351 # If that method wasn't supported, we may be in the rodc:local test
1352 # environment, where we can try replicating to the local database.
1354 lp = self.get_lp()
1356 rodc_creds = Credentials()
1357 rodc_creds.guess(lp)
1358 rodc_creds.set_machine_account(lp)
1360 local_samdb = SamDB(url=None, session_info=system_session(),
1361 credentials=rodc_creds, lp=lp)
1363 destination_dsa_guid = misc.GUID(local_samdb.get_ntds_GUID())
1365 repl = drs_Replicate(f'ncacn_ip_tcp:{self.dc_host}[seal]',
1366 lp, rodc_creds,
1367 local_samdb, destination_dsa_guid)
1369 source_dsa_invocation_id = misc.GUID(samdb.invocation_id)
1371 repl.replicate(dn,
1372 source_dsa_invocation_id,
1373 destination_dsa_guid,
1374 exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
1375 rodc=True)
1377 def reveal_account_to_mock_rodc(self, dn):
1378 samdb = self.get_samdb()
1379 rodc_ctx = self.get_mock_rodc_ctx()
1381 self.get_secrets(
1383 destination_dsa_guid=rodc_ctx.ntds_guid,
1384 source_dsa_invocation_id=misc.GUID(samdb.invocation_id))
1386 def check_revealed(self, dn, rodc_dn, revealed=True):
1387 samdb = self.get_samdb()
1389 res = samdb.search(base=rodc_dn,
1390 scope=ldb.SCOPE_BASE,
1391 attrs=['msDS-RevealedUsers'])
1393 revealed_users = res[0].get('msDS-RevealedUsers')
1394 if revealed_users is None:
1395 self.assertFalse(revealed)
1396 return
1398 revealed_dns = set(str(dsdb_Dn(samdb, str(user),
1399 syntax_oid=DSDB_SYNTAX_BINARY_DN).dn)
1400 for user in revealed_users)
1402 if revealed:
1403 self.assertIn(str(dn), revealed_dns)
1404 else:
1405 self.assertNotIn(str(dn), revealed_dns)
1407 def get_secrets(self, dn,
1408 destination_dsa_guid,
1409 source_dsa_invocation_id):
1410 bind, handle, _ = self.get_drsuapi_connection()
1412 req = drsuapi.DsGetNCChangesRequest8()
1414 req.destination_dsa_guid = destination_dsa_guid
1415 req.source_dsa_invocation_id = source_dsa_invocation_id
1417 naming_context = drsuapi.DsReplicaObjectIdentifier()
1418 naming_context.dn = dn
1420 req.naming_context = naming_context
1422 hwm = drsuapi.DsReplicaHighWaterMark()
1423 hwm.tmp_highest_usn = 0
1424 hwm.reserved_usn = 0
1425 hwm.highest_usn = 0
1427 req.highwatermark = hwm
1428 req.uptodateness_vector = None
1430 req.replica_flags = 0
1432 req.max_object_count = 1
1433 req.max_ndr_size = 402116
1434 req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET
1436 attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
1437 drsuapi.DRSUAPI_ATTID_unicodePwd,
1438 drsuapi.DRSUAPI_ATTID_ntPwdHistory]
1440 partial_attribute_set = drsuapi.DsPartialAttributeSet()
1441 partial_attribute_set.version = 1
1442 partial_attribute_set.attids = attids
1443 partial_attribute_set.num_attids = len(attids)
1445 req.partial_attribute_set = partial_attribute_set
1447 req.partial_attribute_set_ex = None
1448 req.mapping_ctr.num_mappings = 0
1449 req.mapping_ctr.mappings = None
1451 _, ctr = bind.DsGetNCChanges(handle, 8, req)
1453 self.assertEqual(1, ctr.object_count)
1455 identifier = ctr.first_object.object.identifier
1456 attributes = ctr.first_object.object.attribute_ctr.attributes
1458 self.assertEqual(dn, identifier.dn)
1460 return bind, identifier, attributes
1462 def unpack_supplemental_credentials(
1463 self, blob: bytes
1464 ) -> Dict[kcrypto.Enctype, str]:
1465 spl = ndr_unpack(drsblobs.supplementalCredentialsBlob, blob)
1467 keys: Dict[kcrypto.Enctype, str] = {}
1469 for pkg in spl.sub.packages:
1470 if pkg.name == 'Primary:Kerberos-Newer-Keys':
1471 krb5_new_keys_raw = binascii.a2b_hex(pkg.data)
1472 krb5_new_keys = ndr_unpack(
1473 drsblobs.package_PrimaryKerberosBlob, krb5_new_keys_raw
1475 for key in krb5_new_keys.ctr.keys:
1476 keytype = key.keytype
1477 if keytype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
1478 keys[keytype] = key.value.hex()
1480 return keys
1482 def get_keys(self, creds, expected_etypes=None):
1483 admin_creds = self.get_admin_creds()
1484 samdb = self.get_samdb()
1486 dn = creds.get_dn()
1488 bind, identifier, attributes = self.get_secrets(
1489 str(dn),
1490 destination_dsa_guid=misc.GUID(samdb.get_ntds_GUID()),
1491 source_dsa_invocation_id=misc.GUID())
1493 rid = identifier.sid.split()[1]
1495 net_ctx = net.Net(admin_creds)
1497 keys = {}
1499 for attr in attributes:
1500 if not attr.value_ctr.num_values:
1501 continue
1503 if attr.attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
1504 net_ctx.replicate_decrypt(bind, attr, rid)
1506 keys.update(
1507 self.unpack_supplemental_credentials(attr.value_ctr.values[0].blob)
1509 elif attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
1510 net_ctx.replicate_decrypt(bind, attr, rid)
1512 pwd = attr.value_ctr.values[0].blob
1513 keys[kcrypto.Enctype.RC4] = pwd.hex()
1515 if expected_etypes is None:
1516 expected_etypes = self.get_default_enctypes(creds)
1518 self.assertCountEqual(expected_etypes, keys)
1520 return keys
1522 def creds_set_keys(self, creds, keys):
1523 if keys is not None:
1524 for enctype, key in keys.items():
1525 creds.set_forced_key(enctype, key)
1527 def creds_set_enctypes(self, creds,
1528 extra_bits=None,
1529 remove_bits=None):
1530 samdb = self.get_samdb()
1532 res = samdb.search(creds.get_dn(),
1533 scope=ldb.SCOPE_BASE,
1534 attrs=['msDS-SupportedEncryptionTypes'])
1535 supported_enctypes = res[0].get('msDS-SupportedEncryptionTypes', idx=0)
1537 if supported_enctypes is None:
1538 supported_enctypes = self.default_etypes
1539 if supported_enctypes is None:
1540 lp = self.get_lp()
1541 supported_enctypes = lp.get('kdc default domain supported enctypes')
1542 if supported_enctypes == 0:
1543 supported_enctypes = rc4_bit | aes256_sk_bit
1544 supported_enctypes = int(supported_enctypes)
1546 if extra_bits is not None:
1547 # We need to add in implicit or implied encryption types.
1548 supported_enctypes |= extra_bits
1549 if remove_bits is not None:
1550 # We also need to remove certain bits, such as the non-encryption
1551 # type bit aes256-sk.
1552 supported_enctypes &= ~remove_bits
1554 creds.set_as_supported_enctypes(supported_enctypes)
1555 creds.set_tgs_supported_enctypes(supported_enctypes)
1556 creds.set_ap_supported_enctypes(supported_enctypes)
1558 def creds_set_default_enctypes(self, creds,
1559 fast_support=False,
1560 claims_support=False,
1561 compound_id_support=False):
1562 default_enctypes = self.get_default_enctypes(creds)
1563 supported_enctypes = KerberosCredentials.etypes_to_bits(
1564 default_enctypes)
1566 if fast_support:
1567 supported_enctypes |= security.KERB_ENCTYPE_FAST_SUPPORTED
1568 if claims_support:
1569 supported_enctypes |= security.KERB_ENCTYPE_CLAIMS_SUPPORTED
1570 if compound_id_support:
1571 supported_enctypes |= (
1572 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED)
1574 creds.set_as_supported_enctypes(supported_enctypes)
1575 creds.set_tgs_supported_enctypes(supported_enctypes)
1576 creds.set_ap_supported_enctypes(supported_enctypes)
1578 def add_to_group(self, account_dn, group_dn, group_attr, expect_attr=True,
1579 new_group_type=None):
1580 samdb = self.get_samdb()
1582 try:
1583 res = samdb.search(base=group_dn,
1584 scope=ldb.SCOPE_BASE,
1585 attrs=[group_attr])
1586 except ldb.LdbError as err:
1587 num, _ = err.args
1588 if num != ldb.ERR_NO_SUCH_OBJECT:
1589 raise
1591 self.fail(err)
1593 orig_msg = res[0]
1594 members = orig_msg.get(group_attr)
1595 if expect_attr:
1596 self.assertIsNotNone(members)
1597 elif members is None:
1598 members = ()
1599 else:
1600 members = map(lambda s: s.decode('utf-8'), members)
1602 # Use a set so we can handle the same group being added twice.
1603 members = set(members)
1605 self.assertNotIsInstance(account_dn, ldb.Dn,
1606 'ldb.MessageElement does not support ldb.Dn')
1607 self.assertNotIsInstance(account_dn, bytes)
1609 if isinstance(account_dn, str):
1610 members.add(account_dn)
1611 else:
1612 members.update(account_dn)
1614 msg = ldb.Message()
1615 msg.dn = group_dn
1616 if new_group_type is not None:
1617 msg['0'] = ldb.MessageElement(
1618 common.normalise_int32(new_group_type),
1619 ldb.FLAG_MOD_REPLACE,
1620 'groupType')
1621 msg['1'] = ldb.MessageElement(list(members),
1622 ldb.FLAG_MOD_REPLACE,
1623 group_attr)
1624 cleanup = samdb.msg_diff(msg, orig_msg)
1625 self.ldb_cleanups.append(cleanup)
1626 samdb.modify(msg)
1628 return cleanup
1630 def remove_from_group(self, account_dn, group_dn):
1631 samdb = self.get_samdb()
1633 res = samdb.search(base=group_dn,
1634 scope=ldb.SCOPE_BASE,
1635 attrs=['member'])
1636 orig_msg = res[0]
1637 self.assertIn('member', orig_msg)
1638 members = list(orig_msg['member'])
1640 account_dn = str(account_dn).encode('utf-8')
1641 self.assertIn(account_dn, members)
1642 members.remove(account_dn)
1644 msg = ldb.Message()
1645 msg.dn = group_dn
1646 msg['member'] = ldb.MessageElement(members,
1647 ldb.FLAG_MOD_REPLACE,
1648 'member')
1650 cleanup = samdb.msg_diff(msg, orig_msg)
1651 self.ldb_cleanups.append(cleanup)
1652 samdb.modify(msg)
1654 return cleanup
1656 # Create a new group and return a Principal object representing it.
1657 def create_group_principal(self, samdb, group_type):
1658 name = self.get_new_username()
1659 dn = self.create_group(samdb, name, gtype=group_type.value)
1660 sid = self.get_objectSid(samdb, dn)
1662 return Principal(ldb.Dn(samdb, dn), sid)
1664 def set_group_type(self, samdb, dn, gtype):
1665 group_type = common.normalise_int32(gtype.value)
1666 msg = ldb.Message(dn)
1667 msg['groupType'] = ldb.MessageElement(group_type,
1668 ldb.FLAG_MOD_REPLACE,
1669 'groupType')
1670 samdb.modify(msg)
1672 def set_primary_group(self, samdb, dn, primary_sid,
1673 expected_error=None,
1674 expected_werror=None):
1675 # Get the RID to be set as our primary group.
1676 primary_rid = primary_sid.rsplit('-', 1)[1]
1678 # Find out our current primary group.
1679 res = samdb.search(dn,
1680 scope=ldb.SCOPE_BASE,
1681 attrs=['primaryGroupId'])
1682 orig_msg = res[0]
1684 # Prepare to modify the attribute.
1685 msg = ldb.Message(dn)
1686 msg['primaryGroupId'] = ldb.MessageElement(str(primary_rid),
1687 ldb.FLAG_MOD_REPLACE,
1688 'primaryGroupId')
1690 # We'll remove the primaryGroupId attribute after the test, to avoid
1691 # problems in the teardown if the user outlives the group.
1692 remove_msg = samdb.msg_diff(msg, orig_msg)
1693 self.addCleanup(samdb.modify, remove_msg)
1695 # Set primaryGroupId.
1696 if expected_error is None:
1697 self.assertIsNone(expected_werror)
1699 samdb.modify(msg)
1700 else:
1701 self.assertIsNotNone(expected_werror)
1703 with self.assertRaises(
1704 ldb.LdbError,
1705 msg='expected setting primary group to fail'
1706 ) as err:
1707 samdb.modify(msg)
1709 error, estr = err.exception.args
1710 self.assertEqual(expected_error, error)
1711 self.assertIn(f'{expected_werror:08X}', estr)
1713 # Create an arrangement of groups based on a configuration specified in a
1714 # test case. 'user_principal' is a principal representing the user account;
1715 # 'trust_principal', a principal representing the account of a user from
1716 # another domain.
1717 def setup_groups(self,
1718 samdb,
1719 preexisting_groups,
1720 group_setup,
1721 primary_groups):
1722 groups = dict(preexisting_groups)
1724 primary_group_types = {}
1726 # Create each group and add it to the group mapping.
1727 if group_setup is not None:
1728 for group_id, (group_type, _) in group_setup.items():
1729 self.assertNotIn(group_id, preexisting_groups,
1730 "don't specify placeholders")
1731 self.assertNotIn(group_id, groups,
1732 'group ID specified more than once')
1734 if primary_groups is not None and (
1735 group_id in primary_groups.values()):
1736 # Windows disallows setting a domain-local group as a
1737 # primary group, unless we create it as Universal first and
1738 # change it back to Domain-Local later.
1739 primary_group_types[group_id] = group_type
1740 group_type = GroupType.UNIVERSAL
1742 groups[group_id] = self.create_group_principal(samdb,
1743 group_type)
1745 if group_setup is not None:
1746 # Map a group ID to that group's DN, and generate an
1747 # understandable error message if the mapping fails.
1748 def group_id_to_dn(group_id):
1749 try:
1750 group = groups[group_id]
1751 except KeyError:
1752 self.fail(f"included group member '{group_id}', but it is "
1753 f"not specified in {groups.keys()}")
1754 else:
1755 if group.dn is not None:
1756 return str(group.dn)
1758 return f'<SID={group.sid}>'
1760 # Populate each group's members.
1761 for group_id, (_, members) in group_setup.items():
1762 # Get the group's DN and the mapped DNs of its members.
1763 dn = groups[group_id].dn
1764 principal_members = map(group_id_to_dn, members)
1766 # Add the members to the group.
1767 self.add_to_group(principal_members, dn, 'member',
1768 expect_attr=False)
1770 # Set primary groups.
1771 if primary_groups is not None:
1772 for user, primary_group in primary_groups.items():
1773 primary_sid = groups[primary_group].sid
1774 self.set_primary_group(samdb, user.dn, primary_sid)
1776 # Change the primary groups to their actual group types.
1777 for primary_group, primary_group_type in primary_group_types.items():
1778 self.set_group_type(samdb,
1779 groups[primary_group].dn,
1780 primary_group_type)
1782 # Return the mapping from group IDs to principals.
1783 return groups
1785 def map_to_sid(self, val, mapping, domain_sid):
1786 if isinstance(val, int):
1787 # If it's an integer, we assume it's a RID, and prefix the domain
1788 # SID.
1789 self.assertIsNotNone(domain_sid)
1790 return f'{domain_sid}-{val}'
1792 if mapping is not None and val in mapping:
1793 # Or if we have a mapping for it, apply that.
1794 return mapping[val].sid
1796 # Otherwise leave it unmodified.
1797 return val
1799 def map_to_dn(self, val, mapping, domain_sid):
1800 sid = self.map_to_sid(val, mapping, domain_sid)
1801 return ldb.Dn(self.get_samdb(), f'<SID={sid}>')
1803 # Return SIDs from principal placeholders based on a supplied mapping.
1804 def map_sids(self, sids, mapping, domain_sid):
1805 if sids is None:
1806 return None
1808 mapped_sids = set()
1810 for entry in sids:
1811 if isinstance(entry, frozenset):
1812 mapped_sids.add(frozenset(self.map_sids(entry,
1813 mapping,
1814 domain_sid)))
1815 else:
1816 val, sid_type, attrs = entry
1817 sid = self.map_to_sid(val, mapping, domain_sid)
1819 # There's no point expecting the 'Claims Valid' SID to be
1820 # present if we don't support claims. Filter it out to give the
1821 # tests a chance of passing.
1822 if not self.kdc_claims_support and (
1823 sid == security.SID_CLAIMS_VALID):
1824 continue
1826 mapped_sids.add((sid, sid_type, attrs))
1828 return mapped_sids
1830 def issued_by_rodc(self, ticket):
1831 rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
1832 rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(
1833 rodc_krbtgt_creds)
1835 checksum_keys = {
1836 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
1839 return self.modified_ticket(
1840 ticket,
1841 new_ticket_key=rodc_krbtgt_key,
1842 checksum_keys=checksum_keys)
1844 def signed_by_rodc(self, ticket):
1845 rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
1846 rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(
1847 rodc_krbtgt_creds)
1849 checksum_keys = {
1850 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
1853 return self.modified_ticket(ticket,
1854 checksum_keys=checksum_keys)
1856 # Get a ticket with the SIDs in the PAC replaced with ones we specify. This
1857 # is useful for creating arbitrary tickets that can be used to perform a
1858 # TGS-REQ.
1859 def ticket_with_sids(self,
1860 ticket,
1861 new_sids,
1862 domain_sid,
1863 user_rid,
1864 set_user_flags=0,
1865 reset_user_flags=0,
1866 from_rodc=False):
1867 if from_rodc:
1868 krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
1869 else:
1870 krbtgt_creds = self.get_krbtgt_creds()
1871 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1873 checksum_keys = {
1874 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
1877 modify_pac_fn = partial(self.set_pac_sids,
1878 new_sids=new_sids,
1879 domain_sid=domain_sid,
1880 user_rid=user_rid,
1881 set_user_flags=set_user_flags,
1882 reset_user_flags=reset_user_flags)
1884 return self.modified_ticket(ticket,
1885 new_ticket_key=krbtgt_key,
1886 modify_pac_fn=modify_pac_fn,
1887 checksum_keys=checksum_keys)
1889 # Replace the SIDs in a PAC with 'new_sids'.
1890 def set_pac_sids(self,
1891 pac,
1893 new_sids,
1894 domain_sid=None,
1895 user_rid=None,
1896 set_user_flags=0,
1897 reset_user_flags=0):
1898 if domain_sid is None:
1899 domain_sid = self.get_samdb().get_domain_sid()
1901 base_sids = []
1902 extra_sids = []
1903 resource_sids = []
1905 resource_domain = None
1907 primary_gid = None
1909 # Filter our SIDs into three arrays depending on their ultimate
1910 # location in the PAC.
1911 for sid, sid_type, attrs in new_sids:
1912 if sid_type is self.SidType.BASE_SID:
1913 if isinstance(sid, int):
1914 domain, rid = domain_sid, sid
1915 else:
1916 domain, rid = sid.rsplit('-', 1)
1917 self.assertEqual(domain_sid, domain,
1918 f'base SID {sid} must be in our domain')
1920 base_sid = samr.RidWithAttribute()
1921 base_sid.rid = int(rid)
1922 base_sid.attributes = attrs
1924 base_sids.append(base_sid)
1925 elif sid_type is self.SidType.EXTRA_SID:
1926 extra_sid = netlogon.netr_SidAttr()
1927 extra_sid.sid = security.dom_sid(sid)
1928 extra_sid.attributes = attrs
1930 extra_sids.append(extra_sid)
1931 elif sid_type is self.SidType.RESOURCE_SID:
1932 if isinstance(sid, int):
1933 domain, rid = domain_sid, sid
1934 else:
1935 domain, rid = sid.rsplit('-', 1)
1936 if resource_domain is None:
1937 resource_domain = domain
1938 else:
1939 self.assertEqual(resource_domain, domain,
1940 'resource SIDs must share the same '
1941 'domain')
1943 resource_sid = samr.RidWithAttribute()
1944 resource_sid.rid = int(rid)
1945 resource_sid.attributes = attrs
1947 resource_sids.append(resource_sid)
1948 elif sid_type is self.SidType.PRIMARY_GID:
1949 self.assertIsNone(primary_gid,
1950 f'must not specify a second primary GID '
1951 f'{sid}')
1952 self.assertIsNone(attrs, 'cannot specify primary GID attrs')
1954 if isinstance(sid, int):
1955 domain, primary_gid = domain_sid, sid
1956 else:
1957 domain, primary_gid = sid.rsplit('-', 1)
1958 self.assertEqual(domain_sid, domain,
1959 f'primary GID {sid} must be in our domain')
1960 else:
1961 self.fail(f'invalid SID type {sid_type}')
1963 found_logon_info = False
1965 pac_buffers = pac.buffers
1966 for pac_buffer in pac_buffers:
1967 # Find the LOGON_INFO PAC buffer.
1968 if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
1969 logon_info = pac_buffer.info.info
1971 # Add Extra SIDs and set the EXTRA_SIDS flag as needed.
1972 logon_info.info3.sidcount = len(extra_sids)
1973 if extra_sids:
1974 logon_info.info3.sids = extra_sids
1975 logon_info.info3.base.user_flags |= (
1976 netlogon.NETLOGON_EXTRA_SIDS)
1977 else:
1978 logon_info.info3.sids = None
1979 logon_info.info3.base.user_flags &= ~(
1980 netlogon.NETLOGON_EXTRA_SIDS)
1982 # Add Base SIDs.
1983 logon_info.info3.base.groups.count = len(base_sids)
1984 if base_sids:
1985 logon_info.info3.base.groups.rids = base_sids
1986 else:
1987 logon_info.info3.base.groups.rids = None
1989 logon_info.info3.base.domain_sid = security.dom_sid(domain_sid)
1990 if user_rid is not None:
1991 logon_info.info3.base.rid = int(user_rid)
1993 if primary_gid is not None:
1994 logon_info.info3.base.primary_gid = int(primary_gid)
1996 # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed.
1997 logon_info.resource_groups.groups.count = len(resource_sids)
1998 if resource_sids:
1999 resource_domain = security.dom_sid(resource_domain)
2000 logon_info.resource_groups.domain_sid = resource_domain
2001 logon_info.resource_groups.groups.rids = resource_sids
2002 logon_info.info3.base.user_flags |= (
2003 netlogon.NETLOGON_RESOURCE_GROUPS)
2004 else:
2005 logon_info.resource_groups.domain_sid = None
2006 logon_info.resource_groups.groups.rids = None
2007 logon_info.info3.base.user_flags &= ~(
2008 netlogon.NETLOGON_RESOURCE_GROUPS)
2010 logon_info.info3.base.user_flags |= set_user_flags
2011 logon_info.info3.base.user_flags &= ~reset_user_flags
2013 found_logon_info = True
2015 # Also replace the user's SID in the UPN DNS buffer.
2016 elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
2017 upn_dns_info_ex = pac_buffer.info.ex
2019 if user_rid is not None:
2020 upn_dns_info_ex.objectsid = security.dom_sid(
2021 f'{domain_sid}-{user_rid}')
2023 # But don't replace the user's SID in the Requester SID buffer, or
2024 # we'll get a SID mismatch.
2026 self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer')
2028 pac.buffers = pac_buffers
2030 return pac
2032 # Replace the device SIDs in a PAC with 'new_sids'.
2033 def set_pac_device_sids(self,
2034 pac,
2036 new_sids,
2037 domain_sid=None,
2038 user_rid):
2039 if domain_sid is None:
2040 domain_sid = self.get_samdb().get_domain_sid()
2042 base_sids = []
2043 extra_sids = []
2044 resource_sids = []
2046 primary_gid = None
2048 # Filter our SIDs into three arrays depending on their ultimate
2049 # location in the PAC.
2050 for entry in new_sids:
2051 if isinstance(entry, frozenset):
2052 resource_domain = None
2053 domain_sids = []
2055 for sid, sid_type, attrs in entry:
2056 self.assertIs(sid_type, self.SidType.RESOURCE_SID,
2057 'only resource SIDs may be specified in this way')
2059 if isinstance(sid, int):
2060 domain, rid = domain_sid, sid
2061 else:
2062 domain, rid = sid.rsplit('-', 1)
2063 if resource_domain is None:
2064 resource_domain = domain
2065 else:
2066 self.assertEqual(resource_domain, domain,
2067 'resource SIDs must share the same '
2068 'domain')
2070 resource_sid = samr.RidWithAttribute()
2071 resource_sid.rid = int(rid)
2072 resource_sid.attributes = attrs
2074 domain_sids.append(resource_sid)
2076 membership = krb5pac.PAC_DOMAIN_GROUP_MEMBERSHIP()
2077 if resource_domain is not None:
2078 membership.domain_sid = security.dom_sid(resource_domain)
2079 membership.groups.rids = domain_sids
2080 membership.groups.count = len(domain_sids)
2082 resource_sids.append(membership)
2083 else:
2084 sid, sid_type, attrs = entry
2085 if sid_type is self.SidType.BASE_SID:
2086 if isinstance(sid, int):
2087 domain, rid = domain_sid, sid
2088 else:
2089 domain, rid = sid.rsplit('-', 1)
2090 self.assertEqual(domain_sid, domain,
2091 f'base SID {sid} must be in our domain')
2093 base_sid = samr.RidWithAttribute()
2094 base_sid.rid = int(rid)
2095 base_sid.attributes = attrs
2097 base_sids.append(base_sid)
2098 elif sid_type is self.SidType.EXTRA_SID:
2099 extra_sid = netlogon.netr_SidAttr()
2100 extra_sid.sid = security.dom_sid(sid)
2101 extra_sid.attributes = attrs
2103 extra_sids.append(extra_sid)
2104 elif sid_type is self.SidType.RESOURCE_SID:
2105 self.fail('specify resource groups in frozenset(s)')
2106 elif sid_type is self.SidType.PRIMARY_GID:
2107 self.assertIsNone(primary_gid,
2108 f'must not specify a second primary GID '
2109 f'{sid}')
2110 self.assertIsNone(attrs, 'cannot specify primary GID attrs')
2112 if isinstance(sid, int):
2113 domain, primary_gid = domain_sid, sid
2114 else:
2115 domain, primary_gid = sid.rsplit('-', 1)
2116 self.assertEqual(domain_sid, domain,
2117 f'primary GID {sid} must be in our domain')
2118 else:
2119 self.fail(f'invalid SID type {sid_type}')
2121 pac_buffers = pac.buffers
2122 for pac_buffer in pac_buffers:
2123 # Find the DEVICE_INFO PAC buffer.
2124 if pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO:
2125 logon_info = pac_buffer.info.info
2126 break
2127 else:
2128 logon_info = krb5pac.PAC_DEVICE_INFO()
2130 logon_info_ctr = krb5pac.PAC_DEVICE_INFO_CTR()
2131 logon_info_ctr.info = logon_info
2133 pac_buffer = krb5pac.PAC_BUFFER()
2134 pac_buffer.type = krb5pac.PAC_TYPE_DEVICE_INFO
2135 pac_buffer.info = logon_info_ctr
2137 pac_buffers.append(pac_buffer)
2139 logon_info.domain_sid = security.dom_sid(domain_sid)
2140 logon_info.rid = int(user_rid)
2142 self.assertIsNotNone(primary_gid, 'please specify the primary GID')
2143 logon_info.primary_gid = int(primary_gid)
2145 # Add Base SIDs.
2146 if base_sids:
2147 logon_info.groups.rids = base_sids
2148 else:
2149 logon_info.groups.rids = None
2150 logon_info.groups.count = len(base_sids)
2152 # Add Extra SIDs.
2153 if extra_sids:
2154 logon_info.sids = extra_sids
2155 else:
2156 logon_info.sids = None
2157 logon_info.sid_count = len(extra_sids)
2159 # Add Resource SIDs.
2160 if resource_sids:
2161 logon_info.domain_groups = resource_sids
2162 else:
2163 logon_info.domain_groups = None
2164 logon_info.domain_group_count = len(resource_sids)
2166 pac.buffers = pac_buffers
2167 pac.num_buffers = len(pac_buffers)
2169 return pac
2171 def set_pac_claims(self, pac, *, client_claims=None, device_claims=None, claim_ids=None):
2172 if claim_ids is None:
2173 claim_ids = {}
2175 if client_claims is not None:
2176 self.assertIsNone(device_claims,
2177 'don’t specify both client and device claims')
2178 pac_claims = client_claims
2179 pac_buffer_type = krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO
2180 else:
2181 self.assertIsNotNone(device_claims,
2182 'please specify client or device claims')
2183 pac_claims = device_claims
2184 pac_buffer_type = krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO
2186 claim_value_types = {
2187 claims.CLAIM_TYPE_INT64: claims.CLAIM_INT64,
2188 claims.CLAIM_TYPE_UINT64: claims.CLAIM_UINT64,
2189 claims.CLAIM_TYPE_STRING: claims.CLAIM_STRING,
2190 claims.CLAIM_TYPE_BOOLEAN: claims.CLAIM_UINT64,
2193 claims_arrays = []
2195 for pac_claim_array in pac_claims:
2196 pac_claim_source_type, pac_claim_entries = (
2197 pac_claim_array)
2199 claim_entries = []
2201 for pac_claim_entry in pac_claim_entries:
2202 pac_claim_id, pac_claim_type, pac_claim_values = (
2203 pac_claim_entry)
2205 claim_values_type = claim_value_types.get(
2206 pac_claim_type, claims.CLAIM_STRING)
2208 claim_values_enum = claim_values_type()
2209 claim_values_enum.values = pac_claim_values
2210 claim_values_enum.value_count = len(
2211 pac_claim_values)
2213 claim_entry = claims.CLAIM_ENTRY()
2214 try:
2215 claim_entry.id = pac_claim_id.format_map(
2216 claim_ids)
2217 except KeyError as err:
2218 raise RuntimeError(
2219 f'unknown claim name(s) '
2220 f'in ‘{pac_claim_id}’'
2221 ) from err
2222 claim_entry.type = pac_claim_type
2223 claim_entry.values = claim_values_enum
2225 claim_entries.append(claim_entry)
2227 claims_array = claims.CLAIMS_ARRAY()
2228 claims_array.claims_source_type = pac_claim_source_type
2229 claims_array.claim_entries = claim_entries
2230 claims_array.claims_count = len(claim_entries)
2232 claims_arrays.append(claims_array)
2234 claims_set = claims.CLAIMS_SET()
2235 claims_set.claims_arrays = claims_arrays
2236 claims_set.claims_array_count = len(claims_arrays)
2238 claims_ctr = claims.CLAIMS_SET_CTR()
2239 claims_ctr.claims = claims_set
2241 claims_ndr = claims.CLAIMS_SET_NDR()
2242 claims_ndr.claims = claims_ctr
2244 metadata = claims.CLAIMS_SET_METADATA()
2245 metadata.claims_set = claims_ndr
2246 metadata.compression_format = (
2247 claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF)
2249 metadata_ctr = claims.CLAIMS_SET_METADATA_CTR()
2250 metadata_ctr.metadata = metadata
2252 metadata_ndr = claims.CLAIMS_SET_METADATA_NDR()
2253 metadata_ndr.claims = metadata_ctr
2255 pac_buffers = pac.buffers
2256 for pac_buffer in pac_buffers:
2257 if pac_buffer.type == pac_buffer_type:
2258 break
2259 else:
2260 pac_buffer = krb5pac.PAC_BUFFER()
2261 pac_buffer.type = pac_buffer_type
2262 pac_buffer.info = krb5pac.DATA_BLOB_REM()
2264 pac_buffers.append(pac_buffer)
2266 pac_buffer.info.remaining = ndr_pack(metadata_ndr)
2268 pac.buffers = pac_buffers
2269 pac.num_buffers = len(pac_buffers)
2271 return pac
2273 def add_extra_pac_buffers(self, pac, *, buffers=None):
2274 if buffers is None:
2275 buffers = []
2277 pac_buffers = pac.buffers
2278 for pac_buffer_type in buffers:
2279 info = krb5pac.DATA_BLOB_REM()
2280 # Having an empty PAC buffer will trigger an assertion failure in
2281 # the MIT KDC’s k5_pac_locate_buffer(), so we need at least one
2282 # byte.
2283 info.remaining = b'0'
2285 pac_buffer = krb5pac.PAC_BUFFER()
2286 pac_buffer.type = pac_buffer_type
2287 pac_buffer.info = info
2289 pac_buffers.append(pac_buffer)
2291 pac.buffers = pac_buffers
2292 pac.num_buffers = len(pac_buffers)
2294 return pac
2296 def get_cached_creds(self, *,
2297 account_type: AccountType,
2298 opts: Optional[dict]=None,
2299 samdb: Optional[SamDB]=None,
2300 use_cache=True) -> KerberosCredentials:
2301 if opts is None:
2302 opts = {}
2304 opts_default = {
2305 'name_prefix': None,
2306 'name_suffix': None,
2307 'add_dollar': None,
2308 'upn': None,
2309 'spn': None,
2310 'additional_details': None,
2311 'allowed_replication': False,
2312 'allowed_replication_mock': False,
2313 'denied_replication': False,
2314 'denied_replication_mock': False,
2315 'revealed_to_rodc': False,
2316 'revealed_to_mock_rodc': False,
2317 'no_auth_data_required': False,
2318 'expired_password': False,
2319 'supported_enctypes': None,
2320 'not_delegated': False,
2321 'delegation_to_spn': None,
2322 'delegation_from_dn': None,
2323 'trusted_to_auth_for_delegation': False,
2324 'fast_support': False,
2325 'claims_support': False,
2326 'compound_id_support': False,
2327 'sid_compression_support': True,
2328 'member_of': None,
2329 'kerberos_enabled': True,
2330 'secure_channel_type': None,
2331 'id': None,
2332 'force_nt4_hash': False,
2333 'assigned_policy': None,
2334 'assigned_silo': None,
2335 'logon_hours': None,
2336 'smartcard_required': False,
2337 'enabled': True,
2340 account_opts = {
2341 'account_type': account_type,
2342 **opts_default,
2343 **opts
2346 if use_cache:
2347 self.assertIsNone(samdb)
2348 cache_key = tuple(sorted(account_opts.items()))
2349 creds = self.account_cache.get(cache_key)
2350 if creds is not None:
2351 return creds
2353 creds = self.create_account_opts(samdb, use_cache, **account_opts)
2354 if use_cache:
2355 self.account_cache[cache_key] = creds
2357 return creds
2359 def create_account_opts(self,
2360 samdb: Optional[SamDB],
2361 use_cache,
2363 account_type,
2364 name_prefix,
2365 name_suffix,
2366 add_dollar,
2367 upn,
2368 spn,
2369 additional_details,
2370 allowed_replication,
2371 allowed_replication_mock,
2372 denied_replication,
2373 denied_replication_mock,
2374 revealed_to_rodc,
2375 revealed_to_mock_rodc,
2376 no_auth_data_required,
2377 expired_password,
2378 supported_enctypes,
2379 not_delegated,
2380 delegation_to_spn,
2381 delegation_from_dn,
2382 trusted_to_auth_for_delegation,
2383 fast_support,
2384 claims_support,
2385 compound_id_support,
2386 sid_compression_support,
2387 member_of,
2388 kerberos_enabled,
2389 secure_channel_type,
2391 force_nt4_hash,
2392 assigned_policy,
2393 assigned_silo,
2394 logon_hours,
2395 smartcard_required,
2396 enabled):
2397 if account_type is self.AccountType.USER:
2398 self.assertIsNone(delegation_to_spn)
2399 self.assertIsNone(delegation_from_dn)
2400 self.assertFalse(trusted_to_auth_for_delegation)
2401 else:
2402 self.assertFalse(not_delegated)
2404 if samdb is None:
2405 samdb = self.get_samdb()
2407 user_name = self.get_new_username()
2408 if name_prefix is not None:
2409 user_name = name_prefix + user_name
2410 if name_suffix is not None:
2411 user_name += name_suffix
2413 user_account_control = 0
2414 if trusted_to_auth_for_delegation:
2415 user_account_control |= UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
2416 if not_delegated:
2417 user_account_control |= UF_NOT_DELEGATED
2418 if no_auth_data_required:
2419 user_account_control |= UF_NO_AUTH_DATA_REQUIRED
2420 if smartcard_required:
2421 user_account_control |= UF_SMARTCARD_REQUIRED
2422 if not enabled:
2423 user_account_control |= UF_ACCOUNTDISABLE
2425 if additional_details:
2426 details = {k: v for k, v in additional_details}
2427 else:
2428 details = {}
2430 enctypes = supported_enctypes
2431 if fast_support:
2432 enctypes = enctypes or 0
2433 enctypes |= security.KERB_ENCTYPE_FAST_SUPPORTED
2434 if claims_support:
2435 enctypes = enctypes or 0
2436 enctypes |= security.KERB_ENCTYPE_CLAIMS_SUPPORTED
2437 if compound_id_support:
2438 enctypes = enctypes or 0
2439 enctypes |= security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED
2440 if sid_compression_support is False:
2441 enctypes = enctypes or 0
2442 enctypes |= security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED
2444 if enctypes is not None:
2445 details['msDS-SupportedEncryptionTypes'] = str(enctypes)
2447 if delegation_to_spn:
2448 details['msDS-AllowedToDelegateTo'] = delegation_to_spn
2450 if delegation_from_dn:
2451 if isinstance(delegation_from_dn, str):
2452 delegation_from_dn = self.get_security_descriptor(
2453 delegation_from_dn)
2454 details['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
2455 delegation_from_dn)
2457 if spn is None and account_type is not self.AccountType.USER:
2458 spn = 'host/' + user_name
2460 if assigned_policy is not None:
2461 details['msDS-AssignedAuthNPolicy'] = assigned_policy
2463 if assigned_silo is not None:
2464 details['msDS-AssignedAuthNPolicySilo'] = assigned_silo
2466 if logon_hours is not None:
2467 details['logonHours'] = logon_hours
2469 creds, dn = self.create_account(samdb, user_name,
2470 account_type=account_type,
2471 upn=upn,
2472 spn=spn,
2473 additional_details=details,
2474 account_control=user_account_control,
2475 add_dollar=add_dollar,
2476 force_nt4_hash=force_nt4_hash,
2477 expired_password=expired_password,
2478 export_to_keytab=False, # explicit below
2479 preserve=use_cache)
2481 expected_etypes = None
2483 # We don't force fetching the keys other than the NT hash as
2484 # how the server stores the unused KDC keys for the
2485 # smartcard_required case is not important and makes unrelated
2486 # tests break because of differences between Samba and
2487 # Windows.
2489 # The NT hash is different, as it is returned to the client in
2490 # the PAC so is visible in the network behaviour.
2491 if force_nt4_hash:
2492 expected_etypes = {kcrypto.Enctype.RC4}
2493 keys = self.get_keys(creds, expected_etypes=expected_etypes)
2494 self.creds_set_keys(creds, keys)
2496 # Handle secret replication to the RODC.
2498 if allowed_replication or revealed_to_rodc:
2499 rodc_samdb = self.get_rodc_samdb()
2500 rodc_dn = self.get_server_dn(rodc_samdb)
2502 # Allow replicating this account's secrets if requested, or allow
2503 # it only temporarily if we're about to replicate them.
2504 allowed_cleanup = self.add_to_group(
2505 dn, rodc_dn,
2506 'msDS-RevealOnDemandGroup')
2508 if revealed_to_rodc:
2509 # Replicate this account's secrets to the RODC.
2510 self.replicate_account_to_rodc(dn)
2512 if not allowed_replication:
2513 # If we don't want replicating secrets to be allowed for this
2514 # account, disable it again.
2515 samdb.modify(allowed_cleanup)
2517 self.check_revealed(dn,
2518 rodc_dn,
2519 revealed=revealed_to_rodc)
2521 if denied_replication:
2522 rodc_samdb = self.get_rodc_samdb()
2523 rodc_dn = self.get_server_dn(rodc_samdb)
2525 # Deny replicating this account's secrets to the RODC.
2526 self.add_to_group(dn, rodc_dn, 'msDS-NeverRevealGroup')
2528 # Handle secret replication to the mock RODC.
2530 if allowed_replication_mock or revealed_to_mock_rodc:
2531 # Allow replicating this account's secrets if requested, or allow
2532 # it only temporarily if we want to add the account to the mock
2533 # RODC's msDS-RevealedUsers.
2534 rodc_ctx = self.get_mock_rodc_ctx()
2535 mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2537 allowed_mock_cleanup = self.add_to_group(
2538 dn, mock_rodc_dn,
2539 'msDS-RevealOnDemandGroup')
2541 if revealed_to_mock_rodc:
2542 # Request replicating this account's secrets to the mock RODC,
2543 # which updates msDS-RevealedUsers.
2544 self.reveal_account_to_mock_rodc(dn)
2546 if not allowed_replication_mock:
2547 # If we don't want replicating secrets to be allowed for this
2548 # account, disable it again.
2549 samdb.modify(allowed_mock_cleanup)
2551 self.check_revealed(dn,
2552 mock_rodc_dn,
2553 revealed=revealed_to_mock_rodc)
2555 if denied_replication_mock:
2556 # Deny replicating this account's secrets to the mock RODC.
2557 rodc_ctx = self.get_mock_rodc_ctx()
2558 mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2560 self.add_to_group(dn, mock_rodc_dn, 'msDS-NeverRevealGroup')
2562 if member_of is not None:
2563 for group_dn in member_of:
2564 self.add_to_group(dn, ldb.Dn(samdb, group_dn), 'member',
2565 expect_attr=False)
2567 if kerberos_enabled:
2568 creds.set_kerberos_state(MUST_USE_KERBEROS)
2569 else:
2570 creds.set_kerberos_state(DONT_USE_KERBEROS)
2572 if secure_channel_type is not None:
2573 creds.set_secure_channel_type(secure_channel_type)
2575 self.remember_creds_for_keytab_export(creds)
2577 return creds
2579 def get_new_username(self):
2580 user_name = self.account_base + str(self.account_id)
2581 type(self).account_id += 1
2583 return user_name
2585 def get_client_creds(self,
2586 allow_missing_password=False,
2587 allow_missing_keys=True):
2588 def create_client_account():
2589 return self.get_cached_creds(account_type=self.AccountType.USER)
2591 c = self._get_krb5_creds(prefix='CLIENT',
2592 allow_missing_password=allow_missing_password,
2593 allow_missing_keys=allow_missing_keys,
2594 fallback_creds_fn=create_client_account)
2595 return c
2597 def get_mach_creds(self,
2598 allow_missing_password=False,
2599 allow_missing_keys=True):
2600 def create_mach_account():
2601 return self.get_cached_creds(
2602 account_type=self.AccountType.COMPUTER,
2603 opts={
2604 'fast_support': True,
2605 'claims_support': True,
2606 'compound_id_support': True,
2607 'supported_enctypes': (
2608 security.KERB_ENCTYPE_RC4_HMAC_MD5 |
2609 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2613 c = self._get_krb5_creds(prefix='MAC',
2614 allow_missing_password=allow_missing_password,
2615 allow_missing_keys=allow_missing_keys,
2616 fallback_creds_fn=create_mach_account)
2617 return c
2619 def get_service_creds(self,
2620 allow_missing_password=False,
2621 allow_missing_keys=True):
2622 def create_service_account():
2623 return self.get_cached_creds(
2624 account_type=self.AccountType.COMPUTER,
2625 opts={
2626 'trusted_to_auth_for_delegation': True,
2627 'fast_support': True,
2628 'claims_support': True,
2629 'compound_id_support': True,
2630 'supported_enctypes': (
2631 security.KERB_ENCTYPE_RC4_HMAC_MD5 |
2632 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2636 c = self._get_krb5_creds(prefix='SERVICE',
2637 allow_missing_password=allow_missing_password,
2638 allow_missing_keys=allow_missing_keys,
2639 fallback_creds_fn=create_service_account)
2640 return c
2642 def get_rodc_krbtgt_creds(self,
2643 require_keys=True,
2644 require_strongest_key=False):
2645 if require_strongest_key:
2646 self.assertTrue(require_keys)
2648 def download_rodc_krbtgt_creds():
2649 samdb = self.get_samdb()
2650 rodc_samdb = self.get_rodc_samdb()
2652 rodc_dn = self.get_server_dn(rodc_samdb)
2654 res = samdb.search(rodc_dn,
2655 scope=ldb.SCOPE_BASE,
2656 attrs=['msDS-KrbTgtLink'])
2657 krbtgt_dn = res[0]['msDS-KrbTgtLink'][0]
2659 res = samdb.search(krbtgt_dn,
2660 scope=ldb.SCOPE_BASE,
2661 attrs=['sAMAccountName',
2662 'msDS-KeyVersionNumber',
2663 'msDS-SecondaryKrbTgtNumber'])
2664 krbtgt_dn = res[0].dn
2665 username = str(res[0]['sAMAccountName'])
2667 creds = KerberosCredentials()
2668 creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
2669 creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
2670 creds.set_username(username)
2672 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
2673 krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
2675 rodc_kvno = krbtgt_number << 16 | kvno
2676 creds.set_kvno(rodc_kvno)
2677 creds.set_dn(krbtgt_dn)
2679 keys = self.get_keys(creds)
2680 self.creds_set_keys(creds, keys)
2682 # The RODC krbtgt account should support the default enctypes,
2683 # although it might not have the msDS-SupportedEncryptionTypes
2684 # attribute.
2685 self.creds_set_default_enctypes(
2686 creds,
2687 fast_support=self.kdc_fast_support,
2688 claims_support=self.kdc_claims_support,
2689 compound_id_support=self.kdc_compound_id_support)
2691 if type(self).export_existing_creds:
2692 self.remember_creds_for_keytab_export(creds)
2694 return creds
2696 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
2697 allow_missing_password=True,
2698 allow_missing_keys=not require_keys,
2699 require_strongest_key=require_strongest_key,
2700 fallback_creds_fn=download_rodc_krbtgt_creds)
2701 return c
2703 def get_mock_rodc_krbtgt_creds(self,
2704 require_keys=True,
2705 require_strongest_key=False,
2706 preserve=True):
2707 if require_strongest_key:
2708 self.assertTrue(require_keys)
2710 def create_rodc_krbtgt_account():
2711 samdb = self.get_samdb()
2713 rodc_ctx = self.get_mock_rodc_ctx(preserve=preserve)
2715 krbtgt_dn = rodc_ctx.new_krbtgt_dn
2717 res = samdb.search(base=ldb.Dn(samdb, krbtgt_dn),
2718 scope=ldb.SCOPE_BASE,
2719 attrs=['msDS-KeyVersionNumber',
2720 'msDS-SecondaryKrbTgtNumber'])
2721 dn = res[0].dn
2722 username = str(rodc_ctx.krbtgt_name)
2724 krbtgt_creds = KerberosCredentials()
2725 krbtgt_creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
2726 krbtgt_creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
2727 krbtgt_creds.set_username(username)
2729 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
2730 krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
2732 rodc_kvno = krbtgt_number << 16 | kvno
2733 krbtgt_creds.set_kvno(rodc_kvno)
2734 krbtgt_creds.set_dn(dn)
2736 krbtgt_keys = self.get_keys(krbtgt_creds)
2737 self.creds_set_keys(krbtgt_creds, krbtgt_keys)
2739 self.remember_creds_for_keytab_export(krbtgt_creds)
2741 acct_res = samdb.search(base=rodc_ctx.acct_dn,
2742 scope=ldb.SCOPE_BASE,
2743 attrs=['msDS-KeyVersionNumber',
2744 'objectSid',
2745 'objectGUID'])
2747 computer_creds = KerberosCredentials()
2748 computer_creds.set_domain(krbtgt_creds.get_domain())
2749 computer_creds.set_realm(krbtgt_creds.get_realm())
2750 computer_creds.set_username(rodc_ctx.samname)
2751 computer_creds.set_password(rodc_ctx.acct_pass)
2752 computer_creds.set_workstation(rodc_ctx.myname)
2753 computer_creds.set_secure_channel_type(misc.SEC_CHAN_RODC)
2754 computer_creds.set_dn(acct_res[0].dn)
2755 computer_creds.set_type(self.AccountType.RODC)
2756 computer_creds.set_user_account_control(rodc_ctx.userAccountControl)
2758 computer_kvno = int(acct_res[0]['msDS-KeyVersionNumber'][0])
2759 computer_creds.set_kvno(computer_kvno)
2761 sid = acct_res[0].get('objectSid', idx=0)
2762 sid = samdb.schema_format_value('objectSID', sid)
2763 sid = sid.decode('utf-8')
2764 computer_creds.set_sid(sid)
2765 guid = acct_res[0].get('objectGUID', idx=0)
2766 guid = samdb.schema_format_value('objectGUID', guid)
2767 guid = guid.decode('utf-8')
2768 computer_creds.set_guid(guid)
2770 computer_keys = self.get_keys(computer_creds)
2771 # we just let wireshark see the keys via drsuapi
2772 # but we don't force them on computer_creds
2773 # as computer_creds.set_password() could be
2774 # used by the caller...
2775 # self.creds_set_keys(computer_creds, computer_keys)
2777 self.remember_creds_for_keytab_export(computer_creds)
2779 if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008:
2780 extra_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2781 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)
2782 else:
2783 extra_bits = 0
2784 remove_bits = (security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK |
2785 security.KERB_ENCTYPE_RC4_HMAC_MD5)
2786 self.creds_set_enctypes(krbtgt_creds,
2787 extra_bits=extra_bits,
2788 remove_bits=remove_bits)
2789 self.creds_set_enctypes(computer_creds,
2790 extra_bits=extra_bits,
2791 remove_bits=remove_bits)
2793 krbtgt_creds.set_rodc_computer_creds(computer_creds)
2795 return krbtgt_creds
2797 if not preserve:
2798 return create_rodc_krbtgt_account()
2800 c = self._get_krb5_creds(prefix='MOCK_RODC_KRBTGT',
2801 allow_missing_password=True,
2802 allow_missing_keys=not require_keys,
2803 require_strongest_key=require_strongest_key,
2804 fallback_creds_fn=create_rodc_krbtgt_account)
2805 return c
2807 def get_krbtgt_creds(self,
2808 require_keys=True,
2809 require_strongest_key=False):
2810 if require_strongest_key:
2811 self.assertTrue(require_keys)
2813 def download_krbtgt_creds():
2814 samdb = self.get_samdb()
2816 krbtgt_rid = security.DOMAIN_RID_KRBTGT
2817 krbtgt_sid = '%s-%d' % (samdb.get_domain_sid(), krbtgt_rid)
2819 res = samdb.search(base='<SID=%s>' % krbtgt_sid,
2820 scope=ldb.SCOPE_BASE,
2821 attrs=['sAMAccountName',
2822 'msDS-KeyVersionNumber'])
2823 dn = res[0].dn
2824 username = str(res[0]['sAMAccountName'])
2826 creds = KerberosCredentials()
2827 creds.set_domain(self.env_get_var('DOMAIN', 'KRBTGT'))
2828 creds.set_realm(self.env_get_var('REALM', 'KRBTGT'))
2829 creds.set_username(username)
2831 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
2832 creds.set_kvno(kvno)
2833 creds.set_dn(dn)
2835 keys = self.get_keys(creds)
2836 self.creds_set_keys(creds, keys)
2838 # The krbtgt account should support the default enctypes, although
2839 # it might not (on Samba) have the msDS-SupportedEncryptionTypes
2840 # attribute.
2841 self.creds_set_default_enctypes(
2842 creds,
2843 fast_support=self.kdc_fast_support,
2844 claims_support=self.kdc_claims_support,
2845 compound_id_support=self.kdc_compound_id_support)
2847 if type(self).export_existing_creds:
2848 self.remember_creds_for_keytab_export(creds)
2850 return creds
2852 c = self._get_krb5_creds(prefix='KRBTGT',
2853 default_username='krbtgt',
2854 allow_missing_password=True,
2855 allow_missing_keys=not require_keys,
2856 require_strongest_key=require_strongest_key,
2857 fallback_creds_fn=download_krbtgt_creds)
2858 return c
2860 def get_dc_creds(self,
2861 require_keys=True,
2862 require_strongest_key=False):
2863 if require_strongest_key:
2864 self.assertTrue(require_keys)
2866 def download_dc_creds():
2867 samdb = self.get_samdb()
2869 dc_rid = 1000
2870 dc_sid = '%s-%d' % (samdb.get_domain_sid(), dc_rid)
2872 res = samdb.search(base='<SID=%s>' % dc_sid,
2873 scope=ldb.SCOPE_BASE,
2874 attrs=['sAMAccountName',
2875 'msDS-KeyVersionNumber'])
2876 dn = res[0].dn
2877 username = str(res[0]['sAMAccountName'])
2879 creds = KerberosCredentials()
2880 creds.set_domain(self.env_get_var('DOMAIN', 'DC'))
2881 creds.set_realm(self.env_get_var('REALM', 'DC'))
2882 creds.set_username(username)
2884 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
2885 creds.set_kvno(kvno)
2886 creds.set_workstation(username[:-1])
2887 creds.set_dn(dn)
2889 keys = self.get_keys(creds)
2890 self.creds_set_keys(creds, keys)
2892 if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008:
2893 extra_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2894 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)
2895 else:
2896 extra_bits = 0
2897 remove_bits = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2898 self.creds_set_enctypes(creds,
2899 extra_bits=extra_bits,
2900 remove_bits=remove_bits)
2902 if type(self).export_existing_creds:
2903 self.remember_creds_for_keytab_export(creds)
2905 return creds
2907 c = self._get_krb5_creds(prefix='DC',
2908 allow_missing_password=True,
2909 allow_missing_keys=not require_keys,
2910 require_strongest_key=require_strongest_key,
2911 fallback_creds_fn=download_dc_creds)
2912 return c
2914 def get_server_creds(self,
2915 require_keys=True,
2916 require_strongest_key=False):
2917 if require_strongest_key:
2918 self.assertTrue(require_keys)
2920 def download_server_creds():
2921 samdb = self.get_samdb()
2923 res = samdb.search(base=samdb.get_default_basedn(),
2924 expression=(f'(|(sAMAccountName={self.host}*)'
2925 f'(dNSHostName={self.host}))'),
2926 scope=ldb.SCOPE_SUBTREE,
2927 attrs=['sAMAccountName',
2928 'msDS-KeyVersionNumber'])
2929 self.assertEqual(1, len(res))
2930 dn = res[0].dn
2931 username = str(res[0]['sAMAccountName'])
2933 creds = KerberosCredentials()
2934 creds.set_domain(self.env_get_var('DOMAIN', 'SERVER'))
2935 creds.set_realm(self.env_get_var('REALM', 'SERVER'))
2936 creds.set_username(username)
2938 kvno = int(res[0]['msDS-KeyVersionNumber'][0])
2939 creds.set_kvno(kvno)
2940 creds.set_dn(dn)
2942 keys = self.get_keys(creds)
2943 self.creds_set_keys(creds, keys)
2945 if self.get_domain_functional_level() >= DS_DOMAIN_FUNCTION_2008:
2946 extra_bits = (security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
2947 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)
2948 else:
2949 extra_bits = 0
2950 remove_bits = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
2951 self.creds_set_enctypes(creds,
2952 extra_bits=extra_bits,
2953 remove_bits=remove_bits)
2955 if type(self).export_existing_creds:
2956 self.remember_creds_for_keytab_export(creds)
2958 return creds
2960 c = self._get_krb5_creds(prefix='SERVER',
2961 allow_missing_password=True,
2962 allow_missing_keys=not require_keys,
2963 require_strongest_key=require_strongest_key,
2964 fallback_creds_fn=download_server_creds)
2965 return c
2967 # Get the credentials and server principal name of either the krbtgt, or a
2968 # specially created account, with resource SID compression either supported
2969 # or unsupported.
2970 def get_target(self,
2971 to_krbtgt, *,
2972 compound_id=None,
2973 compression=None,
2974 extra_enctypes=0):
2975 if to_krbtgt:
2976 self.assertIsNone(compound_id,
2977 "it's no good specifying compound id support "
2978 "for the krbtgt")
2979 self.assertIsNone(compression,
2980 "it's no good specifying compression support "
2981 "for the krbtgt")
2982 self.assertFalse(extra_enctypes,
2983 "it's no good specifying extra enctypes "
2984 "for the krbtgt")
2985 creds = self.get_krbtgt_creds()
2986 sname = self.get_krbtgt_sname()
2987 else:
2988 creds = self.get_cached_creds(
2989 account_type=self.AccountType.COMPUTER,
2990 opts={
2991 'supported_enctypes':
2992 security.KERB_ENCTYPE_RC4_HMAC_MD5 |
2993 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 |
2994 extra_enctypes,
2995 'compound_id_support': compound_id,
2996 'sid_compression_support': compression,
2998 target_name = creds.get_username()
3000 if target_name[-1] == '$':
3001 target_name = target_name[:-1]
3002 sname = self.PrincipalName_create(
3003 name_type=NT_PRINCIPAL,
3004 names=['host', target_name])
3006 return creds, sname
3008 def as_req(self, cname, sname, realm, etypes, padata=None, kdc_options=0):
3009 """Send a Kerberos AS_REQ, returns the undecoded response
3012 till = self.get_KerberosTime(offset=36000)
3014 req = self.AS_REQ_create(padata=padata,
3015 kdc_options=str(kdc_options),
3016 cname=cname,
3017 realm=realm,
3018 sname=sname,
3019 from_time=None,
3020 till_time=till,
3021 renew_time=None,
3022 nonce=0x7fffffff,
3023 etypes=etypes,
3024 addresses=None,
3025 additional_tickets=None)
3026 rep = self.send_recv_transaction(req)
3027 return rep
3029 def get_as_rep_key(self, creds, rep):
3030 """Extract the session key from an AS-REP
3032 rep_padata = self.der_decode(
3033 rep['e-data'],
3034 asn1Spec=krb5_asn1.METHOD_DATA())
3036 for pa in rep_padata:
3037 if pa['padata-type'] == PADATA_ETYPE_INFO2:
3038 padata_value = pa['padata-value']
3039 break
3040 else:
3041 self.fail('expected to find ETYPE-INFO2')
3043 etype_info2 = self.der_decode(
3044 padata_value, asn1Spec=krb5_asn1.ETYPE_INFO2())
3046 key = self.PasswordKey_from_etype_info2(creds, etype_info2[0],
3047 creds.get_kvno())
3048 return key
3050 def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
3051 """generate the pa_data data element for an AS-REQ
3054 key = self.get_as_rep_key(creds, rep)
3056 return self.get_enc_timestamp_pa_data_from_key(key, skew=skew)
3058 def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
3059 (patime, pausec) = self.get_KerberosTimeWithUsec(offset=skew)
3060 padata = self.PA_ENC_TS_ENC_create(patime, pausec)
3061 padata = self.der_encode(padata, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
3063 padata = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, padata)
3064 padata = self.der_encode(padata, asn1Spec=krb5_asn1.EncryptedData())
3066 padata = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, padata)
3068 return padata
3070 def get_challenge_pa_data(self, client_challenge_key, skew=0):
3071 patime, pausec = self.get_KerberosTimeWithUsec(offset=skew)
3072 padata = self.PA_ENC_TS_ENC_create(patime, pausec)
3073 padata = self.der_encode(padata,
3074 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
3076 padata = self.EncryptedData_create(client_challenge_key,
3077 KU_ENC_CHALLENGE_CLIENT,
3078 padata)
3079 padata = self.der_encode(padata,
3080 asn1Spec=krb5_asn1.EncryptedData())
3082 padata = self.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE,
3083 padata)
3085 return padata
3087 def get_as_rep_enc_data(self, key, rep):
3088 """ Decrypt and Decode the encrypted data in an AS-REP
3090 enc_part = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
3091 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
3092 # application tag 26
3093 try:
3094 enc_part = self.der_decode(
3095 enc_part, asn1Spec=krb5_asn1.EncASRepPart())
3096 except Exception:
3097 enc_part = self.der_decode(
3098 enc_part, asn1Spec=krb5_asn1.EncTGSRepPart())
3100 return enc_part
3102 def check_pre_authentication(self, rep):
3103 """ Check that the kdc response was pre-authentication required
3105 self.check_error_rep(rep, KDC_ERR_PREAUTH_REQUIRED)
3107 def check_as_reply(self, rep):
3108 """ Check that the kdc response is an AS-REP and that the
3109 values for:
3110 msg-type
3111 pvno
3112 tkt-pvno
3113 kvno
3114 match the expected values
3116 self.check_reply(rep, msg_type=KRB_AS_REP)
3118 def check_tgs_reply(self, rep):
3119 """ Check that the kdc response is an TGS-REP and that the
3120 values for:
3121 msg-type
3122 pvno
3123 tkt-pvno
3124 kvno
3125 match the expected values
3127 self.check_reply(rep, msg_type=KRB_TGS_REP)
3129 def check_reply(self, rep, msg_type):
3131 # Should have a reply, and it should an TGS-REP message.
3132 self.assertIsNotNone(rep)
3133 self.assertEqual(rep['msg-type'], msg_type, "rep = {%s}" % rep)
3135 # Protocol version number should be 5
3136 pvno = int(rep['pvno'])
3137 self.assertEqual(5, pvno, "rep = {%s}" % rep)
3139 # The ticket version number should be 5
3140 tkt_vno = int(rep['ticket']['tkt-vno'])
3141 self.assertEqual(5, tkt_vno, "rep = {%s}" % rep)
3143 # Check that the kvno is not an RODC kvno
3144 # MIT kerberos does not provide the kvno, so we treat it as optional.
3145 # This is tested in compatability_test.py
3146 if 'kvno' in rep['enc-part']:
3147 kvno = int(rep['enc-part']['kvno'])
3148 # If the high order bits are set this is an RODC kvno.
3149 self.assertEqual(0, kvno & 0xFFFF0000, "rep = {%s}" % rep)
3151 def check_error_rep(self, rep, expected):
3152 """ Check that the reply is an error message, with the expected
3153 error-code specified.
3155 self.assertIsNotNone(rep)
3156 self.assertEqual(rep['msg-type'], KRB_ERROR, "rep = {%s}" % rep)
3157 if isinstance(expected, collections.abc.Container):
3158 self.assertIn(rep['error-code'], expected, "rep = {%s}" % rep)
3159 else:
3160 self.assertEqual(rep['error-code'], expected, "rep = {%s}" % rep)
3162 def tgs_req(self, cname, sname, realm, ticket, key, etypes,
3163 expected_error_mode=0, padata=None, kdc_options=0,
3164 to_rodc=False, creds=None, service_creds=None, expect_pac=True,
3165 expect_edata=None, expected_flags=None, unexpected_flags=None):
3166 """Send a TGS-REQ, returns the response and the decrypted and
3167 decoded enc-part
3170 subkey = self.RandomKey(key.etype)
3172 (ctime, cusec) = self.get_KerberosTimeWithUsec()
3174 tgt = KerberosTicketCreds(ticket,
3175 key,
3176 crealm=realm,
3177 cname=cname)
3179 if service_creds is not None:
3180 decryption_key = self.TicketDecryptionKey_from_creds(
3181 service_creds)
3182 expected_supported_etypes = service_creds.tgs_supported_enctypes
3183 else:
3184 decryption_key = None
3185 expected_supported_etypes = None
3187 if not expected_error_mode:
3188 check_error_fn = None
3189 check_rep_fn = self.generic_check_kdc_rep
3190 else:
3191 check_error_fn = self.generic_check_kdc_error
3192 check_rep_fn = None
3194 def generate_padata(_kdc_exchange_dict,
3195 _callback_dict,
3196 req_body):
3198 return padata, req_body
3200 kdc_exchange_dict = self.tgs_exchange_dict(
3201 creds=creds,
3202 expected_crealm=realm,
3203 expected_cname=cname,
3204 expected_srealm=realm,
3205 expected_sname=sname,
3206 expected_error_mode=expected_error_mode,
3207 expected_flags=expected_flags,
3208 unexpected_flags=unexpected_flags,
3209 expected_supported_etypes=expected_supported_etypes,
3210 check_error_fn=check_error_fn,
3211 check_rep_fn=check_rep_fn,
3212 check_kdc_private_fn=self.generic_check_kdc_private,
3213 ticket_decryption_key=decryption_key,
3214 generate_padata_fn=generate_padata if padata is not None else None,
3215 tgt=tgt,
3216 authenticator_subkey=subkey,
3217 kdc_options=str(kdc_options),
3218 expect_edata=expect_edata,
3219 expect_pac=expect_pac,
3220 to_rodc=to_rodc)
3222 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3223 cname=None,
3224 realm=realm,
3225 sname=sname,
3226 etypes=etypes)
3228 if expected_error_mode:
3229 enc_part = None
3230 else:
3231 ticket_creds = kdc_exchange_dict['rep_ticket_creds']
3232 enc_part = ticket_creds.encpart_private
3234 return rep, enc_part
3236 def get_service_ticket(self, tgt, target_creds, service='host',
3237 sname=None,
3238 target_name=None, till=None, rc4_support=True,
3239 to_rodc=False, kdc_options=None,
3240 expected_flags=None, unexpected_flags=None,
3241 expected_groups=None,
3242 unexpected_groups=None,
3243 expect_client_claims=None,
3244 expect_device_claims=None,
3245 expected_client_claims=None,
3246 unexpected_client_claims=None,
3247 expected_device_claims=None,
3248 unexpected_device_claims=None,
3249 pac_request=True, expect_pac=True,
3250 expect_requester_sid=None,
3251 expect_pac_attrs=None,
3252 expect_pac_attrs_pac_request=None,
3253 expect_krbtgt_referral=False,
3254 fresh=False):
3255 user_name = tgt.cname['name-string'][0]
3256 ticket_sname = tgt.sname
3257 if target_name is None:
3258 target_name = target_creds.get_username()[:-1]
3259 else:
3260 self.assertIsNone(sname, 'supplied both target name and sname')
3261 cache_key = (user_name, target_name, service, to_rodc, kdc_options,
3262 pac_request, str(expected_flags), str(unexpected_flags),
3263 till, rc4_support,
3264 str(ticket_sname),
3265 str(sname),
3266 str(expected_groups),
3267 str(unexpected_groups),
3268 expect_client_claims, expect_device_claims,
3269 str(expected_client_claims),
3270 str(unexpected_client_claims),
3271 str(expected_device_claims),
3272 str(unexpected_device_claims),
3273 expect_pac,
3274 expect_requester_sid,
3275 expect_pac_attrs,
3276 expect_pac_attrs_pac_request)
3278 if not fresh:
3279 ticket = self.tkt_cache.get(cache_key)
3281 if ticket is not None:
3282 return ticket
3284 etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
3286 if kdc_options is None:
3287 kdc_options = '0'
3288 kdc_options = str(krb5_asn1.KDCOptions(kdc_options))
3290 if sname is None:
3291 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
3292 names=[service, target_name])
3294 srealm = target_creds.get_realm()
3296 authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
3298 decryption_key = self.TicketDecryptionKey_from_creds(target_creds)
3300 kdc_exchange_dict = self.tgs_exchange_dict(
3301 expected_crealm=tgt.crealm,
3302 expected_cname=tgt.cname,
3303 expected_srealm=srealm,
3304 expected_sname=sname,
3305 expected_supported_etypes=target_creds.tgs_supported_enctypes,
3306 expected_flags=expected_flags,
3307 unexpected_flags=unexpected_flags,
3308 expected_groups=expected_groups,
3309 unexpected_groups=unexpected_groups,
3310 expect_client_claims=expect_client_claims,
3311 expect_device_claims=expect_device_claims,
3312 expected_client_claims=expected_client_claims,
3313 unexpected_client_claims=unexpected_client_claims,
3314 expected_device_claims=expected_device_claims,
3315 unexpected_device_claims=unexpected_device_claims,
3316 ticket_decryption_key=decryption_key,
3317 expect_ticket_kvno=(not expect_krbtgt_referral),
3318 check_rep_fn=self.generic_check_kdc_rep,
3319 check_kdc_private_fn=self.generic_check_kdc_private,
3320 tgt=tgt,
3321 authenticator_subkey=authenticator_subkey,
3322 kdc_options=kdc_options,
3323 pac_request=pac_request,
3324 expect_pac=expect_pac,
3325 expect_requester_sid=expect_requester_sid,
3326 expect_pac_attrs=expect_pac_attrs,
3327 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
3328 rc4_support=rc4_support,
3329 to_rodc=to_rodc)
3331 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3332 cname=None,
3333 realm=srealm,
3334 sname=sname,
3335 till_time=till,
3336 etypes=etype)
3337 self.check_tgs_reply(rep)
3339 service_ticket_creds = kdc_exchange_dict['rep_ticket_creds']
3341 if to_rodc:
3342 krbtgt_creds = self.get_rodc_krbtgt_creds()
3343 else:
3344 krbtgt_creds = self.get_krbtgt_creds()
3345 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
3347 is_tgs_princ = self.is_tgs_principal(sname)
3348 expect_ticket_checksum = (self.tkt_sig_support
3349 and not is_tgs_princ)
3350 expect_full_checksum = (self.full_sig_support
3351 and not is_tgs_princ)
3352 self.verify_ticket(service_ticket_creds, krbtgt_key,
3353 service_ticket=True, expect_pac=expect_pac,
3354 expect_ticket_checksum=expect_ticket_checksum,
3355 expect_full_checksum=expect_full_checksum)
3357 self.tkt_cache[cache_key] = service_ticket_creds
3359 return service_ticket_creds
3361 def get_tgt(self, creds, to_rodc=False, kdc_options=None,
3362 client_account=None, client_name_type=NT_PRINCIPAL,
3363 target_creds=None, ticket_etype=None,
3364 expected_flags=None, unexpected_flags=None,
3365 expected_account_name=None, expected_upn_name=None,
3366 expected_cname=None,
3367 expected_sid=None,
3368 sname=None, realm=None,
3369 expected_groups=None,
3370 unexpected_groups=None,
3371 pac_request=True, expect_pac=True,
3372 expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
3373 pac_options=None,
3374 expect_requester_sid=None,
3375 rc4_support=True,
3376 expect_edata=None,
3377 expect_client_claims=None, expect_device_claims=None,
3378 expected_client_claims=None, unexpected_client_claims=None,
3379 expected_device_claims=None, unexpected_device_claims=None,
3380 fresh=False):
3381 if client_account is not None:
3382 user_name = client_account
3383 else:
3384 user_name = creds.get_username()
3386 cache_key = (user_name, to_rodc, kdc_options, pac_request, pac_options,
3387 client_name_type,
3388 ticket_etype,
3389 str(expected_flags), str(unexpected_flags),
3390 expected_account_name, expected_upn_name, expected_sid,
3391 str(sname), str(realm),
3392 str(expected_groups),
3393 str(unexpected_groups),
3394 str(expected_cname),
3395 rc4_support,
3396 expect_edata,
3397 expect_pac, expect_pac_attrs,
3398 expect_pac_attrs_pac_request, expect_requester_sid,
3399 expect_client_claims, expect_device_claims,
3400 str(expected_client_claims),
3401 str(unexpected_client_claims),
3402 str(expected_device_claims),
3403 str(unexpected_device_claims))
3405 if not fresh:
3406 tgt = self.tkt_cache.get(cache_key)
3408 if tgt is not None:
3409 return tgt
3411 if realm is None:
3412 realm = creds.get_realm()
3414 salt = creds.get_salt()
3416 etype = self.get_default_enctypes(creds)
3417 cname = self.PrincipalName_create(name_type=client_name_type,
3418 names=user_name.split('/'))
3419 if sname is None:
3420 sname = self.PrincipalName_create(name_type=NT_SRV_INST,
3421 names=['krbtgt', realm])
3422 expected_sname = self.PrincipalName_create(
3423 name_type=NT_SRV_INST, names=['krbtgt', realm.upper()])
3424 else:
3425 expected_sname = sname
3427 if expected_cname is None:
3428 expected_cname = cname
3430 till = self.get_KerberosTime(offset=36000)
3432 if target_creds is not None:
3433 krbtgt_creds = target_creds
3434 elif to_rodc:
3435 krbtgt_creds = self.get_rodc_krbtgt_creds()
3436 else:
3437 krbtgt_creds = self.get_krbtgt_creds()
3438 ticket_decryption_key = (
3439 self.TicketDecryptionKey_from_creds(krbtgt_creds,
3440 etype=ticket_etype))
3442 expected_etypes = krbtgt_creds.tgs_supported_enctypes
3444 if kdc_options is None:
3445 kdc_options = ('forwardable,'
3446 'renewable,'
3447 'canonicalize,'
3448 'renewable-ok')
3449 kdc_options = krb5_asn1.KDCOptions(kdc_options)
3451 if pac_options is None:
3452 pac_options = '1' # supports claims
3454 rep, kdc_exchange_dict = self._test_as_exchange(
3455 creds=creds,
3456 cname=cname,
3457 realm=realm,
3458 sname=sname,
3459 till=till,
3460 expected_error_mode=KDC_ERR_PREAUTH_REQUIRED,
3461 expected_crealm=realm,
3462 expected_cname=expected_cname,
3463 expected_srealm=realm,
3464 expected_sname=sname,
3465 expected_account_name=expected_account_name,
3466 expected_upn_name=expected_upn_name,
3467 expected_sid=expected_sid,
3468 expected_groups=expected_groups,
3469 unexpected_groups=unexpected_groups,
3470 expected_salt=salt,
3471 expected_flags=expected_flags,
3472 unexpected_flags=unexpected_flags,
3473 expected_supported_etypes=expected_etypes,
3474 etypes=etype,
3475 padata=None,
3476 kdc_options=kdc_options,
3477 preauth_key=None,
3478 ticket_decryption_key=ticket_decryption_key,
3479 pac_request=pac_request,
3480 pac_options=pac_options,
3481 expect_pac=expect_pac,
3482 expect_pac_attrs=expect_pac_attrs,
3483 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
3484 expect_requester_sid=expect_requester_sid,
3485 rc4_support=rc4_support,
3486 expect_edata=expect_edata,
3487 expect_client_claims=expect_client_claims,
3488 expect_device_claims=expect_device_claims,
3489 expected_client_claims=expected_client_claims,
3490 unexpected_client_claims=unexpected_client_claims,
3491 expected_device_claims=expected_device_claims,
3492 unexpected_device_claims=unexpected_device_claims,
3493 to_rodc=to_rodc)
3494 self.check_pre_authentication(rep)
3496 etype_info2 = kdc_exchange_dict['preauth_etype_info2']
3498 preauth_key = self.PasswordKey_from_etype_info2(creds,
3499 etype_info2[0],
3500 creds.get_kvno())
3502 ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)
3504 padata = [ts_enc_padata]
3506 expected_realm = realm.upper()
3508 rep, kdc_exchange_dict = self._test_as_exchange(
3509 creds=creds,
3510 cname=cname,
3511 realm=realm,
3512 sname=sname,
3513 till=till,
3514 expected_error_mode=0,
3515 expected_crealm=expected_realm,
3516 expected_cname=expected_cname,
3517 expected_srealm=expected_realm,
3518 expected_sname=expected_sname,
3519 expected_account_name=expected_account_name,
3520 expected_upn_name=expected_upn_name,
3521 expected_sid=expected_sid,
3522 expected_groups=expected_groups,
3523 unexpected_groups=unexpected_groups,
3524 expected_salt=salt,
3525 expected_flags=expected_flags,
3526 unexpected_flags=unexpected_flags,
3527 expected_supported_etypes=expected_etypes,
3528 etypes=etype,
3529 padata=padata,
3530 kdc_options=kdc_options,
3531 preauth_key=preauth_key,
3532 ticket_decryption_key=ticket_decryption_key,
3533 pac_request=pac_request,
3534 pac_options=pac_options,
3535 expect_pac=expect_pac,
3536 expect_pac_attrs=expect_pac_attrs,
3537 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
3538 expect_requester_sid=expect_requester_sid,
3539 rc4_support=rc4_support,
3540 expect_edata=expect_edata,
3541 expect_client_claims=expect_client_claims,
3542 expect_device_claims=expect_device_claims,
3543 expected_client_claims=expected_client_claims,
3544 unexpected_client_claims=unexpected_client_claims,
3545 expected_device_claims=expected_device_claims,
3546 unexpected_device_claims=unexpected_device_claims,
3547 to_rodc=to_rodc)
3548 self.check_as_reply(rep)
3550 ticket_creds = kdc_exchange_dict['rep_ticket_creds']
3552 self.tkt_cache[cache_key] = ticket_creds
3554 return ticket_creds
3556 def _make_tgs_request(self, client_creds, service_creds, tgt,
3557 client_account=None,
3558 client_name_type=NT_PRINCIPAL,
3559 kdc_options=None,
3560 pac_request=None, expect_pac=True,
3561 expect_error=False,
3562 expected_cname=None,
3563 expected_account_name=None,
3564 expected_upn_name=None,
3565 expected_sid=None):
3566 if client_account is None:
3567 client_account = client_creds.get_username()
3568 cname = self.PrincipalName_create(name_type=client_name_type,
3569 names=client_account.split('/'))
3571 service_account = service_creds.get_username()
3572 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
3573 names=[service_account])
3575 realm = service_creds.get_realm()
3577 expected_crealm = realm
3578 if expected_cname is None:
3579 expected_cname = cname
3580 expected_srealm = realm
3581 expected_sname = sname
3583 expected_supported_etypes = service_creds.tgs_supported_enctypes
3585 etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
3587 if kdc_options is None:
3588 kdc_options = 'canonicalize'
3589 kdc_options = str(krb5_asn1.KDCOptions(kdc_options))
3591 target_decryption_key = self.TicketDecryptionKey_from_creds(
3592 service_creds)
3594 authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
3596 if expect_error:
3597 expected_error_mode = expect_error
3598 if expected_error_mode is True:
3599 expected_error_mode = KDC_ERR_TGT_REVOKED
3600 check_error_fn = self.generic_check_kdc_error
3601 check_rep_fn = None
3602 else:
3603 expected_error_mode = 0
3604 check_error_fn = None
3605 check_rep_fn = self.generic_check_kdc_rep
3607 kdc_exchange_dict = self.tgs_exchange_dict(
3608 expected_crealm=expected_crealm,
3609 expected_cname=expected_cname,
3610 expected_srealm=expected_srealm,
3611 expected_sname=expected_sname,
3612 expected_account_name=expected_account_name,
3613 expected_upn_name=expected_upn_name,
3614 expected_sid=expected_sid,
3615 expected_supported_etypes=expected_supported_etypes,
3616 ticket_decryption_key=target_decryption_key,
3617 check_error_fn=check_error_fn,
3618 check_rep_fn=check_rep_fn,
3619 check_kdc_private_fn=self.generic_check_kdc_private,
3620 expected_error_mode=expected_error_mode,
3621 tgt=tgt,
3622 authenticator_subkey=authenticator_subkey,
3623 kdc_options=kdc_options,
3624 pac_request=pac_request,
3625 expect_pac=expect_pac,
3626 expect_edata=False)
3628 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3629 cname=cname,
3630 realm=realm,
3631 sname=sname,
3632 etypes=etypes)
3633 if expect_error:
3634 self.check_error_rep(rep, expected_error_mode)
3636 return None
3637 else:
3638 self.check_reply(rep, KRB_TGS_REP)
3640 return kdc_exchange_dict['rep_ticket_creds']
3642 # Named tuple to contain values of interest when the PAC is decoded.
3643 PacData = namedtuple(
3644 "PacData",
3645 "account_name account_sid logon_name upn domain_name")
3647 def get_pac_data(self, authorization_data):
3648 """Decode the PAC element contained in the authorization-data element
3650 account_name = None
3651 user_sid = None
3652 logon_name = None
3653 upn = None
3654 domain_name = None
3656 # The PAC data will be wrapped in an AD_IF_RELEVANT element
3657 ad_if_relevant_elements = (
3658 x for x in authorization_data if x['ad-type'] == AD_IF_RELEVANT)
3659 for dt in ad_if_relevant_elements:
3660 buf = self.der_decode(
3661 dt['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3662 # The PAC data is further wrapped in a AD_WIN2K_PAC element
3663 for ad in (x for x in buf if x['ad-type'] == AD_WIN2K_PAC):
3664 pb = ndr_unpack(krb5pac.PAC_DATA, ad['ad-data'])
3665 for pac in pb.buffers:
3666 if pac.type == krb5pac.PAC_TYPE_LOGON_INFO:
3667 account_name = (
3668 pac.info.info.info3.base.account_name)
3669 user_sid = (
3670 str(pac.info.info.info3.base.domain_sid)
3671 + "-" + str(pac.info.info.info3.base.rid))
3672 elif pac.type == krb5pac.PAC_TYPE_LOGON_NAME:
3673 logon_name = pac.info.account_name
3674 elif pac.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
3675 upn = pac.info.upn_name
3676 domain_name = pac.info.dns_domain_name
3678 return self.PacData(
3679 account_name,
3680 user_sid,
3681 logon_name,
3682 upn,
3683 domain_name)
3685 def decode_service_ticket(self, creds, ticket):
3686 """Decrypt and decode a service ticket
3689 enc_part = ticket['enc-part']
3691 key = self.TicketDecryptionKey_from_creds(creds,
3692 enc_part['etype'])
3694 if key.kvno is not None:
3695 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3697 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3698 enc_ticket_part = self.der_decode(
3699 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3700 return enc_ticket_part
3702 def modify_ticket_flag(self, enc_part, flag, value):
3703 self.assertIsInstance(value, bool)
3705 flag = krb5_asn1.TicketFlags(flag)
3706 pos = len(tuple(flag)) - 1
3708 flags = enc_part['flags']
3709 self.assertLessEqual(pos, len(flags))
3711 new_flags = flags[:pos] + str(int(value)) + flags[pos + 1:]
3712 enc_part['flags'] = new_flags
3714 return enc_part
3716 def get_objectSid(self, samdb, dn):
3717 """ Get the objectSID for a DN
3718 Note: performs an Ldb query.
3720 res = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
3721 self.assertTrue(len(res) == 1, "did not get objectSid for %s" % dn)
3722 sid = samdb.schema_format_value("objectSID", res[0]["objectSID"][0])
3723 return sid.decode('utf8')
3725 def add_attribute(self, samdb, dn_str, name, value):
3726 if isinstance(value, list):
3727 values = value
3728 else:
3729 values = [value]
3730 flag = ldb.FLAG_MOD_ADD
3732 dn = ldb.Dn(samdb, dn_str)
3733 msg = ldb.Message(dn)
3734 msg[name] = ldb.MessageElement(values, flag, name)
3735 samdb.modify(msg)
3737 def modify_attribute(self, samdb, dn_str, name, value):
3738 if isinstance(value, list):
3739 values = value
3740 else:
3741 values = [value]
3742 flag = ldb.FLAG_MOD_REPLACE
3744 dn = ldb.Dn(samdb, dn_str)
3745 msg = ldb.Message(dn)
3746 msg[name] = ldb.MessageElement(values, flag, name)
3747 samdb.modify(msg)
3749 def remove_attribute(self, samdb, dn_str, name):
3750 flag = ldb.FLAG_MOD_DELETE
3752 dn = ldb.Dn(samdb, dn_str)
3753 msg = ldb.Message(dn)
3754 msg[name] = ldb.MessageElement([], flag, name)
3755 samdb.modify(msg)
3757 def create_ccache(self, cname, ticket, enc_part):
3758 """ Lay out a version 4 on-disk credentials cache, to be read using the
3759 FILE: protocol.
3762 field = krb5ccache.DELTATIME_TAG()
3763 field.kdc_sec_offset = 0
3764 field.kdc_usec_offset = 0
3766 v4tag = krb5ccache.V4TAG()
3767 v4tag.tag = 1
3768 v4tag.field = field
3770 v4tags = krb5ccache.V4TAGS()
3771 v4tags.tag = v4tag
3772 v4tags.further_tags = b''
3774 optional_header = krb5ccache.V4HEADER()
3775 optional_header.v4tags = v4tags
3777 cname_string = cname['name-string']
3779 cprincipal = krb5ccache.PRINCIPAL()
3780 cprincipal.name_type = cname['name-type']
3781 cprincipal.component_count = len(cname_string)
3782 cprincipal.realm = ticket['realm']
3783 cprincipal.components = cname_string
3785 sname = ticket['sname']
3786 sname_string = sname['name-string']
3788 sprincipal = krb5ccache.PRINCIPAL()
3789 sprincipal.name_type = sname['name-type']
3790 sprincipal.component_count = len(sname_string)
3791 sprincipal.realm = ticket['realm']
3792 sprincipal.components = sname_string
3794 key = self.EncryptionKey_import(enc_part['key'])
3796 key_data = key.export_obj()
3797 keyblock = krb5ccache.KEYBLOCK()
3798 keyblock.enctype = key_data['keytype']
3799 keyblock.data = key_data['keyvalue']
3801 addresses = krb5ccache.ADDRESSES()
3802 addresses.count = 0
3803 addresses.data = []
3805 authdata = krb5ccache.AUTHDATA()
3806 authdata.count = 0
3807 authdata.data = []
3809 # Re-encode the ticket, since it was decoded by another layer.
3810 ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
3812 authtime = enc_part['authtime']
3813 starttime = enc_part.get('starttime', authtime)
3814 endtime = enc_part['endtime']
3816 cred = krb5ccache.CREDENTIAL()
3817 cred.client = cprincipal
3818 cred.server = sprincipal
3819 cred.keyblock = keyblock
3820 cred.authtime = self.get_EpochFromKerberosTime(authtime)
3821 cred.starttime = self.get_EpochFromKerberosTime(starttime)
3822 cred.endtime = self.get_EpochFromKerberosTime(endtime)
3824 # Account for clock skew of up to five minutes.
3825 self.assertLess(cred.authtime - 5 * 60,
3826 datetime.now(timezone.utc).timestamp(),
3827 "Ticket not yet valid - clocks may be out of sync.")
3828 self.assertLess(cred.starttime - 5 * 60,
3829 datetime.now(timezone.utc).timestamp(),
3830 "Ticket not yet valid - clocks may be out of sync.")
3831 self.assertGreater(cred.endtime - 60 * 60,
3832 datetime.now(timezone.utc).timestamp(),
3833 "Ticket already expired/about to expire - "
3834 "clocks may be out of sync.")
3836 cred.renew_till = cred.endtime
3837 cred.is_skey = 0
3838 cred.ticket_flags = int(enc_part['flags'], 2)
3839 cred.addresses = addresses
3840 cred.authdata = authdata
3841 cred.ticket = ticket_data
3842 cred.second_ticket = b''
3844 ccache = krb5ccache.CCACHE()
3845 ccache.pvno = 5
3846 ccache.version = 4
3847 ccache.optional_header = optional_header
3848 ccache.principal = cprincipal
3849 ccache.cred = cred
3851 # Serialise the credentials cache structure.
3852 result = ndr_pack(ccache)
3854 # Create a temporary file and write the credentials.
3855 cachefile = tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False)
3856 cachefile.write(result)
3857 cachefile.close()
3859 return cachefile
3861 def create_ccache_with_ticket(self, user_credentials, ticket, pac=True):
3862 # Place the ticket into a newly created credentials cache file.
3864 user_name = user_credentials.get_username()
3865 realm = user_credentials.get_realm()
3867 cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
3868 names=[user_name])
3870 if not pac:
3871 ticket = self.modified_ticket(ticket, exclude_pac=True)
3873 # Write the ticket into a credentials cache file that can be ingested
3874 # by the main credentials code.
3875 cachefile = self.create_ccache(cname, ticket.ticket,
3876 ticket.encpart_private)
3878 # Create a credentials object to reference the credentials cache.
3879 creds = Credentials()
3880 creds.set_kerberos_state(MUST_USE_KERBEROS)
3881 creds.set_username(user_name, SPECIFIED)
3882 creds.set_realm(realm)
3883 creds.set_named_ccache(cachefile.name, SPECIFIED, self.get_lp())
3885 # Return the credentials along with the cache file.
3886 return (creds, cachefile)
3888 def create_ccache_with_user(self, user_credentials, mach_credentials,
3889 service="host", target_name=None, pac=True):
3890 # Obtain a service ticket authorising the user and place it into a
3891 # newly created credentials cache file.
3893 tgt = self.get_tgt(user_credentials)
3895 ticket = self.get_service_ticket(tgt, mach_credentials,
3896 service=service,
3897 target_name=target_name)
3899 return self.create_ccache_with_ticket(user_credentials, ticket,
3900 pac=pac)
3902 # Test credentials by connecting to the DC through LDAP.
3903 def _connect(self, creds, simple_bind, expect_error=None):
3904 samdb = self.get_samdb()
3905 dn = creds.get_dn()
3907 if simple_bind:
3908 url = f'ldaps://{samdb.host_dns_name()}'
3909 creds.set_bind_dn(str(dn))
3910 else:
3911 url = f'ldap://{samdb.host_dns_name()}'
3912 creds.set_bind_dn(None)
3913 try:
3914 ldap = SamDB(url=url,
3915 credentials=creds,
3916 lp=self.get_lp())
3917 except ldb.LdbError as err:
3918 self.assertIsNotNone(expect_error, 'got unexpected error')
3919 num, estr = err.args
3920 if num != ldb.ERR_INVALID_CREDENTIALS:
3921 raise
3923 self.assertIn(expect_error, estr)
3925 return
3926 else:
3927 self.assertIsNone(expect_error, 'expected to get an error')
3929 res = ldap.search('',
3930 scope=ldb.SCOPE_BASE,
3931 attrs=['tokenGroups'])
3932 self.assertEqual(1, len(res))
3934 sid = creds.get_sid()
3936 token_groups = res[0].get('tokenGroups', idx=0)
3937 token_sid = ndr_unpack(security.dom_sid, token_groups)
3939 self.assertEqual(sid, str(token_sid))
3941 # Test the two SAMR password change methods implemented in Samba. If the
3942 # user is protected, we should get an ACCOUNT_RESTRICTION error indicating
3943 # that the password change is not allowed.
3944 def _test_samr_change_password(self, creds, expect_error,
3945 connect_error=None):
3946 samdb = self.get_samdb()
3947 server_name = samdb.host_dns_name()
3948 try:
3949 conn = samr.samr(f'ncacn_np:{server_name}[seal,smb2]',
3950 self.get_lp(),
3951 creds)
3952 except NTSTATUSError as err:
3953 self.assertIsNotNone(connect_error,
3954 'connection unexpectedly failed')
3955 self.assertIsNone(expect_error, 'don’t specify both errors')
3957 num, _ = err.args
3958 self.assertEqual(num, connect_error)
3960 return
3961 else:
3962 self.assertIsNone(connect_error, 'expected connection to fail')
3964 # Get the NT hash.
3965 nt_hash = creds.get_nt_hash()
3967 # Generate a new UTF-16 password.
3968 new_password_str = generate_random_password(32, 32)
3969 new_password = new_password_str.encode('utf-16le')
3971 # Generate the MD4 hash of the password.
3972 new_password_md4 = md4_hash_blob(new_password)
3974 # Prefix the password with padding so it is 512 bytes long.
3975 new_password_len = len(new_password)
3976 remaining_len = 512 - new_password_len
3977 new_password = bytes(remaining_len) + new_password
3979 # Append the 32-bit length of the password.
3980 new_password += int.to_bytes(new_password_len,
3981 length=4,
3982 byteorder='little')
3984 # Create a key from the MD4 hash of the new password.
3985 key = new_password_md4[:14]
3987 # Encrypt the old NT hash with DES to obtain the verifier.
3988 verifier = des_crypt_blob_16(nt_hash, key)
3990 server = lsa.String()
3991 server.string = server_name
3993 account = lsa.String()
3994 account.string = creds.get_username()
3996 nt_verifier = samr.Password()
3997 nt_verifier.hash = list(verifier)
3999 nt_password = samr.CryptPassword()
4000 nt_password.data = list(arcfour_encrypt(nt_hash, new_password))
4002 if not self.expect_nt_hash:
4003 expect_error = ntstatus.NT_STATUS_NTLM_BLOCKED
4005 try:
4006 conn.ChangePasswordUser2(server=server,
4007 account=account,
4008 nt_password=nt_password,
4009 nt_verifier=nt_verifier,
4010 lm_change=False,
4011 lm_password=None,
4012 lm_verifier=None)
4013 except NTSTATUSError as err:
4014 num, _ = err.args
4015 self.assertIsNotNone(expect_error,
4016 f'unexpectedly failed with {num:08X}')
4017 self.assertEqual(num, expect_error)
4018 else:
4019 self.assertIsNone(expect_error, 'expected to fail')
4021 creds.set_password(new_password_str)
4023 # Get the NT hash.
4024 nt_hash = creds.get_nt_hash()
4026 # Generate a new UTF-16 password.
4027 new_password = generate_random_password(32, 32)
4028 new_password = new_password.encode('utf-16le')
4030 # Generate the MD4 hash of the password.
4031 new_password_md4 = md4_hash_blob(new_password)
4033 # Prefix the password with padding so it is 512 bytes long.
4034 new_password_len = len(new_password)
4035 remaining_len = 512 - new_password_len
4036 new_password = bytes(remaining_len) + new_password
4038 # Append the 32-bit length of the password.
4039 new_password += int.to_bytes(new_password_len,
4040 length=4,
4041 byteorder='little')
4043 # Create a key from the MD4 hash of the new password.
4044 key = new_password_md4[:14]
4046 # Encrypt the old NT hash with DES to obtain the verifier.
4047 verifier = des_crypt_blob_16(nt_hash, key)
4049 nt_verifier.hash = list(verifier)
4051 nt_password.data = list(arcfour_encrypt(nt_hash, new_password))
4053 try:
4054 conn.ChangePasswordUser3(server=server,
4055 account=account,
4056 nt_password=nt_password,
4057 nt_verifier=nt_verifier,
4058 lm_change=False,
4059 lm_password=None,
4060 lm_verifier=None,
4061 password3=None)
4062 except NTSTATUSError as err:
4063 self.assertIsNotNone(expect_error, 'unexpectedly failed')
4065 num, _ = err.args
4066 self.assertEqual(num, expect_error)
4067 else:
4068 self.assertIsNone(expect_error, 'expected to fail')
4070 # Test SamLogon. Authentication should succeed for non-protected accounts,
4071 # and fail for protected accounts.
4072 def _test_samlogon(self, creds, logon_type, expect_error=None,
4073 validation_level=netlogon.NetlogonValidationSamInfo2,
4074 domain_joined_mach_creds=None):
4075 samdb = self.get_samdb()
4077 if domain_joined_mach_creds is None:
4078 domain_joined_mach_creds = self.get_cached_creds(
4079 account_type=self.AccountType.COMPUTER,
4080 opts={'secure_channel_type': misc.SEC_CHAN_WKSTA})
4082 dc_server = samdb.host_dns_name()
4083 username, domain = creds.get_ntlm_username_domain()
4084 workstation = domain_joined_mach_creds.get_username()
4086 # Calling this initializes netlogon_creds on mach_creds, as is required
4087 # before calling mach_creds.encrypt_netr_PasswordInfo().
4088 conn = netlogon.netlogon(f'ncacn_ip_tcp:{dc_server}[schannel,seal]',
4089 self.get_lp(),
4090 domain_joined_mach_creds)
4091 (auth_type, auth_level) = conn.auth_info()
4093 if logon_type == netlogon.NetlogonInteractiveInformation:
4094 logon = netlogon.netr_PasswordInfo()
4096 lm_pass = samr.Password()
4097 lm_pass.hash = [0] * 16
4099 nt_pass = samr.Password()
4100 nt_pass.hash = list(creds.get_nt_hash())
4102 logon.lmpassword = lm_pass
4103 logon.ntpassword = nt_pass
4105 domain_joined_mach_creds.encrypt_netr_PasswordInfo(info=logon,
4106 auth_type=auth_type,
4107 auth_level=auth_level)
4109 elif logon_type == netlogon.NetlogonNetworkInformation:
4110 computername = ntlmssp.AV_PAIR()
4111 computername.AvId = ntlmssp.MsvAvNbComputerName
4112 computername.Value = workstation
4114 domainname = ntlmssp.AV_PAIR()
4115 domainname.AvId = ntlmssp.MsvAvNbDomainName
4116 domainname.Value = domain
4118 eol = ntlmssp.AV_PAIR()
4119 eol.AvId = ntlmssp.MsvAvEOL
4121 target_info = ntlmssp.AV_PAIR_LIST()
4122 target_info.count = 3
4123 target_info.pair = [domainname, computername, eol]
4125 target_info_blob = ndr_pack(target_info)
4127 challenge = b'abcdefgh'
4128 response = creds.get_ntlm_response(flags=0,
4129 challenge=challenge,
4130 target_info=target_info_blob)
4132 logon = netlogon.netr_NetworkInfo()
4134 logon.challenge = list(challenge)
4135 logon.nt = netlogon.netr_ChallengeResponse()
4136 logon.nt.length = len(response['nt_response'])
4137 logon.nt.data = list(response['nt_response'])
4139 else:
4140 self.fail(f'unknown logon type {logon_type}')
4142 identity_info = netlogon.netr_IdentityInfo()
4143 identity_info.domain_name.string = domain
4144 identity_info.account_name.string = username
4145 identity_info.parameter_control = (
4146 netlogon.MSV1_0_ALLOW_SERVER_TRUST_ACCOUNT) | (
4147 netlogon.MSV1_0_ALLOW_WORKSTATION_TRUST_ACCOUNT)
4148 identity_info.workstation.string = workstation
4150 logon.identity_info = identity_info
4152 netr_flags = 0
4154 validation = None
4156 if not expect_error and not self.expect_nt_hash:
4157 expect_error = ntstatus.NT_STATUS_NTLM_BLOCKED
4159 try:
4160 (validation, authoritative, flags) = (
4161 conn.netr_LogonSamLogonEx(dc_server,
4162 domain_joined_mach_creds.get_workstation(),
4163 logon_type,
4164 logon,
4165 validation_level,
4166 netr_flags))
4167 except NTSTATUSError as err:
4168 status, _ = err.args
4169 self.assertIsNotNone(expect_error,
4170 f'unexpectedly failed with {status:08X}')
4171 self.assertEqual(expect_error, status, 'got wrong status code')
4172 else:
4173 self.assertIsNone(expect_error, 'expected error')
4175 self.assertEqual(1, authoritative)
4176 self.assertEqual(0, flags)
4178 return validation
4180 def check_ticket_times(self,
4181 ticket_creds,
4182 expected_life=None,
4183 expected_renew_life=None,
4184 delta=0):
4185 ticket = ticket_creds.ticket_private
4187 authtime = ticket['authtime']
4188 starttime = ticket.get('starttime', authtime)
4189 endtime = ticket['endtime']
4190 renew_till = ticket.get('renew-till', None)
4192 starttime = self.get_EpochFromKerberosTime(starttime)
4194 if expected_life is not None:
4195 actual_end = self.get_EpochFromKerberosTime(
4196 endtime.decode('ascii'))
4197 actual_lifetime = actual_end - starttime
4199 self.assertAlmostEqual(expected_life, actual_lifetime, delta=delta)
4201 if renew_till is None:
4202 self.assertIsNone(expected_renew_life)
4203 else:
4204 if expected_renew_life is not None:
4205 actual_renew_till = self.get_EpochFromKerberosTime(
4206 renew_till.decode('ascii'))
4207 actual_renew_life = actual_renew_till - starttime
4209 self.assertAlmostEqual(expected_renew_life, actual_renew_life, delta=delta)