2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
22 import java
.io
.IOException
;
23 import java
.util
.Arrays
;
24 import java
.util
.List
;
25 import java
.util
.Optional
;
26 import java
.util
.concurrent
.ConcurrentHashMap
;
27 import java
.util
.concurrent
.ConcurrentMap
;
28 import java
.util
.concurrent
.ForkJoinPool
;
29 import java
.util
.concurrent
.atomic
.AtomicInteger
;
30 import java
.util
.function
.Supplier
;
31 import org
.apache
.commons
.io
.IOUtils
;
32 import org
.apache
.hadoop
.hbase
.Cell
;
33 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
34 import org
.apache
.hadoop
.hbase
.TableName
;
35 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
36 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
37 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
38 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
39 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
40 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
41 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.RegionServerThread
;
42 import org
.junit
.AfterClass
;
43 import org
.junit
.Rule
;
44 import org
.junit
.Test
;
45 import org
.junit
.rules
.TestName
;
46 import org
.junit
.runners
.Parameterized
.Parameter
;
47 import org
.junit
.runners
.Parameterized
.Parameters
;
49 public abstract class AbstractTestAsyncTableRegionReplicasRead
{
51 protected static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
53 protected static TableName TABLE_NAME
= TableName
.valueOf("async");
55 protected static byte[] FAMILY
= Bytes
.toBytes("cf");
57 protected static byte[] QUALIFIER
= Bytes
.toBytes("cq");
59 protected static byte[] ROW
= Bytes
.toBytes("row");
61 protected static byte[] VALUE
= Bytes
.toBytes("value");
63 protected static int REPLICA_COUNT
= 3;
65 protected static AsyncConnection ASYNC_CONN
;
68 public TestName testName
= new TestName();
71 public Supplier
<AsyncTable
<?
>> getTable
;
73 private static AsyncTable
<?
> getRawTable() {
74 return ASYNC_CONN
.getTable(TABLE_NAME
);
77 private static AsyncTable
<?
> getTable() {
78 return ASYNC_CONN
.getTable(TABLE_NAME
, ForkJoinPool
.commonPool());
82 public static List
<Object
[]> params() {
84 new Supplier
<?
>[] { AbstractTestAsyncTableRegionReplicasRead
::getRawTable
},
85 new Supplier
<?
>[] { AbstractTestAsyncTableRegionReplicasRead
::getTable
});
88 protected static volatile boolean FAIL_PRIMARY_GET
= false;
90 protected static ConcurrentMap
<Integer
, AtomicInteger
> REPLICA_ID_TO_COUNT
=
91 new ConcurrentHashMap
<>();
93 public static final class FailPrimaryGetCP
implements RegionObserver
, RegionCoprocessor
{
96 public Optional
<RegionObserver
> getRegionObserver() {
97 return Optional
.of(this);
100 private void recordAndTryFail(ObserverContext
<RegionCoprocessorEnvironment
> c
)
102 RegionInfo region
= c
.getEnvironment().getRegionInfo();
103 if (!region
.getTable().equals(TABLE_NAME
)) {
106 REPLICA_ID_TO_COUNT
.computeIfAbsent(region
.getReplicaId(), k
-> new AtomicInteger())
108 if (region
.getReplicaId() == RegionReplicaUtil
.DEFAULT_REPLICA_ID
&& FAIL_PRIMARY_GET
) {
109 throw new IOException("Inject error");
114 public void preGetOp(ObserverContext
<RegionCoprocessorEnvironment
> c
, Get get
,
115 List
<Cell
> result
) throws IOException
{
120 public void preScannerOpen(ObserverContext
<RegionCoprocessorEnvironment
> c
, Scan scan
)
126 private static boolean allReplicasHaveRow(byte[] row
) throws IOException
{
127 for (RegionServerThread t
: TEST_UTIL
.getMiniHBaseCluster().getRegionServerThreads()) {
128 for (HRegion region
: t
.getRegionServer().getRegions(TABLE_NAME
)) {
129 if (region
.get(new Get(row
), false).isEmpty()) {
137 protected static void startClusterAndCreateTable() throws Exception
{
138 TEST_UTIL
.startMiniCluster(3);
139 TEST_UTIL
.getAdmin().createTable(TableDescriptorBuilder
.newBuilder(TABLE_NAME
)
140 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
)).setRegionReplication(REPLICA_COUNT
)
141 .setCoprocessor(FailPrimaryGetCP
.class.getName()).build());
142 TEST_UTIL
.waitUntilAllRegionsAssigned(TABLE_NAME
);
143 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
146 protected static void waitUntilAllReplicasHaveRow(byte[] row
) throws IOException
{
147 // this is the fastest way to let all replicas have the row
148 TEST_UTIL
.getAdmin().disableTable(TABLE_NAME
);
149 TEST_UTIL
.getAdmin().enableTable(TABLE_NAME
);
150 TEST_UTIL
.waitFor(30000, () -> allReplicasHaveRow(row
));
154 public static void tearDownAfterClass() throws Exception
{
155 IOUtils
.closeQuietly(ASYNC_CONN
);
156 TEST_UTIL
.shutdownMiniCluster();
159 protected static int getSecondaryGetCount() {
160 return REPLICA_ID_TO_COUNT
.entrySet().stream()
161 .filter(e
-> e
.getKey().intValue() != RegionReplicaUtil
.DEFAULT_REPLICA_ID
)
162 .mapToInt(e
-> e
.getValue().get()).sum();
165 protected static int getPrimaryGetCount() {
166 AtomicInteger primaryGetCount
= REPLICA_ID_TO_COUNT
.get(RegionReplicaUtil
.DEFAULT_REPLICA_ID
);
167 return primaryGetCount
!= null ? primaryGetCount
.get() : 0;
170 // replicaId = -1 means do not set replica
171 protected abstract void readAndCheck(AsyncTable
<?
> table
, int replicaId
) throws Exception
;
174 public void testNoReplicaRead() throws Exception
{
175 FAIL_PRIMARY_GET
= false;
176 REPLICA_ID_TO_COUNT
.clear();
177 AsyncTable
<?
> table
= getTable
.get();
178 readAndCheck(table
, -1);
179 // the primary region is fine and the primary timeout is 1 second which is long enough, so we
180 // should not send any requests to secondary replicas even if the consistency is timeline.
182 assertEquals(0, getSecondaryGetCount());
186 public void testReplicaRead() throws Exception
{
187 // fail the primary get request
188 FAIL_PRIMARY_GET
= true;
189 REPLICA_ID_TO_COUNT
.clear();
190 // make sure that we could still get the value from secondary replicas
191 AsyncTable
<?
> table
= getTable
.get();
192 readAndCheck(table
, -1);
193 // make sure that the primary request has been canceled
195 int count
= getPrimaryGetCount();
197 assertEquals(count
, getPrimaryGetCount());
201 public void testReadSpecificReplica() throws Exception
{
202 FAIL_PRIMARY_GET
= false;
203 REPLICA_ID_TO_COUNT
.clear();
204 AsyncTable
<?
> table
= getTable
.get();
205 for (int replicaId
= 0; replicaId
< REPLICA_COUNT
; replicaId
++) {
206 readAndCheck(table
, replicaId
);
207 assertEquals(1, REPLICA_ID_TO_COUNT
.get(replicaId
).get());