ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / blackbox / claims.py
blob3bedeed9512392499a6e73c15fe26a181e972ef0
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
4 # Blackbox tests for claims support
6 # Copyright (C) Catalyst.Net Ltd. 2023
8 # Written by Rob van der Linde <rob@catalyst.net.nz>
10 # This program is free software; you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation; either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 import os
26 from samba import NTSTATUSError
27 from samba.auth import AuthContext
28 from samba.credentials import Credentials
29 from samba.gensec import FEATURE_SEAL, Security
30 from samba.ntstatus import NT_STATUS_LOGON_FAILURE, NT_STATUS_UNSUCCESSFUL
31 from samba.tests import BlackboxTestCase
33 SERVER = os.environ["SERVER"]
34 SERVER_USERNAME = os.environ["USERNAME"]
35 SERVER_PASSWORD = os.environ["PASSWORD"]
37 HOST = f"ldap://{SERVER}"
38 CREDS = f"-U{SERVER_USERNAME}%{SERVER_PASSWORD}"
41 class ClaimsSupportTests(BlackboxTestCase):
42 """Blackbox tests for Claims support
44 NOTE: all these commands are subcommands of samba-tool.
46 NOTE: the addCleanup functions get called automatically in reverse
47 order after the tests finishes, they don't execute straight away.
48 """
50 def test_device_group_restrictions(self):
51 client_password = "T3stPassword0nly"
52 target_password = "T3stC0mputerPassword"
53 device_password = "T3stD3vicePassword"
55 # Create target computer.
56 self.check_run("computer create claims-server")
57 self.addCleanup(self.run_command, "computer delete claims-server")
58 self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
60 # Create device computer.
61 self.check_run("computer create claims-device")
62 self.addCleanup(self.run_command, "computer delete claims-device")
63 self.check_run(rf"user setpassword claims-device\$ --newpassword={device_password}")
65 # Create a user.
66 self.check_run(f"user create claimstestuser {client_password}")
67 self.addCleanup(self.run_command, "user delete claimstestuser")
69 # Create an authentication policy.
70 self.check_run("domain auth policy create --enforce --name=device-restricted-users-pol")
71 self.addCleanup(self.run_command,
72 "domain auth policy delete --name=device-restricted-users-pol")
74 self.check_run("group add allowed-devices")
75 self.addCleanup(self.run_command, "group delete allowed-devices")
77 # Set allowed to authenticate from.
78 self.check_run("domain auth policy user-allowed-to-authenticate-from set "
79 "--name=device-restricted-users-pol --device-group=allowed-devices")
81 self.check_run("user auth policy assign claimstestuser --policy=device-restricted-users-pol")
83 with self.assertRaises(NTSTATUSError) as error:
84 self.verify_access(
85 client_username="claimstestuser",
86 client_password=client_password,
87 target_hostname="claims-server",
88 target_username="claims-server",
89 target_password=target_password,
90 device_username="claims-device",
91 device_password=device_password,
94 self.assertEqual(error.exception.args[0], NT_STATUS_LOGON_FAILURE)
95 self.assertEqual(
96 error.exception.args[1],
97 "The attempted logon is invalid. This is either due to a "
98 "bad username or authentication information.")
100 self.check_run("group addmembers allowed-devices claims-device")
102 self.verify_access(
103 client_username="claimstestuser",
104 client_password=client_password,
105 target_hostname="claims-server",
106 target_username="claims-server",
107 target_password=target_password,
108 device_username="claims-device",
109 device_password=device_password,
112 def test_device_silo_restrictions(self):
113 client_password = "T3stPassword0nly"
114 target_password = "T3stC0mputerPassword"
115 device_password = "T3stD3vicePassword"
117 # Create target computer.
118 self.check_run("computer create claims-server")
119 self.addCleanup(self.run_command, "computer delete claims-server")
120 self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
122 # Create device computer.
123 self.check_run("computer create claims-device")
124 self.addCleanup(self.run_command, "computer delete claims-device")
125 self.check_run(rf"user setpassword claims-device\$ --newpassword={device_password}")
127 # Create a user.
128 self.check_run(f"user create claimstestuser {client_password}")
129 self.addCleanup(self.run_command, "user delete claimstestuser")
131 # Create an authentication policy.
132 self.check_run("domain auth policy create --enforce --name=allowed-devices-only-pol")
133 self.addCleanup(self.run_command,
134 "domain auth policy delete --name=allowed-devices-only-pol")
136 # Create an authentication silo.
137 self.check_run("domain auth silo create --enforce --name=allowed-devices-only-silo "
138 "--user-authentication-policy=allowed-devices-only-pol "
139 "--computer-authentication-policy=allowed-devices-only-pol "
140 "--service-authentication-policy=allowed-devices-only-pol")
141 self.addCleanup(self.run_command,
142 "domain auth silo delete --name=allowed-devices-only-silo")
144 # Set allowed to authenticate from (where the login can happen) and to
145 # (server requires silo that in term has this rule, so knows the user
146 # was required to authenticate from).
147 self.check_run("domain auth policy user-allowed-to-authenticate-from set "
148 "--name=allowed-devices-only-pol --device-silo=allowed-devices-only-silo")
150 # Grant access to silo.
151 self.check_run(r"domain auth silo member grant --name=allowed-devices-only-silo --member=claims-device\$")
152 self.check_run("domain auth silo member grant --name=allowed-devices-only-silo --member=claimstestuser")
154 # However with nothing assigned, allow-by-default still applies
155 self.verify_access(
156 client_username="claimstestuser",
157 client_password=client_password,
158 target_hostname="claims-server",
159 target_username="claims-server",
160 target_password=target_password,
163 # Show that adding a FAST armor from the device doesn't change
164 # things either way
165 self.verify_access(
166 client_username="claimstestuser",
167 client_password=client_password,
168 target_hostname="claims-server",
169 target_username="claims-server",
170 target_password=target_password,
171 device_username="claims-device",
172 device_password=device_password,
175 # Assign silo to the user.
176 self.check_run("user auth silo assign claimstestuser --silo=allowed-devices-only-silo")
178 # We fail, as the KDC now requires the silo but the client is not using an approved device
179 with self.assertRaises(NTSTATUSError) as error:
180 self.verify_access(
181 client_username="claimstestuser",
182 client_password=client_password,
183 target_hostname="claims-server",
184 target_username="claims-server",
185 target_password=target_password,
186 device_username="claims-device",
187 device_password=device_password,
190 self.assertEqual(error.exception.args[0], NT_STATUS_UNSUCCESSFUL)
191 self.assertIn(
192 "The requested operation was unsuccessful.",
193 error.exception.args[1])
195 # Assign silo to the device.
196 self.check_run(r"user auth silo assign claims-device\$ --silo=allowed-devices-only-silo")
198 self.verify_access(
199 client_username="claimstestuser",
200 client_password=client_password,
201 target_hostname="claims-server",
202 target_username="claims-server",
203 target_password=target_password,
204 device_username="claims-device",
205 device_password=device_password,
208 def test_device_and_server_silo_restrictions(self):
209 client_password = "T3stPassword0nly"
210 target_password = "T3stC0mputerPassword"
211 device_password = "T3stD3vicePassword"
213 # Create target computer.
214 self.check_run("computer create claims-server")
215 self.addCleanup(self.run_command, "computer delete claims-server")
216 self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
218 # Create device computer.
219 self.check_run("computer create claims-device")
220 self.addCleanup(self.run_command, "computer delete claims-device")
221 self.check_run(rf"user setpassword claims-device\$ --newpassword={device_password}")
223 # Create a user.
224 self.check_run(f"user create claimstestuser {client_password}")
225 self.addCleanup(self.run_command, "user delete claimstestuser")
227 # Create an authentication policy.
228 self.check_run("domain auth policy create --enforce --name=allowed-devices-only-pol")
229 self.addCleanup(self.run_command,
230 "domain auth policy delete --name=allowed-devices-only-pol")
232 # Create an authentication silo.
233 self.check_run("domain auth silo create --enforce --name=allowed-devices-only-silo "
234 "--user-authentication-policy=allowed-devices-only-pol "
235 "--computer-authentication-policy=allowed-devices-only-pol "
236 "--service-authentication-policy=allowed-devices-only-pol")
237 self.addCleanup(self.run_command,
238 "domain auth silo delete --name=allowed-devices-only-silo")
240 # Set allowed to authenticate from (where the login can happen) and to
241 # (server requires silo that in term has this rule, so knows the user
242 # was required to authenticate from).
243 # If we assigned services to the silo we would need to add
244 # --service-allowed-to-authenticate-to/from options as well.
245 # Likewise, if there are services running in user accounts, we need
246 # --user-allowed-to-authenticate-to
247 self.check_run("domain auth policy user-allowed-to-authenticate-from set "
248 "--name=allowed-devices-only-pol --device-silo=allowed-devices-only-silo")
249 self.check_run("domain auth policy computer-allowed-to-authenticate-to set "
250 "--name=allowed-devices-only-pol --by-silo=allowed-devices-only-silo")
252 # Grant access to silo.
253 self.check_run(r"domain auth silo member grant --name=allowed-devices-only-silo --member=claims-device\$")
254 self.check_run(r"domain auth silo member grant --name=allowed-devices-only-silo --member=claims-server\$")
255 self.check_run("domain auth silo member grant --name=allowed-devices-only-silo --member=claimstestuser")
257 # However with nothing assigned, allow-by-default still applies
258 self.verify_access(
259 client_username="claimstestuser",
260 client_password=client_password,
261 target_hostname="claims-server",
262 target_username="claims-server",
263 target_password=target_password,
266 # Show that adding a FAST armor from the device doesn't change
267 # things either way
268 self.verify_access(
269 client_username="claimstestuser",
270 client_password=client_password,
271 target_hostname="claims-server",
272 target_username="claims-server",
273 target_password=target_password,
274 device_username="claims-device",
275 device_password=device_password,
278 self.check_run(r"user auth silo assign claims-server\$ --silo=allowed-devices-only-silo")
280 # We fail, as the server now requires the silo but the client is not in it
281 with self.assertRaises(NTSTATUSError) as error:
282 self.verify_access(
283 client_username="claimstestuser",
284 client_password=client_password,
285 target_hostname="claims-server",
286 target_username="claims-server",
287 target_password=target_password,
288 device_username="claims-device",
289 device_password=device_password,
292 self.assertEqual(error.exception.args[0], NT_STATUS_LOGON_FAILURE)
293 self.assertEqual(
294 error.exception.args[1],
295 "The attempted logon is invalid. This is either due to a "
296 "bad username or authentication information.")
298 # Assign silo to the user.
299 self.check_run("user auth silo assign claimstestuser --silo=allowed-devices-only-silo")
301 # We fail, as the KDC now requires the silo but the client not is using an approved device
302 with self.assertRaises(NTSTATUSError) as error:
303 self.verify_access(
304 client_username="claimstestuser",
305 client_password=client_password,
306 target_hostname="claims-server",
307 target_username="claims-server",
308 target_password=target_password,
309 device_username="claims-device",
310 device_password=device_password,
313 self.assertEqual(error.exception.args[0], NT_STATUS_UNSUCCESSFUL)
314 self.assertIn(
315 "The requested operation was unsuccessful.",
316 error.exception.args[1])
318 # Assign silo to the device.
319 self.check_run(r"user auth silo assign claims-device\$ --silo=allowed-devices-only-silo")
321 self.verify_access(
322 client_username="claimstestuser",
323 client_password=client_password,
324 target_hostname="claims-server",
325 target_username="claims-server",
326 target_password=target_password,
327 device_username="claims-device",
328 device_password=device_password,
331 def test_user_group_access(self):
332 """An example use with groups."""
333 client_password = "T3stPassword0nly"
334 target_password = "T3stC0mputerPassword"
336 # Create a computer.
337 self.check_run("computer create claims-server")
338 self.addCleanup(self.run_command, "computer delete claims-server")
339 self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
341 # Create a user.
342 self.check_run(f"user create claimstestuser {client_password}")
343 self.addCleanup(self.run_command, "user delete claimstestuser")
345 # Create an authentication policy.
346 self.check_run("domain auth policy create --enforce --name=restricted-servers-pol")
347 self.addCleanup(self.run_command,
348 "domain auth policy delete --name=restricted-servers-pol")
350 self.check_run("group add server-access-group")
351 self.addCleanup(self.run_command, "group delete server-access-group")
353 # Set allowed to authenticate to.
354 self.check_run("domain auth policy computer-allowed-to-authenticate-to set "
355 "--name=restricted-servers-pol --by-group=server-access-group")
357 self.check_run(r"user auth policy assign claims-server\$ --policy=restricted-servers-pol")
359 with self.assertRaises(NTSTATUSError) as error:
360 self.verify_access(
361 client_username="claimstestuser",
362 client_password=client_password,
363 target_hostname="claims-server",
364 target_username="claims-server",
365 target_password=target_password,
368 self.assertEqual(error.exception.args[0], NT_STATUS_LOGON_FAILURE)
369 self.assertEqual(
370 error.exception.args[1],
371 "The attempted logon is invalid. This is either due to a "
372 "bad username or authentication information.")
374 # Add group members.
375 self.check_run("group addmembers server-access-group claimstestuser")
377 self.verify_access(
378 client_username="claimstestuser",
379 client_password=client_password,
380 target_hostname="claims-server",
381 target_username="claims-server",
382 target_password=target_password,
385 def test_user_silo_access(self):
386 """An example use with authentication silos."""
387 client_password = "T3stPassword0nly"
388 target_password = "T3stC0mputerPassword"
390 # Create a computer.
391 self.check_run("computer create claims-server")
392 self.addCleanup(self.run_command, "computer delete claims-server")
393 self.check_run(rf"user setpassword claims-server\$ --newpassword={target_password}")
395 # Create a user.
396 self.check_run(f"user create claimstestuser {client_password}")
397 self.addCleanup(self.run_command, "user delete claimstestuser")
399 # Create an authentication policy.
400 self.check_run("domain auth policy create --enforce --name=restricted-servers-pol")
401 self.addCleanup(self.run_command,
402 "domain auth policy delete --name=restricted-servers-pol")
404 # Create an authentication silo.
405 self.check_run("domain auth silo create --enforce --name=restricted-servers-silo "
406 "--user-authentication-policy=restricted-servers-pol "
407 "--computer-authentication-policy=restricted-servers-pol "
408 "--service-authentication-policy=restricted-servers-pol")
409 self.addCleanup(self.run_command,
410 "domain auth silo delete --name=restricted-servers-silo")
412 # Set allowed to authenticate to.
413 self.check_run("domain auth policy computer-allowed-to-authenticate-to set "
414 "--name=restricted-servers-pol --by-silo=restricted-servers-silo")
416 # Grant access to silo.
417 self.check_run(r"domain auth silo member grant --name=restricted-servers-silo --member=claims-server\$")
418 self.check_run("domain auth silo member grant --name=restricted-servers-silo --member=claimstestuser")
420 self.verify_access(
421 client_username="claimstestuser",
422 client_password=client_password,
423 target_hostname="claims-server",
424 target_username="claims-server",
425 target_password=target_password,
428 self.check_run(r"user auth silo assign claims-server\$ --silo=restricted-servers-silo")
430 with self.assertRaises(NTSTATUSError) as error:
431 self.verify_access(
432 client_username="claimstestuser",
433 client_password=client_password,
434 target_hostname="claims-server",
435 target_username="claims-server",
436 target_password=target_password,
439 self.assertEqual(error.exception.args[0], NT_STATUS_LOGON_FAILURE)
440 self.assertEqual(
441 error.exception.args[1],
442 "The attempted logon is invalid. This is either due to a "
443 "bad username or authentication information.")
445 # Set assigned silo on user and computer.
446 self.check_run("user auth silo assign claimstestuser --silo=restricted-servers-silo")
448 self.verify_access(
449 client_username="claimstestuser",
450 client_password=client_password,
451 target_hostname="claims-server",
452 target_username="claims-server",
453 target_password=target_password,
456 @classmethod
457 def _make_cmdline(cls, line):
458 """Override to pass line as samba-tool subcommand instead.
460 Automatically fills in HOST and CREDS as well.
462 if isinstance(line, list):
463 cmd = ["samba-tool"] + line + ["-H", HOST, CREDS]
464 else:
465 cmd = f"samba-tool {line} -H {HOST} {CREDS}"
467 return super()._make_cmdline(cmd)
469 def verify_access(self, client_username, client_password,
470 target_hostname, target_username, target_password, *,
471 device_username=None, device_password=None):
473 lp = self.get_loadparm()
475 client_creds = Credentials()
476 client_creds.set_username(client_username)
477 client_creds.set_password(client_password)
478 client_creds.guess(lp)
480 if device_username:
481 device_creds = Credentials()
482 device_creds.set_username(device_username)
483 device_creds.set_password(device_password)
484 device_creds.guess(lp)
485 client_creds.set_krb5_fast_armor_credentials(device_creds, True)
487 target_creds = Credentials()
488 target_creds.set_username(target_username)
489 target_creds.set_password(target_password)
490 target_creds.guess(lp)
492 settings = {
493 "lp_ctx": lp,
494 "target_hostname": target_hostname
497 gensec_client = Security.start_client(settings)
498 gensec_client.set_credentials(client_creds)
499 gensec_client.want_feature(FEATURE_SEAL)
500 gensec_client.start_mech_by_sasl_name("GSSAPI")
502 gensec_target = Security.start_server(settings=settings,
503 auth_context=AuthContext(lp_ctx=lp))
504 gensec_target.set_credentials(target_creds)
505 gensec_target.start_mech_by_sasl_name("GSSAPI")
507 client_finished = False
508 server_finished = False
509 client_to_server = b""
510 server_to_client = b""
512 # Operate as both the client and the server to verify the user's
513 # credentials.
514 while not client_finished or not server_finished:
515 if not client_finished:
516 print("running client gensec_update")
517 client_finished, client_to_server = gensec_client.update(
518 server_to_client)
519 if not server_finished:
520 print("running server gensec_update")
521 server_finished, server_to_client = gensec_target.update(
522 client_to_server)
525 if __name__ == "__main__":
526 import unittest
527 unittest.main()