HBASE-26567 Remove IndexType from ChunkCreator (#3947)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / AbstractTestAsyncTableRegionReplicasRead.java
blob6134a54bba548767a593fc21f3b1c0a710e266c5
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
22 import java.io.IOException;
23 import java.util.Arrays;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ForkJoinPool;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Supplier;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.HBaseTestingUtil;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
35 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
36 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
37 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
38 import org.apache.hadoop.hbase.regionserver.HRegion;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
41 import org.junit.AfterClass;
42 import org.junit.Rule;
43 import org.junit.Test;
44 import org.junit.rules.TestName;
45 import org.junit.runners.Parameterized.Parameter;
46 import org.junit.runners.Parameterized.Parameters;
48 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
50 public abstract class AbstractTestAsyncTableRegionReplicasRead {
52 protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
54 protected static TableName TABLE_NAME = TableName.valueOf("async");
56 protected static byte[] FAMILY = Bytes.toBytes("cf");
58 protected static byte[] QUALIFIER = Bytes.toBytes("cq");
60 protected static byte[] ROW = Bytes.toBytes("row");
62 protected static byte[] VALUE = Bytes.toBytes("value");
64 protected static int REPLICA_COUNT = 3;
66 protected static AsyncConnection ASYNC_CONN;
68 @Rule
69 public TestName testName = new TestName();
71 @Parameter
72 public Supplier<AsyncTable<?>> getTable;
74 private static AsyncTable<?> getRawTable() {
75 return ASYNC_CONN.getTable(TABLE_NAME);
78 private static AsyncTable<?> getTable() {
79 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
82 @Parameters
83 public static List<Object[]> params() {
84 return Arrays.asList(
85 new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable },
86 new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable });
89 protected static volatile boolean FAIL_PRIMARY_GET = false;
91 protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
92 new ConcurrentHashMap<>();
94 public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
96 @Override
97 public Optional<RegionObserver> getRegionObserver() {
98 return Optional.of(this);
101 private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c)
102 throws IOException {
103 RegionInfo region = c.getEnvironment().getRegionInfo();
104 if (!region.getTable().equals(TABLE_NAME)) {
105 return;
107 REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
108 .incrementAndGet();
109 if (region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
110 throw new IOException("Inject error");
114 @Override
115 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
116 List<Cell> result) throws IOException {
117 recordAndTryFail(c);
120 @Override
121 public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
122 throws IOException {
123 recordAndTryFail(c);
127 private static boolean allReplicasHaveRow(byte[] row) throws IOException {
128 for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
129 for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
130 if (region.get(new Get(row), false).isEmpty()) {
131 return false;
135 return true;
138 protected static void startClusterAndCreateTable() throws Exception {
139 TEST_UTIL.startMiniCluster(3);
140 TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
141 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
142 .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
143 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
144 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
147 protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
148 // this is the fastest way to let all replicas have the row
149 TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
150 TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
151 TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
154 @AfterClass
155 public static void tearDownAfterClass() throws Exception {
156 Closeables.close(ASYNC_CONN, true);
157 TEST_UTIL.shutdownMiniCluster();
160 protected static int getSecondaryGetCount() {
161 return REPLICA_ID_TO_COUNT.entrySet().stream()
162 .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
163 .mapToInt(e -> e.getValue().get()).sum();
166 protected static int getPrimaryGetCount() {
167 AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
168 return primaryGetCount != null ? primaryGetCount.get() : 0;
171 // replicaId = -1 means do not set replica
172 protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;
174 @Test
175 public void testNoReplicaRead() throws Exception {
176 FAIL_PRIMARY_GET = false;
177 REPLICA_ID_TO_COUNT.clear();
178 AsyncTable<?> table = getTable.get();
179 readAndCheck(table, -1);
180 // the primary region is fine and the primary timeout is 1 second which is long enough, so we
181 // should not send any requests to secondary replicas even if the consistency is timeline.
182 Thread.sleep(5000);
183 assertEquals(0, getSecondaryGetCount());
186 @Test
187 public void testReplicaRead() throws Exception {
188 // fail the primary get request
189 FAIL_PRIMARY_GET = true;
190 REPLICA_ID_TO_COUNT.clear();
191 // make sure that we could still get the value from secondary replicas
192 AsyncTable<?> table = getTable.get();
193 readAndCheck(table, -1);
194 // make sure that the primary request has been canceled
195 Thread.sleep(5000);
196 int count = getPrimaryGetCount();
197 Thread.sleep(10000);
198 assertEquals(count, getPrimaryGetCount());
201 @Test
202 public void testReadSpecificReplica() throws Exception {
203 FAIL_PRIMARY_GET = false;
204 REPLICA_ID_TO_COUNT.clear();
205 AsyncTable<?> table = getTable.get();
206 for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
207 readAndCheck(table, replicaId);
208 assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());