2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.master
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_SPLIT_WAL_MAX_SPLITTER
;
21 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_wait_for_zk_delete
;
22 import static org
.junit
.Assert
.assertEquals
;
23 import static org
.junit
.Assert
.assertTrue
;
24 import static org
.junit
.Assert
.fail
;
26 import java
.io
.IOException
;
27 import java
.util
.ArrayList
;
28 import java
.util
.Arrays
;
29 import java
.util
.Iterator
;
30 import java
.util
.List
;
31 import java
.util
.NavigableSet
;
32 import java
.util
.concurrent
.ExecutorService
;
33 import java
.util
.concurrent
.Executors
;
34 import java
.util
.concurrent
.Future
;
35 import java
.util
.concurrent
.TimeUnit
;
36 import java
.util
.concurrent
.TimeoutException
;
37 import java
.util
.concurrent
.atomic
.LongAdder
;
38 import org
.apache
.hadoop
.conf
.Configuration
;
39 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
40 import org
.apache
.hadoop
.fs
.FileSystem
;
41 import org
.apache
.hadoop
.fs
.Path
;
42 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
43 import org
.apache
.hadoop
.hbase
.HConstants
;
44 import org
.apache
.hadoop
.hbase
.KeyValue
;
45 import org
.apache
.hadoop
.hbase
.ServerName
;
46 import org
.apache
.hadoop
.hbase
.SingleProcessHBaseCluster
;
47 import org
.apache
.hadoop
.hbase
.SplitLogCounters
;
48 import org
.apache
.hadoop
.hbase
.StartTestingClusterOption
;
49 import org
.apache
.hadoop
.hbase
.TableName
;
50 import org
.apache
.hadoop
.hbase
.Waiter
;
51 import org
.apache
.hadoop
.hbase
.client
.Put
;
52 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
53 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
54 import org
.apache
.hadoop
.hbase
.client
.RegionLocator
;
55 import org
.apache
.hadoop
.hbase
.client
.Table
;
56 import org
.apache
.hadoop
.hbase
.coordination
.ZKSplitLogManagerCoordination
;
57 import org
.apache
.hadoop
.hbase
.master
.assignment
.RegionStates
;
58 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
59 import org
.apache
.hadoop
.hbase
.regionserver
.MultiVersionConcurrencyControl
;
60 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
61 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
62 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
63 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
64 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.MasterThread
;
65 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.RegionServerThread
;
66 import org
.apache
.hadoop
.hbase
.util
.Threads
;
67 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
68 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
69 import org
.apache
.hadoop
.hbase
.wal
.WALFactory
;
70 import org
.apache
.hadoop
.hbase
.wal
.WALKeyImpl
;
71 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
72 import org
.junit
.After
;
73 import org
.junit
.AfterClass
;
74 import org
.junit
.Before
;
75 import org
.junit
.BeforeClass
;
76 import org
.junit
.Rule
;
77 import org
.junit
.Test
;
78 import org
.junit
.rules
.TestName
;
79 import org
.slf4j
.Logger
;
80 import org
.slf4j
.LoggerFactory
;
82 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
85 * Base class for testing distributed log splitting.
87 public abstract class AbstractTestDLS
{
88 private static final Logger LOG
= LoggerFactory
.getLogger(TestSplitLogManager
.class);
90 private static final HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
92 // Start a cluster with 2 masters and 5 regionservers
93 private static final int NUM_MASTERS
= 2;
94 private static final int NUM_RS
= 5;
95 private static byte[] COLUMN_FAMILY
= Bytes
.toBytes("family");
98 public TestName testName
= new TestName();
100 private TableName tableName
;
101 private SingleProcessHBaseCluster cluster
;
102 private HMaster master
;
103 private Configuration conf
;
106 public TestName name
= new TestName();
109 public static void setup() throws Exception
{
110 // Uncomment the following line if more verbosity is needed for
111 // debugging (see HBASE-12285 for details).
112 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
113 TEST_UTIL
.startMiniZKCluster();
114 TEST_UTIL
.startMiniDFSCluster(3);
118 public static void tearDown() throws Exception
{
119 TEST_UTIL
.shutdownMiniCluster();
122 protected abstract String
getWalProvider();
124 private void startCluster(int numRS
) throws Exception
{
125 SplitLogCounters
.resetCounters();
126 LOG
.info("Starting cluster");
127 conf
.setLong("hbase.splitlog.max.resubmit", 0);
128 // Make the failure test faster
129 conf
.setInt("zookeeper.recovery.retry", 0);
130 conf
.setInt(HConstants
.REGIONSERVER_INFO_PORT
, -1);
131 conf
.setFloat(HConstants
.LOAD_BALANCER_SLOP_KEY
, (float) 100.0); // no load balancing
132 conf
.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER
, 3);
133 conf
.setInt(HConstants
.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT
, 10);
134 conf
.set("hbase.wal.provider", getWalProvider());
135 StartTestingClusterOption option
= StartTestingClusterOption
.builder()
136 .numMasters(NUM_MASTERS
).numRegionServers(numRS
).build();
137 TEST_UTIL
.startMiniHBaseCluster(option
);
138 cluster
= TEST_UTIL
.getHBaseCluster();
139 LOG
.info("Waiting for active/ready master");
140 cluster
.waitForActiveAndReadyMaster();
141 master
= cluster
.getMaster();
142 TEST_UTIL
.waitFor(120000, 200, new Waiter
.Predicate
<Exception
>() {
144 public boolean evaluate() throws Exception
{
145 return cluster
.getLiveRegionServerThreads().size() >= numRS
;
151 public void before() throws Exception
{
152 conf
= TEST_UTIL
.getConfiguration();
153 tableName
= TableName
.valueOf(testName
.getMethodName());
157 public void after() throws Exception
{
158 TEST_UTIL
.shutdownMiniHBaseCluster();
159 TEST_UTIL
.getTestFileSystem().delete(CommonFSUtils
.getRootDir(TEST_UTIL
.getConfiguration()),
161 ZKUtil
.deleteNodeRecursively(TEST_UTIL
.getZooKeeperWatcher(), "/hbase");
/**
 * Writes numLogLines WAL edits for the table regions hosted on one region server. Then aborts
 * the active master and afterwards that region server. Finally asserts that all numLogLines rows
 * are readable once the table's regions are back online. Judging by its name, the test checks
 * that a master can come up while log-splitting work is pending — TODO confirm.
 */
// NOTE(review): this copy of the method is incomplete. The embedded original line numbers skip
// 164 (presumably the @Test annotation), 168, 174, 179-180, 182-183, 186, 189, 192-195, 197,
// 200, 204-206, 209 and the closing lines after 210. Most look like blank lines, annotations or
// braces, but 194-195 may have held a statement. Restore from the canonical source before relying
// on this copy.
165 public void testMasterStartsUpWithLogSplittingWork() throws Exception
{
// Presumably lets the master proceed with NUM_RS - 1 region servers, since one RS is aborted
// below — confirm.
166 conf
.setInt(ServerManager
.WAIT_ON_REGIONSERVERS_MINTOSTART
, NUM_RS
- 1);
167 startCluster(NUM_RS
);
169 int numRegionsToCreate
= 40;
170 int numLogLines
= 1000;
171 // turn off load balancing to prevent regions from moving around otherwise
172 // they will consume recovered.edits
173 master
.balanceSwitch(false);
175 try (Table ht
= installTable(numRegionsToCreate
)) {
// Pick a region server hosting table regions and write the WAL edits for its regions.
176 HRegionServer hrs
= findRSToKill(false);
177 List
<RegionInfo
> regions
= ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices());
178 makeWAL(hrs
, regions
, numLogLines
, 100);
181 abortMaster(cluster
);
184 LOG
.info("Aborting region server: " + hrs
.getServerName());
185 hrs
.abort("testing");
187 // wait for the region server abort to complete
188 TEST_UTIL
.waitFor(120000, 200, new Waiter
.Predicate
<Exception
>() {
190 public boolean evaluate() throws Exception
{
191 return cluster
.getLiveRegionServerThreads().size() <= NUM_RS
- 1;
196 LOG
.info("Current Open Regions:" + HBaseTestingUtil
.getAllOnlineRegions(cluster
).size());
198 // wait until numRegionsToCreate + 1 regions (presumably including the catalog region) are online
199 TEST_UTIL
.waitFor(120000, 200, new Waiter
.Predicate
<Exception
>() {
201 public boolean evaluate() throws Exception
{
202 return (HBaseTestingUtil
.getAllOnlineRegions(cluster
)
203 .size() >= (numRegionsToCreate
+ 1));
207 LOG
.info("Current Open Regions After Master Node Starts Up:" +
208 HBaseTestingUtil
.getAllOnlineRegions(cluster
).size());
// Every WAL edit written above must be visible as a row after recovery.
210 assertEquals(numLogLines
, TEST_UTIL
.countRows(ht
));
215 public void testThreeRSAbort() throws Exception
{
216 LOG
.info("testThreeRSAbort");
217 int numRegionsToCreate
= 40;
218 int numRowsPerRegion
= 100;
220 startCluster(NUM_RS
); // NUM_RS=6.
222 try (Table table
= installTable(numRegionsToCreate
)) {
223 populateDataInTable(numRowsPerRegion
);
225 List
<RegionServerThread
> rsts
= cluster
.getLiveRegionServerThreads();
226 assertEquals(NUM_RS
, rsts
.size());
227 cluster
.killRegionServer(rsts
.get(0).getRegionServer().getServerName());
228 cluster
.killRegionServer(rsts
.get(1).getRegionServer().getServerName());
229 cluster
.killRegionServer(rsts
.get(2).getRegionServer().getServerName());
231 TEST_UTIL
.waitFor(60000, new Waiter
.ExplainingPredicate
<Exception
>() {
234 public boolean evaluate() throws Exception
{
235 return cluster
.getLiveRegionServerThreads().size() <= NUM_RS
- 3;
239 public String
explainFailure() throws Exception
{
240 return "Timed out waiting for server aborts.";
243 TEST_UTIL
.waitUntilAllRegionsAssigned(tableName
);
246 rows
= TEST_UTIL
.countRows(table
);
247 } catch (Exception e
) {
248 Threads
.printThreadInfo(System
.out
, "Thread dump before fail");
251 assertEquals(numRegionsToCreate
* numRowsPerRegion
, rows
);
/**
 * Exercises SplitLogManager#splitLogDistributed against a fake WAL directory for server
 * "x,1,1". The directory holds one file of corrupted bytes, and znode deletion is disabled via
 * setIgnoreDeleteForTesting(true). A background runnable calls splitLogDistributed twice. The
 * first call is expected to throw IOException. The second is expected to block until the
 * executor is shut down, which interrupts it. The test then waits up to 10000 ms for
 * tot_mgr_wait_for_zk_delete to move from 0 to 1. The fake WAL directory is deleted at the end.
 * Per its opening comment, the test only applies to ZK-coordinated WAL splitting.
 */
// NOTE(review): this copy is missing original lines 255, 260-261, 263, 269, 271, 275, 277,
// 284-286, 289, 292, 300-301, 303, 305-306, 308, 312, 315-316, 318-319, 324 and the method's
// closing lines. The missing code includes:
// - the guard's early return;
// - the try/finally structure;
// - the Runnable's run() header;
// - whatever follows the "make sure the runnable is finished" comment.
// `master` is only assigned in startCluster(), and no startCluster() call is visible here;
// presumably it was on line 263. No close of `out` is visible either. Confirm against the
// canonical source.
256 public void testDelayedDeleteOnFailure() throws Exception
{
257 if (!this.conf
.getBoolean(HConstants
.HBASE_SPLIT_WAL_COORDINATED_BY_ZK
,
258 HConstants
.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK
)) {
259 // This test depends on zk coordination....
262 LOG
.info("testDelayedDeleteOnFailure");
264 final SplitLogManager slm
= master
.getMasterWalManager().getSplitLogManager();
265 final FileSystem fs
= master
.getMasterFileSystem().getFileSystem();
266 final Path rootLogDir
=
267 new Path(CommonFSUtils
.getWALRootDir(conf
), HConstants
.HREGION_LOGDIR_NAME
);
// A WAL directory for a server that never existed.
268 final Path logDir
= new Path(rootLogDir
, ServerName
.valueOf("x", 1, 1).toString());
270 ExecutorService executor
= null;
// Write a single "WAL" file that contains only garbage bytes.
272 final Path corruptedLogFile
= new Path(logDir
, "x");
273 FSDataOutputStream out
;
274 out
= fs
.create(corruptedLogFile
);
276 out
.write(Bytes
.toBytes("corrupted bytes"));
// Keep finished split tasks around so the second splitLogDistributed call below keeps waiting.
278 ZKSplitLogManagerCoordination coordination
=
279 (ZKSplitLogManagerCoordination
) (master
.getCoordinatedStateManager())
280 .getSplitLogManagerCoordination();
281 coordination
.setIgnoreDeleteForTesting(true);
282 executor
= Executors
.newSingleThreadExecutor();
283 Runnable runnable
= new Runnable() {
287 // since the logDir is a fake, corrupted one, so the split log worker
288 // will finish it quickly with error, and this call will fail and throw
290 slm
.splitLogDistributed(logDir
);
291 } catch (IOException ioe
) {
293 assertTrue(fs
.exists(corruptedLogFile
));
294 // this call will block waiting for the task to be removed from the
295 // tasks map which is not going to happen since ignoreZKDeleteForTesting
296 // is set to true, until it is interrupted.
297 slm
.splitLogDistributed(logDir
);
298 } catch (IOException e
) {
299 assertTrue(Thread
.currentThread().isInterrupted());
302 fail("did not get the expected IOException from the 2nd call");
304 fail("did not get the expected IOException from the 1st call");
307 Future
<?
> result
= executor
.submit(runnable
);
// Give the runnable up to 2000 ms; a timeout here is the normal outcome.
309 result
.get(2000, TimeUnit
.MILLISECONDS
);
310 } catch (TimeoutException te
) {
311 // it is ok, expected.
313 waitForCounter(tot_mgr_wait_for_zk_delete
, 0, 1, 10000);
// Interrupt the runnable blocked in the second splitLogDistributed call.
314 executor
.shutdownNow();
317 // make sure the runnable is finished with no exception thrown.
320 if (executor
!= null) {
321 // interrupt the thread in case the test fails in the middle.
322 // it has no effect if the thread is already terminated.
323 executor
.shutdownNow();
325 fs
.delete(logDir
, true);
329 private Table
installTable(int nrs
) throws Exception
{
330 return installTable(nrs
, 0);
/**
 * Creates the test table pre-split into {@code nrs} regions with a "family" column family and
 * asserts that the region locator reports {@code nrs} start keys. It then disables the table,
 * checks that only 1 + existingRegions regions remain online, re-enables the table, and checks
 * that numRegions + 1 + existingRegions regions are online.
 *
 * @param nrs number of regions to create
 * @param existingRegions regions, besides the catalog region, expected to stay online while this
 *     table is disabled
 * @return presumably the created table (the return statement is not visible); callers in this
 *     class close it with try-with-resources
 */
// NOTE(review): this copy is missing original lines 338, 341, 344, 351, 357-358, 363 and 367+:
// - line 338 presumably declared numRegions;
// - lines 344, 351 and 363 each follow a "Waiting for no more RIT" log and presumably called
//   blockUntilNoRIT() — confirm;
// - the remaining lines are braces and the return.
// NOTE(review): the local `family` has the same value as COLUMN_FAMILY.
333 private Table
installTable(int nrs
, int existingRegions
) throws Exception
{
334 // Create a table with regions
335 byte[] family
= Bytes
.toBytes("family");
336 LOG
.info("Creating table with " + nrs
+ " regions");
337 Table table
= TEST_UTIL
.createMultiRegionTable(tableName
, family
, nrs
);
339 try (RegionLocator r
= TEST_UTIL
.getConnection().getRegionLocator(tableName
)) {
340 numRegions
= r
.getStartKeys().length
;
342 assertEquals(nrs
, numRegions
);
343 LOG
.info("Waiting for no more RIT\n");
345 // disable-enable cycle to get rid of table's dead regions left behind
346 // by createMultiRegions
347 assertTrue(TEST_UTIL
.getAdmin().isTableEnabled(tableName
));
348 LOG
.debug("Disabling table\n");
349 TEST_UTIL
.getAdmin().disableTable(tableName
);
350 LOG
.debug("Waiting for no more RIT\n");
352 NavigableSet
<String
> regions
= HBaseTestingUtil
.getAllOnlineRegions(cluster
);
353 LOG
.debug("Verifying only catalog region is assigned\n");
// Diagnostic only: lists lingering regions; the assertion below enforces the expected count.
354 if (regions
.size() != 1) {
355 for (String oregion
: regions
) {
356 LOG
.debug("Region still online: " + oregion
);
359 assertEquals(1 + existingRegions
, regions
.size());
360 LOG
.debug("Enabling table\n");
361 TEST_UTIL
.getAdmin().enableTable(tableName
);
362 LOG
.debug("Waiting for no more RIT\n");
364 LOG
.debug("Verifying there are " + numRegions
+ " assigned on cluster\n");
365 regions
= HBaseTestingUtil
.getAllOnlineRegions(cluster
);
366 assertEquals(numRegions
+ 1 + existingRegions
, regions
.size());
/**
 * Asserts that exactly NUM_RS region servers are live. For each of them, writes {@code nrows}
 * rows via putData() into each online region, using the region's start key as the row prefix,
 * qualifier "q" and family COLUMN_FAMILY.
 *
 * @param nrows number of rows to write per region
 */
// NOTE(review): original lines 373 and 379-381 are missing from this copy. They contain the body
// of the isSystemTable() check (presumably a `continue` that skips system regions) and the start
// of the log call whose message begins "adding data to rs = ". Confirm the log level and the
// skip against the canonical source.
370 void populateDataInTable(int nrows
) throws Exception
{
371 List
<RegionServerThread
> rsts
= cluster
.getLiveRegionServerThreads();
372 assertEquals(NUM_RS
, rsts
.size());
374 for (RegionServerThread rst
: rsts
) {
375 HRegionServer hrs
= rst
.getRegionServer();
376 List
<RegionInfo
> hris
= ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices());
377 for (RegionInfo hri
: hris
) {
378 if (hri
.getTable().isSystemTable()) {
382 "adding data to rs = " + rst
.getName() + " region = " + hri
.getRegionNameAsString());
383 Region region
= hrs
.getOnlineRegion(hri
.getRegionName());
384 assertTrue(region
!= null);
385 putData(region
, hri
.getStartKey(), nrows
, Bytes
.toBytes("q"), COLUMN_FAMILY
);
390 public void makeWAL(HRegionServer hrs
, List
<RegionInfo
> regions
, int num_edits
, int edit_size
)
392 makeWAL(hrs
, regions
, num_edits
, edit_size
, true);
/**
 * Appends {@code numEdits} synthetic edits to the WALs of {@code hrs}. The edits are spread
 * round-robin (index {@code i % n}) over the supplied regions that are kept in {@code hris}.
 * Each edit's row is derived from its region's start key and a per-region counter; its qualifier
 * is "c" + i. Every syncEvery edits a sync branch runs. Finally, the per-region edit counts are
 * logged.
 * Side effect: removes RegionInfoBuilder.FIRST_META_REGIONINFO from the caller's {@code regions}
 * list and, presumably, other system-table regions (see the notes below).
 *
 * @param hrs region server whose WALs receive the edits
 * @param regions candidate regions; mutated by this method
 * @param numEdits total number of edits to append
 * @param editSize size of the value buffer; must be at most 30 * 1024, otherwise syncEvery is 0
 *     and {@code i % syncEvery} throws ArithmeticException
 * @param cleanShutdown not referenced in the visible lines — see the note below
 * @throws IOException as declared
 */
// NOTE(review): this copy is missing original lines 399, 403-405, 407, 411-414, 418-419, 424,
// 432, 438, 442-446, 451-453, 456-458 and the method's closing lines. The following are not
// visible:
// - the declaration of `n` (presumably hris.size(), given the final logging loop);
// - the loop bodies that filter `regions` into `hris`;
// - the last KeyValue constructor argument (presumably `value`);
// - the body of the periodic-sync branch and where counts[] is incremented;
// - the per-WAL calls in the two passes at the end;
// - any use of `cleanShutdown`.
// Restore from the canonical source.
395 public void makeWAL(HRegionServer hrs
, List
<RegionInfo
> regions
, int numEdits
, int editSize
,
396 boolean cleanShutdown
) throws IOException
{
397 // remove root and meta region
398 regions
.remove(RegionInfoBuilder
.FIRST_META_REGIONINFO
);
400 for (Iterator
<RegionInfo
> iter
= regions
.iterator(); iter
.hasNext();) {
401 RegionInfo regionInfo
= iter
.next();
402 if (regionInfo
.getTable().isSystemTable()) {
406 byte[] value
= new byte[editSize
];
408 List
<RegionInfo
> hris
= new ArrayList
<>();
409 for (RegionInfo region
: regions
) {
// NOTE(review): `!=` compares TableName references; equals() would be the safer comparison.
410 if (region
.getTable() != tableName
) {
415 LOG
.info("Creating wal edits across " + hris
.size() + " regions.");
// Fill the value buffer with a repeating a..z pattern.
416 for (int i
= 0; i
< editSize
; i
++) {
417 value
[i
] = (byte) ('a' + (i
% 26));
420 int[] counts
= new int[n
];
421 // sync every ~30k to line up with desired wal rolls
// NOTE(review): integer division; editSize > 30 * 1024 yields 0 and a division by zero below.
422 final int syncEvery
= 30 * 1024 / editSize
;
423 MultiVersionConcurrencyControl mvcc
= new MultiVersionConcurrencyControl();
425 for (int i
= 0; i
< numEdits
; i
+= 1) {
426 WALEdit e
= new WALEdit();
427 RegionInfo curRegionInfo
= hris
.get(i
% n
);
428 WAL log
= hrs
.getWAL(curRegionInfo
);
429 byte[] startRow
= curRegionInfo
.getStartKey();
430 if (startRow
== null || startRow
.length
== 0) {
431 startRow
= new byte[] { 0, 0, 0, 0, 1 };
433 byte[] row
= Bytes
.incrementBytes(startRow
, counts
[i
% n
]);
434 row
= Arrays
.copyOfRange(row
, 3, 8); // use last 5 bytes because
435 // HBaseTestingUtility.createMultiRegions use 5 bytes key
436 byte[] qualifier
= Bytes
.toBytes("c" + Integer
.toString(i
));
437 e
.add(new KeyValue(row
, COLUMN_FAMILY
, qualifier
, EnvironmentEdgeManager
.currentTime(),
439 log
.appendData(curRegionInfo
, new WALKeyImpl(curRegionInfo
.getEncodedNameAsBytes(),
440 tableName
, EnvironmentEdgeManager
.currentTime(), mvcc
), e
);
441 if (0 == i
% syncEvery
) {
447 // done as two passes because the regions might share logs. shutdown is idempotent, but sync
448 // will cause errors if done after.
449 for (RegionInfo info
: hris
) {
450 WAL log
= hrs
.getWAL(info
);
454 for (RegionInfo info
: hris
) {
455 WAL log
= hrs
.getWAL(info
);
459 for (int i
= 0; i
< n
; i
++) {
460 LOG
.info("region " + hris
.get(i
).getRegionNameAsString() + " has " + counts
[i
] + " edits");
/**
 * Opens the WAL file {@code log} via WALFactory.createReader (closed by try-with-resources) and
 * iterates over its entries. Only entries whose first cell is not a meta-edit-family cell pass
 * the check. Given its name and int return type, it presumably returns the number of such
 * entries — confirm.
 *
 * @param log path of the WAL file to read
 * @param fs filesystem holding the file
 * @param conf configuration passed to the reader
 * @throws IOException as declared
 */
// NOTE(review): original lines 466, 468 and 471-476 are missing from this copy. They held the
// counter and `e` declarations, the loop body and the return. Also note:
// - getCells().get(0) would throw if an entry had no cells;
// - this method is not called anywhere in the visible code.
465 private int countWAL(Path log
, FileSystem fs
, Configuration conf
) throws IOException
{
467 try (WAL
.Reader in
= WALFactory
.createReader(fs
, log
, conf
)) {
469 while ((e
= in
.next()) != null) {
470 if (!WALEdit
.isMetaEditFamily(e
.getEdit().getCells().get(0))) {
478 private void blockUntilNoRIT() throws Exception
{
479 TEST_UTIL
.waitUntilNoRegionsInTransition(60000);
/**
 * Builds {@code numRows} Puts with row keys {@code startRow + Bytes.toBytes(i)} for
 * i = 0..numRows-1. Each Put carries a cell with a null value at qualifier {@code qf} in every
 * given family.
 *
 * @param region region the rows are intended for
 * @param startRow prefix of every row key
 * @param numRows number of rows to build
 * @param qf column qualifier used for every cell
 * @param families column families to populate
 */
// NOTE(review): original lines 483 and 488-491 are missing from this copy. They held the throws
// clause and opening brace, and whatever applies each Put (presumably region.put(put)). As
// visible, the Puts are built but never written to `region`; confirm against the canonical
// source.
482 private void putData(Region region
, byte[] startRow
, int numRows
, byte[] qf
, byte[]... families
)
484 for (int i
= 0; i
< numRows
; i
++) {
485 Put put
= new Put(Bytes
.add(startRow
, Bytes
.toBytes(i
)));
486 for (byte[] family
: families
) {
487 put
.addColumn(family
, qf
, null);
/**
 * Polls {@code ctr} while its sum still equals {@code oldval}, for at most {@code timems}
 * milliseconds measured with EnvironmentEdgeManager. Once the sum changes, asserts that it
 * equals {@code newval}.
 *
 * @param ctr counter to watch
 * @param oldval value the counter is expected to move away from
 * @param newval value the counter is expected to reach
 * @param timems maximum time to wait, in milliseconds
 * @throws InterruptedException as declared
 */
// NOTE(review): original lines 499, 501 and 503-507 are missing from this copy. Presumably they
// held:
// - a sleep between polls (the method declares InterruptedException);
// - an else branch around the assertion and a return;
// - the timeout handling.
// Whether the method fails when the timeout expires is not visible. Confirm.
493 private void waitForCounter(LongAdder ctr
, long oldval
, long newval
, long timems
)
494 throws InterruptedException
{
495 long curt
= EnvironmentEdgeManager
.currentTime();
496 long endt
= curt
+ timems
;
497 while (curt
< endt
) {
498 if (ctr
.sum() == oldval
) {
500 curt
= EnvironmentEdgeManager
.currentTime();
502 assertEquals(newval
, ctr
.sum());
/**
 * Aborts the live master that reports itself active, with reason "Aborting for tests" and a
 * "Trace info" exception. Then logs "Master is aborted".
 *
 * @param cluster mini cluster whose masters are inspected; shadows the {@code cluster} field
 * @throws InterruptedException as declared
 */
// NOTE(review): original lines 513-516 and 518 are missing from this copy. Nothing visible here
// throws InterruptedException, so the missing lines presumably waited for the aborted master
// (and may have broken out of the loop). Confirm against the canonical source.
509 private void abortMaster(SingleProcessHBaseCluster cluster
) throws InterruptedException
{
510 for (MasterThread mt
: cluster
.getLiveMasterThreads()) {
511 if (mt
.getMaster().isActiveMaster()) {
512 mt
.getMaster().abort("Aborting for tests", new Exception("Trace info"));
517 LOG
.debug("Master is aborted");
521 * Find a RS that has regions of a table.
522 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well
524 private HRegionServer
findRSToKill(boolean hasMetaRegion
) throws Exception
{
525 List
<RegionServerThread
> rsts
= cluster
.getLiveRegionServerThreads();
526 List
<RegionInfo
> regions
= null;
527 HRegionServer hrs
= null;
529 for (RegionServerThread rst
: rsts
) {
530 hrs
= rst
.getRegionServer();
531 while (rst
.isAlive() && !hrs
.isOnline()) {
534 if (!rst
.isAlive()) {
537 boolean isCarryingMeta
= false;
538 boolean foundTableRegion
= false;
539 regions
= ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices());
540 for (RegionInfo region
: regions
) {
541 if (region
.isMetaRegion()) {
542 isCarryingMeta
= true;
544 if (region
.getTable() == tableName
) {
545 foundTableRegion
= true;
547 if (foundTableRegion
&& (isCarryingMeta
|| !hasMetaRegion
)) {
551 if (isCarryingMeta
&& hasMetaRegion
) {
552 // clients ask for a RS with META
553 if (!foundTableRegion
) {
554 HRegionServer destRS
= hrs
;
555 // the RS doesn't have regions of the specified table so we need move one to this RS
556 List
<RegionInfo
> tableRegions
= TEST_UTIL
.getAdmin().getRegions(tableName
);
557 RegionInfo hri
= tableRegions
.get(0);
558 TEST_UTIL
.getAdmin().move(hri
.getEncodedNameAsBytes(), destRS
.getServerName());
559 // wait for region move completes
560 RegionStates regionStates
=
561 TEST_UTIL
.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
562 TEST_UTIL
.waitFor(45000, 200, new Waiter
.Predicate
<Exception
>() {
564 public boolean evaluate() throws Exception
{
565 ServerName sn
= regionStates
.getRegionServerOfRegion(hri
);
566 return (sn
!= null && sn
.equals(destRS
.getServerName()));
571 } else if (hasMetaRegion
|| isCarryingMeta
) {
574 if (foundTableRegion
) {