2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import com
.codahale
.metrics
.Counter
;
21 import java
.io
.IOException
;
22 import java
.util
.List
;
23 import java
.util
.Optional
;
24 import java
.util
.concurrent
.CountDownLatch
;
25 import java
.util
.concurrent
.TimeUnit
;
26 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
27 import java
.util
.concurrent
.atomic
.AtomicInteger
;
28 import java
.util
.concurrent
.atomic
.AtomicLong
;
29 import java
.util
.concurrent
.atomic
.AtomicReference
;
30 import org
.apache
.hadoop
.conf
.Configuration
;
31 import org
.apache
.hadoop
.hbase
.Cell
;
32 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
33 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
34 import org
.apache
.hadoop
.hbase
.HConstants
;
35 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
36 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
37 import org
.apache
.hadoop
.hbase
.StartTestingClusterOption
;
38 import org
.apache
.hadoop
.hbase
.TableName
;
39 import org
.apache
.hadoop
.hbase
.TableNotFoundException
;
40 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
41 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
42 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
43 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
44 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
45 import org
.apache
.hadoop
.hbase
.regionserver
.InternalScanner
;
46 import org
.apache
.hadoop
.hbase
.regionserver
.StorefileRefresherChore
;
47 import org
.apache
.hadoop
.hbase
.regionserver
.TestRegionServerNoMaster
;
48 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
49 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
50 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
51 import org
.apache
.zookeeper
.KeeperException
;
52 import org
.junit
.After
;
53 import org
.junit
.AfterClass
;
54 import org
.junit
.Assert
;
55 import org
.junit
.Before
;
56 import org
.junit
.BeforeClass
;
57 import org
.junit
.ClassRule
;
58 import org
.junit
.Test
;
59 import org
.junit
.experimental
.categories
.Category
;
60 import org
.slf4j
.Logger
;
61 import org
.slf4j
.LoggerFactory
;
63 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
64 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.RequestConverter
;
65 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.AdminProtos
;
68 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
69 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
71 @Category({LargeTests
.class, ClientTests
.class})
72 @SuppressWarnings("deprecation")
73 public class TestReplicasClient
{
76 public static final HBaseClassTestRule CLASS_RULE
=
77 HBaseClassTestRule
.forClass(TestReplicasClient
.class);
79 private static final Logger LOG
= LoggerFactory
.getLogger(TestReplicasClient
.class);
81 private static TableName TABLE_NAME
;
82 private Table table
= null;
83 private static final byte[] row
= Bytes
.toBytes(TestReplicasClient
.class.getName());;
85 private static RegionInfo hriPrimary
;
86 private static RegionInfo hriSecondary
;
88 private static final HBaseTestingUtil HTU
= new HBaseTestingUtil();
89 private static final byte[] f
= HConstants
.CATALOG_FAMILY
;
91 private final static int REFRESH_PERIOD
= 1000;
94 * This copro is used to synchronize the tests.
96 public static class SlowMeCopro
implements RegionCoprocessor
, RegionObserver
{
97 static final AtomicLong sleepTime
= new AtomicLong(0);
98 static final AtomicBoolean slowDownNext
= new AtomicBoolean(false);
99 static final AtomicInteger countOfNext
= new AtomicInteger(0);
100 private static final AtomicReference
<CountDownLatch
> primaryCdl
=
101 new AtomicReference
<>(new CountDownLatch(0));
102 private static final AtomicReference
<CountDownLatch
> secondaryCdl
=
103 new AtomicReference
<>(new CountDownLatch(0));
104 public SlowMeCopro() {
108 public Optional
<RegionObserver
> getRegionObserver() {
109 return Optional
.of(this);
113 public void preGetOp(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
114 final Get get
, final List
<Cell
> results
) throws IOException
{
119 public void preScannerOpen(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
120 final Scan scan
) throws IOException
{
125 public boolean preScannerNext(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
126 final InternalScanner s
, final List
<Result
> results
,
127 final int limit
, final boolean hasMore
) throws IOException
{
128 //this will slow down a certain next operation if the conditions are met. The slowness
129 //will allow the call to go to a replica
130 if (slowDownNext
.get()) {
131 //have some "next" return successfully from the primary; hence countOfNext checked
132 if (countOfNext
.incrementAndGet() == 2) {
140 private void slowdownCode(final ObserverContext
<RegionCoprocessorEnvironment
> e
) {
141 if (e
.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
142 LOG
.info("We're the primary replicas.");
143 CountDownLatch latch
= getPrimaryCdl().get();
145 if (sleepTime
.get() > 0) {
146 LOG
.info("Sleeping for " + sleepTime
.get() + " ms");
147 Thread
.sleep(sleepTime
.get());
148 } else if (latch
.getCount() > 0) {
149 LOG
.info("Waiting for the counterCountDownLatch");
150 latch
.await(2, TimeUnit
.MINUTES
); // To help the tests to finish.
151 if (latch
.getCount() > 0) {
152 throw new RuntimeException("Can't wait more");
155 } catch (InterruptedException e1
) {
156 LOG
.error(e1
.toString(), e1
);
159 LOG
.info("We're not the primary replicas.");
160 CountDownLatch latch
= getSecondaryCdl().get();
162 if (latch
.getCount() > 0) {
163 LOG
.info("Waiting for the secondary counterCountDownLatch");
164 latch
.await(2, TimeUnit
.MINUTES
); // To help the tests to finish.
165 if (latch
.getCount() > 0) {
166 throw new RuntimeException("Can't wait more");
169 } catch (InterruptedException e1
) {
170 LOG
.error(e1
.toString(), e1
);
175 public static AtomicReference
<CountDownLatch
> getPrimaryCdl() {
179 public static AtomicReference
<CountDownLatch
> getSecondaryCdl() {
185 public static void beforeClass() throws Exception
{
186 // enable store file refreshing
187 HTU
.getConfiguration().setInt(
188 StorefileRefresherChore
.REGIONSERVER_STOREFILE_REFRESH_PERIOD
, REFRESH_PERIOD
);
189 HTU
.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
190 HTU
.getConfiguration().setBoolean(MetricsConnection
.CLIENT_SIDE_METRICS_ENABLED_KEY
, true);
191 StartTestingClusterOption option
= StartTestingClusterOption
.builder().numRegionServers(1).
192 numAlwaysStandByMasters(1).numMasters(1).build();
193 HTU
.startMiniCluster(option
);
195 // Create table then get the single region for our new table.
196 TableDescriptorBuilder builder
= HTU
.createModifyableTableDescriptor(
197 TableName
.valueOf(TestReplicasClient
.class.getSimpleName()),
198 ColumnFamilyDescriptorBuilder
.DEFAULT_MIN_VERSIONS
, 3, HConstants
.FOREVER
,
199 ColumnFamilyDescriptorBuilder
.DEFAULT_KEEP_DELETED
);
200 builder
.setCoprocessor(SlowMeCopro
.class.getName());
201 TableDescriptor hdt
= builder
.build();
202 HTU
.createTable(hdt
, new byte[][]{f
}, null);
203 TABLE_NAME
= hdt
.getTableName();
204 try (RegionLocator locator
= HTU
.getConnection().getRegionLocator(hdt
.getTableName())) {
205 hriPrimary
= locator
.getRegionLocation(row
, false).getRegion();
208 // mock a secondary region info to open
209 hriSecondary
= RegionReplicaUtil
.getRegionInfoForReplica(hriPrimary
, 1);
212 LOG
.info("Master is going to be stopped");
213 TestRegionServerNoMaster
.stopMasterAndCacheMetaLocation(HTU
);
214 Configuration c
= new Configuration(HTU
.getConfiguration());
215 c
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 1);
216 LOG
.info("Master has stopped");
220 public static void afterClass() throws Exception
{
221 HRegionServer
.TEST_SKIP_REPORTING_TRANSITION
= false;
222 HTU
.shutdownMiniCluster();
226 public void before() throws IOException
{
227 HTU
.getConnection().clearRegionLocationCache();
229 openRegion(hriPrimary
);
230 } catch (Exception ignored
) {
233 openRegion(hriSecondary
);
234 } catch (Exception ignored
) {
236 table
= HTU
.getConnection().getTable(TABLE_NAME
);
240 public void after() throws IOException
, KeeperException
{
242 closeRegion(hriSecondary
);
243 } catch (Exception ignored
) {
246 closeRegion(hriPrimary
);
247 } catch (Exception ignored
) {
249 HTU
.getConnection().clearRegionLocationCache();
252 private HRegionServer
getRS() {
253 return HTU
.getMiniHBaseCluster().getRegionServer(0);
256 private void openRegion(RegionInfo hri
) throws Exception
{
258 if (isRegionOpened(hri
)) return;
259 } catch (Exception e
){}
260 // first version is '0'
261 AdminProtos
.OpenRegionRequest orr
= RequestConverter
.buildOpenRegionRequest(
262 getRS().getServerName(), hri
, null);
263 AdminProtos
.OpenRegionResponse responseOpen
= getRS().getRSRpcServices().openRegion(null, orr
);
264 Assert
.assertEquals(1, responseOpen
.getOpeningStateCount());
265 Assert
.assertEquals(AdminProtos
.OpenRegionResponse
.RegionOpeningState
.OPENED
,
266 responseOpen
.getOpeningState(0));
267 checkRegionIsOpened(hri
);
270 private void closeRegion(RegionInfo hri
) throws Exception
{
271 AdminProtos
.CloseRegionRequest crr
= ProtobufUtil
.buildCloseRegionRequest(
272 getRS().getServerName(), hri
.getRegionName());
273 AdminProtos
.CloseRegionResponse responseClose
= getRS()
274 .getRSRpcServices().closeRegion(null, crr
);
275 Assert
.assertTrue(responseClose
.getClosed());
277 checkRegionIsClosed(hri
.getEncodedName());
280 private void checkRegionIsOpened(RegionInfo hri
) throws Exception
{
281 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
286 private boolean isRegionOpened(RegionInfo hri
) throws Exception
{
287 return getRS().getRegionByEncodedName(hri
.getEncodedName()).isAvailable();
290 private void checkRegionIsClosed(String encodedRegionName
) throws Exception
{
292 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
297 Assert
.assertFalse(getRS().getRegionByEncodedName(encodedRegionName
).isAvailable());
298 } catch (NotServingRegionException expected
) {
299 // That's how it works: if the region is closed, we get an exception.
302 // We don't delete the znode here, because there is not always a znode.
305 private void flushRegion(RegionInfo regionInfo
) throws IOException
{
306 TestRegionServerNoMaster
.flushRegion(HTU
, regionInfo
);
310 public void testUseRegionWithoutReplica() throws Exception
{
311 byte[] b1
= Bytes
.toBytes("testUseRegionWithoutReplica");
312 openRegion(hriSecondary
);
313 SlowMeCopro
.getPrimaryCdl().set(new CountDownLatch(0));
316 Result r
= table
.get(g
);
317 Assert
.assertFalse(r
.isStale());
319 closeRegion(hriSecondary
);
324 public void testLocations() throws Exception
{
325 byte[] b1
= Bytes
.toBytes("testLocations");
326 openRegion(hriSecondary
);
328 try (Connection conn
= ConnectionFactory
.createConnection(HTU
.getConfiguration());
329 RegionLocator locator
= conn
.getRegionLocator(TABLE_NAME
)) {
330 conn
.clearRegionLocationCache();
331 List
<HRegionLocation
> rl
= locator
.getRegionLocations(b1
, true);
332 Assert
.assertEquals(2, rl
.size());
334 rl
= locator
.getRegionLocations(b1
, false);
335 Assert
.assertEquals(2, rl
.size());
337 conn
.clearRegionLocationCache();
338 rl
= locator
.getRegionLocations(b1
, false);
339 Assert
.assertEquals(2, rl
.size());
341 rl
= locator
.getRegionLocations(b1
, true);
342 Assert
.assertEquals(2, rl
.size());
344 closeRegion(hriSecondary
);
349 public void testGetNoResultNoStaleRegionWithReplica() throws Exception
{
350 byte[] b1
= Bytes
.toBytes("testGetNoResultNoStaleRegionWithReplica");
351 openRegion(hriSecondary
);
354 // A get works and is not stale
356 Result r
= table
.get(g
);
357 Assert
.assertFalse(r
.isStale());
359 closeRegion(hriSecondary
);
365 public void testGetNoResultStaleRegionWithReplica() throws Exception
{
366 byte[] b1
= Bytes
.toBytes("testGetNoResultStaleRegionWithReplica");
367 openRegion(hriSecondary
);
369 SlowMeCopro
.getPrimaryCdl().set(new CountDownLatch(1));
372 g
.setConsistency(Consistency
.TIMELINE
);
373 Result r
= table
.get(g
);
374 Assert
.assertTrue(r
.isStale());
376 SlowMeCopro
.getPrimaryCdl().get().countDown();
377 closeRegion(hriSecondary
);
382 public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception
{
383 byte[] b1
= Bytes
.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
384 openRegion(hriSecondary
);
387 // We sleep, but we won't go to the stale region as we don't ask for stale reads by default.
388 SlowMeCopro
.sleepTime
.set(2000);
390 Result r
= table
.get(g
);
391 Assert
.assertFalse(r
.isStale());
394 SlowMeCopro
.sleepTime
.set(0);
395 closeRegion(hriSecondary
);
400 public void testFlushTable() throws Exception
{
401 openRegion(hriSecondary
);
403 flushRegion(hriPrimary
);
404 flushRegion(hriSecondary
);
406 Put p
= new Put(row
);
407 p
.addColumn(f
, row
, row
);
410 flushRegion(hriPrimary
);
411 flushRegion(hriSecondary
);
413 Delete d
= new Delete(row
);
415 closeRegion(hriSecondary
);
420 public void testFlushPrimary() throws Exception
{
421 openRegion(hriSecondary
);
424 flushRegion(hriPrimary
);
426 Put p
= new Put(row
);
427 p
.addColumn(f
, row
, row
);
430 flushRegion(hriPrimary
);
432 Delete d
= new Delete(row
);
434 closeRegion(hriSecondary
);
439 public void testFlushSecondary() throws Exception
{
440 openRegion(hriSecondary
);
442 flushRegion(hriSecondary
);
444 Put p
= new Put(row
);
445 p
.addColumn(f
, row
, row
);
448 flushRegion(hriSecondary
);
449 } catch (TableNotFoundException expected
) {
451 Delete d
= new Delete(row
);
453 closeRegion(hriSecondary
);
458 public void testUseRegionWithReplica() throws Exception
{
459 byte[] b1
= Bytes
.toBytes("testUseRegionWithReplica");
460 openRegion(hriSecondary
);
463 // A simple put works, even if there is a second replica
465 p
.addColumn(f
, b1
, b1
);
467 LOG
.info("Put done");
469 // A get works and is not stale
471 Result r
= table
.get(g
);
472 Assert
.assertFalse(r
.isStale());
473 Assert
.assertFalse(r
.getColumnCells(f
, b1
).isEmpty());
474 LOG
.info("get works and is not stale done");
476 // Even if we have to wait a little on the main region
477 SlowMeCopro
.sleepTime
.set(2000);
480 Assert
.assertFalse(r
.isStale());
481 Assert
.assertFalse(r
.getColumnCells(f
, b1
).isEmpty());
482 SlowMeCopro
.sleepTime
.set(0);
483 LOG
.info("sleep and is not stale done");
485 // But if we ask for stale we will get it
486 SlowMeCopro
.getPrimaryCdl().set(new CountDownLatch(1));
488 g
.setConsistency(Consistency
.TIMELINE
);
490 Assert
.assertTrue(r
.isStale());
491 Assert
.assertTrue(r
.getColumnCells(f
, b1
).isEmpty());
492 SlowMeCopro
.getPrimaryCdl().get().countDown();
494 LOG
.info("stale done");
496 // exists works and is not stale
498 g
.setCheckExistenceOnly(true);
500 Assert
.assertFalse(r
.isStale());
501 Assert
.assertTrue(r
.getExists());
502 LOG
.info("exists not stale done");
504 // exists works on stale but don't see the put
505 SlowMeCopro
.getPrimaryCdl().set(new CountDownLatch(1));
507 g
.setCheckExistenceOnly(true);
508 g
.setConsistency(Consistency
.TIMELINE
);
510 Assert
.assertTrue(r
.isStale());
511 Assert
.assertFalse("The secondary has stale data", r
.getExists());
512 SlowMeCopro
.getPrimaryCdl().get().countDown();
513 LOG
.info("exists stale before flush done");
515 flushRegion(hriPrimary
);
516 flushRegion(hriSecondary
);
517 LOG
.info("flush done");
518 Thread
.sleep(1000 + REFRESH_PERIOD
* 2);
520 // get works on stale and we see the put after the flush
521 SlowMeCopro
.getPrimaryCdl().set(new CountDownLatch(1));
523 g
.setConsistency(Consistency
.TIMELINE
);
525 Assert
.assertTrue(r
.isStale());
526 Assert
.assertFalse(r
.isEmpty());
527 SlowMeCopro
.getPrimaryCdl().get().countDown();
528 LOG
.info("stale done");
530 // exists works on stale and we see the put after the flush
531 SlowMeCopro
.getPrimaryCdl().set(new CountDownLatch(1));
533 g
.setCheckExistenceOnly(true);
534 g
.setConsistency(Consistency
.TIMELINE
);
536 Assert
.assertTrue(r
.isStale());
537 Assert
.assertTrue(r
.getExists());
538 SlowMeCopro
.getPrimaryCdl().get().countDown();
539 LOG
.info("exists stale after flush done");
542 SlowMeCopro
.getPrimaryCdl().get().countDown();
543 SlowMeCopro
.sleepTime
.set(0);
544 Delete d
= new Delete(b1
);
546 closeRegion(hriSecondary
);
551 public void testHedgedRead() throws Exception
{
552 byte[] b1
= Bytes
.toBytes("testHedgedRead");
553 openRegion(hriSecondary
);
556 // A simple put works, even if there is a second replica
558 p
.addColumn(f
, b1
, b1
);
560 LOG
.info("Put done");
562 // A get works and is not stale
564 Result r
= table
.get(g
);
565 Assert
.assertFalse(r
.isStale());
566 Assert
.assertFalse(r
.getColumnCells(f
, b1
).isEmpty());
567 LOG
.info("get works and is not stale done");
570 AsyncConnectionImpl conn
= (AsyncConnectionImpl
) HTU
.getConnection().toAsyncConnection();
571 Counter hedgedReadOps
= conn
.getConnectionMetrics().get().hedgedReadOps
;
572 Counter hedgedReadWin
= conn
.getConnectionMetrics().get().hedgedReadWin
;
573 hedgedReadOps
.dec(hedgedReadOps
.getCount());
574 hedgedReadWin
.dec(hedgedReadWin
.getCount());
576 // Wait a little on the main region, just long enough for one hedged read to happen,
577 // while the hedged read does not return faster
578 long primaryCallTimeoutNs
= conn
.connConf
.getPrimaryCallTimeoutNs();
579 // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
580 // trigger the hedged read...
581 SlowMeCopro
.sleepTime
.set(TimeUnit
.NANOSECONDS
.toMillis(primaryCallTimeoutNs
) + 100);
582 SlowMeCopro
.getSecondaryCdl().set(new CountDownLatch(1));
584 g
.setConsistency(Consistency
.TIMELINE
);
586 Assert
.assertFalse(r
.isStale());
587 Assert
.assertFalse(r
.getColumnCells(f
, b1
).isEmpty());
588 Assert
.assertEquals(1, hedgedReadOps
.getCount());
589 Assert
.assertEquals(0, hedgedReadWin
.getCount());
590 SlowMeCopro
.sleepTime
.set(0);
591 SlowMeCopro
.getSecondaryCdl().get().countDown();
592 LOG
.info("hedged read occurred but not faster");
595 // But if we ask for stale we will get it, and the hedged read returns faster
596 SlowMeCopro
.getPrimaryCdl().set(new CountDownLatch(1));
598 g
.setConsistency(Consistency
.TIMELINE
);
600 Assert
.assertTrue(r
.isStale());
601 Assert
.assertTrue(r
.getColumnCells(f
, b1
).isEmpty());
602 Assert
.assertEquals(2, hedgedReadOps
.getCount());
603 // We update the metrics after the request finishes, so we use waitFor here; asserting
604 // directly may fail if we run too fast.
605 HTU
.waitFor(10000, () -> hedgedReadWin
.getCount() == 1);
606 SlowMeCopro
.getPrimaryCdl().get().countDown();
607 LOG
.info("hedged read occurred and faster");
610 SlowMeCopro
.getPrimaryCdl().get().countDown();
611 SlowMeCopro
.getSecondaryCdl().get().countDown();
612 SlowMeCopro
.sleepTime
.set(0);
613 Delete d
= new Delete(b1
);
615 closeRegion(hriSecondary
);