2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.procedure
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertTrue
;
22 import static org
.mockito
.Matchers
.any
;
23 import static org
.mockito
.Matchers
.anyListOf
;
24 import static org
.mockito
.Matchers
.eq
;
25 import static org
.mockito
.Mockito
.atMost
;
26 import static org
.mockito
.Mockito
.never
;
27 import static org
.mockito
.Mockito
.spy
;
28 import static org
.mockito
.Mockito
.when
;
30 import java
.io
.IOException
;
31 import java
.util
.ArrayList
;
32 import java
.util
.Arrays
;
33 import java
.util
.List
;
34 import java
.util
.concurrent
.CountDownLatch
;
35 import java
.util
.concurrent
.ThreadPoolExecutor
;
36 import java
.util
.concurrent
.atomic
.AtomicInteger
;
37 import org
.apache
.hadoop
.hbase
.Abortable
;
38 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
39 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
40 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignException
;
41 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignExceptionDispatcher
;
42 import org
.apache
.hadoop
.hbase
.errorhandling
.TimeoutException
;
43 import org
.apache
.hadoop
.hbase
.procedure
.Subprocedure
.SubprocedureImpl
;
44 import org
.apache
.hadoop
.hbase
.testclassification
.MasterTests
;
45 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
46 import org
.apache
.hadoop
.hbase
.util
.Pair
;
47 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
48 import org
.junit
.AfterClass
;
49 import org
.junit
.BeforeClass
;
50 import org
.junit
.ClassRule
;
51 import org
.junit
.Test
;
52 import org
.junit
.experimental
.categories
.Category
;
53 import org
.mockito
.Mockito
;
54 import org
.mockito
.internal
.matchers
.ArrayEquals
;
55 import org
.mockito
.invocation
.InvocationOnMock
;
56 import org
.mockito
.stubbing
.Answer
;
57 import org
.mockito
.verification
.VerificationMode
;
58 import org
.slf4j
.Logger
;
59 import org
.slf4j
.LoggerFactory
;
61 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
64 * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
66 @Category({MasterTests
.class, MediumTests
.class})
67 public class TestZKProcedure
{
// Class-level test rule built by HBaseClassTestRule.forClass for this test class.
70 public static final HBaseClassTestRule CLASS_RULE
=
71 HBaseClassTestRule
.forClass(TestZKProcedure
.class);
73 private static final Logger LOG
= LoggerFactory
.getLogger(TestZKProcedure
.class);
// Shared utility: starts/stops the mini ZK cluster and supplies the configuration for each ZKWatcher.
74 private static HBaseTestingUtility UTIL
= new HBaseTestingUtility();
// Name given to the coordinator's ZK controller and to the coordinator thread pool.
75 private static final String COORDINATOR_NODE_NAME
= "coordinator";
// Keep-alive value handed to the coordinator and member defaultPool(...) factories.
// NOTE(review): the unit is not verifiable from this file; confirm against defaultPool's parameter.
76 private static final long KEEP_ALIVE
= 100; // seconds
// Thread count handed to ProcedureCoordinator.defaultPool(...).
77 private static final int POOL_SIZE
= 1;
// Timeout passed to the SubprocedureImpl and Procedure constructors.
78 private static final long TIMEOUT
= 10000; // increase this when stepping through with a debugger
// Wake frequency passed to SubprocedureImpl, Procedure and Procedure.waitForLatch.
79 private static final long WAKE_FREQUENCY
= 500;
// Procedure name used when starting procedures and when stubbing/verifying buildSubprocedure.
80 private static final String opName
= "op";
81 private static final byte[] data
= new byte[] { 1, 2 }; // procedure argument bytes: passed to startProcedure and matched in the buildSubprocedure stubs
// Shorthand verification mode: exactly one invocation.
82 private static final VerificationMode once
= Mockito
.times(1);
/**
 * Starts the mini ZooKeeper cluster on the shared testing utility.
 * NOTE(review): presumably run once before all tests; the annotation is not visible in this excerpt.
 * @throws Exception if the cluster fails to start
 */
85 public static void setupTest() throws Exception
{
86 UTIL
.startMiniZKCluster();
/**
 * Shuts down the mini ZooKeeper cluster on the shared testing utility.
 * NOTE(review): presumably run once after all tests; the annotation is not visible in this excerpt.
 * @throws Exception if the cluster fails to shut down
 */
90 public static void cleanupTest() throws Exception
{
91 UTIL
.shutdownMiniZKCluster();
/**
 * Creates a new {@link ZKWatcher} from the shared testing utility's configuration, identified as
 * "testing utility".
 * <p>
 * The supplied {@link Abortable} turns any abort request into a {@link RuntimeException} that
 * carries the abort reason and the original cause.
 * @return a newly constructed watcher
 * @throws IOException declared by this method; presumably raised by the ZKWatcher constructor
 */
94 private static ZKWatcher
newZooKeeperWatcher() throws IOException
{
95 return new ZKWatcher(UTIL
.getConfiguration(), "testing utility", new Abortable() {
// An abort is never expected in these tests, so surface it as an unchecked exception.
97 public void abort(String why
, Throwable e
) {
98 throw new RuntimeException(
99 "Unexpected abort in distributed three phase commit test:" + why
, e
);
// NOTE(review): the body of isAborted() is not visible in this excerpt.
103 public boolean isAborted() {
/**
 * Test case for an empty member set.
 * NOTE(review): the method body is not visible in this excerpt; presumably it calls
 * {@code runCommit} with no member names -- confirm against the full file.
 * @throws Exception on unexpected failure
 */
110 public void testEmptyMemberSet() throws Exception
{
/**
 * Test case for a single cohort member.
 * NOTE(review): the method body is not visible in this excerpt; presumably it calls
 * {@code runCommit} with one member name -- confirm against the full file.
 * @throws Exception on unexpected failure
 */
115 public void testSingleMember() throws Exception
{
/**
 * Runs the commit flow with four cohort members named "one", "two", "three" and "four".
 * @throws Exception on unexpected failure
 */
120 public void testMultipleMembers() throws Exception
{
121 runCommit("one", "two", "three", "four" );
/**
 * Runs one procedure named {@code opName} across the given members and verifies the outcome.
 * <p>
 * Steps, in order:
 * <ol>
 * <li>builds a ZK-backed coordinator whose created procedures are wrapped in Mockito spies;</li>
 * <li>starts one {@link ProcedureMember} per member name, all sharing one mocked
 * {@link SubprocedureFactory};</li>
 * <li>creates one spied {@link SubprocedureImpl} per member and stubs the factory so that each
 * {@code buildSubprocedure(opName, data)} call looks up the next one by an atomically incremented
 * index;</li>
 * <li>starts the procedure, then checks via {@code waitAndVerifyProc} and
 * {@code verifyCohortSuccessful} using the modes once, once, never(), once and no error;</li>
 * <li>closes everything via {@code closeAll}.</li>
 * </ol>
 * @param members names of the cohort members; {@code null} is treated as an empty array
 * @throws Exception on unexpected failure
 */
124 private void runCommit(String
... members
) throws Exception
{
125 // make sure we just have an empty list
126 if (members
== null) {
127 members
= new String
[0];
129 List
<String
> expected
= Arrays
.asList(members
);
131 // setup the constants
132 ZKWatcher coordZkw
= newZooKeeperWatcher();
133 String opDescription
= "coordination test - " + members
.length
+ " cohort members";
135 // start running the controller
136 ZKProcedureCoordinator coordinatorComms
= new ZKProcedureCoordinator(
137 coordZkw
, opDescription
, COORDINATOR_NODE_NAME
);
138 ThreadPoolExecutor pool
= ProcedureCoordinator
.defaultPool(COORDINATOR_NODE_NAME
, POOL_SIZE
, KEEP_ALIVE
);
139 ProcedureCoordinator coordinator
= new ProcedureCoordinator(coordinatorComms
, pool
) {
// Wrap every created Procedure in a spy so its barrier calls can be verified afterwards.
141 public Procedure
createProcedure(ForeignExceptionDispatcher fed
, String procName
, byte[] procArgs
,
142 List
<String
> expectedMembers
) {
143 return Mockito
.spy(super.createProcedure(fed
, procName
, procArgs
, expectedMembers
));
147 // build and start members
148 // NOTE: There is a single subprocedure builder for all members here.
149 SubprocedureFactory subprocFactory
= Mockito
.mock(SubprocedureFactory
.class);
150 List
<Pair
<ProcedureMember
, ZKProcedureMemberRpcs
>> procMembers
= new ArrayList
<>(members
.length
);
152 for (String member
: members
) {
153 ZKWatcher watcher
= newZooKeeperWatcher();
154 ZKProcedureMemberRpcs comms
= new ZKProcedureMemberRpcs(watcher
, opDescription
);
155 ThreadPoolExecutor pool2
= ProcedureMember
.defaultPool(member
, 1, KEEP_ALIVE
);
156 ProcedureMember procMember
= new ProcedureMember(comms
, pool2
, subprocFactory
);
157 procMembers
.add(new Pair
<>(procMember
, comms
));
158 comms
.start(member
, procMember
);
161 // setup mock member subprocedures
162 final List
<Subprocedure
> subprocs
= new ArrayList
<>();
163 for (int i
= 0; i
< procMembers
.size(); i
++) {
164 ForeignExceptionDispatcher cohortMonitor
= new ForeignExceptionDispatcher();
165 Subprocedure commit
= Mockito
166 .spy(new SubprocedureImpl(procMembers
.get(i
).getFirst(), opName
, cohortMonitor
,
167 WAKE_FREQUENCY
, TIMEOUT
));
168 subprocs
.add(commit
);
171 // link subprocedure to buildNewOperation invocation.
172 final AtomicInteger i
= new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
173 Mockito
.when(subprocFactory
.buildSubprocedure(Mockito
.eq(opName
),
174 (byte[]) Mockito
.argThat(new ArrayEquals(data
)))).thenAnswer(
175 new Answer
<Subprocedure
>() {
// Each factory invocation claims the next index and looks up the matching spied subprocedure.
177 public Subprocedure
answer(InvocationOnMock invocation
) throws Throwable
{
178 int index
= i
.getAndIncrement();
179 LOG
.debug("Task size:" + subprocs
.size() + ", getting:" + index
);
180 Subprocedure commit
= subprocs
.get(index
);
185 // setup spying on the coordinator
186 // Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
187 // Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
189 // start running the operation
190 Procedure task
= coordinator
.startProcedure(new ForeignExceptionDispatcher(), opName
, data
, expected
);
191 // assertEquals("Didn't mock coordinator task", proc, task);
193 // verify all things ran as expected
194 // waitAndVerifyProc(proc, once, once, never(), once, false);
195 waitAndVerifyProc(task
, once
, once
, never(), once
, false);
196 verifyCohortSuccessful(expected
, subprocFactory
, subprocs
, once
, once
, never(), once
, false);
198 // close all the things
199 closeAll(coordinator
, coordinatorComms
, procMembers
);
203 * Test a distributed commit with multiple cohort members, where one of the cohort members has a
204 * timeout exception during the prepare stage.
207 public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception
{
// Overview of this test:
//  - three members ("one", "two", "three") share one mocked SubprocedureFactory;
//  - acquireBarrier() is stubbed on every spied subprocedure; when the stub's index equals
//    memberErrorIndex it builds a ForeignException wrapping a TimeoutException, reports it via
//    receiveAbortProcedure(...), and then waits on coordinatorReceivedErrorLatch;
//  - the spied coordinator Procedure's receive(...) is stubbed to call the real method and then
//    count the latch down;
//  - finally the coordinator side is checked with: prepare once, commit never, an error present.
208 String opDescription
= "error injection coordination";
209 String
[] cohortMembers
= new String
[] { "one", "two", "three" };
210 List
<String
> expected
= Lists
.newArrayList(cohortMembers
);
212 final int memberErrorIndex
= 2;
213 final CountDownLatch coordinatorReceivedErrorLatch
= new CountDownLatch(1);
215 // start running the coordinator and its controller
216 ZKWatcher coordinatorWatcher
= newZooKeeperWatcher();
217 ZKProcedureCoordinator coordinatorController
= new ZKProcedureCoordinator(
218 coordinatorWatcher
, opDescription
, COORDINATOR_NODE_NAME
);
219 ThreadPoolExecutor pool
= ProcedureCoordinator
.defaultPool(COORDINATOR_NODE_NAME
, POOL_SIZE
, KEEP_ALIVE
);
220 ProcedureCoordinator coordinator
= spy(new ProcedureCoordinator(coordinatorController
, pool
));
222 // start a member for each node
223 SubprocedureFactory subprocFactory
= Mockito
.mock(SubprocedureFactory
.class);
224 List
<Pair
<ProcedureMember
, ZKProcedureMemberRpcs
>> members
= new ArrayList
<>(expected
.size());
225 for (String member
: expected
) {
226 ZKWatcher watcher
= newZooKeeperWatcher();
227 ZKProcedureMemberRpcs controller
= new ZKProcedureMemberRpcs(watcher
, opDescription
);
228 ThreadPoolExecutor pool2
= ProcedureMember
.defaultPool(member
, 1, KEEP_ALIVE
);
229 ProcedureMember mem
= new ProcedureMember(controller
, pool2
, subprocFactory
);
230 members
.add(new Pair
<>(mem
, controller
));
231 controller
.start(member
, mem
);
234 // setup mock subprocedures
235 final List
<Subprocedure
> cohortTasks
= new ArrayList
<>();
// Single-element array: a mutable holder that the anonymous Answer below is able to capture.
236 final int[] elem
= new int[1];
237 for (int i
= 0; i
< members
.size(); i
++) {
238 ForeignExceptionDispatcher cohortMonitor
= new ForeignExceptionDispatcher();
239 final ProcedureMember comms
= members
.get(i
).getFirst();
240 Subprocedure commit
= Mockito
241 .spy(new SubprocedureImpl(comms
, opName
, cohortMonitor
, WAKE_FREQUENCY
, TIMEOUT
));
242 // This nasty bit has one of the impls throw a TimeoutException
243 Mockito
.doAnswer(new Answer
<Void
>() {
245 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
// NOTE(review): `index` is declared on a line not visible in this excerpt (presumably read from elem).
247 if (index
== memberErrorIndex
) {
248 LOG
.debug("Sending error to coordinator");
249 ForeignException remoteCause
= new ForeignException("TIMER",
250 new TimeoutException("subprocTimeout" , 1, 2, 0));
251 Subprocedure r
= ((Subprocedure
) invocation
.getMock());
252 LOG
.error("Remote commit failure, not propagating error:" + remoteCause
);
253 comms
.receiveAbortProcedure(r
.getName(), remoteCause
);
254 assertTrue(r
.isComplete());
255 // don't complete the error phase until the coordinator has gotten the error
256 // notification (which ensures that we never progress past prepare)
258 Procedure
.waitForLatch(coordinatorReceivedErrorLatch
, new ForeignExceptionDispatcher(),
259 WAKE_FREQUENCY
, "coordinator received error");
260 } catch (InterruptedException e
) {
261 LOG
.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch
.getCount() == 0));
262 // reset the interrupt status on the thread
263 Thread
.currentThread().interrupt();
269 }).when(commit
).acquireBarrier();
270 cohortTasks
.add(commit
);
273 // pass out a task per member
274 final AtomicInteger taskIndex
= new AtomicInteger();
276 subprocFactory
.buildSubprocedure(Mockito
.eq(opName
),
277 (byte[]) Mockito
.argThat(new ArrayEquals(data
)))).thenAnswer(
278 new Answer
<Subprocedure
>() {
// Each factory invocation claims the next index and looks up the matching spied subprocedure.
280 public Subprocedure
answer(InvocationOnMock invocation
) throws Throwable
{
281 int index
= taskIndex
.getAndIncrement();
282 Subprocedure commit
= cohortTasks
.get(index
);
287 // setup spying on the coordinator
288 ForeignExceptionDispatcher coordinatorTaskErrorMonitor
= Mockito
289 .spy(new ForeignExceptionDispatcher());
290 Procedure coordinatorTask
= Mockito
.spy(new Procedure(coordinator
,
291 coordinatorTaskErrorMonitor
, WAKE_FREQUENCY
, TIMEOUT
,
292 opName
, data
, expected
));
// Make the spied coordinator hand back the spied Procedure built above.
293 when(coordinator
.createProcedure(any(), eq(opName
), eq(data
), anyListOf(String
.class)))
294 .thenReturn(coordinatorTask
);
295 // count down the error latch when we get the remote error
296 Mockito
.doAnswer(new Answer
<Void
>() {
298 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
299 // pass on the error to the master
300 invocation
.callRealMethod();
301 // then count down the got error latch
302 coordinatorReceivedErrorLatch
.countDown();
305 }).when(coordinatorTask
).receive(Mockito
.any());
307 // ----------------------------
308 // start running the operation
309 // ----------------------------
311 Procedure task
= coordinator
.startProcedure(coordinatorTaskErrorMonitor
, opName
, data
, expected
);
312 assertEquals("Didn't mock coordinator task", coordinatorTask
, task
);
314 // wait for the task to complete
316 task
.waitForCompleted();
317 } catch (ForeignException fe
) {
318 // this may get caught or may not
325 // always expect prepared, never committed, and possible to have cleanup and finish (racy since
327 waitAndVerifyProc(coordinatorTask
, once
, never(), once
, atMost(1), true);
328 verifyCohortSuccessful(expected
, subprocFactory
, cohortTasks
, once
, never(), once
,
331 // close all the open things
332 closeAll(coordinator
, coordinatorController
, members
);
336 * Wait for the coordinator task to complete, and verify all the mocks
337 * @param proc the {@link Procedure} to wait on and verify
338 * @param prepare the mock prepare
339 * @param commit the mock commit
340 * @param cleanup the mock cleanup
341 * @param finish the mock finish
342 * @param opHasError the operation error state
343 * @throws Exception on unexpected failure
// Blocks on proc.waitForCompleted(), then verifies the coordinator-side barrier calls:
//   prepare -> sendGlobalBarrierStart()
//   commit  -> sendGlobalBarrierReached()
//   finish  -> sendGlobalBarrierComplete()
// The cleanup mode is not used by any verification visible in this excerpt.
// Finally compares opHasError with a value read from proc.getErrorMonitor() (the rest of that
// expression is not visible here) and with caughtError.
345 private void waitAndVerifyProc(Procedure proc
, VerificationMode prepare
,
346 VerificationMode commit
, VerificationMode cleanup
, VerificationMode finish
, boolean opHasError
)
// NOTE(review): the catch body is not visible; presumably it sets caughtError to true -- confirm.
348 boolean caughtError
= false;
350 proc
.waitForCompleted();
351 } catch (ForeignException fe
) {
354 // make sure that the task called all the expected phases
355 Mockito
.verify(proc
, prepare
).sendGlobalBarrierStart();
356 Mockito
.verify(proc
, commit
).sendGlobalBarrierReached();
357 Mockito
.verify(proc
, finish
).sendGlobalBarrierComplete();
358 assertEquals("Operation error state was unexpected", opHasError
, proc
.getErrorMonitor()
360 assertEquals("Operation error state was unexpected", opHasError
, caughtError
);
365 * Wait for the subprocedure to complete locally, and verify all the mocks
366 * @param op the {@link Subprocedure} to wait on and verify
367 * @param prepare the mock prepare
368 * @param commit the mock commit
369 * @param cleanup the mock cleanup
370 * @param finish the mock finish
371 * @param opHasError the operation error state
372 * @throws Exception on unexpected failure
// Blocks on op.waitForLocallyCompleted(), then verifies the member-side barrier calls:
//   prepare -> acquireBarrier()
//   commit  -> insideBarrier()
// The cleanup and finish modes are not used by any verification visible in this excerpt.
// Finally compares opHasError with a value read from op.getErrorCheckable() (the rest of that
// expression is not visible here) and with caughtError.
374 private void waitAndVerifySubproc(Subprocedure op
, VerificationMode prepare
,
375 VerificationMode commit
, VerificationMode cleanup
, VerificationMode finish
, boolean opHasError
)
// NOTE(review): the catch body is not visible; presumably it sets caughtError to true -- confirm.
377 boolean caughtError
= false;
379 op
.waitForLocallyCompleted();
380 } catch (ForeignException fe
) {
383 // make sure that the task called all the expected phases
384 Mockito
.verify(op
, prepare
).acquireBarrier();
385 Mockito
.verify(op
, commit
).insideBarrier();
386 // We cannot guarantee that cleanup has run so we don't check it.
388 assertEquals("Operation error state was unexpected", opHasError
, op
.getErrorCheckable()
390 assertEquals("Operation error state was unexpected", opHasError
, caughtError
);
/**
 * Verifies the member side of a procedure run.
 * <p>
 * First checks that the factory's {@code buildSubprocedure(opName, data)} was invoked exactly
 * {@code cohortNames.size()} times, then passes every subprocedure in {@code cohortTasks} to
 * {@code waitAndVerifySubproc} with the given verification modes.
 * @param cohortNames names of the cohort members; only its size is used here
 * @param subprocFactory the mocked factory whose invocations are counted
 * @param cohortTasks the spied subprocedures to check
 * @param prepare verification mode forwarded to {@code waitAndVerifySubproc}
 * @param commit verification mode forwarded to {@code waitAndVerifySubproc}
 * @param cleanup verification mode forwarded to {@code waitAndVerifySubproc}
 * @param finish verification mode forwarded to {@code waitAndVerifySubproc}
 * @param opHasError the expected error state, forwarded to {@code waitAndVerifySubproc}
 * @throws Exception on unexpected failure
 */
394 private void verifyCohortSuccessful(List
<String
> cohortNames
,
395 SubprocedureFactory subprocFactory
, Iterable
<Subprocedure
> cohortTasks
,
396 VerificationMode prepare
, VerificationMode commit
, VerificationMode cleanup
,
397 VerificationMode finish
, boolean opHasError
) throws Exception
{
399 // make sure we build the correct number of cohort members
400 Mockito
.verify(subprocFactory
, Mockito
.times(cohortNames
.size())).buildSubprocedure(
401 Mockito
.eq(opName
), (byte[]) Mockito
.argThat(new ArrayEquals(data
)));
402 // verify that we ran each of the operations cleanly
// NOTE(review): `j` is declared on a line not visible in this excerpt; here it only numbers the debug log.
404 for (Subprocedure op
: cohortTasks
) {
405 LOG
.debug("Checking mock:" + (j
++));
406 waitAndVerifySubproc(op
, prepare
, commit
, cleanup
, finish
, opHasError
);
/**
 * Releases the resources opened by a test run: for every cohort pair, closes the
 * {@link ProcedureMember} and then its {@link ZKProcedureMemberRpcs}; afterwards closes the
 * coordinator's ZK controller.
 * NOTE(review): no line visible in this excerpt closes {@code coordinator} itself; confirm whether
 * that happens on a line missing from this view.
 * @param coordinator the procedure coordinator used by the test
 * @param coordinatorController the coordinator's ZK controller, closed last
 * @param cohort the member/controller pairs to close
 */
410 private void closeAll(
411 ProcedureCoordinator coordinator
,
412 ZKProcedureCoordinator coordinatorController
,
413 List
<Pair
<ProcedureMember
, ZKProcedureMemberRpcs
>> cohort
)
415 // make sure we close all the resources
416 for (Pair
<ProcedureMember
, ZKProcedureMemberRpcs
> member
: cohort
) {
417 member
.getFirst().close();
418 member
.getSecond().close();
421 coordinatorController
.close();