HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / procedure / TestZKProcedure.java
blob0266851ab39e4eae5373d1903e6454434cf613f7
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.procedure;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.mockito.Matchers.any;
23 import static org.mockito.Matchers.anyListOf;
24 import static org.mockito.Matchers.eq;
25 import static org.mockito.Mockito.atMost;
26 import static org.mockito.Mockito.never;
27 import static org.mockito.Mockito.spy;
28 import static org.mockito.Mockito.when;
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.List;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import org.apache.hadoop.hbase.Abortable;
38 import org.apache.hadoop.hbase.HBaseClassTestRule;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.errorhandling.ForeignException;
41 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
42 import org.apache.hadoop.hbase.errorhandling.TimeoutException;
43 import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
44 import org.apache.hadoop.hbase.testclassification.MasterTests;
45 import org.apache.hadoop.hbase.testclassification.MediumTests;
46 import org.apache.hadoop.hbase.util.Pair;
47 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
48 import org.junit.AfterClass;
49 import org.junit.BeforeClass;
50 import org.junit.ClassRule;
51 import org.junit.Test;
52 import org.junit.experimental.categories.Category;
53 import org.mockito.Mockito;
54 import org.mockito.internal.matchers.ArrayEquals;
55 import org.mockito.invocation.InvocationOnMock;
56 import org.mockito.stubbing.Answer;
57 import org.mockito.verification.VerificationMode;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
63 /**
64 * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
66 @Category({MasterTests.class, MediumTests.class})
67 public class TestZKProcedure {
69 @ClassRule
70 public static final HBaseClassTestRule CLASS_RULE =
71 HBaseClassTestRule.forClass(TestZKProcedure.class);
73 private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedure.class);
74 private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
75 private static final String COORDINATOR_NODE_NAME = "coordinator";
76 private static final long KEEP_ALIVE = 100; // seconds
77 private static final int POOL_SIZE = 1;
78 private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
79 private static final long WAKE_FREQUENCY = 500;
80 private static final String opName = "op";
81 private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
82 private static final VerificationMode once = Mockito.times(1);
84 @BeforeClass
85 public static void setupTest() throws Exception {
86 UTIL.startMiniZKCluster();
89 @AfterClass
90 public static void cleanupTest() throws Exception {
91 UTIL.shutdownMiniZKCluster();
94 private static ZKWatcher newZooKeeperWatcher() throws IOException {
95 return new ZKWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
96 @Override
97 public void abort(String why, Throwable e) {
98 throw new RuntimeException(
99 "Unexpected abort in distributed three phase commit test:" + why, e);
102 @Override
103 public boolean isAborted() {
104 return false;
109 @Test
110 public void testEmptyMemberSet() throws Exception {
111 runCommit();
114 @Test
115 public void testSingleMember() throws Exception {
116 runCommit("one");
119 @Test
120 public void testMultipleMembers() throws Exception {
121 runCommit("one", "two", "three", "four" );
124 private void runCommit(String... members) throws Exception {
125 // make sure we just have an empty list
126 if (members == null) {
127 members = new String[0];
129 List<String> expected = Arrays.asList(members);
131 // setup the constants
132 ZKWatcher coordZkw = newZooKeeperWatcher();
133 String opDescription = "coordination test - " + members.length + " cohort members";
135 // start running the controller
136 ZKProcedureCoordinator coordinatorComms = new ZKProcedureCoordinator(
137 coordZkw, opDescription, COORDINATOR_NODE_NAME);
138 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
139 ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
140 @Override
141 public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
142 List<String> expectedMembers) {
143 return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
147 // build and start members
148 // NOTE: There is a single subprocedure builder for all members here.
149 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
150 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<>(members.length);
151 // start each member
152 for (String member : members) {
153 ZKWatcher watcher = newZooKeeperWatcher();
154 ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
155 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
156 ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
157 procMembers.add(new Pair<>(procMember, comms));
158 comms.start(member, procMember);
161 // setup mock member subprocedures
162 final List<Subprocedure> subprocs = new ArrayList<>();
163 for (int i = 0; i < procMembers.size(); i++) {
164 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
165 Subprocedure commit = Mockito
166 .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
167 WAKE_FREQUENCY, TIMEOUT));
168 subprocs.add(commit);
171 // link subprocedure to buildNewOperation invocation.
172 final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
173 Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
174 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
175 new Answer<Subprocedure>() {
176 @Override
177 public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
178 int index = i.getAndIncrement();
179 LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
180 Subprocedure commit = subprocs.get(index);
181 return commit;
185 // setup spying on the coordinator
186 // Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
187 // Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
189 // start running the operation
190 Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
191 // assertEquals("Didn't mock coordinator task", proc, task);
193 // verify all things ran as expected
194 // waitAndVerifyProc(proc, once, once, never(), once, false);
195 waitAndVerifyProc(task, once, once, never(), once, false);
196 verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
198 // close all the things
199 closeAll(coordinator, coordinatorComms, procMembers);
203 * Test a distributed commit with multiple cohort members, where one of the cohort members has a
204 * timeout exception during the prepare stage.
206 @Test
207 public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
208 String opDescription = "error injection coordination";
209 String[] cohortMembers = new String[] { "one", "two", "three" };
210 List<String> expected = Lists.newArrayList(cohortMembers);
211 // error constants
212 final int memberErrorIndex = 2;
213 final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
215 // start running the coordinator and its controller
216 ZKWatcher coordinatorWatcher = newZooKeeperWatcher();
217 ZKProcedureCoordinator coordinatorController = new ZKProcedureCoordinator(
218 coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
219 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
220 ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
222 // start a member for each node
223 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
224 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<>(expected.size());
225 for (String member : expected) {
226 ZKWatcher watcher = newZooKeeperWatcher();
227 ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
228 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
229 ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
230 members.add(new Pair<>(mem, controller));
231 controller.start(member, mem);
234 // setup mock subprocedures
235 final List<Subprocedure> cohortTasks = new ArrayList<>();
236 final int[] elem = new int[1];
237 for (int i = 0; i < members.size(); i++) {
238 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
239 final ProcedureMember comms = members.get(i).getFirst();
240 Subprocedure commit = Mockito
241 .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
242 // This nasty bit has one of the impls throw a TimeoutException
243 Mockito.doAnswer(new Answer<Void>() {
244 @Override
245 public Void answer(InvocationOnMock invocation) throws Throwable {
246 int index = elem[0];
247 if (index == memberErrorIndex) {
248 LOG.debug("Sending error to coordinator");
249 ForeignException remoteCause = new ForeignException("TIMER",
250 new TimeoutException("subprocTimeout" , 1, 2, 0));
251 Subprocedure r = ((Subprocedure) invocation.getMock());
252 LOG.error("Remote commit failure, not propagating error:" + remoteCause);
253 comms.receiveAbortProcedure(r.getName(), remoteCause);
254 assertTrue(r.isComplete());
255 // don't complete the error phase until the coordinator has gotten the error
256 // notification (which ensures that we never progress past prepare)
257 try {
258 Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
259 WAKE_FREQUENCY, "coordinator received error");
260 } catch (InterruptedException e) {
261 LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
262 // reset the interrupt status on the thread
263 Thread.currentThread().interrupt();
266 elem[0] = ++index;
267 return null;
269 }).when(commit).acquireBarrier();
270 cohortTasks.add(commit);
273 // pass out a task per member
274 final AtomicInteger taskIndex = new AtomicInteger();
275 Mockito.when(
276 subprocFactory.buildSubprocedure(Mockito.eq(opName),
277 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
278 new Answer<Subprocedure>() {
279 @Override
280 public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
281 int index = taskIndex.getAndIncrement();
282 Subprocedure commit = cohortTasks.get(index);
283 return commit;
287 // setup spying on the coordinator
288 ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
289 .spy(new ForeignExceptionDispatcher());
290 Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
291 coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
292 opName, data, expected));
293 when(coordinator.createProcedure(any(), eq(opName), eq(data), anyListOf(String.class)))
294 .thenReturn(coordinatorTask);
295 // count down the error latch when we get the remote error
296 Mockito.doAnswer(new Answer<Void>() {
297 @Override
298 public Void answer(InvocationOnMock invocation) throws Throwable {
299 // pass on the error to the master
300 invocation.callRealMethod();
301 // then count down the got error latch
302 coordinatorReceivedErrorLatch.countDown();
303 return null;
305 }).when(coordinatorTask).receive(Mockito.any());
307 // ----------------------------
308 // start running the operation
309 // ----------------------------
311 Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
312 assertEquals("Didn't mock coordinator task", coordinatorTask, task);
314 // wait for the task to complete
315 try {
316 task.waitForCompleted();
317 } catch (ForeignException fe) {
318 // this may get caught or may not
321 // -------------
322 // verification
323 // -------------
325 // always expect prepared, never committed, and possible to have cleanup and finish (racy since
326 // error case)
327 waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
328 verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
329 once, true);
331 // close all the open things
332 closeAll(coordinator, coordinatorController, members);
336 * Wait for the coordinator task to complete, and verify all the mocks
337 * @param proc the {@link Procedure} to execute
338 * @param prepare the mock prepare
339 * @param commit the mock commit
340 * @param cleanup the mock cleanup
341 * @param finish the mock finish
342 * @param opHasError the operation error state
343 * @throws Exception on unexpected failure
345 private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
346 VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
347 throws Exception {
348 boolean caughtError = false;
349 try {
350 proc.waitForCompleted();
351 } catch (ForeignException fe) {
352 caughtError = true;
354 // make sure that the task called all the expected phases
355 Mockito.verify(proc, prepare).sendGlobalBarrierStart();
356 Mockito.verify(proc, commit).sendGlobalBarrierReached();
357 Mockito.verify(proc, finish).sendGlobalBarrierComplete();
358 assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
359 .hasException());
360 assertEquals("Operation error state was unexpected", opHasError, caughtError);
365 * Wait for the coordinator task to complete, and verify all the mocks
366 * @param op the {@link Subprocedure} to use
367 * @param prepare the mock prepare
368 * @param commit the mock commit
369 * @param cleanup the mock cleanup
370 * @param finish the mock finish
371 * @param opHasError the operation error state
372 * @throws Exception on unexpected failure
374 private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
375 VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
376 throws Exception {
377 boolean caughtError = false;
378 try {
379 op.waitForLocallyCompleted();
380 } catch (ForeignException fe) {
381 caughtError = true;
383 // make sure that the task called all the expected phases
384 Mockito.verify(op, prepare).acquireBarrier();
385 Mockito.verify(op, commit).insideBarrier();
386 // We cannot guarantee that cleanup has run so we don't check it.
388 assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
389 .hasException());
390 assertEquals("Operation error state was unexpected", opHasError, caughtError);
394 private void verifyCohortSuccessful(List<String> cohortNames,
395 SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
396 VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
397 VerificationMode finish, boolean opHasError) throws Exception {
399 // make sure we build the correct number of cohort members
400 Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
401 Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
402 // verify that we ran each of the operations cleanly
403 int j = 0;
404 for (Subprocedure op : cohortTasks) {
405 LOG.debug("Checking mock:" + (j++));
406 waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
410 private void closeAll(
411 ProcedureCoordinator coordinator,
412 ZKProcedureCoordinator coordinatorController,
413 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
414 throws IOException {
415 // make sure we close all the resources
416 for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
417 member.getFirst().close();
418 member.getSecond().close();
420 coordinator.close();
421 coordinatorController.close();