/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;
import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
 */
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestReplicasClient {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicasClient.class);
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);

  private static final int NB_SERVERS = 1;
  private static TableName TABLE_NAME;
  private Table table = null;
  private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());

  private static RegionInfo hriPrimary;
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  private static final int REFRESH_PERIOD = 1000;
  /**
   * This copro is used to synchronize the tests.
   */
  public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
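    // How the tests synchronize with the server side: a non-zero sleepTime makes the primary
    // replica sleep inside slowdownCode(); primaryCdl / secondaryCdl, when set to a latch with a
    // positive count, block the corresponding replica until the test counts the latch down; and
    // slowDownNext / countOfNext delay one specific scanner next() call. Slowing or blocking the
    // primary is what lets a TIMELINE read be served by the replica.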
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    private static final AtomicReference<CountDownLatch> primaryCdl =
      new AtomicReference<>(new CountDownLatch(0));
    private static final AtomicReference<CountDownLatch> secondaryCdl =
      new AtomicReference<>(new CountDownLatch(0));
    public SlowMeCopro() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }
    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Get get, final List<Cell> results) throws IOException {
      slowdownCode(e);
    }
    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      slowdownCode(e);
    }
    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // this will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica
      if (slowDownNext.get()) {
        // have some "next" return successfully from the primary; hence countOfNext checked
        if (countOfNext.incrementAndGet() == 2) {
          sleepTime.set(2000);
          slowdownCode(e);
        }
      }
      return true;
    }
    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        LOG.info("We're the primary replicas.");
        CountDownLatch latch = getPrimaryCdl().get();
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          } else if (latch.getCount() > 0) {
            LOG.info("Waiting for the counterCountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      } else {
        LOG.info("We're not the primary replicas.");
        CountDownLatch latch = getSecondaryCdl().get();
        try {
          if (latch.getCount() > 0) {
            LOG.info("Waiting for the secondary counterCountDownLatch");
            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
            if (latch.getCount() > 0) {
              throw new RuntimeException("Can't wait more");
            }
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      }
    }
    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
      return primaryCdl;
    }
    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
      return secondaryCdl;
    }
  }
  @BeforeClass
  public static void beforeClass() throws Exception {
    // enable store file refreshing
    HTU.getConfiguration().setInt(
      StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
    HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1)
      .numAlwaysStandByMasters(1).numMasters(1).build();
    HTU.startMiniCluster(option);
    // Create table then get the single region for our new table.
    HTableDescriptor hdt = HTU.createTableDescriptor(
      TableName.valueOf(TestReplicasClient.class.getSimpleName()),
      HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
      HColumnDescriptor.DEFAULT_KEEP_DELETED);
    hdt.addCoprocessor(SlowMeCopro.class.getName());
    HTU.createTable(hdt, new byte[][] { f }, null);
    TABLE_NAME = hdt.getTableName();
    try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
      hriPrimary = locator.getRegionLocation(row, false).getRegion();
    }

    // mock a secondary region info to open
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
      hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
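    // The tests open and close the secondary region themselves through the region server's RPC
    // interface (see openRegion()/closeRegion() below), so the master is stopped to keep normal
    // assignment from interfering.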
    LOG.info("Master is going to be stopped");
    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
    Configuration c = new Configuration(HTU.getConfiguration());
    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    LOG.info("Master has stopped");
  }
  @AfterClass
  public static void afterClass() throws Exception {
    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
    HTU.shutdownMiniCluster();
  }
  @Before
  public void before() throws IOException {
    HTU.getConnection().clearRegionLocationCache();
    try {
      openRegion(hriPrimary);
    } catch (Exception ignored) {
    }
    try {
      openRegion(hriSecondary);
    } catch (Exception ignored) {
    }
    table = HTU.getConnection().getTable(TABLE_NAME);
  }
  @After
  public void after() throws IOException, KeeperException {
    try {
      closeRegion(hriSecondary);
    } catch (Exception ignored) {
    }
    try {
      closeRegion(hriPrimary);
    } catch (Exception ignored) {
    }
    HTU.getConnection().clearRegionLocationCache();
  }
  private HRegionServer getRS() {
    return HTU.getMiniHBaseCluster().getRegionServer(0);
  }
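  // The helpers below open, close and flush region replicas by calling the region server's RPC
  // services directly; with the master stopped there is no assignment manager to do it for us.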
  private void openRegion(RegionInfo hri) throws Exception {
    try {
      if (isRegionOpened(hri)) {
        return;
      }
    } catch (Exception e) {
    }
    // first version is '0'
    AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
      getRS().getServerName(), hri, null);
    AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
    Assert.assertEquals(1, responseOpen.getOpeningStateCount());
    Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
      responseOpen.getOpeningState(0));
    checkRegionIsOpened(hri);
  }
  private void closeRegion(RegionInfo hri) throws Exception {
    AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
      getRS().getServerName(), hri.getRegionName());
    AdminProtos.CloseRegionResponse responseClose =
      getRS().getRSRpcServices().closeRegion(null, crr);
    Assert.assertTrue(responseClose.getClosed());

    checkRegionIsClosed(hri.getEncodedName());
  }
  private void checkRegionIsOpened(RegionInfo hri) throws Exception {
    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }
  }
  private boolean isRegionOpened(RegionInfo hri) throws Exception {
    return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
  }
  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
      Thread.sleep(1);
    }

    try {
      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
    } catch (NotServingRegionException expected) {
      // That's how it works: if the region is closed we get an exception.
    }

    // We don't delete the znode here, because there is not always a znode.
  }
  private void flushRegion(RegionInfo regionInfo) throws IOException {
    TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
  }
  @Test
  public void testUseRegionWithoutReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
    openRegion(hriSecondary);
    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
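    // A latch with a count of zero never blocks, so the primary answers immediately and the
    // result is not stale.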
    try {
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testLocations() throws Exception {
    byte[] b1 = Bytes.toBytes("testLocations");
    openRegion(hriSecondary);

    try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
        RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
      conn.clearRegionLocationCache();
      List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
      Assert.assertEquals(2, rl.size());

      rl = locator.getRegionLocations(b1, false);
      Assert.assertEquals(2, rl.size());

      conn.clearRegionLocationCache();
      rl = locator.getRegionLocations(b1, false);
      Assert.assertEquals(2, rl.size());

      rl = locator.getRegionLocations(b1, true);
      Assert.assertEquals(2, rl.size());
    } finally {
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testGetNoResultStaleRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
    openRegion(hriSecondary);

    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
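    // While the latch is up the primary blocks in the coprocessor, so the TIMELINE get is
    // answered by the secondary replica and the result is flagged as stale.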
    try {
      Get g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // We sleep; but we won't go to the stale region as we don't ask for stale reads by default.
      SlowMeCopro.sleepTime.set(2000);
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
    } finally {
      SlowMeCopro.sleepTime.set(0);
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testFlushTable() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriPrimary);
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testFlushPrimary() throws Exception {
    openRegion(hriSecondary);

    try {
      flushRegion(hriPrimary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriPrimary);
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testFlushSecondary() throws Exception {
    openRegion(hriSecondary);
    try {
      flushRegion(hriSecondary);

      Put p = new Put(row);
      p.addColumn(f, row, row);
      table.put(p);

      flushRegion(hriSecondary);
    } catch (TableNotFoundException expected) {
    } finally {
      Delete d = new Delete(row);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testUseRegionWithReplica() throws Exception {
    byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      // Even if we have to wait a little on the main region
      SlowMeCopro.sleepTime.set(2000);
      g = new Get(b1);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.sleepTime.set(0);
      LOG.info("sleep and is not stale done");

      // But if we ask for stale we will get it
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("stale done");

      // exists works and is not stale
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertTrue(r.getExists());
      LOG.info("exists not stale done");

      // exists works on stale but doesn't see the put
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertFalse("The secondary has stale data", r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale before flush done");

      flushRegion(hriPrimary);
      flushRegion(hriSecondary);
      LOG.info("flush done");
      Thread.sleep(1000 + REFRESH_PERIOD * 2);
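      // The sleep gives the secondary's StorefileRefresherChore (running every REFRESH_PERIOD ms)
      // time to pick up the files flushed by the primary, so stale reads below now see the put.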
      // a stale get now returns the data
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertFalse(r.isEmpty());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("stale done");

      // exists works on stale and we see the put after the flush
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setCheckExistenceOnly(true);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getExists());
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("exists stale after flush done");
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);
    }
  }
  @Test
  public void testHedgedRead() throws Exception {
    byte[] b1 = Bytes.toBytes("testHedgedRead");
    openRegion(hriSecondary);

    try {
      // A simple put works, even if there is a second replica
      Put p = new Put(b1);
      p.addColumn(f, b1, b1);
      table.put(p);
      LOG.info("Put done");

      // A get works and is not stale
      Get g = new Get(b1);
      Result r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      LOG.info("get works and is not stale done");

      AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
      Counter hedgedReadOps = conn.getConnectionMetrics().get().hedgedReadOps;
      Counter hedgedReadWin = conn.getConnectionMetrics().get().hedgedReadWin;
      hedgedReadOps.dec(hedgedReadOps.getCount());
      hedgedReadWin.dec(hedgedReadWin.getCount());
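      // Both counters are reset to zero here: hedgedReadOps should count the hedged requests sent
      // to the replica, and hedgedReadWin the ones where the replica answered before the primary.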
      // Wait a little on the main region: just long enough for one hedged read to happen,
      // but the hedged read does not return faster
      long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
      // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
      // trigger the hedged read...
      SlowMeCopro.sleepTime.set(TimeUnit.NANOSECONDS.toMillis(primaryCallTimeoutNs) + 100);
      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertFalse(r.isStale());
      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
      Assert.assertEquals(1, hedgedReadOps.getCount());
      Assert.assertEquals(0, hedgedReadWin.getCount());
      SlowMeCopro.sleepTime.set(0);
      SlowMeCopro.getSecondaryCdl().get().countDown();
      LOG.info("hedged read occurred but not faster");

      // But if we ask for stale we will get it and the hedged read returns faster
      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
      g = new Get(b1);
      g.setConsistency(Consistency.TIMELINE);
      r = table.get(g);
      Assert.assertTrue(r.isStale());
      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
      Assert.assertEquals(2, hedgedReadOps.getCount());
      // We update the metrics after we finish the request, so use waitFor here; asserting
      // directly may fail if we run too fast.
      HTU.waitFor(10000, () -> hedgedReadWin.getCount() == 1);
      SlowMeCopro.getPrimaryCdl().get().countDown();
      LOG.info("hedged read occurred and faster");
    } finally {
      SlowMeCopro.getPrimaryCdl().get().countDown();
      SlowMeCopro.getSecondaryCdl().get().countDown();
      SlowMeCopro.sleepTime.set(0);
      Delete d = new Delete(b1);
      table.delete(d);
      closeRegion(hriSecondary);