HBASE-26416 Implement a new method for region replication instead of using replay...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / master / AbstractTestDLS.java
bloba0f26c001e28110567030cc5cfb0913bb538df65
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.master;
20 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
21 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.NavigableSet;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.concurrent.atomic.LongAdder;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.FSDataOutputStream;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.HBaseTestingUtil;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.KeyValue;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
47 import org.apache.hadoop.hbase.SplitLogCounters;
48 import org.apache.hadoop.hbase.StartTestingClusterOption;
49 import org.apache.hadoop.hbase.TableName;
50 import org.apache.hadoop.hbase.Waiter;
51 import org.apache.hadoop.hbase.client.Put;
52 import org.apache.hadoop.hbase.client.RegionInfo;
53 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
54 import org.apache.hadoop.hbase.client.RegionLocator;
55 import org.apache.hadoop.hbase.client.Table;
56 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
57 import org.apache.hadoop.hbase.master.assignment.RegionStates;
58 import org.apache.hadoop.hbase.regionserver.HRegionServer;
59 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
60 import org.apache.hadoop.hbase.regionserver.Region;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.CommonFSUtils;
63 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
65 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
66 import org.apache.hadoop.hbase.util.Threads;
67 import org.apache.hadoop.hbase.wal.WAL;
68 import org.apache.hadoop.hbase.wal.WALEdit;
69 import org.apache.hadoop.hbase.wal.WALFactory;
70 import org.apache.hadoop.hbase.wal.WALKeyImpl;
71 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
72 import org.junit.After;
73 import org.junit.AfterClass;
74 import org.junit.Before;
75 import org.junit.BeforeClass;
76 import org.junit.Rule;
77 import org.junit.Test;
78 import org.junit.rules.TestName;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
82 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
84 /**
85 * Base class for testing distributed log splitting.
87 public abstract class AbstractTestDLS {
88 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
90 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
92 // Start a cluster with 2 masters and 5 regionservers
93 private static final int NUM_MASTERS = 2;
94 private static final int NUM_RS = 5;
95 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
97 @Rule
98 public TestName testName = new TestName();
100 private TableName tableName;
101 private SingleProcessHBaseCluster cluster;
102 private HMaster master;
103 private Configuration conf;
105 @Rule
106 public TestName name = new TestName();
108 @BeforeClass
109 public static void setup() throws Exception {
110 // Uncomment the following line if more verbosity is needed for
111 // debugging (see HBASE-12285 for details).
112 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
113 TEST_UTIL.startMiniZKCluster();
114 TEST_UTIL.startMiniDFSCluster(3);
117 @AfterClass
118 public static void tearDown() throws Exception {
119 TEST_UTIL.shutdownMiniCluster();
122 protected abstract String getWalProvider();
124 private void startCluster(int numRS) throws Exception {
125 SplitLogCounters.resetCounters();
126 LOG.info("Starting cluster");
127 conf.setLong("hbase.splitlog.max.resubmit", 0);
128 // Make the failure test faster
129 conf.setInt("zookeeper.recovery.retry", 0);
130 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
131 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
132 conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
133 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
134 conf.set("hbase.wal.provider", getWalProvider());
135 StartTestingClusterOption option = StartTestingClusterOption.builder()
136 .numMasters(NUM_MASTERS).numRegionServers(numRS).build();
137 TEST_UTIL.startMiniHBaseCluster(option);
138 cluster = TEST_UTIL.getHBaseCluster();
139 LOG.info("Waiting for active/ready master");
140 cluster.waitForActiveAndReadyMaster();
141 master = cluster.getMaster();
142 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
143 @Override
144 public boolean evaluate() throws Exception {
145 return cluster.getLiveRegionServerThreads().size() >= numRS;
150 @Before
151 public void before() throws Exception {
152 conf = TEST_UTIL.getConfiguration();
153 tableName = TableName.valueOf(testName.getMethodName());
156 @After
157 public void after() throws Exception {
158 TEST_UTIL.shutdownMiniHBaseCluster();
159 TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()),
160 true);
161 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
164 @Test
165 public void testMasterStartsUpWithLogSplittingWork() throws Exception {
166 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
167 startCluster(NUM_RS);
169 int numRegionsToCreate = 40;
170 int numLogLines = 1000;
171 // turn off load balancing to prevent regions from moving around otherwise
172 // they will consume recovered.edits
173 master.balanceSwitch(false);
175 try (Table ht = installTable(numRegionsToCreate)) {
176 HRegionServer hrs = findRSToKill(false);
177 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
178 makeWAL(hrs, regions, numLogLines, 100);
180 // abort master
181 abortMaster(cluster);
183 // abort RS
184 LOG.info("Aborting region server: " + hrs.getServerName());
185 hrs.abort("testing");
187 // wait for abort completes
188 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
189 @Override
190 public boolean evaluate() throws Exception {
191 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
195 Thread.sleep(2000);
196 LOG.info("Current Open Regions:" + HBaseTestingUtil.getAllOnlineRegions(cluster).size());
198 // wait for abort completes
199 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
200 @Override
201 public boolean evaluate() throws Exception {
202 return (HBaseTestingUtil.getAllOnlineRegions(cluster)
203 .size() >= (numRegionsToCreate + 1));
207 LOG.info("Current Open Regions After Master Node Starts Up:" +
208 HBaseTestingUtil.getAllOnlineRegions(cluster).size());
210 assertEquals(numLogLines, TEST_UTIL.countRows(ht));
214 @Test
215 public void testThreeRSAbort() throws Exception {
216 LOG.info("testThreeRSAbort");
217 int numRegionsToCreate = 40;
218 int numRowsPerRegion = 100;
220 startCluster(NUM_RS); // NUM_RS=6.
222 try (Table table = installTable(numRegionsToCreate)) {
223 populateDataInTable(numRowsPerRegion);
225 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
226 assertEquals(NUM_RS, rsts.size());
227 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
228 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
229 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
231 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
233 @Override
234 public boolean evaluate() throws Exception {
235 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
238 @Override
239 public String explainFailure() throws Exception {
240 return "Timed out waiting for server aborts.";
243 TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
244 int rows;
245 try {
246 rows = TEST_UTIL.countRows(table);
247 } catch (Exception e) {
248 Threads.printThreadInfo(System.out, "Thread dump before fail");
249 throw e;
251 assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
255 @Test
256 public void testDelayedDeleteOnFailure() throws Exception {
257 if (!this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
258 HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
259 // This test depends on zk coordination....
260 return;
262 LOG.info("testDelayedDeleteOnFailure");
263 startCluster(1);
264 final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
265 final FileSystem fs = master.getMasterFileSystem().getFileSystem();
266 final Path rootLogDir =
267 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
268 final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
269 fs.mkdirs(logDir);
270 ExecutorService executor = null;
271 try {
272 final Path corruptedLogFile = new Path(logDir, "x");
273 FSDataOutputStream out;
274 out = fs.create(corruptedLogFile);
275 out.write(0);
276 out.write(Bytes.toBytes("corrupted bytes"));
277 out.close();
278 ZKSplitLogManagerCoordination coordination =
279 (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
280 .getSplitLogManagerCoordination();
281 coordination.setIgnoreDeleteForTesting(true);
282 executor = Executors.newSingleThreadExecutor();
283 Runnable runnable = new Runnable() {
284 @Override
285 public void run() {
286 try {
287 // since the logDir is a fake, corrupted one, so the split log worker
288 // will finish it quickly with error, and this call will fail and throw
289 // an IOException.
290 slm.splitLogDistributed(logDir);
291 } catch (IOException ioe) {
292 try {
293 assertTrue(fs.exists(corruptedLogFile));
294 // this call will block waiting for the task to be removed from the
295 // tasks map which is not going to happen since ignoreZKDeleteForTesting
296 // is set to true, until it is interrupted.
297 slm.splitLogDistributed(logDir);
298 } catch (IOException e) {
299 assertTrue(Thread.currentThread().isInterrupted());
300 return;
302 fail("did not get the expected IOException from the 2nd call");
304 fail("did not get the expected IOException from the 1st call");
307 Future<?> result = executor.submit(runnable);
308 try {
309 result.get(2000, TimeUnit.MILLISECONDS);
310 } catch (TimeoutException te) {
311 // it is ok, expected.
313 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
314 executor.shutdownNow();
315 executor = null;
317 // make sure the runnable is finished with no exception thrown.
318 result.get();
319 } finally {
320 if (executor != null) {
321 // interrupt the thread in case the test fails in the middle.
322 // it has no effect if the thread is already terminated.
323 executor.shutdownNow();
325 fs.delete(logDir, true);
329 private Table installTable(int nrs) throws Exception {
330 return installTable(nrs, 0);
333 private Table installTable(int nrs, int existingRegions) throws Exception {
334 // Create a table with regions
335 byte[] family = Bytes.toBytes("family");
336 LOG.info("Creating table with " + nrs + " regions");
337 Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
338 int numRegions = -1;
339 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
340 numRegions = r.getStartKeys().length;
342 assertEquals(nrs, numRegions);
343 LOG.info("Waiting for no more RIT\n");
344 blockUntilNoRIT();
345 // disable-enable cycle to get rid of table's dead regions left behind
346 // by createMultiRegions
347 assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
348 LOG.debug("Disabling table\n");
349 TEST_UTIL.getAdmin().disableTable(tableName);
350 LOG.debug("Waiting for no more RIT\n");
351 blockUntilNoRIT();
352 NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
353 LOG.debug("Verifying only catalog region is assigned\n");
354 if (regions.size() != 1) {
355 for (String oregion : regions) {
356 LOG.debug("Region still online: " + oregion);
359 assertEquals(1 + existingRegions, regions.size());
360 LOG.debug("Enabling table\n");
361 TEST_UTIL.getAdmin().enableTable(tableName);
362 LOG.debug("Waiting for no more RIT\n");
363 blockUntilNoRIT();
364 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
365 regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
366 assertEquals(numRegions + 1 + existingRegions, regions.size());
367 return table;
370 void populateDataInTable(int nrows) throws Exception {
371 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
372 assertEquals(NUM_RS, rsts.size());
374 for (RegionServerThread rst : rsts) {
375 HRegionServer hrs = rst.getRegionServer();
376 List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
377 for (RegionInfo hri : hris) {
378 if (hri.getTable().isSystemTable()) {
379 continue;
381 LOG.debug(
382 "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
383 Region region = hrs.getOnlineRegion(hri.getRegionName());
384 assertTrue(region != null);
385 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
390 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
391 throws IOException {
392 makeWAL(hrs, regions, num_edits, edit_size, true);
395 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
396 boolean cleanShutdown) throws IOException {
397 // remove root and meta region
398 regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
400 for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
401 RegionInfo regionInfo = iter.next();
402 if (regionInfo.getTable().isSystemTable()) {
403 iter.remove();
406 byte[] value = new byte[editSize];
408 List<RegionInfo> hris = new ArrayList<>();
409 for (RegionInfo region : regions) {
410 if (region.getTable() != tableName) {
411 continue;
413 hris.add(region);
415 LOG.info("Creating wal edits across " + hris.size() + " regions.");
416 for (int i = 0; i < editSize; i++) {
417 value[i] = (byte) ('a' + (i % 26));
419 int n = hris.size();
420 int[] counts = new int[n];
421 // sync every ~30k to line up with desired wal rolls
422 final int syncEvery = 30 * 1024 / editSize;
423 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
424 if (n > 0) {
425 for (int i = 0; i < numEdits; i += 1) {
426 WALEdit e = new WALEdit();
427 RegionInfo curRegionInfo = hris.get(i % n);
428 WAL log = hrs.getWAL(curRegionInfo);
429 byte[] startRow = curRegionInfo.getStartKey();
430 if (startRow == null || startRow.length == 0) {
431 startRow = new byte[] { 0, 0, 0, 0, 1 };
433 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
434 row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
435 // HBaseTestingUtility.createMultiRegions use 5 bytes key
436 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
437 e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, EnvironmentEdgeManager.currentTime(),
438 value));
439 log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
440 tableName, EnvironmentEdgeManager.currentTime(), mvcc), e);
441 if (0 == i % syncEvery) {
442 log.sync();
444 counts[i % n] += 1;
447 // done as two passes because the regions might share logs. shutdown is idempotent, but sync
448 // will cause errors if done after.
449 for (RegionInfo info : hris) {
450 WAL log = hrs.getWAL(info);
451 log.sync();
453 if (cleanShutdown) {
454 for (RegionInfo info : hris) {
455 WAL log = hrs.getWAL(info);
456 log.shutdown();
459 for (int i = 0; i < n; i++) {
460 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
462 return;
465 private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
466 int count = 0;
467 try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
468 WAL.Entry e;
469 while ((e = in.next()) != null) {
470 if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
471 count++;
475 return count;
478 private void blockUntilNoRIT() throws Exception {
479 TEST_UTIL.waitUntilNoRegionsInTransition(60000);
482 private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
483 throws IOException {
484 for (int i = 0; i < numRows; i++) {
485 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
486 for (byte[] family : families) {
487 put.addColumn(family, qf, null);
489 region.put(put);
493 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
494 throws InterruptedException {
495 long curt = EnvironmentEdgeManager.currentTime();
496 long endt = curt + timems;
497 while (curt < endt) {
498 if (ctr.sum() == oldval) {
499 Thread.sleep(100);
500 curt = EnvironmentEdgeManager.currentTime();
501 } else {
502 assertEquals(newval, ctr.sum());
503 return;
506 fail();
509 private void abortMaster(SingleProcessHBaseCluster cluster) throws InterruptedException {
510 for (MasterThread mt : cluster.getLiveMasterThreads()) {
511 if (mt.getMaster().isActiveMaster()) {
512 mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
513 mt.join();
514 break;
517 LOG.debug("Master is aborted");
521 * Find a RS that has regions of a table.
522 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
524 private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
525 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
526 List<RegionInfo> regions = null;
527 HRegionServer hrs = null;
529 for (RegionServerThread rst : rsts) {
530 hrs = rst.getRegionServer();
531 while (rst.isAlive() && !hrs.isOnline()) {
532 Thread.sleep(100);
534 if (!rst.isAlive()) {
535 continue;
537 boolean isCarryingMeta = false;
538 boolean foundTableRegion = false;
539 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
540 for (RegionInfo region : regions) {
541 if (region.isMetaRegion()) {
542 isCarryingMeta = true;
544 if (region.getTable() == tableName) {
545 foundTableRegion = true;
547 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
548 break;
551 if (isCarryingMeta && hasMetaRegion) {
552 // clients ask for a RS with META
553 if (!foundTableRegion) {
554 HRegionServer destRS = hrs;
555 // the RS doesn't have regions of the specified table so we need move one to this RS
556 List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
557 RegionInfo hri = tableRegions.get(0);
558 TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
559 // wait for region move completes
560 RegionStates regionStates =
561 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
562 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
563 @Override
564 public boolean evaluate() throws Exception {
565 ServerName sn = regionStates.getRegionServerOfRegion(hri);
566 return (sn != null && sn.equals(destRS.getServerName()));
570 return hrs;
571 } else if (hasMetaRegion || isCarryingMeta) {
572 continue;
574 if (foundTableRegion) {
575 break;
579 return hrs;