 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
/**
 * This class is for testing {@link Connection}.
 */
@Category({ LargeTests.class })
public class TestConnection {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestConnection.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestConnection.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final int RPC_RETRY = 5;

  @Rule
  public TestName name = new TestName();
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
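    // Paranoid leak detection makes netty track and report every unreleased buffer; the
    // testCancelConnectionMemoryLeak test below relies on such output appearing in the test log
    // if a leak ever happens.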
    ResourceLeakDetector.setLevel(Level.PARANOID);
    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
    // Up the handlers; this test needs more than usual.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
    TEST_UTIL.startMiniCluster(2);
  }
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
  @After
  public void tearDown() throws IOException {
    TEST_UTIL.getAdmin().balancerSwitch(true, true);
  }
  /**
   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
   * @throws IOException Unable to construct admin
   */
  @Test
  public void testAdminFactory() throws IOException {
    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    Admin admin = con1.getAdmin();
    assertTrue(admin.getConnection() == con1);
    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
    con1.close();
  }
  /**
   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
   */
  @Test
  public void testConnectionCloseAllowsInterrupt() throws Exception {
    testConnectionClose(true);
  }

  @Test
  public void testConnectionNotAllowsInterrupt() throws Exception {
    testConnectionClose(false);
  }
  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
    TEST_UTIL.createTable(tableName, FAM_NAM).close();

    TEST_UTIL.getAdmin().balancerSwitch(false, true);

    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Servers do not really expire
    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
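    // SPECIFIC_WRITE_THREAD gives the RPC client a dedicated writer thread; the two @Test
    // wrappers above run this scenario once with and once without it.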
    // to avoid the client getting stuck when doing the Get
    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);

    Connection connection = ConnectionFactory.createConnection(c2);
    final Table table = connection.getTable(tableName);

    Put put = new Put(ROW);
    put.addColumn(FAM_NAM, ROW, ROW);
    table.put(put);

    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
    final AtomicInteger step = new AtomicInteger(0);

    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
    Thread t = new Thread("testConnectionCloseThread") {
      @Override
      public void run() {
        int done = 0;
        try {
          step.set(1);
          while (step.get() == 1) {
            Get get = new Get(ROW);
            table.get(get);
            done++;
            if (done % 100 == 0) {
              LOG.info("done=" + done);
            }
            // without the sleep, will cause the exception for too many files in
            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
            Thread.sleep(100);
          }
        } catch (Throwable t) {
          failed.set(t);
          LOG.error(t.toString(), t);
        }
        step.set(3);
      }
    };
    t.start();

    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return step.get() == 1;
      }
    });

    ServerName sn;
    try (RegionLocator rl = connection.getRegionLocator(tableName)) {
      sn = rl.getRegionLocation(ROW).getServerName();
    }

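    // The synchronous Connection wraps an async implementation, so we reach through
    // AsyncConnectionImpl to get the shared RpcClient and cancel its connections to the server.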
    RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;

    LOG.info("Going to cancel connections. connection=" + connection.toString() + ", sn=" + sn);
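    // Repeatedly cancelling the connections to the region server while the worker thread keeps
    // issuing Gets forces in-flight calls to fail and be retried; the test then checks that the
    // worker thread never surfaced an exception.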
    for (int i = 0; i < 500; i++) {
      rpcClient.cancelConnections(sn);
    }

    step.compareAndSet(1, 2);
    // The test may fail here if the thread doing the gets is stuck. The way to find
    // out what's happening is to look for the thread named 'testConnectionCloseThread'
    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return step.get() == 3;
      }
    });

    table.close();
    connection.close();
    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
  }
  /**
   * Test that connection can become idle without breaking everything.
   */
  @Test
  public void testConnectionIdle() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, FAM_NAM).close();
    int idleTime = 20000;
    boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);

    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
    c2.setInt(RpcClient.IDLE_TIME, idleTime);

    Connection connection = ConnectionFactory.createConnection(c2);
    final Table table = connection.getTable(tableName);

    Put put = new Put(ROW);
    put.addColumn(FAM_NAM, ROW, ROW);
    table.put(put);

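    // ManualEnvironmentEdge replaces the clock that HBase code reads through
    // EnvironmentEdgeManager, so the test can "age" the connection past idleTime without
    // actually waiting that long.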
    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
    mee.setValue(System.currentTimeMillis());
    EnvironmentEdgeManager.injectEdge(mee);
    LOG.info("first get");
    table.get(new Get(ROW));

    LOG.info("first get - changing the time & sleeping");
    mee.incValue(idleTime + 1000);
    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
                        // 1500 = sleep time in RpcClient#waitForWork + a margin

    LOG.info("second get - connection has been marked idle in the middle");
    // To check that the connection actually became idle would need to read some private
    // fields of RpcClient.
    table.get(new Get(ROW));
    mee.incValue(idleTime + 1000);

    LOG.info("third get - connection is idle, but the reader doesn't know yet");
    // We're testing a special case here:
    // time limit reached BUT connection not yet reclaimed AND a new call.
    // In this situation, we don't close the connection, instead we use it immediately.
    // If we're very unlucky we can have a race condition in the test: the connection is already
    // being closed when we do the get, so we get an exception, and we don't retry as the
    // retry number is 1. The probability is very, very low, and seems acceptable for now. It's
    // a test issue only.
    table.get(new Get(ROW));

    LOG.info("we're done - time will change back");

    table.close();
    connection.close();
    EnvironmentEdgeManager.reset();
    TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
  }
  @Test
  public void testClosing() throws Exception {
    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
      String.valueOf(ThreadLocalRandom.current().nextInt()));

    // As connection caching is going away, now we're just testing
    // that a closed connection does actually get closed.

    Connection c1 = ConnectionFactory.createConnection(configuration);
    Connection c2 = ConnectionFactory.createConnection(configuration);
    // no caching, different connections
    assertTrue(c1 != c2);

    // closing independently
    c1.close();
    assertTrue(c1.isClosed());
    assertFalse(c2.isClosed());

    c2.close();
    assertTrue(c2.isClosed());
  }
  /**
   * Trivial test to verify that nobody messes with
   * {@link ConnectionFactory#createConnection(Configuration)}
   */
  @Test
  public void testCreateConnection() throws Exception {
    Configuration configuration = TEST_UTIL.getConfiguration();
    Connection c1 = ConnectionFactory.createConnection(configuration);
    Connection c2 = ConnectionFactory.createConnection(configuration);
    // created from the same configuration, yet they are different
    assertTrue(c1 != c2);
    assertTrue(c1.getConfiguration() == c2.getConfiguration());
  }
  /*
  ====> With MasterRegistry, connections cannot outlast the masters' lifetime.
  @Test
  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
    Configuration config = new Configuration(TEST_UTIL.getConfiguration());

    final TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();

    Connection connection = ConnectionFactory.createConnection(config);
    Table table = connection.getTable(tableName);

    // this will cache the meta location and table's region location
    table.get(new Get(Bytes.toBytes("foo")));

    TEST_UTIL.shutdownMiniHBaseCluster();
    TEST_UTIL.restartHBaseCluster(2);
    // this should be able to discover new locations for meta and table's region
    table.get(new Get(Bytes.toBytes("foo")));
    TEST_UTIL.deleteTable(tableName);
    table.close();
    connection.close();
  }
  */
  @Test
  public void testLocateRegionsWithRegionReplicas() throws IOException {
    int regionReplication = 3;
    byte[] family = Bytes.toBytes("cf");
    TableName tableName = TableName.valueOf(name.getMethodName());

    // Create a table with region replicas
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    TEST_UTIL.getAdmin().createTable(builder.build());
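    // With region replication enabled, getAllRegionLocations() returns one HRegionLocation per
    // replica: replica id 0 is the primary and the remaining ids are the secondary (read) replicas.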
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Get locations of the regions of the table
      List<HRegionLocation> locations = locator.getAllRegionLocations();

      // The size of the returned locations should be 3
      assertEquals(regionReplication, locations.size());

      // The replicaIds of the returned locations should be 0, 1 and 2
      Set<Integer> expectedReplicaIds =
        IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet());
      for (HRegionLocation location : locations) {
        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }
  @Test(expected = DoNotRetryIOException.class)
  public void testClosedConnection() throws ServiceException, Throwable {
    byte[] family = Bytes.toBytes("cf");
    TableName tableName = TableName.valueOf(name.getMethodName());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    TEST_UTIL.getAdmin().createTable(builder.build());

    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    // cache the location
    try (Table table = conn.getTable(tableName)) {
      table.get(new Get(Bytes.toBytes(0)));
    }
    conn.close();
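    // The connection is now closed, so the coprocessor call below is expected to fail fast with
    // DoNotRetryIOException; the callable must never actually run.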
    Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
      throw new RuntimeException("Should not arrive here");
    };
    conn.getTable(tableName).coprocessorService(MultiRowMutationService.class,
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable);
  }
  // There is no assertion, but you need to confirm that there is no resource leak output from netty
  @Test
  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, FAM_NAM).close();
    TEST_UTIL.getAdmin().balancerSwitch(false, true);
    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      Table table = connection.getTable(tableName)) {
      table.get(new Get(Bytes.toBytes("1")));
      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
      RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
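      // Cancelling the connections while a request may still be in flight is the scenario this
      // test guards against; with PARANOID leak detection enabled in setUpBeforeClass, a leaked
      // netty buffer would show up as ResourceLeakDetector output in the test log.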
      rpcClient.cancelConnections(sn);