HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / ipc / CallRunner.java
blob1f85346908ffb9a7ec4e897d7cfa0f8c065acbb6
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.ipc;
20 import io.opentelemetry.api.trace.Span;
21 import io.opentelemetry.api.trace.StatusCode;
22 import io.opentelemetry.context.Scope;
23 import java.net.InetSocketAddress;
24 import java.nio.channels.ClosedChannelException;
25 import org.apache.hadoop.hbase.CallDroppedException;
26 import org.apache.hadoop.hbase.CellScanner;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
29 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
30 import org.apache.hadoop.hbase.security.User;
31 import org.apache.hadoop.hbase.server.trace.IpcServerSpanBuilder;
32 import org.apache.hadoop.hbase.trace.TraceUtil;
33 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
34 import org.apache.hadoop.hbase.util.Pair;
35 import org.apache.hadoop.util.StringUtils;
36 import org.apache.yetus.audience.InterfaceAudience;
37 import org.apache.yetus.audience.InterfaceStability;
38 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
40 /**
41 * The request processing logic, which is usually executed in thread pools provided by an
42 * {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
43 * RpcServer.Call
45 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
46 @InterfaceStability.Evolving
47 public class CallRunner {
49 private static final CallDroppedException CALL_DROPPED_EXCEPTION
50 = new CallDroppedException();
52 private RpcCall call;
53 private RpcServerInterface rpcServer;
54 private MonitoredRPCHandler status;
55 private final Span span;
56 private volatile boolean successful;
58 /**
59 * On construction, adds the size of this call to the running count of outstanding call sizes.
60 * Presumption is that we are put on a queue while we wait on an executor to run us. During this
61 * time we occupy heap.
63 // The constructor is shutdown so only RpcServer in this class can make one of these.
64 CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
65 this.call = call;
66 this.rpcServer = rpcServer;
67 this.span = Span.current();
68 // Add size of the call to queue size.
69 if (call != null && rpcServer != null) {
70 this.rpcServer.addCallSize(call.getSize());
74 public RpcCall getRpcCall() {
75 return call;
78 public void setStatus(MonitoredRPCHandler status) {
79 this.status = status;
82 /**
83 * Cleanup after ourselves... let go of references.
85 private void cleanup() {
86 this.call.cleanup();
87 this.call = null;
88 this.rpcServer = null;
91 public void run() {
92 try (Scope ignored = span.makeCurrent()) {
93 if (call.disconnectSince() >= 0) {
94 RpcServer.LOG.debug("{}: skipped {}", Thread.currentThread().getName(), call);
95 span.addEvent("Client disconnect detected");
96 span.setStatus(StatusCode.OK);
97 return;
99 call.setStartTime(EnvironmentEdgeManager.currentTime());
100 if (call.getStartTime() > call.getDeadline()) {
101 RpcServer.LOG.warn("Dropping timed out call: {}", call);
102 this.rpcServer.getMetrics().callTimedOut();
103 span.addEvent("Call deadline exceeded");
104 span.setStatus(StatusCode.OK);
105 return;
107 this.status.setStatus("Setting up call");
108 this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
109 if (RpcServer.LOG.isTraceEnabled()) {
110 RpcServer.LOG.trace("{} executing as {}", call.toShortString(),
111 call.getRequestUser().map(User::getName).orElse("NULL principal"));
113 Throwable errorThrowable = null;
114 String error = null;
115 Pair<Message, CellScanner> resultPair = null;
116 RpcServer.CurCall.set(call);
117 final Span ipcServerSpan = new IpcServerSpanBuilder(call).build();
118 try (Scope ignored1 = ipcServerSpan.makeCurrent()) {
119 if (!this.rpcServer.isStarted()) {
120 InetSocketAddress address = rpcServer.getListenerAddress();
121 throw new ServerNotRunningYetException("Server " +
122 (address != null ? address : "(channel closed)") + " is not running yet");
124 // make the call
125 resultPair = this.rpcServer.call(call, this.status);
126 } catch (TimeoutIOException e) {
127 RpcServer.LOG.warn("Can not complete this request in time, drop it: {}", call);
128 TraceUtil.setError(ipcServerSpan, e);
129 return;
130 } catch (Throwable e) {
131 TraceUtil.setError(ipcServerSpan, e);
132 if (e instanceof ServerNotRunningYetException) {
133 // If ServerNotRunningYetException, don't spew stack trace.
134 if (RpcServer.LOG.isTraceEnabled()) {
135 RpcServer.LOG.trace(call.toShortString(), e);
137 } else {
138 // Don't dump full exception.. just String version
139 RpcServer.LOG.debug("{}, exception={}", call.toShortString(), e);
141 errorThrowable = e;
142 error = StringUtils.stringifyException(e);
143 if (e instanceof Error) {
144 throw (Error)e;
146 } finally {
147 RpcServer.CurCall.set(null);
148 if (resultPair != null) {
149 this.rpcServer.addCallSize(call.getSize() * -1);
150 ipcServerSpan.setStatus(StatusCode.OK);
151 successful = true;
153 ipcServerSpan.end();
155 this.status.markComplete("To send response");
156 // return the RPC request read BB we can do here. It is done by now.
157 call.cleanup();
158 // Set the response
159 Message param = resultPair != null ? resultPair.getFirst() : null;
160 CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
161 call.setResponse(param, cells, errorThrowable, error);
162 call.sendResponseIfReady();
163 // don't touch `span` here because its status and `end()` are managed in `call#setResponse()`
164 } catch (OutOfMemoryError e) {
165 TraceUtil.setError(span, e);
166 if (this.rpcServer.getErrorHandler() != null
167 && this.rpcServer.getErrorHandler().checkOOME(e)) {
168 RpcServer.LOG.info("{}: exiting on OutOfMemoryError", Thread.currentThread().getName());
169 // exception intentionally swallowed
170 } else {
171 // rethrow if no handler
172 throw e;
174 } catch (ClosedChannelException cce) {
175 InetSocketAddress address = rpcServer.getListenerAddress();
176 RpcServer.LOG.warn("{}: caught a ClosedChannelException, " +
177 "this means that the server " + (address != null ? address : "(channel closed)") +
178 " was processing a request but the client went away. The error message was: {}",
179 Thread.currentThread().getName(), cce.getMessage());
180 TraceUtil.setError(span, cce);
181 } catch (Exception e) {
182 RpcServer.LOG.warn("{}: caught: {}", Thread.currentThread().getName(),
183 StringUtils.stringifyException(e));
184 TraceUtil.setError(span, e);
185 } finally {
186 if (!successful) {
187 this.rpcServer.addCallSize(call.getSize() * -1);
190 if (this.status.isRPCRunning()) {
191 this.status.markComplete("Call error");
193 this.status.pause("Waiting for a call");
194 cleanup();
195 span.end();
200 * When we want to drop this call because of server is overloaded.
202 public void drop() {
203 try (Scope ignored = span.makeCurrent()) {
204 if (call.disconnectSince() >= 0) {
205 RpcServer.LOG.debug("{}: skipped {}", Thread.currentThread().getName(), call);
206 span.addEvent("Client disconnect detected");
207 span.setStatus(StatusCode.OK);
208 return;
211 // Set the response
212 InetSocketAddress address = rpcServer.getListenerAddress();
213 call.setResponse(null, null, CALL_DROPPED_EXCEPTION, "Call dropped, server "
214 + (address != null ? address : "(channel closed)") + " is overloaded, please retry.");
215 TraceUtil.setError(span, CALL_DROPPED_EXCEPTION);
216 call.sendResponseIfReady();
217 this.rpcServer.getMetrics().exception(CALL_DROPPED_EXCEPTION);
218 } catch (ClosedChannelException cce) {
219 InetSocketAddress address = rpcServer.getListenerAddress();
220 RpcServer.LOG.warn("{}: caught a ClosedChannelException, " +
221 "this means that the server " + (address != null ? address : "(channel closed)") +
222 " was processing a request but the client went away. The error message was: {}",
223 Thread.currentThread().getName(), cce.getMessage());
224 TraceUtil.setError(span, cce);
225 } catch (Exception e) {
226 RpcServer.LOG.warn("{}: caught: {}", Thread.currentThread().getName(),
227 StringUtils.stringifyException(e));
228 TraceUtil.setError(span, e);
229 } finally {
230 if (!successful) {
231 this.rpcServer.addCallSize(call.getSize() * -1);
233 cleanup();
234 span.end();