ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / blackbox / rpcd_witness_samba_only.py
blob38bc7e078c3deb6d05f6e09fd4299510c04905a0
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
4 # Copyright © 2024 Stefan Metzmacher <metze@samba.org>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 import sys
21 import os
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
26 import json
28 import samba.tests
29 from samba.credentials import Credentials
30 from samba.ndr import ndr_print
31 from samba.dcerpc import witness
32 from samba.tests import DynamicTestCase, BlackboxTestCase
33 from samba.common import get_string
34 from samba import werror, WERRORError
36 @DynamicTestCase
37 class RpcdWitnessSambaTests(BlackboxTestCase):
@classmethod
def setUpDynamicTestCases(cls):
    """Generate the dynamic test methods for every node combination.

    NUM_NODES is read from the environment.  For every connection node a
    GetInterfaceList test is defined with no node disabled (index -1) and
    with each node disabled in turn, each once for NDR32 and once for
    NDR64.  ResourceChangeCTDB tests are only defined when the connection
    node is node 0 and a real monitor node index is given.
    """
    cls.num_nodes = int(samba.tests.env_get_var_value('NUM_NODES'))

    for conn_idx in range(cls.num_nodes):
        # -1 ("nothing disabled") comes first, then every real node index.
        for other_idx in [-1] + list(range(cls.num_nodes)):
            for use_ndr64 in (False, True):
                cls._define_GetInterfaceList_test(conn_idx,
                                                  other_idx,
                                                  use_ndr64)
                if conn_idx == 0 and other_idx != -1:
                    cls._define_ResourceChangeCTDB_tests(conn_idx,
                                                         other_idx,
                                                         use_ndr64)
def setUp(self):
    """Prepare the environment, credentials and per-node binding strings.

    CTDB_SOCKET and CTDB_BASE are removed from the environment (their
    values are kept for tearDown()), the remote credentials are built from
    DOMAIN/USERNAME/PASSWORD, and self.nodes is filled with one dict per
    node holding its name, ip and the NDR32/NDR64 binding strings.
    """
    super().setUp()

    env = samba.tests.env_get_var_value

    # ctdb/tests/local_daemons.sh doesn't like CTDB_SOCKET to be set already
    # and it doesn't need CTDB_BASE, so we stash them away
    self.saved_CTDB_SOCKET = env('CTDB_SOCKET', allow_missing=True)
    if self.saved_CTDB_SOCKET is not None:
        os.environ.pop("CTDB_SOCKET")
    self.saved_CTDB_BASE = env('CTDB_BASE', allow_missing=True)
    if self.saved_CTDB_BASE is not None:
        os.environ.pop("CTDB_BASE")

    # Index of the node currently disabled by disable_node(), -1 if none.
    self.disabled_idx = -1

    # set this to True in order to get verbose output
    self.verbose = False

    self.ctdb_prefix = env('CTDB_PREFIX')
    self.cluster_share = env('CLUSTER_SHARE')

    self.lp = self.get_loadparm(s3=True)
    self.remote_domain = env('DOMAIN')
    self.remote_user = env('USERNAME')
    self.remote_password = env('PASSWORD')
    creds = Credentials()
    creds.guess(self.lp)
    creds.set_username(self.remote_user)
    creds.set_domain(self.remote_domain)
    creds.set_password(self.remote_password)
    self.remote_creds = creds

    self.server_hostname = env('SERVER_HOSTNAME')
    self.interface_group_name = env('INTERFACE_GROUP_NAME')

    # Binding options shared by all nodes; the NDR64 flavour only adds
    # the ",ndr64" option on top of the NDR32 one.
    binding_args32 = "spnego,sign,target_hostname=%s" % (
        self.server_hostname)
    if self.verbose:
        binding_args32 += ",print"
    binding_args64 = binding_args32 + ",ndr64"

    self.nodes = []
    for node_idx in range(self.num_nodes):
        node_name = env('CTDB_SERVER_NAME_NODE%u' % node_idx)
        node_ip = env('CTDB_IFACE_IP_NODE%u' % node_idx)
        self.nodes.append({
            "name": node_name,
            "ip": node_ip,
            "binding_string32": "ncacn_ip_tcp:%s[%s]" % (
                node_ip, binding_args32),
            "binding_string64": "ncacn_ip_tcp:%s[%s]" % (
                node_ip, binding_args64),
        })

    # Filled in lazily by prepare_all_registrations().
    self.all_registrations = None
def tearDown(self):
    """Undo everything setUp() and the test itself changed.

    The steps are chained with try/finally so that a failure in an early
    cleanup step (e.g. an UnRegister call or "ctdb enable" failing) cannot
    leave a node disabled or the stashed CTDB_SOCKET/CTDB_BASE variables
    missing from the environment for the tests that run afterwards.
    """
    try:
        self.destroy_all_registrations()
    finally:
        try:
            # Re-enable a node the test left disabled.
            if self.disabled_idx != -1:
                self.enable_node(self.disabled_idx)
        finally:
            # Put back the environment variables stashed in setUp().
            if self.saved_CTDB_SOCKET is not None:
                os.environ["CTDB_SOCKET"] = self.saved_CTDB_SOCKET
                self.saved_CTDB_SOCKET = None
            if self.saved_CTDB_BASE is not None:
                os.environ["CTDB_BASE"] = self.saved_CTDB_BASE
                self.saved_CTDB_BASE = None

            super().tearDown()
def call_onnode(self, nodes, cmd):
    """Run *cmd* on the given node(s) through ctdb/tests/local_daemons.sh.

    The command line is "<script> '<ctdb_prefix>' onnode <nodes> '<cmd>'".
    The test fails if the command raises BlackboxProcessError; otherwise
    the command output is returned as a string.
    """
    argv = "%s '%s' onnode %s '%s'" % ("ctdb/tests/local_daemons.sh",
                                       self.ctdb_prefix,
                                       nodes,
                                       cmd)

    if self.verbose:
        print("Calling: %s" % argv)

    try:
        raw_output = self.check_output(argv)
    except samba.tests.BlackboxProcessError as e:
        self.fail("Error calling [%s]: %s" % (argv, e))

    return get_string(raw_output)
def dump_ctdb_status_all(self):
    """Print the output of "ctdb status" as run on every node in turn."""
    for node_idx in range(self.num_nodes):
        status = self.call_onnode(str(node_idx), "ctdb status")
        print("%s" % status)
def disable_node(self, node_idx, dump_status=False):
    """Run "ctdb disable" on node *node_idx* and remember it as disabled.

    Only one node may be disabled at a time: the call asserts that no
    other node is currently recorded in self.disabled_idx.  With
    dump_status=True the ctdb status of all nodes is printed before and
    after the change.
    """
    if dump_status:
        self.dump_ctdb_status_all()

    self.assertEqual(self.disabled_idx, -1)
    target = str(node_idx)
    self.call_onnode(target, "ctdb disable")
    self.disabled_idx = node_idx

    if dump_status:
        self.dump_ctdb_status_all()
def enable_node(self, node_idx, dump_status=False):
    """Run "ctdb enable" on node *node_idx* and clear the disabled marker.

    The call asserts that *node_idx* is the node recorded in
    self.disabled_idx.  With dump_status=True the ctdb status of all
    nodes is printed before and after the change.
    """
    if dump_status:
        self.dump_ctdb_status_all()

    self.assertEqual(self.disabled_idx, node_idx)
    target = str(node_idx)
    self.call_onnode(target, "ctdb enable")
    self.disabled_idx = -1

    if dump_status:
        self.dump_ctdb_status_all()
def call_net_witness_subcmd(self, subcmd,
                            as_json=False,
                            apply_to_all=False,
                            registration=None,
                            net_name=None,
                            share_name=None,
                            ip_address=None,
                            client_computer=None,
                            new_ip=None,
                            new_node=None,
                            forced_response=None):
    """Run "bin/net witness <subcmd>" with the requested options.

    Each keyword argument that is set adds the matching --witness-*
    option to the command line; *registration* contributes its uuid.
    The test fails if the command raises BlackboxProcessError.

    Returns the command output as a string, or the decoded JSON document
    when as_json is true.
    """
    argv = "%s %s" % ("UID_WRAPPER_ROOT=1 bin/net witness", subcmd)

    # Plain switches without a value.
    if as_json:
        argv += " --json"
    if apply_to_all:
        argv += " --witness-apply-to-all"

    # Options carrying a value; None means "do not pass this option".
    # forced_response is only passed when it is truthy.
    valued_options = [
        ("--witness-registration",
         None if registration is None else registration.uuid),
        ("--witness-net-name", net_name),
        ("--witness-share-name", share_name),
        ("--witness-ip-address", ip_address),
        ("--witness-client-computer-name", client_computer),
        ("--witness-new-ip", new_ip),
        ("--witness-new-node", new_node),
        ("--witness-forced-response", forced_response or None),
    ]
    for option, value in valued_options:
        if value is None:
            continue
        argv += " %s='%s'" % (option, value)

    if self.verbose:
        print("Calling: %s" % argv)

    try:
        raw_output = self.check_output(argv)
    except samba.tests.BlackboxProcessError as e:
        self.fail("Error calling [%s]: %s" % (argv, e))

    out_str = get_string(raw_output)
    if as_json:
        return json.loads(out_str)
    return out_str
231 @classmethod
232 def _define_GetInterfaceList_test(cls, conn_idx, disable_idx, ndr64=False):
233 if disable_idx != -1:
234 disable_name = "%u_disabled" % disable_idx
235 else:
236 disable_name = "all_enabled"
238 if ndr64:
239 ndr_name = "NDR64"
240 else:
241 ndr_name = "NDR32"
243 name = "Node%u_%s_%s" % (conn_idx, disable_name, ndr_name)
244 args = {
245 'conn_idx': conn_idx,
246 'disable_idx': disable_idx,
247 'ndr64': ndr64,
249 cls.generate_dynamic_test('test_GetInterfaceList', name, args)
def _test_GetInterfaceList_with_args(self, args):
    """Check witness GetInterfaceList() as seen from one node.

    *args* holds 'conn_idx' (node to connect to), 'disable_idx' (node to
    disable while the call is made, -1 for none) and 'ndr64'.  Every
    returned interface is compared with the matching entry of self.nodes.
    """
    conn_idx = args.pop('conn_idx')
    disable_idx = args.pop('disable_idx')
    ndr64 = args.pop('ndr64')
    # All generated arguments must have been consumed above.
    self.assertEqual(len(args.keys()), 0)

    binding_key = "binding_string64" if ndr64 else "binding_string32"
    binding_string = self.nodes[conn_idx][binding_key]

    have_disabled_node = disable_idx != -1
    if have_disabled_node:
        self.disable_node(disable_idx)

    conn = witness.witness(binding_string, self.lp, self.remote_creds)
    interface_list = conn.GetInterfaceList()

    # The list was fetched while the node was disabled; it can be
    # enabled again before the result is inspected.
    if have_disabled_node:
        self.enable_node(disable_idx)

    self.assertIsNotNone(interface_list)
    self.assertEqual(interface_list.num_interfaces, len(self.nodes))
    for idx in range(interface_list.num_interfaces):
        iface = interface_list.interfaces[idx]
        node = self.nodes[idx]

        # Every interface except the one we are connected to is expected
        # to carry the WITNESS_IF flag.
        expected_flags = witness.WITNESS_INFO_IPv4_VALID
        if conn_idx != idx:
            expected_flags |= witness.WITNESS_INFO_WITNESS_IF

        expected_state = (witness.WITNESS_STATE_UNAVAILABLE
                          if disable_idx == idx
                          else witness.WITNESS_STATE_AVAILABLE)

        self.assertIsNotNone(iface.group_name)
        self.assertEqual(iface.group_name.lower(),
                         self.interface_group_name.lower())

        self.assertEqual(iface.version, witness.WITNESS_V2)
        self.assertEqual(iface.state, expected_state)

        self.assertIsNotNone(iface.ipv4)
        self.assertEqual(iface.ipv4, node["ip"])

        self.assertIsNotNone(iface.ipv6)
        self.assertEqual(iface.ipv6,
                         "0000:0000:0000:0000:0000:0000:0000:0000")

        self.assertEqual(iface.flags, expected_flags)
def assertResourceChanges(self, response, expected_resource_changes):
    """Assert that *response* is a RESOURCE_CHANGE notification.

    *expected_resource_changes* is a list of dicts with an optional
    'type' (defaulting to WITNESS_RESOURCE_STATE_UNAVAILABLE) and a
    'name'; the messages in *response* must match it entry by entry.
    """
    num_expected = len(expected_resource_changes)

    self.assertIsNotNone(response)
    self.assertEqual(response.type,
                     witness.WITNESS_NOTIFY_RESOURCE_CHANGE)
    self.assertEqual(response.num, num_expected)
    self.assertEqual(len(response.messages), num_expected)

    for ri, expected in enumerate(expected_resource_changes):
        resource_change = response.messages[ri]
        self.assertIsNotNone(resource_change)

        expected_type = expected.get(
            'type', witness.WITNESS_RESOURCE_STATE_UNAVAILABLE)
        expected_name = expected.get('name')

        self.assertEqual(resource_change.type, expected_type)
        self.assertIsNotNone(resource_change.name)
        self.assertEqual(resource_change.name, expected_name)
def assertResourceChange(self, response, expected_type, expected_name):
    """Assert that *response* carries exactly one resource change.

    Convenience wrapper around assertResourceChanges() for the
    single-message case.
    """
    self.assertResourceChanges(response, [{
        'type': expected_type,
        'name': expected_name,
    }])
def assertGenericIpLists(self, response, expected_type, expected_ip_lists):
    """Assert that *response* carries the expected lists of ip addresses.

    *expected_ip_lists* is a list of lists of dicts.  Each dict may give
    'flags', 'ipv4' and 'ipv6'; missing keys default to
    WITNESS_IPADDR_V4|WITNESS_IPADDR_ONLINE, '0.0.0.0' and the all-zero
    ipv6 address respectively.
    """
    self.assertIsNotNone(response)
    self.assertEqual(response.type, expected_type)
    self.assertEqual(response.num, len(expected_ip_lists))
    self.assertEqual(len(response.messages), len(expected_ip_lists))

    for li, expected_ip_list in enumerate(expected_ip_lists):
        ip_list = response.messages[li]
        self.assertIsNotNone(ip_list)
        self.assertEqual(ip_list.num, len(expected_ip_list))

        for i, expected_ip in enumerate(expected_ip_list):
            ip_info = ip_list.addr[i]

            default_flags = (witness.WITNESS_IPADDR_V4 |
                             witness.WITNESS_IPADDR_ONLINE)
            expected_flags = expected_ip.get('flags', default_flags)
            expected_ipv4 = expected_ip.get('ipv4', '0.0.0.0')
            expected_ipv6 = expected_ip.get(
                'ipv6', '0000:0000:0000:0000:0000:0000:0000:0000')

            self.assertEqual(ip_info.flags, expected_flags)

            self.assertIsNotNone(ip_info.ipv4)
            self.assertEqual(ip_info.ipv4, expected_ipv4)

            self.assertIsNotNone(ip_info.ipv6)
            self.assertEqual(ip_info.ipv6, expected_ipv6)
366 @classmethod
367 def _define_ResourceChangeCTDB_tests(cls, conn_idx, monitor_idx, ndr64=False):
368 if ndr64:
369 ndr_name = "NDR64"
370 else:
371 ndr_name = "NDR32"
373 name_suffix = "WNode%u_RNode%u_%s" % (conn_idx, monitor_idx, ndr_name)
374 base_args = {
375 'conn_idx': conn_idx,
376 'monitor_idx': monitor_idx,
377 'ndr64': ndr64,
380 name = "v1_disabled_after_%s" % name_suffix
381 args = base_args.copy()
382 args['reg_v1'] = True
383 args['disable_after_reg'] = True
384 args['explicit_unregister'] = False
385 cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
387 name = "v1_disabled_after_enabled_after_%s" % name_suffix
388 args = base_args.copy()
389 args['reg_v1'] = True
390 args['disable_after_reg'] = True
391 args['enable_after_reg'] = True
392 args['explicit_unregister'] = False
393 cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
395 name = "v2_disabled_before_enable_after_%s" % name_suffix
396 args = base_args.copy()
397 args['disable_before_reg'] = True
398 args['enable_after_reg'] = True
399 args['wait_for_timeout'] = True
400 args['timeout'] = 6
401 cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
403 name = "v2_disabled_after_%s" % name_suffix
404 args = base_args.copy()
405 args['disable_after_reg'] = True
406 args['wait_for_not_found'] = True
407 args['explicit_unregister'] = False
408 cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
410 name = "v2_disabled_after_enabled_after_%s" % name_suffix
411 args = base_args.copy()
412 args['disable_after_reg'] = True
413 args['enable_after_reg'] = True
414 args['wait_for_not_found'] = True
415 args['explicit_unregister'] = False
416 cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
418 name = "share_v2_disabled_before_enable_after_%s" % name_suffix
419 args = base_args.copy()
420 args['share_reg'] = True
421 args['disable_before_reg'] = True
422 args['enable_after_reg'] = True
423 cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
425 name = "share_v2_disabled_after_%s" % name_suffix
426 args = base_args.copy()
427 args['share_reg'] = True
428 args['disable_after_reg'] = True
429 args['explicit_unregister'] = False
430 cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
432 name = "share_v2_disabled_after_enabled_after_%s" % name_suffix
433 args = base_args.copy()
434 args['share_reg'] = True
435 args['disable_after_reg'] = True
436 args['enable_after_reg'] = True
437 args['explicit_unregister'] = False
438 cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
def _test_ResourceChangeCTDB_with_args(self, args):
    # Runs one resource-change scenario against the witness service.
    #
    # args is the dict built by _define_ResourceChangeCTDB_tests():
    #   conn_idx / monitor_idx  - node to connect to / node to monitor
    #   ndr64                   - use the NDR64 binding string
    #   timeout                 - timeout passed to RegisterEx (default 15)
    #   reg_v1                  - use Register(WITNESS_V1) instead of
    #                             RegisterEx(WITNESS_V2)
    #   share_reg               - register with self.cluster_share as share
    #   disable_before_reg / disable_after_reg / enable_after_reg
    #                           - when to disable/enable the monitored node
    #   explicit_unregister     - finish with UnRegister checks (default True)
    #   wait_for_not_found / wait_for_timeout
    #                           - expect a final AsyncNotify to fail with
    #                             WERR_NOT_FOUND / WERR_TIMEOUT
    conn_idx = args.pop('conn_idx')
    monitor_idx = args.pop('monitor_idx')
    ndr64 = args.pop('ndr64')
    timeout = int(args.pop('timeout', 15))
    reg_v1 = args.pop('reg_v1', False)
    share_reg = args.pop('share_reg', False)
    disable_before_reg = args.pop('disable_before_reg', False)
    disable_after_reg = args.pop('disable_after_reg', False)
    enable_after_reg = args.pop('enable_after_reg', False)
    explicit_unregister = args.pop('explicit_unregister', True)
    wait_for_not_found = args.pop('wait_for_not_found', False)
    wait_for_timeout = args.pop('wait_for_timeout', False)
    # Any key left over would be an argument this test does not know.
    self.assertEqual(len(args.keys()), 0)

    conn_node = self.nodes[conn_idx]
    if ndr64:
        binding_string = conn_node["binding_string64"]
    else:
        binding_string = conn_node["binding_string32"]
    monitor_node = self.nodes[monitor_idx]

    computer_name = "test-rpcd-witness-samba-only-client-computer"

    conn = witness.witness(binding_string, self.lp, self.remote_creds)

    # disable_before_reg and disable_after_reg are mutually exclusive.
    if disable_before_reg:
        self.assertFalse(disable_after_reg)
        self.disable_node(monitor_idx)

    if reg_v1:
        # The V1 Register call takes neither a share name nor a timeout,
        # so these switches make no sense in combination with it.
        self.assertFalse(wait_for_timeout)
        self.assertFalse(share_reg)

        reg_context = conn.Register(witness.WITNESS_V1,
                                    self.server_hostname,
                                    monitor_node["ip"],
                                    computer_name)
    else:
        if share_reg:
            share_name = self.cluster_share
        else:
            share_name = None

        reg_context = conn.RegisterEx(witness.WITNESS_V2,
                                      self.server_hostname,
                                      share_name,
                                      monitor_node["ip"],
                                      computer_name,
                                      witness.WITNESS_REGISTER_NONE,
                                      timeout)

    if disable_after_reg:
        self.assertFalse(disable_before_reg)
        self.disable_node(monitor_idx)

    if enable_after_reg:
        self.enable_node(monitor_idx)

    # Both state changes are triggered above before any notification is
    # collected; the notifications are then expected in the order
    # UNAVAILABLE followed by AVAILABLE.
    if disable_after_reg:
        response_unavailable = conn.AsyncNotify(reg_context)
        self.assertResourceChange(response_unavailable,
                                  witness.WITNESS_RESOURCE_STATE_UNAVAILABLE,
                                  monitor_node["ip"])

    if enable_after_reg:
        response_available = conn.AsyncNotify(reg_context)
        self.assertResourceChange(response_available,
                                  witness.WITNESS_RESOURCE_STATE_AVAILABLE,
                                  monitor_node["ip"])

    if wait_for_timeout:
        self.assertFalse(wait_for_not_found)
        self.assertFalse(disable_after_reg)
        # With nothing left to report the call must fail with
        # WERR_TIMEOUT; any other WERROR is re-raised and a successful
        # return fails the test.
        try:
            _ = conn.AsyncNotify(reg_context)
            self.fail()
        except WERRORError as e:
            (num, string) = e.args
            if num != werror.WERR_TIMEOUT:
                raise

    if wait_for_not_found:
        self.assertFalse(wait_for_timeout)
        self.assertTrue(disable_after_reg)
        self.assertFalse(explicit_unregister)
        # The registration is expected to be unknown to the server by
        # now, without the client having unregistered it.
        # NOTE(review): presumably the server drops it after reporting
        # the node as unavailable - confirm against the server code.
        try:
            _ = conn.AsyncNotify(reg_context)
            self.fail()
        except WERRORError as e:
            (num, string) = e.args
            if num != werror.WERR_NOT_FOUND:
                raise

    if not explicit_unregister:
        return

    conn.UnRegister(reg_context)

    # After UnRegister the context must be unknown for AsyncNotify ...
    try:
        _ = conn.AsyncNotify(reg_context)
        self.fail()
    except WERRORError as e:
        (num, string) = e.args
        if num != werror.WERR_NOT_FOUND:
            raise

    # ... and a second UnRegister must fail the same way.
    try:
        conn.UnRegister(reg_context)
        self.fail()
    except WERRORError as e:
        (num, string) = e.args
        if num != werror.WERR_NOT_FOUND:
            raise
def prepare_all_registrations(self):
    """Build the descriptions of all registrations used by the net tests.

    For every node and for both NDR flavours one connection is opened
    and three registration descriptions sharing that connection are
    created: WITNESS_V1, WITNESS_V2 without a share and WITNESS_V2 with
    self.cluster_share.  Nothing is registered yet ('context' is None);
    open_all_registrations() does that.

    The list is stored in self.all_registrations and returned.
    """
    self.assertIsNone(self.all_registrations)

    flavours = (
        (False, "NDR32", "binding_string32"),
        (True, "NDR64", "binding_string64"),
    )

    regs = []
    for node_idx in range(self.num_nodes):
        node = self.nodes[node_idx]
        for ndr64, ndr_name, binding_key in flavours:
            binding_string = node[binding_key]
            conn = witness.witness(binding_string, self.lp, self.remote_creds)

            # The V1 registration has neither share, flags nor timeout.
            v1_reg = {
                'node_idx': node_idx,
                'ndr64': ndr64,
                'binding_string': binding_string,
                'conn_ip': node["ip"],
                'version': witness.WITNESS_V1,
                'net_name': self.server_hostname,
                'share_name': None,
                'ip_address': node["ip"],
                'computer_name': "test-net-witness-list-%s-%s" % (
                    node_idx, ndr_name),
                'flags': 0,
                'timeout': 0,
                'conn': conn,
                'context': None,
            }
            # The V2 variants only differ in the fields overridden here.
            v2_reg = dict(v1_reg,
                          version=witness.WITNESS_V2,
                          flags=witness.WITNESS_REGISTER_NONE,
                          timeout=15)
            v2_share_reg = dict(v2_reg,
                                share_name=self.cluster_share)

            regs.extend([v1_reg, v2_reg, v2_share_reg])

    self.all_registrations = regs
    return regs
def close_all_registrations(self):
    """UnRegister every registration that currently has a context.

    The connections stay open so open_all_registrations() can be called
    again afterwards.
    """
    self.assertIsNotNone(self.all_registrations)

    for reg in self.all_registrations:
        if reg['context'] is None:
            continue
        reg['conn'].UnRegister(reg['context'])
        reg['context'] = None
def open_all_registrations(self):
    """Register every prepared registration with the witness service.

    Each entry must currently be closed ('context' is None).  V1 entries
    use Register(), V2 entries use RegisterEx(); the returned context
    handle is stored in the entry.
    """
    self.assertIsNotNone(self.all_registrations)

    for reg in self.all_registrations:
        conn = reg['conn']
        self.assertIsNone(reg['context'])

        version = reg['version']
        new_context = None
        if version == witness.WITNESS_V1:
            new_context = conn.Register(version,
                                        reg['net_name'],
                                        reg['ip_address'],
                                        reg['computer_name'])
        elif version == witness.WITNESS_V2:
            new_context = conn.RegisterEx(version,
                                          reg['net_name'],
                                          reg['share_name'],
                                          reg['ip_address'],
                                          reg['computer_name'],
                                          reg['flags'],
                                          reg['timeout'])
        # Also catches an unknown version, which leaves it as None.
        self.assertIsNotNone(new_context)
        reg['context'] = new_context
def destroy_all_registrations(self):
    """UnRegister all open registrations and drop their connections.

    Does nothing when prepare_all_registrations() was never called.
    Afterwards self.all_registrations is None again.
    """
    if self.all_registrations is None:
        return

    for reg in self.all_registrations:
        if reg['context'] is not None:
            reg['conn'].UnRegister(reg['context'])
        reg['context'] = None
        # Drop our reference to the connection as well.
        reg['conn'] = None

    self.all_registrations = None
def assertJsonReg(self, json_reg, reg):
    """Compare one JSON registration from 'net witness' with *reg*.

    *json_reg* is the decoded JSON object for a single registration,
    *reg* the matching entry from prepare_all_registrations() with an
    open 'context'.  Besides the individual values the exact number of
    keys of every (sub-)object is checked.
    """
    self.assertEqual(json_reg['version'], "0x%08x" % reg['version'])
    self.assertEqual(json_reg['net_name'], reg['net_name'])
    if not reg['share_name']:
        self.assertIsNone(json_reg['share_name'])
    else:
        self.assertEqual(json_reg['share_name'], reg['share_name'])
    self.assertEqual(json_reg['client_computer_name'], reg['computer_name'])

    # flags: symbolic bit plus integer and hex representation
    self.assertIn('flags', json_reg)
    json_flags = json_reg['flags']
    wants_ip_notification = bool(
        reg['flags'] & witness.WITNESS_REGISTER_IP_NOTIFICATION)
    self.assertEqual(json_flags['WITNESS_REGISTER_IP_NOTIFICATION'],
                     wants_ip_notification)
    self.assertEqual(json_flags['int'], reg['flags'])
    self.assertEqual(json_flags['hex'], "0x%08x" % reg['flags'])
    self.assertEqual(len(json_flags.keys()), 3)

    self.assertEqual(json_reg['timeout'], reg['timeout'])

    # context_handle: must be the handle we got from the registration
    self.assertIn('context_handle', json_reg)
    json_context = json_reg['context_handle']
    context = reg['context']
    self.assertEqual(json_context['uuid'], str(context.uuid))
    self.assertEqual(json_context['handle_type'], context.handle_type)
    self.assertEqual(len(json_context.keys()), 2)

    # server_id: only the vnn has a value we can predict
    self.assertIn('server_id', json_reg)
    json_server_id = json_reg['server_id']
    self.assertIn('pid', json_server_id)
    self.assertIn('task_id', json_server_id)
    self.assertEqual(json_server_id['vnn'], reg['node_idx'])
    self.assertIn('unique_id', json_server_id)
    self.assertEqual(len(json_server_id.keys()), 4)

    # auth: the account the registration was made with
    self.assertIn('auth', json_reg)
    json_auth = json_reg['auth']
    self.assertEqual(json_auth['account_name'], self.remote_user)
    self.assertEqual(json_auth['domain_name'], self.remote_domain)
    self.assertIn('account_sid', json_auth)
    self.assertEqual(len(json_auth.keys()), 3)

    # connection: the local address must contain the ip we connected to
    self.assertIn('connection', json_reg)
    json_conn = json_reg['connection']
    self.assertIn('local_address', json_conn)
    self.assertIn(reg['conn_ip'], json_conn['local_address'])
    self.assertIn('remote_address', json_conn)
    self.assertEqual(len(json_conn.keys()), 2)

    self.assertIn('registration_time', json_reg)

    self.assertEqual(len(json_reg.keys()), 12)
def max_common_prefix(self, strings):
    """Return the longest prefix shared by all of *strings*.

    *strings* may be any iterable of strings (e.g. dict keys).  An empty
    iterable yields "", a single string yields that string.  When the
    strings have nothing in common the result is "" - the previous
    implementation wrongly returned the first character of the first
    string in that case.
    """
    strings = list(strings)
    if not strings:
        return ""

    prefix = strings[0]
    for s in strings[1:]:
        # Count how many leading characters prefix and s share.
        match_len = 0
        for c1, c2 in zip(prefix, s):
            if c1 != c2:
                break
            match_len += 1
        prefix = prefix[:match_len]
        if not prefix:
            # Nothing in common any more, no need to look further.
            break

    return prefix
def check_net_witness_output(self,
                             cmd,
                             regs,
                             apply_to_all=False,
                             registration_idx=None,
                             net_name=None,
                             share_name=None,
                             ip_address=None,
                             client_computer=None,
                             new_ip=None,
                             new_node=None,
                             forced_response=None,
                             expected_msg_type=None,
                             callback=None):
    """Run 'net witness <cmd>' twice and verify plain and JSON output.

    All prepared registrations are opened, the command is run with the
    given filter options and the output is checked to list exactly
    *regs*.  This is done once for the plain text output and once, with
    freshly opened registrations, for the --json output.

    registration_idx selects regs[registration_idx] for
    --witness-registration.  expected_msg_type, new_ip, new_node and
    forced_response describe the message a command is expected to
    report.  callback, if given, is called with every entry of *regs*
    after its output was verified (in both passes).
    """
    def open_and_select_registration():
        # Opens all registrations and returns the context handle
        # selected by registration_idx (None without a selection).
        self.open_all_registrations()
        if registration_idx is None:
            return None
        selected = regs[registration_idx]['context']
        self.assertIsNotNone(selected)
        return selected

    def subcmd_kwargs(registration):
        # Options passed identically to the plain and the JSON call.
        return dict(apply_to_all=apply_to_all,
                    registration=registration,
                    net_name=net_name,
                    share_name=share_name,
                    ip_address=ip_address,
                    client_computer=client_computer,
                    new_ip=new_ip,
                    new_node=new_node,
                    forced_response=forced_response)

    #
    # First pass: plain text output
    #
    registration = open_and_select_registration()

    plain_res = self.call_net_witness_subcmd(cmd,
                                             **subcmd_kwargs(registration))
    if self.verbose:
        print("%s" % plain_res)
    plain_lines = plain_res.splitlines()

    # Two header lines, plus one more when a message type is reported.
    num_headlines = 2
    if expected_msg_type:
        num_headlines += 1
    self.assertEqual(len(plain_lines), num_headlines+len(regs))
    if expected_msg_type:
        self.assertIn(expected_msg_type, plain_lines[0])
    plain_lines = plain_lines[num_headlines:]
    self.assertEqual(len(plain_lines), len(regs))

    for reg in regs:
        reg_uuid = reg['context'].uuid

        expected_line = "%-36s " % reg_uuid
        expected_line += "%-20s " % reg['net_name']
        if reg['share_name']:
            expected_share = reg['share_name']
        else:
            expected_share = "''"
        expected_line += "%-15s " % expected_share
        expected_line += "%-20s " % reg['ip_address']
        expected_line += "%s" % reg['computer_name']

        # Exactly one output line must start with this uuid.
        line = None
        for candidate in plain_lines:
            if not candidate.startswith(str(reg_uuid)):
                continue
            self.assertIsNone(line)
            line = candidate
            self.assertEqual(line, expected_line)
        self.assertIsNotNone(line)

        if callback is not None:
            callback(reg)

    self.close_all_registrations()

    #
    # Second pass: JSON output, with fresh registrations
    #
    registration = open_and_select_registration()

    json_res = self.call_net_witness_subcmd(cmd,
                                            as_json=True,
                                            **subcmd_kwargs(registration))

    num_filters = 0
    if apply_to_all:
        num_filters += 1
    if registration:
        num_filters += 1
    if net_name:
        num_filters += 1
    if share_name:
        num_filters += 1
    if ip_address:
        num_filters += 1
    if client_computer:
        num_filters += 1

    num_toplevel = 2
    if expected_msg_type:
        num_toplevel += 1

    self.assertIn('filters', json_res)
    if expected_msg_type:
        self.assertIn('message', json_res)
    self.assertIn('registrations', json_res)
    self.assertEqual(len(json_res.keys()), num_toplevel)

    json_filters = json_res['filters']
    self.assertEqual(len(json_filters.keys()), num_filters)

    if apply_to_all:
        self.assertTrue(json_filters['--witness-apply-to-all'])

    if registration:
        self.assertEqual(json_filters['--witness-registration'],
                         str(registration.uuid))
    if net_name:
        self.assertEqual(json_filters['--witness-net-name'],
                         net_name)
    if share_name:
        self.assertEqual(json_filters['--witness-share-name'],
                         share_name)
    if ip_address:
        self.assertEqual(json_filters['--witness-ip-address'],
                         ip_address)
    if client_computer:
        self.assertEqual(json_filters['--witness-client-computer-name'],
                         client_computer)
    if expected_msg_type:
        json_message = json_res['message']
        # 'type' is always present; further keys depend on the options.
        num_sub = 1
        self.assertEqual(json_message['type'], expected_msg_type)

        if new_ip is not None:
            num_sub += 1
            self.assertEqual(json_message['new_ip'], new_ip)
        elif new_node == -1:
            num_sub += 1
            self.assertTrue(json_message['all_nodes'])
        elif new_node is not None:
            num_sub += 1
            self.assertEqual(json_message['new_node'], new_node)
        if forced_response is not None:
            num_sub += 1
            forced_response_json = json.loads(str(forced_response))
            self.assertDictEqual(json_message['json'], forced_response_json)

        self.assertEqual(len(json_message.keys()), num_sub)

    json_regs = json_res['registrations']
    self.assertEqual(len(json_regs.keys()), len(regs))

    for reg in regs:
        reg_uuid = reg['context'].uuid

        self.assertIn(str(reg_uuid), json_regs)
        json_reg = json_regs[str(reg_uuid)]
        self.assertJsonReg(json_reg, reg)

        if callback is not None:
            callback(reg)

    self.close_all_registrations()
def check_combinations(self, check_func, only_shares=False):
    """Call *check_func* for many combinations of registration filters.

    The prepared registrations are grouped by share name, ip address and
    computer name.  check_func(regs, **filters) is then called with
    filters (exact values, '|' alternations and prefix patterns) together
    with the list of registrations expected to match them.  With
    only_shares=True only registrations having a share name are used.
    """
    all_regs = self.prepare_all_registrations()

    # Split by share name: registrations with and without a share.
    share_name_regs = {}
    all_share_name_regs = []
    no_share_name_regs = []
    for reg in all_regs:
        share = reg['share_name']
        if share is None:
            no_share_name_regs.append(reg)
            continue
        share_name_regs.setdefault(share, []).append(reg)
        all_share_name_regs.append(reg)

    if only_shares:
        all_regs = all_share_name_regs
        no_share_name_regs = []

    ip_address_regs = {}
    computer_name_regs = {}
    for reg in all_regs:
        ip_address_regs.setdefault(reg['ip_address'], []).append(reg)
        computer_name_regs.setdefault(reg['computer_name'], []).append(reg)

    all_share_names = '|'.join(share_name_regs.keys())
    common_share_name = self.max_common_prefix(share_name_regs.keys())
    all_ip_addresses = '|'.join(ip_address_regs.keys())
    common_ip_address = self.max_common_prefix(ip_address_regs.keys())
    all_computer_names = '|'.join(computer_name_regs.keys())
    common_computer_name = self.max_common_prefix(computer_name_regs.keys())

    # Filters expected to match every registration.
    check_func(all_regs, apply_to_all=True)
    check_func(all_regs, net_name=self.server_hostname)
    check_func(all_regs, ip_address=all_ip_addresses)
    check_func(all_regs, client_computer=all_computer_names)
    check_func(all_regs,
               net_name=self.server_hostname,
               ip_address=all_ip_addresses,
               client_computer=all_computer_names)
    check_func(all_regs,
               net_name='.*',
               share_name='.*',
               ip_address='.*',
               client_computer='.*')
    check_func(all_regs,
               share_name='^$|%s.*' % common_share_name,
               ip_address='%s.*' % common_ip_address,
               client_computer='%s.*' % common_computer_name)

    # Filters separating registrations with and without a share.
    check_func(all_share_name_regs, share_name=all_share_names)
    check_func(all_share_name_regs, share_name='%s.*' % common_share_name)
    check_func(no_share_name_regs, share_name='^$')

    # One exact value at a time.
    for share_name, matching in share_name_regs.items():
        check_func(matching, share_name=share_name)

    for ip_address, matching in ip_address_regs.items():
        check_func(matching, ip_address=ip_address)

    for computer_name, matching in computer_name_regs.items():
        check_func(matching, client_computer=computer_name)

    # Every registration selected on its own, with and without the
    # additional (redundant) filters.
    for reg in all_regs:
        single = [reg]
        check_func(single, registration_idx=0)
        check_func(single,
                   registration_idx=0,
                   net_name=reg['net_name'],
                   share_name=reg['share_name'],
                   ip_address=reg['ip_address'],
                   client_computer=reg['computer_name'])
def test_net_witness_list(self):
    """Check 'net witness list' for all filter combinations."""
    def check_list(regs,
                   apply_to_all=False,
                   registration_idx=None,
                   net_name=None,
                   share_name=None,
                   ip_address=None,
                   client_computer=None):
        # --witness-apply-to-all is not needed for 'list', so the value
        # requested by check_combinations() is replaced by None.
        filters = dict(apply_to_all=None,
                       registration_idx=registration_idx,
                       net_name=net_name,
                       share_name=share_name,
                       ip_address=ip_address,
                       client_computer=client_computer)
        return self.check_net_witness_output('list', regs, **filters)

    self.check_combinations(check_list)
def _test_net_witness_generic_move(self,
                                   move_cmd,
                                   msg_type_prefix,
                                   msg_type):
    """Run a 'net witness' move command and check the notifications.

    move_cmd is passed as the command name to
    check_net_witness_output() (the tests in this file use
    'client-move' and 'share-move').  msg_type_prefix is used to build
    the expected message type string: "<prefix>_IPV4" when a new ip is
    given, "<prefix>_NODE" otherwise.  msg_type is the witness
    notification type handed to assertGenericIpLists() for the
    AsyncNotify response of each registration.

    Raises ValueError if msg_type is neither
    witness.WITNESS_NOTIFY_CLIENT_MOVE nor
    witness.WITNESS_NOTIFY_SHARE_MOVE.
    """
    # Decide up front which registrations check_combinations() should
    # look at.  An unknown msg_type used to surface only at the very end
    # as an UnboundLocalError on 'only_shares'; fail early and clearly.
    if msg_type == witness.WITNESS_NOTIFY_CLIENT_MOVE:
        only_shares = False
    elif msg_type == witness.WITNESS_NOTIFY_SHARE_MOVE:
        only_shares = True
    else:
        raise ValueError("unsupported msg_type: %r" % (msg_type,))

    def _check_generic_move(regs,
                            apply_to_all=False,
                            registration_idx=None,
                            net_name=None,
                            share_name=None,
                            ip_address=None,
                            client_computer=None,
                            new_ip=None,
                            new_node=None):
        """Run one move towards new_ip or new_node and verify it."""
        if new_ip:
            expected_msg_type = "%s_IPV4" % msg_type_prefix
        else:
            expected_msg_type = "%s_NODE" % msg_type_prefix

        # Collect the addresses the notification is expected to carry:
        # the explicit ip first, then the ip of the selected node, or
        # of every node when new_node is -1.
        expected_ip_list = []
        if new_ip:
            expected_ip_list.append({'ipv4': str(new_ip)})
        if new_node == -1:
            for node_idx in range(len(self.nodes)):
                node = self.nodes[node_idx]
                expected_ip_list.append({'ipv4': str(node['ip'])})
        elif new_node is not None:
            node = self.nodes[new_node]
            expected_ip_list.append({'ipv4': str(node['ip'])})

        expected_ip_lists = [expected_ip_list]

        def check_generic_move_response(reg):
            # Fetch the pending notification of this registration and
            # compare it with the expected ip lists.
            conn = reg['conn']
            reg_context = reg['context']
            response = conn.AsyncNotify(reg_context)
            self.assertGenericIpLists(response, msg_type, expected_ip_lists)

        return self.check_net_witness_output(move_cmd,
                                             regs,
                                             apply_to_all=apply_to_all,
                                             registration_idx=registration_idx,
                                             net_name=net_name,
                                             share_name=share_name,
                                             ip_address=ip_address,
                                             client_computer=client_computer,
                                             new_ip=new_ip,
                                             new_node=new_node,
                                             expected_msg_type=expected_msg_type,
                                             callback=check_generic_move_response)

    def check_generic_move(regs,
                           apply_to_all=False,
                           registration_idx=None,
                           net_name=None,
                           share_name=None,
                           ip_address=None,
                           client_computer=None):
        """Try all move targets for one selector combination."""
        selectors = {
            'apply_to_all': apply_to_all,
            'registration_idx': registration_idx,
            'net_name': net_name,
            'share_name': share_name,
            'ip_address': ip_address,
            'client_computer': client_computer,
        }

        # First the special node index -1 (all node ips are expected).
        _check_generic_move(regs, new_node=-1, **selectors)

        # Then each node, addressed once by index and once by its ip.
        for node_idx in range(len(self.nodes)):
            node = self.nodes[node_idx]

            _check_generic_move(regs, new_node=node_idx, **selectors)
            _check_generic_move(regs, new_ip=node['ip'], **selectors)

    self.check_combinations(check_generic_move, only_shares=only_shares)
def test_net_witness_client_move(self):
    # 'client-move' is expected to produce CLIENT_MOVE_TO_* messages
    # and WITNESS_NOTIFY_CLIENT_MOVE notifications.
    self._test_net_witness_generic_move(
        move_cmd='client-move',
        msg_type_prefix='CLIENT_MOVE_TO',
        msg_type=witness.WITNESS_NOTIFY_CLIENT_MOVE)
def test_net_witness_share_move(self):
    # 'share-move' is expected to produce SHARE_MOVE_TO_* messages
    # and WITNESS_NOTIFY_SHARE_MOVE notifications.
    self._test_net_witness_generic_move(
        move_cmd='share-move',
        msg_type_prefix='SHARE_MOVE_TO',
        msg_type=witness.WITNESS_NOTIFY_SHARE_MOVE)
def test_net_witness_force_unregister(self):
    # Run 'force-unregister' for the selector combinations provided by
    # check_combinations() and verify, per registration, that the
    # context is gone afterwards.

    def check_force_unregister(regs,
                               apply_to_all=False,
                               registration_idx=None,
                               net_name=None,
                               share_name=None,
                               ip_address=None,
                               client_computer=None):

        def check_force_unregister_happened(reg):
            witness_conn = reg['conn']
            context_handle = reg['context']
            self.assertIsNotNone(context_handle)
            try:
                witness_conn.UnRegister(context_handle)
            except WERRORError as e:
                # Only WERR_NOT_FOUND is acceptable here; anything else
                # is propagated unchanged.
                num, _unused_msg = e.args
                if num != werror.WERR_NOT_FOUND:
                    raise
            else:
                # UnRegister must not succeed on this context.
                self.fail()
            # Mark the registration as no longer having a context.
            reg['context'] = None

        selectors = {
            'apply_to_all': apply_to_all,
            'registration_idx': registration_idx,
            'net_name': net_name,
            'share_name': share_name,
            'ip_address': ip_address,
            'client_computer': client_computer,
        }
        return self.check_net_witness_output(
            "force-unregister",
            regs,
            expected_msg_type="FORCE_UNREGISTER",
            callback=check_force_unregister_happened,
            **selectors)

    self.check_combinations(check_force_unregister)
def _test_net_witness_force_response(self,
                                     msg_type=None,
                                     expected_resource_changes=None,
                                     expected_ip_lists=None):
    """Run 'force-response' with a hand-built JSON response.

    The JSON text is assembled from msg_type and either
    expected_resource_changes (for WITNESS_NOTIFY_RESOURCE_CHANGE) or
    expected_ip_lists (for the client-move, share-move and ip-change
    types) and passed as forced_response to check_net_witness_output().
    The callback then fetches the notification with AsyncNotify and
    compares it with the same expected data.
    """

    def render_resource_change(rc):
        # One resource change object; only keys present in rc are emitted.
        fields = []
        if 'type' in rc:
            fields.append('"type": %u ' % rc['type'])
        if 'name' in rc:
            fields.append('"name": "%s" ' % rc['name'])
        return '{ ' + ", ".join(fields) + '} '

    def render_ip(ip):
        # One address object; only keys present in ip are emitted.
        fields = []
        if 'flags' in ip:
            fields.append('"flags": %u' % ip['flags'])
        if 'ipv4' in ip:
            fields.append('"ipv4": "%s" ' % ip['ipv4'])
        if 'ipv6' in ip:
            fields.append('"ipv6": "%s" ' % ip['ipv6'])
        return '{ ' + ", ".join(fields) + '}'

    def render_ip_list(ip_list):
        return '[' + ", ".join(render_ip(ip) for ip in ip_list) + ']'

    def check_force_response(regs,
                             apply_to_all=False,
                             registration_idx=None,
                             net_name=None,
                             share_name=None,
                             ip_address=None,
                             client_computer=None):
        move_types = [
            witness.WITNESS_NOTIFY_CLIENT_MOVE,
            witness.WITNESS_NOTIFY_SHARE_MOVE,
            witness.WITNESS_NOTIFY_IP_CHANGE,
        ]

        # Header, then the message array for the given msg_type, then
        # the closing brackets.
        pieces = [
            '{ ',
            '"result": 0, ',
            '"response": { ',
            '"type": %u, ' % msg_type,
            '"messages": [ ',
        ]
        if msg_type == witness.WITNESS_NOTIFY_RESOURCE_CHANGE:
            pieces.append(", ".join(render_resource_change(rc)
                                    for rc in expected_resource_changes))
        if msg_type in move_types:
            pieces.append(", ".join(render_ip_list(ip_list)
                                    for ip_list in expected_ip_lists))
        pieces.extend([']', '}', '}'])
        forced_response = "".join(pieces)

        def check_forced_response_result(reg):
            witness_conn = reg['conn']
            context_handle = reg['context']
            response = witness_conn.AsyncNotify(context_handle)
            if msg_type == witness.WITNESS_NOTIFY_RESOURCE_CHANGE:
                self.assertResourceChanges(response, expected_resource_changes)
            if msg_type in move_types:
                self.assertGenericIpLists(response, msg_type, expected_ip_lists)

        selectors = {
            'apply_to_all': apply_to_all,
            'registration_idx': registration_idx,
            'net_name': net_name,
            'share_name': share_name,
            'ip_address': ip_address,
            'client_computer': client_computer,
        }
        return self.check_net_witness_output(
            "force-response",
            regs,
            forced_response=forced_response,
            expected_msg_type="FORCE_RESPONSE",
            callback=check_forced_response_result,
            **selectors)

    self.check_combinations(check_force_response)
def test_net_witness_force_response_resource_changes(self):
    # Force a RESOURCE_CHANGE response carrying one unavailable and one
    # available resource, in that order.
    unavailable = {
        'type': witness.WITNESS_RESOURCE_STATE_UNAVAILABLE,
        'name': "some-resource-name",
    }
    available = {
        'type': witness.WITNESS_RESOURCE_STATE_AVAILABLE,
        'name': "other-resource-name",
    }
    self._test_net_witness_force_response(
        msg_type=witness.WITNESS_NOTIFY_RESOURCE_CHANGE,
        expected_resource_changes=[unavailable, available])
def _test_net_witness_force_response_generic_moves(self, msg_type):
    """Force a response of msg_type that carries two fixed ip lists.

    Each list holds one entry with the V4|ONLINE flags and an ipv4
    address, and one entry with flags 0 and both an ipv4 and an ipv6
    address.
    """
    online_v4_flags = (witness.WITNESS_IPADDR_V4 |
                       witness.WITNESS_IPADDR_ONLINE)

    def ip_list_for(subnet):
        # subnet is 10 or 20 here; it selects 10.0.<subnet>.x and the
        # matching group of the ipv6 address.
        return [
            {
                'flags': online_v4_flags,
                'ipv4': '10.0.%d.1' % subnet,
            },
            {
                'flags': 0,
                'ipv4': '10.0.%d.2' % subnet,
                'ipv6': 'fd00:0000:0000:0000:0010:0000:%04d:0002' % subnet,
            },
        ]

    self._test_net_witness_force_response(
        msg_type=msg_type,
        expected_ip_lists=[ip_list_for(10), ip_list_for(20)])
def test_net_witness_force_response_client_moves(self):
    # Forced response of type CLIENT_MOVE.
    self._test_net_witness_force_response_generic_moves(
        witness.WITNESS_NOTIFY_CLIENT_MOVE)
def test_net_witness_force_response_share_moves(self):
    # Forced response of type SHARE_MOVE.
    self._test_net_witness_force_response_generic_moves(
        witness.WITNESS_NOTIFY_SHARE_MOVE)
def test_net_witness_force_response_ip_changes(self):
    # Forced response of type IP_CHANGE.
    self._test_net_witness_force_response_generic_moves(
        witness.WITNESS_NOTIFY_IP_CHANGE)
# Allow running this file directly as a script.
if __name__ == "__main__":
    import unittest

    unittest.main()