HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestReplicasClient.java
blob28e2ec6040500ff77d66383357b0aa47d028e682
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.client;
20 import com.codahale.metrics.Counter;
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.Optional;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.HBaseClassTestRule;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.HRegionLocation;
38 import org.apache.hadoop.hbase.HTableDescriptor;
39 import org.apache.hadoop.hbase.NotServingRegionException;
40 import org.apache.hadoop.hbase.StartMiniClusterOption;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.TableNotFoundException;
43 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
44 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
45 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
46 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
47 import org.apache.hadoop.hbase.regionserver.HRegionServer;
48 import org.apache.hadoop.hbase.regionserver.InternalScanner;
49 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
50 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
51 import org.apache.hadoop.hbase.testclassification.ClientTests;
52 import org.apache.hadoop.hbase.testclassification.LargeTests;
53 import org.apache.hadoop.hbase.util.Bytes;
54 import org.apache.zookeeper.KeeperException;
55 import org.junit.After;
56 import org.junit.AfterClass;
57 import org.junit.Assert;
58 import org.junit.Before;
59 import org.junit.BeforeClass;
60 import org.junit.ClassRule;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
66 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
67 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
68 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
/**
 * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
 * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
 */
74 @Category({LargeTests.class, ClientTests.class})
75 @SuppressWarnings("deprecation")
76 public class TestReplicasClient {
78 @ClassRule
79 public static final HBaseClassTestRule CLASS_RULE =
80 HBaseClassTestRule.forClass(TestReplicasClient.class);
82 private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
84 private static final int NB_SERVERS = 1;
85 private static TableName TABLE_NAME;
86 private Table table = null;
87 private static final byte[] row = Bytes.toBytes(TestReplicasClient.class.getName());;
89 private static RegionInfo hriPrimary;
90 private static HRegionInfo hriSecondary;
92 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
93 private static final byte[] f = HConstants.CATALOG_FAMILY;
95 private final static int REFRESH_PERIOD = 1000;
97 /**
98 * This copro is used to synchronize the tests.
100 public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
101 static final AtomicLong sleepTime = new AtomicLong(0);
102 static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
103 static final AtomicInteger countOfNext = new AtomicInteger(0);
104 private static final AtomicReference<CountDownLatch> primaryCdl =
105 new AtomicReference<>(new CountDownLatch(0));
106 private static final AtomicReference<CountDownLatch> secondaryCdl =
107 new AtomicReference<>(new CountDownLatch(0));
108 public SlowMeCopro() {
111 @Override
112 public Optional<RegionObserver> getRegionObserver() {
113 return Optional.of(this);
116 @Override
117 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
118 final Get get, final List<Cell> results) throws IOException {
119 slowdownCode(e);
122 @Override
123 public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
124 final Scan scan) throws IOException {
125 slowdownCode(e);
128 @Override
129 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
130 final InternalScanner s, final List<Result> results,
131 final int limit, final boolean hasMore) throws IOException {
132 //this will slow down a certain next operation if the conditions are met. The slowness
133 //will allow the call to go to a replica
134 if (slowDownNext.get()) {
135 //have some "next" return successfully from the primary; hence countOfNext checked
136 if (countOfNext.incrementAndGet() == 2) {
137 sleepTime.set(2000);
138 slowdownCode(e);
141 return true;
144 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
145 if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
146 LOG.info("We're the primary replicas.");
147 CountDownLatch latch = getPrimaryCdl().get();
148 try {
149 if (sleepTime.get() > 0) {
150 LOG.info("Sleeping for " + sleepTime.get() + " ms");
151 Thread.sleep(sleepTime.get());
152 } else if (latch.getCount() > 0) {
153 LOG.info("Waiting for the counterCountDownLatch");
154 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
155 if (latch.getCount() > 0) {
156 throw new RuntimeException("Can't wait more");
159 } catch (InterruptedException e1) {
160 LOG.error(e1.toString(), e1);
162 } else {
163 LOG.info("We're not the primary replicas.");
164 CountDownLatch latch = getSecondaryCdl().get();
165 try {
166 if (latch.getCount() > 0) {
167 LOG.info("Waiting for the secondary counterCountDownLatch");
168 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
169 if (latch.getCount() > 0) {
170 throw new RuntimeException("Can't wait more");
173 } catch (InterruptedException e1) {
174 LOG.error(e1.toString(), e1);
179 public static AtomicReference<CountDownLatch> getPrimaryCdl() {
180 return primaryCdl;
183 public static AtomicReference<CountDownLatch> getSecondaryCdl() {
184 return secondaryCdl;
188 @BeforeClass
189 public static void beforeClass() throws Exception {
190 // enable store file refreshing
191 HTU.getConfiguration().setInt(
192 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
193 HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
194 HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
195 StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1).
196 numAlwaysStandByMasters(1).numMasters(1).build();
197 HTU.startMiniCluster(option);
199 // Create table then get the single region for our new table.
200 HTableDescriptor hdt = HTU.createTableDescriptor(
201 TableName.valueOf(TestReplicasClient.class.getSimpleName()),
202 HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
203 HColumnDescriptor.DEFAULT_KEEP_DELETED);
204 hdt.addCoprocessor(SlowMeCopro.class.getName());
205 HTU.createTable(hdt, new byte[][]{f}, null);
206 TABLE_NAME = hdt.getTableName();
207 try (RegionLocator locator = HTU.getConnection().getRegionLocator(hdt.getTableName())) {
208 hriPrimary = locator.getRegionLocation(row, false).getRegion();
211 // mock a secondary region info to open
212 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
213 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
215 // No master
216 LOG.info("Master is going to be stopped");
217 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
218 Configuration c = new Configuration(HTU.getConfiguration());
219 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
220 LOG.info("Master has stopped");
223 @AfterClass
224 public static void afterClass() throws Exception {
225 HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
226 HTU.shutdownMiniCluster();
229 @Before
230 public void before() throws IOException {
231 HTU.getConnection().clearRegionLocationCache();
232 try {
233 openRegion(hriPrimary);
234 } catch (Exception ignored) {
236 try {
237 openRegion(hriSecondary);
238 } catch (Exception ignored) {
240 table = HTU.getConnection().getTable(TABLE_NAME);
243 @After
244 public void after() throws IOException, KeeperException {
245 try {
246 closeRegion(hriSecondary);
247 } catch (Exception ignored) {
249 try {
250 closeRegion(hriPrimary);
251 } catch (Exception ignored) {
253 HTU.getConnection().clearRegionLocationCache();
256 private HRegionServer getRS() {
257 return HTU.getMiniHBaseCluster().getRegionServer(0);
260 private void openRegion(RegionInfo hri) throws Exception {
261 try {
262 if (isRegionOpened(hri)) return;
263 } catch (Exception e){}
264 // first version is '0'
265 AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
266 getRS().getServerName(), hri, null);
267 AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
268 Assert.assertEquals(1, responseOpen.getOpeningStateCount());
269 Assert.assertEquals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED,
270 responseOpen.getOpeningState(0));
271 checkRegionIsOpened(hri);
274 private void closeRegion(RegionInfo hri) throws Exception {
275 AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest(
276 getRS().getServerName(), hri.getRegionName());
277 AdminProtos.CloseRegionResponse responseClose = getRS()
278 .getRSRpcServices().closeRegion(null, crr);
279 Assert.assertTrue(responseClose.getClosed());
281 checkRegionIsClosed(hri.getEncodedName());
284 private void checkRegionIsOpened(RegionInfo hri) throws Exception {
285 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
286 Thread.sleep(1);
290 private boolean isRegionOpened(RegionInfo hri) throws Exception {
291 return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
294 private void checkRegionIsClosed(String encodedRegionName) throws Exception {
296 while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
297 Thread.sleep(1);
300 try {
301 Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
302 } catch (NotServingRegionException expected) {
303 // That's how it work: if the region is closed we have an exception.
306 // We don't delete the znode here, because there is not always a znode.
309 private void flushRegion(RegionInfo regionInfo) throws IOException {
310 TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
313 @Test
314 public void testUseRegionWithoutReplica() throws Exception {
315 byte[] b1 = Bytes.toBytes("testUseRegionWithoutReplica");
316 openRegion(hriSecondary);
317 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
318 try {
319 Get g = new Get(b1);
320 Result r = table.get(g);
321 Assert.assertFalse(r.isStale());
322 } finally {
323 closeRegion(hriSecondary);
327 @Test
328 public void testLocations() throws Exception {
329 byte[] b1 = Bytes.toBytes("testLocations");
330 openRegion(hriSecondary);
332 try (Connection conn = ConnectionFactory.createConnection(HTU.getConfiguration());
333 RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
334 conn.clearRegionLocationCache();
335 List<HRegionLocation> rl = locator.getRegionLocations(b1, true);
336 Assert.assertEquals(2, rl.size());
338 rl = locator.getRegionLocations(b1, false);
339 Assert.assertEquals(2, rl.size());
341 conn.clearRegionLocationCache();
342 rl = locator.getRegionLocations(b1, false);
343 Assert.assertEquals(2, rl.size());
345 rl = locator.getRegionLocations(b1, true);
346 Assert.assertEquals(2, rl.size());
347 } finally {
348 closeRegion(hriSecondary);
352 @Test
353 public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
354 byte[] b1 = Bytes.toBytes("testGetNoResultNoStaleRegionWithReplica");
355 openRegion(hriSecondary);
357 try {
358 // A get works and is not stale
359 Get g = new Get(b1);
360 Result r = table.get(g);
361 Assert.assertFalse(r.isStale());
362 } finally {
363 closeRegion(hriSecondary);
368 @Test
369 public void testGetNoResultStaleRegionWithReplica() throws Exception {
370 byte[] b1 = Bytes.toBytes("testGetNoResultStaleRegionWithReplica");
371 openRegion(hriSecondary);
373 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
374 try {
375 Get g = new Get(b1);
376 g.setConsistency(Consistency.TIMELINE);
377 Result r = table.get(g);
378 Assert.assertTrue(r.isStale());
379 } finally {
380 SlowMeCopro.getPrimaryCdl().get().countDown();
381 closeRegion(hriSecondary);
385 @Test
386 public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
387 byte[] b1 = Bytes.toBytes("testGetNoResultNotStaleSleepRegionWithReplica");
388 openRegion(hriSecondary);
390 try {
391 // We sleep; but we won't go to the stale region as we don't get the stale by default.
392 SlowMeCopro.sleepTime.set(2000);
393 Get g = new Get(b1);
394 Result r = table.get(g);
395 Assert.assertFalse(r.isStale());
397 } finally {
398 SlowMeCopro.sleepTime.set(0);
399 closeRegion(hriSecondary);
403 @Test
404 public void testFlushTable() throws Exception {
405 openRegion(hriSecondary);
406 try {
407 flushRegion(hriPrimary);
408 flushRegion(hriSecondary);
410 Put p = new Put(row);
411 p.addColumn(f, row, row);
412 table.put(p);
414 flushRegion(hriPrimary);
415 flushRegion(hriSecondary);
416 } finally {
417 Delete d = new Delete(row);
418 table.delete(d);
419 closeRegion(hriSecondary);
423 @Test
424 public void testFlushPrimary() throws Exception {
425 openRegion(hriSecondary);
427 try {
428 flushRegion(hriPrimary);
430 Put p = new Put(row);
431 p.addColumn(f, row, row);
432 table.put(p);
434 flushRegion(hriPrimary);
435 } finally {
436 Delete d = new Delete(row);
437 table.delete(d);
438 closeRegion(hriSecondary);
442 @Test
443 public void testFlushSecondary() throws Exception {
444 openRegion(hriSecondary);
445 try {
446 flushRegion(hriSecondary);
448 Put p = new Put(row);
449 p.addColumn(f, row, row);
450 table.put(p);
452 flushRegion(hriSecondary);
453 } catch (TableNotFoundException expected) {
454 } finally {
455 Delete d = new Delete(row);
456 table.delete(d);
457 closeRegion(hriSecondary);
461 @Test
462 public void testUseRegionWithReplica() throws Exception {
463 byte[] b1 = Bytes.toBytes("testUseRegionWithReplica");
464 openRegion(hriSecondary);
466 try {
467 // A simple put works, even if there here a second replica
468 Put p = new Put(b1);
469 p.addColumn(f, b1, b1);
470 table.put(p);
471 LOG.info("Put done");
473 // A get works and is not stale
474 Get g = new Get(b1);
475 Result r = table.get(g);
476 Assert.assertFalse(r.isStale());
477 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
478 LOG.info("get works and is not stale done");
480 // Even if it we have to wait a little on the main region
481 SlowMeCopro.sleepTime.set(2000);
482 g = new Get(b1);
483 r = table.get(g);
484 Assert.assertFalse(r.isStale());
485 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
486 SlowMeCopro.sleepTime.set(0);
487 LOG.info("sleep and is not stale done");
489 // But if we ask for stale we will get it
490 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
491 g = new Get(b1);
492 g.setConsistency(Consistency.TIMELINE);
493 r = table.get(g);
494 Assert.assertTrue(r.isStale());
495 Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
496 SlowMeCopro.getPrimaryCdl().get().countDown();
498 LOG.info("stale done");
500 // exists works and is not stale
501 g = new Get(b1);
502 g.setCheckExistenceOnly(true);
503 r = table.get(g);
504 Assert.assertFalse(r.isStale());
505 Assert.assertTrue(r.getExists());
506 LOG.info("exists not stale done");
508 // exists works on stale but don't see the put
509 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
510 g = new Get(b1);
511 g.setCheckExistenceOnly(true);
512 g.setConsistency(Consistency.TIMELINE);
513 r = table.get(g);
514 Assert.assertTrue(r.isStale());
515 Assert.assertFalse("The secondary has stale data", r.getExists());
516 SlowMeCopro.getPrimaryCdl().get().countDown();
517 LOG.info("exists stale before flush done");
519 flushRegion(hriPrimary);
520 flushRegion(hriSecondary);
521 LOG.info("flush done");
522 Thread.sleep(1000 + REFRESH_PERIOD * 2);
524 // get works and is not stale
525 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
526 g = new Get(b1);
527 g.setConsistency(Consistency.TIMELINE);
528 r = table.get(g);
529 Assert.assertTrue(r.isStale());
530 Assert.assertFalse(r.isEmpty());
531 SlowMeCopro.getPrimaryCdl().get().countDown();
532 LOG.info("stale done");
534 // exists works on stale and we see the put after the flush
535 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
536 g = new Get(b1);
537 g.setCheckExistenceOnly(true);
538 g.setConsistency(Consistency.TIMELINE);
539 r = table.get(g);
540 Assert.assertTrue(r.isStale());
541 Assert.assertTrue(r.getExists());
542 SlowMeCopro.getPrimaryCdl().get().countDown();
543 LOG.info("exists stale after flush done");
545 } finally {
546 SlowMeCopro.getPrimaryCdl().get().countDown();
547 SlowMeCopro.sleepTime.set(0);
548 Delete d = new Delete(b1);
549 table.delete(d);
550 closeRegion(hriSecondary);
554 @Test
555 public void testHedgedRead() throws Exception {
556 byte[] b1 = Bytes.toBytes("testHedgedRead");
557 openRegion(hriSecondary);
559 try {
560 // A simple put works, even if there here a second replica
561 Put p = new Put(b1);
562 p.addColumn(f, b1, b1);
563 table.put(p);
564 LOG.info("Put done");
566 // A get works and is not stale
567 Get g = new Get(b1);
568 Result r = table.get(g);
569 Assert.assertFalse(r.isStale());
570 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
571 LOG.info("get works and is not stale done");
573 //reset
574 AsyncConnectionImpl conn = (AsyncConnectionImpl) HTU.getConnection().toAsyncConnection();
575 Counter hedgedReadOps = conn.getConnectionMetrics().get().hedgedReadOps;
576 Counter hedgedReadWin = conn.getConnectionMetrics().get().hedgedReadWin;
577 hedgedReadOps.dec(hedgedReadOps.getCount());
578 hedgedReadWin.dec(hedgedReadWin.getCount());
580 // Wait a little on the main region, just enough to happen once hedged read
581 // and hedged read did not returned faster
582 long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
583 // The resolution of our timer is 10ms, so we need to sleep a bit more otherwise we may not
584 // trigger the hedged read...
585 SlowMeCopro.sleepTime.set(TimeUnit.NANOSECONDS.toMillis(primaryCallTimeoutNs) + 100);
586 SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
587 g = new Get(b1);
588 g.setConsistency(Consistency.TIMELINE);
589 r = table.get(g);
590 Assert.assertFalse(r.isStale());
591 Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
592 Assert.assertEquals(1, hedgedReadOps.getCount());
593 Assert.assertEquals(0, hedgedReadWin.getCount());
594 SlowMeCopro.sleepTime.set(0);
595 SlowMeCopro.getSecondaryCdl().get().countDown();
596 LOG.info("hedged read occurred but not faster");
599 // But if we ask for stale we will get it and hedged read returned faster
600 SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
601 g = new Get(b1);
602 g.setConsistency(Consistency.TIMELINE);
603 r = table.get(g);
604 Assert.assertTrue(r.isStale());
605 Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
606 Assert.assertEquals(2, hedgedReadOps.getCount());
607 // we update the metrics after we finish the request so we use a waitFor here, use assert
608 // directly may cause failure if we run too fast.
609 HTU.waitFor(10000, () -> hedgedReadWin.getCount() == 1);
610 SlowMeCopro.getPrimaryCdl().get().countDown();
611 LOG.info("hedged read occurred and faster");
613 } finally {
614 SlowMeCopro.getPrimaryCdl().get().countDown();
615 SlowMeCopro.getSecondaryCdl().get().countDown();
616 SlowMeCopro.sleepTime.set(0);
617 Delete d = new Delete(b1);
618 table.delete(d);
619 closeRegion(hriSecondary);