HBASE-26416 Implement a new method for region replication instead of using replay...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / procedure / TestProcedure.java
blobdef375d12f547b8fc894e55db7835f38bd53482c
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.procedure;
20 import static org.mockito.Matchers.any;
21 import static org.mockito.Matchers.anyString;
22 import static org.mockito.Mockito.mock;
23 import static org.mockito.Mockito.never;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.Mockito.when;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.CountDownLatch;
31 import org.apache.hadoop.hbase.HBaseClassTestRule;
32 import org.apache.hadoop.hbase.errorhandling.ForeignException;
33 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
34 import org.apache.hadoop.hbase.testclassification.MasterTests;
35 import org.apache.hadoop.hbase.testclassification.SmallTests;
36 import org.junit.Before;
37 import org.junit.ClassRule;
38 import org.junit.Test;
39 import org.junit.experimental.categories.Category;
/**
 * Demonstrate how Procedure handles single members, multiple members, and error semantics.
 */
44 @Category({MasterTests.class, SmallTests.class})
45 public class TestProcedure {
47 @ClassRule
48 public static final HBaseClassTestRule CLASS_RULE =
49 HBaseClassTestRule.forClass(TestProcedure.class);
51 ProcedureCoordinator coord;
53 @Before
54 public void setup() {
55 coord = mock(ProcedureCoordinator.class);
56 final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
57 when(coord.getRpcs()).thenReturn(comms); // make it not null
60 static class LatchedProcedure extends Procedure {
61 CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
62 CountDownLatch startedDuringBarrier = new CountDownLatch(1);
63 CountDownLatch completedProcedure = new CountDownLatch(1);
65 public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
66 long wakeFreq, long timeout, String opName, byte[] data,
67 List<String> expectedMembers) {
68 super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
71 @Override
72 public void sendGlobalBarrierStart() {
73 startedAcquireBarrier.countDown();
76 @Override
77 public void sendGlobalBarrierReached() {
78 startedDuringBarrier.countDown();
81 @Override
82 public void sendGlobalBarrierComplete() {
83 completedProcedure.countDown();
87 /**
88 * With a single member, verify ordered execution. The Coordinator side is run in a separate
89 * thread so we can only trigger from members and wait for particular state latches.
91 @Test
92 public void testSingleMember() throws Exception {
93 // The member
94 List<String> members = new ArrayList<>();
95 members.add("member");
96 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
97 Integer.MAX_VALUE, "op", null, members);
98 final LatchedProcedure procspy = spy(proc);
99 // coordinator: start the barrier procedure
100 new Thread() {
101 @Override
102 public void run() {
103 procspy.call();
105 }.start();
107 // coordinator: wait for the barrier to be acquired, then send start barrier
108 proc.startedAcquireBarrier.await();
110 // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
111 verify(procspy).sendGlobalBarrierStart();
112 verify(procspy, never()).sendGlobalBarrierReached();
113 verify(procspy, never()).sendGlobalBarrierComplete();
114 verify(procspy, never()).barrierAcquiredByMember(anyString());
116 // member: trigger global barrier acquisition
117 proc.barrierAcquiredByMember(members.get(0));
119 // coordinator: wait for global barrier to be acquired.
120 proc.acquiredBarrierLatch.await();
121 verify(procspy).sendGlobalBarrierStart(); // old news
123 // since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was
124 // or was not called here.
126 // member: trigger global barrier release
127 proc.barrierReleasedByMember(members.get(0), new byte[0]);
129 // coordinator: wait for procedure to be completed
130 proc.completedProcedure.await();
131 verify(procspy).sendGlobalBarrierReached();
132 verify(procspy).sendGlobalBarrierComplete();
133 verify(procspy, never()).receive(any());
136 @Test
137 public void testMultipleMember() throws Exception {
138 // 2 members
139 List<String> members = new ArrayList<>();
140 members.add("member1");
141 members.add("member2");
143 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
144 Integer.MAX_VALUE, "op", null, members);
145 final LatchedProcedure procspy = spy(proc);
146 // start the barrier procedure
147 new Thread() {
148 @Override
149 public void run() {
150 procspy.call();
152 }.start();
154 // coordinator: wait for the barrier to be acquired, then send start barrier
155 procspy.startedAcquireBarrier.await();
157 // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
158 verify(procspy).sendGlobalBarrierStart();
159 verify(procspy, never()).sendGlobalBarrierReached();
160 verify(procspy, never()).sendGlobalBarrierComplete();
161 verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
163 // member0: [1/2] trigger global barrier acquisition.
164 procspy.barrierAcquiredByMember(members.get(0));
166 // coordinator not satisified.
167 verify(procspy).sendGlobalBarrierStart();
168 verify(procspy, never()).sendGlobalBarrierReached();
169 verify(procspy, never()).sendGlobalBarrierComplete();
171 // member 1: [2/2] trigger global barrier acquisition.
172 procspy.barrierAcquiredByMember(members.get(1));
174 // coordinator: wait for global barrier to be acquired.
175 procspy.startedDuringBarrier.await();
176 verify(procspy).sendGlobalBarrierStart(); // old news
178 // member 1, 2: trigger global barrier release
179 procspy.barrierReleasedByMember(members.get(0), new byte[0]);
180 procspy.barrierReleasedByMember(members.get(1), new byte[0]);
182 // coordinator wait for procedure to be completed
183 procspy.completedProcedure.await();
184 verify(procspy).sendGlobalBarrierReached();
185 verify(procspy).sendGlobalBarrierComplete();
186 verify(procspy, never()).receive(any());
189 @Test
190 public void testErrorPropagation() throws Exception {
191 List<String> members = new ArrayList<>();
192 members.add("member");
193 Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100,
194 Integer.MAX_VALUE, "op", null, members);
195 final Procedure procspy = spy(proc);
197 ForeignException cause = new ForeignException("SRC", "External Exception");
198 proc.receive(cause);
200 // start the barrier procedure
201 Thread t = new Thread() {
202 @Override
203 public void run() {
204 procspy.call();
207 t.start();
208 t.join();
210 verify(procspy, never()).sendGlobalBarrierStart();
211 verify(procspy, never()).sendGlobalBarrierReached();
212 verify(procspy).sendGlobalBarrierComplete();
215 @Test
216 public void testBarrieredErrorPropagation() throws Exception {
217 List<String> members = new ArrayList<>();
218 members.add("member");
219 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
220 Integer.MAX_VALUE, "op", null, members);
221 final LatchedProcedure procspy = spy(proc);
223 // start the barrier procedure
224 Thread t = new Thread() {
225 @Override
226 public void run() {
227 procspy.call();
230 t.start();
232 // now test that we can put an error in before the commit phase runs
233 procspy.startedAcquireBarrier.await();
234 ForeignException cause = new ForeignException("SRC", "External Exception");
235 procspy.receive(cause);
236 procspy.barrierAcquiredByMember(members.get(0));
237 t.join();
239 // verify state of all the object
240 verify(procspy).sendGlobalBarrierStart();
241 verify(procspy).sendGlobalBarrierComplete();
242 verify(procspy, never()).sendGlobalBarrierReached();