2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import java
.io
.IOException
;
21 import java
.net
.InetSocketAddress
;
22 import java
.net
.UnknownHostException
;
23 import java
.util
.concurrent
.TimeUnit
;
25 import org
.apache
.hadoop
.conf
.Configuration
;
26 import org
.apache
.hadoop
.hbase
.HConstants
;
27 import org
.apache
.hadoop
.hbase
.client
.MetricsConnection
;
28 import org
.apache
.hadoop
.hbase
.codec
.Codec
;
29 import org
.apache
.hadoop
.hbase
.net
.Address
;
30 import org
.apache
.hadoop
.hbase
.security
.SecurityInfo
;
31 import org
.apache
.hadoop
.hbase
.security
.User
;
32 import org
.apache
.hadoop
.hbase
.security
.provider
.SaslClientAuthenticationProvider
;
33 import org
.apache
.hadoop
.hbase
.security
.provider
.SaslClientAuthenticationProviders
;
34 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
35 import org
.apache
.hadoop
.hbase
.util
.Pair
;
36 import org
.apache
.hadoop
.io
.compress
.CompressionCodec
;
37 import org
.apache
.hadoop
.security
.token
.Token
;
38 import org
.apache
.hadoop
.security
.token
.TokenIdentifier
;
39 import org
.apache
.yetus
.audience
.InterfaceAudience
;
40 import org
.slf4j
.Logger
;
41 import org
.slf4j
.LoggerFactory
;
43 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.HashedWheelTimer
;
44 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.Timeout
;
45 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.TimerTask
;
46 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
47 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RPCProtos
.ConnectionHeader
;
48 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RPCProtos
.UserInformation
;
/**
 * Base class for IPC connections.
 */
53 @InterfaceAudience.Private
54 abstract class RpcConnection
{
56 private static final Logger LOG
= LoggerFactory
.getLogger(RpcConnection
.class);
58 protected final ConnectionId remoteId
;
60 protected final boolean useSasl
;
62 protected final Token
<?
extends TokenIdentifier
> token
;
64 protected final SecurityInfo securityInfo
;
66 protected final int reloginMaxBackoff
; // max pause before relogin on sasl failure
68 protected final Codec codec
;
70 protected final CompressionCodec compressor
;
72 protected final MetricsConnection metrics
;
74 protected final HashedWheelTimer timeoutTimer
;
76 protected final Configuration conf
;
78 protected static String CRYPTO_AES_ENABLED_KEY
= "hbase.rpc.crypto.encryption.aes.enabled";
80 protected static boolean CRYPTO_AES_ENABLED_DEFAULT
= false;
82 // the last time we were picked up from connection pool.
83 protected long lastTouched
;
85 protected SaslClientAuthenticationProvider provider
;
87 protected RpcConnection(Configuration conf
, HashedWheelTimer timeoutTimer
, ConnectionId remoteId
,
88 String clusterId
, boolean isSecurityEnabled
, Codec codec
, CompressionCodec compressor
,
89 MetricsConnection metrics
) throws IOException
{
90 this.timeoutTimer
= timeoutTimer
;
92 this.compressor
= compressor
;
94 this.metrics
= metrics
;
95 User ticket
= remoteId
.getTicket();
96 this.securityInfo
= SecurityInfo
.getInfo(remoteId
.getServiceName());
97 this.useSasl
= isSecurityEnabled
;
99 // Choose the correct Token and AuthenticationProvider for this client to use
100 SaslClientAuthenticationProviders providers
=
101 SaslClientAuthenticationProviders
.getInstance(conf
);
102 Pair
<SaslClientAuthenticationProvider
, Token
<?
extends TokenIdentifier
>> pair
;
103 if (useSasl
&& securityInfo
!= null) {
104 pair
= providers
.selectProvider(clusterId
, ticket
);
106 if (LOG
.isTraceEnabled()) {
107 LOG
.trace("Found no valid authentication method from providers={} with tokens={}",
108 providers
.toString(), ticket
.getTokens());
110 throw new RuntimeException("Found no valid authentication method from options");
112 } else if (!useSasl
) {
113 // Hack, while SIMPLE doesn't go via SASL.
114 pair
= providers
.getSimpleProvider();
116 throw new RuntimeException("Could not compute valid client authentication provider");
119 this.provider
= pair
.getFirst();
120 this.token
= pair
.getSecond();
122 LOG
.debug("Using {} authentication for service={}, sasl={}",
123 provider
.getSaslAuthMethod().getName(), remoteId
.serviceName
, useSasl
);
124 reloginMaxBackoff
= conf
.getInt("hbase.security.relogin.maxbackoff", 5000);
125 this.remoteId
= remoteId
;
128 protected final void scheduleTimeoutTask(final Call call
) {
129 if (call
.timeout
> 0) {
130 call
.timeoutTask
= timeoutTimer
.newTimeout(new TimerTask() {
133 public void run(Timeout timeout
) throws Exception
{
134 call
.setTimeout(new CallTimeoutException(call
.toShortString() + ", waitTime="
135 + (EnvironmentEdgeManager
.currentTime() - call
.getStartTime()) + "ms, rpcTimeout="
136 + call
.timeout
+ "ms"));
139 }, call
.timeout
, TimeUnit
.MILLISECONDS
);
143 protected final byte[] getConnectionHeaderPreamble() {
144 // Assemble the preamble up in a buffer first and then send it. Writing individual elements,
145 // they are getting sent across piecemeal according to wireshark and then server is messing
146 // up the reading on occasion (the passed in stream is not buffered yet).
148 // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
149 int rpcHeaderLen
= HConstants
.RPC_HEADER
.length
;
150 byte[] preamble
= new byte[rpcHeaderLen
+ 2];
151 System
.arraycopy(HConstants
.RPC_HEADER
, 0, preamble
, 0, rpcHeaderLen
);
152 preamble
[rpcHeaderLen
] = HConstants
.RPC_CURRENT_VERSION
;
153 synchronized (this) {
154 preamble
[rpcHeaderLen
+ 1] = provider
.getSaslAuthMethod().getCode();
159 protected final ConnectionHeader
getConnectionHeader() {
160 final ConnectionHeader
.Builder builder
= ConnectionHeader
.newBuilder();
161 builder
.setServiceName(remoteId
.getServiceName());
162 final UserInformation userInfoPB
= provider
.getUserInfo(remoteId
.ticket
);
163 if (userInfoPB
!= null) {
164 builder
.setUserInfo(userInfoPB
);
166 if (this.codec
!= null) {
167 builder
.setCellBlockCodecClass(this.codec
.getClass().getCanonicalName());
169 if (this.compressor
!= null) {
170 builder
.setCellBlockCompressorClass(this.compressor
.getClass().getCanonicalName());
172 builder
.setVersionInfo(ProtobufUtil
.getVersionInfo());
173 boolean isCryptoAESEnable
= conf
.getBoolean(CRYPTO_AES_ENABLED_KEY
, CRYPTO_AES_ENABLED_DEFAULT
);
174 // if Crypto AES enable, setup Cipher transformation
175 if (isCryptoAESEnable
) {
176 builder
.setRpcCryptoCipherTransformation(
177 conf
.get("hbase.rpc.crypto.encryption.aes.cipher.transform", "AES/CTR/NoPadding"));
179 return builder
.build();
182 protected final InetSocketAddress
getRemoteInetAddress(MetricsConnection metrics
)
183 throws UnknownHostException
{
184 if (metrics
!= null) {
185 metrics
.incrNsLookups();
187 InetSocketAddress remoteAddr
= Address
.toSocketAddress(remoteId
.getAddress());
188 if (remoteAddr
.isUnresolved()) {
189 if (metrics
!= null) {
190 metrics
.incrNsLookupsFailed();
192 throw new UnknownHostException(remoteId
.getAddress() + " could not be resolved");
197 protected abstract void callTimeout(Call call
);
199 public ConnectionId
remoteId() {
203 public long getLastTouched() {
207 public void setLastTouched(long lastTouched
) {
208 this.lastTouched
= lastTouched
;
212 * Tell the idle connection sweeper whether we could be swept.
214 public abstract boolean isActive();
217 * Just close connection. Do not need to remove from connection pool.
219 public abstract void shutdown();
221 public abstract void sendRequest(Call call
, HBaseRpcController hrc
) throws IOException
;
224 * Does the clean up work after the connection is removed from the connection pool
226 public abstract void cleanupConnection();