HBASE-26688 Threads shared EMPTY_RESULT may lead to unexpected client job down. ...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / ipc / NettyRpcConnection.java
blobd0a13ca33d6c5ae38c4d17942a5ad3bd5c280044
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.ipc;
20 import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
21 import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
22 import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
23 import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
24 import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
26 import java.io.IOException;
27 import java.net.InetSocketAddress;
28 import java.net.UnknownHostException;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ThreadLocalRandom;
32 import java.util.concurrent.TimeUnit;
33 import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
34 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
35 import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
36 import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
37 import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
38 import org.apache.hadoop.hbase.util.Threads;
39 import org.apache.hadoop.security.UserGroupInformation;
40 import org.apache.yetus.audience.InterfaceAudience;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
45 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
46 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
47 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
48 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
49 import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
50 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
51 import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
52 import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
53 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
54 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
55 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
56 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
57 import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
58 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
59 import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
60 import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
61 import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
62 import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
63 import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
65 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
67 /**
68 * RPC connection implementation based on netty.
69 * <p/>
 * Most operations are executed in handlers. A Netty handler is always executed in the same
 * thread (its EventLoop), so no lock is needed.
72 * <p/>
73 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
74 * {@link #eventLoop} thread, otherwise there will be races.
75 * @since 2.0.0
77 @InterfaceAudience.Private
78 class NettyRpcConnection extends RpcConnection {
80 private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
82 private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
83 .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
84 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
86 private final NettyRpcClient rpcClient;
88 // the event loop used to set up the connection, we will also execute other operations for this
89 // connection in this event loop, to avoid locking everywhere.
90 private final EventLoop eventLoop;
92 private ByteBuf connectionHeaderPreamble;
94 private ByteBuf connectionHeaderWithLength;
96 // make it volatile so in the isActive method below we do not need to switch to the event loop
97 // thread to access this field.
98 private volatile Channel channel;
100 NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
101 super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
102 rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
103 rpcClient.metrics);
104 this.rpcClient = rpcClient;
105 this.eventLoop = rpcClient.group.next();
106 byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
107 this.connectionHeaderPreamble =
108 Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
109 ConnectionHeader header = getConnectionHeader();
110 this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
111 this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
112 header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
115 @Override
116 protected void callTimeout(Call call) {
117 execute(eventLoop, () -> {
118 if (channel != null) {
119 channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
124 @Override
125 public boolean isActive() {
126 return channel != null;
129 private void shutdown0() {
130 assert eventLoop.inEventLoop();
131 if (channel != null) {
132 channel.close();
133 channel = null;
137 @Override
138 public void shutdown() {
139 execute(eventLoop, this::shutdown0);
142 @Override
143 public void cleanupConnection() {
144 execute(eventLoop, () -> {
145 if (connectionHeaderPreamble != null) {
146 ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
147 connectionHeaderPreamble = null;
149 if (connectionHeaderWithLength != null) {
150 ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
151 connectionHeaderWithLength = null;
156 private void established(Channel ch) throws IOException {
157 assert eventLoop.inEventLoop();
158 ChannelPipeline p = ch.pipeline();
159 String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
160 p.addBefore(addBeforeHandler, null,
161 new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
162 p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
163 p.addBefore(addBeforeHandler, null,
164 new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
165 p.fireUserEventTriggered(BufferCallEvent.success());
168 private boolean reloginInProgress;
170 private void scheduleRelogin(Throwable error) {
171 assert eventLoop.inEventLoop();
172 if (error instanceof FallbackDisallowedException) {
173 return;
175 if (!provider.canRetry()) {
176 LOG.trace("SASL Provider does not support retries");
177 return;
179 if (reloginInProgress) {
180 return;
182 reloginInProgress = true;
183 RELOGIN_EXECUTOR.schedule(() -> {
184 try {
185 provider.relogin();
186 } catch (IOException e) {
187 LOG.warn("Relogin failed", e);
189 eventLoop.execute(() -> {
190 reloginInProgress = false;
192 }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
195 private void failInit(Channel ch, IOException e) {
196 assert eventLoop.inEventLoop();
197 // fail all pending calls
198 ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
199 shutdown0();
202 private void saslNegotiate(final Channel ch) {
203 assert eventLoop.inEventLoop();
204 UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
205 if (ticket == null) {
206 failInit(ch, new FatalConnectionException("ticket/user is null"));
207 return;
209 Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
210 final NettyHBaseSaslRpcClientHandler saslHandler;
211 try {
212 saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
213 ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
214 rpcClient.fallbackAllowed, this.rpcClient.conf);
215 } catch (IOException e) {
216 failInit(ch, e);
217 return;
219 ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
220 saslPromise.addListener(new FutureListener<Boolean>() {
222 @Override
223 public void operationComplete(Future<Boolean> future) throws Exception {
224 if (future.isSuccess()) {
225 ChannelPipeline p = ch.pipeline();
226 p.remove(SaslChallengeDecoder.class);
227 p.remove(NettyHBaseSaslRpcClientHandler.class);
229 // check if negotiate with server for connection header is necessary
230 if (saslHandler.isNeedProcessConnectionHeader()) {
231 Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
232 // create the handler to handle the connection header
233 ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
234 connectionHeaderPromise, conf, connectionHeaderWithLength);
236 // add ReadTimeoutHandler to deal with server doesn't response connection header
237 // because of the different configuration in client side and server side
238 p.addFirst(
239 new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
240 p.addLast(chHandler);
241 connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
242 @Override
243 public void operationComplete(Future<Boolean> future) throws Exception {
244 if (future.isSuccess()) {
245 ChannelPipeline p = ch.pipeline();
246 p.remove(ReadTimeoutHandler.class);
247 p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
248 // don't send connection header, NettyHbaseRpcConnectionHeaderHandler
249 // sent it already
250 established(ch);
251 } else {
252 final Throwable error = future.cause();
253 scheduleRelogin(error);
254 failInit(ch, toIOE(error));
258 } else {
259 // send the connection header to server
260 ch.write(connectionHeaderWithLength.retainedDuplicate());
261 established(ch);
263 } else {
264 final Throwable error = future.cause();
265 scheduleRelogin(error);
266 failInit(ch, toIOE(error));
272 private void connect() throws UnknownHostException {
273 assert eventLoop.inEventLoop();
274 LOG.trace("Connecting to {}", remoteId.getAddress());
275 InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
276 this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
277 .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
278 .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
279 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
280 .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
281 .remoteAddress(remoteAddr).connect().addListener(new ChannelFutureListener() {
283 @Override
284 public void operationComplete(ChannelFuture future) throws Exception {
285 Channel ch = future.channel();
286 if (!future.isSuccess()) {
287 failInit(ch, toIOE(future.cause()));
288 rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause());
289 return;
291 ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
292 if (useSasl) {
293 saslNegotiate(ch);
294 } else {
295 // send the connection header to server
296 ch.write(connectionHeaderWithLength.retainedDuplicate());
297 established(ch);
300 }).channel();
303 private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
304 assert eventLoop.inEventLoop();
305 if (reloginInProgress) {
306 throw new IOException("Can not send request because relogin is in progress.");
308 hrc.notifyOnCancel(new RpcCallback<Object>() {
310 @Override
311 public void run(Object parameter) {
312 setCancelled(call);
313 if (channel != null) {
314 channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
317 }, new CancellationCallback() {
319 @Override
320 public void run(boolean cancelled) throws IOException {
321 if (cancelled) {
322 setCancelled(call);
323 } else {
324 if (channel == null) {
325 connect();
327 scheduleTimeoutTask(call);
328 channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
330 @Override
331 public void operationComplete(ChannelFuture future) throws Exception {
332 // Fail the call if we failed to write it out. This usually because the channel is
333 // closed. This is needed because we may shutdown the channel inside event loop and
334 // there may still be some pending calls in the event loop queue after us.
335 if (!future.isSuccess()) {
336 call.setException(toIOE(future.cause()));
345 @Override
346 public void sendRequest(final Call call, HBaseRpcController hrc) {
347 execute(eventLoop, () -> {
348 try {
349 sendRequest0(call, hrc);
350 } catch (Exception e) {
351 call.setException(toIOE(e));