/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;

/**
 * This class is for testing {@link Connection}.
 */
@Category({ LargeTests.class })
public class TestConnection {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestConnection.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestConnection.class);
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final int RPC_RETRY = 5;

  @Rule
  public TestName name = new TestName();
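
  // The mini cluster below runs two region servers; netty buffer leak detection is set to
  // PARANOID so that testCancelConnectionMemoryLeak can surface any leaked buffers in the logs.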
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ResourceLeakDetector.setLevel(Level.PARANOID);
    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
    // Up the handlers; this test needs more than usual.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
    TEST_UTIL.startMiniCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @After
  public void tearDown() throws IOException {
    TEST_UTIL.getAdmin().balancerSwitch(true, true);
  }

  /**
   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
   * @throws IOException Unable to construct admin
   */
  @Test
  public void testAdminFactory() throws IOException {
    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    Admin admin = con1.getAdmin();
    assertTrue(admin.getConnection() == con1);
    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
    con1.close();
  }

  /**
   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
   */
  @Test
  public void testConnectionCloseAllowsInterrupt() throws Exception {
    testConnectionClose(true);
  }

  @Test
  public void testConnectionNotAllowsInterrupt() throws Exception {
    testConnectionClose(false);
  }

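  // Shared body for the two tests above: a background thread keeps issuing Gets while the main
  // thread repeatedly cancels the RPC connections to the hosting region server; at the end we
  // assert that the reader thread finished without an unexpected exception.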
  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
    TEST_UTIL.createTable(tableName, FAM_NAM).close();

    TEST_UTIL.getAdmin().balancerSwitch(false, true);

    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Servers do not really expire
    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
    // to avoid the client getting stuck when doing the Get
    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);

    Connection connection = ConnectionFactory.createConnection(c2);
    final Table table = connection.getTable(tableName);

    Put put = new Put(ROW);
    put.addColumn(FAM_NAM, ROW, ROW);
    table.put(put);

    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
    final AtomicInteger step = new AtomicInteger(0);

    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
    Thread t = new Thread("testConnectionCloseThread") {
      @Override
      public void run() {
        int done = 0;
        try {
          step.set(1);
          while (step.get() == 1) {
            Get get = new Get(ROW);
            table.get(get);
            done++;
            if (done % 100 == 0) {
              LOG.info("done=" + done);
            }
            // without the sleep, will cause the exception for too many files in
            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
            Thread.sleep(100);
          }
        } catch (Throwable t) {
          failed.set(t);
          LOG.error(t.toString(), t);
        }
        step.set(3);
      }
    };
    t.start();
    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return step.get() == 1;
      }
    });

    ServerName sn;
    try (RegionLocator rl = connection.getRegionLocator(tableName)) {
      sn = rl.getRegionLocation(ROW).getServerName();
    }

    RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;

    LOG.info("Going to cancel connections. connection=" + connection.toString() + ", sn=" + sn);
    for (int i = 0; i < 500; i++) {
      rpcClient.cancelConnections(sn);
      Thread.sleep(5);
    }

    step.compareAndSet(1, 2);
    // The test may fail here if the thread doing the gets is stuck. The way to find
    // out what's happening is to look for the thread named 'testConnectionCloseThread'
    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return step.get() == 3;
      }
    });
    table.close();
    connection.close();
    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
  }

  /**
   * Test that connection can become idle without breaking everything.
   */
  @Test
  public void testConnectionIdle() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, FAM_NAM).close();
    int idleTime = 20000;
    boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);

    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
    c2.setInt(RpcClient.IDLE_TIME, idleTime);

    Connection connection = ConnectionFactory.createConnection(c2);
    final Table table = connection.getTable(tableName);

    Put put = new Put(ROW);
    put.addColumn(FAM_NAM, ROW, ROW);
    table.put(put);

    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
    mee.setValue(EnvironmentEdgeManager.currentTime());
    EnvironmentEdgeManager.injectEdge(mee);
    LOG.info("first get");
    table.get(new Get(ROW));

    LOG.info("first get - changing the time & sleeping");
    mee.incValue(idleTime + 1000);
    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
                        // 1500 = sleep time in RpcClient#waitForWork + a margin

    LOG.info("second get - connection has been marked idle in the middle");
    // To check that the connection actually became idle would need to read some private
    // fields of RpcClient.
    table.get(new Get(ROW));
    mee.incValue(idleTime + 1000);

    LOG.info("third get - connection is idle, but the reader doesn't know yet");
    // We're testing here a special case:
    // time limit reached BUT connection not yet reclaimed AND a new call.
    // in this situation, we don't close the connection, instead we use it immediately.
    // If we're very unlucky we can have a race condition in the test: the connection is already
    // under closing when we do the get, so we have an exception, and we don't retry as the
    // retry number is 1. The probability is very very low, and seems acceptable for now. It's
    // a test issue only.
    table.get(new Get(ROW));

    LOG.info("we're done - time will change back");

    table.close();
    connection.close();
    EnvironmentEdgeManager.reset();
    TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
  }

  @Test
  public void testClosing() throws Exception {
    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
      String.valueOf(ThreadLocalRandom.current().nextInt()));

    // as connection caching is going away, now we're just testing
    // that a closed connection does actually get closed.

    Connection c1 = ConnectionFactory.createConnection(configuration);
    Connection c2 = ConnectionFactory.createConnection(configuration);
    // no caching, different connections
    assertTrue(c1 != c2);

    // closing independently
    c1.close();
    assertTrue(c1.isClosed());
    assertFalse(c2.isClosed());

    c2.close();
    assertTrue(c2.isClosed());
  }

  /**
   * Trivial test to verify that nobody messes with
   * {@link ConnectionFactory#createConnection(Configuration)}
   */
  @Test
  public void testCreateConnection() throws Exception {
    Configuration configuration = TEST_UTIL.getConfiguration();
    Connection c1 = ConnectionFactory.createConnection(configuration);
    Connection c2 = ConnectionFactory.createConnection(configuration);
    // created from the same configuration, yet they are different
    assertTrue(c1 != c2);
    assertTrue(c1.getConfiguration() == c2.getConfiguration());
  }

  /*
  ====> With MasterRegistry, connections cannot outlast the masters' lifetime.
  @Test
  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
    Configuration config = new Configuration(TEST_UTIL.getConfiguration());

    final TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();

    Connection connection = ConnectionFactory.createConnection(config);
    Table table = connection.getTable(tableName);

    // this will cache the meta location and table's region location
    table.get(new Get(Bytes.toBytes("foo")));

    TEST_UTIL.shutdownMiniHBaseCluster();
    TEST_UTIL.restartHBaseCluster(2);
    // this should be able to discover new locations for meta and table's region
    table.get(new Get(Bytes.toBytes("foo")));
    TEST_UTIL.deleteTable(tableName);
    table.close();
    connection.close();
  }
  */

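  /**
   * Verify that {@link RegionLocator#getAllRegionLocations()} returns one location per region
   * replica, with replica ids 0 .. regionReplication - 1.
   */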
  @Test
  public void testLocateRegionsWithRegionReplicas() throws IOException {
    int regionReplication = 3;
    byte[] family = Bytes.toBytes("cf");
    TableName tableName = TableName.valueOf(name.getMethodName());

    // Create a table with region replicas
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    TEST_UTIL.getAdmin().createTable(builder.build());

    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Get locations of the regions of the table
      List<HRegionLocation> locations = locator.getAllRegionLocations();

      // The size of the returned locations should be 3
      assertEquals(regionReplication, locations.size());

      // The replicaIds of the returned locations should be 0, 1 and 2
      Set<Integer> expectedReplicaIds =
        IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet());
      for (HRegionLocation location : locations) {
        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

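  /**
   * Invoking a coprocessor service through a connection that has already been closed should fail
   * with {@link DoNotRetryIOException} instead of retrying.
   */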
  @Test(expected = DoNotRetryIOException.class)
  public void testClosedConnection() throws ServiceException, Throwable {
    byte[] family = Bytes.toBytes("cf");
    TableName tableName = TableName.valueOf(name.getMethodName());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    TEST_UTIL.getAdmin().createTable(builder.build());

    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    // cache the location
    try (Table table = conn.getTable(tableName)) {
      table.get(new Get(Bytes.toBytes(0)));
    } finally {
      conn.close();
    }
    Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
      throw new RuntimeException("Should not arrive here");
    };
    conn.getTable(tableName).coprocessorService(MultiRowMutationService.class,
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable);
  }

  // There is no assertion, but you need to confirm that there is no resource leak output from netty
  @Test
  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, FAM_NAM).close();
    TEST_UTIL.getAdmin().balancerSwitch(false, true);
    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      Table table = connection.getTable(tableName)) {
      table.get(new Get(Bytes.toBytes("1")));
      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
      RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
      rpcClient.cancelConnections(sn);
      // give the cancelled connections time to be torn down, then trigger GC so the netty
      // leak detector can report any leaked buffers in the test output
      Thread.sleep(1000);
      System.gc();
      Thread.sleep(1000);
    }
  }
}