HBASE-26688 Threads shared EMPTY_RESULT may lead to unexpected client job down. ...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / ipc / NettyRpcDuplexHandler.java
blobc67d96f0a7567eaedf4f116326669300b0c9abbb
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.ipc;
20 import io.opentelemetry.context.Scope;
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.Map;
24 import org.apache.hadoop.hbase.CellScanner;
25 import org.apache.hadoop.hbase.codec.Codec;
26 import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
27 import org.apache.hadoop.io.compress.CompressionCodec;
28 import org.apache.hadoop.ipc.RemoteException;
29 import org.apache.yetus.audience.InterfaceAudience;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
34 import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
35 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
36 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
37 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
38 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
39 import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
40 import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
41 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
42 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
43 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
44 import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
46 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
47 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
48 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
49 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
51 /**
52 * The netty rpc handler.
53 * @since 2.0.0
55 @InterfaceAudience.Private
56 class NettyRpcDuplexHandler extends ChannelDuplexHandler {
58 private static final Logger LOG = LoggerFactory.getLogger(NettyRpcDuplexHandler.class);
60 private final NettyRpcConnection conn;
62 private final CellBlockBuilder cellBlockBuilder;
64 private final Codec codec;
66 private final CompressionCodec compressor;
68 private final Map<Integer, Call> id2Call = new HashMap<>();
70 public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder,
71 Codec codec, CompressionCodec compressor) {
72 this.conn = conn;
73 this.cellBlockBuilder = cellBlockBuilder;
74 this.codec = codec;
75 this.compressor = compressor;
79 private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise)
80 throws IOException {
81 id2Call.put(call.id, call);
82 ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc());
83 CellBlockMeta cellBlockMeta;
84 if (cellBlock != null) {
85 CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder();
86 cellBlockMetaBuilder.setLength(cellBlock.writerIndex());
87 cellBlockMeta = cellBlockMetaBuilder.build();
88 } else {
89 cellBlockMeta = null;
91 RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
92 int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param);
93 int totalSize = cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex()
94 : sizeWithoutCellBlock;
95 ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4);
96 buf.writeInt(totalSize);
97 try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf)) {
98 requestHeader.writeDelimitedTo(bbos);
99 if (call.param != null) {
100 call.param.writeDelimitedTo(bbos);
102 if (cellBlock != null) {
103 ChannelPromise withoutCellBlockPromise = ctx.newPromise();
104 ctx.write(buf, withoutCellBlockPromise);
105 ChannelPromise cellBlockPromise = ctx.newPromise();
106 ctx.write(cellBlock, cellBlockPromise);
107 PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
108 combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise);
109 combiner.finish(promise);
110 } else {
111 ctx.write(buf, promise);
116 @Override
117 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
118 throws Exception {
119 if (msg instanceof Call) {
120 Call call = (Call) msg;
121 try (Scope scope = call.span.makeCurrent()) {
122 writeRequest(ctx, call, promise);
124 } else {
125 ctx.write(msg, promise);
129 private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
130 int totalSize = buf.readInt();
131 ByteBufInputStream in = new ByteBufInputStream(buf);
132 ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
133 int id = responseHeader.getCallId();
134 if (LOG.isTraceEnabled()) {
135 LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader)
136 + ", totalSize: " + totalSize + " bytes");
138 RemoteException remoteExc;
139 if (responseHeader.hasException()) {
140 ExceptionResponse exceptionResponse = responseHeader.getException();
141 remoteExc = IPCUtil.createRemoteException(exceptionResponse);
142 if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
143 // Here we will cleanup all calls so do not need to fall back, just return.
144 exceptionCaught(ctx, remoteExc);
145 return;
147 } else {
148 remoteExc = null;
150 Call call = id2Call.remove(id);
151 if (call == null) {
152 // So we got a response for which we have no corresponding 'call' here on the client-side.
153 // We probably timed out waiting, cleaned up all references, and now the server decides
154 // to return a response. There is nothing we can do w/ the response at this stage. Clean
155 // out the wire of the response so its out of the way and we can get other responses on
156 // this connection.
157 if (LOG.isDebugEnabled()) {
158 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
159 int whatIsLeftToRead = totalSize - readSoFar;
160 LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead
161 + " bytes");
163 return;
165 if (remoteExc != null) {
166 call.setException(remoteExc);
167 return;
169 Message value;
170 if (call.responseDefaultType != null) {
171 Builder builder = call.responseDefaultType.newBuilderForType();
172 builder.mergeDelimitedFrom(in);
173 value = builder.build();
174 } else {
175 value = null;
177 CellScanner cellBlockScanner;
178 if (responseHeader.hasCellBlockMeta()) {
179 int size = responseHeader.getCellBlockMeta().getLength();
180 // Maybe we could read directly from the ByteBuf.
181 // The problem here is that we do not know when to release it.
182 byte[] cellBlock = new byte[size];
183 buf.readBytes(cellBlock);
184 cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
185 } else {
186 cellBlockScanner = null;
188 call.setResponse(value, cellBlockScanner);
191 @Override
192 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
193 if (msg instanceof ByteBuf) {
194 ByteBuf buf = (ByteBuf) msg;
195 try {
196 readResponse(ctx, buf);
197 } finally {
198 buf.release();
200 } else {
201 super.channelRead(ctx, msg);
205 private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
206 for (Call call : id2Call.values()) {
207 call.setException(error);
209 id2Call.clear();
212 @Override
213 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
214 if (!id2Call.isEmpty()) {
215 cleanupCalls(ctx, new ConnectionClosedException("Connection closed"));
217 conn.shutdown();
218 ctx.fireChannelInactive();
221 @Override
222 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
223 if (!id2Call.isEmpty()) {
224 cleanupCalls(ctx, IPCUtil.toIOE(cause));
226 conn.shutdown();
229 @Override
230 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
231 if (evt instanceof IdleStateEvent) {
232 IdleStateEvent idleEvt = (IdleStateEvent) evt;
233 switch (idleEvt.state()) {
234 case WRITER_IDLE:
235 if (id2Call.isEmpty()) {
236 if (LOG.isTraceEnabled()) {
237 LOG.trace("shutdown connection to " + conn.remoteId().address
238 + " because idle for a long time");
240 // It may happen that there are still some pending calls in the event loop queue and
241 // they will get a closed channel exception. But this is not a big deal as it rarely
242 // rarely happens and the upper layer could retry immediately.
243 conn.shutdown();
245 break;
246 default:
247 LOG.warn("Unrecognized idle state " + idleEvt.state());
248 break;
250 } else if (evt instanceof CallEvent) {
251 // just remove the call for now until we add other call event other than timeout and cancel.
252 id2Call.remove(((CallEvent) evt).call.id);
253 } else {
254 ctx.fireUserEventTriggered(evt);