HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestReplicasClient.java
blob17e8121537ac6e91930354da42d06c9dc338a53d
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import com.codahale.metrics.Counter;
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.Optional;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.HBaseClassTestRule;
33 import org.apache.hadoop.hbase.HBaseTestingUtil;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.HRegionLocation;
36 import org.apache.hadoop.hbase.NotServingRegionException;
37 import org.apache.hadoop.hbase.StartTestingClusterOption;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.TableNotFoundException;
40 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
41 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
42 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
43 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
44 import org.apache.hadoop.hbase.regionserver.HRegionServer;
45 import org.apache.hadoop.hbase.regionserver.InternalScanner;
46 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
47 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
48 import org.apache.hadoop.hbase.testclassification.ClientTests;
49 import org.apache.hadoop.hbase.testclassification.LargeTests;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.zookeeper.KeeperException;
52 import org.junit.After;
53 import org.junit.AfterClass;
54 import org.junit.Assert;
55 import org.junit.Before;
56 import org.junit.BeforeClass;
57 import org.junit.ClassRule;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
64 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
65 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
 */
71 @Category({LargeTests.class, ClientTests.class})
72 @SuppressWarnings("deprecation")
73 public class TestReplicasClient {
75 @ClassRule
76 public static final HBaseClassTestRule CLASS_RULE =
77 HBaseClassTestRule.forClass(TestReplicasClient.class);
79 private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
81 private static TableName TABLE_NAME;
82 private Table table = null;
83 private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());;
85 private static RegionInfo hriPrimary;
86 private static RegionInfo hriSecondary;
88 private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
89 private static final byte[] f = HConstants.CATALOG_FAMILY;
91 private final static int REFRESH_PERIOD = 1000;
93 /**
94 * This copro is used to synchronize the tests.
96 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
97 static final AtomicLong sleepTime = new AtomicLong(0);
98 static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
99 static final AtomicInteger countOfNext = new AtomicInteger(0);
100 private static final AtomicReference<CountDownLatch> primaryCdl =
101 new AtomicReference<>(new CountDownLatch(0));
102 private static final AtomicReference<CountDownLatch> secondaryCdl =
103 new AtomicReference<>(new CountDownLatch(0));
104 public SlowMeCopro() {
107 @Override
108 public Optional<RegionObserver> getRegionObserver() {
109 return Optional.of(this);
112 @Override
113 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
114 final Get get, final List<Cell> results) throws IOException {
115 slowdownCode(e);
118 @Override
119 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
120 final Scan scan) throws IOException {
121 slowdownCode(e);
124 @Override
125 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
126 final InternalScanner s, final List<Result> results,
127 final int limit, final boolean hasMore) throws IOException {
128 //this will slow down a certain next operation if the conditions are met. The slowness
129 //will allow the call to go to a replica
130 if (slowDownNext.get()) {
131 //have some "next" return successfully from the primary; hence countOfNext checked
132 if (countOfNext.incrementAndGet() == 2) {
133 sleepTime.set(2000);
134 slowdownCode(e);
137 return true;
140 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
141 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
142 LOG.info("We're the primary replicas.");
143 CountDownLatch latch = getPrimaryCdl().get();
144 try {
145 if (sleepTime.get() > 0) {
146 LOG.info("Sleeping for " + sleepTime.get() + " ms");
147 Thread.sleep(sleepTime.get());
148 } else if (latch.getCount() > 0) {
149 LOG.info("Waiting for the counterCountDownLatch");
150 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
151 if (latch.getCount() > 0) {
152 throw new RuntimeException("Can't wait more");
155 } catch (InterruptedException e1) {
156 LOG.error(e1.toString(), e1);
158 } else {
159 LOG.info("We're not the primary replicas.");
160 CountDownLatch latch = getSecondaryCdl().get();
161 try {
162 if (latch.getCount() > 0) {
163 LOG.info("Waiting for the secondary counterCountDownLatch");
164 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
165 if (latch.getCount() > 0) {
166 throw new RuntimeException("Can't wait more");
169 } catch (InterruptedException e1) {
170 LOG.error(e1.toString(), e1);
175 public static AtomicReference<CountDownLatch> getPrimaryCdl() {
176 return primaryCdl;
179 public static AtomicReference<CountDownLatch> getSecondaryCdl() {
180 return secondaryCdl;
184 @BeforeClass
185 public static void beforeClass() throws Exception {
186 // enable store file refreshing
187 HTU.getConfiguration().setInt(
188 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
189 HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
190 HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
191 StartTestingClusterOption option = StartTestingClusterOption.builder().numRegionServers(1).
192 numAlwaysStandByMasters(1).numMasters(1).build();
193 HTU.startMiniCluster(option);
195 // Create table then get the single region for our new table.
196 TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
197 TableName.valueOf(TestReplicasClient.class.getSimpleName()),
198 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
199 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
200 builder.setCoprocessor(SlowMeCopro.class.getName());
201 TableDescriptor hdt = builder.build();
202 HTU.createTable(hdt, new byte[][]{f}, null);
203 TABLE_NAME = hdt.getTableName();
204 try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
205 hriPrimary = locator.getRegionLocation(row, false).getRegion();
208 // mock a secondary region info to open
209 hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
211 // No master
212 LOG.info("Master is going to be stopped");
213 TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
214 Configuration c = new Configuration(HTU.getConfiguration());
215 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
216 LOG.info("Master has stopped");
219 @AfterClass
220 public static void afterClass() throws Exception {
221 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
222 HTU.shutdownMiniCluster();
225 @Before
226 public void before() throws IOException {
227 HTU.getConnection().clearRegionLocationCache();
228 try {
229 openRegion(hriPrimary);
230 } catch (Exception ignored) {
232 try {
233 openRegion(hriSecondary);
234 } catch (Exception ignored) {
236 table = HTU.getConnection().getTable(TABLE_NAME);
239 @After
240 public void after() throws IOException, KeeperException {
241 try {
242 closeRegion(hriSecondary);
243 } catch (Exception ignored) {
245 try {
246 closeRegion(hriPrimary);
247 } catch (Exception ignored) {
249 HTU.getConnection().clearRegionLocationCache();
252 private HRegionServer getRS() {
253 return HTU.getMiniHBaseCluster().getRegionServer(0);
256 private void openRegion(RegionInfo hri) throws Exception {
257 try {
258 if (isRegionOpened(hri)) return;
259 } catch (Exception e){}
260 // first version is '0'
261 AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
262 getRS().getServerName(), hri, null);
263 AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
264 Assert.assertEquals(1, responseOpen.getOpeningStateCount());
265 Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
266 responseOpen.getOpeningState(0));
267 checkRegionIsOpened(hri);
270 private void closeRegion(RegionInfo hri) throws Exception {
271 AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
272 getRS().getServerName(), hri.getRegionName());
273 AdminProtos.CloseRegionResponse responseClose = getRS()
274 .getRSRpcServices().closeRegion(null, crr);
275 Assert.assertTrue(responseClose.getClosed());
277 checkRegionIsClosed(hri.getEncodedName());
280 private void checkRegionIsOpened(RegionInfo hri) throws Exception {
281 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
282 Thread.sleep(1);
286 private boolean isRegionOpened(RegionInfo hri) throws Exception {
287 return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
290 private void checkRegionIsClosed(String encodedRegionName) throws Exception {
292 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
293 Thread.sleep(1);
296 try {
297 Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
298 } catch (NotServingRegionException expected) {
299 // That's how it work: if the region is closed we have an exception.
302 // We don't delete the znode here, because there is not always a znode.
305 private void flushRegion(RegionInfo regionInfo) throws IOException {
306 TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
309 @Test
310 public void testUseRegionWithoutReplica() throws Exception {
311 byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
312 openRegion(hriSecondary);
313 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
314 try {
315 Get g = new Get(b1);
316 Result r = table.get(g);
317 Assert.assertFalse(r.isStale());
318 } finally {
319 closeRegion(hriSecondary);
323 @Test
324 public void testLocations() throws Exception {
325 byte[] b1 = Bytes.toBytes("testLocations");
326 openRegion(hriSecondary);
328 try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
329 RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
330 conn.clearRegionLocationCache();
331 List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
332 Assert.assertEquals(2, rl.size());
334 rl = locator.getRegionLocations(b1, false);
335 Assert.assertEquals(2, rl.size());
337 conn.clearRegionLocationCache();
338 rl = locator.getRegionLocations(b1, false);
339 Assert.assertEquals(2, rl.size());
341 rl = locator.getRegionLocations(b1, true);
342 Assert.assertEquals(2, rl.size());
343 } finally {
344 closeRegion(hriSecondary);
348 @Test
349 public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
350 byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
351 openRegion(hriSecondary);
353 try {
354 // A get works and is not stale
355 Get g = new Get(b1);
356 Result r = table.get(g);
357 Assert.assertFalse(r.isStale());
358 } finally {
359 closeRegion(hriSecondary);
364 @Test
365 public void testGetNoResultStaleRegionWithReplica() throws Exception {
366 byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
367 openRegion(hriSecondary);
369 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
370 try {
371 Get g = new Get(b1);
372 g.setConsistency(Consistency.TIMELINE);
373 Result r = table.get(g);
374 Assert.assertTrue(r.isStale());
375 } finally {
376 SlowMeCopro.getPrimaryCdl().get().countDown();
377 closeRegion(hriSecondary);
381 @Test
382 public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
383 byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
384 openRegion(hriSecondary);
386 try {
387 // We sleep; but we won't go to the stale region as we don't get the stale by default.
388 SlowMeCopro.sleepTime.set(2000);
389 Get g = new Get(b1);
390 Result r = table.get(g);
391 Assert.assertFalse(r.isStale());
393 } finally {
394 SlowMeCopro.sleepTime.set(0);
395 closeRegion(hriSecondary);
399 @Test
400 public void testFlushTable() throws Exception {
401 openRegion(hriSecondary);
402 try {
403 flushRegion(hriPrimary);
404 flushRegion(hriSecondary);
406 Put p = new Put(row);
407 p.addColumn(f, row, row);
408 table.put(p);
410 flushRegion(hriPrimary);
411 flushRegion(hriSecondary);
412 } finally {
413 Delete d = new Delete(row);
414 table.delete(d);
415 closeRegion(hriSecondary);
419 @Test
420 public void testFlushPrimary() throws Exception {
421 openRegion(hriSecondary);
423 try {
424 flushRegion(hriPrimary);
426 Put p = new Put(row);
427 p.addColumn(f, row, row);
428 table.put(p);
430 flushRegion(hriPrimary);
431 } finally {
432 Delete d = new Delete(row);
433 table.delete(d);
434 closeRegion(hriSecondary);
438 @Test
439 public void testFlushSecondary() throws Exception {
440 openRegion(hriSecondary);
441 try {
442 flushRegion(hriSecondary);
444 Put p = new Put(row);
445 p.addColumn(f, row, row);
446 table.put(p);
448 flushRegion(hriSecondary);
449 } catch (TableNotFoundException expected) {
450 } finally {
451 Delete d = new Delete(row);
452 table.delete(d);
453 closeRegion(hriSecondary);
457 @Test
458 public void testUseRegionWithReplica() throws Exception {
459 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
460 openRegion(hriSecondary);
462 try {
463 // A simple put works, even if there here a second replica
464 Put p = new Put(b1);
465 p.addColumn(f, b1, b1);
466 table.put(p);
467 LOG.info("Put done");
469 // A get works and is not stale
470 Get g = new Get(b1);
471 Result r = table.get(g);
472 Assert.assertFalse(r.isStale());
473 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
474 LOG.info("get works and is not stale done");
476 // Even if it we have to wait a little on the main region
477 SlowMeCopro.sleepTime.set(2000);
478 g = new Get(b1);
479 r = table.get(g);
480 Assert.assertFalse(r.isStale());
481 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
482 SlowMeCopro.sleepTime.set(0);
483 LOG.info("sleep and is not stale done");
485 // But if we ask for stale we will get it
486 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
487 g = new Get(b1);
488 g.setConsistency(Consistency.TIMELINE);
489 r = table.get(g);
490 Assert.assertTrue(r.isStale());
491 Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
492 SlowMeCopro.getPrimaryCdl().get().countDown();
494 LOG.info("stale done");
496 // exists works and is not stale
497 g = new Get(b1);
498 g.setCheckExistenceOnly(true);
499 r = table.get(g);
500 Assert.assertFalse(r.isStale());
501 Assert.assertTrue(r.getExists());
502 LOG.info("exists not stale done");
504 // exists works on stale but don't see the put
505 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
506 g = new Get(b1);
507 g.setCheckExistenceOnly(true);
508 g.setConsistency(Consistency.TIMELINE);
509 r = table.get(g);
510 Assert.assertTrue(r.isStale());
511 Assert.assertFalse("The secondary has stale data", r.getExists());
512 SlowMeCopro.getPrimaryCdl().get().countDown();
513 LOG.info("exists stale before flush done");
515 flushRegion(hriPrimary);
516 flushRegion(hriSecondary);
517 LOG.info("flush done");
518 Thread.sleep(1000 + REFRESH_PERIOD * 2);
520 // get works and is not stale
521 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
522 g = new Get(b1);
523 g.setConsistency(Consistency.TIMELINE);
524 r = table.get(g);
525 Assert.assertTrue(r.isStale());
526 Assert.assertFalse(r.isEmpty());
527 SlowMeCopro.getPrimaryCdl().get().countDown();
528 LOG.info("stale done");
530 // exists works on stale and we see the put after the flush
531 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
532 g = new Get(b1);
533 g.setCheckExistenceOnly(true);
534 g.setConsistency(Consistency.TIMELINE);
535 r = table.get(g);
536 Assert.assertTrue(r.isStale());
537 Assert.assertTrue(r.getExists());
538 SlowMeCopro.getPrimaryCdl().get().countDown();
539 LOG.info("exists stale after flush done");
541 } finally {
542 SlowMeCopro.getPrimaryCdl().get().countDown();
543 SlowMeCopro.sleepTime.set(0);
544 Delete d = new Delete(b1);
545 table.delete(d);
546 closeRegion(hriSecondary);
550 @Test
551 public void testHedgedRead() throws Exception {
552 byte[] b1 = Bytes.toBytes("testHedgedRead");
553 openRegion(hriSecondary);
555 try {
556 // A simple put works, even if there here a second replica
557 Put p = new Put(b1);
558 p.addColumn(f, b1, b1);
559 table.put(p);
560 LOG.info("Put done");
562 // A get works and is not stale
563 Get g = new Get(b1);
564 Result r = table.get(g);
565 Assert.assertFalse(r.isStale());
566 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
567 LOG.info("get works and is not stale done");
569 //reset
570 AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
571 Counter hedgedReadOps = conn.getConnectionMetrics().get().hedgedReadOps;
572 Counter hedgedReadWin = conn.getConnectionMetrics().get().hedgedReadWin;
573 hedgedReadOps.dec(hedgedReadOps.getCount());
574 hedgedReadWin.dec(hedgedReadWin.getCount());
576 // Wait a little on the main region, just enough to happen once hedged read
577 // and hedged read did not returned faster
578 long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
579 // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
580 // trigger the hedged read...
581 SlowMeCopro.sleepTime.set(TimeUnit.NANOSECONDS.toMillis(primaryCallTimeoutNs) + 100);
582 SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
583 g = new Get(b1);
584 g.setConsistency(Consistency.TIMELINE);
585 r = table.get(g);
586 Assert.assertFalse(r.isStale());
587 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
588 Assert.assertEquals(1, hedgedReadOps.getCount());
589 Assert.assertEquals(0, hedgedReadWin.getCount());
590 SlowMeCopro.sleepTime.set(0);
591 SlowMeCopro.getSecondaryCdl().get().countDown();
592 LOG.info("hedged read occurred but not faster");
595 // But if we ask for stale we will get it and hedged read returned faster
596 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
597 g = new Get(b1);
598 g.setConsistency(Consistency.TIMELINE);
599 r = table.get(g);
600 Assert.assertTrue(r.isStale());
601 Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
602 Assert.assertEquals(2, hedgedReadOps.getCount());
603 // we update the metrics after we finish the request so we use a waitFor here, use assert
604 // directly may cause failure if we run too fast.
605 HTU.waitFor(10000, () -> hedgedReadWin.getCount() == 1);
606 SlowMeCopro.getPrimaryCdl().get().countDown();
607 LOG.info("hedged read occurred and faster");
609 } finally {
610 SlowMeCopro.getPrimaryCdl().get().countDown();
611 SlowMeCopro.getSecondaryCdl().get().countDown();
612 SlowMeCopro.sleepTime.set(0);
613 Delete d = new Delete(b1);
614 table.delete(d);
615 closeRegion(hriSecondary);