HBASE-26416 Implement a new method for region replication instead of using replay...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / master / TestSplitWALManager.java
blobe97d43b724e5e9db7530acbbc1c1fc4365654c1f
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
62 @Category({ MasterTests.class, LargeTests.class })
64 public class TestSplitWALManager {
66 @ClassRule
67 public static final HBaseClassTestRule CLASS_RULE =
68 HBaseClassTestRule.forClass(TestSplitWALManager.class);
70 private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALManager.class);
71 private static HBaseTestingUtil TEST_UTIL;
72 private HMaster master;
73 private SplitWALManager splitWALManager;
74 private TableName TABLE_NAME;
75 private byte[] FAMILY;
77 @Before
78 public void setup() throws Exception {
79 TEST_UTIL = new HBaseTestingUtil();
80 TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
81 TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
82 TEST_UTIL.startMiniCluster(3);
83 master = TEST_UTIL.getHBaseCluster().getMaster();
84 splitWALManager = master.getSplitWALManager();
85 TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
86 FAMILY = Bytes.toBytes("test");
89 @After
90 public void teardown() throws Exception {
91 TEST_UTIL.shutdownMiniCluster();
94 @Test
95 public void testAcquireAndRelease() throws Exception {
96 List<FakeServerProcedure> testProcedures = new ArrayList<>();
97 for (int i = 0; i < 4; i++) {
98 testProcedures.add(new FakeServerProcedure(
99 TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
101 ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
102 Assert.assertNotNull(server);
103 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
104 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
106 Exception e = null;
107 try {
108 splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
109 } catch (ProcedureSuspendedException suspendException) {
110 e = suspendException;
112 Assert.assertNotNull(e);
113 Assert.assertTrue(e instanceof ProcedureSuspendedException);
115 splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
116 .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
117 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
120 @Test
121 public void testAddNewServer() throws Exception {
122 List<FakeServerProcedure> testProcedures = new ArrayList<>();
123 for (int i = 0; i < 4; i++) {
124 testProcedures.add(new FakeServerProcedure(
125 TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
127 ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
128 Assert.assertNotNull(server);
129 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
130 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
132 Exception e = null;
133 try {
134 splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
135 } catch (ProcedureSuspendedException suspendException) {
136 e = suspendException;
138 Assert.assertNotNull(e);
139 Assert.assertTrue(e instanceof ProcedureSuspendedException);
141 JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer();
142 newServer.waitForServerOnline();
143 Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
146 @Test
147 public void testCreateSplitWALProcedures() throws Exception {
148 TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
149 // load table
150 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
151 ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
152 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
153 Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
154 AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
155 // Test splitting meta wal
156 FileStatus[] wals =
157 TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
158 Assert.assertEquals(1, wals.length);
159 List<Procedure> testProcedures =
160 splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
161 Assert.assertEquals(1, testProcedures.size());
162 ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
163 Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
165 // Test splitting wal
166 wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
167 Assert.assertEquals(1, wals.length);
168 testProcedures =
169 splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
170 Assert.assertEquals(1, testProcedures.size());
171 ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
172 Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
175 @Test
176 public void testAcquireAndReleaseSplitWALWorker() throws Exception {
177 ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
178 List<FakeServerProcedure> testProcedures = new ArrayList<>();
179 for (int i = 0; i < 3; i++) {
180 FakeServerProcedure procedure =
181 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
182 testProcedures.add(procedure);
183 ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
184 HConstants.NO_NONCE);
186 TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
187 FakeServerProcedure failedProcedure =
188 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
189 ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
190 HConstants.NO_NONCE);
191 TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
192 Assert.assertFalse(failedProcedure.isWorkerAcquired());
193 // let one procedure finish and release worker
194 testProcedures.get(0).countDown();
195 TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
196 Assert.assertTrue(testProcedures.get(0).isSuccess());
199 @Test
200 public void testGetWALsToSplit() throws Exception {
201 TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
202 // load table
203 TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY);
204 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
205 List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
206 Assert.assertEquals(1, metaWals.size());
207 List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
208 Assert.assertEquals(1, wals.size());
209 ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
210 .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
211 .get();
212 metaWals = splitWALManager.getWALsToSplit(testServer, true);
213 Assert.assertEquals(0, metaWals.size());
216 private void splitLogsTestHelper(HBaseTestingUtil testUtil) throws Exception {
217 HMaster hmaster = testUtil.getHBaseCluster().getMaster();
218 SplitWALManager splitWALManager = hmaster.getSplitWALManager();
219 LOG.info("The Master FS is pointing to: " + hmaster.getMasterFileSystem()
220 .getFileSystem().getUri());
221 LOG.info("The WAL FS is pointing to: " + hmaster.getMasterFileSystem()
222 .getWALFileSystem().getUri());
224 testUtil.createTable(TABLE_NAME, FAMILY, testUtil.KEYS_FOR_HBA_CREATE_TABLE);
225 // load table
226 testUtil.loadTable(testUtil.getConnection().getTable(TABLE_NAME), FAMILY);
227 ProcedureExecutor<MasterProcedureEnv> masterPE = hmaster.getMasterProcedureExecutor();
228 ServerName metaServer = testUtil.getHBaseCluster().getServerHoldingMeta();
229 ServerName testServer = testUtil.getHBaseCluster().getRegionServerThreads().stream()
230 .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny()
231 .get();
232 List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
233 Assert.assertEquals(1, procedures.size());
234 ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
235 Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
237 // Validate the old WAL file archive dir
238 Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir();
239 Path walArchivePath = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
240 FileSystem walFS = hmaster.getMasterFileSystem().getWALFileSystem();
241 int archiveFileCount = walFS.listStatus(walArchivePath).length;
243 procedures = splitWALManager.splitWALs(metaServer, true);
244 Assert.assertEquals(1, procedures.size());
245 ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
246 Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
247 Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
248 // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish
249 Assert.assertEquals("Splitted WAL files should be archived", archiveFileCount + 1,
250 walFS.listStatus(walArchivePath).length);
253 @Test
254 public void testSplitLogs() throws Exception {
255 splitLogsTestHelper(TEST_UTIL);
258 @Test
259 public void testSplitLogsWithDifferentWalAndRootFS() throws Exception{
260 HBaseTestingUtil testUtil2 = new HBaseTestingUtil();
261 testUtil2.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
262 testUtil2.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
263 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir");
264 testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, dir.toString());
265 CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir);
266 testUtil2.startMiniCluster(3);
267 splitLogsTestHelper(testUtil2);
268 testUtil2.shutdownMiniCluster();
271 @Test
272 public void testWorkerReloadWhenMasterRestart() throws Exception {
273 List<FakeServerProcedure> testProcedures = new ArrayList<>();
274 for (int i = 0; i < 3; i++) {
275 FakeServerProcedure procedure =
276 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
277 testProcedures.add(procedure);
278 ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
279 HConstants.NO_NONCE, HConstants.NO_NONCE);
281 TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
282 // Kill master
283 TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
284 TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
285 // restart master
286 TEST_UTIL.getHBaseCluster().startMaster();
287 TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
288 this.master = TEST_UTIL.getHBaseCluster().getMaster();
290 FakeServerProcedure failedProcedure =
291 new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
292 ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure,
293 HConstants.NO_NONCE, HConstants.NO_NONCE);
294 TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
295 Assert.assertFalse(failedProcedure.isWorkerAcquired());
296 for (int i = 0; i < 3; i++) {
297 testProcedures.get(i).countDown();
299 failedProcedure.countDown();
302 public static final class FakeServerProcedure
303 extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
304 implements ServerProcedureInterface {
306 private ServerName serverName;
307 private ServerName worker;
308 private CountDownLatch barrier = new CountDownLatch(1);
309 private boolean triedToAcquire = false;
311 public FakeServerProcedure() {
314 public FakeServerProcedure(ServerName serverName) {
315 this.serverName = serverName;
318 public ServerName getServerName() {
319 return serverName;
322 @Override
323 public boolean hasMetaTableRegion() {
324 return false;
327 @Override
328 public ServerOperationType getServerOperationType() {
329 return SPLIT_WAL;
332 @Override
333 protected Flow executeFromState(MasterProcedureEnv env,
334 MasterProcedureProtos.SplitWALState state)
335 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
336 SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
337 switch (state) {
338 case ACQUIRE_SPLIT_WAL_WORKER:
339 triedToAcquire = true;
340 worker = splitWALManager.acquireSplitWALWorker(this);
341 setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
342 return Flow.HAS_MORE_STATE;
343 case DISPATCH_WAL_TO_WORKER:
344 barrier.await();
345 setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
346 return Flow.HAS_MORE_STATE;
347 case RELEASE_SPLIT_WORKER:
348 splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
349 return Flow.NO_MORE_STATE;
350 default:
351 throw new UnsupportedOperationException("unhandled state=" + state);
355 public boolean isWorkerAcquired() {
356 return worker != null;
359 public boolean isTriedToAcquire() {
360 return triedToAcquire;
363 public void countDown() {
364 this.barrier.countDown();
367 @Override
368 protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
369 throws IOException, InterruptedException {
373 @Override
374 protected MasterProcedureProtos.SplitWALState getState(int stateId) {
375 return MasterProcedureProtos.SplitWALState.forNumber(stateId);
378 @Override
379 protected int getStateId(MasterProcedureProtos.SplitWALState state) {
380 return state.getNumber();
383 @Override
384 protected MasterProcedureProtos.SplitWALState getInitialState() {
385 return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
388 @Override
389 protected boolean holdLock(MasterProcedureEnv env) {
390 return true;
393 @Override
394 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
398 @Override
399 protected boolean abort(MasterProcedureEnv env) {
400 return false;
403 @Override
404 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
405 MasterProcedureProtos.SplitWALData.Builder builder =
406 MasterProcedureProtos.SplitWALData.newBuilder();
407 builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
408 serializer.serialize(builder.build());
411 @Override
412 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
413 MasterProcedureProtos.SplitWALData data =
414 serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
415 serverName = ProtobufUtil.toServerName(data.getCrashedServer());