HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / AbstractTestAsyncTableRegionReplicasRead.java
blob65c537aa31bc0893b4231f81f5eb3222c256ff80
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
22 import java.io.IOException;
23 import java.util.Arrays;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ForkJoinPool;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Supplier;
31 import org.apache.commons.io.IOUtils;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
36 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
37 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
38 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
39 import org.apache.hadoop.hbase.regionserver.HRegion;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
42 import org.junit.AfterClass;
43 import org.junit.Rule;
44 import org.junit.Test;
45 import org.junit.rules.TestName;
46 import org.junit.runners.Parameterized.Parameter;
47 import org.junit.runners.Parameterized.Parameters;
49 public abstract class AbstractTestAsyncTableRegionReplicasRead {
51 protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
53 protected static TableName TABLE_NAME = TableName.valueOf("async");
55 protected static byte[] FAMILY = Bytes.toBytes("cf");
57 protected static byte[] QUALIFIER = Bytes.toBytes("cq");
59 protected static byte[] ROW = Bytes.toBytes("row");
61 protected static byte[] VALUE = Bytes.toBytes("value");
63 protected static int REPLICA_COUNT = 3;
65 protected static AsyncConnection ASYNC_CONN;
67 @Rule
68 public TestName testName = new TestName();
70 @Parameter
71 public Supplier<AsyncTable<?>> getTable;
73 private static AsyncTable<?> getRawTable() {
74 return ASYNC_CONN.getTable(TABLE_NAME);
77 private static AsyncTable<?> getTable() {
78 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
81 @Parameters
82 public static List<Object[]> params() {
83 return Arrays.asList(
84 new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable },
85 new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable });
88 protected static volatile boolean FAIL_PRIMARY_GET = false;
90 protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
91 new ConcurrentHashMap<>();
93 public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
95 @Override
96 public Optional<RegionObserver> getRegionObserver() {
97 return Optional.of(this);
100 private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c)
101 throws IOException {
102 RegionInfo region = c.getEnvironment().getRegionInfo();
103 if (!region.getTable().equals(TABLE_NAME)) {
104 return;
106 REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
107 .incrementAndGet();
108 if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
109 throw new IOException("Inject error");
113 @Override
114 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
115 List<Cell> result) throws IOException {
116 recordAndTryFail(c);
119 @Override
120 public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
121 throws IOException {
122 recordAndTryFail(c);
126 private static boolean allReplicasHaveRow(byte[] row) throws IOException {
127 for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
128 for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
129 if (region.get(new Get(row), false).isEmpty()) {
130 return false;
134 return true;
137 protected static void startClusterAndCreateTable() throws Exception {
138 TEST_UTIL.startMiniCluster(3);
139 TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
140 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
141 .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
142 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
143 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
146 protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
147 // this is the fastest way to let all replicas have the row
148 TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
149 TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
150 TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
153 @AfterClass
154 public static void tearDownAfterClass() throws Exception {
155 IOUtils.closeQuietly(ASYNC_CONN);
156 TEST_UTIL.shutdownMiniCluster();
159 protected static int getSecondaryGetCount() {
160 return REPLICA_ID_TO_COUNT.entrySet().stream()
161 .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
162 .mapToInt(e -> e.getValue().get()).sum();
165 protected static int getPrimaryGetCount() {
166 AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
167 return primaryGetCount != null ? primaryGetCount.get() : 0;
170 // replicaId = -1 means do not set replica
171 protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;
173 @Test
174 public void testNoReplicaRead() throws Exception {
175 FAIL_PRIMARY_GET = false;
176 REPLICA_ID_TO_COUNT.clear();
177 AsyncTable<?> table = getTable.get();
178 readAndCheck(table, -1);
179 // the primary region is fine and the primary timeout is 1 second which is long enough, so we
180 // should not send any requests to secondary replicas even if the consistency is timeline.
181 Thread.sleep(5000);
182 assertEquals(0, getSecondaryGetCount());
185 @Test
186 public void testReplicaRead() throws Exception {
187 // fail the primary get request
188 FAIL_PRIMARY_GET = true;
189 REPLICA_ID_TO_COUNT.clear();
190 // make sure that we could still get the value from secondary replicas
191 AsyncTable<?> table = getTable.get();
192 readAndCheck(table, -1);
193 // make sure that the primary request has been canceled
194 Thread.sleep(5000);
195 int count = getPrimaryGetCount();
196 Thread.sleep(10000);
197 assertEquals(count, getPrimaryGetCount());
200 @Test
201 public void testReadSpecificReplica() throws Exception {
202 FAIL_PRIMARY_GET = false;
203 REPLICA_ID_TO_COUNT.clear();
204 AsyncTable<?> table = getTable.get();
205 for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
206 readAndCheck(table, replicaId);
207 assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());