/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtil
.fam1
;
21 import static org
.junit
.Assert
.assertTrue
;
22 import static org
.junit
.Assert
.fail
;
24 import java
.io
.IOException
;
25 import java
.net
.Socket
;
26 import java
.net
.SocketAddress
;
27 import java
.util
.concurrent
.BlockingQueue
;
28 import java
.util
.concurrent
.LinkedBlockingQueue
;
29 import org
.apache
.hadoop
.conf
.Configuration
;
30 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
31 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
32 import org
.apache
.hadoop
.hbase
.HConstants
;
33 import org
.apache
.hadoop
.hbase
.TableName
;
34 import org
.apache
.hadoop
.hbase
.client
.Connection
;
35 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
36 import org
.apache
.hadoop
.hbase
.client
.Get
;
37 import org
.apache
.hadoop
.hbase
.client
.MetricsConnection
;
38 import org
.apache
.hadoop
.hbase
.client
.RetriesExhaustedException
;
39 import org
.apache
.hadoop
.hbase
.client
.Table
;
40 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
41 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
42 import org
.junit
.AfterClass
;
43 import org
.junit
.BeforeClass
;
44 import org
.junit
.ClassRule
;
45 import org
.junit
.Rule
;
46 import org
.junit
.Test
;
47 import org
.junit
.experimental
.categories
.Category
;
48 import org
.junit
.rules
.TestName
;
49 import org
.slf4j
.Logger
;
50 import org
.slf4j
.LoggerFactory
;
52 @Category(MediumTests
.class)
53 public class TestRpcClientLeaks
{
56 public static final HBaseClassTestRule CLASS_RULE
=
57 HBaseClassTestRule
.forClass(TestRpcClientLeaks
.class);
60 public TestName name
= new TestName();
62 private static BlockingQueue
<Socket
> SAVED_SOCKETS
= new LinkedBlockingQueue
<>();
64 public static class MyRpcClientImpl
extends BlockingRpcClient
{
66 // Exceptions thrown only when this is set to false.
67 private static boolean throwException
= false;
69 public MyRpcClientImpl(Configuration conf
) {
73 public MyRpcClientImpl(Configuration conf
, String clusterId
, SocketAddress address
,
74 MetricsConnection metrics
) {
75 super(conf
, clusterId
, address
, metrics
);
79 protected BlockingRpcConnection
createConnection(ConnectionId remoteId
) throws IOException
{
80 return new BlockingRpcConnection(this, remoteId
) {
82 protected synchronized void setupConnection() throws IOException
{
83 super.setupConnection();
85 SAVED_SOCKETS
.add(socket
);
86 throw new IOException(
87 "Sample exception for verifying socket closure in case of exceptions.");
93 public static void enableThrowExceptions() {
94 throwException
= true;
98 private static final HBaseTestingUtil UTIL
= new HBaseTestingUtil();
101 public static void setup() throws Exception
{
102 UTIL
.startMiniCluster();
106 public static void teardown() throws Exception
{
107 UTIL
.shutdownMiniCluster();
110 public static final Logger LOG
= LoggerFactory
.getLogger(TestRpcClientLeaks
.class);
113 public void testSocketClosed() throws IOException
, InterruptedException
{
114 TableName tableName
= TableName
.valueOf(name
.getMethodName());
115 UTIL
.createTable(tableName
, fam1
).close();
117 Configuration conf
= new Configuration(UTIL
.getConfiguration());
118 conf
.set(RpcClientFactory
.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY
, MyRpcClientImpl
.class.getName());
119 conf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
120 try (Connection connection
= ConnectionFactory
.createConnection(conf
);
121 Table table
= connection
.getTable(TableName
.valueOf(name
.getMethodName()))) {
122 MyRpcClientImpl
.enableThrowExceptions();
123 table
.get(new Get(Bytes
.toBytes("asd")));
124 fail("Should fail because the injected error");
125 } catch (RetriesExhaustedException e
) {
128 for (Socket socket
: SAVED_SOCKETS
) {
129 assertTrue("Socket " + socket
+ " is not closed", socket
.isClosed());