2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2024
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <https://www.gnu.org/licenses/>.
23 sys
.path
.insert(0, "bin/python")
24 os
.environ
["PYTHONUNBUFFERED"] = "1"
26 from typing
import Callable
, Iterable
, NewType
, Optional
, Set
, Tuple
, TypeVar
29 from itertools
import chain
36 generate_random_password
,
42 from samba
.dcerpc
import gkdi
, gmsa
, misc
, netlogon
, security
, srvsvc
43 from samba
.ndr
import ndr_pack
, ndr_unpack
44 from samba
.net
import Net
45 from samba
.nt_time
import (
46 nt_time_delta_from_timedelta
,
47 nt_time_from_datetime
,
50 timedelta_from_nt_time_delta
,
52 from samba
.samdb
import SamDB
53 from samba
.credentials
import Credentials
, DONT_USE_KERBEROS
54 from samba
.gkdi
import (
61 from samba
.tests
import connect_samdb
62 from samba
.tests
.dckeytab
import keytab_as_set
63 from samba
.tests
.krb5
import kcrypto
64 from samba
.tests
.gkdi
import GkdiBaseTest
, ROOT_KEY_START_TIME
65 from samba
.tests
.krb5
.kdc_base_test
import KDCBaseTest
66 from samba
.tests
.krb5
.raw_testcase
import KerberosCredentials
67 from samba
.tests
.krb5
.rfc4120_constants
import (
72 import samba
.tests
.krb5
.rfc4120_pyasn1
as krb5_asn1
74 GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL
= 30
76 Gmsa
= NewType("Gmsa", ldb
.Message
)
79 def gkdi_rollover_interval(managed_password_interval
: int) -> NtTimeDelta
:
80 rollover_interval
= NtTimeDelta(
81 managed_password_interval
* 24 // 10 * KEY_CYCLE_DURATION
83 if rollover_interval
== 0:
84 raise ValueError("rollover interval must not be zero")
85 return rollover_interval
90 rollover_interval
: NtTimeDelta
92 def __init__(self
, start_gkid
: Gkid
, rollover_interval
: NtTimeDelta
) -> None:
93 self
.start_time
= start_gkid
.start_nt_time()
94 self
.rollover_interval
= rollover_interval
96 def interval_gkid(self
, n
: int) -> Gkid
:
97 return Gkid
.from_nt_time(self
.start_of_interval(n
))
99 def start_of_interval(self
, n
: int) -> NtTime
:
100 if not isinstance(n
, int):
101 raise ValueError(f
"{n} must be an integer")
102 return NtTime(int(self
.start_time
+ n
* self
.rollover_interval
))
104 def during_interval(self
, n
: int) -> NtTime
:
105 return NtTime(int(self
.start_of_interval(n
) + self
.rollover_interval
// 2))
107 def during_skew_window(self
, n
: int) -> NtTime
:
108 two_minutes
= nt_time_delta_from_timedelta(datetime
.timedelta(minutes
=2))
110 int(self
.start_of_interval(n
) + self
.rollover_interval
- two_minutes
)
113 def outside_previous_password_valid_window(self
, n
: int) -> NtTime
:
114 return NtTime(self
.start_of_interval(n
) + MAX_CLOCK_SKEW
)
116 def within_previous_password_valid_window(self
, n
: int) -> NtTime
:
117 return NtTime(self
.outside_previous_password_valid_window(n
) - 1)
120 class GmsaTests(GkdiBaseTest
, KDCBaseTest
):
123 creds
: KerberosCredentials
,
124 target_creds
: KerberosCredentials
,
125 enctype
: kcrypto
.Enctype
,
127 preauth_key
= self
.PasswordKey_from_creds(creds
, enctype
)
129 def generate_padata_fn(
130 _kdc_exchange_dict
: dict, _callback_dict
: Optional
[dict], req_body
: dict
131 ) -> Tuple
[list, dict]:
134 patime
, pausec
= self
.get_KerberosTimeWithUsec()
135 enc_ts
= self
.PA_ENC_TS_ENC_create(patime
, pausec
)
136 enc_ts
= self
.der_encode(enc_ts
, asn1Spec
=krb5_asn1
.PA_ENC_TS_ENC())
138 enc_ts
= self
.EncryptedData_create(preauth_key
, KU_PA_ENC_TIMESTAMP
, enc_ts
)
139 enc_ts
= self
.der_encode(enc_ts
, asn1Spec
=krb5_asn1
.EncryptedData())
141 enc_ts
= self
.PA_DATA_create(PADATA_ENC_TIMESTAMP
, enc_ts
)
143 padata
.append(enc_ts
)
145 return padata
, req_body
147 user_name
= creds
.get_username()
148 cname
= self
.PrincipalName_create(
149 name_type
=NT_PRINCIPAL
, names
=user_name
.split("/")
152 target_name
= target_creds
.get_username()
153 target_realm
= target_creds
.get_realm()
155 sname
= self
.PrincipalName_create(
156 name_type
=NT_PRINCIPAL
, names
=["host", target_name
[:-1]]
159 check_error_fn
= None
160 check_rep_fn
= self
.generic_check_kdc_rep
162 expected_sname
= self
.PrincipalName_create(
163 name_type
=NT_PRINCIPAL
, names
=[target_name
]
166 kdc_options
= "forwardable,renewable,canonicalize,renewable-ok"
167 kdc_options
= krb5_asn1
.KDCOptions(kdc_options
)
169 ticket_decryption_key
= self
.TicketDecryptionKey_from_creds(target_creds
)
171 kdc_exchange_dict
= self
.as_exchange_dict(
173 expected_crealm
=creds
.get_realm(),
174 expected_cname
=cname
,
175 expected_srealm
=target_realm
,
176 expected_sname
=expected_sname
,
177 expected_supported_etypes
=target_creds
.tgs_supported_enctypes
,
178 ticket_decryption_key
=ticket_decryption_key
,
179 generate_padata_fn
=generate_padata_fn
,
180 check_error_fn
=check_error_fn
,
181 check_rep_fn
=check_rep_fn
,
182 check_kdc_private_fn
=self
.generic_check_kdc_private
,
183 expected_error_mode
=0,
184 expected_salt
=creds
.get_salt(),
185 preauth_key
=preauth_key
,
186 kdc_options
=str(kdc_options
),
189 till
= self
.get_KerberosTime(offset
=36000)
191 etypes
= kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.RC4
193 rep
= self
._generic
_kdc
_exchange
(
201 self
.check_as_reply(rep
)
203 return kdc_exchange_dict
206 def gkdi_get_key_start_time(self
, key_id
: gkdi
.KeyEnvelope
) -> NtTime
:
207 return Gkid
.from_key_envelope(key_id
).start_nt_time()
213 root_key_id
: Optional
[misc
.GUID
],
215 sid
: security
.dom_sid
,
217 group_key
= self
.get_key_exact(samdb
, target_sd
, root_key_id
, gkid
)
219 password
= self
.generate_gmsa_password(group_key
, sid
)
220 return self
.post_process_password_buffer(password
)
222 def get_password_based_on_gkid(
223 self
, samdb
: SamDB
, gkid
: Gkid
, sid
: security
.dom_sid
225 return self
.get_password(samdb
, self
.gmsa_sd
, None, gkid
, sid
)
227 def get_password_based_on_timestamp(
228 self
, samdb
: SamDB
, timestamp
: NtTime
, sid
: security
.dom_sid
230 return self
.get_password_based_on_gkid(samdb
, Gkid
.from_nt_time(timestamp
), sid
)
233 def get_password_based_on_key_id(
234 self
, samdb
: SamDB
, managed_password
: gkdi
.KeyEnvelope
, sid
: str
236 return self
.get_password(
239 managed_password
.root_key_id
,
240 Gkid
.from_key_envelope(managed_password
),
244 def generate_gmsa_password(self
, key
: GroupKey
, sid
: str) -> bytes
:
245 context
= ndr_pack(security
.dom_sid(sid
))
246 algorithm
= key
.hash_algorithm
.algorithm()
247 gmsa_password_len
= 256
253 label
="GMSA PASSWORD",
254 len_in_bytes
=gmsa_password_len
,
257 def post_process_password_buffer(self
, key
: bytes
) -> bytes
:
258 self
.assertEqual(0, len(key
) & 1, f
"length of key ({len(key)}) is not even")
260 def convert_null(t
: Tuple
[int, int]) -> Tuple
[int, int]:
268 def take_pairs(iterable
: Iterable
[T
]) -> Iterable
[Tuple
[T
, T
]]:
273 except StopIteration:
276 yield first
, next(it
)
278 return bytes(chain
.from_iterable(map(convert_null
, take_pairs(key
))))
280 def get_gmsa_object(self
, samdb
: SamDB
, dn
: ldb
.Dn
) -> Gmsa
:
283 scope
=ldb
.SCOPE_BASE
,
285 "msDS-ManagedPasswordInterval",
286 "msDS-ManagedPasswordId",
287 "msDS-ManagedPasswordPreviousId",
293 def gmsa_rollover_interval(self
, gmsa_object
: Gmsa
) -> NtTimeDelta
:
294 managed_password_interval
= gmsa_object
.get(
295 "msDS-ManagedPasswordInterval", idx
=0
297 if managed_password_interval
is None:
298 managed_password_interval
= GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL
300 managed_password_interval
= int(managed_password_interval
)
302 return gkdi_rollover_interval(managed_password_interval
)
304 def gmsa_creation_nt_time(self
, gmsa_object
: Gmsa
) -> NtTime
:
305 creation_time
: Optional
[bytes
] = gmsa_object
.get("whenCreated", idx
=0)
306 self
.assertIsNotNone(creation_time
)
307 assert creation_time
is not None # to help the type checker
309 create_time
= datetime
.datetime
.fromtimestamp(
310 ldb
.string_to_time(creation_time
.decode()), tz
=datetime
.timezone
.utc
312 return nt_time_from_datetime(create_time
)
314 def gmsa_series(self
, managed_password_interval
: int) -> GmsaSeries
:
316 self
.future_gkid(), gkdi_rollover_interval(managed_password_interval
)
319 def gmsa_series_for_account(
320 self
, samdb
: SamDB
, creds
: KerberosCredentials
, managed_password_interval
: int
322 gmsa_object
= self
.get_gmsa_object(samdb
, creds
.get_dn())
323 current_nt_time
= self
.current_nt_time(samdb
)
324 gkid
= Gkid
.from_nt_time(
325 self
.account_quantized_time(gmsa_object
, current_nt_time
)
327 return GmsaSeries(gkid
, gkdi_rollover_interval(managed_password_interval
))
330 self
, key_start_time
: NtTime
, time
: NtTime
, gkdi_rollover_interval
: NtTimeDelta
332 self
.assertLessEqual(key_start_time
, time
)
334 time_since_key_start
= NtTimeDelta(time
- key_start_time
)
335 quantized_time_since_key_start
= NtTimeDelta(
336 time_since_key_start
// gkdi_rollover_interval
* gkdi_rollover_interval
338 return NtTime(key_start_time
+ quantized_time_since_key_start
)
340 def account_quantized_time(self
, gmsa_object
: Gmsa
, time
: NtTime
) -> NtTime
:
341 pwd_id_blob
= gmsa_object
.get("msDS-ManagedPasswordId", idx
=0)
342 self
.assertIsNotNone(pwd_id_blob
, "SAM should have initialized password ID")
344 pwd_id
= ndr_unpack(gkdi
.KeyEnvelope
, pwd_id_blob
)
345 key_start_time
= Gkid
.from_key_envelope(pwd_id
).start_nt_time()
347 gkdi_rollover_interval
= self
.gmsa_rollover_interval(gmsa_object
)
348 return self
.quantized_time(key_start_time
, time
, gkdi_rollover_interval
)
350 def expected_gmsa_password_blob(
353 creds
: KerberosCredentials
,
356 query_expiration_gkid
: Gkid
,
357 previous_gkid
: Optional
[Gkid
] = None,
358 return_future_key
: bool = False,
359 ) -> gmsa
.MANAGEDPASSWORD_BLOB
:
360 new_password
= self
.get_password_based_on_gkid(samdb
, gkid
, creds
.get_sid())
362 if previous_gkid
is not None:
363 old_password
= self
.get_password_based_on_gkid(
364 samdb
, previous_gkid
, creds
.get_sid()
367 current_time
= self
.current_nt_time(samdb
)
369 gmsa_object
= self
.get_gmsa_object(samdb
, creds
.get_dn())
370 gkdi_rollover_interval
= self
.gmsa_rollover_interval(gmsa_object
)
372 query_expiration_time
= query_expiration_gkid
.start_nt_time()
373 query_password_interval
= NtTimeDelta(query_expiration_time
- current_time
)
374 unchanged_password_interval
= NtTimeDelta(
377 query_expiration_time
378 + (gkdi_rollover_interval
if return_future_key
else 0)
384 return self
.marshal_password(
387 query_password_interval
,
388 unchanged_password_interval
,
391 def expected_current_gmsa_password_blob(
394 creds
: KerberosCredentials
,
396 future_key_is_acceptable
: bool,
397 ) -> gmsa
.MANAGEDPASSWORD_BLOB
:
398 gmsa_object
= self
.get_gmsa_object(samdb
, creds
.get_dn())
400 gkdi_rollover_interval
= self
.gmsa_rollover_interval(gmsa_object
)
402 pwd_id_blob
= gmsa_object
.get("msDS-ManagedPasswordId", idx
=0)
403 self
.assertIsNotNone(pwd_id_blob
, "SAM should have initialized password ID")
405 pwd_id
= ndr_unpack(gkdi
.KeyEnvelope
, pwd_id_blob
)
406 key_start_time
= Gkid
.from_key_envelope(pwd_id
).start_nt_time()
408 current_time
= self
.current_nt_time(samdb
)
410 new_key_start_time
= self
.quantized_time(
411 key_start_time
, current_time
, gkdi_rollover_interval
413 new_key_expiration_time
= NtTime(new_key_start_time
+ gkdi_rollover_interval
)
415 account_sid
= creds
.get_sid()
417 within_clock_skew_window
= (
418 new_key_expiration_time
- current_time
<= MAX_CLOCK_SKEW
420 return_future_key
= future_key_is_acceptable
and within_clock_skew_window
421 if return_future_key
:
422 new_password
= self
.get_password_based_on_timestamp(
423 samdb
, new_key_expiration_time
, account_sid
425 old_password
= self
.get_password_based_on_timestamp(
426 samdb
, new_key_start_time
, account_sid
429 new_password
= self
.get_password_based_on_timestamp(
430 samdb
, new_key_start_time
, account_sid
433 account_age
= NtTimeDelta(
434 current_time
- self
.gmsa_creation_nt_time(gmsa_object
)
436 if account_age
>= gkdi_rollover_interval
:
437 old_password
= self
.get_password_based_on_timestamp(
439 NtTime(new_key_start_time
- gkdi_rollover_interval
),
443 # The account is not old enough to have a previous password.
446 key_expiration_time
= NtTime(key_start_time
+ gkdi_rollover_interval
)
447 key_is_expired
= key_expiration_time
<= current_time
449 query_expiration_time
= NtTime(
450 new_key_expiration_time
if key_is_expired
else key_expiration_time
452 query_password_interval
= NtTimeDelta(query_expiration_time
- current_time
)
453 unchanged_password_interval
= NtTimeDelta(
456 query_expiration_time
457 + (gkdi_rollover_interval
if return_future_key
else 0)
463 return self
.marshal_password(
466 query_password_interval
,
467 unchanged_password_interval
,
470 def marshal_password(
472 current_password
: bytes
,
473 previous_password
: Optional
[bytes
],
474 query_password_interval
: NtTimeDelta
,
475 unchanged_password_interval
: NtTimeDelta
,
476 ) -> gmsa
.MANAGEDPASSWORD_BLOB
:
477 managed_password
= gmsa
.MANAGEDPASSWORD_BLOB()
479 managed_password
.passwords
.current
= current_password
480 managed_password
.passwords
.previous
= previous_password
481 managed_password
.passwords
.query_interval
= query_password_interval
482 managed_password
.passwords
.unchanged_interval
= unchanged_password_interval
484 return managed_password
489 samdb
: Optional
[SamDB
] = None,
491 msa_membership
: Optional
[str] = None,
493 ) -> KerberosCredentials
:
494 if msa_membership
is None:
495 allow_world_sddl
= "O:SYD:(A;;RP;;;WD)"
496 msa_membership
= allow_world_sddl
498 msa_membership_sd
= ndr_pack(
499 security
.descriptor
.from_sddl(msa_membership
, security
.dom_sid())
503 creds
= self
.get_cached_creds(
505 account_type
=self
.AccountType
.GROUP_MANAGED_SERVICE
,
507 "additional_details": self
.freeze(
509 "msDS-GroupMSAMembership": msa_membership_sd
,
510 "msDS-ManagedPasswordInterval": str(interval
),
515 # Ensure the gMSA is a brand‐new account.
518 except ldb
.LdbError
as err
:
519 if err
.args
[0] == ldb
.ERR_UNWILLING_TO_PERFORM
:
521 "If you’re running these tests against Windows, try “warming up”"
522 " the GKDI service by running `samba.tests.krb5.gkdi_tests` first."
527 # Derive the account’s current password. The account is too new to have a previous password yet.
528 managed_pwd
= self
.expected_current_gmsa_password_blob(
529 self
.get_samdb() if samdb
is None else samdb
,
531 future_key_is_acceptable
=False,
535 self
.assertIsNotNone(
536 managed_pwd
.passwords
.current
, "current password must be present"
538 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
542 def get_local_samdb(self
) -> SamDB
:
543 """Return a connection to the local database."""
546 samdb
= connect_samdb(
547 samdb_url
=lp
.samdb_url(), lp
=lp
, credentials
=self
.get_admin_creds()
549 self
.assertLocalSamDB(samdb
)
553 # Perform a gensec logon using NTLMSSP. As samdb is passed in as a
554 # parameter, it can have a time set on it with set_db_time().
555 def gensec_ntlmssp_logon(
556 self
, client_creds
: Credentials
, samdb
: SamDB
, expect_success
: bool = True
557 ) -> "Optional[auth.session_info]":
559 lp
.set("server role", "active directory domain controller")
561 settings
= {"lp_ctx": lp
, "target_hostname": lp
.get("netbios name")}
563 gensec_client
= gensec
.Security
.start_client(settings
)
564 # Ensure that we don’t use Kerberos.
565 self
.assertEqual(DONT_USE_KERBEROS
, client_creds
.get_kerberos_state())
566 gensec_client
.set_credentials(client_creds
)
567 gensec_client
.want_feature(gensec
.FEATURE_SEAL
)
568 gensec_client
.start_mech_by_name("ntlmssp")
570 auth_context
= auth
.AuthContext(lp_ctx
=lp
, ldb
=samdb
)
572 gensec_server
= gensec
.Security
.start_server(settings
, auth_context
)
573 machine_creds
= Credentials()
574 machine_creds
.guess(lp
)
575 machine_creds
.set_machine_account(lp
)
576 gensec_server
.set_credentials(machine_creds
)
578 gensec_server
.start_mech_by_name("ntlmssp")
580 client_finished
= False
581 server_finished
= False
582 client_to_server
= b
""
583 server_to_client
= b
""
585 # Operate as both the client and the server to verify the user’s credentials.
586 while not client_finished
or not server_finished
:
587 if not client_finished
:
588 client_finished
, client_to_server
= gensec_client
.update(
591 if not server_finished
:
593 server_finished
, server_to_client
= gensec_server
.update(
596 except NTSTATUSError
as err
:
597 self
.assertFalse(expect_success
, "got an unexpected error")
599 self
.assertEqual(ntstatus
.NT_STATUS_WRONG_PASSWORD
, err
.args
[0])
602 self
.assertTrue(expect_success
, "expected to get an error")
604 # Retrieve the SIDs from the security token.
605 return gensec_server
.session_info()
607 def check_nt_interval(
609 expected_nt_interval
: NtTimeDelta
,
610 nt_interval
: NtTimeDelta
,
613 """Check that the intervals match to within thirty seconds or so."""
615 threshold
= datetime
.timedelta(seconds
=30)
617 interval
= timedelta_from_nt_time_delta(nt_interval
)
618 expected_interval
= timedelta_from_nt_time_delta(expected_nt_interval
)
619 interval_difference
= abs(interval
- expected_interval
)
623 f
"{interval_name} ({interval}) is out by {interval_difference} from"
624 f
" expected ({expected_interval})",
627 def check_managed_pwd_intervals(
629 expected_managed_pwd
: gmsa
.MANAGEDPASSWORD_BLOB
,
630 managed_pwd
: gmsa
.MANAGEDPASSWORD_BLOB
,
632 expected_passwords
= expected_managed_pwd
.passwords
633 passwords
= managed_pwd
.passwords
635 self
.check_nt_interval(
636 expected_passwords
.query_interval
,
637 passwords
.query_interval
,
640 self
.check_nt_interval(
641 expected_passwords
.unchanged_interval
,
642 passwords
.unchanged_interval
,
643 "unchanged interval",
646 def check_managed_pwd(
649 creds
: KerberosCredentials
,
651 expected_managed_pwd
: gmsa
.MANAGEDPASSWORD_BLOB
,
654 creds
.get_dn(), scope
=ldb
.SCOPE_BASE
, attrs
=["msDS-ManagedPassword"]
656 self
.assertEqual(1, len(res
), "gMSA not found")
657 managed_password
= res
[0].get("msDS-ManagedPassword", idx
=0)
659 self
.assertIsNotNone(managed_password
)
660 managed_pwd
= ndr_unpack(gmsa
.MANAGEDPASSWORD_BLOB
, managed_password
)
662 self
.assertEqual(1, managed_pwd
.version
)
663 self
.assertEqual(0, managed_pwd
.reserved
)
664 self
.assertEqual(len(managed_password
), managed_pwd
.length
)
666 self
.assertIsNotNone(expected_managed_pwd
.passwords
.current
)
669 managed_pwd
.passwords
.current
, expected_managed_pwd
.passwords
.current
672 managed_pwd
.passwords
.previous
, expected_managed_pwd
.passwords
.previous
675 self
.check_managed_pwd_intervals(expected_managed_pwd
, managed_pwd
)
677 # When creating a gMSA, Windows seems to pick the root key with the
678 # greatest msKds-CreateTime having msKds-UseStartTime ≤ ten hours ago.
679 # Bear in mind that it seems also to cache the key, so it won’t always
680 # use the latest one.
682 def get_managed_service_accounts_dn(self
) -> ldb
.Dn
:
683 samdb
= self
.get_samdb()
684 return samdb
.get_wellknown_dn(
685 samdb
.get_default_basedn(), dsdb
.DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
688 def check_managed_password_access(
692 samdb
: Optional
[SamDB
] = None,
693 expect_access
: bool = False,
694 expected_werror
: int = werror
.WERR_SUCCESS
,
697 samdb
= self
.get_samdb()
699 self
.assertFalse(expect_access
)
700 managed_service_accounts_dn
= self
.get_managed_service_accounts_dn()
701 username
= creds
.get_username()
703 # Try base, subtree, and one‐level searches.
705 (creds
.get_dn(), ldb
.SCOPE_BASE
),
706 (managed_service_accounts_dn
, ldb
.SCOPE_SUBTREE
),
707 (managed_service_accounts_dn
, ldb
.SCOPE_ONELEVEL
),
710 for dn
, scope
in searches
:
711 # Perform a search and see whether we’re allowed to view the managed password.
717 expression
=f
"sAMAccountName={username}",
718 attrs
=["msDS-ManagedPassword"],
720 except ldb
.LdbError
as err
:
721 self
.assertTrue(expected_werror
, "got an unexpected error")
724 if num
!= ldb
.ERR_OPERATIONS_ERROR
:
727 self
.assertIn(f
"{expected_werror:08X}", estr
)
730 self
.assertFalse(expected_werror
, "expected to get an error")
731 self
.assertEqual(1, len(res
), "should always find the gMSA")
733 managed_password
= res
[0].get("msDS-ManagedPassword", idx
=0)
735 self
.assertIsNotNone(
736 managed_password
, "should be allowed to view the password"
740 managed_password
, "should not be allowed to view the password"
743 def test_retrieved_password_allowed(self
):
744 """Test being allowed to view the managed password."""
745 self
.check_managed_password_access(self
.gmsa_account(), expect_access
=True)
747 def test_retrieved_password_denied(self
):
748 """Test not being allowed to view the managed password."""
749 deny_world_sddl
= "O:SYD:(D;;RP;;;WD)"
750 self
.check_managed_password_access(
751 self
.gmsa_account(msa_membership
=deny_world_sddl
), expect_access
=False
754 def test_retrieving_password_over_sealed_connection(self
):
757 f
"ldap://{self.dc_host}",
758 credentials
=self
.get_admin_creds(),
759 session_info
=auth
.system_session(lp
),
763 self
.check_managed_password_access(
764 self
.gmsa_account(), samdb
=samdb
, expect_access
=True
767 def test_retrieving_password_over_unsealed_connection(self
):
768 # Requires --use-kerberos=required, or it automatically upgrades to an
769 # encrypted connection.
771 # Remove FEATURE_SEAL which gets added by insta_creds.
772 creds
= self
.insta_creds(template
=self
.get_admin_creds())
773 creds
.set_gensec_features(creds
.get_gensec_features() & ~gensec
.FEATURE_SEAL
)
777 sasl_wrap
= lp
.get("client ldap sasl wrapping")
778 self
.addCleanup(lp
.set, "client ldap sasl wrapping", sasl_wrap
)
779 lp
.set("client ldap sasl wrapping", "sign")
781 # Create a second ldb connection without seal.
783 f
"ldap://{self.dc_host}",
785 session_info
=auth
.system_session(lp
),
789 self
.check_managed_password_access(
792 expected_werror
=werror
.WERR_DS_CONFIDENTIALITY_REQUIRED
,
795 def test_retrieving_denied_password_over_unsealed_connection(self
):
796 # Requires --use-kerberos=required, or it automatically upgrades to an
797 # encrypted connection.
799 # Remove FEATURE_SEAL which gets added by insta_creds.
800 creds
= self
.insta_creds(template
=self
.get_admin_creds())
801 creds
.set_gensec_features(creds
.get_gensec_features() & ~gensec
.FEATURE_SEAL
)
805 sasl_wrap
= lp
.get("client ldap sasl wrapping")
806 self
.addCleanup(lp
.set, "client ldap sasl wrapping", sasl_wrap
)
807 lp
.set("client ldap sasl wrapping", "sign")
809 # Create a second ldb connection without seal.
811 f
"ldap://{self.dc_host}",
813 session_info
=auth
.system_session(lp
),
817 # Deny anyone from being able to view the password.
818 deny_world_sddl
= "O:SYD:(D;;RP;;;WD)"
819 self
.check_managed_password_access(
820 self
.gmsa_account(msa_membership
=deny_world_sddl
),
822 expected_werror
=werror
.WERR_DS_CONFIDENTIALITY_REQUIRED
,
825 def test_retrieving_password_after_encrypted_simple_bind(self
):
826 """Test retrieving the managed password using a simple bind with encryption."""
827 admin_sid
= self
.get_samdb().get_admin_sid()
829 creds
= self
.insta_creds(template
=self
.get_admin_creds())
830 creds
.set_bind_dn(admin_sid
)
832 url
=f
"ldaps://{self.dc_host}", credentials
=creds
, lp
=self
.get_lp()
835 self
.check_managed_password_access(
836 self
.gmsa_account(), samdb
=samdb
, expect_access
=True
839 def test_retrieving_password_after_unencrypted_simple_bind(self
):
840 """Test retrieving the managed password using a simple bind without encryption."""
841 admin_sid
= self
.get_samdb().get_admin_sid()
843 creds
= self
.insta_creds(template
=self
.get_admin_creds())
844 creds
.set_bind_dn(admin_sid
)
847 url
=f
"ldap://{self.dc_host}", credentials
=creds
, lp
=self
.get_lp()
850 self
.fail("failed to perform simple bind")
852 self
.check_managed_password_access(
855 expected_werror
=werror
.WERR_DS_CONFIDENTIALITY_REQUIRED
,
858 def future_gkid(self
) -> Gkid
:
859 """Return (6333, 26, 5)—an arbitrary GKID far enough in the future that
860 it’s situated beyond any reasonable rollover period. But not so far in
861 the future that Python’s datetime library will throw OverflowErrors."""
862 future_date
= datetime
.datetime(9000, 1, 1, 0, 0, tzinfo
=datetime
.timezone
.utc
)
863 return Gkid
.from_nt_time(nt_time_from_datetime(future_date
))
865 def future_time(self
) -> NtTime
:
866 """Return an arbitrary time far enough in the future that it’s situated
867 beyond any reasonable rollover period. But not so far in the future that
868 Python’s datetime library will throw OverflowErrors."""
869 return self
.future_gkid().start_nt_time()
871 def test_retrieved_password(self
):
872 """Test that we can retrieve the correct password for a gMSA."""
874 samdb
= self
.get_samdb()
875 creds
= self
.gmsa_account()
877 expected
= self
.expected_current_gmsa_password_blob(
880 future_key_is_acceptable
=True,
882 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
884 def test_retrieved_password_when_current_key_is_valid(self
):
885 """Test that we can retrieve the correct password for a gMSA at a time
886 when we are sure it is valid."""
887 password_interval
= 37
889 samdb
= self
.get_local_samdb()
890 series
= self
.gmsa_series(password_interval
)
891 self
.set_db_time(samdb
, series
.start_of_interval(0))
893 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
895 # Check the managed password of the account the moment it has been
897 expected
= self
.expected_gmsa_password_blob(
900 series
.interval_gkid(0),
901 previous_gkid
=series
.interval_gkid(-1),
902 query_expiration_gkid
=series
.interval_gkid(1),
904 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
906 def test_retrieved_password_when_current_key_is_expired(self
):
907 """Test that we can retrieve the correct password for a gMSA when the
908 original password has expired."""
909 password_interval
= 14
911 samdb
= self
.get_local_samdb()
912 series
= self
.gmsa_series(password_interval
)
913 self
.set_db_time(samdb
, series
.start_of_interval(0))
915 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
917 # Set the time to the moment the original password has expired, and
918 # check that the managed password is correct.
919 expired_time
= series
.start_of_interval(1)
920 self
.set_db_time(samdb
, expired_time
)
921 expected
= self
.expected_gmsa_password_blob(
924 series
.interval_gkid(1),
925 previous_gkid
=series
.interval_gkid(0),
926 query_expiration_gkid
=series
.interval_gkid(2),
928 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
930 def test_retrieved_password_when_next_key_is_expired(self
):
931 password_interval
= 1
933 samdb
= self
.get_local_samdb()
934 series
= self
.gmsa_series(password_interval
)
935 self
.set_db_time(samdb
, series
.start_of_interval(0))
937 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
939 expired_time
= series
.start_of_interval(2)
940 self
.set_db_time(samdb
, expired_time
)
942 expected
= self
.expected_gmsa_password_blob(
945 series
.interval_gkid(2),
946 previous_gkid
=series
.interval_gkid(1),
947 query_expiration_gkid
=series
.interval_gkid(3),
949 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
951 def test_retrieved_password_during_clock_skew_window_when_current_key_is_valid(
954 password_interval
= 60
956 samdb
= self
.get_local_samdb()
957 series
= self
.gmsa_series(password_interval
)
958 self
.set_db_time(samdb
, series
.start_of_interval(0))
960 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
962 self
.set_db_time(samdb
, series
.during_skew_window(0))
964 expected
= self
.expected_gmsa_password_blob(
967 series
.interval_gkid(1),
968 previous_gkid
=series
.interval_gkid(0),
969 query_expiration_gkid
=series
.interval_gkid(1),
970 return_future_key
=True,
972 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
974 def test_retrieved_password_during_clock_skew_window_when_current_key_is_expired(
977 password_interval
= 100
979 samdb
= self
.get_local_samdb()
980 series
= self
.gmsa_series(password_interval
)
981 self
.set_db_time(samdb
, series
.start_of_interval(0))
983 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
985 self
.set_db_time(samdb
, series
.during_skew_window(1))
987 expected
= self
.expected_gmsa_password_blob(
990 series
.interval_gkid(2),
991 previous_gkid
=series
.interval_gkid(1),
992 query_expiration_gkid
=series
.interval_gkid(2),
993 return_future_key
=True,
995 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
997 def test_retrieved_password_during_clock_skew_window_when_next_key_is_expired(
1000 password_interval
= 16
1002 samdb
= self
.get_local_samdb()
1003 series
= self
.gmsa_series(password_interval
)
1004 self
.set_db_time(samdb
, series
.start_of_interval(0))
1006 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
1008 self
.set_db_time(samdb
, series
.during_skew_window(2))
1010 expected
= self
.expected_gmsa_password_blob(
1013 series
.interval_gkid(3),
1014 previous_gkid
=series
.interval_gkid(2),
1015 query_expiration_gkid
=series
.interval_gkid(3),
1016 return_future_key
=True,
1018 self
.check_managed_pwd(samdb
, creds
, expected_managed_pwd
=expected
)
1020 def test_retrieving_managed_password_triggers_keys_update(self
):
1021 # Create a root key with a start time early enough to be usable at the
1022 # time the gMSA is purported to be created.
1023 samdb
= self
.get_samdb()
1024 domain_dn
= self
.get_server_dn(samdb
)
1025 self
.create_root_key(samdb
, domain_dn
, use_start_time
=ROOT_KEY_START_TIME
)
1027 password_interval
= 16
1029 local_samdb
= self
.get_local_samdb()
1030 series
= GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval
))
1031 self
.set_db_time(local_samdb
, series
.start_of_interval(0))
1033 creds
= self
.gmsa_account(samdb
=local_samdb
, interval
=password_interval
)
1036 self
.set_db_time(local_samdb
, None)
1038 # Search the local database for the account’s keys.
1039 res
= local_samdb
.search(
1040 dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["unicodePwd", "supplementalCredentials"]
1042 self
.assertEqual(1, len(res
))
1044 previous_nt_hash
= res
[0].get("unicodePwd", idx
=0)
1045 previous_supplemental_creds
= self
.unpack_supplemental_credentials(
1046 res
[0].get("supplementalCredentials", idx
=0)
1049 # Check that the NT hash is the value we expect.
1050 self
.assertEqual(creds
.get_nt_hash(), previous_nt_hash
)
1052 # Search for the managed password over LDAP, triggering an update of the
1053 # keys in the database.
1054 res
= samdb
.search(dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["msDS-ManagedPassword"])
1055 self
.assertEqual(1, len(res
))
1057 # Verify that the password is present in the result.
1058 managed_password
= res
[0].get("msDS-ManagedPassword", idx
=0)
1059 self
.assertIsNotNone(managed_password
, "should be allowed to view the password")
1061 # Search the local database again for the account’s keys, which should
1062 # have been updated.
1063 res
= local_samdb
.search(
1064 dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["unicodePwd", "supplementalCredentials"]
1066 self
.assertEqual(1, len(res
))
1068 nt_hash
= res
[0].get("unicodePwd", idx
=0)
1069 supplemental_creds
= self
.unpack_supplemental_credentials(
1070 res
[0].get("supplementalCredentials", idx
=0)
1073 self
.assertNotEqual(
1074 previous_nt_hash
, nt_hash
, "NT hash has not been updated (yet)"
1076 self
.assertNotEqual(
1077 previous_supplemental_creds
,
1079 "supplementalCredentials has not been updated (yet)",
1082 # Calculate the password with which to authenticate.
1083 current_series
= self
.gmsa_series_for_account(
1084 local_samdb
, creds
, password_interval
1086 managed_pwd
= self
.expected_gmsa_password_blob(
1089 current_series
.interval_gkid(0),
1090 query_expiration_gkid
=current_series
.interval_gkid(1),
1093 # Set the new password.
1094 self
.assertIsNotNone(
1095 managed_pwd
.passwords
.current
, "current password must be present"
1097 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1099 # Check that the new NT hash is the value we expect.
1100 self
.assertEqual(creds
.get_nt_hash(), nt_hash
)
1102 def test_authentication_triggers_keys_update(self
):
1103 # Create a root key with a start time early enough to be usable at the
1104 # time the gMSA is purported to be created. But don’t create it on a
1105 # local samdb with a specifically set time, because (if the key isn’t
1106 # deleted later) we could end up with multiple keys with identical
1107 # creation and start times, and tests failing when the test and the
1108 # server don’t agree on which root key to use at a specific time.
1109 samdb
= self
.get_samdb()
1110 domain_dn
= self
.get_server_dn(samdb
)
1111 self
.create_root_key(samdb
, domain_dn
, use_start_time
=ROOT_KEY_START_TIME
)
1113 password_interval
= 16
1115 local_samdb
= self
.get_local_samdb()
1116 series
= GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval
))
1117 self
.set_db_time(local_samdb
, series
.start_of_interval(0))
1119 creds
= self
.gmsa_account(samdb
=local_samdb
, interval
=password_interval
)
1122 self
.set_db_time(local_samdb
, None)
1124 # Search the local database for the account’s keys.
1125 res
= local_samdb
.search(
1126 dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["unicodePwd", "supplementalCredentials"]
1128 self
.assertEqual(1, len(res
))
1130 previous_nt_hash
= res
[0].get("unicodePwd", idx
=0)
1131 previous_supplemental_creds
= self
.unpack_supplemental_credentials(
1132 res
[0].get("supplementalCredentials", idx
=0)
1135 # Check that the NT hash is the value we expect.
1136 self
.assertEqual(creds
.get_nt_hash(), previous_nt_hash
)
1138 # Calculate the password with which to authenticate.
1139 current_series
= self
.gmsa_series_for_account(
1140 local_samdb
, creds
, password_interval
1142 managed_pwd
= self
.expected_gmsa_password_blob(
1145 current_series
.interval_gkid(0),
1146 query_expiration_gkid
=current_series
.interval_gkid(1),
1149 # Set the new password.
1150 self
.assertIsNotNone(
1151 managed_pwd
.passwords
.current
, "current password must be present"
1153 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1155 # Perform an authentication using the new password. The KDC should
1156 # recognize that the keys in the database are out of date and update
1158 self
._as
_req
(creds
, self
.get_service_creds(), kcrypto
.Enctype
.AES256
)
1160 # Search the local database again for the account’s keys, which should
1161 # have been updated.
1162 res
= local_samdb
.search(
1163 dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["unicodePwd", "supplementalCredentials"]
1165 self
.assertEqual(1, len(res
))
1167 nt_hash
= res
[0].get("unicodePwd", idx
=0)
1168 supplemental_creds
= self
.unpack_supplemental_credentials(
1169 res
[0].get("supplementalCredentials", idx
=0)
1172 self
.assertNotEqual(
1173 previous_nt_hash
, nt_hash
, "NT hash has not been updated (yet)"
1175 self
.assertNotEqual(
1176 previous_supplemental_creds
,
1178 "supplementalCredentials has not been updated (yet)",
1181 # Check that the new NT hash is the value we expect.
1182 self
.assertEqual(creds
.get_nt_hash(), nt_hash
)
1184 def test_gmsa_can_perform_gensec_ntlmssp_logon(self
):
1185 creds
= self
.gmsa_account(kerberos_enabled
=False)
1187 # Perform a gensec logon.
1188 session
= self
.gensec_ntlmssp_logon(creds
, self
.get_local_samdb())
1190 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1191 token
= session
.security_token
1192 token_sids
= token
.sids
1193 self
.assertGreater(len(token_sids
), 0)
1195 # Ensure that they match.
1196 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1198 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_valid(self
):
1199 """Test that we can perform a gensec logon at a time when we are sure
1200 the current gMSA password is valid."""
1202 password_interval
= 18
1204 samdb
= self
.get_local_samdb()
1205 series
= self
.gmsa_series(password_interval
)
1206 self
.set_db_time(samdb
, series
.start_of_interval(0))
1208 creds
= self
.gmsa_account(
1209 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1212 # Perform a gensec logon.
1213 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1215 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1216 token
= session
.security_token
1217 token_sids
= token
.sids
1218 self
.assertGreater(len(token_sids
), 0)
1220 # Ensure that they match.
1221 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1223 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_expired(self
):
1224 """Test that we can perform a gensec logon using NTLMSSP at a time when
1225 the current gMSA password has expired."""
1227 password_interval
= 40
1229 samdb
= self
.get_local_samdb()
1230 series
= self
.gmsa_series(password_interval
)
1231 self
.set_db_time(samdb
, series
.start_of_interval(0))
1233 creds
= self
.gmsa_account(
1234 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1237 # Set the time to the moment the original password has expired, and
1238 # perform a gensec logon.
1239 expired_time
= series
.start_of_interval(1)
1240 self
.set_db_time(samdb
, expired_time
)
1242 # Calculate the password with which to authenticate.
1243 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1244 managed_pwd
= self
.expected_gmsa_password_blob(
1247 current_series
.interval_gkid(0),
1248 previous_gkid
=current_series
.interval_gkid(-1),
1249 query_expiration_gkid
=current_series
.interval_gkid(1),
1252 # Set the new password.
1253 self
.assertIsNotNone(
1254 managed_pwd
.passwords
.current
, "current password must be present"
1256 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1258 # Perform a gensec logon.
1259 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1261 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1262 token
= session
.security_token
1263 token_sids
= token
.sids
1264 self
.assertGreater(len(token_sids
), 0)
1266 # Ensure that they match.
1267 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1269 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_next_key_is_expired(self
):
1270 password_interval
= 42
1272 samdb
= self
.get_local_samdb()
1273 series
= self
.gmsa_series(password_interval
)
1274 self
.set_db_time(samdb
, series
.start_of_interval(0))
1276 creds
= self
.gmsa_account(
1277 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1280 expired_time
= series
.start_of_interval(2)
1281 self
.set_db_time(samdb
, expired_time
)
1283 # Calculate the password with which to authenticate.
1284 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1285 managed_pwd
= self
.expected_gmsa_password_blob(
1288 current_series
.interval_gkid(0),
1289 previous_gkid
=current_series
.interval_gkid(-1),
1290 query_expiration_gkid
=current_series
.interval_gkid(1),
1293 # Set the new password.
1294 self
.assertIsNotNone(
1295 managed_pwd
.passwords
.current
, "current password must be present"
1297 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1299 # Perform a gensec logon.
1300 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1302 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1303 token
= session
.security_token
1304 token_sids
= token
.sids
1305 self
.assertGreater(len(token_sids
), 0)
1307 # Ensure that they match.
1308 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1310 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_valid(
1313 password_interval
= 43
1315 samdb
= self
.get_local_samdb()
1316 series
= self
.gmsa_series(password_interval
)
1317 self
.set_db_time(samdb
, series
.start_of_interval(0))
1319 creds
= self
.gmsa_account(
1320 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1323 self
.set_db_time(samdb
, series
.during_skew_window(0))
1325 # Calculate the password with which to authenticate.
1326 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1327 managed_pwd
= self
.expected_gmsa_password_blob(
1330 current_series
.interval_gkid(0),
1331 previous_gkid
=current_series
.interval_gkid(-1),
1332 query_expiration_gkid
=current_series
.interval_gkid(1),
1335 # Set the new password.
1336 self
.assertIsNotNone(
1337 managed_pwd
.passwords
.current
, "current password must be present"
1339 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1341 # Perform a gensec logon.
1342 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1344 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1345 token
= session
.security_token
1346 token_sids
= token
.sids
1347 self
.assertGreater(len(token_sids
), 0)
1349 # Ensure that they match.
1350 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1352 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_expired(
1355 password_interval
= 44
1357 samdb
= self
.get_local_samdb()
1358 series
= self
.gmsa_series(password_interval
)
1359 self
.set_db_time(samdb
, series
.start_of_interval(0))
1361 creds
= self
.gmsa_account(
1362 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1365 self
.set_db_time(samdb
, series
.during_skew_window(1))
1367 # Calculate the password with which to authenticate.
1368 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1369 managed_pwd
= self
.expected_gmsa_password_blob(
1372 current_series
.interval_gkid(0),
1373 previous_gkid
=current_series
.interval_gkid(-1),
1374 query_expiration_gkid
=current_series
.interval_gkid(1),
1377 # Set the new password.
1378 self
.assertIsNotNone(
1379 managed_pwd
.passwords
.current
, "current password must be present"
1381 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1383 # Perform a gensec logon.
1384 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1386 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1387 token
= session
.security_token
1388 token_sids
= token
.sids
1389 self
.assertGreater(len(token_sids
), 0)
1391 # Ensure that they match.
1392 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1394 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_next_key_is_expired(
1397 password_interval
= 47
1399 samdb
= self
.get_local_samdb()
1400 series
= self
.gmsa_series(password_interval
)
1401 self
.set_db_time(samdb
, series
.start_of_interval(0))
1403 creds
= self
.gmsa_account(
1404 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1407 self
.set_db_time(samdb
, series
.during_skew_window(2))
1409 # Calculate the password with which to authenticate.
1410 current_series
= self
.gmsa_series_for_account(samdb
, creds
, password_interval
)
1411 managed_pwd
= self
.expected_gmsa_password_blob(
1414 current_series
.interval_gkid(0),
1415 previous_gkid
=current_series
.interval_gkid(-1),
1416 query_expiration_gkid
=current_series
.interval_gkid(1),
1419 # Set the new password.
1420 self
.assertIsNotNone(
1421 managed_pwd
.passwords
.current
, "current password must be present"
1423 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1425 # Perform a gensec logon.
1426 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1428 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1429 token
= session
.security_token
1430 token_sids
= token
.sids
1431 self
.assertGreater(len(token_sids
), 0)
1433 # Ensure that they match.
1434 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1436 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_within_five_minutes(
1439 password_interval
= 123
1441 samdb
= self
.get_local_samdb()
1442 series
= self
.gmsa_series(password_interval
)
1443 self
.set_db_time(samdb
, series
.start_of_interval(0))
1445 creds
= self
.gmsa_account(
1446 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1449 # Set the time to within five minutes of the original password’s expiry,
1450 # and perform a gensec logon with the original password.
1451 expired_time
= series
.within_previous_password_valid_window(1)
1452 self
.set_db_time(samdb
, expired_time
)
1454 # Perform a gensec logon.
1455 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1457 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1458 token
= session
.security_token
1459 token_sids
= token
.sids
1460 self
.assertGreater(len(token_sids
), 0)
1462 # Ensure that they match.
1463 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1465 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_but_one_password_within_five_minutes(
1468 password_interval
= 123
1470 samdb
= self
.get_local_samdb()
1471 series
= self
.gmsa_series(password_interval
)
1472 self
.set_db_time(samdb
, series
.start_of_interval(0))
1474 creds
= self
.gmsa_account(
1475 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1478 # Set the time to within five minutes of the *following* password’s expiry,
1479 # and perform a gensec logon with the original password.
1480 expired_time
= series
.within_previous_password_valid_window(2)
1481 self
.set_db_time(samdb
, expired_time
)
1483 # Expect the gensec logon to fail.
1484 self
.gensec_ntlmssp_logon(creds
, samdb
, expect_success
=False)
1486 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_beyond_five_minutes(
1489 password_interval
= 456
1491 samdb
= self
.get_local_samdb()
1492 series
= self
.gmsa_series(password_interval
)
1493 self
.set_db_time(samdb
, series
.start_of_interval(0))
1495 creds
= self
.gmsa_account(
1496 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1499 # Set the time to five minutes beyond the original password’s expiry,
1500 # and try to perform a gensec logon with the original password.
1501 expired_time
= series
.outside_previous_password_valid_window(1)
1502 self
.set_db_time(samdb
, expired_time
)
1504 # Perform a gensec logon.
1505 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1507 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1508 token
= session
.security_token
1509 token_sids
= token
.sids
1510 self
.assertGreater(len(token_sids
), 0)
1512 # Ensure that they match.
1513 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sids
[0])
1515 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_password_five_minutes_apart(
1518 password_interval
= 789
1520 samdb
= self
.get_local_samdb()
1521 series
= self
.gmsa_series(password_interval
)
1522 self
.set_db_time(samdb
, series
.start_of_interval(0))
1524 creds
= self
.gmsa_account(
1525 samdb
=samdb
, interval
=password_interval
, kerberos_enabled
=False
1527 gmsa_sid
= creds
.get_sid()
1529 # Set the time to after the original password’s expiry, and perform a
1530 # gensec logon with the original password.
1531 db_time
= series
.during_interval(1)
1532 self
.set_db_time(samdb
, db_time
)
1534 # Perform a gensec logon.
1535 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1537 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1538 token
= session
.security_token
1539 token_sids
= token
.sids
1540 self
.assertGreater(len(token_sids
), 0)
1542 # Ensure that they match.
1543 self
.assertEqual(security
.dom_sid(gmsa_sid
), token_sids
[0])
1545 # Set the time to not quite five minutes later, and perform a gensec
1546 # logon with the original password.
1547 self
.set_db_time(samdb
, NtTime(db_time
+ MAX_CLOCK_SKEW
- 1))
1549 # Perform a gensec logon.
1550 session
= self
.gensec_ntlmssp_logon(creds
, samdb
)
1552 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1553 token
= session
.security_token
1554 token_sids
= token
.sids
1555 self
.assertGreater(len(token_sids
), 0)
1557 # Ensure that they match.
1558 self
.assertEqual(security
.dom_sid(gmsa_sid
), token_sids
[0])
1560 # Now set the time to exactly five minutes later, and try to perform a
1561 # gensec logon with the original password.
1562 self
.set_db_time(samdb
, NtTime(db_time
+ MAX_CLOCK_SKEW
))
1564 # Expect the gensec logon to fail.
1565 self
.gensec_ntlmssp_logon(creds
, samdb
, expect_success
=False)
1567 def test_gmsa_keys_when_previous_password_is_not_acceptable(self
):
1568 self
._check
_gmsa
_keys
(within_valid_window
=False, expect_previous_keys
=False)
1570 def test_gmsa_keys_when_previous_password_is_acceptable(self
):
1571 self
._check
_gmsa
_keys
(within_valid_window
=True, expect_previous_keys
=True)
1573 def _check_gmsa_keys(
1574 self
, *, within_valid_window
: bool, expect_previous_keys
: bool
1576 password_interval
= 77
1578 samdb
= self
.get_local_samdb()
1579 series
= self
.gmsa_series(password_interval
)
1580 self
.set_db_time(samdb
, series
.start_of_interval(0))
1582 creds
= self
.gmsa_account(samdb
=samdb
, interval
=password_interval
)
1584 if within_valid_window
:
1585 db_time
= series
.within_previous_password_valid_window(1)
1587 db_time
= series
.outside_previous_password_valid_window(1)
1588 self
.set_db_time(samdb
, db_time
)
1590 gmsa_principal
= f
"{creds.get_username()}@{creds.get_realm()}"
1592 ktfile
= os
.path
.join(self
.tempdir
, "test.keytab")
1593 self
.addCleanup(self
.rm_files
, ktfile
)
1595 net
= Net(None, self
.get_lp())
1599 principal
=gmsa_principal
,
1600 only_current_keys
=True,
1603 self
.assertTrue(os
.path
.exists(ktfile
), "keytab was not created")
1605 with
open(ktfile
, "rb") as bytes_kt
:
1606 keytab_bytes
= bytes_kt
.read()
1608 keytab_set
= keytab_as_set(keytab_bytes
)
1609 exported_etypes
= {entry
[1] for entry
in keytab_set
}
1611 # Ensure that the AES keys were exported.
1612 self
.assertLessEqual(
1613 {kcrypto
.Enctype
.AES256
, kcrypto
.Enctype
.AES128
}, exported_etypes
1617 creds
: KerberosCredentials
,
1618 keytab
: Set
[Tuple
[str, kcrypto
.Enctype
, int, bytes
]],
1619 etypes
: Iterable
[kcrypto
.Enctype
],
1621 for etype
in etypes
:
1622 key
= self
.TicketDecryptionKey_from_creds(creds
, etype
=etype
)
1624 entry
= gmsa_principal
, etype
, kvno
, key
.key
.contents
1626 self
.assertNotIn(entry
, keytab
, "key already present in keytab")
1629 expected_keytab
= set()
1631 if expect_previous_keys
:
1632 # Fill the expected keytab with the previous keys.
1633 fill_keytab(creds
, expected_keytab
, exported_etypes
)
1635 # Calculate the new password.
1636 managed_pwd
= self
.expected_gmsa_password_blob(
1639 series
.interval_gkid(1),
1640 previous_gkid
=series
.interval_gkid(0),
1641 query_expiration_gkid
=series
.interval_gkid(2),
1644 # Set the new password.
1645 self
.assertIsNotNone(
1646 managed_pwd
.passwords
.current
, "current password must be present"
1648 creds
.set_utf16_password(managed_pwd
.passwords
.current
)
1650 # Clear the initial set of keys associated with this credentials object.
1651 creds
.clear_forced_keys()
1652 # Add the current keys to the expected keytab.
1653 fill_keytab(creds
, expected_keytab
, exported_etypes
)
1655 # Ensure the keytab is as we expect.
1656 self
.assertEqual(expected_keytab
, keytab_set
)
1658 def test_gmsa_can_perform_netlogon(self
):
1659 self
._test
_samlogon
(
1660 self
.gmsa_account(kerberos_enabled
=False),
1661 netlogon
.NetlogonNetworkInformation
,
1662 validation_level
=netlogon
.NetlogonValidationSamInfo4
,
1665 def test_computer_cannot_perform_interactive_logon(self
):
1666 self
._test
_samlogon
(
1667 self
.get_mach_creds(),
1668 netlogon
.NetlogonInteractiveInformation
,
1669 expect_error
=ntstatus
.NT_STATUS_NO_SUCH_USER
,
1670 validation_level
=netlogon
.NetlogonValidationSamInfo4
,
1673 def test_gmsa_cannot_perform_interactive_logon(self
):
1674 self
._test
_samlogon
(
1675 self
.gmsa_account(kerberos_enabled
=False),
1676 netlogon
.NetlogonInteractiveInformation
,
1677 expect_error
=ntstatus
.NT_STATUS_NO_SUCH_USER
,
1678 validation_level
=netlogon
.NetlogonValidationSamInfo4
,
1681 def _gmsa_can_perform_as_req(self
, *, enctype
: kcrypto
.Enctype
) -> None:
1682 self
._as
_req
(self
.gmsa_account(), self
.get_service_creds(), enctype
)
1684 def test_gmsa_can_perform_as_req_with_aes256(self
):
1685 self
._gmsa
_can
_perform
_as
_req
(enctype
=kcrypto
.Enctype
.AES256
)
1687 def test_gmsa_can_perform_as_req_with_rc4(self
):
1688 self
._gmsa
_can
_perform
_as
_req
(enctype
=kcrypto
.Enctype
.RC4
)
1690 def _gmsa_can_authenticate_to_ldap(self
, *, with_kerberos
: bool) -> None:
1691 creds
= self
.gmsa_account(kerberos_enabled
=with_kerberos
)
1695 # Authenticate to LDAP.
1697 url
=f
"{protocol}://{self.dc_host}", credentials
=creds
, lp
=self
.get_lp()
1700 # Search for the user’s token groups.
1701 res
= samdb_user
.search("", scope
=ldb
.SCOPE_BASE
, attrs
=["tokenGroups"])
1702 self
.assertEqual(1, len(res
))
1704 token_groups
= res
[0].get("tokenGroups", idx
=0)
1705 self
.assertIsNotNone(token_groups
)
1707 # Ensure that the token SID matches.
1708 token_sid
= ndr_unpack(security
.dom_sid
, token_groups
)
1709 self
.assertEqual(security
.dom_sid(creds
.get_sid()), token_sid
)
1711 def test_gmsa_can_authenticate_to_ldap_with_kerberos(self
):
1712 self
._gmsa
_can
_authenticate
_to
_ldap
(with_kerberos
=True)
1714 def test_gmsa_can_authenticate_to_ldap_without_kerberos(self
):
1715 self
._gmsa
_can
_authenticate
_to
_ldap
(with_kerberos
=False)
1717 def test_gmsa_can_perform_ServerAuthenticate3(self
):
1718 creds
= self
.gmsa_account(kerberos_enabled
=False)
1719 username
= creds
.get_username()
1720 dc_server
= self
.get_samdb().host_dns_name()
1722 # Use the gMSA’s credentials to create a netlogon connection. This call,
1723 # which internally performs a ServerAuthenticate3, is more than just
1724 # setup: it is the centrepiece of the test.
1725 c
= netlogon
.netlogon(
1726 f
"ncacn_ip_tcp:{dc_server}[schannel,seal]", self
.get_lp(), creds
1728 credential
= netlogon
.netr_Credential()
1729 credential
.data
= list(b
"abcdefgh")
1730 server_credential
= c
.netr_ServerReqChallenge(None, username
, credential
)
1731 with self
.assertRaises(NTSTATUSError
) as err
:
1732 # Try performing a ServerAuthenticate3 with our gMSA account. The
1733 # procedure for calculating a correct challenge is too complicated
1734 # to try to reimplement in Python, so we won’t even try. But the
1735 # fact that we get an ACCESS_DENIED error, rather than something
1736 # like NO_TRUST_SAM_ACCOUNT, shows that gMSAs are not prevented from
1737 # using ServerAuthenticate3 to authenticate.
1738 c
.netr_ServerAuthenticate3(
1741 misc
.SEC_CHAN_WKSTA
,
1744 netlogon
.NETLOGON_NEG_STRONG_KEYS | netlogon
.NETLOGON_NEG_SUPPORTS_AES
,
1747 self
.assertEqual(ntstatus
.NT_STATUS_ACCESS_DENIED
, err
.exception
.args
[0])
1749 def test_gmsa_cannot_be_locked_out_with_gensec_ntlmssp(self
):
1750 def try_bad_creds(creds
: Credentials
, samdb
: SamDB
) -> None:
1751 self
.gensec_ntlmssp_logon(creds
, samdb
, expect_success
=False)
1753 self
._check
_gmsa
_cannot
_be
_locked
_out
(
1754 try_bad_creds_fn
=try_bad_creds
, kerberos_enabled
=False, local
=True
1757 def test_gmsa_cannot_be_locked_out_with_ldap_authentication(self
):
1758 def try_bad_creds(creds
: Credentials
, _samdb
: SamDB
) -> None:
1759 with self
.assertRaises(ldb
.LdbError
) as err
:
1760 SamDB(url
=f
"ldap://{self.dc_host}", credentials
=creds
, lp
=self
.get_lp())
1762 num
, estr
= err
.exception
.args
1764 self
.assertEqual(ldb
.ERR_INVALID_CREDENTIALS
, num
)
1765 self
.assertIn("NT_STATUS_LOGON_FAILURE", estr
)
1767 self
._check
_gmsa
_cannot
_be
_locked
_out
(try_bad_creds_fn
=try_bad_creds
)
1769 def _check_gmsa_cannot_be_locked_out(
1772 try_bad_creds_fn
: Callable
[[Credentials
, SamDB
], None],
1773 kerberos_enabled
: bool = True,
1774 local
: bool = False,
1776 samdb
= self
.get_local_samdb() if local
else self
.get_samdb()
1777 base_dn
= ldb
.Dn(samdb
, samdb
.domain_dn())
1779 def modify_attr(attr
, value
):
1782 flag
= ldb
.FLAG_MOD_DELETE
1785 flag
= ldb
.FLAG_MOD_REPLACE
1787 msg
= ldb
.Message(base_dn
)
1788 msg
[attr
] = ldb
.MessageElement(value
, flag
, attr
)
1791 res
= samdb
.search(base_dn
, scope
=ldb
.SCOPE_BASE
, attrs
=["lockoutThreshold"])
1792 self
.assertEqual(1, len(res
))
1794 # Reset the lockout threshold as it was before.
1795 lockout_threshold
= res
[0].get("lockoutThreshold", idx
=0)
1796 self
.addCleanup(modify_attr
, "lockoutThreshold", lockout_threshold
)
1798 # Set the new lockout threshold.
1799 lockout_threshold
= 3
1800 modify_attr("lockoutThreshold", lockout_threshold
)
1802 creds
= self
.gmsa_account(kerberos_enabled
=kerberos_enabled
)
1805 # Truncate the password to ensure that it is invalid.
1806 creds
.set_password(creds
.get_password()[:-1])
1808 prev_bad_pwd_time
= 0
1810 for i
in range(lockout_threshold
+ 1):
1811 try_bad_creds_fn(creds
, samdb
)
1813 # Ensure the account is not locked out.
1817 scope
=ldb
.SCOPE_BASE
,
1822 "msDS-User-Account-Control-Computed",
1825 self
.assertEqual(1, len(res
))
1827 # Despite the bad password count having increased, …
1828 bad_pwd_count
= int(res
[0].get("badPwdCount", idx
=0))
1829 self
.assertEqual(i
+ 1, bad_pwd_count
)
1831 # …the account should not be locked out.
1832 uac
= int(res
[0].get("msDS-User-Account-Control-Computed", idx
=0))
1833 self
.assertFalse(uac
& dsdb
.UF_LOCKOUT
)
1835 # The bad password time should have increased.
1836 bad_pwd_time
= int(res
[0].get("badPasswordTime", idx
=0))
1837 self
.assertGreater(bad_pwd_time
, prev_bad_pwd_time
)
1839 prev_bad_pwd_time
= bad_pwd_time
1841 # The lockout time should not be set.
1842 lockout_time
= res
[0].get("lockoutTime", idx
=0)
1843 self
.assertIsNone(lockout_time
)
1845 def _server_set_password(self
, creds
: Credentials
, password
: str) -> None:
1846 dc_server
= self
.get_samdb().host_dns_name()
1849 conn
= netlogon
.netlogon(f
"ncacn_ip_tcp:{dc_server}[schannel,seal]", lp
, creds
)
1851 auth
= creds
.new_client_authenticator()
1852 authenticator
= netlogon
.netr_Authenticator()
1853 authenticator
.cred
.data
= list(auth
["credential"])
1854 authenticator
.timestamp
= auth
["timestamp"]
1858 encoded
= password
.encode("utf-16-le")
1859 pwd_len
= len(encoded
)
1860 filler
= os
.urandom(DATA_LEN
- pwd_len
)
1862 pwd
= netlogon
.netr_CryptPassword()
1863 pwd
.length
= pwd_len
1864 pwd
.data
= list(filler
+ encoded
)
1865 creds
.encrypt_netr_crypt_password(pwd
)
1867 conn
.netr_ServerPasswordSet2(
1869 creds
.get_username(),
1870 misc
.SEC_CHAN_WKSTA
,
1871 creds
.get_workstation(),
1876 def test_gmsa_can_authenticate_with_previous_password_and_ntlm(self
):
1877 creds
= self
.gmsa_account(kerberos_enabled
=False)
1878 dc_server
= self
.get_samdb().host_dns_name()
1881 # We can use NTLM to authenticate.
1882 srvsvc
.srvsvc(f
"ncacn_np:{dc_server}", lp
, creds
)
1886 # Change the password using ServerPasswordSet2. Windows does not prevent
1887 # this, despite the whole point of Group Managed Service Accounts being
1888 # that the password is managed by AD, and despite changing passwords
1889 # outside of that system not making much sense.
1890 password_1
= generate_random_password(PWD_LEN
, PWD_LEN
)
1891 self
._server
_set
_password
(creds
, password_1
)
1893 # As less than five minutes have passed, we can still authenticate with
1894 # our original password.
1895 srvsvc
.srvsvc(f
"ncacn_np:{dc_server}", lp
, creds
)
1897 # Change the password again.
1898 password_2
= generate_random_password(PWD_LEN
, PWD_LEN
)
1899 self
._server
_set
_password
(creds
, password_2
)
1901 # This time, NTLM authentication fails!
1902 with self
.assertRaises(NTSTATUSError
) as err
:
1903 srvsvc
.srvsvc(f
"ncacn_np:{dc_server}", lp
, creds
)
1905 self
.assertEqual(ntstatus
.NT_STATUS_LOGON_FAILURE
, err
.exception
.args
[0])
1907 # But we can use the previous password to authenticate.
1908 creds
.update_password(password_1
)
1909 srvsvc
.srvsvc(f
"ncacn_np:{dc_server}", lp
, creds
)
1911 # And we can authenticate using the current password.
1912 creds
.update_password(password_2
)
1913 srvsvc
.srvsvc(f
"ncacn_np:{dc_server}", lp
, creds
)
1916 if __name__
== "__main__":