ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / krb5 / gmsa_tests.py
blob4189f05d22d360311e03facec83b8a3b95e6b09a
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2024
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <https://www.gnu.org/licenses/>.
20 import sys
21 import os
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
26 from typing import Callable, Iterable, NewType, Optional, Set, Tuple, TypeVar
28 import datetime
29 from itertools import chain
31 import ldb
33 from samba import (
34 auth,
35 dsdb,
36 generate_random_password,
37 gensec,
38 ntstatus,
39 NTSTATUSError,
40 werror,
42 from samba.dcerpc import gkdi, gmsa, misc, netlogon, security, srvsvc
43 from samba.ndr import ndr_pack, ndr_unpack
44 from samba.net import Net
45 from samba.nt_time import (
46 nt_time_delta_from_timedelta,
47 nt_time_from_datetime,
48 NtTime,
49 NtTimeDelta,
50 timedelta_from_nt_time_delta,
52 from samba.samdb import SamDB
53 from samba.credentials import Credentials, DONT_USE_KERBEROS
54 from samba.gkdi import (
55 Gkid,
56 GroupKey,
57 KEY_CYCLE_DURATION,
58 MAX_CLOCK_SKEW,
61 from samba.tests import connect_samdb
62 from samba.tests.dckeytab import keytab_as_set
63 from samba.tests.krb5 import kcrypto
64 from samba.tests.gkdi import GkdiBaseTest, ROOT_KEY_START_TIME
65 from samba.tests.krb5.kdc_base_test import KDCBaseTest
66 from samba.tests.krb5.raw_testcase import KerberosCredentials
67 from samba.tests.krb5.rfc4120_constants import (
68 KU_PA_ENC_TIMESTAMP,
69 NT_PRINCIPAL,
70 PADATA_ENC_TIMESTAMP,
72 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
74 GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL = 30
76 Gmsa = NewType("Gmsa", ldb.Message)
79 def gkdi_rollover_interval(managed_password_interval: int) -> NtTimeDelta:
80 rollover_interval = NtTimeDelta(
81 managed_password_interval * 24 // 10 * KEY_CYCLE_DURATION
83 if rollover_interval == 0:
84 raise ValueError("rollover interval must not be zero")
85 return rollover_interval
88 class GmsaSeries:
89 start_time: NtTime
90 rollover_interval: NtTimeDelta
92 def __init__(self, start_gkid: Gkid, rollover_interval: NtTimeDelta) -> None:
93 self.start_time = start_gkid.start_nt_time()
94 self.rollover_interval = rollover_interval
96 def interval_gkid(self, n: int) -> Gkid:
97 return Gkid.from_nt_time(self.start_of_interval(n))
99 def start_of_interval(self, n: int) -> NtTime:
100 if not isinstance(n, int):
101 raise ValueError(f"{n} must be an integer")
102 return NtTime(int(self.start_time + n * self.rollover_interval))
104 def during_interval(self, n: int) -> NtTime:
105 return NtTime(int(self.start_of_interval(n) + self.rollover_interval // 2))
107 def during_skew_window(self, n: int) -> NtTime:
108 two_minutes = nt_time_delta_from_timedelta(datetime.timedelta(minutes=2))
109 return NtTime(
110 int(self.start_of_interval(n) + self.rollover_interval - two_minutes)
113 def outside_previous_password_valid_window(self, n: int) -> NtTime:
114 return NtTime(self.start_of_interval(n) + MAX_CLOCK_SKEW)
116 def within_previous_password_valid_window(self, n: int) -> NtTime:
117 return NtTime(self.outside_previous_password_valid_window(n) - 1)
120 class GmsaTests(GkdiBaseTest, KDCBaseTest):
121 def _as_req(
122 self,
123 creds: KerberosCredentials,
124 target_creds: KerberosCredentials,
125 enctype: kcrypto.Enctype,
126 ) -> dict:
127 preauth_key = self.PasswordKey_from_creds(creds, enctype)
129 def generate_padata_fn(
130 _kdc_exchange_dict: dict, _callback_dict: Optional[dict], req_body: dict
131 ) -> Tuple[list, dict]:
132 padata = []
134 patime, pausec = self.get_KerberosTimeWithUsec()
135 enc_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
136 enc_ts = self.der_encode(enc_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
138 enc_ts = self.EncryptedData_create(preauth_key, KU_PA_ENC_TIMESTAMP, enc_ts)
139 enc_ts = self.der_encode(enc_ts, asn1Spec=krb5_asn1.EncryptedData())
141 enc_ts = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, enc_ts)
143 padata.append(enc_ts)
145 return padata, req_body
147 user_name = creds.get_username()
148 cname = self.PrincipalName_create(
149 name_type=NT_PRINCIPAL, names=user_name.split("/")
152 target_name = target_creds.get_username()
153 target_realm = target_creds.get_realm()
155 sname = self.PrincipalName_create(
156 name_type=NT_PRINCIPAL, names=["host", target_name[:-1]]
159 check_error_fn = None
160 check_rep_fn = self.generic_check_kdc_rep
162 expected_sname = self.PrincipalName_create(
163 name_type=NT_PRINCIPAL, names=[target_name]
166 kdc_options = "forwardable,renewable,canonicalize,renewable-ok"
167 kdc_options = krb5_asn1.KDCOptions(kdc_options)
169 ticket_decryption_key = self.TicketDecryptionKey_from_creds(target_creds)
171 kdc_exchange_dict = self.as_exchange_dict(
172 creds=creds,
173 expected_crealm=creds.get_realm(),
174 expected_cname=cname,
175 expected_srealm=target_realm,
176 expected_sname=expected_sname,
177 expected_supported_etypes=target_creds.tgs_supported_enctypes,
178 ticket_decryption_key=ticket_decryption_key,
179 generate_padata_fn=generate_padata_fn,
180 check_error_fn=check_error_fn,
181 check_rep_fn=check_rep_fn,
182 check_kdc_private_fn=self.generic_check_kdc_private,
183 expected_error_mode=0,
184 expected_salt=creds.get_salt(),
185 preauth_key=preauth_key,
186 kdc_options=str(kdc_options),
189 till = self.get_KerberosTime(offset=36000)
191 etypes = kcrypto.Enctype.AES256, kcrypto.Enctype.RC4
193 rep = self._generic_kdc_exchange(
194 kdc_exchange_dict,
195 cname=cname,
196 realm=target_realm,
197 sname=sname,
198 till_time=till,
199 etypes=etypes,
201 self.check_as_reply(rep)
203 return kdc_exchange_dict
205 # Note: unused
206 def gkdi_get_key_start_time(self, key_id: gkdi.KeyEnvelope) -> NtTime:
207 return Gkid.from_key_envelope(key_id).start_nt_time()
209 def get_password(
210 self,
211 samdb: SamDB,
212 target_sd: bytes,
213 root_key_id: Optional[misc.GUID],
214 gkid: Gkid,
215 sid: security.dom_sid,
216 ) -> bytes:
217 group_key = self.get_key_exact(samdb, target_sd, root_key_id, gkid)
219 password = self.generate_gmsa_password(group_key, sid)
220 return self.post_process_password_buffer(password)
222 def get_password_based_on_gkid(
223 self, samdb: SamDB, gkid: Gkid, sid: security.dom_sid
224 ) -> bytes:
225 return self.get_password(samdb, self.gmsa_sd, None, gkid, sid)
227 def get_password_based_on_timestamp(
228 self, samdb: SamDB, timestamp: NtTime, sid: security.dom_sid
229 ) -> bytes:
230 return self.get_password_based_on_gkid(samdb, Gkid.from_nt_time(timestamp), sid)
232 # Note: unused
233 def get_password_based_on_key_id(
234 self, samdb: SamDB, managed_password: gkdi.KeyEnvelope, sid: str
235 ) -> bytes:
236 return self.get_password(
237 samdb,
238 self.gmsa_sd,
239 managed_password.root_key_id,
240 Gkid.from_key_envelope(managed_password),
241 sid,
244 def generate_gmsa_password(self, key: GroupKey, sid: str) -> bytes:
245 context = ndr_pack(security.dom_sid(sid))
246 algorithm = key.hash_algorithm.algorithm()
247 gmsa_password_len = 256
249 return self.kdf(
250 algorithm,
251 key.key,
252 context,
253 label="GMSA PASSWORD",
254 len_in_bytes=gmsa_password_len,
257 def post_process_password_buffer(self, key: bytes) -> bytes:
258 self.assertEqual(0, len(key) & 1, f"length of key ({len(key)}) is not even")
260 def convert_null(t: Tuple[int, int]) -> Tuple[int, int]:
261 if t == (0, 0):
262 return 1, 0
264 return t
266 T = TypeVar("T")
268 def take_pairs(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]:
269 it = iter(iterable)
270 while True:
271 try:
272 first = next(it)
273 except StopIteration:
274 break
276 yield first, next(it)
278 return bytes(chain.from_iterable(map(convert_null, take_pairs(key))))
280 def get_gmsa_object(self, samdb: SamDB, dn: ldb.Dn) -> Gmsa:
281 res = samdb.search(
283 scope=ldb.SCOPE_BASE,
284 attrs=[
285 "msDS-ManagedPasswordInterval",
286 "msDS-ManagedPasswordId",
287 "msDS-ManagedPasswordPreviousId",
288 "whenCreated",
291 return res[0]
293 def gmsa_rollover_interval(self, gmsa_object: Gmsa) -> NtTimeDelta:
294 managed_password_interval = gmsa_object.get(
295 "msDS-ManagedPasswordInterval", idx=0
297 if managed_password_interval is None:
298 managed_password_interval = GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL
299 else:
300 managed_password_interval = int(managed_password_interval)
302 return gkdi_rollover_interval(managed_password_interval)
304 def gmsa_creation_nt_time(self, gmsa_object: Gmsa) -> NtTime:
305 creation_time: Optional[bytes] = gmsa_object.get("whenCreated", idx=0)
306 self.assertIsNotNone(creation_time)
307 assert creation_time is not None # to help the type checker
309 create_time = datetime.datetime.fromtimestamp(
310 ldb.string_to_time(creation_time.decode()), tz=datetime.timezone.utc
312 return nt_time_from_datetime(create_time)
314 def gmsa_series(self, managed_password_interval: int) -> GmsaSeries:
315 return GmsaSeries(
316 self.future_gkid(), gkdi_rollover_interval(managed_password_interval)
319 def gmsa_series_for_account(
320 self, samdb: SamDB, creds: KerberosCredentials, managed_password_interval: int
321 ) -> GmsaSeries:
322 gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())
323 current_nt_time = self.current_nt_time(samdb)
324 gkid = Gkid.from_nt_time(
325 self.account_quantized_time(gmsa_object, current_nt_time)
327 return GmsaSeries(gkid, gkdi_rollover_interval(managed_password_interval))
329 def quantized_time(
330 self, key_start_time: NtTime, time: NtTime, gkdi_rollover_interval: NtTimeDelta
331 ) -> NtTime:
332 self.assertLessEqual(key_start_time, time)
334 time_since_key_start = NtTimeDelta(time - key_start_time)
335 quantized_time_since_key_start = NtTimeDelta(
336 time_since_key_start // gkdi_rollover_interval * gkdi_rollover_interval
338 return NtTime(key_start_time + quantized_time_since_key_start)
340 def account_quantized_time(self, gmsa_object: Gmsa, time: NtTime) -> NtTime:
341 pwd_id_blob = gmsa_object.get("msDS-ManagedPasswordId", idx=0)
342 self.assertIsNotNone(pwd_id_blob, "SAM should have initialized password ID")
344 pwd_id = ndr_unpack(gkdi.KeyEnvelope, pwd_id_blob)
345 key_start_time = Gkid.from_key_envelope(pwd_id).start_nt_time()
347 gkdi_rollover_interval = self.gmsa_rollover_interval(gmsa_object)
348 return self.quantized_time(key_start_time, time, gkdi_rollover_interval)
350 def expected_gmsa_password_blob(
351 self,
352 samdb: SamDB,
353 creds: KerberosCredentials,
354 gkid: Gkid,
356 query_expiration_gkid: Gkid,
357 previous_gkid: Optional[Gkid] = None,
358 return_future_key: bool = False,
359 ) -> gmsa.MANAGEDPASSWORD_BLOB:
360 new_password = self.get_password_based_on_gkid(samdb, gkid, creds.get_sid())
361 old_password = None
362 if previous_gkid is not None:
363 old_password = self.get_password_based_on_gkid(
364 samdb, previous_gkid, creds.get_sid()
367 current_time = self.current_nt_time(samdb)
369 gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())
370 gkdi_rollover_interval = self.gmsa_rollover_interval(gmsa_object)
372 query_expiration_time = query_expiration_gkid.start_nt_time()
373 query_password_interval = NtTimeDelta(query_expiration_time - current_time)
374 unchanged_password_interval = NtTimeDelta(
375 max(
377 query_expiration_time
378 + (gkdi_rollover_interval if return_future_key else 0)
379 - current_time
380 - MAX_CLOCK_SKEW,
384 return self.marshal_password(
385 new_password,
386 old_password,
387 query_password_interval,
388 unchanged_password_interval,
391 def expected_current_gmsa_password_blob(
392 self,
393 samdb: SamDB,
394 creds: KerberosCredentials,
396 future_key_is_acceptable: bool,
397 ) -> gmsa.MANAGEDPASSWORD_BLOB:
398 gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())
400 gkdi_rollover_interval = self.gmsa_rollover_interval(gmsa_object)
402 pwd_id_blob = gmsa_object.get("msDS-ManagedPasswordId", idx=0)
403 self.assertIsNotNone(pwd_id_blob, "SAM should have initialized password ID")
405 pwd_id = ndr_unpack(gkdi.KeyEnvelope, pwd_id_blob)
406 key_start_time = Gkid.from_key_envelope(pwd_id).start_nt_time()
408 current_time = self.current_nt_time(samdb)
410 new_key_start_time = self.quantized_time(
411 key_start_time, current_time, gkdi_rollover_interval
413 new_key_expiration_time = NtTime(new_key_start_time + gkdi_rollover_interval)
415 account_sid = creds.get_sid()
417 within_clock_skew_window = (
418 new_key_expiration_time - current_time <= MAX_CLOCK_SKEW
420 return_future_key = future_key_is_acceptable and within_clock_skew_window
421 if return_future_key:
422 new_password = self.get_password_based_on_timestamp(
423 samdb, new_key_expiration_time, account_sid
425 old_password = self.get_password_based_on_timestamp(
426 samdb, new_key_start_time, account_sid
428 else:
429 new_password = self.get_password_based_on_timestamp(
430 samdb, new_key_start_time, account_sid
433 account_age = NtTimeDelta(
434 current_time - self.gmsa_creation_nt_time(gmsa_object)
436 if account_age >= gkdi_rollover_interval:
437 old_password = self.get_password_based_on_timestamp(
438 samdb,
439 NtTime(new_key_start_time - gkdi_rollover_interval),
440 account_sid,
442 else:
443 # The account is not old enough to have a previous password.
444 old_password = None
446 key_expiration_time = NtTime(key_start_time + gkdi_rollover_interval)
447 key_is_expired = key_expiration_time <= current_time
449 query_expiration_time = NtTime(
450 new_key_expiration_time if key_is_expired else key_expiration_time
452 query_password_interval = NtTimeDelta(query_expiration_time - current_time)
453 unchanged_password_interval = NtTimeDelta(
454 max(
456 query_expiration_time
457 + (gkdi_rollover_interval if return_future_key else 0)
458 - current_time
459 - MAX_CLOCK_SKEW,
463 return self.marshal_password(
464 new_password,
465 old_password,
466 query_password_interval,
467 unchanged_password_interval,
470 def marshal_password(
471 self,
472 current_password: bytes,
473 previous_password: Optional[bytes],
474 query_password_interval: NtTimeDelta,
475 unchanged_password_interval: NtTimeDelta,
476 ) -> gmsa.MANAGEDPASSWORD_BLOB:
477 managed_password = gmsa.MANAGEDPASSWORD_BLOB()
479 managed_password.passwords.current = current_password
480 managed_password.passwords.previous = previous_password
481 managed_password.passwords.query_interval = query_password_interval
482 managed_password.passwords.unchanged_interval = unchanged_password_interval
484 return managed_password
486 def gmsa_account(
487 self,
489 samdb: Optional[SamDB] = None,
490 interval: int = 1,
491 msa_membership: Optional[str] = None,
492 **kwargs,
493 ) -> KerberosCredentials:
494 if msa_membership is None:
495 allow_world_sddl = "O:SYD:(A;;RP;;;WD)"
496 msa_membership = allow_world_sddl
498 msa_membership_sd = ndr_pack(
499 security.descriptor.from_sddl(msa_membership, security.dom_sid())
502 try:
503 creds = self.get_cached_creds(
504 samdb=samdb,
505 account_type=self.AccountType.GROUP_MANAGED_SERVICE,
506 opts={
507 "additional_details": self.freeze(
509 "msDS-GroupMSAMembership": msa_membership_sd,
510 "msDS-ManagedPasswordInterval": str(interval),
513 **kwargs,
515 # Ensure the gMSA is a brand‐new account.
516 use_cache=False,
518 except ldb.LdbError as err:
519 if err.args[0] == ldb.ERR_UNWILLING_TO_PERFORM:
520 self.fail(
521 "If you’re running these tests against Windows, try “warming up”"
522 " the GKDI service by running `samba.tests.krb5.gkdi_tests` first."
525 raise
527 # Derive the account’s current password. The account is too new to have a previous password yet.
528 managed_pwd = self.expected_current_gmsa_password_blob(
529 self.get_samdb() if samdb is None else samdb,
530 creds,
531 future_key_is_acceptable=False,
534 # Set the password.
535 self.assertIsNotNone(
536 managed_pwd.passwords.current, "current password must be present"
538 creds.set_utf16_password(managed_pwd.passwords.current)
540 return creds
542 def get_local_samdb(self) -> SamDB:
543 """Return a connection to the local database."""
545 lp = self.get_lp()
546 samdb = connect_samdb(
547 samdb_url=lp.samdb_url(), lp=lp, credentials=self.get_admin_creds()
549 self.assertLocalSamDB(samdb)
551 return samdb
553 # Perform a gensec logon using NTLMSSP. As samdb is passed in as a
554 # parameter, it can have a time set on it with set_db_time().
555 def gensec_ntlmssp_logon(
556 self, client_creds: Credentials, samdb: SamDB, expect_success: bool = True
557 ) -> "Optional[auth.session_info]":
558 lp = self.get_lp()
559 lp.set("server role", "active directory domain controller")
561 settings = {"lp_ctx": lp, "target_hostname": lp.get("netbios name")}
563 gensec_client = gensec.Security.start_client(settings)
564 # Ensure that we don’t use Kerberos.
565 self.assertEqual(DONT_USE_KERBEROS, client_creds.get_kerberos_state())
566 gensec_client.set_credentials(client_creds)
567 gensec_client.want_feature(gensec.FEATURE_SEAL)
568 gensec_client.start_mech_by_name("ntlmssp")
570 auth_context = auth.AuthContext(lp_ctx=lp, ldb=samdb)
572 gensec_server = gensec.Security.start_server(settings, auth_context)
573 machine_creds = Credentials()
574 machine_creds.guess(lp)
575 machine_creds.set_machine_account(lp)
576 gensec_server.set_credentials(machine_creds)
578 gensec_server.start_mech_by_name("ntlmssp")
580 client_finished = False
581 server_finished = False
582 client_to_server = b""
583 server_to_client = b""
585 # Operate as both the client and the server to verify the user’s credentials.
586 while not client_finished or not server_finished:
587 if not client_finished:
588 client_finished, client_to_server = gensec_client.update(
589 server_to_client
591 if not server_finished:
592 try:
593 server_finished, server_to_client = gensec_server.update(
594 client_to_server
596 except NTSTATUSError as err:
597 self.assertFalse(expect_success, "got an unexpected error")
599 self.assertEqual(ntstatus.NT_STATUS_WRONG_PASSWORD, err.args[0])
600 return None
602 self.assertTrue(expect_success, "expected to get an error")
604 # Retrieve the SIDs from the security token.
605 return gensec_server.session_info()
607 def check_nt_interval(
608 self,
609 expected_nt_interval: NtTimeDelta,
610 nt_interval: NtTimeDelta,
611 interval_name: str,
612 ) -> None:
613 """Check that the intervals match to within thirty seconds or so."""
615 threshold = datetime.timedelta(seconds=30)
617 interval = timedelta_from_nt_time_delta(nt_interval)
618 expected_interval = timedelta_from_nt_time_delta(expected_nt_interval)
619 interval_difference = abs(interval - expected_interval)
620 self.assertLess(
621 interval_difference,
622 threshold,
623 f"{interval_name} ({interval}) is out by {interval_difference} from"
624 f" expected ({expected_interval})",
627 def check_managed_pwd_intervals(
628 self,
629 expected_managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
630 managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
631 ) -> None:
632 expected_passwords = expected_managed_pwd.passwords
633 passwords = managed_pwd.passwords
635 self.check_nt_interval(
636 expected_passwords.query_interval,
637 passwords.query_interval,
638 "query interval",
640 self.check_nt_interval(
641 expected_passwords.unchanged_interval,
642 passwords.unchanged_interval,
643 "unchanged interval",
646 def check_managed_pwd(
647 self,
648 samdb: SamDB,
649 creds: KerberosCredentials,
651 expected_managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
652 ) -> None:
653 res = samdb.search(
654 creds.get_dn(), scope=ldb.SCOPE_BASE, attrs=["msDS-ManagedPassword"]
656 self.assertEqual(1, len(res), "gMSA not found")
657 managed_password = res[0].get("msDS-ManagedPassword", idx=0)
659 self.assertIsNotNone(managed_password)
660 managed_pwd = ndr_unpack(gmsa.MANAGEDPASSWORD_BLOB, managed_password)
662 self.assertEqual(1, managed_pwd.version)
663 self.assertEqual(0, managed_pwd.reserved)
664 self.assertEqual(len(managed_password), managed_pwd.length)
666 self.assertIsNotNone(expected_managed_pwd.passwords.current)
668 self.assertEqual(
669 managed_pwd.passwords.current, expected_managed_pwd.passwords.current
671 self.assertEqual(
672 managed_pwd.passwords.previous, expected_managed_pwd.passwords.previous
675 self.check_managed_pwd_intervals(expected_managed_pwd, managed_pwd)
677 # When creating a gMSA, Windows seems to pick the root key with the
678 # greatest msKds-CreateTime having msKds-UseStartTime ≤ ten hours ago.
679 # Bear in mind that it seems also to cache the key, so it won’t always
680 # use the latest one.
682 def get_managed_service_accounts_dn(self) -> ldb.Dn:
683 samdb = self.get_samdb()
684 return samdb.get_wellknown_dn(
685 samdb.get_default_basedn(), dsdb.DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
688 def check_managed_password_access(
689 self,
690 creds: Credentials,
692 samdb: Optional[SamDB] = None,
693 expect_access: bool = False,
694 expected_werror: int = werror.WERR_SUCCESS,
695 ) -> None:
696 if samdb is None:
697 samdb = self.get_samdb()
698 if expected_werror:
699 self.assertFalse(expect_access)
700 managed_service_accounts_dn = self.get_managed_service_accounts_dn()
701 username = creds.get_username()
703 # Try base, subtree, and one‐level searches.
704 searches = (
705 (creds.get_dn(), ldb.SCOPE_BASE),
706 (managed_service_accounts_dn, ldb.SCOPE_SUBTREE),
707 (managed_service_accounts_dn, ldb.SCOPE_ONELEVEL),
710 for dn, scope in searches:
711 # Perform a search and see whether we’re allowed to view the managed password.
713 try:
714 res = samdb.search(
716 scope=scope,
717 expression=f"sAMAccountName={username}",
718 attrs=["msDS-ManagedPassword"],
720 except ldb.LdbError as err:
721 self.assertTrue(expected_werror, "got an unexpected error")
723 num, estr = err.args
724 if num != ldb.ERR_OPERATIONS_ERROR:
725 raise
727 self.assertIn(f"{expected_werror:08X}", estr)
728 return
730 self.assertFalse(expected_werror, "expected to get an error")
731 self.assertEqual(1, len(res), "should always find the gMSA")
733 managed_password = res[0].get("msDS-ManagedPassword", idx=0)
734 if expect_access:
735 self.assertIsNotNone(
736 managed_password, "should be allowed to view the password"
738 else:
739 self.assertIsNone(
740 managed_password, "should not be allowed to view the password"
743 def test_retrieved_password_allowed(self):
744 """Test being allowed to view the managed password."""
745 self.check_managed_password_access(self.gmsa_account(), expect_access=True)
747 def test_retrieved_password_denied(self):
748 """Test not being allowed to view the managed password."""
749 deny_world_sddl = "O:SYD:(D;;RP;;;WD)"
750 self.check_managed_password_access(
751 self.gmsa_account(msa_membership=deny_world_sddl), expect_access=False
754 def test_retrieving_password_over_sealed_connection(self):
755 lp = self.get_lp()
756 samdb = SamDB(
757 f"ldap://{self.dc_host}",
758 credentials=self.get_admin_creds(),
759 session_info=auth.system_session(lp),
760 lp=lp,
763 self.check_managed_password_access(
764 self.gmsa_account(), samdb=samdb, expect_access=True
767 def test_retrieving_password_over_unsealed_connection(self):
768 # Requires --use-kerberos=required, or it automatically upgrades to an
769 # encrypted connection.
771 # Remove FEATURE_SEAL which gets added by insta_creds.
772 creds = self.insta_creds(template=self.get_admin_creds())
773 creds.set_gensec_features(creds.get_gensec_features() & ~gensec.FEATURE_SEAL)
775 lp = self.get_lp()
777 sasl_wrap = lp.get("client ldap sasl wrapping")
778 self.addCleanup(lp.set, "client ldap sasl wrapping", sasl_wrap)
779 lp.set("client ldap sasl wrapping", "sign")
781 # Create a second ldb connection without seal.
782 samdb = SamDB(
783 f"ldap://{self.dc_host}",
784 credentials=creds,
785 session_info=auth.system_session(lp),
786 lp=lp,
789 self.check_managed_password_access(
790 self.gmsa_account(),
791 samdb=samdb,
792 expected_werror=werror.WERR_DS_CONFIDENTIALITY_REQUIRED,
795 def test_retrieving_denied_password_over_unsealed_connection(self):
796 # Requires --use-kerberos=required, or it automatically upgrades to an
797 # encrypted connection.
799 # Remove FEATURE_SEAL which gets added by insta_creds.
800 creds = self.insta_creds(template=self.get_admin_creds())
801 creds.set_gensec_features(creds.get_gensec_features() & ~gensec.FEATURE_SEAL)
803 lp = self.get_lp()
805 sasl_wrap = lp.get("client ldap sasl wrapping")
806 self.addCleanup(lp.set, "client ldap sasl wrapping", sasl_wrap)
807 lp.set("client ldap sasl wrapping", "sign")
809 # Create a second ldb connection without seal.
810 samdb = SamDB(
811 f"ldap://{self.dc_host}",
812 credentials=creds,
813 session_info=auth.system_session(lp),
814 lp=lp,
817 # Deny anyone from being able to view the password.
818 deny_world_sddl = "O:SYD:(D;;RP;;;WD)"
819 self.check_managed_password_access(
820 self.gmsa_account(msa_membership=deny_world_sddl),
821 samdb=samdb,
822 expected_werror=werror.WERR_DS_CONFIDENTIALITY_REQUIRED,
825 def test_retrieving_password_after_encrypted_simple_bind(self):
826 """Test retrieving the managed password using a simple bind with encryption."""
827 admin_sid = self.get_samdb().get_admin_sid()
829 creds = self.insta_creds(template=self.get_admin_creds())
830 creds.set_bind_dn(admin_sid)
831 samdb = SamDB(
832 url=f"ldaps://{self.dc_host}", credentials=creds, lp=self.get_lp()
835 self.check_managed_password_access(
836 self.gmsa_account(), samdb=samdb, expect_access=True
839 def test_retrieving_password_after_unencrypted_simple_bind(self):
840 """Test retrieving the managed password using a simple bind without encryption."""
841 admin_sid = self.get_samdb().get_admin_sid()
843 creds = self.insta_creds(template=self.get_admin_creds())
844 creds.set_bind_dn(admin_sid)
845 try:
846 samdb = SamDB(
847 url=f"ldap://{self.dc_host}", credentials=creds, lp=self.get_lp()
849 except ldb.LdbError:
850 self.fail("failed to perform simple bind")
852 self.check_managed_password_access(
853 self.gmsa_account(),
854 samdb=samdb,
855 expected_werror=werror.WERR_DS_CONFIDENTIALITY_REQUIRED,
858 def future_gkid(self) -> Gkid:
859 """Return (6333, 26, 5)—an arbitrary GKID far enough in the future that
860 it’s situated beyond any reasonable rollover period. But not so far in
861 the future that Python’s datetime library will throw OverflowErrors."""
862 future_date = datetime.datetime(9000, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
863 return Gkid.from_nt_time(nt_time_from_datetime(future_date))
865 def future_time(self) -> NtTime:
866 """Return an arbitrary time far enough in the future that it’s situated
867 beyond any reasonable rollover period. But not so far in the future that
868 Python’s datetime library will throw OverflowErrors."""
869 return self.future_gkid().start_nt_time()
871 def test_retrieved_password(self):
872 """Test that we can retrieve the correct password for a gMSA."""
874 samdb = self.get_samdb()
875 creds = self.gmsa_account()
877 expected = self.expected_current_gmsa_password_blob(
878 samdb,
879 creds,
880 future_key_is_acceptable=True,
882 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
884 def test_retrieved_password_when_current_key_is_valid(self):
885 """Test that we can retrieve the correct password for a gMSA at a time
886 when we are sure it is valid."""
887 password_interval = 37
889 samdb = self.get_local_samdb()
890 series = self.gmsa_series(password_interval)
891 self.set_db_time(samdb, series.start_of_interval(0))
893 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
895 # Check the managed password of the account the moment it has been
896 # created.
897 expected = self.expected_gmsa_password_blob(
898 samdb,
899 creds,
900 series.interval_gkid(0),
901 previous_gkid=series.interval_gkid(-1),
902 query_expiration_gkid=series.interval_gkid(1),
904 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
906 def test_retrieved_password_when_current_key_is_expired(self):
907 """Test that we can retrieve the correct password for a gMSA when the
908 original password has expired."""
909 password_interval = 14
911 samdb = self.get_local_samdb()
912 series = self.gmsa_series(password_interval)
913 self.set_db_time(samdb, series.start_of_interval(0))
915 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
917 # Set the time to the moment the original password has expired, and
918 # check that the managed password is correct.
919 expired_time = series.start_of_interval(1)
920 self.set_db_time(samdb, expired_time)
921 expected = self.expected_gmsa_password_blob(
922 samdb,
923 creds,
924 series.interval_gkid(1),
925 previous_gkid=series.interval_gkid(0),
926 query_expiration_gkid=series.interval_gkid(2),
928 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
930 def test_retrieved_password_when_next_key_is_expired(self):
931 password_interval = 1
933 samdb = self.get_local_samdb()
934 series = self.gmsa_series(password_interval)
935 self.set_db_time(samdb, series.start_of_interval(0))
937 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
939 expired_time = series.start_of_interval(2)
940 self.set_db_time(samdb, expired_time)
942 expected = self.expected_gmsa_password_blob(
943 samdb,
944 creds,
945 series.interval_gkid(2),
946 previous_gkid=series.interval_gkid(1),
947 query_expiration_gkid=series.interval_gkid(3),
949 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
951 def test_retrieved_password_during_clock_skew_window_when_current_key_is_valid(
952 self,
954 password_interval = 60
956 samdb = self.get_local_samdb()
957 series = self.gmsa_series(password_interval)
958 self.set_db_time(samdb, series.start_of_interval(0))
960 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
962 self.set_db_time(samdb, series.during_skew_window(0))
964 expected = self.expected_gmsa_password_blob(
965 samdb,
966 creds,
967 series.interval_gkid(1),
968 previous_gkid=series.interval_gkid(0),
969 query_expiration_gkid=series.interval_gkid(1),
970 return_future_key=True,
972 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
974 def test_retrieved_password_during_clock_skew_window_when_current_key_is_expired(
975 self,
977 password_interval = 100
979 samdb = self.get_local_samdb()
980 series = self.gmsa_series(password_interval)
981 self.set_db_time(samdb, series.start_of_interval(0))
983 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
985 self.set_db_time(samdb, series.during_skew_window(1))
987 expected = self.expected_gmsa_password_blob(
988 samdb,
989 creds,
990 series.interval_gkid(2),
991 previous_gkid=series.interval_gkid(1),
992 query_expiration_gkid=series.interval_gkid(2),
993 return_future_key=True,
995 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
997 def test_retrieved_password_during_clock_skew_window_when_next_key_is_expired(
998 self,
1000 password_interval = 16
1002 samdb = self.get_local_samdb()
1003 series = self.gmsa_series(password_interval)
1004 self.set_db_time(samdb, series.start_of_interval(0))
1006 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
1008 self.set_db_time(samdb, series.during_skew_window(2))
1010 expected = self.expected_gmsa_password_blob(
1011 samdb,
1012 creds,
1013 series.interval_gkid(3),
1014 previous_gkid=series.interval_gkid(2),
1015 query_expiration_gkid=series.interval_gkid(3),
1016 return_future_key=True,
1018 self.check_managed_pwd(samdb, creds, expected_managed_pwd=expected)
1020 def test_retrieving_managed_password_triggers_keys_update(self):
1021 # Create a root key with a start time early enough to be usable at the
1022 # time the gMSA is purported to be created.
1023 samdb = self.get_samdb()
1024 domain_dn = self.get_server_dn(samdb)
1025 self.create_root_key(samdb, domain_dn, use_start_time=ROOT_KEY_START_TIME)
1027 password_interval = 16
1029 local_samdb = self.get_local_samdb()
1030 series = GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval))
1031 self.set_db_time(local_samdb, series.start_of_interval(0))
1033 creds = self.gmsa_account(samdb=local_samdb, interval=password_interval)
1034 dn = creds.get_dn()
1036 self.set_db_time(local_samdb, None)
1038 # Search the local database for the account’s keys.
1039 res = local_samdb.search(
1040 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1042 self.assertEqual(1, len(res))
1044 previous_nt_hash = res[0].get("unicodePwd", idx=0)
1045 previous_supplemental_creds = self.unpack_supplemental_credentials(
1046 res[0].get("supplementalCredentials", idx=0)
1049 # Check that the NT hash is the value we expect.
1050 self.assertEqual(creds.get_nt_hash(), previous_nt_hash)
1052 # Search for the managed password over LDAP, triggering an update of the
1053 # keys in the database.
1054 res = samdb.search(dn, scope=ldb.SCOPE_BASE, attrs=["msDS-ManagedPassword"])
1055 self.assertEqual(1, len(res))
1057 # Verify that the password is present in the result.
1058 managed_password = res[0].get("msDS-ManagedPassword", idx=0)
1059 self.assertIsNotNone(managed_password, "should be allowed to view the password")
1061 # Search the local database again for the account’s keys, which should
1062 # have been updated.
1063 res = local_samdb.search(
1064 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1066 self.assertEqual(1, len(res))
1068 nt_hash = res[0].get("unicodePwd", idx=0)
1069 supplemental_creds = self.unpack_supplemental_credentials(
1070 res[0].get("supplementalCredentials", idx=0)
1073 self.assertNotEqual(
1074 previous_nt_hash, nt_hash, "NT hash has not been updated (yet)"
1076 self.assertNotEqual(
1077 previous_supplemental_creds,
1078 supplemental_creds,
1079 "supplementalCredentials has not been updated (yet)",
1082 # Calculate the password with which to authenticate.
1083 current_series = self.gmsa_series_for_account(
1084 local_samdb, creds, password_interval
1086 managed_pwd = self.expected_gmsa_password_blob(
1087 local_samdb,
1088 creds,
1089 current_series.interval_gkid(0),
1090 query_expiration_gkid=current_series.interval_gkid(1),
1093 # Set the new password.
1094 self.assertIsNotNone(
1095 managed_pwd.passwords.current, "current password must be present"
1097 creds.set_utf16_password(managed_pwd.passwords.current)
1099 # Check that the new NT hash is the value we expect.
1100 self.assertEqual(creds.get_nt_hash(), nt_hash)
1102 def test_authentication_triggers_keys_update(self):
1103 # Create a root key with a start time early enough to be usable at the
1104 # time the gMSA is purported to be created. But don’t create it on a
1105 # local samdb with a specifically set time, because (if the key isn’t
1106 # deleted later) we could end up with multiple keys with identical
1107 # creation and start times, and tests failing when the test and the
1108 # server don’t agree on which root key to use at a specific time.
1109 samdb = self.get_samdb()
1110 domain_dn = self.get_server_dn(samdb)
1111 self.create_root_key(samdb, domain_dn, use_start_time=ROOT_KEY_START_TIME)
1113 password_interval = 16
1115 local_samdb = self.get_local_samdb()
1116 series = GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval))
1117 self.set_db_time(local_samdb, series.start_of_interval(0))
1119 creds = self.gmsa_account(samdb=local_samdb, interval=password_interval)
1120 dn = creds.get_dn()
1122 self.set_db_time(local_samdb, None)
1124 # Search the local database for the account’s keys.
1125 res = local_samdb.search(
1126 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1128 self.assertEqual(1, len(res))
1130 previous_nt_hash = res[0].get("unicodePwd", idx=0)
1131 previous_supplemental_creds = self.unpack_supplemental_credentials(
1132 res[0].get("supplementalCredentials", idx=0)
1135 # Check that the NT hash is the value we expect.
1136 self.assertEqual(creds.get_nt_hash(), previous_nt_hash)
1138 # Calculate the password with which to authenticate.
1139 current_series = self.gmsa_series_for_account(
1140 local_samdb, creds, password_interval
1142 managed_pwd = self.expected_gmsa_password_blob(
1143 local_samdb,
1144 creds,
1145 current_series.interval_gkid(0),
1146 query_expiration_gkid=current_series.interval_gkid(1),
1149 # Set the new password.
1150 self.assertIsNotNone(
1151 managed_pwd.passwords.current, "current password must be present"
1153 creds.set_utf16_password(managed_pwd.passwords.current)
1155 # Perform an authentication using the new password. The KDC should
1156 # recognize that the keys in the database are out of date and update
1157 # them.
1158 self._as_req(creds, self.get_service_creds(), kcrypto.Enctype.AES256)
1160 # Search the local database again for the account’s keys, which should
1161 # have been updated.
1162 res = local_samdb.search(
1163 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1165 self.assertEqual(1, len(res))
1167 nt_hash = res[0].get("unicodePwd", idx=0)
1168 supplemental_creds = self.unpack_supplemental_credentials(
1169 res[0].get("supplementalCredentials", idx=0)
1172 self.assertNotEqual(
1173 previous_nt_hash, nt_hash, "NT hash has not been updated (yet)"
1175 self.assertNotEqual(
1176 previous_supplemental_creds,
1177 supplemental_creds,
1178 "supplementalCredentials has not been updated (yet)",
1181 # Check that the new NT hash is the value we expect.
1182 self.assertEqual(creds.get_nt_hash(), nt_hash)
1184 def test_gmsa_can_perform_gensec_ntlmssp_logon(self):
1185 creds = self.gmsa_account(kerberos_enabled=False)
1187 # Perform a gensec logon.
1188 session = self.gensec_ntlmssp_logon(creds, self.get_local_samdb())
1190 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1191 token = session.security_token
1192 token_sids = token.sids
1193 self.assertGreater(len(token_sids), 0)
1195 # Ensure that they match.
1196 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1198 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_valid(self):
1199 """Test that we can perform a gensec logon at a time when we are sure
1200 the current gMSA password is valid."""
1202 password_interval = 18
1204 samdb = self.get_local_samdb()
1205 series = self.gmsa_series(password_interval)
1206 self.set_db_time(samdb, series.start_of_interval(0))
1208 creds = self.gmsa_account(
1209 samdb=samdb, interval=password_interval, kerberos_enabled=False
1212 # Perform a gensec logon.
1213 session = self.gensec_ntlmssp_logon(creds, samdb)
1215 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1216 token = session.security_token
1217 token_sids = token.sids
1218 self.assertGreater(len(token_sids), 0)
1220 # Ensure that they match.
1221 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1223 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_expired(self):
1224 """Test that we can perform a gensec logon using NTLMSSP at a time when
1225 the current gMSA password has expired."""
1227 password_interval = 40
1229 samdb = self.get_local_samdb()
1230 series = self.gmsa_series(password_interval)
1231 self.set_db_time(samdb, series.start_of_interval(0))
1233 creds = self.gmsa_account(
1234 samdb=samdb, interval=password_interval, kerberos_enabled=False
1237 # Set the time to the moment the original password has expired, and
1238 # perform a gensec logon.
1239 expired_time = series.start_of_interval(1)
1240 self.set_db_time(samdb, expired_time)
1242 # Calculate the password with which to authenticate.
1243 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1244 managed_pwd = self.expected_gmsa_password_blob(
1245 samdb,
1246 creds,
1247 current_series.interval_gkid(0),
1248 previous_gkid=current_series.interval_gkid(-1),
1249 query_expiration_gkid=current_series.interval_gkid(1),
1252 # Set the new password.
1253 self.assertIsNotNone(
1254 managed_pwd.passwords.current, "current password must be present"
1256 creds.set_utf16_password(managed_pwd.passwords.current)
1258 # Perform a gensec logon.
1259 session = self.gensec_ntlmssp_logon(creds, samdb)
1261 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1262 token = session.security_token
1263 token_sids = token.sids
1264 self.assertGreater(len(token_sids), 0)
1266 # Ensure that they match.
1267 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1269 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_next_key_is_expired(self):
1270 password_interval = 42
1272 samdb = self.get_local_samdb()
1273 series = self.gmsa_series(password_interval)
1274 self.set_db_time(samdb, series.start_of_interval(0))
1276 creds = self.gmsa_account(
1277 samdb=samdb, interval=password_interval, kerberos_enabled=False
1280 expired_time = series.start_of_interval(2)
1281 self.set_db_time(samdb, expired_time)
1283 # Calculate the password with which to authenticate.
1284 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1285 managed_pwd = self.expected_gmsa_password_blob(
1286 samdb,
1287 creds,
1288 current_series.interval_gkid(0),
1289 previous_gkid=current_series.interval_gkid(-1),
1290 query_expiration_gkid=current_series.interval_gkid(1),
1293 # Set the new password.
1294 self.assertIsNotNone(
1295 managed_pwd.passwords.current, "current password must be present"
1297 creds.set_utf16_password(managed_pwd.passwords.current)
1299 # Perform a gensec logon.
1300 session = self.gensec_ntlmssp_logon(creds, samdb)
1302 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1303 token = session.security_token
1304 token_sids = token.sids
1305 self.assertGreater(len(token_sids), 0)
1307 # Ensure that they match.
1308 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1310 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_valid(
1311 self,
1313 password_interval = 43
1315 samdb = self.get_local_samdb()
1316 series = self.gmsa_series(password_interval)
1317 self.set_db_time(samdb, series.start_of_interval(0))
1319 creds = self.gmsa_account(
1320 samdb=samdb, interval=password_interval, kerberos_enabled=False
1323 self.set_db_time(samdb, series.during_skew_window(0))
1325 # Calculate the password with which to authenticate.
1326 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1327 managed_pwd = self.expected_gmsa_password_blob(
1328 samdb,
1329 creds,
1330 current_series.interval_gkid(0),
1331 previous_gkid=current_series.interval_gkid(-1),
1332 query_expiration_gkid=current_series.interval_gkid(1),
1335 # Set the new password.
1336 self.assertIsNotNone(
1337 managed_pwd.passwords.current, "current password must be present"
1339 creds.set_utf16_password(managed_pwd.passwords.current)
1341 # Perform a gensec logon.
1342 session = self.gensec_ntlmssp_logon(creds, samdb)
1344 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1345 token = session.security_token
1346 token_sids = token.sids
1347 self.assertGreater(len(token_sids), 0)
1349 # Ensure that they match.
1350 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1352 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_expired(
1353 self,
1355 password_interval = 44
1357 samdb = self.get_local_samdb()
1358 series = self.gmsa_series(password_interval)
1359 self.set_db_time(samdb, series.start_of_interval(0))
1361 creds = self.gmsa_account(
1362 samdb=samdb, interval=password_interval, kerberos_enabled=False
1365 self.set_db_time(samdb, series.during_skew_window(1))
1367 # Calculate the password with which to authenticate.
1368 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1369 managed_pwd = self.expected_gmsa_password_blob(
1370 samdb,
1371 creds,
1372 current_series.interval_gkid(0),
1373 previous_gkid=current_series.interval_gkid(-1),
1374 query_expiration_gkid=current_series.interval_gkid(1),
1377 # Set the new password.
1378 self.assertIsNotNone(
1379 managed_pwd.passwords.current, "current password must be present"
1381 creds.set_utf16_password(managed_pwd.passwords.current)
1383 # Perform a gensec logon.
1384 session = self.gensec_ntlmssp_logon(creds, samdb)
1386 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1387 token = session.security_token
1388 token_sids = token.sids
1389 self.assertGreater(len(token_sids), 0)
1391 # Ensure that they match.
1392 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1394 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_next_key_is_expired(
1395 self,
1397 password_interval = 47
1399 samdb = self.get_local_samdb()
1400 series = self.gmsa_series(password_interval)
1401 self.set_db_time(samdb, series.start_of_interval(0))
1403 creds = self.gmsa_account(
1404 samdb=samdb, interval=password_interval, kerberos_enabled=False
1407 self.set_db_time(samdb, series.during_skew_window(2))
1409 # Calculate the password with which to authenticate.
1410 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1411 managed_pwd = self.expected_gmsa_password_blob(
1412 samdb,
1413 creds,
1414 current_series.interval_gkid(0),
1415 previous_gkid=current_series.interval_gkid(-1),
1416 query_expiration_gkid=current_series.interval_gkid(1),
1419 # Set the new password.
1420 self.assertIsNotNone(
1421 managed_pwd.passwords.current, "current password must be present"
1423 creds.set_utf16_password(managed_pwd.passwords.current)
1425 # Perform a gensec logon.
1426 session = self.gensec_ntlmssp_logon(creds, samdb)
1428 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1429 token = session.security_token
1430 token_sids = token.sids
1431 self.assertGreater(len(token_sids), 0)
1433 # Ensure that they match.
1434 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1436 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_within_five_minutes(
1437 self,
1439 password_interval = 123
1441 samdb = self.get_local_samdb()
1442 series = self.gmsa_series(password_interval)
1443 self.set_db_time(samdb, series.start_of_interval(0))
1445 creds = self.gmsa_account(
1446 samdb=samdb, interval=password_interval, kerberos_enabled=False
1449 # Set the time to within five minutes of the original password’s expiry,
1450 # and perform a gensec logon with the original password.
1451 expired_time = series.within_previous_password_valid_window(1)
1452 self.set_db_time(samdb, expired_time)
1454 # Perform a gensec logon.
1455 session = self.gensec_ntlmssp_logon(creds, samdb)
1457 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1458 token = session.security_token
1459 token_sids = token.sids
1460 self.assertGreater(len(token_sids), 0)
1462 # Ensure that they match.
1463 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1465 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_but_one_password_within_five_minutes(
1466 self,
1468 password_interval = 123
1470 samdb = self.get_local_samdb()
1471 series = self.gmsa_series(password_interval)
1472 self.set_db_time(samdb, series.start_of_interval(0))
1474 creds = self.gmsa_account(
1475 samdb=samdb, interval=password_interval, kerberos_enabled=False
1478 # Set the time to within five minutes of the *following* password’s expiry,
1479 # and perform a gensec logon with the original password.
1480 expired_time = series.within_previous_password_valid_window(2)
1481 self.set_db_time(samdb, expired_time)
1483 # Expect the gensec logon to fail.
1484 self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
1486 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_beyond_five_minutes(
1487 self,
1489 password_interval = 456
1491 samdb = self.get_local_samdb()
1492 series = self.gmsa_series(password_interval)
1493 self.set_db_time(samdb, series.start_of_interval(0))
1495 creds = self.gmsa_account(
1496 samdb=samdb, interval=password_interval, kerberos_enabled=False
1499 # Set the time to five minutes beyond the original password’s expiry,
1500 # and try to perform a gensec logon with the original password.
1501 expired_time = series.outside_previous_password_valid_window(1)
1502 self.set_db_time(samdb, expired_time)
1504 # Perform a gensec logon.
1505 session = self.gensec_ntlmssp_logon(creds, samdb)
1507 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1508 token = session.security_token
1509 token_sids = token.sids
1510 self.assertGreater(len(token_sids), 0)
1512 # Ensure that they match.
1513 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1515 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_password_five_minutes_apart(
1516 self,
1518 password_interval = 789
1520 samdb = self.get_local_samdb()
1521 series = self.gmsa_series(password_interval)
1522 self.set_db_time(samdb, series.start_of_interval(0))
1524 creds = self.gmsa_account(
1525 samdb=samdb, interval=password_interval, kerberos_enabled=False
1527 gmsa_sid = creds.get_sid()
1529 # Set the time to after the original password’s expiry, and perform a
1530 # gensec logon with the original password.
1531 db_time = series.during_interval(1)
1532 self.set_db_time(samdb, db_time)
1534 # Perform a gensec logon.
1535 session = self.gensec_ntlmssp_logon(creds, samdb)
1537 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1538 token = session.security_token
1539 token_sids = token.sids
1540 self.assertGreater(len(token_sids), 0)
1542 # Ensure that they match.
1543 self.assertEqual(security.dom_sid(gmsa_sid), token_sids[0])
1545 # Set the time to not quite five minutes later, and perform a gensec
1546 # logon with the original password.
1547 self.set_db_time(samdb, NtTime(db_time + MAX_CLOCK_SKEW - 1))
1549 # Perform a gensec logon.
1550 session = self.gensec_ntlmssp_logon(creds, samdb)
1552 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1553 token = session.security_token
1554 token_sids = token.sids
1555 self.assertGreater(len(token_sids), 0)
1557 # Ensure that they match.
1558 self.assertEqual(security.dom_sid(gmsa_sid), token_sids[0])
1560 # Now set the time to exactly five minutes later, and try to perform a
1561 # gensec logon with the original password.
1562 self.set_db_time(samdb, NtTime(db_time + MAX_CLOCK_SKEW))
1564 # Expect the gensec logon to fail.
1565 self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
1567 def test_gmsa_keys_when_previous_password_is_not_acceptable(self):
1568 self._check_gmsa_keys(within_valid_window=False, expect_previous_keys=False)
1570 def test_gmsa_keys_when_previous_password_is_acceptable(self):
1571 self._check_gmsa_keys(within_valid_window=True, expect_previous_keys=True)
1573 def _check_gmsa_keys(
1574 self, *, within_valid_window: bool, expect_previous_keys: bool
1576 password_interval = 77
1578 samdb = self.get_local_samdb()
1579 series = self.gmsa_series(password_interval)
1580 self.set_db_time(samdb, series.start_of_interval(0))
1582 creds = self.gmsa_account(samdb=samdb, interval=password_interval)
1584 if within_valid_window:
1585 db_time = series.within_previous_password_valid_window(1)
1586 else:
1587 db_time = series.outside_previous_password_valid_window(1)
1588 self.set_db_time(samdb, db_time)
1590 gmsa_principal = f"{creds.get_username()}@{creds.get_realm()}"
1592 ktfile = os.path.join(self.tempdir, "test.keytab")
1593 self.addCleanup(self.rm_files, ktfile)
1595 net = Net(None, self.get_lp())
1596 net.export_keytab(
1597 keytab=ktfile,
1598 samdb=samdb,
1599 principal=gmsa_principal,
1600 only_current_keys=True,
1601 as_for_AS_REQ=True,
1603 self.assertTrue(os.path.exists(ktfile), "keytab was not created")
1605 with open(ktfile, "rb") as bytes_kt:
1606 keytab_bytes = bytes_kt.read()
1608 keytab_set = keytab_as_set(keytab_bytes)
1609 exported_etypes = {entry[1] for entry in keytab_set}
1611 # Ensure that the AES keys were exported.
1612 self.assertLessEqual(
1613 {kcrypto.Enctype.AES256, kcrypto.Enctype.AES128}, exported_etypes
1616 def fill_keytab(
1617 creds: KerberosCredentials,
1618 keytab: Set[Tuple[str, kcrypto.Enctype, int, bytes]],
1619 etypes: Iterable[kcrypto.Enctype],
1620 ) -> None:
1621 for etype in etypes:
1622 key = self.TicketDecryptionKey_from_creds(creds, etype=etype)
1623 kvno = 2
1624 entry = gmsa_principal, etype, kvno, key.key.contents
1626 self.assertNotIn(entry, keytab, "key already present in keytab")
1627 keytab.add(entry)
1629 expected_keytab = set()
1631 if expect_previous_keys:
1632 # Fill the expected keytab with the previous keys.
1633 fill_keytab(creds, expected_keytab, exported_etypes)
1635 # Calculate the new password.
1636 managed_pwd = self.expected_gmsa_password_blob(
1637 samdb,
1638 creds,
1639 series.interval_gkid(1),
1640 previous_gkid=series.interval_gkid(0),
1641 query_expiration_gkid=series.interval_gkid(2),
1644 # Set the new password.
1645 self.assertIsNotNone(
1646 managed_pwd.passwords.current, "current password must be present"
1648 creds.set_utf16_password(managed_pwd.passwords.current)
1650 # Clear the initial set of keys associated with this credentials object.
1651 creds.clear_forced_keys()
1652 # Add the current keys to the expected keytab.
1653 fill_keytab(creds, expected_keytab, exported_etypes)
1655 # Ensure the keytab is as we expect.
1656 self.assertEqual(expected_keytab, keytab_set)
1658 def test_gmsa_can_perform_netlogon(self):
1659 self._test_samlogon(
1660 self.gmsa_account(kerberos_enabled=False),
1661 netlogon.NetlogonNetworkInformation,
1662 validation_level=netlogon.NetlogonValidationSamInfo4,
1665 def test_computer_cannot_perform_interactive_logon(self):
1666 self._test_samlogon(
1667 self.get_mach_creds(),
1668 netlogon.NetlogonInteractiveInformation,
1669 expect_error=ntstatus.NT_STATUS_NO_SUCH_USER,
1670 validation_level=netlogon.NetlogonValidationSamInfo4,
1673 def test_gmsa_cannot_perform_interactive_logon(self):
1674 self._test_samlogon(
1675 self.gmsa_account(kerberos_enabled=False),
1676 netlogon.NetlogonInteractiveInformation,
1677 expect_error=ntstatus.NT_STATUS_NO_SUCH_USER,
1678 validation_level=netlogon.NetlogonValidationSamInfo4,
1681 def _gmsa_can_perform_as_req(self, *, enctype: kcrypto.Enctype) -> None:
1682 self._as_req(self.gmsa_account(), self.get_service_creds(), enctype)
1684 def test_gmsa_can_perform_as_req_with_aes256(self):
1685 self._gmsa_can_perform_as_req(enctype=kcrypto.Enctype.AES256)
1687 def test_gmsa_can_perform_as_req_with_rc4(self):
1688 self._gmsa_can_perform_as_req(enctype=kcrypto.Enctype.RC4)
1690 def _gmsa_can_authenticate_to_ldap(self, *, with_kerberos: bool) -> None:
1691 creds = self.gmsa_account(kerberos_enabled=with_kerberos)
1693 protocol = "ldap"
1695 # Authenticate to LDAP.
1696 samdb_user = SamDB(
1697 url=f"{protocol}://{self.dc_host}", credentials=creds, lp=self.get_lp()
1700 # Search for the user’s token groups.
1701 res = samdb_user.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
1702 self.assertEqual(1, len(res))
1704 token_groups = res[0].get("tokenGroups", idx=0)
1705 self.assertIsNotNone(token_groups)
1707 # Ensure that the token SID matches.
1708 token_sid = ndr_unpack(security.dom_sid, token_groups)
1709 self.assertEqual(security.dom_sid(creds.get_sid()), token_sid)
1711 def test_gmsa_can_authenticate_to_ldap_with_kerberos(self):
1712 self._gmsa_can_authenticate_to_ldap(with_kerberos=True)
1714 def test_gmsa_can_authenticate_to_ldap_without_kerberos(self):
1715 self._gmsa_can_authenticate_to_ldap(with_kerberos=False)
1717 def test_gmsa_can_perform_ServerAuthenticate3(self):
1718 creds = self.gmsa_account(kerberos_enabled=False)
1719 username = creds.get_username()
1720 dc_server = self.get_samdb().host_dns_name()
1722 # Use the gMSA’s credentials to create a netlogon connection. This call,
1723 # which internally performs a ServerAuthenticate3, is more than just
1724 # setup: it is the centrepiece of the test.
1725 c = netlogon.netlogon(
1726 f"ncacn_ip_tcp:{dc_server}[schannel,seal]", self.get_lp(), creds
1728 credential = netlogon.netr_Credential()
1729 credential.data = list(b"abcdefgh")
1730 server_credential = c.netr_ServerReqChallenge(None, username, credential)
1731 with self.assertRaises(NTSTATUSError) as err:
1732 # Try performing a ServerAuthenticate3 with our gMSA account. The
1733 # procedure for calculating a correct challenge is too complicated
1734 # to try to reimplement in Python, so we won’t even try. But the
1735 # fact that we get an ACCESS_DENIED error, rather than something
1736 # like NO_TRUST_SAM_ACCOUNT, shows that gMSAs are not prevented from
1737 # using ServerAuthenticate3 to authenticate.
1738 c.netr_ServerAuthenticate3(
1739 dc_server,
1740 username,
1741 misc.SEC_CHAN_WKSTA,
1742 username,
1743 server_credential,
1744 netlogon.NETLOGON_NEG_STRONG_KEYS | netlogon.NETLOGON_NEG_SUPPORTS_AES,
1747 self.assertEqual(ntstatus.NT_STATUS_ACCESS_DENIED, err.exception.args[0])
1749 def test_gmsa_cannot_be_locked_out_with_gensec_ntlmssp(self):
1750 def try_bad_creds(creds: Credentials, samdb: SamDB) -> None:
1751 self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
1753 self._check_gmsa_cannot_be_locked_out(
1754 try_bad_creds_fn=try_bad_creds, kerberos_enabled=False, local=True
1757 def test_gmsa_cannot_be_locked_out_with_ldap_authentication(self):
1758 def try_bad_creds(creds: Credentials, _samdb: SamDB) -> None:
1759 with self.assertRaises(ldb.LdbError) as err:
1760 SamDB(url=f"ldap://{self.dc_host}", credentials=creds, lp=self.get_lp())
1762 num, estr = err.exception.args
1764 self.assertEqual(ldb.ERR_INVALID_CREDENTIALS, num)
1765 self.assertIn("NT_STATUS_LOGON_FAILURE", estr)
1767 self._check_gmsa_cannot_be_locked_out(try_bad_creds_fn=try_bad_creds)
1769 def _check_gmsa_cannot_be_locked_out(
1770 self,
1772 try_bad_creds_fn: Callable[[Credentials, SamDB], None],
1773 kerberos_enabled: bool = True,
1774 local: bool = False,
1776 samdb = self.get_local_samdb() if local else self.get_samdb()
1777 base_dn = ldb.Dn(samdb, samdb.domain_dn())
1779 def modify_attr(attr, value):
1780 if value is None:
1781 value = []
1782 flag = ldb.FLAG_MOD_DELETE
1783 else:
1784 value = str(value)
1785 flag = ldb.FLAG_MOD_REPLACE
1787 msg = ldb.Message(base_dn)
1788 msg[attr] = ldb.MessageElement(value, flag, attr)
1789 samdb.modify(msg)
1791 res = samdb.search(base_dn, scope=ldb.SCOPE_BASE, attrs=["lockoutThreshold"])
1792 self.assertEqual(1, len(res))
1794 # Reset the lockout threshold as it was before.
1795 lockout_threshold = res[0].get("lockoutThreshold", idx=0)
1796 self.addCleanup(modify_attr, "lockoutThreshold", lockout_threshold)
1798 # Set the new lockout threshold.
1799 lockout_threshold = 3
1800 modify_attr("lockoutThreshold", lockout_threshold)
1802 creds = self.gmsa_account(kerberos_enabled=kerberos_enabled)
1803 dn = creds.get_dn()
1805 # Truncate the password to ensure that it is invalid.
1806 creds.set_password(creds.get_password()[:-1])
1808 prev_bad_pwd_time = 0
1810 for i in range(lockout_threshold + 1):
1811 try_bad_creds_fn(creds, samdb)
1813 # Ensure the account is not locked out.
1815 res = samdb.search(
1817 scope=ldb.SCOPE_BASE,
1818 attrs=[
1819 "badPasswordTime",
1820 "badPwdCount",
1821 "lockoutTime",
1822 "msDS-User-Account-Control-Computed",
1825 self.assertEqual(1, len(res))
1827 # Despite the bad password count having increased, …
1828 bad_pwd_count = int(res[0].get("badPwdCount", idx=0))
1829 self.assertEqual(i + 1, bad_pwd_count)
1831 # …the account should not be locked out.
1832 uac = int(res[0].get("msDS-User-Account-Control-Computed", idx=0))
1833 self.assertFalse(uac & dsdb.UF_LOCKOUT)
1835 # The bad password time should have increased.
1836 bad_pwd_time = int(res[0].get("badPasswordTime", idx=0))
1837 self.assertGreater(bad_pwd_time, prev_bad_pwd_time)
1839 prev_bad_pwd_time = bad_pwd_time
1841 # The lockout time should not be set.
1842 lockout_time = res[0].get("lockoutTime", idx=0)
1843 self.assertIsNone(lockout_time)
1845 def _server_set_password(self, creds: Credentials, password: str) -> None:
1846 dc_server = self.get_samdb().host_dns_name()
1847 lp = self.get_lp()
1849 conn = netlogon.netlogon(f"ncacn_ip_tcp:{dc_server}[schannel,seal]", lp, creds)
1851 auth = creds.new_client_authenticator()
1852 authenticator = netlogon.netr_Authenticator()
1853 authenticator.cred.data = list(auth["credential"])
1854 authenticator.timestamp = auth["timestamp"]
1856 DATA_LEN = 512
1858 encoded = password.encode("utf-16-le")
1859 pwd_len = len(encoded)
1860 filler = os.urandom(DATA_LEN - pwd_len)
1862 pwd = netlogon.netr_CryptPassword()
1863 pwd.length = pwd_len
1864 pwd.data = list(filler + encoded)
1865 creds.encrypt_netr_crypt_password(pwd)
1867 conn.netr_ServerPasswordSet2(
1868 dc_server,
1869 creds.get_username(),
1870 misc.SEC_CHAN_WKSTA,
1871 creds.get_workstation(),
1872 authenticator,
1873 pwd,
1876 def test_gmsa_can_authenticate_with_previous_password_and_ntlm(self):
1877 creds = self.gmsa_account(kerberos_enabled=False)
1878 dc_server = self.get_samdb().host_dns_name()
1879 lp = self.get_lp()
1881 # We can use NTLM to authenticate.
1882 srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
1884 PWD_LEN = 32
1886 # Change the password using ServerPasswordSet2. Windows does not prevent
1887 # this, despite the whole point of Group Managed Service Accounts being
1888 # that the password is managed by AD, and despite changing passwords
1889 # outside of that system not making much sense.
1890 password_1 = generate_random_password(PWD_LEN, PWD_LEN)
1891 self._server_set_password(creds, password_1)
1893 # As less than five minutes have passed, we can still authenticate with
1894 # our original password.
1895 srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
1897 # Change the password again.
1898 password_2 = generate_random_password(PWD_LEN, PWD_LEN)
1899 self._server_set_password(creds, password_2)
1901 # This time, NTLM authentication fails!
1902 with self.assertRaises(NTSTATUSError) as err:
1903 srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
1905 self.assertEqual(ntstatus.NT_STATUS_LOGON_FAILURE, err.exception.args[0])
1907 # But we can use the previous password to authenticate.
1908 creds.update_password(password_1)
1909 srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
1911 # And we can authenticate using the current password.
1912 creds.update_password(password_2)
1913 srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
1916 if __name__ == "__main__":
1917 import unittest
1919 unittest.main()