2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.procedure
;
20 import static org
.junit
.Assert
.assertArrayEquals
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.mockito
.Mockito
.never
;
23 import static org
.mockito
.Mockito
.spy
;
24 import static org
.mockito
.Mockito
.times
;
25 import static org
.mockito
.Mockito
.verify
;
27 import java
.util
.ArrayList
;
28 import java
.util
.List
;
29 import java
.util
.concurrent
.CountDownLatch
;
30 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
31 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
32 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignExceptionDispatcher
;
33 import org
.apache
.hadoop
.hbase
.testclassification
.MasterTests
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
35 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
36 import org
.apache
.hadoop
.hbase
.util
.Pair
;
37 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
38 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
39 import org
.junit
.AfterClass
;
40 import org
.junit
.BeforeClass
;
41 import org
.junit
.ClassRule
;
42 import org
.junit
.Test
;
43 import org
.junit
.experimental
.categories
.Category
;
44 import org
.mockito
.Mockito
;
45 import org
.mockito
.invocation
.InvocationOnMock
;
46 import org
.mockito
.stubbing
.Answer
;
47 import org
.mockito
.verification
.VerificationMode
;
48 import org
.slf4j
.Logger
;
49 import org
.slf4j
.LoggerFactory
;
51 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
53 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
56 * Test zookeeper-based, procedure controllers
58 @Category({MasterTests
.class, MediumTests
.class})
59 public class TestZKProcedureControllers
{
62 public static final HBaseClassTestRule CLASS_RULE
=
63 HBaseClassTestRule
.forClass(TestZKProcedureControllers
.class);
65 private static final Logger LOG
= LoggerFactory
.getLogger(TestZKProcedureControllers
.class);
66 private final static HBaseTestingUtil UTIL
= new HBaseTestingUtil();
67 private static final String COHORT_NODE_NAME
= "expected";
68 private static final String CONTROLLER_NODE_NAME
= "controller";
69 private static final VerificationMode once
= Mockito
.times(1);
71 private final byte[] memberData
= Bytes
.toBytes("data from member");
74 public static void setupTest() throws Exception
{
75 UTIL
.startMiniZKCluster();
79 public static void cleanupTest() throws Exception
{
80 UTIL
.shutdownMiniZKCluster();
84 * Smaller test to just test the actuation on the cohort member
85 * @throws Exception on failure
88 public void testSimpleZKCohortMemberController() throws Exception
{
89 ZKWatcher watcher
= UTIL
.getZooKeeperWatcher();
90 final String operationName
= "instanceTest";
92 final Subprocedure sub
= Mockito
.mock(Subprocedure
.class);
93 Mockito
.when(sub
.getName()).thenReturn(operationName
);
95 final byte[] data
= new byte[] { 1, 2, 3 };
96 final CountDownLatch prepared
= new CountDownLatch(1);
97 final CountDownLatch committed
= new CountDownLatch(1);
99 final ForeignExceptionDispatcher monitor
= spy(new ForeignExceptionDispatcher());
100 final ZKProcedureMemberRpcs controller
= new ZKProcedureMemberRpcs(
101 watcher
, "testSimple");
103 // mock out cohort member callbacks
104 final ProcedureMember member
= Mockito
105 .mock(ProcedureMember
.class);
106 Mockito
.doReturn(sub
).when(member
).createSubprocedure(operationName
, data
);
107 Mockito
.doAnswer(new Answer
<Void
>() {
109 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
110 controller
.sendMemberAcquired(sub
);
111 prepared
.countDown();
114 }).when(member
).submitSubprocedure(sub
);
115 Mockito
.doAnswer(new Answer
<Void
>() {
117 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
118 controller
.sendMemberCompleted(sub
, memberData
);
119 committed
.countDown();
122 }).when(member
).receivedReachedGlobalBarrier(operationName
);
124 // start running the listener
125 controller
.start(COHORT_NODE_NAME
, member
);
127 // set a prepare node from a 'coordinator'
128 String prepare
= ZKProcedureUtil
.getAcquireBarrierNode(controller
.getZkController(), operationName
);
129 ZKUtil
.createSetData(watcher
, prepare
, ProtobufUtil
.prependPBMagic(data
));
130 // wait for the operation to be prepared
133 // create the commit node so we update the operation to enter the commit phase
134 String commit
= ZKProcedureUtil
.getReachedBarrierNode(controller
.getZkController(), operationName
);
135 LOG
.debug("Found prepared, posting commit node:" + commit
);
136 ZKUtil
.createAndFailSilent(watcher
, commit
);
137 LOG
.debug("Commit node:" + commit
+ ", exists:" + ZKUtil
.checkExists(watcher
, commit
));
140 verify(monitor
, never()).receive(Mockito
.any());
141 // XXX: broken due to composition.
142 // verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
144 // cleanup after the test
145 ZKUtil
.deleteNodeRecursively(watcher
, controller
.getZkController().getBaseZnode());
146 assertEquals("Didn't delete prepare node", -1, ZKUtil
.checkExists(watcher
, prepare
));
147 assertEquals("Didn't delete commit node", -1, ZKUtil
.checkExists(watcher
, commit
));
151 public void testZKCoordinatorControllerWithNoCohort() throws Exception
{
152 final String operationName
= "no cohort controller test";
153 final byte[] data
= new byte[] { 1, 2, 3 };
155 runMockCommitWithOrchestratedControllers(startCoordinatorFirst
, operationName
, data
);
156 runMockCommitWithOrchestratedControllers(startCohortFirst
, operationName
, data
);
160 public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception
{
161 final String operationName
= "single member controller test";
162 final byte[] data
= new byte[] { 1, 2, 3 };
164 runMockCommitWithOrchestratedControllers(startCoordinatorFirst
, operationName
, data
, "cohort");
165 runMockCommitWithOrchestratedControllers(startCohortFirst
, operationName
, data
, "cohort");
169 public void testZKCoordinatorControllerMultipleCohort() throws Exception
{
170 final String operationName
= "multi member controller test";
171 final byte[] data
= new byte[] { 1, 2, 3 };
173 runMockCommitWithOrchestratedControllers(startCoordinatorFirst
, operationName
, data
, "cohort",
174 "cohort2", "cohort3");
175 runMockCommitWithOrchestratedControllers(startCohortFirst
, operationName
, data
, "cohort",
176 "cohort2", "cohort3");
179 private void runMockCommitWithOrchestratedControllers(StartControllers controllers
,
180 String operationName
, byte[] data
, String
... cohort
) throws Exception
{
181 ZKWatcher watcher
= UTIL
.getZooKeeperWatcher();
182 List
<String
> expected
= Lists
.newArrayList(cohort
);
184 final Subprocedure sub
= Mockito
.mock(Subprocedure
.class);
185 Mockito
.when(sub
.getName()).thenReturn(operationName
);
187 CountDownLatch prepared
= new CountDownLatch(expected
.size());
188 CountDownLatch committed
= new CountDownLatch(expected
.size());
189 ArrayList
<byte[]> dataFromMembers
= new ArrayList
<>();
191 // mock out coordinator so we can keep track of zk progress
192 ProcedureCoordinator coordinator
= setupMockCoordinator(operationName
,
193 prepared
, committed
, dataFromMembers
);
195 ProcedureMember member
= Mockito
.mock(ProcedureMember
.class);
197 Pair
<ZKProcedureCoordinator
, List
<ZKProcedureMemberRpcs
>> pair
= controllers
198 .start(watcher
, operationName
, coordinator
, CONTROLLER_NODE_NAME
, member
, expected
);
199 ZKProcedureCoordinator controller
= pair
.getFirst();
200 List
<ZKProcedureMemberRpcs
> cohortControllers
= pair
.getSecond();
201 // start the operation
202 Procedure p
= Mockito
.mock(Procedure
.class);
203 Mockito
.when(p
.getName()).thenReturn(operationName
);
205 controller
.sendGlobalBarrierAcquire(p
, data
, expected
);
207 // post the prepare node for each expected node
208 for (ZKProcedureMemberRpcs cc
: cohortControllers
) {
209 cc
.sendMemberAcquired(sub
);
212 // wait for all the notifications to reach the coordinator
214 // make sure we got the all the nodes and no more
215 Mockito
.verify(coordinator
, times(expected
.size())).memberAcquiredBarrier(Mockito
.eq(operationName
),
216 Mockito
.anyString());
218 // kick off the commit phase
219 controller
.sendGlobalBarrierReached(p
, expected
);
221 // post the committed node for each expected node
222 for (ZKProcedureMemberRpcs cc
: cohortControllers
) {
223 cc
.sendMemberCompleted(sub
, memberData
);
226 // wait for all commit notifications to reach the coordinator
228 // make sure we got the all the nodes and no more
229 Mockito
.verify(coordinator
, times(expected
.size())).memberFinishedBarrier(Mockito
.eq(operationName
),
230 Mockito
.anyString(), Mockito
.eq(memberData
));
232 assertEquals("Incorrect number of members returnd data", expected
.size(),
233 dataFromMembers
.size());
234 for (byte[] result
: dataFromMembers
) {
235 assertArrayEquals("Incorrect data from member", memberData
, result
);
238 controller
.resetMembers(p
);
240 // verify all behavior
241 verifyZooKeeperClean(operationName
, watcher
, controller
.getZkProcedureUtil());
242 verifyCohort(member
, cohortControllers
.size(), operationName
, data
);
243 verifyCoordinator(operationName
, coordinator
, expected
);
246 // TODO Broken by composition.
248 // public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
249 // runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
250 // "cohort1", "cohort2");
251 // runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
252 // "cohort1", "cohort2");
255 public void runEarlyPrepareNodes(StartControllers controllers
, String operationName
, byte[] data
,
256 String
... cohort
) throws Exception
{
257 ZKWatcher watcher
= UTIL
.getZooKeeperWatcher();
258 List
<String
> expected
= Lists
.newArrayList(cohort
);
260 final Subprocedure sub
= Mockito
.mock(Subprocedure
.class);
261 Mockito
.when(sub
.getName()).thenReturn(operationName
);
263 final CountDownLatch prepared
= new CountDownLatch(expected
.size());
264 final CountDownLatch committed
= new CountDownLatch(expected
.size());
265 ArrayList
<byte[]> dataFromMembers
= new ArrayList
<>();
267 // mock out coordinator so we can keep track of zk progress
268 ProcedureCoordinator coordinator
= setupMockCoordinator(operationName
,
269 prepared
, committed
, dataFromMembers
);
271 ProcedureMember member
= Mockito
.mock(ProcedureMember
.class);
272 Procedure p
= Mockito
.mock(Procedure
.class);
273 Mockito
.when(p
.getName()).thenReturn(operationName
);
275 Pair
<ZKProcedureCoordinator
, List
<ZKProcedureMemberRpcs
>> pair
= controllers
276 .start(watcher
, operationName
, coordinator
, CONTROLLER_NODE_NAME
, member
, expected
);
277 ZKProcedureCoordinator controller
= pair
.getFirst();
278 List
<ZKProcedureMemberRpcs
> cohortControllers
= pair
.getSecond();
280 // post 1/2 the prepare nodes early
281 for (int i
= 0; i
< cohortControllers
.size() / 2; i
++) {
282 cohortControllers
.get(i
).sendMemberAcquired(sub
);
285 // start the operation
286 controller
.sendGlobalBarrierAcquire(p
, data
, expected
);
288 // post the prepare node for each expected node
289 for (ZKProcedureMemberRpcs cc
: cohortControllers
) {
290 cc
.sendMemberAcquired(sub
);
293 // wait for all the notifications to reach the coordinator
295 // make sure we got the all the nodes and no more
296 Mockito
.verify(coordinator
, times(expected
.size())).memberAcquiredBarrier(Mockito
.eq(operationName
),
297 Mockito
.anyString());
299 // kick off the commit phase
300 controller
.sendGlobalBarrierReached(p
, expected
);
302 // post the committed node for each expected node
303 for (ZKProcedureMemberRpcs cc
: cohortControllers
) {
304 cc
.sendMemberCompleted(sub
, memberData
);
307 // wait for all commit notifications to reach the coordiantor
309 // make sure we got the all the nodes and no more
310 Mockito
.verify(coordinator
, times(expected
.size())).memberFinishedBarrier(Mockito
.eq(operationName
),
311 Mockito
.anyString(), Mockito
.eq(memberData
));
313 controller
.resetMembers(p
);
315 // verify all behavior
316 verifyZooKeeperClean(operationName
, watcher
, controller
.getZkProcedureUtil());
317 verifyCohort(member
, cohortControllers
.size(), operationName
, data
);
318 verifyCoordinator(operationName
, coordinator
, expected
);
322 * @param dataFromMembers
323 * @return a mock {@link ProcedureCoordinator} that just counts down the
324 * prepared and committed latch for called to the respective method
326 private ProcedureCoordinator
setupMockCoordinator(String operationName
,
327 final CountDownLatch prepared
, final CountDownLatch committed
,
328 final ArrayList
<byte[]> dataFromMembers
) {
329 ProcedureCoordinator coordinator
= Mockito
330 .mock(ProcedureCoordinator
.class);
331 Mockito
.doAnswer(new Answer
<Void
>() {
333 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
334 prepared
.countDown();
337 }).when(coordinator
).memberAcquiredBarrier(Mockito
.eq(operationName
), Mockito
.anyString());
338 Mockito
.doAnswer(new Answer
<Void
>() {
340 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
341 dataFromMembers
.add(memberData
);
342 committed
.countDown();
345 }).when(coordinator
).memberFinishedBarrier(Mockito
.eq(operationName
), Mockito
.anyString(),
346 Mockito
.eq(memberData
));
351 * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
353 private void verifyZooKeeperClean(String operationName
, ZKWatcher watcher
,
354 ZKProcedureUtil controller
) throws Exception
{
355 String prepare
= ZKProcedureUtil
.getAcquireBarrierNode(controller
, operationName
);
356 String commit
= ZKProcedureUtil
.getReachedBarrierNode(controller
, operationName
);
357 String abort
= ZKProcedureUtil
.getAbortNode(controller
, operationName
);
358 assertEquals("Didn't delete prepare node", -1, ZKUtil
.checkExists(watcher
, prepare
));
359 assertEquals("Didn't delete commit node", -1, ZKUtil
.checkExists(watcher
, commit
));
360 assertEquals("Didn't delete abort node", -1, ZKUtil
.checkExists(watcher
, abort
));
364 * Verify the cohort controller got called once per expected node to start the operation
366 private void verifyCohort(ProcedureMember member
, int cohortSize
,
367 String operationName
, byte[] data
) {
368 // verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
369 // (byte[]) Mockito.argThat(new ArrayEquals(data)));
370 Mockito
.verify(member
,
371 Mockito
.atLeast(cohortSize
)).submitSubprocedure(Mockito
.any());
376 * Verify that the coordinator only got called once for each expected node
378 private void verifyCoordinator(String operationName
,
379 ProcedureCoordinator coordinator
, List
<String
> expected
) {
380 // verify that we got all the expected nodes
381 for (String node
: expected
) {
382 verify(coordinator
, once
).memberAcquiredBarrier(operationName
, node
);
383 verify(coordinator
, once
).memberFinishedBarrier(operationName
, node
, memberData
);
388 * Specify how the controllers that should be started (not spy/mockable) for the test.
390 private abstract class StartControllers
{
391 public abstract Pair
<ZKProcedureCoordinator
, List
<ZKProcedureMemberRpcs
>> start(
392 ZKWatcher watcher
, String operationName
,
393 ProcedureCoordinator coordinator
, String controllerName
,
394 ProcedureMember member
, List
<String
> cohortNames
) throws Exception
;
397 private final StartControllers startCoordinatorFirst
= new StartControllers() {
400 public Pair
<ZKProcedureCoordinator
, List
<ZKProcedureMemberRpcs
>> start(
401 ZKWatcher watcher
, String operationName
,
402 ProcedureCoordinator coordinator
, String controllerName
,
403 ProcedureMember member
, List
<String
> expected
) throws Exception
{
404 // start the controller
405 ZKProcedureCoordinator controller
= new ZKProcedureCoordinator(
406 watcher
, operationName
, CONTROLLER_NODE_NAME
);
407 controller
.start(coordinator
);
409 // make a cohort controller for each expected node
411 List
<ZKProcedureMemberRpcs
> cohortControllers
= new ArrayList
<>();
412 for (String nodeName
: expected
) {
413 ZKProcedureMemberRpcs cc
= new ZKProcedureMemberRpcs(watcher
, operationName
);
414 cc
.start(nodeName
, member
);
415 cohortControllers
.add(cc
);
417 return new Pair
<>(controller
, cohortControllers
);
422 * Check for the possible race condition where a cohort member starts after the controller and
423 * therefore could miss a new operation
425 private final StartControllers startCohortFirst
= new StartControllers() {
428 public Pair
<ZKProcedureCoordinator
, List
<ZKProcedureMemberRpcs
>> start(
429 ZKWatcher watcher
, String operationName
,
430 ProcedureCoordinator coordinator
, String controllerName
,
431 ProcedureMember member
, List
<String
> expected
) throws Exception
{
433 // make a cohort controller for each expected node
434 List
<ZKProcedureMemberRpcs
> cohortControllers
= new ArrayList
<>();
435 for (String nodeName
: expected
) {
436 ZKProcedureMemberRpcs cc
= new ZKProcedureMemberRpcs(watcher
, operationName
);
437 cc
.start(nodeName
, member
);
438 cohortControllers
.add(cc
);
441 // start the controller
442 ZKProcedureCoordinator controller
= new ZKProcedureCoordinator(
443 watcher
, operationName
, CONTROLLER_NODE_NAME
);
444 controller
.start(coordinator
);
446 return new Pair
<>(controller
, cohortControllers
);