3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.master
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_SPLIT_WAL_MAX_SPLITTER
;
22 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_wait_for_zk_delete
;
23 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_wkr_final_transition_failed
;
24 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_wkr_preempt_task
;
25 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_wkr_task_acquired
;
26 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_wkr_task_done
;
27 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_wkr_task_err
;
28 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_wkr_task_resigned
;
29 import static org
.junit
.Assert
.assertEquals
;
30 import static org
.junit
.Assert
.assertFalse
;
31 import static org
.junit
.Assert
.assertTrue
;
32 import static org
.junit
.Assert
.fail
;
34 import java
.io
.IOException
;
35 import java
.util
.ArrayList
;
36 import java
.util
.Arrays
;
37 import java
.util
.Iterator
;
38 import java
.util
.List
;
39 import java
.util
.NavigableSet
;
40 import java
.util
.concurrent
.ExecutorService
;
41 import java
.util
.concurrent
.Executors
;
42 import java
.util
.concurrent
.Future
;
43 import java
.util
.concurrent
.TimeUnit
;
44 import java
.util
.concurrent
.TimeoutException
;
45 import java
.util
.concurrent
.atomic
.LongAdder
;
46 import org
.apache
.hadoop
.conf
.Configuration
;
47 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
48 import org
.apache
.hadoop
.fs
.FileStatus
;
49 import org
.apache
.hadoop
.fs
.FileSystem
;
50 import org
.apache
.hadoop
.fs
.Path
;
51 import org
.apache
.hadoop
.fs
.PathFilter
;
52 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
53 import org
.apache
.hadoop
.hbase
.HConstants
;
54 import org
.apache
.hadoop
.hbase
.KeyValue
;
55 import org
.apache
.hadoop
.hbase
.MiniHBaseCluster
;
56 import org
.apache
.hadoop
.hbase
.NamespaceDescriptor
;
57 import org
.apache
.hadoop
.hbase
.ServerName
;
58 import org
.apache
.hadoop
.hbase
.SplitLogCounters
;
59 import org
.apache
.hadoop
.hbase
.StartMiniClusterOption
;
60 import org
.apache
.hadoop
.hbase
.TableName
;
61 import org
.apache
.hadoop
.hbase
.Waiter
;
62 import org
.apache
.hadoop
.hbase
.client
.Put
;
63 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
64 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
65 import org
.apache
.hadoop
.hbase
.client
.RegionLocator
;
66 import org
.apache
.hadoop
.hbase
.client
.Table
;
67 import org
.apache
.hadoop
.hbase
.coordination
.ZKSplitLogManagerCoordination
;
68 import org
.apache
.hadoop
.hbase
.ipc
.ServerNotRunningYetException
;
69 import org
.apache
.hadoop
.hbase
.master
.SplitLogManager
.TaskBatch
;
70 import org
.apache
.hadoop
.hbase
.master
.assignment
.RegionStates
;
71 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
72 import org
.apache
.hadoop
.hbase
.regionserver
.MultiVersionConcurrencyControl
;
73 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
74 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
75 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
76 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.MasterThread
;
77 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.RegionServerThread
;
78 import org
.apache
.hadoop
.hbase
.util
.Threads
;
79 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
80 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
81 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
82 import org
.apache
.hadoop
.hbase
.wal
.WALFactory
;
83 import org
.apache
.hadoop
.hbase
.wal
.WALKeyImpl
;
84 import org
.apache
.hadoop
.hbase
.wal
.WALSplitUtil
;
85 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
86 import org
.junit
.After
;
87 import org
.junit
.AfterClass
;
88 import org
.junit
.Before
;
89 import org
.junit
.BeforeClass
;
90 import org
.junit
.Rule
;
91 import org
.junit
.Test
;
92 import org
.junit
.rules
.TestName
;
93 import org
.slf4j
.Logger
;
94 import org
.slf4j
.LoggerFactory
;
96 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
99 * Base class for testing distributed log splitting.
101 public abstract class AbstractTestDLS
{
102 private static final Logger LOG
= LoggerFactory
.getLogger(TestSplitLogManager
.class);
104 private static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
106 // Start a cluster with 2 masters and 5 regionservers
107 private static final int NUM_MASTERS
= 2;
108 private static final int NUM_RS
= 5;
109 private static byte[] COLUMN_FAMILY
= Bytes
.toBytes("family");
112 public TestName testName
= new TestName();
114 private TableName tableName
;
115 private MiniHBaseCluster cluster
;
116 private HMaster master
;
117 private Configuration conf
;
120 public TestName name
= new TestName();
123 public static void setup() throws Exception
{
124 // Uncomment the following line if more verbosity is needed for
125 // debugging (see HBASE-12285 for details).
126 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
127 TEST_UTIL
.startMiniZKCluster();
128 TEST_UTIL
.startMiniDFSCluster(3);
132 public static void tearDown() throws Exception
{
133 TEST_UTIL
.shutdownMiniCluster();
136 protected abstract String
getWalProvider();
138 private void startCluster(int numRS
) throws Exception
{
139 SplitLogCounters
.resetCounters();
140 LOG
.info("Starting cluster");
141 conf
.setLong("hbase.splitlog.max.resubmit", 0);
142 // Make the failure test faster
143 conf
.setInt("zookeeper.recovery.retry", 0);
144 conf
.setInt(HConstants
.REGIONSERVER_INFO_PORT
, -1);
145 conf
.setFloat(HConstants
.LOAD_BALANCER_SLOP_KEY
, (float) 100.0); // no load balancing
146 conf
.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER
, 3);
147 conf
.setInt(HConstants
.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT
, 10);
148 conf
.set("hbase.wal.provider", getWalProvider());
149 StartMiniClusterOption option
= StartMiniClusterOption
.builder()
150 .numMasters(NUM_MASTERS
).numRegionServers(numRS
).build();
151 TEST_UTIL
.startMiniHBaseCluster(option
);
152 cluster
= TEST_UTIL
.getHBaseCluster();
153 LOG
.info("Waiting for active/ready master");
154 cluster
.waitForActiveAndReadyMaster();
155 master
= cluster
.getMaster();
156 TEST_UTIL
.waitFor(120000, 200, new Waiter
.Predicate
<Exception
>() {
158 public boolean evaluate() throws Exception
{
159 return cluster
.getLiveRegionServerThreads().size() >= numRS
;
165 public void before() throws Exception
{
166 conf
= TEST_UTIL
.getConfiguration();
167 tableName
= TableName
.valueOf(testName
.getMethodName());
171 public void after() throws Exception
{
172 TEST_UTIL
.shutdownMiniHBaseCluster();
173 TEST_UTIL
.getTestFileSystem().delete(FSUtils
.getRootDir(TEST_UTIL
.getConfiguration()), true);
174 ZKUtil
.deleteNodeRecursively(TEST_UTIL
.getZooKeeperWatcher(), "/hbase");
// Verifies that distributed WAL splitting writes recovered.edits for the regions of one server.
// Steps:
// - Pick the first live region server holding at least numRegions / NUM_RS regions.
// - Drop its system-namespace regions from the list.
// - Write numLogLines edits into that server's WAL. A 30 KB WAL block size is set so more than
//   one WAL file is produced.
// - Split the WAL directory with SplitLogManager.splitLogDistributed.
// - Assert that each region's recovered-edits directory holds more than one file, that the
//   recovered edit total equals numLogLines, and that the WAL directory no longer exists.
// NOTE(review): this copy of the method is damaged. The embedded original line numbers skip
// (e.g. 192-193, 202-204, 214-217, 222, 230, 233-238, 240, 243-244, 246-247, 251-252), and
// `numRegions` and `count` are used without a visible declaration. No @Test annotation is
// visible either. Restore the missing lines before relying on this test.
178 public void testRecoveredEdits() throws Exception
{
179 conf
.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
180 startCluster(NUM_RS
);
182 int numLogLines
= 10000;
183 SplitLogManager slm
= master
.getMasterWalManager().getSplitLogManager();
184 // turn off load balancing to prevent regions from moving around otherwise
185 // they will consume recovered.edits
186 master
.balanceSwitch(false);
187 FileSystem fs
= master
.getMasterFileSystem().getFileSystem();
189 List
<RegionServerThread
> rsts
= cluster
.getLiveRegionServerThreads();
191 Path rootdir
= FSUtils
.getRootDir(conf
);
194 try (Table t
= installTable(numRegions
)) {
195 List
<RegionInfo
> regions
= null;
196 HRegionServer hrs
= null;
197 for (int i
= 0; i
< NUM_RS
; i
++) {
198 hrs
= rsts
.get(i
).getRegionServer();
199 regions
= ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices());
200 // At least one RS will have >= to average number of regions.
201 if (regions
.size() >= numRegions
/ NUM_RS
) {
205 Path logDir
= new Path(rootdir
,
206 AbstractFSWALProvider
.getWALDirectoryName(hrs
.getServerName().toString()));
208 LOG
.info("#regions = " + regions
.size());
209 Iterator
<RegionInfo
> it
= regions
.iterator();
210 while (it
.hasNext()) {
211 RegionInfo region
= it
.next();
212 if (region
.getTable().getNamespaceAsString()
213 .equals(NamespaceDescriptor
.SYSTEM_NAMESPACE_NAME_STR
)) {
// Write numLogLines edits of 100 bytes each into hrs's WAL, then split its WAL directory.
218 makeWAL(hrs
, regions
, numLogLines
, 100);
220 slm
.splitLogDistributed(logDir
);
// Count the edits recovered per region. The PathFilter consults
// WALSplitUtil.isSequenceIdFile; its return statements are missing from this copy.
223 for (RegionInfo hri
: regions
) {
224 @SuppressWarnings("deprecation")
225 Path editsdir
= WALSplitUtil
226 .getRegionDirRecoveredEditsDir(FSUtils
.getWALRegionDir(conf
,
227 tableName
, hri
.getEncodedName()));
228 LOG
.debug("checking edits dir " + editsdir
);
229 FileStatus
[] files
= fs
.listStatus(editsdir
, new PathFilter() {
231 public boolean accept(Path p
) {
232 if (WALSplitUtil
.isSequenceIdFile(p
)) {
239 "edits dir should have more than a single file in it. instead has " + files
.length
,
241 for (int i
= 0; i
< files
.length
; i
++) {
242 int c
= countWAL(files
[i
].getPath(), fs
, conf
);
245 LOG
.info(count
+ " edits in " + files
.length
+ " recovered edits files.");
248 // check that the log file is moved
249 assertFalse(fs
.exists(logDir
));
250 assertEquals(numLogLines
, count
);
// Verifies that WAL edits survive when a region server is killed while the active master is down.
// Steps:
// - Let the master start with NUM_RS - 1 region servers.
// - Write numLogLines edits into the WAL of a region server hosting regions of the test table.
// - Abort the active master, then that region server.
// - Wait until at most NUM_RS - 1 region servers are live.
// - Wait until at least numRegionsToCreate + 1 regions are online again.
// - Assert that the table holds numLogLines rows.
// NOTE(review): presumably the "+ 1" accounts for hbase:meta and the second of the NUM_MASTERS
// masters takes over after the abort - confirm. The embedded line numbers skip in several
// places (e.g. 269-270, 272-273, 282-285, 294-296, 301-303), so closing braces and possibly
// other statements are missing from this copy.
255 public void testMasterStartsUpWithLogSplittingWork() throws Exception
{
256 conf
.setInt(ServerManager
.WAIT_ON_REGIONSERVERS_MINTOSTART
, NUM_RS
- 1);
257 startCluster(NUM_RS
);
259 int numRegionsToCreate
= 40;
260 int numLogLines
= 1000;
261 // turn off load balancing to prevent regions from moving around otherwise
262 // they will consume recovered.edits
263 master
.balanceSwitch(false);
265 try (Table ht
= installTable(numRegionsToCreate
)) {
266 HRegionServer hrs
= findRSToKill(false);
267 List
<RegionInfo
> regions
= ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices());
268 makeWAL(hrs
, regions
, numLogLines
, 100);
271 abortMaster(cluster
);
274 LOG
.info("Aborting region server: " + hrs
.getServerName());
275 hrs
.abort("testing");
277 // wait for abort completes
278 TEST_UTIL
.waitFor(120000, 200, new Waiter
.Predicate
<Exception
>() {
280 public boolean evaluate() throws Exception
{
281 return cluster
.getLiveRegionServerThreads().size() <= NUM_RS
- 1;
286 LOG
.info("Current Open Regions:" + HBaseTestingUtility
.getAllOnlineRegions(cluster
).size());
288 // wait until at least numRegionsToCreate + 1 regions are online again
289 TEST_UTIL
.waitFor(120000, 200, new Waiter
.Predicate
<Exception
>() {
291 public boolean evaluate() throws Exception
{
292 return (HBaseTestingUtility
.getAllOnlineRegions(cluster
)
293 .size() >= (numRegionsToCreate
+ 1));
297 LOG
.info("Current Open Regions After Master Node Starts Up:" +
298 HBaseTestingUtility
.getAllOnlineRegions(cluster
).size());
300 assertEquals(numLogLines
, TEST_UTIL
.countRows(ht
));
305 * The original intention of this test was to force an abort of a region server and to make sure
306 * that the failure path in the region servers is properly evaluated. But it is difficult to
307 * ensure that the region server doesn't finish the log splitting before it aborts. Also now,
308 * there is this code path where the master will preempt the region server when master detects
309 * that the region server has aborted.
312 // Was marked flaky before Distributed Log Replay cleanup.
// Steps:
// - Write numLogLines edits into the WAL of a region server hosting test-table regions.
// - Wait up to 1000 ms for tot_wkr_task_acquired to move from 0 to 1.
// - Abort region servers from rsts.
// - Enqueue a split task for the first file in that server's WAL directory.
// - Wait up to 80 s for any of the five worker outcome counters (resigned, err,
//   final_transition_failed, done, preempt) to move.
// NOTE(review): no startCluster(...) call is visible, yet master and cluster are dereferenced
// immediately. The embedded line numbers skip at 316, so the call is presumably missing from
// this copy - confirm. The InterruptedException from waitForCounter is caught without a
// visible handler (the thread is not re-interrupted), and the rest of the abort loop body is
// missing.
314 public void testWorkerAbort() throws Exception
{
315 LOG
.info("testWorkerAbort");
317 int numLogLines
= 10000;
318 SplitLogManager slm
= master
.getMasterWalManager().getSplitLogManager();
319 FileSystem fs
= master
.getMasterFileSystem().getFileSystem();
321 List
<RegionServerThread
> rsts
= cluster
.getLiveRegionServerThreads();
322 HRegionServer hrs
= findRSToKill(false);
323 Path rootdir
= FSUtils
.getRootDir(conf
);
324 final Path logDir
= new Path(rootdir
,
325 AbstractFSWALProvider
.getWALDirectoryName(hrs
.getServerName().toString()));
327 try (Table t
= installTable(40)) {
328 makeWAL(hrs
, ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices()), numLogLines
, 100);
334 waitForCounter(tot_wkr_task_acquired
, 0, 1, 1000);
335 } catch (InterruptedException e
) {
337 for (RegionServerThread rst
: rsts
) {
338 rst
.getRegionServer().abort("testing");
343 FileStatus
[] logfiles
= fs
.listStatus(logDir
);
344 TaskBatch batch
= new TaskBatch();
345 slm
.enqueueSplitTask(logfiles
[0].getPath().toString(), batch
);
346 // like waitForCounter, but wait for any one of the five counters below to move
347 long curt
= System
.currentTimeMillis();
348 long waitTime
= 80000;
349 long endt
= curt
+ waitTime
;
350 while (curt
< endt
) {
351 if ((tot_wkr_task_resigned
.sum() + tot_wkr_task_err
.sum() +
352 tot_wkr_final_transition_failed
.sum() + tot_wkr_task_done
.sum() +
353 tot_wkr_preempt_task
.sum()) == 0) {
355 curt
= System
.currentTimeMillis();
357 assertTrue(1 <= (tot_wkr_task_resigned
.sum() + tot_wkr_task_err
.sum() +
358 tot_wkr_final_transition_failed
.sum() + tot_wkr_task_done
.sum() +
359 tot_wkr_preempt_task
.sum()));
363 fail("none of the following counters went up in " + waitTime
+ " milliseconds - " +
364 "tot_wkr_task_resigned, tot_wkr_task_err, " +
365 "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
// Verifies that no rows are lost when three region servers are killed at once.
// Steps:
// - Load numRowsPerRegion rows into each of the numRegionsToCreate regions.
// - Kill the first three of the NUM_RS live region servers.
// - Wait until at most NUM_RS - 3 servers are live and all table regions are assigned again.
// - Assert that the table still holds numRegionsToCreate * numRowsPerRegion rows.
// On an exception, a thread dump is printed.
// NOTE(review): `rows` is used without a visible declaration, and whether the catch block
// rethrows is not visible. The embedded line numbers skip in several places, so lines are
// missing from this copy.
370 public void testThreeRSAbort() throws Exception
{
371 LOG
.info("testThreeRSAbort");
372 int numRegionsToCreate
= 40;
373 int numRowsPerRegion
= 100;
375 startCluster(NUM_RS
); // NUM_RS=5.
377 try (Table table
= installTable(numRegionsToCreate
)) {
378 populateDataInTable(numRowsPerRegion
);
380 List
<RegionServerThread
> rsts
= cluster
.getLiveRegionServerThreads();
381 assertEquals(NUM_RS
, rsts
.size());
382 cluster
.killRegionServer(rsts
.get(0).getRegionServer().getServerName());
383 cluster
.killRegionServer(rsts
.get(1).getRegionServer().getServerName());
384 cluster
.killRegionServer(rsts
.get(2).getRegionServer().getServerName());
386 TEST_UTIL
.waitFor(60000, new Waiter
.ExplainingPredicate
<Exception
>() {
389 public boolean evaluate() throws Exception
{
390 return cluster
.getLiveRegionServerThreads().size() <= NUM_RS
- 3;
394 public String
explainFailure() throws Exception
{
395 return "Timed out waiting for server aborts.";
398 TEST_UTIL
.waitUntilAllRegionsAssigned(tableName
);
401 rows
= TEST_UTIL
.countRows(table
);
402 } catch (Exception e
) {
403 Threads
.printThreadInfo(System
.out
, "Thread dump before fail");
406 assertEquals(numRegionsToCreate
* numRowsPerRegion
, rows
);
// Verifies that SplitLogManager keeps waiting while ZooKeeper task deletion is suppressed.
// Steps:
// - Write a corrupted file into a fake WAL directory.
// - Suppress delete handling via ZKSplitLogManagerCoordination.setIgnoreDeleteForTesting(true).
// - Run splitLogDistributed on a single background thread. The first call is expected to fail
//   with IOException. The second is expected to block until the thread is interrupted by
//   executor.shutdownNow().
// - Wait up to 10000 ms for tot_mgr_wait_for_zk_delete to move from 0 to 1.
// Finally, delete the fake WAL directory.
// NOTE(review): no startCluster(...) call is visible before master is dereferenced, and no
// out.close() is visible. The embedded line numbers skip (e.g. 413, 418, 420, 424, 426,
// 433-435, 438, 441, 449-450), so these lines are presumably missing from this copy - confirm.
411 public void testDelayedDeleteOnFailure() throws Exception
{
412 LOG
.info("testDelayedDeleteOnFailure");
414 final SplitLogManager slm
= master
.getMasterWalManager().getSplitLogManager();
415 final FileSystem fs
= master
.getMasterFileSystem().getFileSystem();
416 final Path rootLogDir
= new Path(FSUtils
.getWALRootDir(conf
), HConstants
.HREGION_LOGDIR_NAME
);
417 final Path logDir
= new Path(rootLogDir
, ServerName
.valueOf("x", 1, 1).toString());
419 ExecutorService executor
= null;
421 final Path corruptedLogFile
= new Path(logDir
, "x");
422 FSDataOutputStream out
;
423 out
= fs
.create(corruptedLogFile
);
425 out
.write(Bytes
.toBytes("corrupted bytes"));
427 ZKSplitLogManagerCoordination coordination
=
428 (ZKSplitLogManagerCoordination
) (master
.getCoordinatedStateManager())
429 .getSplitLogManagerCoordination();
430 coordination
.setIgnoreDeleteForTesting(true);
431 executor
= Executors
.newSingleThreadExecutor();
432 Runnable runnable
= new Runnable() {
436 // since the logDir is a fake, corrupted one, the split log worker
437 // will finish it quickly with an error, and this call will fail and throw
// an IOException.
439 slm
.splitLogDistributed(logDir
);
440 } catch (IOException ioe
) {
442 assertTrue(fs
.exists(corruptedLogFile
));
443 // this call will block waiting for the task to be removed from the
444 // tasks map which is not going to happen since ignoreZKDeleteForTesting
445 // is set to true, until it is interrupted.
446 slm
.splitLogDistributed(logDir
);
447 } catch (IOException e
) {
448 assertTrue(Thread
.currentThread().isInterrupted());
451 fail("did not get the expected IOException from the 2nd call");
453 fail("did not get the expected IOException from the 1st call");
456 Future
<?
> result
= executor
.submit(runnable
);
458 result
.get(2000, TimeUnit
.MILLISECONDS
);
459 } catch (TimeoutException te
) {
460 // it is ok, expected.
462 waitForCounter(tot_mgr_wait_for_zk_delete
, 0, 1, 10000);
463 executor
.shutdownNow();
466 // make sure the runnable is finished with no exception thrown.
469 if (executor
!= null) {
470 // interrupt the thread in case the test fails in the middle.
471 // it has no effect if the thread is already terminated.
472 executor
.shutdownNow();
474 fs
.delete(logDir
, true);
478 private Table
installTable(int nrs
) throws Exception
{
479 return installTable(nrs
, 0);
// Creates tableName with nrs regions in column family "family" and checks the region count.
// It then runs a disable/enable cycle:
// - While the table is disabled, only 1 + existingRegions regions may be online.
// - After re-enabling, numRegions + 1 + existingRegions regions must be online.
// NOTE(review): `numRegions` is used without a visible declaration, and no `return` of the
// created Table is visible. The "Waiting for no more RIT" log lines are not followed by a
// visible wait (presumably blockUntilNoRIT(), which is otherwise unused in the visible code).
// The embedded line numbers skip at 487, 490, 493, 500, 506, 511 and 515-516 - restore these
// lines.
482 private Table
installTable(int nrs
, int existingRegions
) throws Exception
{
483 // Create a table with regions
484 byte[] family
= Bytes
.toBytes("family");
485 LOG
.info("Creating table with " + nrs
+ " regions");
486 Table table
= TEST_UTIL
.createMultiRegionTable(tableName
, family
, nrs
);
488 try (RegionLocator r
= TEST_UTIL
.getConnection().getRegionLocator(tableName
)) {
489 numRegions
= r
.getStartKeys().length
;
491 assertEquals(nrs
, numRegions
);
492 LOG
.info("Waiting for no more RIT\n");
494 // disable-enable cycle to get rid of table's dead regions left behind
495 // by createMultiRegions
496 assertTrue(TEST_UTIL
.getAdmin().isTableEnabled(tableName
));
497 LOG
.debug("Disabling table\n");
498 TEST_UTIL
.getAdmin().disableTable(tableName
);
499 LOG
.debug("Waiting for no more RIT\n");
501 NavigableSet
<String
> regions
= HBaseTestingUtility
.getAllOnlineRegions(cluster
);
502 LOG
.debug("Verifying only catalog region is assigned\n");
// NOTE(review): this check ignores existingRegions; compare with 1 + existingRegions to match
// the assertion below.
503 if (regions
.size() != 1) {
504 for (String oregion
: regions
)
505 LOG
.debug("Region still online: " + oregion
);
507 assertEquals(1 + existingRegions
, regions
.size());
508 LOG
.debug("Enabling table\n");
509 TEST_UTIL
.getAdmin().enableTable(tableName
);
510 LOG
.debug("Waiting for no more RIT\n");
512 LOG
.debug("Verifying there are " + numRegions
+ " assigned on cluster\n");
513 regions
= HBaseTestingUtility
.getAllOnlineRegions(cluster
);
514 assertEquals(numRegions
+ 1 + existingRegions
, regions
.size());
// Writes nrows rows (qualifier "q", family COLUMN_FAMILY, keys starting at each region's start
// key) into the regions hosted by every live region server, then into the regions hosted by
// every live master. It first asserts that exactly NUM_RS region servers are live. Regions of
// system tables are checked for and are meant to be skipped. A master whose
// getOnlineRegions throws ServerNotRunningYetException is, per the comment below, presumably
// a backup master and is meant to be ignored.
// NOTE(review): the statements inside the isSystemTable() checks, the opening of the try
// around the master's getOnlineRegions, and closing braces are missing from this copy. The
// embedded line numbers skip at 521, 527-529, 534-536, 540, 544-545, 548-550 and 555-558.
518 void populateDataInTable(int nrows
) throws Exception
{
519 List
<RegionServerThread
> rsts
= cluster
.getLiveRegionServerThreads();
520 assertEquals(NUM_RS
, rsts
.size());
522 for (RegionServerThread rst
: rsts
) {
523 HRegionServer hrs
= rst
.getRegionServer();
524 List
<RegionInfo
> hris
= ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices());
525 for (RegionInfo hri
: hris
) {
526 if (hri
.getTable().isSystemTable()) {
530 "adding data to rs = " + rst
.getName() + " region = " + hri
.getRegionNameAsString());
531 Region region
= hrs
.getOnlineRegion(hri
.getRegionName());
532 assertTrue(region
!= null);
533 putData(region
, hri
.getStartKey(), nrows
, Bytes
.toBytes("q"), COLUMN_FAMILY
);
537 for (MasterThread mt
: cluster
.getLiveMasterThreads()) {
538 HRegionServer hrs
= mt
.getMaster();
539 List
<RegionInfo
> hris
;
541 hris
= ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices());
542 } catch (ServerNotRunningYetException e
) {
543 // It's ok: this master may be a backup. Ignored.
546 for (RegionInfo hri
: hris
) {
547 if (hri
.getTable().isSystemTable()) {
551 "adding data to rs = " + mt
.getName() + " region = " + hri
.getRegionNameAsString());
552 Region region
= hrs
.getOnlineRegion(hri
.getRegionName());
553 assertTrue(region
!= null);
554 putData(region
, hri
.getStartKey(), nrows
, Bytes
.toBytes("q"), COLUMN_FAMILY
);
559 public void makeWAL(HRegionServer hrs
, List
<RegionInfo
> regions
, int num_edits
, int edit_size
)
561 makeWAL(hrs
, regions
, num_edits
, edit_size
, true);
// Appends numEdits synthetic edits to the WALs of hrs.
// - Each edit is one KeyValue in COLUMN_FAMILY whose editSize-byte value cycles 'a'..'z'.
// - Edits go round-robin across the regions of tableName found in `regions`.
// - It syncs roughly every 30 KB so WAL rolls line up.
// - Per the comment below, it then syncs and shuts down each WAL in two passes, and finally
//   logs per-region edit counts.
// Side effect: removes hbase:meta (FIRST_META_REGIONINFO) from the caller's `regions` list.
// NOTE(review): syncEvery = 30 * 1024 / editSize is 0 when editSize > 30720, so
// `i % syncEvery` throws ArithmeticException. editSize == 0 fails earlier on the division.
// NOTE(review): `n` and any use of `cleanShutdown` are not visible, and counts[] is never
// incremented in the visible lines. The embedded line numbers skip (e.g. 568, 572-574, 576,
// 580-583, 587-588, 593, 601, 610-614, 619-621, 624-626, 629-632), so parts of this method are
// missing from this copy.
564 public void makeWAL(HRegionServer hrs
, List
<RegionInfo
> regions
, int numEdits
, int editSize
,
565 boolean cleanShutdown
) throws IOException
{
566 // remove the meta region
567 regions
.remove(RegionInfoBuilder
.FIRST_META_REGIONINFO
);
569 for (Iterator
<RegionInfo
> iter
= regions
.iterator(); iter
.hasNext();) {
570 RegionInfo regionInfo
= iter
.next();
571 if (regionInfo
.getTable().isSystemTable()) {
575 byte[] value
= new byte[editSize
];
577 List
<RegionInfo
> hris
= new ArrayList
<>();
578 for (RegionInfo region
: regions
) {
// NOTE(review): compares TableName by reference (!=); prefer
// !region.getTable().equals(tableName) unless TableName instances are guaranteed canonical.
579 if (region
.getTable() != tableName
) {
584 LOG
.info("Creating wal edits across " + hris
.size() + " regions.");
585 for (int i
= 0; i
< editSize
; i
++) {
586 value
[i
] = (byte) ('a' + (i
% 26));
589 int[] counts
= new int[n
];
590 // sync every ~30k to line up with desired wal rolls
591 final int syncEvery
= 30 * 1024 / editSize
;
592 MultiVersionConcurrencyControl mvcc
= new MultiVersionConcurrencyControl();
594 for (int i
= 0; i
< numEdits
; i
+= 1) {
595 WALEdit e
= new WALEdit();
596 RegionInfo curRegionInfo
= hris
.get(i
% n
);
597 WAL log
= hrs
.getWAL(curRegionInfo
);
598 byte[] startRow
= curRegionInfo
.getStartKey();
599 if (startRow
== null || startRow
.length
== 0) {
600 startRow
= new byte[] { 0, 0, 0, 0, 1 };
// NOTE(review): the copyOfRange(row, 3, 8) below assumes incrementBytes returns at least
// 8 bytes for these keys - confirm.
602 byte[] row
= Bytes
.incrementBytes(startRow
, counts
[i
% n
]);
603 row
= Arrays
.copyOfRange(row
, 3, 8); // use last 5 bytes because
604 // HBaseTestingUtility.createMultiRegions use 5 bytes key
605 byte[] qualifier
= Bytes
.toBytes("c" + Integer
.toString(i
));
606 e
.add(new KeyValue(row
, COLUMN_FAMILY
, qualifier
, System
.currentTimeMillis(), value
));
607 log
.appendData(curRegionInfo
, new WALKeyImpl(curRegionInfo
.getEncodedNameAsBytes(),
608 tableName
, System
.currentTimeMillis(), mvcc
), e
);
609 if (0 == i
% syncEvery
) {
615 // done as two passes because the regions might share logs. shutdown is idempotent, but sync
616 // will cause errors if done after.
617 for (RegionInfo info
: hris
) {
618 WAL log
= hrs
.getWAL(info
);
622 for (RegionInfo info
: hris
) {
623 WAL log
= hrs
.getWAL(info
);
627 for (int i
= 0; i
< n
; i
++) {
628 LOG
.info("region " + hris
.get(i
).getRegionNameAsString() + " has " + counts
[i
] + " edits");
// Reads WAL file `log` with WALFactory.createReader and counts the entries whose first cell is
// not a meta-edit cell (per WALEdit.isMetaEditFamily).
// NOTE(review): `e`, the counter and the return statement are not visible (the embedded line
// numbers skip at 634, 636 and 639-645). get(0) assumes every entry has at least one cell -
// confirm.
633 private int countWAL(Path log
, FileSystem fs
, Configuration conf
) throws IOException
{
635 try (WAL
.Reader in
= WALFactory
.createReader(fs
, log
, conf
)) {
637 while ((e
= in
.next()) != null) {
638 if (!WALEdit
.isMetaEditFamily(e
.getEdit().getCells().get(0))) {
646 private void blockUntilNoRIT() throws Exception
{
647 TEST_UTIL
.waitUntilNoRegionsInTransition(60000);
// Builds numRows Puts with row key startRow + Bytes.toBytes(i). Each Put sets qualifier qf to a
// null value in every given family.
// NOTE(review): the method's throws clause and the call that applies each Put to `region` are
// not visible. The embedded line numbers skip at 651 and 656-660.
650 private void putData(Region region
, byte[] startRow
, int numRows
, byte[] qf
, byte[]... families
)
652 for (int i
= 0; i
< numRows
; i
++) {
653 Put put
= new Put(Bytes
.add(startRow
, Bytes
.toBytes(i
)));
654 for (byte[] family
: families
) {
655 put
.addColumn(family
, qf
, null);
// Polls ctr until its sum differs from oldval or timems milliseconds elapse, then asserts that
// the sum equals newval.
// NOTE(review): no sleep between polls, no return after the assertion and no failure on
// timeout are visible. The embedded line numbers skip at 667, 669 and 671-675, so those lines
// are presumably missing from this copy.
661 private void waitForCounter(LongAdder ctr
, long oldval
, long newval
, long timems
)
662 throws InterruptedException
{
663 long curt
= System
.currentTimeMillis();
664 long endt
= curt
+ timems
;
665 while (curt
< endt
) {
666 if (ctr
.sum() == oldval
) {
668 curt
= System
.currentTimeMillis();
670 assertEquals(newval
, ctr
.sum());
// Aborts whichever of cluster's live master threads is the active master, then logs that the
// master is aborted.
// NOTE(review): the embedded line numbers skip at 681-684 and 686, so whatever followed the
// abort call (for example waiting for the master to exit, or leaving the loop) is missing from
// this copy.
677 private void abortMaster(MiniHBaseCluster cluster
) throws InterruptedException
{
678 for (MasterThread mt
: cluster
.getLiveMasterThreads()) {
679 if (mt
.getMaster().isActiveMaster()) {
680 mt
.getMaster().abort("Aborting for tests", new Exception("Trace info"));
685 LOG
.debug("Master is aborted");
689 * Find a RS that has regions of a table.
690 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
692 private HRegionServer
findRSToKill(boolean hasMetaRegion
) throws Exception
{
693 List
<RegionServerThread
> rsts
= cluster
.getLiveRegionServerThreads();
694 List
<RegionInfo
> regions
= null;
695 HRegionServer hrs
= null;
697 for (RegionServerThread rst
: rsts
) {
698 hrs
= rst
.getRegionServer();
699 while (rst
.isAlive() && !hrs
.isOnline()) {
702 if (!rst
.isAlive()) {
705 boolean isCarryingMeta
= false;
706 boolean foundTableRegion
= false;
707 regions
= ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices());
708 for (RegionInfo region
: regions
) {
709 if (region
.isMetaRegion()) {
710 isCarryingMeta
= true;
712 if (region
.getTable() == tableName
) {
713 foundTableRegion
= true;
715 if (foundTableRegion
&& (isCarryingMeta
|| !hasMetaRegion
)) {
719 if (isCarryingMeta
&& hasMetaRegion
) {
720 // clients ask for a RS with META
721 if (!foundTableRegion
) {
722 HRegionServer destRS
= hrs
;
723 // the RS doesn't have regions of the specified table so we need move one to this RS
724 List
<RegionInfo
> tableRegions
= TEST_UTIL
.getAdmin().getRegions(tableName
);
725 RegionInfo hri
= tableRegions
.get(0);
726 TEST_UTIL
.getAdmin().move(hri
.getEncodedNameAsBytes(), destRS
.getServerName());
727 // wait for region move completes
728 RegionStates regionStates
=
729 TEST_UTIL
.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
730 TEST_UTIL
.waitFor(45000, 200, new Waiter
.Predicate
<Exception
>() {
732 public boolean evaluate() throws Exception
{
733 ServerName sn
= regionStates
.getRegionServerOfRegion(hri
);
734 return (sn
!= null && sn
.equals(destRS
.getServerName()));
739 } else if (hasMetaRegion
|| isCarryingMeta
) {
742 if (foundTableRegion
) {