2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import io
.opentelemetry
.api
.trace
.Span
;
21 import io
.opentelemetry
.api
.trace
.StatusCode
;
22 import io
.opentelemetry
.context
.Scope
;
23 import java
.net
.InetSocketAddress
;
24 import java
.nio
.channels
.ClosedChannelException
;
25 import org
.apache
.hadoop
.hbase
.CallDroppedException
;
26 import org
.apache
.hadoop
.hbase
.CellScanner
;
27 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
28 import org
.apache
.hadoop
.hbase
.exceptions
.TimeoutIOException
;
29 import org
.apache
.hadoop
.hbase
.monitoring
.MonitoredRPCHandler
;
30 import org
.apache
.hadoop
.hbase
.security
.User
;
31 import org
.apache
.hadoop
.hbase
.server
.trace
.IpcServerSpanBuilder
;
32 import org
.apache
.hadoop
.hbase
.trace
.TraceUtil
;
33 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
34 import org
.apache
.hadoop
.hbase
.util
.Pair
;
35 import org
.apache
.hadoop
.util
.StringUtils
;
36 import org
.apache
.yetus
.audience
.InterfaceAudience
;
37 import org
.apache
.yetus
.audience
.InterfaceStability
;
38 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
41 * The request processing logic, which is usually executed in thread pools provided by an
42 * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
45 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience
.COPROC
, HBaseInterfaceAudience
.PHOENIX
})
46 @InterfaceStability.Evolving
47 public class CallRunner
{
// Single CallDroppedException instance created once with the class; the overload/drop path
// passes it to call.setResponse(...), TraceUtil.setError(...) and the server metrics.
49 private static final CallDroppedException CALL_DROPPED_EXCEPTION
50 = new CallDroppedException();
// Server this runner reports to (call-size accounting, metrics, error handler).
// Not final: cleanup() sets it to null.
53 private RpcServerInterface rpcServer
;
// Monitored handler used to publish progress of the call ("Setting up call", ...).
// Supplied after construction via setStatus(MonitoredRPCHandler) — presumably stored there; confirm.
54 private MonitoredRPCHandler status
;
// Span that was current when this runner was constructed (Span.current() in the constructor).
55 private final Span span
;
// NOTE(review): declared volatile, but no read or write of this flag is visible in this
// extract — confirm its role against the full file.
56 private volatile boolean successful
;
59 * On construction, adds the size of this call to the running count of outstanding call sizes.
60 * Presumption is that we are put on a queue while we wait on an executor to run us. During this
61 * time we occupy heap.
63 // The constructor is package-private so that only RpcServer, in this same package, can make one of these.
// Builds a runner for one RPC call.
//   rpcServer - server the call belongs to; when null, no call-size accounting is done
//   call      - the call to run; when null, no call-size accounting is done
64 CallRunner(final RpcServerInterface rpcServer
, final RpcCall call
) {
66 this.rpcServer
= rpcServer
;
// Remember whichever span is current at construction time; the processing code later
// re-activates it with span.makeCurrent().
67 this.span
= Span
.current();
68 // Add size of the call to queue size.
// Both references are checked first, so a null call or a null server skips the accounting.
69 if (call
!= null && rpcServer
!= null) {
70 this.rpcServer
.addCallSize(call
.getSize());
// Accessor typed to return an RpcCall.
// NOTE(review): the body is not visible in this extract; presumably it returns the call
// that was handed to the constructor — confirm.
74 public RpcCall
getRpcCall() {
// Supplies the MonitoredRPCHandler for this runner.
// NOTE(review): the body is not visible in this extract; presumably it stores the handler in
// the `status` field, which the call-processing code uses to report progress — confirm.
78 public void setStatus(MonitoredRPCHandler status
) {
83 * Clean up after ourselves... let go of references.
85 private void cleanup() {
// Drop the reference to the server so this runner no longer holds on to it.
// NOTE(review): other statements of this method may be missing from this extract.
88 this.rpcServer
= null;
// Call-processing path. NOTE(review): the method header is not visible in this extract;
// presumably this is the body of run(), which the class Javadoc refers to — confirm.
// Everything below executes with the span captured by the constructor made current.
92 try (Scope ignored
= span
.makeCurrent()) {
// A non-negative disconnectSince() is handled as "client already gone": log at DEBUG,
// record an event on the span and mark the span OK.
93 if (call
.disconnectSince() >= 0) {
94 RpcServer
.LOG
.debug("{}: skipped {}", Thread
.currentThread().getName(), call
);
95 span
.addEvent("Client disconnect detected");
96 span
.setStatus(StatusCode
.OK
);
// Stamp the call with the current time, then compare that start time with its deadline.
99 call
.setStartTime(EnvironmentEdgeManager
.currentTime());
// Start time already past the deadline: warn, bump the server's callTimedOut metric and
// record the event on the span (the span status is still set to OK).
100 if (call
.getStartTime() > call
.getDeadline()) {
101 RpcServer
.LOG
.warn("Dropping timed out call: {}", call
);
102 this.rpcServer
.getMetrics().callTimedOut();
103 span
.addEvent("Call deadline exceeded");
104 span
.setStatus(StatusCode
.OK
);
// Publish progress and the remote host address/port on the monitored status.
107 this.status
.setStatus("Setting up call");
108 this.status
.setConnection(call
.getRemoteAddress().getHostAddress(), call
.getRemotePort());
// Trace logging is guarded; the user name falls back to "NULL principal" when the call
// carries no request user.
109 if (RpcServer
.LOG
.isTraceEnabled()) {
110 RpcServer
.LOG
.trace("{} executing as {}", call
.toShortString(),
111 call
.getRequestUser().map(User
::getName
).orElse("NULL principal"));
// Outcome holders: both are passed to call.setResponse(...) further down.
113 Throwable errorThrowable
= null;
115 Pair
<Message
, CellScanner
> resultPair
= null;
// Publish the call through RpcServer.CurCall while it is processed; it is reset to null below.
116 RpcServer
.CurCall
.set(call
);
// Build a dedicated span for this call and make it current for the inner try block.
117 final Span ipcServerSpan
= new IpcServerSpanBuilder(call
).build();
118 try (Scope ignored1
= ipcServerSpan
.makeCurrent()) {
// Refuse to process while the server has not started; the listener address (or the
// "(channel closed)" placeholder when it is null) goes into the exception message.
119 if (!this.rpcServer
.isStarted()) {
120 InetSocketAddress address
= rpcServer
.getListenerAddress();
121 throw new ServerNotRunningYetException("Server " +
122 (address
!= null ? address
: "(channel closed)") + " is not running yet");
// Hand the call to the server; the returned pair carries the response Message and CellScanner.
125 resultPair
= this.rpcServer
.call(call
, this.status
);
// Timeout: log at WARN and record the error on the per-call span.
126 } catch (TimeoutIOException e
) {
127 RpcServer
.LOG
.warn("Can not complete this request in time, drop it: {}", call
);
128 TraceUtil
.setError(ipcServerSpan
, e
);
// Any other throwable: record it on the per-call span, then log it with a verbosity that
// depends on its type.
130 } catch (Throwable e
) {
131 TraceUtil
.setError(ipcServerSpan
, e
);
132 if (e
instanceof ServerNotRunningYetException
) {
133 // If ServerNotRunningYetException, don't spew stack trace.
134 if (RpcServer
.LOG
.isTraceEnabled()) {
135 RpcServer
.LOG
.trace(call
.toShortString(), e
);
138 // Don't dump full exception.. just String version
139 RpcServer
.LOG
.debug("{}, exception={}", call
.toShortString(), e
);
// Keep the stringified stack trace; `error` is passed to call.setResponse(...) below.
142 error
= StringUtils
.stringifyException(e
);
143 if (e
instanceof Error
) {
// Clear the CurCall entry that was set before processing started.
147 RpcServer
.CurCall
.set(null);
// A non-null result pair means the server call returned: give the call's size back to the
// server's outstanding-size accounting (negative delta) and mark the per-call span OK.
148 if (resultPair
!= null) {
149 this.rpcServer
.addCallSize(call
.getSize() * -1);
150 ipcServerSpan
.setStatus(StatusCode
.OK
);
155 this.status
.markComplete("To send response");
156 // return the RPC request read BB we can do here. It is done by now.
// Unpack the result pair (both parts are null when there is no result) and set it, together
// with any error information, as the call's response before attempting to send it.
159 Message param
= resultPair
!= null ? resultPair
.getFirst() : null;
160 CellScanner cells
= resultPair
!= null ? resultPair
.getSecond() : null;
161 call
.setResponse(param
, cells
, errorThrowable
, error
);
162 call
.sendResponseIfReady();
163 // don't touch `span` here because its status and `end()` are managed in `call#setResponse()`
// OutOfMemoryError: record it on the outer span, then let the server's error handler, if one
// is configured, inspect it through checkOOME(e).
164 } catch (OutOfMemoryError e
) {
165 TraceUtil
.setError(span
, e
);
166 if (this.rpcServer
.getErrorHandler() != null
167 && this.rpcServer
.getErrorHandler().checkOOME(e
)) {
168 RpcServer
.LOG
.info("{}: exiting on OutOfMemoryError", Thread
.currentThread().getName());
169 // exception intentionally swallowed
171 // rethrow if no handler
// The channel was closed underneath us: warn with the listener address (or the
// "(channel closed)" placeholder) and the exception message, and record it on the outer span.
174 } catch (ClosedChannelException cce
) {
175 InetSocketAddress address
= rpcServer
.getListenerAddress();
176 RpcServer
.LOG
.warn("{}: caught a ClosedChannelException, " +
177 "this means that the server " + (address
!= null ? address
: "(channel closed)") +
178 " was processing a request but the client went away. The error message was: {}",
179 Thread
.currentThread().getName(), cce
.getMessage());
180 TraceUtil
.setError(span
, cce
);
// Catch-all for remaining exceptions: warn with the full stringified trace and record it on
// the outer span.
181 } catch (Exception e
) {
182 RpcServer
.LOG
.warn("{}: caught: {}", Thread
.currentThread().getName(),
183 StringUtils
.stringifyException(e
));
184 TraceUtil
.setError(span
, e
);
// Give the call's size back to the server's accounting (negative delta).
// NOTE(review): the lines enclosing this statement are not visible in this extract — confirm
// it is guarded so the size is not released twice together with the release further up.
187 this.rpcServer
.addCallSize(call
.getSize() * -1);
// If the status still reports the RPC as running, close it out with "Call error".
190 if (this.status
.isRPCRunning()) {
191 this.status
.markComplete("Call error");
// Park the status until the next call arrives.
193 this.status
.pause("Waiting for a call");
200 * When we want to drop this call because the server is overloaded.
203 try (Scope ignored
= span
.makeCurrent()) {
204 if (call
.disconnectSince() >= 0) {
205 RpcServer
.LOG
.debug("{}: skipped {}", Thread
.currentThread().getName(), call
);
206 span
.addEvent("Client disconnect detected");
207 span
.setStatus(StatusCode
.OK
);
212 InetSocketAddress address
= rpcServer
.getListenerAddress();
213 call
.setResponse(null, null, CALL_DROPPED_EXCEPTION
, "Call dropped, server "
214 + (address
!= null ? address
: "(channel closed)") + " is overloaded, please retry.");
215 TraceUtil
.setError(span
, CALL_DROPPED_EXCEPTION
);
216 call
.sendResponseIfReady();
217 this.rpcServer
.getMetrics().exception(CALL_DROPPED_EXCEPTION
);
218 } catch (ClosedChannelException cce
) {
219 InetSocketAddress address
= rpcServer
.getListenerAddress();
220 RpcServer
.LOG
.warn("{}: caught a ClosedChannelException, " +
221 "this means that the server " + (address
!= null ? address
: "(channel closed)") +
222 " was processing a request but the client went away. The error message was: {}",
223 Thread
.currentThread().getName(), cce
.getMessage());
224 TraceUtil
.setError(span
, cce
);
225 } catch (Exception e
) {
226 RpcServer
.LOG
.warn("{}: caught: {}", Thread
.currentThread().getName(),
227 StringUtils
.stringifyException(e
));
228 TraceUtil
.setError(span
, e
);
231 this.rpcServer
.addCallSize(call
.getSize() * -1);