2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import static org
.apache
.hadoop
.hbase
.trace
.HBaseSemanticAttributes
.RPC_METHOD_KEY
;
21 import static org
.apache
.hadoop
.hbase
.trace
.HBaseSemanticAttributes
.RPC_SERVICE_KEY
;
22 import io
.opentelemetry
.api
.trace
.Span
;
23 import io
.opentelemetry
.api
.trace
.StatusCode
;
24 import io
.opentelemetry
.context
.Context
;
25 import io
.opentelemetry
.context
.Scope
;
26 import java
.net
.InetSocketAddress
;
27 import java
.nio
.channels
.ClosedChannelException
;
28 import java
.util
.Optional
;
29 import org
.apache
.hadoop
.hbase
.CallDroppedException
;
30 import org
.apache
.hadoop
.hbase
.CellScanner
;
31 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
32 import org
.apache
.hadoop
.hbase
.exceptions
.TimeoutIOException
;
33 import org
.apache
.hadoop
.hbase
.monitoring
.MonitoredRPCHandler
;
34 import org
.apache
.hadoop
.hbase
.security
.User
;
35 import org
.apache
.hadoop
.hbase
.trace
.TraceUtil
;
36 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
37 import org
.apache
.hadoop
.hbase
.util
.Pair
;
38 import org
.apache
.hadoop
.util
.StringUtils
;
39 import org
.apache
.yetus
.audience
.InterfaceAudience
;
40 import org
.apache
.yetus
.audience
.InterfaceStability
;
42 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
45 * The request processing logic, which is usually executed in thread pools provided by an
46 * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
49 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience
.COPROC
, HBaseInterfaceAudience
.PHOENIX
})
50 @InterfaceStability.Evolving
51 public class CallRunner
{
53 private static final CallDroppedException CALL_DROPPED_EXCEPTION
54 = new CallDroppedException();
57 private RpcServerInterface rpcServer
;
58 private MonitoredRPCHandler status
;
59 private volatile boolean sucessful
;
62 * On construction, adds the size of this call to the running count of outstanding call sizes.
63 * Presumption is that we are put on a queue while we wait on an executor to run us. During this
64 * time we occupy heap.
// The constructor is package-private so that only classes in this package, such as RpcServer,
// can create one of these.
67 CallRunner(final RpcServerInterface rpcServer
, final RpcCall call
) {
69 this.rpcServer
= rpcServer
;
70 // Add size of the call to queue size.
71 if (call
!= null && rpcServer
!= null) {
72 this.rpcServer
.addCallSize(call
.getSize());
76 public RpcCall
getRpcCall() {
80 public void setStatus(MonitoredRPCHandler status
) {
85 * Cleanup after ourselves... let go of references.
87 private void cleanup() {
90 this.rpcServer
= null;
93 private String
getServiceName() {
94 return call
.getService() != null ? call
.getService().getDescriptorForType().getName() : "";
97 private String
getMethodName() {
98 return call
.getMethod() != null ? call
.getMethod().getName() : "";
103 if (call
.disconnectSince() >= 0) {
104 if (RpcServer
.LOG
.isDebugEnabled()) {
105 RpcServer
.LOG
.debug(Thread
.currentThread().getName() + ": skipped " + call
);
109 call
.setStartTime(EnvironmentEdgeManager
.currentTime());
110 if (call
.getStartTime() > call
.getDeadline()) {
111 RpcServer
.LOG
.warn("Dropping timed out call: " + call
);
114 this.status
.setStatus("Setting up call");
115 this.status
.setConnection(call
.getRemoteAddress().getHostAddress(), call
.getRemotePort());
116 if (RpcServer
.LOG
.isTraceEnabled()) {
117 Optional
<User
> remoteUser
= call
.getRequestUser();
118 RpcServer
.LOG
.trace(call
.toShortString() + " executing as " +
119 (remoteUser
.isPresent() ? remoteUser
.get().getName() : "NULL principal"));
121 Throwable errorThrowable
= null;
123 Pair
<Message
, CellScanner
> resultPair
= null;
124 RpcServer
.CurCall
.set(call
);
125 String serviceName
= getServiceName();
126 String methodName
= getMethodName();
127 Span span
= TraceUtil
.getGlobalTracer().spanBuilder("RpcServer.callMethod")
128 .setParent(Context
.current().with(((ServerCall
<?
>) call
).getSpan())).startSpan()
129 .setAttribute(RPC_SERVICE_KEY
, serviceName
)
130 .setAttribute(RPC_METHOD_KEY
, methodName
);
131 try (Scope traceScope
= span
.makeCurrent()) {
132 if (!this.rpcServer
.isStarted()) {
133 InetSocketAddress address
= rpcServer
.getListenerAddress();
134 throw new ServerNotRunningYetException("Server " +
135 (address
!= null ? address
: "(channel closed)") + " is not running yet");
138 resultPair
= this.rpcServer
.call(call
, this.status
);
139 } catch (TimeoutIOException e
){
140 RpcServer
.LOG
.warn("Can not complete this request in time, drop it: " + call
);
141 TraceUtil
.setError(span
, e
);
143 } catch (Throwable e
) {
144 TraceUtil
.setError(span
, e
);
145 if (e
instanceof ServerNotRunningYetException
) {
146 // If ServerNotRunningYetException, don't spew stack trace.
147 if (RpcServer
.LOG
.isTraceEnabled()) {
148 RpcServer
.LOG
.trace(call
.toShortString(), e
);
151 // Don't dump full exception.. just String version
152 RpcServer
.LOG
.debug(call
.toShortString() + ", exception=" + e
);
155 error
= StringUtils
.stringifyException(e
);
156 if (e
instanceof Error
) {
160 RpcServer
.CurCall
.set(null);
161 if (resultPair
!= null) {
162 this.rpcServer
.addCallSize(call
.getSize() * -1);
163 span
.setStatus(StatusCode
.OK
);
168 this.status
.markComplete("To send response");
169 // return back the RPC request read BB we can do here. It is done by now.
172 Message param
= resultPair
!= null ? resultPair
.getFirst() : null;
173 CellScanner cells
= resultPair
!= null ? resultPair
.getSecond() : null;
174 call
.setResponse(param
, cells
, errorThrowable
, error
);
175 call
.sendResponseIfReady();
176 } catch (OutOfMemoryError e
) {
177 if (this.rpcServer
.getErrorHandler() != null) {
178 if (this.rpcServer
.getErrorHandler().checkOOME(e
)) {
179 RpcServer
.LOG
.info(Thread
.currentThread().getName() + ": exiting on OutOfMemoryError");
183 // rethrow if no handler
186 } catch (ClosedChannelException cce
) {
187 InetSocketAddress address
= rpcServer
.getListenerAddress();
188 RpcServer
.LOG
.warn(Thread
.currentThread().getName() + ": caught a ClosedChannelException, " +
189 "this means that the server " + (address
!= null ? address
: "(channel closed)") +
190 " was processing a request but the client went away. The error message was: " +
192 } catch (Exception e
) {
193 RpcServer
.LOG
.warn(Thread
.currentThread().getName()
194 + ": caught: " + StringUtils
.stringifyException(e
));
197 this.rpcServer
.addCallSize(call
.getSize() * -1);
200 if (this.status
.isRPCRunning()) {
201 this.status
.markComplete("Call error");
203 this.status
.pause("Waiting for a call");
 * Drops this call because the server is overloaded.
213 if (call
.disconnectSince() >= 0) {
214 if (RpcServer
.LOG
.isDebugEnabled()) {
215 RpcServer
.LOG
.debug(Thread
.currentThread().getName() + ": skipped " + call
);
221 InetSocketAddress address
= rpcServer
.getListenerAddress();
222 call
.setResponse(null, null, CALL_DROPPED_EXCEPTION
, "Call dropped, server "
223 + (address
!= null ? address
: "(channel closed)") + " is overloaded, please retry.");
224 call
.sendResponseIfReady();
225 } catch (ClosedChannelException cce
) {
226 InetSocketAddress address
= rpcServer
.getListenerAddress();
227 RpcServer
.LOG
.warn(Thread
.currentThread().getName() + ": caught a ClosedChannelException, " +
228 "this means that the server " + (address
!= null ? address
: "(channel closed)") +
229 " was processing a request but the client went away. The error message was: " +
231 } catch (Exception e
) {
232 RpcServer
.LOG
.warn(Thread
.currentThread().getName()
233 + ": caught: " + StringUtils
.stringifyException(e
));
236 this.rpcServer
.addCallSize(call
.getSize() * -1);