2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.procedure
;
20 import static org
.mockito
.Matchers
.any
;
21 import static org
.mockito
.Matchers
.anyString
;
22 import static org
.mockito
.Mockito
.mock
;
23 import static org
.mockito
.Mockito
.never
;
24 import static org
.mockito
.Mockito
.spy
;
25 import static org
.mockito
.Mockito
.verify
;
26 import static org
.mockito
.Mockito
.when
;
28 import java
.util
.ArrayList
;
29 import java
.util
.List
;
30 import java
.util
.concurrent
.CountDownLatch
;
31 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
32 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignException
;
33 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignExceptionDispatcher
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.MasterTests
;
35 import org
.apache
.hadoop
.hbase
.testclassification
.SmallTests
;
36 import org
.junit
.Before
;
37 import org
.junit
.ClassRule
;
38 import org
.junit
.Test
;
39 import org
.junit
.experimental
.categories
.Category
;
42 * Demonstrate how Procedure handles single members, multiple members, and errors semantics
44 @Category({MasterTests
.class, SmallTests
.class})
45 public class TestProcedure
{
48 public static final HBaseClassTestRule CLASS_RULE
=
49 HBaseClassTestRule
.forClass(TestProcedure
.class);
51 ProcedureCoordinator coord
;
55 coord
= mock(ProcedureCoordinator
.class);
56 final ProcedureCoordinatorRpcs comms
= mock(ProcedureCoordinatorRpcs
.class);
57 when(coord
.getRpcs()).thenReturn(comms
); // make it not null
60 static class LatchedProcedure
extends Procedure
{
61 CountDownLatch startedAcquireBarrier
= new CountDownLatch(1);
62 CountDownLatch startedDuringBarrier
= new CountDownLatch(1);
63 CountDownLatch completedProcedure
= new CountDownLatch(1);
65 public LatchedProcedure(ProcedureCoordinator coord
, ForeignExceptionDispatcher monitor
,
66 long wakeFreq
, long timeout
, String opName
, byte[] data
,
67 List
<String
> expectedMembers
) {
68 super(coord
, monitor
, wakeFreq
, timeout
, opName
, data
, expectedMembers
);
72 public void sendGlobalBarrierStart() {
73 startedAcquireBarrier
.countDown();
77 public void sendGlobalBarrierReached() {
78 startedDuringBarrier
.countDown();
82 public void sendGlobalBarrierComplete() {
83 completedProcedure
.countDown();
88 * With a single member, verify ordered execution. The Coordinator side is run in a separate
89 * thread so we can only trigger from members and wait for particular state latches.
92 public void testSingleMember() throws Exception
{
94 List
<String
> members
= new ArrayList
<>();
95 members
.add("member");
96 LatchedProcedure proc
= new LatchedProcedure(coord
, new ForeignExceptionDispatcher(), 100,
97 Integer
.MAX_VALUE
, "op", null, members
);
98 final LatchedProcedure procspy
= spy(proc
);
99 // coordinator: start the barrier procedure
107 // coordinator: wait for the barrier to be acquired, then send start barrier
108 proc
.startedAcquireBarrier
.await();
110 // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
111 verify(procspy
).sendGlobalBarrierStart();
112 verify(procspy
, never()).sendGlobalBarrierReached();
113 verify(procspy
, never()).sendGlobalBarrierComplete();
114 verify(procspy
, never()).barrierAcquiredByMember(anyString());
116 // member: trigger global barrier acquisition
117 proc
.barrierAcquiredByMember(members
.get(0));
119 // coordinator: wait for global barrier to be acquired.
120 proc
.acquiredBarrierLatch
.await();
121 verify(procspy
).sendGlobalBarrierStart(); // old news
123 // since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was
124 // or was not called here.
126 // member: trigger global barrier release
127 proc
.barrierReleasedByMember(members
.get(0), new byte[0]);
129 // coordinator: wait for procedure to be completed
130 proc
.completedProcedure
.await();
131 verify(procspy
).sendGlobalBarrierReached();
132 verify(procspy
).sendGlobalBarrierComplete();
133 verify(procspy
, never()).receive(any());
137 public void testMultipleMember() throws Exception
{
139 List
<String
> members
= new ArrayList
<>();
140 members
.add("member1");
141 members
.add("member2");
143 LatchedProcedure proc
= new LatchedProcedure(coord
, new ForeignExceptionDispatcher(), 100,
144 Integer
.MAX_VALUE
, "op", null, members
);
145 final LatchedProcedure procspy
= spy(proc
);
146 // start the barrier procedure
154 // coordinator: wait for the barrier to be acquired, then send start barrier
155 procspy
.startedAcquireBarrier
.await();
157 // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
158 verify(procspy
).sendGlobalBarrierStart();
159 verify(procspy
, never()).sendGlobalBarrierReached();
160 verify(procspy
, never()).sendGlobalBarrierComplete();
161 verify(procspy
, never()).barrierAcquiredByMember(anyString()); // no externals
163 // member0: [1/2] trigger global barrier acquisition.
164 procspy
.barrierAcquiredByMember(members
.get(0));
166 // coordinator not satisified.
167 verify(procspy
).sendGlobalBarrierStart();
168 verify(procspy
, never()).sendGlobalBarrierReached();
169 verify(procspy
, never()).sendGlobalBarrierComplete();
171 // member 1: [2/2] trigger global barrier acquisition.
172 procspy
.barrierAcquiredByMember(members
.get(1));
174 // coordinator: wait for global barrier to be acquired.
175 procspy
.startedDuringBarrier
.await();
176 verify(procspy
).sendGlobalBarrierStart(); // old news
178 // member 1, 2: trigger global barrier release
179 procspy
.barrierReleasedByMember(members
.get(0), new byte[0]);
180 procspy
.barrierReleasedByMember(members
.get(1), new byte[0]);
182 // coordinator wait for procedure to be completed
183 procspy
.completedProcedure
.await();
184 verify(procspy
).sendGlobalBarrierReached();
185 verify(procspy
).sendGlobalBarrierComplete();
186 verify(procspy
, never()).receive(any());
190 public void testErrorPropagation() throws Exception
{
191 List
<String
> members
= new ArrayList
<>();
192 members
.add("member");
193 Procedure proc
= new Procedure(coord
, new ForeignExceptionDispatcher(), 100,
194 Integer
.MAX_VALUE
, "op", null, members
);
195 final Procedure procspy
= spy(proc
);
197 ForeignException cause
= new ForeignException("SRC", "External Exception");
200 // start the barrier procedure
201 Thread t
= new Thread() {
210 verify(procspy
, never()).sendGlobalBarrierStart();
211 verify(procspy
, never()).sendGlobalBarrierReached();
212 verify(procspy
).sendGlobalBarrierComplete();
216 public void testBarrieredErrorPropagation() throws Exception
{
217 List
<String
> members
= new ArrayList
<>();
218 members
.add("member");
219 LatchedProcedure proc
= new LatchedProcedure(coord
, new ForeignExceptionDispatcher(), 100,
220 Integer
.MAX_VALUE
, "op", null, members
);
221 final LatchedProcedure procspy
= spy(proc
);
223 // start the barrier procedure
224 Thread t
= new Thread() {
232 // now test that we can put an error in before the commit phase runs
233 procspy
.startedAcquireBarrier
.await();
234 ForeignException cause
= new ForeignException("SRC", "External Exception");
235 procspy
.receive(cause
);
236 procspy
.barrierAcquiredByMember(members
.get(0));
239 // verify state of all the object
240 verify(procspy
).sendGlobalBarrierStart();
241 verify(procspy
).sendGlobalBarrierComplete();
242 verify(procspy
, never()).sendGlobalBarrierReached();