HBASE-26416 Implement a new method for region replication instead of using replay...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / procedure / TestZKProcedureControllers.java
blob9d1c4a614545fc19f6c03b67d03178e49e09f525
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.procedure;
20 import static org.junit.Assert.assertArrayEquals;
21 import static org.junit.Assert.assertEquals;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.spy;
24 import static org.mockito.Mockito.times;
25 import static org.mockito.Mockito.verify;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import org.apache.hadoop.hbase.HBaseClassTestRule;
31 import org.apache.hadoop.hbase.HBaseTestingUtil;
32 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
33 import org.apache.hadoop.hbase.testclassification.MasterTests;
34 import org.apache.hadoop.hbase.testclassification.MediumTests;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.Pair;
37 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
38 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
39 import org.junit.AfterClass;
40 import org.junit.BeforeClass;
41 import org.junit.ClassRule;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.mockito.Mockito;
45 import org.mockito.invocation.InvocationOnMock;
46 import org.mockito.stubbing.Answer;
47 import org.mockito.verification.VerificationMode;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
53 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
55 /**
56 * Test zookeeper-based, procedure controllers
58 @Category({MasterTests.class, MediumTests.class})
59 public class TestZKProcedureControllers {
61 @ClassRule
62 public static final HBaseClassTestRule CLASS_RULE =
63 HBaseClassTestRule.forClass(TestZKProcedureControllers.class);
65 private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class);
66 private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
67 private static final String COHORT_NODE_NAME = "expected";
68 private static final String CONTROLLER_NODE_NAME = "controller";
69 private static final VerificationMode once = Mockito.times(1);
71 private final byte[] memberData = Bytes.toBytes("data from member");
73 @BeforeClass
74 public static void setupTest() throws Exception {
75 UTIL.startMiniZKCluster();
78 @AfterClass
79 public static void cleanupTest() throws Exception {
80 UTIL.shutdownMiniZKCluster();
83 /**
84 * Smaller test to just test the actuation on the cohort member
85 * @throws Exception on failure
87 @Test
88 public void testSimpleZKCohortMemberController() throws Exception {
89 ZKWatcher watcher = UTIL.getZooKeeperWatcher();
90 final String operationName = "instanceTest";
92 final Subprocedure sub = Mockito.mock(Subprocedure.class);
93 Mockito.when(sub.getName()).thenReturn(operationName);
95 final byte[] data = new byte[] { 1, 2, 3 };
96 final CountDownLatch prepared = new CountDownLatch(1);
97 final CountDownLatch committed = new CountDownLatch(1);
99 final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
100 final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
101 watcher, "testSimple");
103 // mock out cohort member callbacks
104 final ProcedureMember member = Mockito
105 .mock(ProcedureMember.class);
106 Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
107 Mockito.doAnswer(new Answer<Void>() {
108 @Override
109 public Void answer(InvocationOnMock invocation) throws Throwable {
110 controller.sendMemberAcquired(sub);
111 prepared.countDown();
112 return null;
114 }).when(member).submitSubprocedure(sub);
115 Mockito.doAnswer(new Answer<Void>() {
116 @Override
117 public Void answer(InvocationOnMock invocation) throws Throwable {
118 controller.sendMemberCompleted(sub, memberData);
119 committed.countDown();
120 return null;
122 }).when(member).receivedReachedGlobalBarrier(operationName);
124 // start running the listener
125 controller.start(COHORT_NODE_NAME, member);
127 // set a prepare node from a 'coordinator'
128 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
129 ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
130 // wait for the operation to be prepared
131 prepared.await();
133 // create the commit node so we update the operation to enter the commit phase
134 String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
135 LOG.debug("Found prepared, posting commit node:" + commit);
136 ZKUtil.createAndFailSilent(watcher, commit);
137 LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
138 committed.await();
140 verify(monitor, never()).receive(Mockito.any());
141 // XXX: broken due to composition.
142 // verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
143 // Mockito.any());
144 // cleanup after the test
145 ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
146 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
147 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
150 @Test
151 public void testZKCoordinatorControllerWithNoCohort() throws Exception {
152 final String operationName = "no cohort controller test";
153 final byte[] data = new byte[] { 1, 2, 3 };
155 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
156 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
159 @Test
160 public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
161 final String operationName = "single member controller test";
162 final byte[] data = new byte[] { 1, 2, 3 };
164 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
165 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
168 @Test
169 public void testZKCoordinatorControllerMultipleCohort() throws Exception {
170 final String operationName = "multi member controller test";
171 final byte[] data = new byte[] { 1, 2, 3 };
173 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
174 "cohort2", "cohort3");
175 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
176 "cohort2", "cohort3");
179 private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
180 String operationName, byte[] data, String... cohort) throws Exception {
181 ZKWatcher watcher = UTIL.getZooKeeperWatcher();
182 List<String> expected = Lists.newArrayList(cohort);
184 final Subprocedure sub = Mockito.mock(Subprocedure.class);
185 Mockito.when(sub.getName()).thenReturn(operationName);
187 CountDownLatch prepared = new CountDownLatch(expected.size());
188 CountDownLatch committed = new CountDownLatch(expected.size());
189 ArrayList<byte[]> dataFromMembers = new ArrayList<>();
191 // mock out coordinator so we can keep track of zk progress
192 ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
193 prepared, committed, dataFromMembers);
195 ProcedureMember member = Mockito.mock(ProcedureMember.class);
197 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers
198 .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
199 ZKProcedureCoordinator controller = pair.getFirst();
200 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
201 // start the operation
202 Procedure p = Mockito.mock(Procedure.class);
203 Mockito.when(p.getName()).thenReturn(operationName);
205 controller.sendGlobalBarrierAcquire(p, data, expected);
207 // post the prepare node for each expected node
208 for (ZKProcedureMemberRpcs cc : cohortControllers) {
209 cc.sendMemberAcquired(sub);
212 // wait for all the notifications to reach the coordinator
213 prepared.await();
214 // make sure we got the all the nodes and no more
215 Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
216 Mockito.anyString());
218 // kick off the commit phase
219 controller.sendGlobalBarrierReached(p, expected);
221 // post the committed node for each expected node
222 for (ZKProcedureMemberRpcs cc : cohortControllers) {
223 cc.sendMemberCompleted(sub, memberData);
226 // wait for all commit notifications to reach the coordinator
227 committed.await();
228 // make sure we got the all the nodes and no more
229 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
230 Mockito.anyString(), Mockito.eq(memberData));
232 assertEquals("Incorrect number of members returnd data", expected.size(),
233 dataFromMembers.size());
234 for (byte[] result : dataFromMembers) {
235 assertArrayEquals("Incorrect data from member", memberData, result);
238 controller.resetMembers(p);
240 // verify all behavior
241 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
242 verifyCohort(member, cohortControllers.size(), operationName, data);
243 verifyCoordinator(operationName, coordinator, expected);
246 // TODO Broken by composition.
247 // @Test
248 // public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
249 // runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
250 // "cohort1", "cohort2");
251 // runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
252 // "cohort1", "cohort2");
253 // }
255 public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
256 String... cohort) throws Exception {
257 ZKWatcher watcher = UTIL.getZooKeeperWatcher();
258 List<String> expected = Lists.newArrayList(cohort);
260 final Subprocedure sub = Mockito.mock(Subprocedure.class);
261 Mockito.when(sub.getName()).thenReturn(operationName);
263 final CountDownLatch prepared = new CountDownLatch(expected.size());
264 final CountDownLatch committed = new CountDownLatch(expected.size());
265 ArrayList<byte[]> dataFromMembers = new ArrayList<>();
267 // mock out coordinator so we can keep track of zk progress
268 ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
269 prepared, committed, dataFromMembers);
271 ProcedureMember member = Mockito.mock(ProcedureMember.class);
272 Procedure p = Mockito.mock(Procedure.class);
273 Mockito.when(p.getName()).thenReturn(operationName);
275 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers
276 .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
277 ZKProcedureCoordinator controller = pair.getFirst();
278 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
280 // post 1/2 the prepare nodes early
281 for (int i = 0; i < cohortControllers.size() / 2; i++) {
282 cohortControllers.get(i).sendMemberAcquired(sub);
285 // start the operation
286 controller.sendGlobalBarrierAcquire(p, data, expected);
288 // post the prepare node for each expected node
289 for (ZKProcedureMemberRpcs cc : cohortControllers) {
290 cc.sendMemberAcquired(sub);
293 // wait for all the notifications to reach the coordinator
294 prepared.await();
295 // make sure we got the all the nodes and no more
296 Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
297 Mockito.anyString());
299 // kick off the commit phase
300 controller.sendGlobalBarrierReached(p, expected);
302 // post the committed node for each expected node
303 for (ZKProcedureMemberRpcs cc : cohortControllers) {
304 cc.sendMemberCompleted(sub, memberData);
307 // wait for all commit notifications to reach the coordiantor
308 committed.await();
309 // make sure we got the all the nodes and no more
310 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
311 Mockito.anyString(), Mockito.eq(memberData));
313 controller.resetMembers(p);
315 // verify all behavior
316 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
317 verifyCohort(member, cohortControllers.size(), operationName, data);
318 verifyCoordinator(operationName, coordinator, expected);
322 * @param dataFromMembers
323 * @return a mock {@link ProcedureCoordinator} that just counts down the
324 * prepared and committed latch for called to the respective method
326 private ProcedureCoordinator setupMockCoordinator(String operationName,
327 final CountDownLatch prepared, final CountDownLatch committed,
328 final ArrayList<byte[]> dataFromMembers) {
329 ProcedureCoordinator coordinator = Mockito
330 .mock(ProcedureCoordinator.class);
331 Mockito.doAnswer(new Answer<Void>() {
332 @Override
333 public Void answer(InvocationOnMock invocation) throws Throwable {
334 prepared.countDown();
335 return null;
337 }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
338 Mockito.doAnswer(new Answer<Void>() {
339 @Override
340 public Void answer(InvocationOnMock invocation) throws Throwable {
341 dataFromMembers.add(memberData);
342 committed.countDown();
343 return null;
345 }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
346 Mockito.eq(memberData));
347 return coordinator;
351 * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
353 private void verifyZooKeeperClean(String operationName, ZKWatcher watcher,
354 ZKProcedureUtil controller) throws Exception {
355 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
356 String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
357 String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
358 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
359 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
360 assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
364 * Verify the cohort controller got called once per expected node to start the operation
366 private void verifyCohort(ProcedureMember member, int cohortSize,
367 String operationName, byte[] data) {
368 // verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
369 // (byte[]) Mockito.argThat(new ArrayEquals(data)));
370 Mockito.verify(member,
371 Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any());
376 * Verify that the coordinator only got called once for each expected node
378 private void verifyCoordinator(String operationName,
379 ProcedureCoordinator coordinator, List<String> expected) {
380 // verify that we got all the expected nodes
381 for (String node : expected) {
382 verify(coordinator, once).memberAcquiredBarrier(operationName, node);
383 verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
388 * Specify how the controllers that should be started (not spy/mockable) for the test.
390 private abstract class StartControllers {
391 public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
392 ZKWatcher watcher, String operationName,
393 ProcedureCoordinator coordinator, String controllerName,
394 ProcedureMember member, List<String> cohortNames) throws Exception;
397 private final StartControllers startCoordinatorFirst = new StartControllers() {
399 @Override
400 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
401 ZKWatcher watcher, String operationName,
402 ProcedureCoordinator coordinator, String controllerName,
403 ProcedureMember member, List<String> expected) throws Exception {
404 // start the controller
405 ZKProcedureCoordinator controller = new ZKProcedureCoordinator(
406 watcher, operationName, CONTROLLER_NODE_NAME);
407 controller.start(coordinator);
409 // make a cohort controller for each expected node
411 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
412 for (String nodeName : expected) {
413 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
414 cc.start(nodeName, member);
415 cohortControllers.add(cc);
417 return new Pair<>(controller, cohortControllers);
422 * Check for the possible race condition where a cohort member starts after the controller and
423 * therefore could miss a new operation
425 private final StartControllers startCohortFirst = new StartControllers() {
427 @Override
428 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
429 ZKWatcher watcher, String operationName,
430 ProcedureCoordinator coordinator, String controllerName,
431 ProcedureMember member, List<String> expected) throws Exception {
433 // make a cohort controller for each expected node
434 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
435 for (String nodeName : expected) {
436 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
437 cc.start(nodeName, member);
438 cohortControllers.add(cc);
441 // start the controller
442 ZKProcedureCoordinator controller = new ZKProcedureCoordinator(
443 watcher, operationName, CONTROLLER_NODE_NAME);
444 controller.start(coordinator);
446 return new Pair<>(controller, cohortControllers);