HBASE-26286: Add support for specifying store file tracker when restoring or cloning...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / ipc / RpcServer.java
blobb41619a87765522205fecd607123dfd64cac6d2d
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.ipc;
21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.ReadableByteChannel;
28 import java.nio.channels.WritableByteChannel;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Locale;
33 import java.util.Map;
34 import java.util.Optional;
35 import java.util.concurrent.atomic.LongAdder;
36 import org.apache.commons.lang3.StringUtils;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.CallQueueTooBigException;
39 import org.apache.hadoop.hbase.CellScanner;
40 import org.apache.hadoop.hbase.DoNotRetryIOException;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.Server;
43 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
44 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
45 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
46 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
47 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
48 import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
49 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
50 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
51 import org.apache.hadoop.hbase.security.SaslUtil;
52 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
53 import org.apache.hadoop.hbase.security.User;
54 import org.apache.hadoop.hbase.security.UserProvider;
55 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
56 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
57 import org.apache.hadoop.hbase.util.GsonUtil;
58 import org.apache.hadoop.hbase.util.Pair;
59 import org.apache.hadoop.security.UserGroupInformation;
60 import org.apache.hadoop.security.authorize.AuthorizationException;
61 import org.apache.hadoop.security.authorize.PolicyProvider;
62 import org.apache.hadoop.security.authorize.ProxyUsers;
63 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
64 import org.apache.hadoop.security.token.SecretManager;
65 import org.apache.hadoop.security.token.TokenIdentifier;
66 import org.apache.yetus.audience.InterfaceAudience;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
70 import org.apache.hbase.thirdparty.com.google.gson.Gson;
71 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
72 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
73 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
74 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
75 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
77 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
78 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
79 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
81 /**
82 * An RPC server that hosts protobuf described Services.
85 @InterfaceAudience.Private
86 public abstract class RpcServer implements RpcServerInterface,
87 ConfigurationObserver {
88 // LOG is being used in CallRunner and the log level is being changed in tests
89 public static final Logger LOG = LoggerFactory.getLogger(RpcServer.class);
90 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
91 = new CallQueueTooBigException();
93 private static final String MULTI_GETS = "multi.gets";
94 private static final String MULTI_MUTATIONS = "multi.mutations";
95 private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
97 private final boolean authorize;
98 private final boolean isOnlineLogProviderEnabled;
99 protected boolean isSecurityEnabled;
101 public static final byte CURRENT_VERSION = 0;
104 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
106 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
107 "hbase.ipc.server.fallback-to-simple-auth-allowed";
110 * How many calls/handler are allowed in the queue.
112 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
114 protected final CellBlockBuilder cellBlockBuilder;
116 protected static final String AUTH_FAILED_FOR = "Auth failed for ";
117 protected static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
118 protected static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
119 + Server.class.getName());
120 protected SecretManager<TokenIdentifier> secretManager;
121 protected final Map<String, String> saslProps;
123 protected ServiceAuthorizationManager authManager;
125 /** This is set to Call object before Handler invokes an RPC and reset
126 * after the call returns.
128 protected static final ThreadLocal<RpcCall> CurCall = new ThreadLocal<>();
130 /** Keeps MonitoredRPCHandler per handler thread. */
131 protected static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC = new ThreadLocal<>();
133 protected final InetSocketAddress bindAddress;
135 protected MetricsHBaseServer metrics;
137 protected final Configuration conf;
140 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
141 * this size, then we will reject the call (after parsing it though). It will go back to the
142 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
143 * call queue size gets incremented after we parse a call and before we add it to the queue of
144 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
145 * size is kept in {@link #callQueueSizeInBytes}.
146 * @see #callQueueSizeInBytes
147 * @see #DEFAULT_MAX_CALLQUEUE_SIZE
149 protected final long maxQueueSizeInBytes;
150 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
153 * This is a running count of the size in bytes of all outstanding calls whether currently
154 * executing or queued waiting to be run.
156 protected final LongAdder callQueueSizeInBytes = new LongAdder();
158 protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
159 protected final boolean tcpKeepAlive; // if T then use keepalives
162 * This flag is used to indicate to sub threads when they should go down. When we call
163 * {@link #start()}, all threads started will consult this flag on whether they should
164 * keep going. It is set to false when {@link #stop()} is called.
166 volatile boolean running = true;
169 * This flag is set to true after all threads are up and 'running' and the server is then opened
170 * for business by the call to {@link #start()}.
172 volatile boolean started = false;
174 protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
176 protected HBaseRPCErrorHandler errorHandler = null;
178 public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
180 protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
181 protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
184 * Minimum allowable timeout (in milliseconds) in rpc request's header. This
185 * configuration exists to prevent the rpc service regarding this request as timeout immediately.
187 protected static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout";
188 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
190 /** Default value for above params */
191 public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
192 protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
193 protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
195 protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
196 protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
197 protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
199 protected static final Gson GSON = GsonUtil.createGsonWithDisableHtmlEscaping().create();
201 protected final int maxRequestSize;
202 protected final int warnResponseTime;
203 protected final int warnResponseSize;
205 protected final int minClientRequestTimeout;
207 protected final Server server;
208 protected final List<BlockingServiceAndInterface> services;
210 protected final RpcScheduler scheduler;
212 protected UserProvider userProvider;
214 protected final ByteBuffAllocator bbAllocator;
216 protected volatile boolean allowFallbackToSimpleAuth;
219 * Used to get details for scan with a scanner_id<br/>
220 * TODO try to figure out a better way and remove reference from regionserver package later.
222 private RSRpcServices rsRpcServices;
226 * Use to add online slowlog responses
228 private NamedQueueRecorder namedQueueRecorder;
230 @FunctionalInterface
231 protected interface CallCleanup {
232 void run();
236 * Datastructure for passing a {@link BlockingService} and its associated class of
237 * protobuf service interface. For example, a server that fielded what is defined
238 * in the client protobuf service would pass in an implementation of the client blocking service
239 * and then its ClientService.BlockingInterface.class. Used checking connection setup.
241 public static class BlockingServiceAndInterface {
242 private final BlockingService service;
243 private final Class<?> serviceInterface;
244 public BlockingServiceAndInterface(final BlockingService service,
245 final Class<?> serviceInterface) {
246 this.service = service;
247 this.serviceInterface = serviceInterface;
249 public Class<?> getServiceInterface() {
250 return this.serviceInterface;
252 public BlockingService getBlockingService() {
253 return this.service;
258 * Constructs a server listening on the named port and address.
259 * @param server hosting instance of {@link Server}. We will do authentications if an
260 * instance else pass null for no authentication check.
261 * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
262 * @param services A list of services.
263 * @param bindAddress Where to listen
264 * @param conf
265 * @param scheduler
266 * @param reservoirEnabled Enable ByteBufferPool or not.
268 public RpcServer(final Server server, final String name,
269 final List<BlockingServiceAndInterface> services,
270 final InetSocketAddress bindAddress, Configuration conf,
271 RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
272 this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
273 this.server = server;
274 this.services = services;
275 this.bindAddress = bindAddress;
276 this.conf = conf;
277 // See declaration above for documentation on what this size is.
278 this.maxQueueSizeInBytes =
279 this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
281 this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
282 this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
283 this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
284 DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
285 this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
287 this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
288 this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
289 this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
291 this.cellBlockBuilder = new CellBlockBuilder(conf);
293 this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
294 this.userProvider = UserProvider.instantiate(conf);
295 this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
296 if (isSecurityEnabled) {
297 saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
298 QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
299 } else {
300 saslProps = Collections.emptyMap();
303 this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
304 HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
305 this.scheduler = scheduler;
308 @Override
309 public void onConfigurationChange(Configuration newConf) {
310 initReconfigurable(newConf);
311 if (scheduler instanceof ConfigurationObserver) {
312 ((ConfigurationObserver) scheduler).onConfigurationChange(newConf);
314 if (authorize) {
315 refreshAuthManager(newConf, new HBasePolicyProvider());
319 protected void initReconfigurable(Configuration confToLoad) {
320 this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
321 if (isSecurityEnabled && allowFallbackToSimpleAuth) {
322 LOG.warn("********* WARNING! *********");
323 LOG.warn("This server is configured to allow connections from INSECURE clients");
324 LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
325 LOG.warn("While this option is enabled, client identities cannot be secured, and user");
326 LOG.warn("impersonation is possible!");
327 LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
328 LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
329 LOG.warn("****************************");
333 Configuration getConf() {
334 return conf;
337 @Override
338 public boolean isStarted() {
339 return this.started;
342 @Override
343 public synchronized void refreshAuthManager(Configuration conf, PolicyProvider pp) {
344 // Ignore warnings that this should be accessed in a static way instead of via an instance;
345 // it'll break if you go via static route.
346 System.setProperty("hadoop.policy.file", "hbase-policy.xml");
347 this.authManager.refresh(conf, pp);
348 LOG.info("Refreshed hbase-policy.xml successfully");
349 ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
350 LOG.info("Refreshed super and proxy users successfully");
353 protected AuthenticationTokenSecretManager createSecretManager() {
354 if (!isSecurityEnabled) return null;
355 if (server == null) return null;
356 Configuration conf = server.getConfiguration();
357 long keyUpdateInterval =
358 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
359 long maxAge =
360 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
361 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
362 server.getServerName().toString(), keyUpdateInterval, maxAge);
365 public SecretManager<? extends TokenIdentifier> getSecretManager() {
366 return this.secretManager;
369 @SuppressWarnings("unchecked")
370 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
371 this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
375 * This is a server side method, which is invoked over RPC. On success
376 * the return response has protobuf response payload. On failure, the
377 * exception name and the stack trace are returned in the protobuf response.
379 @Override
380 public Pair<Message, CellScanner> call(RpcCall call,
381 MonitoredRPCHandler status) throws IOException {
382 try {
383 MethodDescriptor md = call.getMethod();
384 Message param = call.getParam();
385 status.setRPC(md.getName(), new Object[]{param},
386 call.getReceiveTime());
387 // TODO: Review after we add in encoded data blocks.
388 status.setRPCPacket(param);
389 status.resume("Servicing call");
390 //get an instance of the method arg type
391 HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
392 controller.setCallTimeout(call.getTimeout());
393 Message result = call.getService().callBlockingMethod(md, controller, param);
394 long receiveTime = call.getReceiveTime();
395 long startTime = call.getStartTime();
396 long endTime = EnvironmentEdgeManager.currentTime();
397 int processingTime = (int) (endTime - startTime);
398 int qTime = (int) (startTime - receiveTime);
399 int totalTime = (int) (endTime - receiveTime);
400 if (LOG.isTraceEnabled()) {
401 LOG.trace(CurCall.get().toString() +
402 ", response " + TextFormat.shortDebugString(result) +
403 " queueTime: " + qTime +
404 " processingTime: " + processingTime +
405 " totalTime: " + totalTime);
407 // Use the raw request call size for now.
408 long requestSize = call.getSize();
409 long responseSize = result.getSerializedSize();
410 if (call.isClientCellBlockSupported()) {
411 // Include the payload size in HBaseRpcController
412 responseSize += call.getResponseCellSize();
415 metrics.dequeuedCall(qTime);
416 metrics.processedCall(processingTime);
417 metrics.totalCall(totalTime);
418 metrics.receivedRequest(requestSize);
419 metrics.sentResponse(responseSize);
420 // log any RPC responses that are slower than the configured warn
421 // response time or larger than configured warning size
422 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
423 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
424 if (tooSlow || tooLarge) {
425 final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
426 // when tagging, we let TooLarge trump TooSmall to keep output simple
427 // note that large responses will often also be slow.
428 logResponse(param,
429 md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
430 tooLarge, tooSlow,
431 status.getClient(), startTime, processingTime, qTime,
432 responseSize, userName);
433 if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
434 // send logs to ring buffer owned by slowLogRecorder
435 final String className =
436 server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
437 this.namedQueueRecorder.addRecord(
438 new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
439 tooLarge));
442 return new Pair<>(result, controller.cellScanner());
443 } catch (Throwable e) {
444 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
445 // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
446 // need to pass it over the wire.
447 if (e instanceof ServiceException) {
448 if (e.getCause() == null) {
449 LOG.debug("Caught a ServiceException with null cause", e);
450 } else {
451 e = e.getCause();
455 // increment the number of requests that were exceptions.
456 metrics.exception(e);
458 if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
459 if (e instanceof IOException) throw (IOException)e;
460 LOG.error("Unexpected throwable object ", e);
461 throw new IOException(e.getMessage(), e);
466 * Logs an RPC response to the LOG file, producing valid JSON objects for
467 * client Operations.
468 * @param param The parameters received in the call.
469 * @param methodName The name of the method invoked
470 * @param call The string representation of the call
471 * @param tooLarge To indicate if the event is tooLarge
472 * @param tooSlow To indicate if the event is tooSlow
473 * @param clientAddress The address of the client who made this call.
474 * @param startTime The time that the call was initiated, in ms.
475 * @param processingTime The duration that the call took to run, in ms.
476 * @param qTime The duration that the call spent on the queue
477 * prior to being initiated, in ms.
478 * @param responseSize The size in bytes of the response buffer.
479 * @param userName UserName of the current RPC Call
481 void logResponse(Message param, String methodName, String call, boolean tooLarge,
482 boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime,
483 long responseSize, String userName) {
484 final String className = server == null ? StringUtils.EMPTY :
485 server.getClass().getSimpleName();
486 // base information that is reported regardless of type of call
487 Map<String, Object> responseInfo = new HashMap<>();
488 responseInfo.put("starttimems", startTime);
489 responseInfo.put("processingtimems", processingTime);
490 responseInfo.put("queuetimems", qTime);
491 responseInfo.put("responsesize", responseSize);
492 responseInfo.put("client", clientAddress);
493 responseInfo.put("class", className);
494 responseInfo.put("method", methodName);
495 responseInfo.put("call", call);
496 // The params could be really big, make sure they don't kill us at WARN
497 String stringifiedParam = ProtobufUtil.getShortTextFormat(param);
498 if (stringifiedParam.length() > 150) {
499 // Truncate to 1000 chars if TRACE is on, else to 150 chars
500 stringifiedParam = truncateTraceLog(stringifiedParam);
502 responseInfo.put("param", stringifiedParam);
503 if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) {
504 ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param);
505 String scanDetails;
506 if (request.hasScannerId()) {
507 long scannerId = request.getScannerId();
508 scanDetails = rsRpcServices.getScanDetailsWithId(scannerId);
509 } else {
510 scanDetails = rsRpcServices.getScanDetailsWithRequest(request);
512 if (scanDetails != null) {
513 responseInfo.put("scandetails", scanDetails);
516 if (param instanceof ClientProtos.MultiRequest) {
517 int numGets = 0;
518 int numMutations = 0;
519 int numServiceCalls = 0;
520 ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest)param;
521 for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
522 for (ClientProtos.Action action: regionAction.getActionList()) {
523 if (action.hasMutation()) {
524 numMutations++;
526 if (action.hasGet()) {
527 numGets++;
529 if (action.hasServiceCall()) {
530 numServiceCalls++;
534 responseInfo.put(MULTI_GETS, numGets);
535 responseInfo.put(MULTI_MUTATIONS, numMutations);
536 responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
538 final String tag = (tooLarge && tooSlow) ? "TooLarge & TooSlow"
539 : (tooSlow ? "TooSlow" : "TooLarge");
540 LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
545 * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
546 * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
547 * @param strParam stringifiedParam to be truncated
548 * @return truncated trace log string
550 String truncateTraceLog(String strParam) {
551 if (LOG.isTraceEnabled()) {
552 int traceLogMaxLength = getConf().getInt(TRACE_LOG_MAX_LENGTH, DEFAULT_TRACE_LOG_MAX_LENGTH);
553 int truncatedLength =
554 strParam.length() < traceLogMaxLength ? strParam.length() : traceLogMaxLength;
555 String truncatedFlag = truncatedLength == strParam.length() ? "" : KEY_WORD_TRUNCATED;
556 return strParam.subSequence(0, truncatedLength) + truncatedFlag;
558 return strParam.subSequence(0, 150) + KEY_WORD_TRUNCATED;
562 * Set the handler for calling out of RPC for error conditions.
563 * @param handler the handler implementation
565 @Override
566 public void setErrorHandler(HBaseRPCErrorHandler handler) {
567 this.errorHandler = handler;
570 @Override
571 public HBaseRPCErrorHandler getErrorHandler() {
572 return this.errorHandler;
576 * Returns the metrics instance for reporting RPC call statistics
578 @Override
579 public MetricsHBaseServer getMetrics() {
580 return metrics;
583 @Override
584 public void addCallSize(final long diff) {
585 this.callQueueSizeInBytes.add(diff);
589 * Authorize the incoming client connection.
590 * @param user client user
591 * @param connection incoming connection
592 * @param addr InetAddress of incoming connection
593 * @throws AuthorizationException when the client isn't authorized to talk the protocol
595 public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
596 InetAddress addr) throws AuthorizationException {
597 if (authorize) {
598 Class<?> c = getServiceInterface(services, connection.getServiceName());
599 authManager.authorize(user, c, getConf(), addr);
604 * When the read or write buffer size is larger than this limit, i/o will be
605 * done in chunks of this size. Most RPC requests and responses would be
606 * smaller.
608 protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
611 * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
612 * If the amount of data is large, it writes to channel in smaller chunks.
613 * This is to avoid jdk from creating many direct buffers as the size of
614 * ByteBuffer increases. There should not be any performance degredation.
616 * @param channel writable byte channel to write on
617 * @param buffer buffer to write
618 * @return number of bytes written
619 * @throws java.io.IOException e
620 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
622 protected int channelRead(ReadableByteChannel channel,
623 ByteBuffer buffer) throws IOException {
625 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
626 channel.read(buffer) : channelIO(channel, null, buffer);
627 if (count > 0) {
628 metrics.receivedBytes(count);
630 return count;
634 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
635 * Only one of readCh or writeCh should be non-null.
637 * @param readCh read channel
638 * @param writeCh write channel
639 * @param buf buffer to read or write into/out of
640 * @return bytes written
641 * @throws java.io.IOException e
642 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
644 private static int channelIO(ReadableByteChannel readCh,
645 WritableByteChannel writeCh,
646 ByteBuffer buf) throws IOException {
648 int originalLimit = buf.limit();
649 int initialRemaining = buf.remaining();
650 int ret = 0;
652 while (buf.remaining() > 0) {
653 try {
654 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
655 buf.limit(buf.position() + ioSize);
657 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
659 if (ret < ioSize) {
660 break;
663 } finally {
664 buf.limit(originalLimit);
668 int nBytes = initialRemaining - buf.remaining();
669 return (nBytes > 0) ? nBytes : ret;
673 * Needed for features such as delayed calls. We need to be able to store the current call
674 * so that we can complete it later or ask questions of what is supported by the current ongoing
675 * call.
676 * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
678 public static Optional<RpcCall> getCurrentCall() {
679 return Optional.ofNullable(CurCall.get());
682 public static boolean isInRpcCallContext() {
683 return CurCall.get() != null;
687 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
688 * master's rpc call, it may generate new procedure and mutate the region which store procedure.
689 * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
690 * call to avoid the rpc check.
691 * @return the currently ongoing rpc call
693 public static Optional<RpcCall> unsetCurrentCall() {
694 Optional<RpcCall> rpcCall = getCurrentCall();
695 CurCall.set(null);
696 return rpcCall;
700 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
701 * rpc call back after mutate region.
703 public static void setCurrentCall(RpcCall rpcCall) {
704 CurCall.set(rpcCall);
708 * Returns the user credentials associated with the current RPC request or not present if no
709 * credentials were provided.
710 * @return A User
712 public static Optional<User> getRequestUser() {
713 Optional<RpcCall> ctx = getCurrentCall();
714 return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
718 * The number of open RPC conections
719 * @return the number of open rpc connections
721 abstract public int getNumOpenConnections();
724 * Returns the username for any user associated with the current RPC
725 * request or not present if no user is set.
727 public static Optional<String> getRequestUserName() {
728 return getRequestUser().map(User::getShortName);
732 * @return Address of remote client if a request is ongoing, else null
734 public static Optional<InetAddress> getRemoteAddress() {
735 return getCurrentCall().map(RpcCall::getRemoteAddress);
739 * @param serviceName Some arbitrary string that represents a 'service'.
740 * @param services Available service instances
741 * @return Matching BlockingServiceAndInterface pair
743 protected static BlockingServiceAndInterface getServiceAndInterface(
744 final List<BlockingServiceAndInterface> services, final String serviceName) {
745 for (BlockingServiceAndInterface bs : services) {
746 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
747 return bs;
750 return null;
754 * @param serviceName Some arbitrary string that represents a 'service'.
755 * @param services Available services and their service interfaces.
756 * @return Service interface class for <code>serviceName</code>
758 protected static Class<?> getServiceInterface(
759 final List<BlockingServiceAndInterface> services,
760 final String serviceName) {
761 BlockingServiceAndInterface bsasi =
762 getServiceAndInterface(services, serviceName);
763 return bsasi == null? null: bsasi.getServiceInterface();
767 * @param serviceName Some arbitrary string that represents a 'service'.
768 * @param services Available services and their service interfaces.
769 * @return BlockingService that goes with the passed <code>serviceName</code>
771 protected static BlockingService getService(
772 final List<BlockingServiceAndInterface> services,
773 final String serviceName) {
774 BlockingServiceAndInterface bsasi =
775 getServiceAndInterface(services, serviceName);
776 return bsasi == null? null: bsasi.getBlockingService();
779 protected static MonitoredRPCHandler getStatus() {
780 // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
781 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
782 if (status != null) {
783 return status;
785 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
786 status.pause("Waiting for a call");
787 RpcServer.MONITORED_RPC.set(status);
788 return status;
791 /** Returns the remote side ip address when invoked inside an RPC
792 * Returns null incase of an error.
793 * @return InetAddress
795 public static InetAddress getRemoteIp() {
796 RpcCall call = CurCall.get();
797 if (call != null) {
798 return call.getRemoteAddress();
800 return null;
803 @Override
804 public RpcScheduler getScheduler() {
805 return scheduler;
808 @Override
809 public ByteBuffAllocator getByteBuffAllocator() {
810 return this.bbAllocator;
813 @Override
814 public void setRsRpcServices(RSRpcServices rsRpcServices) {
815 this.rsRpcServices = rsRpcServices;
818 @Override
819 public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
820 this.namedQueueRecorder = namedQueueRecorder;
823 protected boolean needAuthorization() {
824 return authorize;