2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import io
.opentelemetry
.context
.Scope
;
21 import java
.io
.IOException
;
22 import java
.util
.HashMap
;
24 import org
.apache
.hadoop
.hbase
.CellScanner
;
25 import org
.apache
.hadoop
.hbase
.codec
.Codec
;
26 import org
.apache
.hadoop
.hbase
.exceptions
.ConnectionClosedException
;
27 import org
.apache
.hadoop
.io
.compress
.CompressionCodec
;
28 import org
.apache
.hadoop
.ipc
.RemoteException
;
29 import org
.apache
.yetus
.audience
.InterfaceAudience
;
30 import org
.slf4j
.Logger
;
31 import org
.slf4j
.LoggerFactory
;
33 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
34 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
.Builder
;
35 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.TextFormat
;
36 import org
.apache
.hbase
.thirdparty
.io
.netty
.buffer
.ByteBuf
;
37 import org
.apache
.hbase
.thirdparty
.io
.netty
.buffer
.ByteBufInputStream
;
38 import org
.apache
.hbase
.thirdparty
.io
.netty
.buffer
.ByteBufOutputStream
;
39 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelDuplexHandler
;
40 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelFuture
;
41 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelHandlerContext
;
42 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelPromise
;
43 import org
.apache
.hbase
.thirdparty
.io
.netty
.handler
.timeout
.IdleStateEvent
;
44 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.concurrent
.PromiseCombiner
;
46 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RPCProtos
.CellBlockMeta
;
47 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RPCProtos
.ExceptionResponse
;
48 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RPCProtos
.RequestHeader
;
49 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RPCProtos
.ResponseHeader
;
52 * The netty rpc handler.
55 @InterfaceAudience.Private
56 class NettyRpcDuplexHandler
extends ChannelDuplexHandler
{
58 private static final Logger LOG
= LoggerFactory
.getLogger(NettyRpcDuplexHandler
.class);
60 private final NettyRpcConnection conn
;
62 private final CellBlockBuilder cellBlockBuilder
;
64 private final Codec codec
;
66 private final CompressionCodec compressor
;
68 private final Map
<Integer
, Call
> id2Call
= new HashMap
<>();
70 public NettyRpcDuplexHandler(NettyRpcConnection conn
, CellBlockBuilder cellBlockBuilder
,
71 Codec codec
, CompressionCodec compressor
) {
73 this.cellBlockBuilder
= cellBlockBuilder
;
75 this.compressor
= compressor
;
79 private void writeRequest(ChannelHandlerContext ctx
, Call call
, ChannelPromise promise
)
81 id2Call
.put(call
.id
, call
);
82 ByteBuf cellBlock
= cellBlockBuilder
.buildCellBlock(codec
, compressor
, call
.cells
, ctx
.alloc());
83 CellBlockMeta cellBlockMeta
;
84 if (cellBlock
!= null) {
85 CellBlockMeta
.Builder cellBlockMetaBuilder
= CellBlockMeta
.newBuilder();
86 cellBlockMetaBuilder
.setLength(cellBlock
.writerIndex());
87 cellBlockMeta
= cellBlockMetaBuilder
.build();
91 RequestHeader requestHeader
= IPCUtil
.buildRequestHeader(call
, cellBlockMeta
);
92 int sizeWithoutCellBlock
= IPCUtil
.getTotalSizeWhenWrittenDelimited(requestHeader
, call
.param
);
93 int totalSize
= cellBlock
!= null ? sizeWithoutCellBlock
+ cellBlock
.writerIndex()
94 : sizeWithoutCellBlock
;
95 ByteBuf buf
= ctx
.alloc().buffer(sizeWithoutCellBlock
+ 4);
96 buf
.writeInt(totalSize
);
97 try (ByteBufOutputStream bbos
= new ByteBufOutputStream(buf
)) {
98 requestHeader
.writeDelimitedTo(bbos
);
99 if (call
.param
!= null) {
100 call
.param
.writeDelimitedTo(bbos
);
102 if (cellBlock
!= null) {
103 ChannelPromise withoutCellBlockPromise
= ctx
.newPromise();
104 ctx
.write(buf
, withoutCellBlockPromise
);
105 ChannelPromise cellBlockPromise
= ctx
.newPromise();
106 ctx
.write(cellBlock
, cellBlockPromise
);
107 PromiseCombiner combiner
= new PromiseCombiner(ctx
.executor());
108 combiner
.addAll((ChannelFuture
) withoutCellBlockPromise
, cellBlockPromise
);
109 combiner
.finish(promise
);
111 ctx
.write(buf
, promise
);
117 public void write(ChannelHandlerContext ctx
, Object msg
, ChannelPromise promise
)
119 if (msg
instanceof Call
) {
120 Call call
= (Call
) msg
;
121 try (Scope scope
= call
.span
.makeCurrent()) {
122 writeRequest(ctx
, call
, promise
);
125 ctx
.write(msg
, promise
);
129 private void readResponse(ChannelHandlerContext ctx
, ByteBuf buf
) throws IOException
{
130 int totalSize
= buf
.readInt();
131 ByteBufInputStream in
= new ByteBufInputStream(buf
);
132 ResponseHeader responseHeader
= ResponseHeader
.parseDelimitedFrom(in
);
133 int id
= responseHeader
.getCallId();
134 if (LOG
.isTraceEnabled()) {
135 LOG
.trace("got response header " + TextFormat
.shortDebugString(responseHeader
)
136 + ", totalSize: " + totalSize
+ " bytes");
138 RemoteException remoteExc
;
139 if (responseHeader
.hasException()) {
140 ExceptionResponse exceptionResponse
= responseHeader
.getException();
141 remoteExc
= IPCUtil
.createRemoteException(exceptionResponse
);
142 if (IPCUtil
.isFatalConnectionException(exceptionResponse
)) {
143 // Here we will cleanup all calls so do not need to fall back, just return.
144 exceptionCaught(ctx
, remoteExc
);
150 Call call
= id2Call
.remove(id
);
152 // So we got a response for which we have no corresponding 'call' here on the client-side.
153 // We probably timed out waiting, cleaned up all references, and now the server decides
154 // to return a response. There is nothing we can do w/ the response at this stage. Clean
155 // out the wire of the response so its out of the way and we can get other responses on
157 if (LOG
.isDebugEnabled()) {
158 int readSoFar
= IPCUtil
.getTotalSizeWhenWrittenDelimited(responseHeader
);
159 int whatIsLeftToRead
= totalSize
- readSoFar
;
160 LOG
.debug("Unknown callId: " + id
+ ", skipping over this response of " + whatIsLeftToRead
165 if (remoteExc
!= null) {
166 call
.setException(remoteExc
);
170 if (call
.responseDefaultType
!= null) {
171 Builder builder
= call
.responseDefaultType
.newBuilderForType();
172 builder
.mergeDelimitedFrom(in
);
173 value
= builder
.build();
177 CellScanner cellBlockScanner
;
178 if (responseHeader
.hasCellBlockMeta()) {
179 int size
= responseHeader
.getCellBlockMeta().getLength();
180 // Maybe we could read directly from the ByteBuf.
181 // The problem here is that we do not know when to release it.
182 byte[] cellBlock
= new byte[size
];
183 buf
.readBytes(cellBlock
);
184 cellBlockScanner
= cellBlockBuilder
.createCellScanner(this.codec
, this.compressor
, cellBlock
);
186 cellBlockScanner
= null;
188 call
.setResponse(value
, cellBlockScanner
);
192 public void channelRead(ChannelHandlerContext ctx
, Object msg
) throws Exception
{
193 if (msg
instanceof ByteBuf
) {
194 ByteBuf buf
= (ByteBuf
) msg
;
196 readResponse(ctx
, buf
);
201 super.channelRead(ctx
, msg
);
205 private void cleanupCalls(ChannelHandlerContext ctx
, IOException error
) {
206 for (Call call
: id2Call
.values()) {
207 call
.setException(error
);
213 public void channelInactive(ChannelHandlerContext ctx
) throws Exception
{
214 if (!id2Call
.isEmpty()) {
215 cleanupCalls(ctx
, new ConnectionClosedException("Connection closed"));
218 ctx
.fireChannelInactive();
222 public void exceptionCaught(ChannelHandlerContext ctx
, Throwable cause
) {
223 if (!id2Call
.isEmpty()) {
224 cleanupCalls(ctx
, IPCUtil
.toIOE(cause
));
230 public void userEventTriggered(ChannelHandlerContext ctx
, Object evt
) throws Exception
{
231 if (evt
instanceof IdleStateEvent
) {
232 IdleStateEvent idleEvt
= (IdleStateEvent
) evt
;
233 switch (idleEvt
.state()) {
235 if (id2Call
.isEmpty()) {
236 if (LOG
.isTraceEnabled()) {
237 LOG
.trace("shutdown connection to " + conn
.remoteId().address
238 + " because idle for a long time");
240 // It may happen that there are still some pending calls in the event loop queue and
241 // they will get a closed channel exception. But this is not a big deal as it rarely
242 // rarely happens and the upper layer could retry immediately.
247 LOG
.warn("Unrecognized idle state " + idleEvt
.state());
250 } else if (evt
instanceof CallEvent
) {
251 // just remove the call for now until we add other call event other than timeout and cancel.
252 id2Call
.remove(((CallEvent
) evt
).call
.id
);
254 ctx
.fireUserEventTriggered(evt
);