2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import static org
.apache
.hadoop
.hbase
.ipc
.CallEvent
.Type
.CANCELLED
;
21 import static org
.apache
.hadoop
.hbase
.ipc
.CallEvent
.Type
.TIMEOUT
;
22 import static org
.apache
.hadoop
.hbase
.ipc
.IPCUtil
.execute
;
23 import static org
.apache
.hadoop
.hbase
.ipc
.IPCUtil
.setCancelled
;
24 import static org
.apache
.hadoop
.hbase
.ipc
.IPCUtil
.toIOE
;
26 import java
.io
.IOException
;
27 import java
.net
.InetSocketAddress
;
28 import java
.net
.UnknownHostException
;
29 import java
.util
.concurrent
.Executors
;
30 import java
.util
.concurrent
.ScheduledExecutorService
;
31 import java
.util
.concurrent
.ThreadLocalRandom
;
32 import java
.util
.concurrent
.TimeUnit
;
33 import org
.apache
.hadoop
.hbase
.ipc
.BufferCallBeforeInitHandler
.BufferCallEvent
;
34 import org
.apache
.hadoop
.hbase
.ipc
.HBaseRpcController
.CancellationCallback
;
35 import org
.apache
.hadoop
.hbase
.security
.NettyHBaseRpcConnectionHeaderHandler
;
36 import org
.apache
.hadoop
.hbase
.security
.NettyHBaseSaslRpcClientHandler
;
37 import org
.apache
.hadoop
.hbase
.security
.SaslChallengeDecoder
;
38 import org
.apache
.hadoop
.hbase
.util
.Threads
;
39 import org
.apache
.hadoop
.security
.UserGroupInformation
;
40 import org
.apache
.yetus
.audience
.InterfaceAudience
;
41 import org
.slf4j
.Logger
;
42 import org
.slf4j
.LoggerFactory
;
44 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
45 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcCallback
;
46 import org
.apache
.hbase
.thirdparty
.io
.netty
.bootstrap
.Bootstrap
;
47 import org
.apache
.hbase
.thirdparty
.io
.netty
.buffer
.ByteBuf
;
48 import org
.apache
.hbase
.thirdparty
.io
.netty
.buffer
.ByteBufOutputStream
;
49 import org
.apache
.hbase
.thirdparty
.io
.netty
.buffer
.Unpooled
;
50 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.Channel
;
51 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelFuture
;
52 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelFutureListener
;
53 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelHandler
;
54 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelOption
;
55 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelPipeline
;
56 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.EventLoop
;
57 import org
.apache
.hbase
.thirdparty
.io
.netty
.handler
.codec
.LengthFieldBasedFrameDecoder
;
58 import org
.apache
.hbase
.thirdparty
.io
.netty
.handler
.timeout
.IdleStateHandler
;
59 import org
.apache
.hbase
.thirdparty
.io
.netty
.handler
.timeout
.ReadTimeoutHandler
;
60 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.ReferenceCountUtil
;
61 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.concurrent
.Future
;
62 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.concurrent
.FutureListener
;
63 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.concurrent
.Promise
;
65 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RPCProtos
.ConnectionHeader
;
68 * RPC connection implementation based on netty.
70 * Most operations are executed in handlers. A Netty handler is always executed in the same
71 * thread (its EventLoop), so no lock is needed.
73 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
74 * {@link #eventLoop} thread, otherwise there will be races.
77 @InterfaceAudience.Private
78 class NettyRpcConnection
extends RpcConnection
{
80 private static final Logger LOG
= LoggerFactory
.getLogger(NettyRpcConnection
.class);
82 private static final ScheduledExecutorService RELOGIN_EXECUTOR
= Executors
83 .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
84 .setDaemon(true).setUncaughtExceptionHandler(Threads
.LOGGING_EXCEPTION_HANDLER
).build());
86 private final NettyRpcClient rpcClient
;
88 // the event loop used to set up the connection, we will also execute other operations for this
89 // connection in this event loop, to avoid locking everywhere.
90 private final EventLoop eventLoop
;
92 private ByteBuf connectionHeaderPreamble
;
94 private ByteBuf connectionHeaderWithLength
;
96 // make it volatile so in the isActive method below we do not need to switch to the event loop
97 // thread to access this field.
98 private volatile Channel channel
;
/**
 * Creates a connection for the given client and remote id. The constructor takes one event loop
 * from the client's group and pre-serializes two direct buffers: the connection header preamble,
 * and the connection header prefixed with its 4-byte serialized size. Both buffers are released
 * again by cleanupConnection().
 * NOTE(review): the tail of the super(...) argument list is not visible in this view.
 *
 * @param rpcClient the owning client; supplies the configuration, codec, compressor and event
 *          loop group
 * @param remoteId identifies the remote endpoint this connection is for
 * @throws IOException declared by the constructor; the throwing call is not identifiable from
 *           this view
 */
100 NettyRpcConnection(NettyRpcClient rpcClient
, ConnectionId remoteId
) throws IOException
{
101 super(rpcClient
.conf
, AbstractRpcClient
.WHEEL_TIMER
, remoteId
, rpcClient
.clusterId
,
102 rpcClient
.userProvider
.isHBaseSecurityEnabled(), rpcClient
.codec
, rpcClient
.compressor
,
104 this.rpcClient
= rpcClient
;
// take the next event loop from the client's group; this connection keeps using it
105 this.eventLoop
= rpcClient
.group
.next();
// copy the preamble bytes into a direct buffer sized exactly to fit them
106 byte[] connectionHeaderPreamble
= getConnectionHeaderPreamble();
107 this.connectionHeaderPreamble
=
108 Unpooled
.directBuffer(connectionHeaderPreamble
.length
).writeBytes(connectionHeaderPreamble
);
// 4 bytes for the size prefix written below, followed by the serialized header itself
109 ConnectionHeader header
= getConnectionHeader();
110 this.connectionHeaderWithLength
= Unpooled
.directBuffer(4 + header
.getSerializedSize());
111 this.connectionHeaderWithLength
.writeInt(header
.getSerializedSize());
112 header
.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength
));
116 protected void callTimeout(Call call
) {
117 execute(eventLoop
, () -> {
118 if (channel
!= null) {
119 channel
.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT
, call
));
125 public boolean isActive() {
126 return channel
!= null;
/**
 * Event-loop half of shutdown(); shutdown() submits it through execute(eventLoop, ...).
 * Must be called from the event loop thread, which the assertion below checks.
 * NOTE(review): the statements guarded by the channel check are not visible in this view.
 */
129 private void shutdown0() {
130 assert eventLoop
.inEventLoop();
// only act when a channel is currently set
131 if (channel
!= null) {
138 public void shutdown() {
139 execute(eventLoop
, this::shutdown0
);
143 public void cleanupConnection() {
144 execute(eventLoop
, () -> {
145 if (connectionHeaderPreamble
!= null) {
146 ReferenceCountUtil
.safeRelease(connectionHeaderPreamble
);
147 connectionHeaderPreamble
= null;
149 if (connectionHeaderWithLength
!= null) {
150 ReferenceCountUtil
.safeRelease(connectionHeaderWithLength
);
151 connectionHeaderWithLength
= null;
156 private void established(Channel ch
) throws IOException
{
157 assert eventLoop
.inEventLoop();
158 ChannelPipeline p
= ch
.pipeline();
159 String addBeforeHandler
= p
.context(BufferCallBeforeInitHandler
.class).name();
160 p
.addBefore(addBeforeHandler
, null,
161 new IdleStateHandler(0, rpcClient
.minIdleTimeBeforeClose
, 0, TimeUnit
.MILLISECONDS
));
162 p
.addBefore(addBeforeHandler
, null, new LengthFieldBasedFrameDecoder(Integer
.MAX_VALUE
, 0, 4));
163 p
.addBefore(addBeforeHandler
, null,
164 new NettyRpcDuplexHandler(this, rpcClient
.cellBlockBuilder
, codec
, compressor
));
165 p
.fireUserEventTriggered(BufferCallEvent
.success());
// True from the moment a relogin task is scheduled until the follow-up task on the event loop
// resets it. Written only by scheduleRelogin() and read by sendRequest0(), both of which assert
// that they run in the event loop thread.
168 private boolean reloginInProgress
;
/**
 * Schedules a relogin attempt on RELOGIN_EXECUTOR after a random delay of up to
 * reloginMaxBackoff milliseconds (exclusive). It marks the relogin as in progress first; the
 * flag is reset by a task submitted back to the event loop. Must be called from the event loop
 * thread.
 * NOTE(review): several statements are not visible in this view. These are the bodies of the
 * three checks below (FallbackDisallowedException, provider.canRetry(), reloginInProgress),
 * which presumably return early, and the body of the try block inside the scheduled task.
 * NOTE(review): ThreadLocalRandom.nextInt(bound) throws IllegalArgumentException unless
 * bound is positive; confirm reloginMaxBackoff can never be configured as 0 or negative.
 *
 * @param error the failure that triggered the relogin
 */
170 private void scheduleRelogin(Throwable error
) {
171 assert eventLoop
.inEventLoop();
172 if (error
instanceof FallbackDisallowedException
) {
175 if (!provider
.canRetry()) {
176 LOG
.trace("SASL Provider does not support retries");
179 if (reloginInProgress
) {
182 reloginInProgress
= true;
183 RELOGIN_EXECUTOR
.schedule(() -> {
186 } catch (IOException e
) {
187 LOG
.warn("Relogin failed", e
);
// switch back to the event loop to reset the flag, since it is only touched from that thread
189 eventLoop
.execute(() -> {
190 reloginInProgress
= false;
192 }, ThreadLocalRandom
.current().nextInt(reloginMaxBackoff
), TimeUnit
.MILLISECONDS
);
/**
 * Fails connection initialization: fires a BufferCallEvent.fail(e) user event through the
 * channel's pipeline so that the pending calls are failed with the given exception. Must be
 * called from the event loop thread.
 * NOTE(review): the embedded line numbering suggests a statement after the event is fired that
 * is not visible in this view; confirm against the full file.
 *
 * @param ch the channel whose initialization failed
 * @param e the exception carried by the failure event
 */
195 private void failInit(Channel ch
, IOException e
) {
196 assert eventLoop
.inEventLoop();
197 // fail all pending calls
198 ch
.pipeline().fireUserEventTriggered(BufferCallEvent
.fail(e
));
/**
 * Runs SASL negotiation on a channel. Must be called from the event loop thread.
 * <p>
 * A SaslChallengeDecoder and a NettyHBaseSaslRpcClientHandler are added at the head of the
 * pipeline and removed again once the SASL promise succeeds. If the SASL handler reports that
 * the connection header needs processing, a NettyHBaseRpcConnectionHeaderHandler is appended
 * and tracked by a second promise. Further down, the length-prefixed connection header is
 * written to the channel directly (presumably the alternative branch). When either promise
 * fails, the cause is passed to scheduleRelogin(...) and initialization is failed through
 * failInit(...).
 * NOTE(review): several statements of this method (returns, try/else lines, closing braces and
 * at least one pipeline call) are not visible in this view; confirm the exact branch structure
 * against the full file.
 *
 * @param ch the channel to negotiate on
 */
202 private void saslNegotiate(final Channel ch
) {
203 assert eventLoop
.inEventLoop();
204 UserGroupInformation ticket
= provider
.getRealUser(remoteId
.getTicket());
// without a user there is nothing to authenticate as; fail initialization
205 if (ticket
== null) {
206 failInit(ch
, new FatalConnectionException("ticket/user is null"));
// promise tracking the SASL negotiation, created on the channel's event loop
209 Promise
<Boolean
> saslPromise
= ch
.eventLoop().newPromise();
210 final NettyHBaseSaslRpcClientHandler saslHandler
;
212 saslHandler
= new NettyHBaseSaslRpcClientHandler(saslPromise
, ticket
, provider
, token
,
213 ((InetSocketAddress
) ch
.remoteAddress()).getAddress(), securityInfo
,
214 rpcClient
.fallbackAllowed
, this.rpcClient
.conf
);
215 } catch (IOException e
) {
// the SASL handlers go first in the pipeline while negotiation is running
219 ch
.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler
);
220 saslPromise
.addListener(new FutureListener
<Boolean
>() {
223 public void operationComplete(Future
<Boolean
> future
) throws Exception
{
224 if (future
.isSuccess()) {
// negotiation is done, so the SASL-only handlers are no longer needed
225 ChannelPipeline p
= ch
.pipeline();
226 p
.remove(SaslChallengeDecoder
.class);
227 p
.remove(NettyHBaseSaslRpcClientHandler
.class);
229 // check if negotiate with server for connection header is necessary
230 if (saslHandler
.isNeedProcessConnectionHeader()) {
231 Promise
<Boolean
> connectionHeaderPromise
= ch
.eventLoop().newPromise();
232 // create the handler to handle the connection header
233 ChannelHandler chHandler
= new NettyHBaseRpcConnectionHeaderHandler(
234 connectionHeaderPromise
, conf
, connectionHeaderWithLength
);
236 // add a ReadTimeoutHandler to cover the case where the server never responds to the
237 // connection header because the client-side and server-side configurations differ
239 new ReadTimeoutHandler(RpcClient
.DEFAULT_SOCKET_TIMEOUT_READ
, TimeUnit
.MILLISECONDS
));
240 p
.addLast(chHandler
);
241 connectionHeaderPromise
.addListener(new FutureListener
<Boolean
>() {
243 public void operationComplete(Future
<Boolean
> future
) throws Exception
{
244 if (future
.isSuccess()) {
// header exchange finished; drop the temporary timeout and header handlers
245 ChannelPipeline p
= ch
.pipeline();
246 p
.remove(ReadTimeoutHandler
.class);
247 p
.remove(NettyHBaseRpcConnectionHeaderHandler
.class);
248 // don't send connection header, NettyHbaseRpcConnectionHeaderHandler
// connection header promise failed: schedule a relogin, then fail initialization
252 final Throwable error
= future
.cause();
253 scheduleRelogin(error
);
254 failInit(ch
, toIOE(error
));
259 // send the connection header to server
260 ch
.write(connectionHeaderWithLength
.retainedDuplicate());
// SASL promise failed: schedule a relogin, then fail initialization
264 final Throwable error
= future
.cause();
265 scheduleRelogin(error
);
266 failInit(ch
, toIOE(error
));
/**
 * Starts the connection to the remote server. Must be called from the event loop thread.
 * <p>
 * The remote address is obtained from getRemoteInetAddress(rpcClient.metrics). A Bootstrap is
 * then built on this connection's event loop with the client's channel class, the TCP_NODELAY,
 * SO_KEEPALIVE and CONNECT_TIMEOUT_MILLIS options, a BufferCallBeforeInitHandler and the
 * client's local address. The result of the connect expression is assigned to the channel
 * field.
 * <p>
 * The connect listener handles both outcomes. On failure it fails initialization and records
 * the server in rpcClient.failedServers. On success it writes and flushes a retained duplicate
 * of the connection preamble, and further down writes the length-prefixed connection header.
 * NOTE(review): the tail of the connect expression and the statements between the preamble
 * write and the header write are not visible in this view; confirm the branch structure against
 * the full file.
 *
 * @throws UnknownHostException declared by the signature; presumably raised while obtaining
 *           the remote address (verify against getRemoteInetAddress)
 */
272 private void connect() throws UnknownHostException
{
273 assert eventLoop
.inEventLoop();
274 LOG
.trace("Connecting to {}", remoteId
.getAddress());
275 InetSocketAddress remoteAddr
= getRemoteInetAddress(rpcClient
.metrics
);
// the bootstrap is bound to this connection's own event loop
276 this.channel
= new Bootstrap().group(eventLoop
).channel(rpcClient
.channelClass
)
277 .option(ChannelOption
.TCP_NODELAY
, rpcClient
.isTcpNoDelay())
278 .option(ChannelOption
.SO_KEEPALIVE
, rpcClient
.tcpKeepAlive
)
279 .option(ChannelOption
.CONNECT_TIMEOUT_MILLIS
, rpcClient
.connectTO
)
280 .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient
.localAddr
)
281 .remoteAddress(remoteAddr
).connect().addListener(new ChannelFutureListener() {
284 public void operationComplete(ChannelFuture future
) throws Exception
{
285 Channel ch
= future
.channel();
// connect failed: fail the buffered calls and record this server as failed
286 if (!future
.isSuccess()) {
287 failInit(ch
, toIOE(future
.cause()));
288 rpcClient
.failedServers
.addToFailedServers(remoteId
.getAddress(), future
.cause());
// a retained duplicate is written, so the field's own buffer reference stays with this object
291 ch
.writeAndFlush(connectionHeaderPreamble
.retainedDuplicate());
295 // send the connection header to server
296 ch
.write(connectionHeaderWithLength
.retainedDuplicate());
/**
 * Event-loop half of sendRequest(). Must be called from the event loop thread.
 * <p>
 * The request is rejected with an IOException while a relogin is in progress. Otherwise two
 * callbacks are registered on the controller through notifyOnCancel(...). The first fires a
 * CANCELLED CallEvent into the pipeline when a channel is set. The second checks whether a
 * channel exists, schedules the call's timeout task and writes the call to the channel; if that
 * write fails, the call is failed with the write's cause.
 * NOTE(review): several statements are not visible in this view, including the body of the
 * "channel == null" branch and any handling of the cancelled flag; confirm against the full
 * file.
 *
 * @param call the call to send
 * @param hrc the controller on which the cancellation callbacks are registered
 * @throws IOException if a relogin is in progress
 */
303 private void sendRequest0(Call call
, HBaseRpcController hrc
) throws IOException
{
304 assert eventLoop
.inEventLoop();
// no requests are sent while a relogin is pending
305 if (reloginInProgress
) {
306 throw new IOException("Can not send request because relogin is in progress.");
308 hrc
.notifyOnCancel(new RpcCallback
<Object
>() {
311 public void run(Object parameter
) {
// forward the cancellation to the pipeline, but only if a channel is set
313 if (channel
!= null) {
314 channel
.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED
, call
));
317 }, new CancellationCallback() {
320 public void run(boolean cancelled
) throws IOException
{
324 if (channel
== null) {
327 scheduleTimeoutTask(call
);
328 channel
.writeAndFlush(call
).addListener(new ChannelFutureListener() {
331 public void operationComplete(ChannelFuture future
) throws Exception
{
332 // Fail the call if we failed to write it out. This usually because the channel is
333 // closed. This is needed because we may shutdown the channel inside event loop and
334 // there may still be some pending calls in the event loop queue after us.
335 if (!future
.isSuccess()) {
336 call
.setException(toIOE(future
.cause()));
346 public void sendRequest(final Call call
, HBaseRpcController hrc
) {
347 execute(eventLoop
, () -> {
349 sendRequest0(call
, hrc
);
350 } catch (Exception e
) {
351 call
.setException(toIOE(e
));