HBASE-23949 refactor loadBalancer implements for rsgroup balance by table to achieve...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / master / AbstractTestDLS.java
bloba576adc8d192f656ea16551edfe1a2538b21018e
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.master;
21 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
28 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
29 import static org.junit.Assert.assertEquals;
30 import static org.junit.Assert.assertFalse;
31 import static org.junit.Assert.assertTrue;
32 import static org.junit.Assert.fail;
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.NavigableSet;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.TimeoutException;
45 import java.util.concurrent.atomic.LongAdder;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.FSDataOutputStream;
48 import org.apache.hadoop.fs.FileStatus;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.fs.PathFilter;
52 import org.apache.hadoop.hbase.HBaseTestingUtility;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.KeyValue;
55 import org.apache.hadoop.hbase.MiniHBaseCluster;
56 import org.apache.hadoop.hbase.NamespaceDescriptor;
57 import org.apache.hadoop.hbase.ServerName;
58 import org.apache.hadoop.hbase.SplitLogCounters;
59 import org.apache.hadoop.hbase.StartMiniClusterOption;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.Waiter;
62 import org.apache.hadoop.hbase.client.Put;
63 import org.apache.hadoop.hbase.client.RegionInfo;
64 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
65 import org.apache.hadoop.hbase.client.RegionLocator;
66 import org.apache.hadoop.hbase.client.Table;
67 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
68 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
69 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
70 import org.apache.hadoop.hbase.master.assignment.RegionStates;
71 import org.apache.hadoop.hbase.regionserver.HRegionServer;
72 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
73 import org.apache.hadoop.hbase.regionserver.Region;
74 import org.apache.hadoop.hbase.util.Bytes;
75 import org.apache.hadoop.hbase.util.FSUtils;
76 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
77 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
78 import org.apache.hadoop.hbase.util.Threads;
79 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
80 import org.apache.hadoop.hbase.wal.WAL;
81 import org.apache.hadoop.hbase.wal.WALEdit;
82 import org.apache.hadoop.hbase.wal.WALFactory;
83 import org.apache.hadoop.hbase.wal.WALKeyImpl;
84 import org.apache.hadoop.hbase.wal.WALSplitUtil;
85 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
86 import org.junit.After;
87 import org.junit.AfterClass;
88 import org.junit.Before;
89 import org.junit.BeforeClass;
90 import org.junit.Rule;
91 import org.junit.Test;
92 import org.junit.rules.TestName;
93 import org.slf4j.Logger;
94 import org.slf4j.LoggerFactory;
96 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
98 /**
99 * Base class for testing distributed log splitting.
101 public abstract class AbstractTestDLS {
102 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
104 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
106 // Start a cluster with 2 masters and 5 regionservers
107 private static final int NUM_MASTERS = 2;
108 private static final int NUM_RS = 5;
109 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family");
111 @Rule
112 public TestName testName = new TestName();
114 private TableName tableName;
115 private MiniHBaseCluster cluster;
116 private HMaster master;
117 private Configuration conf;
119 @Rule
120 public TestName name = new TestName();
122 @BeforeClass
123 public static void setup() throws Exception {
124 // Uncomment the following line if more verbosity is needed for
125 // debugging (see HBASE-12285 for details).
126 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
127 TEST_UTIL.startMiniZKCluster();
128 TEST_UTIL.startMiniDFSCluster(3);
131 @AfterClass
132 public static void tearDown() throws Exception {
133 TEST_UTIL.shutdownMiniCluster();
136 protected abstract String getWalProvider();
138 private void startCluster(int numRS) throws Exception {
139 SplitLogCounters.resetCounters();
140 LOG.info("Starting cluster");
141 conf.setLong("hbase.splitlog.max.resubmit", 0);
142 // Make the failure test faster
143 conf.setInt("zookeeper.recovery.retry", 0);
144 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
145 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
146 conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
147 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
148 conf.set("hbase.wal.provider", getWalProvider());
149 StartMiniClusterOption option = StartMiniClusterOption.builder()
150 .numMasters(NUM_MASTERS).numRegionServers(numRS).build();
151 TEST_UTIL.startMiniHBaseCluster(option);
152 cluster = TEST_UTIL.getHBaseCluster();
153 LOG.info("Waiting for active/ready master");
154 cluster.waitForActiveAndReadyMaster();
155 master = cluster.getMaster();
156 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
157 @Override
158 public boolean evaluate() throws Exception {
159 return cluster.getLiveRegionServerThreads().size() >= numRS;
164 @Before
165 public void before() throws Exception {
166 conf = TEST_UTIL.getConfiguration();
167 tableName = TableName.valueOf(testName.getMethodName());
170 @After
171 public void after() throws Exception {
172 TEST_UTIL.shutdownMiniHBaseCluster();
173 TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
174 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
177 @Test
178 public void testRecoveredEdits() throws Exception {
179 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
180 startCluster(NUM_RS);
182 int numLogLines = 10000;
183 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
184 // turn off load balancing to prevent regions from moving around otherwise
185 // they will consume recovered.edits
186 master.balanceSwitch(false);
187 FileSystem fs = master.getMasterFileSystem().getFileSystem();
189 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
191 Path rootdir = FSUtils.getRootDir(conf);
193 int numRegions = 50;
194 try (Table t = installTable(numRegions)) {
195 List<RegionInfo> regions = null;
196 HRegionServer hrs = null;
197 for (int i = 0; i < NUM_RS; i++) {
198 hrs = rsts.get(i).getRegionServer();
199 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
200 // At least one RS will have >= to average number of regions.
201 if (regions.size() >= numRegions / NUM_RS) {
202 break;
205 Path logDir = new Path(rootdir,
206 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
208 LOG.info("#regions = " + regions.size());
209 Iterator<RegionInfo> it = regions.iterator();
210 while (it.hasNext()) {
211 RegionInfo region = it.next();
212 if (region.getTable().getNamespaceAsString()
213 .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
214 it.remove();
218 makeWAL(hrs, regions, numLogLines, 100);
220 slm.splitLogDistributed(logDir);
222 int count = 0;
223 for (RegionInfo hri : regions) {
224 @SuppressWarnings("deprecation")
225 Path editsdir = WALSplitUtil
226 .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
227 tableName, hri.getEncodedName()));
228 LOG.debug("checking edits dir " + editsdir);
229 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
230 @Override
231 public boolean accept(Path p) {
232 if (WALSplitUtil.isSequenceIdFile(p)) {
233 return false;
235 return true;
238 assertTrue(
239 "edits dir should have more than a single file in it. instead has " + files.length,
240 files.length > 1);
241 for (int i = 0; i < files.length; i++) {
242 int c = countWAL(files[i].getPath(), fs, conf);
243 count += c;
245 LOG.info(count + " edits in " + files.length + " recovered edits files.");
248 // check that the log file is moved
249 assertFalse(fs.exists(logDir));
250 assertEquals(numLogLines, count);
254 @Test
255 public void testMasterStartsUpWithLogSplittingWork() throws Exception {
256 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
257 startCluster(NUM_RS);
259 int numRegionsToCreate = 40;
260 int numLogLines = 1000;
261 // turn off load balancing to prevent regions from moving around otherwise
262 // they will consume recovered.edits
263 master.balanceSwitch(false);
265 try (Table ht = installTable(numRegionsToCreate)) {
266 HRegionServer hrs = findRSToKill(false);
267 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
268 makeWAL(hrs, regions, numLogLines, 100);
270 // abort master
271 abortMaster(cluster);
273 // abort RS
274 LOG.info("Aborting region server: " + hrs.getServerName());
275 hrs.abort("testing");
277 // wait for abort completes
278 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
279 @Override
280 public boolean evaluate() throws Exception {
281 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1;
285 Thread.sleep(2000);
286 LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());
288 // wait for abort completes
289 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
290 @Override
291 public boolean evaluate() throws Exception {
292 return (HBaseTestingUtility.getAllOnlineRegions(cluster)
293 .size() >= (numRegionsToCreate + 1));
297 LOG.info("Current Open Regions After Master Node Starts Up:" +
298 HBaseTestingUtility.getAllOnlineRegions(cluster).size());
300 assertEquals(numLogLines, TEST_UTIL.countRows(ht));
305 * The original intention of this test was to force an abort of a region server and to make sure
306 * that the failure path in the region servers is properly evaluated. But it is difficult to
307 * ensure that the region server doesn't finish the log splitting before it aborts. Also now,
308 * there is this code path where the master will preempt the region server when master detects
309 * that the region server has aborted.
310 * @throws Exception
312 // Was marked flaky before Distributed Log Replay cleanup.
313 @Test
314 public void testWorkerAbort() throws Exception {
315 LOG.info("testWorkerAbort");
316 startCluster(3);
317 int numLogLines = 10000;
318 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
319 FileSystem fs = master.getMasterFileSystem().getFileSystem();
321 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
322 HRegionServer hrs = findRSToKill(false);
323 Path rootdir = FSUtils.getRootDir(conf);
324 final Path logDir = new Path(rootdir,
325 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
327 try (Table t = installTable(40)) {
328 makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
330 new Thread() {
331 @Override
332 public void run() {
333 try {
334 waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
335 } catch (InterruptedException e) {
337 for (RegionServerThread rst : rsts) {
338 rst.getRegionServer().abort("testing");
339 break;
342 }.start();
343 FileStatus[] logfiles = fs.listStatus(logDir);
344 TaskBatch batch = new TaskBatch();
345 slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
346 // waitForCounter but for one of the 2 counters
347 long curt = System.currentTimeMillis();
348 long waitTime = 80000;
349 long endt = curt + waitTime;
350 while (curt < endt) {
351 if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
352 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
353 tot_wkr_preempt_task.sum()) == 0) {
354 Thread.sleep(100);
355 curt = System.currentTimeMillis();
356 } else {
357 assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
358 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
359 tot_wkr_preempt_task.sum()));
360 return;
363 fail("none of the following counters went up in " + waitTime + " milliseconds - " +
364 "tot_wkr_task_resigned, tot_wkr_task_err, " +
365 "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
369 @Test
370 public void testThreeRSAbort() throws Exception {
371 LOG.info("testThreeRSAbort");
372 int numRegionsToCreate = 40;
373 int numRowsPerRegion = 100;
375 startCluster(NUM_RS); // NUM_RS=6.
377 try (Table table = installTable(numRegionsToCreate)) {
378 populateDataInTable(numRowsPerRegion);
380 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
381 assertEquals(NUM_RS, rsts.size());
382 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName());
383 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName());
384 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName());
386 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() {
388 @Override
389 public boolean evaluate() throws Exception {
390 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3;
393 @Override
394 public String explainFailure() throws Exception {
395 return "Timed out waiting for server aborts.";
398 TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
399 int rows;
400 try {
401 rows = TEST_UTIL.countRows(table);
402 } catch (Exception e) {
403 Threads.printThreadInfo(System.out, "Thread dump before fail");
404 throw e;
406 assertEquals(numRegionsToCreate * numRowsPerRegion, rows);
410 @Test
411 public void testDelayedDeleteOnFailure() throws Exception {
412 LOG.info("testDelayedDeleteOnFailure");
413 startCluster(1);
414 final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
415 final FileSystem fs = master.getMasterFileSystem().getFileSystem();
416 final Path rootLogDir = new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
417 final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString());
418 fs.mkdirs(logDir);
419 ExecutorService executor = null;
420 try {
421 final Path corruptedLogFile = new Path(logDir, "x");
422 FSDataOutputStream out;
423 out = fs.create(corruptedLogFile);
424 out.write(0);
425 out.write(Bytes.toBytes("corrupted bytes"));
426 out.close();
427 ZKSplitLogManagerCoordination coordination =
428 (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
429 .getSplitLogManagerCoordination();
430 coordination.setIgnoreDeleteForTesting(true);
431 executor = Executors.newSingleThreadExecutor();
432 Runnable runnable = new Runnable() {
433 @Override
434 public void run() {
435 try {
436 // since the logDir is a fake, corrupted one, so the split log worker
437 // will finish it quickly with error, and this call will fail and throw
438 // an IOException.
439 slm.splitLogDistributed(logDir);
440 } catch (IOException ioe) {
441 try {
442 assertTrue(fs.exists(corruptedLogFile));
443 // this call will block waiting for the task to be removed from the
444 // tasks map which is not going to happen since ignoreZKDeleteForTesting
445 // is set to true, until it is interrupted.
446 slm.splitLogDistributed(logDir);
447 } catch (IOException e) {
448 assertTrue(Thread.currentThread().isInterrupted());
449 return;
451 fail("did not get the expected IOException from the 2nd call");
453 fail("did not get the expected IOException from the 1st call");
456 Future<?> result = executor.submit(runnable);
457 try {
458 result.get(2000, TimeUnit.MILLISECONDS);
459 } catch (TimeoutException te) {
460 // it is ok, expected.
462 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
463 executor.shutdownNow();
464 executor = null;
466 // make sure the runnable is finished with no exception thrown.
467 result.get();
468 } finally {
469 if (executor != null) {
470 // interrupt the thread in case the test fails in the middle.
471 // it has no effect if the thread is already terminated.
472 executor.shutdownNow();
474 fs.delete(logDir, true);
478 private Table installTable(int nrs) throws Exception {
479 return installTable(nrs, 0);
482 private Table installTable(int nrs, int existingRegions) throws Exception {
483 // Create a table with regions
484 byte[] family = Bytes.toBytes("family");
485 LOG.info("Creating table with " + nrs + " regions");
486 Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs);
487 int numRegions = -1;
488 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
489 numRegions = r.getStartKeys().length;
491 assertEquals(nrs, numRegions);
492 LOG.info("Waiting for no more RIT\n");
493 blockUntilNoRIT();
494 // disable-enable cycle to get rid of table's dead regions left behind
495 // by createMultiRegions
496 assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
497 LOG.debug("Disabling table\n");
498 TEST_UTIL.getAdmin().disableTable(tableName);
499 LOG.debug("Waiting for no more RIT\n");
500 blockUntilNoRIT();
501 NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
502 LOG.debug("Verifying only catalog region is assigned\n");
503 if (regions.size() != 1) {
504 for (String oregion : regions)
505 LOG.debug("Region still online: " + oregion);
507 assertEquals(1 + existingRegions, regions.size());
508 LOG.debug("Enabling table\n");
509 TEST_UTIL.getAdmin().enableTable(tableName);
510 LOG.debug("Waiting for no more RIT\n");
511 blockUntilNoRIT();
512 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
513 regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
514 assertEquals(numRegions + 1 + existingRegions, regions.size());
515 return table;
518 void populateDataInTable(int nrows) throws Exception {
519 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
520 assertEquals(NUM_RS, rsts.size());
522 for (RegionServerThread rst : rsts) {
523 HRegionServer hrs = rst.getRegionServer();
524 List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
525 for (RegionInfo hri : hris) {
526 if (hri.getTable().isSystemTable()) {
527 continue;
529 LOG.debug(
530 "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString());
531 Region region = hrs.getOnlineRegion(hri.getRegionName());
532 assertTrue(region != null);
533 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
537 for (MasterThread mt : cluster.getLiveMasterThreads()) {
538 HRegionServer hrs = mt.getMaster();
539 List<RegionInfo> hris;
540 try {
541 hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
542 } catch (ServerNotRunningYetException e) {
543 // It's ok: this master may be a backup. Ignored.
544 continue;
546 for (RegionInfo hri : hris) {
547 if (hri.getTable().isSystemTable()) {
548 continue;
550 LOG.debug(
551 "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString());
552 Region region = hrs.getOnlineRegion(hri.getRegionName());
553 assertTrue(region != null);
554 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY);
559 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size)
560 throws IOException {
561 makeWAL(hrs, regions, num_edits, edit_size, true);
564 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize,
565 boolean cleanShutdown) throws IOException {
566 // remove root and meta region
567 regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO);
569 for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) {
570 RegionInfo regionInfo = iter.next();
571 if (regionInfo.getTable().isSystemTable()) {
572 iter.remove();
575 byte[] value = new byte[editSize];
577 List<RegionInfo> hris = new ArrayList<>();
578 for (RegionInfo region : regions) {
579 if (region.getTable() != tableName) {
580 continue;
582 hris.add(region);
584 LOG.info("Creating wal edits across " + hris.size() + " regions.");
585 for (int i = 0; i < editSize; i++) {
586 value[i] = (byte) ('a' + (i % 26));
588 int n = hris.size();
589 int[] counts = new int[n];
590 // sync every ~30k to line up with desired wal rolls
591 final int syncEvery = 30 * 1024 / editSize;
592 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
593 if (n > 0) {
594 for (int i = 0; i < numEdits; i += 1) {
595 WALEdit e = new WALEdit();
596 RegionInfo curRegionInfo = hris.get(i % n);
597 WAL log = hrs.getWAL(curRegionInfo);
598 byte[] startRow = curRegionInfo.getStartKey();
599 if (startRow == null || startRow.length == 0) {
600 startRow = new byte[] { 0, 0, 0, 0, 1 };
602 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
603 row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because
604 // HBaseTestingUtility.createMultiRegions use 5 bytes key
605 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
606 e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
607 log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(),
608 tableName, System.currentTimeMillis(), mvcc), e);
609 if (0 == i % syncEvery) {
610 log.sync();
612 counts[i % n] += 1;
615 // done as two passes because the regions might share logs. shutdown is idempotent, but sync
616 // will cause errors if done after.
617 for (RegionInfo info : hris) {
618 WAL log = hrs.getWAL(info);
619 log.sync();
621 if (cleanShutdown) {
622 for (RegionInfo info : hris) {
623 WAL log = hrs.getWAL(info);
624 log.shutdown();
627 for (int i = 0; i < n; i++) {
628 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
630 return;
633 private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
634 int count = 0;
635 try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
636 WAL.Entry e;
637 while ((e = in.next()) != null) {
638 if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
639 count++;
643 return count;
646 private void blockUntilNoRIT() throws Exception {
647 TEST_UTIL.waitUntilNoRegionsInTransition(60000);
650 private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families)
651 throws IOException {
652 for (int i = 0; i < numRows; i++) {
653 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
654 for (byte[] family : families) {
655 put.addColumn(family, qf, null);
657 region.put(put);
661 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
662 throws InterruptedException {
663 long curt = System.currentTimeMillis();
664 long endt = curt + timems;
665 while (curt < endt) {
666 if (ctr.sum() == oldval) {
667 Thread.sleep(100);
668 curt = System.currentTimeMillis();
669 } else {
670 assertEquals(newval, ctr.sum());
671 return;
674 fail();
677 private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
678 for (MasterThread mt : cluster.getLiveMasterThreads()) {
679 if (mt.getMaster().isActiveMaster()) {
680 mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
681 mt.join();
682 break;
685 LOG.debug("Master is aborted");
689 * Find a RS that has regions of a table.
690 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
692 private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception {
693 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
694 List<RegionInfo> regions = null;
695 HRegionServer hrs = null;
697 for (RegionServerThread rst : rsts) {
698 hrs = rst.getRegionServer();
699 while (rst.isAlive() && !hrs.isOnline()) {
700 Thread.sleep(100);
702 if (!rst.isAlive()) {
703 continue;
705 boolean isCarryingMeta = false;
706 boolean foundTableRegion = false;
707 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
708 for (RegionInfo region : regions) {
709 if (region.isMetaRegion()) {
710 isCarryingMeta = true;
712 if (region.getTable() == tableName) {
713 foundTableRegion = true;
715 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
716 break;
719 if (isCarryingMeta && hasMetaRegion) {
720 // clients ask for a RS with META
721 if (!foundTableRegion) {
722 HRegionServer destRS = hrs;
723 // the RS doesn't have regions of the specified table so we need move one to this RS
724 List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
725 RegionInfo hri = tableRegions.get(0);
726 TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName());
727 // wait for region move completes
728 RegionStates regionStates =
729 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
730 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
731 @Override
732 public boolean evaluate() throws Exception {
733 ServerName sn = regionStates.getRegionServerOfRegion(hri);
734 return (sn != null && sn.equals(destRS.getServerName()));
738 return hrs;
739 } else if (hasMetaRegion || isCarryingMeta) {
740 continue;
742 if (foundTableRegion) {
743 break;
747 return hrs;