2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
22 import java
.io
.IOException
;
23 import java
.util
.Arrays
;
24 import java
.util
.List
;
25 import java
.util
.Optional
;
26 import java
.util
.concurrent
.ConcurrentHashMap
;
27 import java
.util
.concurrent
.ConcurrentMap
;
28 import java
.util
.concurrent
.ForkJoinPool
;
29 import java
.util
.concurrent
.atomic
.AtomicInteger
;
30 import java
.util
.function
.Supplier
;
31 import org
.apache
.hadoop
.hbase
.Cell
;
32 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
33 import org
.apache
.hadoop
.hbase
.TableName
;
34 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
35 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
36 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
37 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
38 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
39 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
40 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.RegionServerThread
;
41 import org
.junit
.AfterClass
;
42 import org
.junit
.Rule
;
43 import org
.junit
.Test
;
44 import org
.junit
.rules
.TestName
;
45 import org
.junit
.runners
.Parameterized
.Parameter
;
46 import org
.junit
.runners
.Parameterized
.Parameters
;
48 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.io
.Closeables
;
50 public abstract class AbstractTestAsyncTableRegionReplicasRead
{
52 protected static final HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
54 protected static TableName TABLE_NAME
= TableName
.valueOf("async");
56 protected static byte[] FAMILY
= Bytes
.toBytes("cf");
58 protected static byte[] QUALIFIER
= Bytes
.toBytes("cq");
60 protected static byte[] ROW
= Bytes
.toBytes("row");
62 protected static byte[] VALUE
= Bytes
.toBytes("value");
64 protected static int REPLICA_COUNT
= 3;
66 protected static AsyncConnection ASYNC_CONN
;
69 public TestName testName
= new TestName();
72 public Supplier
<AsyncTable
<?
>> getTable
;
74 private static AsyncTable
<?
> getRawTable() {
75 return ASYNC_CONN
.getTable(TABLE_NAME
);
78 private static AsyncTable
<?
> getTable() {
79 return ASYNC_CONN
.getTable(TABLE_NAME
, ForkJoinPool
.commonPool());
83 public static List
<Object
[]> params() {
85 new Supplier
<?
>[] { AbstractTestAsyncTableRegionReplicasRead
::getRawTable
},
86 new Supplier
<?
>[] { AbstractTestAsyncTableRegionReplicasRead
::getTable
});
89 protected static volatile boolean FAIL_PRIMARY_GET
= false;
91 protected static ConcurrentMap
<Integer
, AtomicInteger
> REPLICA_ID_TO_COUNT
=
92 new ConcurrentHashMap
<>();
94 public static final class FailPrimaryGetCP
implements RegionObserver
, RegionCoprocessor
{
97 public Optional
<RegionObserver
> getRegionObserver() {
98 return Optional
.of(this);
101 private void recordAndTryFail(ObserverContext
<RegionCoprocessorEnvironment
> c
)
103 RegionInfo region
= c
.getEnvironment().getRegionInfo();
104 if (!region
.getTable().equals(TABLE_NAME
)) {
107 REPLICA_ID_TO_COUNT
.computeIfAbsent(region
.getReplicaId(), k
-> new AtomicInteger())
109 if (region
.getReplicaId() == RegionReplicaUtil
.DEFAULT_REPLICA_ID
&& FAIL_PRIMARY_GET
) {
110 throw new IOException("Inject error");
115 public void preGetOp(ObserverContext
<RegionCoprocessorEnvironment
> c
, Get get
,
116 List
<Cell
> result
) throws IOException
{
121 public void preScannerOpen(ObserverContext
<RegionCoprocessorEnvironment
> c
, Scan scan
)
127 private static boolean allReplicasHaveRow(byte[] row
) throws IOException
{
128 for (RegionServerThread t
: TEST_UTIL
.getMiniHBaseCluster().getRegionServerThreads()) {
129 for (HRegion region
: t
.getRegionServer().getRegions(TABLE_NAME
)) {
130 if (region
.get(new Get(row
), false).isEmpty()) {
138 protected static void startClusterAndCreateTable() throws Exception
{
139 TEST_UTIL
.startMiniCluster(3);
140 TEST_UTIL
.getAdmin().createTable(TableDescriptorBuilder
.newBuilder(TABLE_NAME
)
141 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
)).setRegionReplication(REPLICA_COUNT
)
142 .setCoprocessor(FailPrimaryGetCP
.class.getName()).build());
143 TEST_UTIL
.waitUntilAllRegionsAssigned(TABLE_NAME
);
144 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
147 protected static void waitUntilAllReplicasHaveRow(byte[] row
) throws IOException
{
148 // this is the fastest way to let all replicas have the row
149 TEST_UTIL
.getAdmin().disableTable(TABLE_NAME
);
150 TEST_UTIL
.getAdmin().enableTable(TABLE_NAME
);
151 TEST_UTIL
.waitFor(30000, () -> allReplicasHaveRow(row
));
155 public static void tearDownAfterClass() throws Exception
{
156 Closeables
.close(ASYNC_CONN
, true);
157 TEST_UTIL
.shutdownMiniCluster();
160 protected static int getSecondaryGetCount() {
161 return REPLICA_ID_TO_COUNT
.entrySet().stream()
162 .filter(e
-> e
.getKey().intValue() != RegionReplicaUtil
.DEFAULT_REPLICA_ID
)
163 .mapToInt(e
-> e
.getValue().get()).sum();
166 protected static int getPrimaryGetCount() {
167 AtomicInteger primaryGetCount
= REPLICA_ID_TO_COUNT
.get(RegionReplicaUtil
.DEFAULT_REPLICA_ID
);
168 return primaryGetCount
!= null ? primaryGetCount
.get() : 0;
171 // replicaId = -1 means do not set replica
172 protected abstract void readAndCheck(AsyncTable
<?
> table
, int replicaId
) throws Exception
;
175 public void testNoReplicaRead() throws Exception
{
176 FAIL_PRIMARY_GET
= false;
177 REPLICA_ID_TO_COUNT
.clear();
178 AsyncTable
<?
> table
= getTable
.get();
179 readAndCheck(table
, -1);
180 // the primary region is fine and the primary timeout is 1 second which is long enough, so we
181 // should not send any requests to secondary replicas even if the consistency is timeline.
183 assertEquals(0, getSecondaryGetCount());
187 public void testReplicaRead() throws Exception
{
188 // fail the primary get request
189 FAIL_PRIMARY_GET
= true;
190 REPLICA_ID_TO_COUNT
.clear();
191 // make sure that we could still get the value from secondary replicas
192 AsyncTable
<?
> table
= getTable
.get();
193 readAndCheck(table
, -1);
194 // make sure that the primary request has been canceled
196 int count
= getPrimaryGetCount();
198 assertEquals(count
, getPrimaryGetCount());
202 public void testReadSpecificReplica() throws Exception
{
203 FAIL_PRIMARY_GET
= false;
204 REPLICA_ID_TO_COUNT
.clear();
205 AsyncTable
<?
> table
= getTable
.get();
206 for (int replicaId
= 0; replicaId
< REPLICA_COUNT
; replicaId
++) {
207 readAndCheck(table
, replicaId
);
208 assertEquals(1, REPLICA_ID_TO_COUNT
.get(replicaId
).get());