2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.master
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_SPLIT_WAL_COORDINATED_BY_ZK
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_SPLIT_WAL_MAX_SPLITTER
;
22 import static org
.apache
.hadoop
.hbase
.master
.procedure
.ServerProcedureInterface
.ServerOperationType
.SPLIT_WAL
;
23 import java
.io
.IOException
;
24 import java
.util
.ArrayList
;
25 import java
.util
.List
;
26 import java
.util
.concurrent
.CountDownLatch
;
27 import org
.apache
.hadoop
.fs
.FileStatus
;
28 import org
.apache
.hadoop
.fs
.FileSystem
;
29 import org
.apache
.hadoop
.fs
.Path
;
30 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
31 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
32 import org
.apache
.hadoop
.hbase
.HConstants
;
33 import org
.apache
.hadoop
.hbase
.ServerName
;
34 import org
.apache
.hadoop
.hbase
.TableName
;
35 import org
.apache
.hadoop
.hbase
.master
.procedure
.MasterProcedureEnv
;
36 import org
.apache
.hadoop
.hbase
.master
.procedure
.ServerProcedureInterface
;
37 import org
.apache
.hadoop
.hbase
.procedure2
.Procedure
;
38 import org
.apache
.hadoop
.hbase
.procedure2
.ProcedureExecutor
;
39 import org
.apache
.hadoop
.hbase
.procedure2
.ProcedureStateSerializer
;
40 import org
.apache
.hadoop
.hbase
.procedure2
.ProcedureSuspendedException
;
41 import org
.apache
.hadoop
.hbase
.procedure2
.ProcedureTestingUtility
;
42 import org
.apache
.hadoop
.hbase
.procedure2
.ProcedureYieldException
;
43 import org
.apache
.hadoop
.hbase
.procedure2
.StateMachineProcedure
;
44 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
45 import org
.apache
.hadoop
.hbase
.testclassification
.MasterTests
;
46 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
47 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
48 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
49 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
50 import org
.junit
.After
;
51 import org
.junit
.Assert
;
52 import org
.junit
.Before
;
53 import org
.junit
.ClassRule
;
54 import org
.junit
.Test
;
55 import org
.junit
.experimental
.categories
.Category
;
56 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
57 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
58 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProcedureProtos
;
59 import org
.slf4j
.Logger
;
60 import org
.slf4j
.LoggerFactory
;
62 @Category({ MasterTests
.class, LargeTests
.class })
64 public class TestSplitWALManager
{
// Class-level test rule obtained from HBaseClassTestRule.forClass for this test class.
67 public static final HBaseClassTestRule CLASS_RULE
=
68 HBaseClassTestRule
.forClass(TestSplitWALManager
.class);
// SLF4J logger for this test class.
70 private static final Logger LOG
= LoggerFactory
.getLogger(TestSplitWALManager
.class);
// Testing utility that owns the mini cluster. Static, but reassigned by setup().
71 private static HBaseTestingUtil TEST_UTIL
;
// Master taken from the mini cluster in setup(); replaced after the restart in
// testWorkerReloadWhenMasterRestart().
72 private HMaster master
;
// SplitWALManager of the master above, assigned in setup().
73 private SplitWALManager splitWALManager
;
// Name of the table the tests create; assigned in setup().
74 private TableName TABLE_NAME
;
// Column family used for the test table; assigned in setup().
75 private byte[] FAMILY
;
78 public void setup() throws Exception
{
79 TEST_UTIL
= new HBaseTestingUtil();
80 TEST_UTIL
.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK
, false);
81 TEST_UTIL
.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER
, 1);
82 TEST_UTIL
.startMiniCluster(3);
83 master
= TEST_UTIL
.getHBaseCluster().getMaster();
84 splitWALManager
= master
.getSplitWALManager();
85 TABLE_NAME
= TableName
.valueOf(Bytes
.toBytes("TestSplitWALManager"));
86 FAMILY
= Bytes
.toBytes("test");
90 public void teardown() throws Exception
{
91 TEST_UTIL
.shutdownMiniCluster();
95 public void testAcquireAndRelease() throws Exception
{
96 List
<FakeServerProcedure
> testProcedures
= new ArrayList
<>();
97 for (int i
= 0; i
< 4; i
++) {
98 testProcedures
.add(new FakeServerProcedure(
99 TEST_UTIL
.getHBaseCluster().getServerHoldingMeta()));
101 ServerName server
= splitWALManager
.acquireSplitWALWorker(testProcedures
.get(0));
102 Assert
.assertNotNull(server
);
103 Assert
.assertNotNull(splitWALManager
.acquireSplitWALWorker(testProcedures
.get(1)));
104 Assert
.assertNotNull(splitWALManager
.acquireSplitWALWorker(testProcedures
.get(2)));
108 splitWALManager
.acquireSplitWALWorker(testProcedures
.get(3));
109 } catch (ProcedureSuspendedException suspendException
) {
110 e
= suspendException
;
112 Assert
.assertNotNull(e
);
113 Assert
.assertTrue(e
instanceof ProcedureSuspendedException
);
115 splitWALManager
.releaseSplitWALWorker(server
, TEST_UTIL
.getHBaseCluster().getMaster()
116 .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
117 Assert
.assertNotNull(splitWALManager
.acquireSplitWALWorker(testProcedures
.get(3)));
121 public void testAddNewServer() throws Exception
{
122 List
<FakeServerProcedure
> testProcedures
= new ArrayList
<>();
123 for (int i
= 0; i
< 4; i
++) {
124 testProcedures
.add(new FakeServerProcedure(
125 TEST_UTIL
.getHBaseCluster().getServerHoldingMeta()));
127 ServerName server
= splitWALManager
.acquireSplitWALWorker(testProcedures
.get(0));
128 Assert
.assertNotNull(server
);
129 Assert
.assertNotNull(splitWALManager
.acquireSplitWALWorker(testProcedures
.get(1)));
130 Assert
.assertNotNull(splitWALManager
.acquireSplitWALWorker(testProcedures
.get(2)));
134 splitWALManager
.acquireSplitWALWorker(testProcedures
.get(3));
135 } catch (ProcedureSuspendedException suspendException
) {
136 e
= suspendException
;
138 Assert
.assertNotNull(e
);
139 Assert
.assertTrue(e
instanceof ProcedureSuspendedException
);
141 JVMClusterUtil
.RegionServerThread newServer
= TEST_UTIL
.getHBaseCluster().startRegionServer();
142 newServer
.waitForServerOnline();
143 Assert
.assertNotNull(splitWALManager
.acquireSplitWALWorker(testProcedures
.get(3)));
147 public void testCreateSplitWALProcedures() throws Exception
{
148 TEST_UTIL
.createTable(TABLE_NAME
, FAMILY
, TEST_UTIL
.KEYS_FOR_HBA_CREATE_TABLE
);
150 TEST_UTIL
.loadTable(TEST_UTIL
.getConnection().getTable(TABLE_NAME
), FAMILY
);
151 ProcedureExecutor
<MasterProcedureEnv
> masterPE
= master
.getMasterProcedureExecutor();
152 ServerName metaServer
= TEST_UTIL
.getHBaseCluster().getServerHoldingMeta();
153 Path metaWALDir
= new Path(TEST_UTIL
.getDefaultRootDirPath(),
154 AbstractFSWALProvider
.getWALDirectoryName(metaServer
.toString()));
155 // Test splitting meta wal
157 TEST_UTIL
.getTestFileSystem().listStatus(metaWALDir
, MasterWalManager
.META_FILTER
);
158 Assert
.assertEquals(1, wals
.length
);
159 List
<Procedure
> testProcedures
=
160 splitWALManager
.createSplitWALProcedures(Lists
.newArrayList(wals
[0]), metaServer
);
161 Assert
.assertEquals(1, testProcedures
.size());
162 ProcedureTestingUtility
.submitAndWait(masterPE
, testProcedures
.get(0));
163 Assert
.assertFalse(TEST_UTIL
.getTestFileSystem().exists(wals
[0].getPath()));
165 // Test splitting wal
166 wals
= TEST_UTIL
.getTestFileSystem().listStatus(metaWALDir
, MasterWalManager
.NON_META_FILTER
);
167 Assert
.assertEquals(1, wals
.length
);
169 splitWALManager
.createSplitWALProcedures(Lists
.newArrayList(wals
[0]), metaServer
);
170 Assert
.assertEquals(1, testProcedures
.size());
171 ProcedureTestingUtility
.submitAndWait(masterPE
, testProcedures
.get(0));
172 Assert
.assertFalse(TEST_UTIL
.getTestFileSystem().exists(wals
[0].getPath()));
176 public void testAcquireAndReleaseSplitWALWorker() throws Exception
{
177 ProcedureExecutor
<MasterProcedureEnv
> masterPE
= master
.getMasterProcedureExecutor();
178 List
<FakeServerProcedure
> testProcedures
= new ArrayList
<>();
179 for (int i
= 0; i
< 3; i
++) {
180 FakeServerProcedure procedure
=
181 new FakeServerProcedure(TEST_UTIL
.getHBaseCluster().getRegionServer(i
).getServerName());
182 testProcedures
.add(procedure
);
183 ProcedureTestingUtility
.submitProcedure(masterPE
, procedure
, HConstants
.NO_NONCE
,
184 HConstants
.NO_NONCE
);
186 TEST_UTIL
.waitFor(10000, () -> testProcedures
.get(2).isWorkerAcquired());
187 FakeServerProcedure failedProcedure
=
188 new FakeServerProcedure(TEST_UTIL
.getHBaseCluster().getServerHoldingMeta());
189 ProcedureTestingUtility
.submitProcedure(masterPE
, failedProcedure
, HConstants
.NO_NONCE
,
190 HConstants
.NO_NONCE
);
191 TEST_UTIL
.waitFor(20000, () -> failedProcedure
.isTriedToAcquire());
192 Assert
.assertFalse(failedProcedure
.isWorkerAcquired());
193 // let one procedure finish and release worker
194 testProcedures
.get(0).countDown();
195 TEST_UTIL
.waitFor(10000, () -> failedProcedure
.isWorkerAcquired());
196 Assert
.assertTrue(testProcedures
.get(0).isSuccess());
200 public void testGetWALsToSplit() throws Exception
{
201 TEST_UTIL
.createTable(TABLE_NAME
, FAMILY
, TEST_UTIL
.KEYS_FOR_HBA_CREATE_TABLE
);
203 TEST_UTIL
.loadTable(TEST_UTIL
.getConnection().getTable(TABLE_NAME
), FAMILY
);
204 ServerName metaServer
= TEST_UTIL
.getHBaseCluster().getServerHoldingMeta();
205 List
<FileStatus
> metaWals
= splitWALManager
.getWALsToSplit(metaServer
, true);
206 Assert
.assertEquals(1, metaWals
.size());
207 List
<FileStatus
> wals
= splitWALManager
.getWALsToSplit(metaServer
, false);
208 Assert
.assertEquals(1, wals
.size());
209 ServerName testServer
= TEST_UTIL
.getHBaseCluster().getRegionServerThreads().stream()
210 .map(rs
-> rs
.getRegionServer().getServerName()).filter(rs
-> rs
!= metaServer
).findAny()
212 metaWals
= splitWALManager
.getWALsToSplit(testServer
, true);
213 Assert
.assertEquals(0, metaWals
.size());
216 private void splitLogsTestHelper(HBaseTestingUtil testUtil
) throws Exception
{
217 HMaster hmaster
= testUtil
.getHBaseCluster().getMaster();
218 SplitWALManager splitWALManager
= hmaster
.getSplitWALManager();
219 LOG
.info("The Master FS is pointing to: " + hmaster
.getMasterFileSystem()
220 .getFileSystem().getUri());
221 LOG
.info("The WAL FS is pointing to: " + hmaster
.getMasterFileSystem()
222 .getWALFileSystem().getUri());
224 testUtil
.createTable(TABLE_NAME
, FAMILY
, testUtil
.KEYS_FOR_HBA_CREATE_TABLE
);
226 testUtil
.loadTable(testUtil
.getConnection().getTable(TABLE_NAME
), FAMILY
);
227 ProcedureExecutor
<MasterProcedureEnv
> masterPE
= hmaster
.getMasterProcedureExecutor();
228 ServerName metaServer
= testUtil
.getHBaseCluster().getServerHoldingMeta();
229 ServerName testServer
= testUtil
.getHBaseCluster().getRegionServerThreads().stream()
230 .map(rs
-> rs
.getRegionServer().getServerName()).filter(rs
-> rs
!= metaServer
).findAny()
232 List
<Procedure
> procedures
= splitWALManager
.splitWALs(testServer
, false);
233 Assert
.assertEquals(1, procedures
.size());
234 ProcedureTestingUtility
.submitAndWait(masterPE
, procedures
.get(0));
235 Assert
.assertEquals(0, splitWALManager
.getWALsToSplit(testServer
, false).size());
237 // Validate the old WAL file archive dir
238 Path walRootDir
= hmaster
.getMasterFileSystem().getWALRootDir();
239 Path walArchivePath
= new Path(walRootDir
, HConstants
.HREGION_OLDLOGDIR_NAME
);
240 FileSystem walFS
= hmaster
.getMasterFileSystem().getWALFileSystem();
241 int archiveFileCount
= walFS
.listStatus(walArchivePath
).length
;
243 procedures
= splitWALManager
.splitWALs(metaServer
, true);
244 Assert
.assertEquals(1, procedures
.size());
245 ProcedureTestingUtility
.submitAndWait(masterPE
, procedures
.get(0));
246 Assert
.assertEquals(0, splitWALManager
.getWALsToSplit(metaServer
, true).size());
247 Assert
.assertEquals(1, splitWALManager
.getWALsToSplit(metaServer
, false).size());
248 // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish
249 Assert
.assertEquals("Splitted WAL files should be archived", archiveFileCount
+ 1,
250 walFS
.listStatus(walArchivePath
).length
);
254 public void testSplitLogs() throws Exception
{
255 splitLogsTestHelper(TEST_UTIL
);
259 public void testSplitLogsWithDifferentWalAndRootFS() throws Exception
{
260 HBaseTestingUtil testUtil2
= new HBaseTestingUtil();
261 testUtil2
.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK
, false);
262 testUtil2
.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER
, 1);
263 Path dir
= TEST_UTIL
.getDataTestDirOnTestFS("testWalDir");
264 testUtil2
.getConfiguration().set(CommonFSUtils
.HBASE_WAL_DIR
, dir
.toString());
265 CommonFSUtils
.setWALRootDir(testUtil2
.getConfiguration(), dir
);
266 testUtil2
.startMiniCluster(3);
267 splitLogsTestHelper(testUtil2
);
268 testUtil2
.shutdownMiniCluster();
272 public void testWorkerReloadWhenMasterRestart() throws Exception
{
273 List
<FakeServerProcedure
> testProcedures
= new ArrayList
<>();
274 for (int i
= 0; i
< 3; i
++) {
275 FakeServerProcedure procedure
=
276 new FakeServerProcedure(TEST_UTIL
.getHBaseCluster().getRegionServer(i
).getServerName());
277 testProcedures
.add(procedure
);
278 ProcedureTestingUtility
.submitProcedure(master
.getMasterProcedureExecutor(), procedure
,
279 HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
281 TEST_UTIL
.waitFor(10000, () -> testProcedures
.get(2).isWorkerAcquired());
283 TEST_UTIL
.getHBaseCluster().killMaster(master
.getServerName());
284 TEST_UTIL
.getHBaseCluster().waitForMasterToStop(master
.getServerName(), 20000);
286 TEST_UTIL
.getHBaseCluster().startMaster();
287 TEST_UTIL
.getHBaseCluster().waitForActiveAndReadyMaster();
288 this.master
= TEST_UTIL
.getHBaseCluster().getMaster();
290 FakeServerProcedure failedProcedure
=
291 new FakeServerProcedure(TEST_UTIL
.getHBaseCluster().getServerHoldingMeta());
292 ProcedureTestingUtility
.submitProcedure(master
.getMasterProcedureExecutor(), failedProcedure
,
293 HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
294 TEST_UTIL
.waitFor(20000, () -> failedProcedure
.isTriedToAcquire());
295 Assert
.assertFalse(failedProcedure
.isWorkerAcquired());
296 for (int i
= 0; i
< 3; i
++) {
297 testProcedures
.get(i
).countDown();
299 failedProcedure
.countDown();
/**
 * Test-only state machine procedure over {@code MasterProcedureProtos.SplitWALState}
 * that also implements {@code ServerProcedureInterface}. In the states shown below it
 * acquires a split-WAL worker from the master's SplitWALManager, moves on to
 * DISPATCH_WAL_TO_WORKER, then to RELEASE_SPLIT_WORKER, where it releases the worker.
 * <p>
 * NOTE(review): several method bodies and the switch header are not visible in the
 * reviewed excerpt; comments below describe only what is shown.
 */
302 public static final class FakeServerProcedure
303 extends StateMachineProcedure
<MasterProcedureEnv
, MasterProcedureProtos
.SplitWALState
>
304 implements ServerProcedureInterface
{
// Server name passed to the constructor; written to and restored from the serialized
// state as the "crashed server".
306 private ServerName serverName
;
// Worker returned by acquireSplitWALWorker(); null until an acquire succeeds.
// NOTE(review): written in executeFromState() and polled by the tests through
// isWorkerAcquired() inside waitFor(); if those run on different threads this field
// probably needs to be volatile - confirm.
307 private ServerName worker
;
// Latch with count 1, opened by countDown().
// NOTE(review): no await() on this latch is visible in the excerpt - presumably the
// DISPATCH_WAL_TO_WORKER state blocks on it; confirm.
308 private CountDownLatch barrier
= new CountDownLatch(1);
// Set to true right before the acquire attempt in ACQUIRE_SPLIT_WAL_WORKER.
// NOTE(review): same cross-thread visibility question as for "worker" - confirm.
309 private boolean triedToAcquire
= false;
// No-argument constructor (body not visible in the excerpt).
311 public FakeServerProcedure() {
// Creates a procedure for the given server name.
314 public FakeServerProcedure(ServerName serverName
) {
315 this.serverName
= serverName
;
// Accessor named after the serverName field (body not visible in the excerpt).
318 public ServerName
getServerName() {
// Body not visible in the excerpt.
323 public boolean hasMetaTableRegion() {
// Body not visible in the excerpt; presumably returns the statically imported
// SPLIT_WAL operation type - TODO confirm.
328 public ServerOperationType
getServerOperationType() {
/**
 * Executes one state of the procedure.
 * ACQUIRE_SPLIT_WAL_WORKER: records the attempt, asks the SplitWALManager for a
 * worker (the tests expect this call to throw ProcedureSuspendedException when no
 * worker is free) and advances to DISPATCH_WAL_TO_WORKER.
 * DISPATCH_WAL_TO_WORKER: advances to RELEASE_SPLIT_WORKER.
 * RELEASE_SPLIT_WORKER: releases the worker and ends the procedure.
 * Any other state ends in UnsupportedOperationException.
 */
333 protected Flow
executeFromState(MasterProcedureEnv env
,
334 MasterProcedureProtos
.SplitWALState state
)
335 throws ProcedureSuspendedException
, ProcedureYieldException
, InterruptedException
{
336 SplitWALManager splitWALManager
= env
.getMasterServices().getSplitWALManager();
338 case ACQUIRE_SPLIT_WAL_WORKER
:
// Flag the attempt first so tests can observe it even if the acquire below throws.
339 triedToAcquire
= true;
340 worker
= splitWALManager
.acquireSplitWALWorker(this);
341 setNextState(MasterProcedureProtos
.SplitWALState
.DISPATCH_WAL_TO_WORKER
);
342 return Flow
.HAS_MORE_STATE
;
343 case DISPATCH_WAL_TO_WORKER
:
345 setNextState(MasterProcedureProtos
.SplitWALState
.RELEASE_SPLIT_WORKER
);
346 return Flow
.HAS_MORE_STATE
;
347 case RELEASE_SPLIT_WORKER
:
// Give the worker back through the environment's procedure scheduler.
348 splitWALManager
.releaseSplitWALWorker(worker
, env
.getProcedureScheduler());
349 return Flow
.NO_MORE_STATE
;
351 throw new UnsupportedOperationException("unhandled state=" + state
);
// True once a worker has been stored by a successful acquire.
355 public boolean isWorkerAcquired() {
356 return worker
!= null;
// True once the ACQUIRE_SPLIT_WAL_WORKER state has been entered.
359 public boolean isTriedToAcquire() {
360 return triedToAcquire
;
// Opens the latch held in "barrier".
363 public void countDown() {
364 this.barrier
.countDown();
// Body not visible in the excerpt.
368 protected void rollbackState(MasterProcedureEnv env
, MasterProcedureProtos
.SplitWALState state
)
369 throws IOException
, InterruptedException
{
// Maps a numeric state id to the SplitWALState enum value via forNumber().
374 protected MasterProcedureProtos
.SplitWALState
getState(int stateId
) {
375 return MasterProcedureProtos
.SplitWALState
.forNumber(stateId
);
// Maps a SplitWALState to its numeric id via getNumber().
379 protected int getStateId(MasterProcedureProtos
.SplitWALState state
) {
380 return state
.getNumber();
// The procedure starts in ACQUIRE_SPLIT_WAL_WORKER.
384 protected MasterProcedureProtos
.SplitWALState
getInitialState() {
385 return MasterProcedureProtos
.SplitWALState
.ACQUIRE_SPLIT_WAL_WORKER
;
// Body not visible in the excerpt.
389 protected boolean holdLock(MasterProcedureEnv env
) {
// Body not visible in the excerpt.
394 protected void rollback(MasterProcedureEnv env
) throws IOException
, InterruptedException
{
// Body not visible in the excerpt.
399 protected boolean abort(MasterProcedureEnv env
) {
// Serializes a SplitWALData message with the fixed WAL path "test" and the server
// name as crashed server.
404 protected void serializeStateData(ProcedureStateSerializer serializer
) throws IOException
{
405 MasterProcedureProtos
.SplitWALData
.Builder builder
=
406 MasterProcedureProtos
.SplitWALData
.newBuilder();
407 builder
.setWalPath("test").setCrashedServer(ProtobufUtil
.toServerName(serverName
));
408 serializer
.serialize(builder
.build());
// Restores serverName from the crashed-server field of the serialized SplitWALData;
// worker, barrier and triedToAcquire are not part of the serialized state shown here.
412 protected void deserializeStateData(ProcedureStateSerializer serializer
) throws IOException
{
413 MasterProcedureProtos
.SplitWALData data
=
414 serializer
.deserialize(MasterProcedureProtos
.SplitWALData
.class);
415 serverName
= ProtobufUtil
.toServerName(data
.getCrashedServer());