HBASE-26286: Add support for specifying store file tracker when restoring or cloning...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / ipc / CallRunner.java
blobef37247f412f28aa150d987f6329ecc51effd028
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.ipc;
20 import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
21 import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
22 import io.opentelemetry.api.trace.Span;
23 import io.opentelemetry.api.trace.StatusCode;
24 import io.opentelemetry.context.Context;
25 import io.opentelemetry.context.Scope;
26 import java.net.InetSocketAddress;
27 import java.nio.channels.ClosedChannelException;
28 import java.util.Optional;
29 import org.apache.hadoop.hbase.CallDroppedException;
30 import org.apache.hadoop.hbase.CellScanner;
31 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
33 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
34 import org.apache.hadoop.hbase.security.User;
35 import org.apache.hadoop.hbase.trace.TraceUtil;
36 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
37 import org.apache.hadoop.hbase.util.Pair;
38 import org.apache.hadoop.util.StringUtils;
39 import org.apache.yetus.audience.InterfaceAudience;
40 import org.apache.yetus.audience.InterfaceStability;
42 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
/**
 * The request processing logic, which is usually executed in thread pools provided by an
 * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
 * RpcServer.Call.
 */
49 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
50 @InterfaceStability.Evolving
51 public class CallRunner {
53 private static final CallDroppedException CALL_DROPPED_EXCEPTION
54 = new CallDroppedException();
56 private RpcCall call;
57 private RpcServerInterface rpcServer;
58 private MonitoredRPCHandler status;
59 private volatile boolean sucessful;
/**
 * Creates a runner for the given call. When both arguments are non-null, the size of the call is
 * added to the server's running count of outstanding call sizes. The presumption is that the
 * runner is put on a queue while it waits for an executor, and occupies heap during that time.
 * <p>
 * The constructor is package-private, so only classes in this package can create a runner.
 *
 * @param rpcServer the server the call is executed against
 * @param call the call to execute
 */
CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
  this.rpcServer = rpcServer;
  this.call = call;
  if (rpcServer == null || call == null) {
    // Nothing to add to the queue size.
    return;
  }
  // Add size of the call to queue size.
  rpcServer.addCallSize(call.getSize());
}
76 public RpcCall getRpcCall() {
77 return call;
80 public void setStatus(MonitoredRPCHandler status) {
81 this.status = status;
84 /**
85 * Cleanup after ourselves... let go of references.
87 private void cleanup() {
88 this.call.cleanup();
89 this.call = null;
90 this.rpcServer = null;
93 private String getServiceName() {
94 return call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
97 private String getMethodName() {
98 return call.getMethod() != null ? call.getMethod().getName() : "";
101 public void run() {
102 try {
103 if (call.disconnectSince() >= 0) {
104 if (RpcServer.LOG.isDebugEnabled()) {
105 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
107 return;
109 call.setStartTime(EnvironmentEdgeManager.currentTime());
110 if (call.getStartTime() > call.getDeadline()) {
111 RpcServer.LOG.warn("Dropping timed out call: " + call);
112 return;
114 this.status.setStatus("Setting up call");
115 this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
116 if (RpcServer.LOG.isTraceEnabled()) {
117 Optional<User> remoteUser = call.getRequestUser();
118 RpcServer.LOG.trace(call.toShortString() + " executing as " +
119 (remoteUser.isPresent() ? remoteUser.get().getName() : "NULL principal"));
121 Throwable errorThrowable = null;
122 String error = null;
123 Pair<Message, CellScanner> resultPair = null;
124 RpcServer.CurCall.set(call);
125 String serviceName = getServiceName();
126 String methodName = getMethodName();
127 Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod")
128 .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan()
129 .setAttribute(RPC_SERVICE_KEY, serviceName)
130 .setAttribute(RPC_METHOD_KEY, methodName);
131 try (Scope traceScope = span.makeCurrent()) {
132 if (!this.rpcServer.isStarted()) {
133 InetSocketAddress address = rpcServer.getListenerAddress();
134 throw new ServerNotRunningYetException("Server " +
135 (address != null ? address : "(channel closed)") + " is not running yet");
137 // make the call
138 resultPair = this.rpcServer.call(call, this.status);
139 } catch (TimeoutIOException e){
140 RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
141 TraceUtil.setError(span, e);
142 return;
143 } catch (Throwable e) {
144 TraceUtil.setError(span, e);
145 if (e instanceof ServerNotRunningYetException) {
146 // If ServerNotRunningYetException, don't spew stack trace.
147 if (RpcServer.LOG.isTraceEnabled()) {
148 RpcServer.LOG.trace(call.toShortString(), e);
150 } else {
151 // Don't dump full exception.. just String version
152 RpcServer.LOG.debug(call.toShortString() + ", exception=" + e);
154 errorThrowable = e;
155 error = StringUtils.stringifyException(e);
156 if (e instanceof Error) {
157 throw (Error)e;
159 } finally {
160 RpcServer.CurCall.set(null);
161 if (resultPair != null) {
162 this.rpcServer.addCallSize(call.getSize() * -1);
163 span.setStatus(StatusCode.OK);
164 sucessful = true;
166 span.end();
168 this.status.markComplete("To send response");
169 // return back the RPC request read BB we can do here. It is done by now.
170 call.cleanup();
171 // Set the response
172 Message param = resultPair != null ? resultPair.getFirst() : null;
173 CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
174 call.setResponse(param, cells, errorThrowable, error);
175 call.sendResponseIfReady();
176 } catch (OutOfMemoryError e) {
177 if (this.rpcServer.getErrorHandler() != null) {
178 if (this.rpcServer.getErrorHandler().checkOOME(e)) {
179 RpcServer.LOG.info(Thread.currentThread().getName() + ": exiting on OutOfMemoryError");
180 return;
182 } else {
183 // rethrow if no handler
184 throw e;
186 } catch (ClosedChannelException cce) {
187 InetSocketAddress address = rpcServer.getListenerAddress();
188 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
189 "this means that the server " + (address != null ? address : "(channel closed)") +
190 " was processing a request but the client went away. The error message was: " +
191 cce.getMessage());
192 } catch (Exception e) {
193 RpcServer.LOG.warn(Thread.currentThread().getName()
194 + ": caught: " + StringUtils.stringifyException(e));
195 } finally {
196 if (!sucessful) {
197 this.rpcServer.addCallSize(call.getSize() * -1);
200 if (this.status.isRPCRunning()) {
201 this.status.markComplete("Call error");
203 this.status.pause("Waiting for a call");
204 cleanup();
209 * When we want to drop this call because of server is overloaded.
211 public void drop() {
212 try {
213 if (call.disconnectSince() >= 0) {
214 if (RpcServer.LOG.isDebugEnabled()) {
215 RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
217 return;
220 // Set the response
221 InetSocketAddress address = rpcServer.getListenerAddress();
222 call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
223 + (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
224 call.sendResponseIfReady();
225 } catch (ClosedChannelException cce) {
226 InetSocketAddress address = rpcServer.getListenerAddress();
227 RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
228 "this means that the server " + (address != null ? address : "(channel closed)") +
229 " was processing a request but the client went away. The error message was: " +
230 cce.getMessage());
231 } catch (Exception e) {
232 RpcServer.LOG.warn(Thread.currentThread().getName()
233 + ": caught: " + StringUtils.stringifyException(e));
234 } finally {
235 if (!sucessful) {
236 this.rpcServer.addCallSize(call.getSize() * -1);
238 cleanup();