2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.ipc
;
21 import static org
.apache
.hadoop
.fs
.CommonConfigurationKeysPublic
.HADOOP_SECURITY_AUTHORIZATION
;
23 import java
.io
.IOException
;
24 import java
.net
.InetAddress
;
25 import java
.net
.InetSocketAddress
;
26 import java
.nio
.ByteBuffer
;
27 import java
.nio
.channels
.ReadableByteChannel
;
28 import java
.nio
.channels
.WritableByteChannel
;
29 import java
.util
.Collections
;
30 import java
.util
.HashMap
;
31 import java
.util
.List
;
32 import java
.util
.Locale
;
34 import java
.util
.Optional
;
35 import java
.util
.concurrent
.atomic
.LongAdder
;
36 import org
.apache
.commons
.lang3
.StringUtils
;
37 import org
.apache
.hadoop
.conf
.Configuration
;
38 import org
.apache
.hadoop
.hbase
.CallQueueTooBigException
;
39 import org
.apache
.hadoop
.hbase
.CellScanner
;
40 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
41 import org
.apache
.hadoop
.hbase
.HConstants
;
42 import org
.apache
.hadoop
.hbase
.Server
;
43 import org
.apache
.hadoop
.hbase
.conf
.ConfigurationObserver
;
44 import org
.apache
.hadoop
.hbase
.io
.ByteBuffAllocator
;
45 import org
.apache
.hadoop
.hbase
.monitoring
.MonitoredRPCHandler
;
46 import org
.apache
.hadoop
.hbase
.monitoring
.TaskMonitor
;
47 import org
.apache
.hadoop
.hbase
.namequeues
.NamedQueueRecorder
;
48 import org
.apache
.hadoop
.hbase
.namequeues
.RpcLogDetails
;
49 import org
.apache
.hadoop
.hbase
.regionserver
.RSRpcServices
;
50 import org
.apache
.hadoop
.hbase
.security
.HBasePolicyProvider
;
51 import org
.apache
.hadoop
.hbase
.security
.SaslUtil
;
52 import org
.apache
.hadoop
.hbase
.security
.SaslUtil
.QualityOfProtection
;
53 import org
.apache
.hadoop
.hbase
.security
.User
;
54 import org
.apache
.hadoop
.hbase
.security
.UserProvider
;
55 import org
.apache
.hadoop
.hbase
.security
.token
.AuthenticationTokenSecretManager
;
56 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
57 import org
.apache
.hadoop
.hbase
.util
.GsonUtil
;
58 import org
.apache
.hadoop
.hbase
.util
.Pair
;
59 import org
.apache
.hadoop
.security
.UserGroupInformation
;
60 import org
.apache
.hadoop
.security
.authorize
.AuthorizationException
;
61 import org
.apache
.hadoop
.security
.authorize
.PolicyProvider
;
62 import org
.apache
.hadoop
.security
.authorize
.ProxyUsers
;
63 import org
.apache
.hadoop
.security
.authorize
.ServiceAuthorizationManager
;
64 import org
.apache
.hadoop
.security
.token
.SecretManager
;
65 import org
.apache
.hadoop
.security
.token
.TokenIdentifier
;
66 import org
.apache
.yetus
.audience
.InterfaceAudience
;
67 import org
.slf4j
.Logger
;
68 import org
.slf4j
.LoggerFactory
;
70 import org
.apache
.hbase
.thirdparty
.com
.google
.gson
.Gson
;
71 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.BlockingService
;
72 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Descriptors
.MethodDescriptor
;
73 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
74 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
75 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.TextFormat
;
77 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
78 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
79 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RPCProtos
.ConnectionHeader
;
82 * An RPC server that hosts protobuf described Services.
85 @InterfaceAudience.Private
86 public abstract class RpcServer
implements RpcServerInterface
,
87 ConfigurationObserver
{
88 // LOG is being used in CallRunner and the log level is being changed in tests
89 public static final Logger LOG
= LoggerFactory
.getLogger(RpcServer
.class);
90 protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
91 = new CallQueueTooBigException();
93 private static final String MULTI_GETS
= "multi.gets";
94 private static final String MULTI_MUTATIONS
= "multi.mutations";
95 private static final String MULTI_SERVICE_CALLS
= "multi.service_calls";
97 private final boolean authorize
;
98 private final boolean isOnlineLogProviderEnabled
;
99 protected boolean isSecurityEnabled
;
101 public static final byte CURRENT_VERSION
= 0;
104 * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
106 public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH
=
107 "hbase.ipc.server.fallback-to-simple-auth-allowed";
110 * How many calls/handler are allowed in the queue.
112 protected static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER
= 10;
114 protected final CellBlockBuilder cellBlockBuilder
;
116 protected static final String AUTH_FAILED_FOR
= "Auth failed for ";
117 protected static final String AUTH_SUCCESSFUL_FOR
= "Auth successful for ";
118 protected static final Logger AUDITLOG
= LoggerFactory
.getLogger("SecurityLogger."
119 + Server
.class.getName());
120 protected SecretManager
<TokenIdentifier
> secretManager
;
121 protected final Map
<String
, String
> saslProps
;
123 protected ServiceAuthorizationManager authManager
;
125 /** This is set to Call object before Handler invokes an RPC and ybdie
126 * after the call returns.
128 protected static final ThreadLocal
<RpcCall
> CurCall
= new ThreadLocal
<>();
130 /** Keeps MonitoredRPCHandler per handler thread. */
131 protected static final ThreadLocal
<MonitoredRPCHandler
> MONITORED_RPC
= new ThreadLocal
<>();
133 protected final InetSocketAddress bindAddress
;
135 protected MetricsHBaseServer metrics
;
137 protected final Configuration conf
;
140 * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
141 * this size, then we will reject the call (after parsing it though). It will go back to the
142 * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
143 * call queue size gets incremented after we parse a call and before we add it to the queue of
144 * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
145 * size is kept in {@link #callQueueSizeInBytes}.
146 * @see #callQueueSizeInBytes
147 * @see #DEFAULT_MAX_CALLQUEUE_SIZE
149 protected final long maxQueueSizeInBytes
;
150 protected static final int DEFAULT_MAX_CALLQUEUE_SIZE
= 1024 * 1024 * 1024;
153 * This is a running count of the size in bytes of all outstanding calls whether currently
154 * executing or queued waiting to be run.
156 protected final LongAdder callQueueSizeInBytes
= new LongAdder();
158 protected final boolean tcpNoDelay
; // if T then disable Nagle's Algorithm
159 protected final boolean tcpKeepAlive
; // if T then use keepalives
162 * This flag is used to indicate to sub threads when they should go down. When we call
163 * {@link #start()}, all threads started will consult this flag on whether they should
164 * keep going. It is set to false when {@link #stop()} is called.
166 volatile boolean running
= true;
169 * This flag is set to true after all threads are up and 'running' and the server is then opened
170 * for business by the call to {@link #start()}.
172 volatile boolean started
= false;
174 protected AuthenticationTokenSecretManager authTokenSecretMgr
= null;
176 protected HBaseRPCErrorHandler errorHandler
= null;
178 public static final String MAX_REQUEST_SIZE
= "hbase.ipc.max.request.size";
180 protected static final String WARN_RESPONSE_TIME
= "hbase.ipc.warn.response.time";
181 protected static final String WARN_RESPONSE_SIZE
= "hbase.ipc.warn.response.size";
184 * Minimum allowable timeout (in milliseconds) in rpc request's header. This
185 * configuration exists to prevent the rpc service regarding this request as timeout immediately.
187 protected static final String MIN_CLIENT_REQUEST_TIMEOUT
= "hbase.ipc.min.client.request.timeout";
188 protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT
= 20;
190 /** Default value for above params */
191 public static final int DEFAULT_MAX_REQUEST_SIZE
= DEFAULT_MAX_CALLQUEUE_SIZE
/ 4; // 256M
192 protected static final int DEFAULT_WARN_RESPONSE_TIME
= 10000; // milliseconds
193 protected static final int DEFAULT_WARN_RESPONSE_SIZE
= 100 * 1024 * 1024;
195 protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH
= 1000;
196 protected static final String TRACE_LOG_MAX_LENGTH
= "hbase.ipc.trace.log.max.length";
197 protected static final String KEY_WORD_TRUNCATED
= " <TRUNCATED>";
199 protected static final Gson GSON
= GsonUtil
.createGsonWithDisableHtmlEscaping().create();
201 protected final int maxRequestSize
;
202 protected final int warnResponseTime
;
203 protected final int warnResponseSize
;
205 protected final int minClientRequestTimeout
;
207 protected final Server server
;
208 protected final List
<BlockingServiceAndInterface
> services
;
210 protected final RpcScheduler scheduler
;
212 protected UserProvider userProvider
;
214 protected final ByteBuffAllocator bbAllocator
;
216 protected volatile boolean allowFallbackToSimpleAuth
;
219 * Used to get details for scan with a scanner_id<br/>
220 * TODO try to figure out a better way and remove reference from regionserver package later.
222 private RSRpcServices rsRpcServices
;
226 * Use to add online slowlog responses
228 private NamedQueueRecorder namedQueueRecorder
;
231 protected interface CallCleanup
{
236 * Datastructure for passing a {@link BlockingService} and its associated class of
237 * protobuf service interface. For example, a server that fielded what is defined
238 * in the client protobuf service would pass in an implementation of the client blocking service
239 * and then its ClientService.BlockingInterface.class. Used checking connection setup.
241 public static class BlockingServiceAndInterface
{
242 private final BlockingService service
;
243 private final Class
<?
> serviceInterface
;
244 public BlockingServiceAndInterface(final BlockingService service
,
245 final Class
<?
> serviceInterface
) {
246 this.service
= service
;
247 this.serviceInterface
= serviceInterface
;
249 public Class
<?
> getServiceInterface() {
250 return this.serviceInterface
;
252 public BlockingService
getBlockingService() {
258 * Constructs a server listening on the named port and address.
259 * @param server hosting instance of {@link Server}. We will do authentications if an
260 * instance else pass null for no authentication check.
261 * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
262 * @param services A list of services.
263 * @param bindAddress Where to listen
266 * @param reservoirEnabled Enable ByteBufferPool or not.
268 public RpcServer(final Server server
, final String name
,
269 final List
<BlockingServiceAndInterface
> services
,
270 final InetSocketAddress bindAddress
, Configuration conf
,
271 RpcScheduler scheduler
, boolean reservoirEnabled
) throws IOException
{
272 this.bbAllocator
= ByteBuffAllocator
.create(conf
, reservoirEnabled
);
273 this.server
= server
;
274 this.services
= services
;
275 this.bindAddress
= bindAddress
;
277 // See declaration above for documentation on what this size is.
278 this.maxQueueSizeInBytes
=
279 this.conf
.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE
);
281 this.warnResponseTime
= conf
.getInt(WARN_RESPONSE_TIME
, DEFAULT_WARN_RESPONSE_TIME
);
282 this.warnResponseSize
= conf
.getInt(WARN_RESPONSE_SIZE
, DEFAULT_WARN_RESPONSE_SIZE
);
283 this.minClientRequestTimeout
= conf
.getInt(MIN_CLIENT_REQUEST_TIMEOUT
,
284 DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT
);
285 this.maxRequestSize
= conf
.getInt(MAX_REQUEST_SIZE
, DEFAULT_MAX_REQUEST_SIZE
);
287 this.metrics
= new MetricsHBaseServer(name
, new MetricsHBaseServerWrapperImpl(this));
288 this.tcpNoDelay
= conf
.getBoolean("hbase.ipc.server.tcpnodelay", true);
289 this.tcpKeepAlive
= conf
.getBoolean("hbase.ipc.server.tcpkeepalive", true);
291 this.cellBlockBuilder
= new CellBlockBuilder(conf
);
293 this.authorize
= conf
.getBoolean(HADOOP_SECURITY_AUTHORIZATION
, false);
294 this.userProvider
= UserProvider
.instantiate(conf
);
295 this.isSecurityEnabled
= userProvider
.isHBaseSecurityEnabled();
296 if (isSecurityEnabled
) {
297 saslProps
= SaslUtil
.initSaslProperties(conf
.get("hbase.rpc.protection",
298 QualityOfProtection
.AUTHENTICATION
.name().toLowerCase(Locale
.ROOT
)));
300 saslProps
= Collections
.emptyMap();
303 this.isOnlineLogProviderEnabled
= conf
.getBoolean(HConstants
.SLOW_LOG_BUFFER_ENABLED_KEY
,
304 HConstants
.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED
);
305 this.scheduler
= scheduler
;
309 public void onConfigurationChange(Configuration newConf
) {
310 initReconfigurable(newConf
);
311 if (scheduler
instanceof ConfigurationObserver
) {
312 ((ConfigurationObserver
) scheduler
).onConfigurationChange(newConf
);
315 refreshAuthManager(newConf
, new HBasePolicyProvider());
319 protected void initReconfigurable(Configuration confToLoad
) {
320 this.allowFallbackToSimpleAuth
= confToLoad
.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH
, false);
321 if (isSecurityEnabled
&& allowFallbackToSimpleAuth
) {
322 LOG
.warn("********* WARNING! *********");
323 LOG
.warn("This server is configured to allow connections from INSECURE clients");
324 LOG
.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH
+ " = true).");
325 LOG
.warn("While this option is enabled, client identities cannot be secured, and user");
326 LOG
.warn("impersonation is possible!");
327 LOG
.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
328 LOG
.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH
+ " = false in hbase-site.xml");
329 LOG
.warn("****************************");
333 Configuration
getConf() {
338 public boolean isStarted() {
343 public synchronized void refreshAuthManager(Configuration conf
, PolicyProvider pp
) {
344 // Ignore warnings that this should be accessed in a static way instead of via an instance;
345 // it'll break if you go via static route.
346 System
.setProperty("hadoop.policy.file", "hbase-policy.xml");
347 this.authManager
.refresh(conf
, pp
);
348 LOG
.info("Refreshed hbase-policy.xml successfully");
349 ProxyUsers
.refreshSuperUserGroupsConfiguration(conf
);
350 LOG
.info("Refreshed super and proxy users successfully");
353 protected AuthenticationTokenSecretManager
createSecretManager() {
354 if (!isSecurityEnabled
) return null;
355 if (server
== null) return null;
356 Configuration conf
= server
.getConfiguration();
357 long keyUpdateInterval
=
358 conf
.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
360 conf
.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
361 return new AuthenticationTokenSecretManager(conf
, server
.getZooKeeper(),
362 server
.getServerName().toString(), keyUpdateInterval
, maxAge
);
365 public SecretManager
<?
extends TokenIdentifier
> getSecretManager() {
366 return this.secretManager
;
369 @SuppressWarnings("unchecked")
370 public void setSecretManager(SecretManager
<?
extends TokenIdentifier
> secretManager
) {
371 this.secretManager
= (SecretManager
<TokenIdentifier
>) secretManager
;
375 * This is a server side method, which is invoked over RPC. On success
376 * the return response has protobuf response payload. On failure, the
377 * exception name and the stack trace are returned in the protobuf response.
380 public Pair
<Message
, CellScanner
> call(RpcCall call
,
381 MonitoredRPCHandler status
) throws IOException
{
383 MethodDescriptor md
= call
.getMethod();
384 Message param
= call
.getParam();
385 status
.setRPC(md
.getName(), new Object
[]{param
},
386 call
.getReceiveTime());
387 // TODO: Review after we add in encoded data blocks.
388 status
.setRPCPacket(param
);
389 status
.resume("Servicing call");
390 //get an instance of the method arg type
391 HBaseRpcController controller
= new HBaseRpcControllerImpl(call
.getCellScanner());
392 controller
.setCallTimeout(call
.getTimeout());
393 Message result
= call
.getService().callBlockingMethod(md
, controller
, param
);
394 long receiveTime
= call
.getReceiveTime();
395 long startTime
= call
.getStartTime();
396 long endTime
= EnvironmentEdgeManager
.currentTime();
397 int processingTime
= (int) (endTime
- startTime
);
398 int qTime
= (int) (startTime
- receiveTime
);
399 int totalTime
= (int) (endTime
- receiveTime
);
400 if (LOG
.isTraceEnabled()) {
401 LOG
.trace(CurCall
.get().toString() +
402 ", response " + TextFormat
.shortDebugString(result
) +
403 " queueTime: " + qTime
+
404 " processingTime: " + processingTime
+
405 " totalTime: " + totalTime
);
407 // Use the raw request call size for now.
408 long requestSize
= call
.getSize();
409 long responseSize
= result
.getSerializedSize();
410 if (call
.isClientCellBlockSupported()) {
411 // Include the payload size in HBaseRpcController
412 responseSize
+= call
.getResponseCellSize();
415 metrics
.dequeuedCall(qTime
);
416 metrics
.processedCall(processingTime
);
417 metrics
.totalCall(totalTime
);
418 metrics
.receivedRequest(requestSize
);
419 metrics
.sentResponse(responseSize
);
420 // log any RPC responses that are slower than the configured warn
421 // response time or larger than configured warning size
422 boolean tooSlow
= (processingTime
> warnResponseTime
&& warnResponseTime
> -1);
423 boolean tooLarge
= (responseSize
> warnResponseSize
&& warnResponseSize
> -1);
424 if (tooSlow
|| tooLarge
) {
425 final String userName
= call
.getRequestUserName().orElse(StringUtils
.EMPTY
);
426 // when tagging, we let TooLarge trump TooSmall to keep output simple
427 // note that large responses will often also be slow.
429 md
.getName(), md
.getName() + "(" + param
.getClass().getName() + ")",
431 status
.getClient(), startTime
, processingTime
, qTime
,
432 responseSize
, userName
);
433 if (this.namedQueueRecorder
!= null && this.isOnlineLogProviderEnabled
) {
434 // send logs to ring buffer owned by slowLogRecorder
435 final String className
=
436 server
== null ? StringUtils
.EMPTY
: server
.getClass().getSimpleName();
437 this.namedQueueRecorder
.addRecord(
438 new RpcLogDetails(call
, param
, status
.getClient(), responseSize
, className
, tooSlow
,
442 return new Pair
<>(result
, controller
.cellScanner());
443 } catch (Throwable e
) {
444 // The above callBlockingMethod will always return a SE. Strip the SE wrapper before
445 // putting it on the wire. Its needed to adhere to the pb Service Interface but we don't
446 // need to pass it over the wire.
447 if (e
instanceof ServiceException
) {
448 if (e
.getCause() == null) {
449 LOG
.debug("Caught a ServiceException with null cause", e
);
455 // increment the number of requests that were exceptions.
456 metrics
.exception(e
);
458 if (e
instanceof LinkageError
) throw new DoNotRetryIOException(e
);
459 if (e
instanceof IOException
) throw (IOException
)e
;
460 LOG
.error("Unexpected throwable object ", e
);
461 throw new IOException(e
.getMessage(), e
);
466 * Logs an RPC response to the LOG file, producing valid JSON objects for
468 * @param param The parameters received in the call.
469 * @param methodName The name of the method invoked
470 * @param call The string representation of the call
471 * @param tooLarge To indicate if the event is tooLarge
472 * @param tooSlow To indicate if the event is tooSlow
473 * @param clientAddress The address of the client who made this call.
474 * @param startTime The time that the call was initiated, in ms.
475 * @param processingTime The duration that the call took to run, in ms.
476 * @param qTime The duration that the call spent on the queue
477 * prior to being initiated, in ms.
478 * @param responseSize The size in bytes of the response buffer.
479 * @param userName UserName of the current RPC Call
481 void logResponse(Message param
, String methodName
, String call
, boolean tooLarge
,
482 boolean tooSlow
, String clientAddress
, long startTime
, int processingTime
, int qTime
,
483 long responseSize
, String userName
) {
484 final String className
= server
== null ? StringUtils
.EMPTY
:
485 server
.getClass().getSimpleName();
486 // base information that is reported regardless of type of call
487 Map
<String
, Object
> responseInfo
= new HashMap
<>();
488 responseInfo
.put("starttimems", startTime
);
489 responseInfo
.put("processingtimems", processingTime
);
490 responseInfo
.put("queuetimems", qTime
);
491 responseInfo
.put("responsesize", responseSize
);
492 responseInfo
.put("client", clientAddress
);
493 responseInfo
.put("class", className
);
494 responseInfo
.put("method", methodName
);
495 responseInfo
.put("call", call
);
496 // The params could be really big, make sure they don't kill us at WARN
497 String stringifiedParam
= ProtobufUtil
.getShortTextFormat(param
);
498 if (stringifiedParam
.length() > 150) {
499 // Truncate to 1000 chars if TRACE is on, else to 150 chars
500 stringifiedParam
= truncateTraceLog(stringifiedParam
);
502 responseInfo
.put("param", stringifiedParam
);
503 if (param
instanceof ClientProtos
.ScanRequest
&& rsRpcServices
!= null) {
504 ClientProtos
.ScanRequest request
= ((ClientProtos
.ScanRequest
) param
);
506 if (request
.hasScannerId()) {
507 long scannerId
= request
.getScannerId();
508 scanDetails
= rsRpcServices
.getScanDetailsWithId(scannerId
);
510 scanDetails
= rsRpcServices
.getScanDetailsWithRequest(request
);
512 if (scanDetails
!= null) {
513 responseInfo
.put("scandetails", scanDetails
);
516 if (param
instanceof ClientProtos
.MultiRequest
) {
518 int numMutations
= 0;
519 int numServiceCalls
= 0;
520 ClientProtos
.MultiRequest multi
= (ClientProtos
.MultiRequest
)param
;
521 for (ClientProtos
.RegionAction regionAction
: multi
.getRegionActionList()) {
522 for (ClientProtos
.Action action
: regionAction
.getActionList()) {
523 if (action
.hasMutation()) {
526 if (action
.hasGet()) {
529 if (action
.hasServiceCall()) {
534 responseInfo
.put(MULTI_GETS
, numGets
);
535 responseInfo
.put(MULTI_MUTATIONS
, numMutations
);
536 responseInfo
.put(MULTI_SERVICE_CALLS
, numServiceCalls
);
538 final String tag
= (tooLarge
&& tooSlow
) ?
"TooLarge & TooSlow"
539 : (tooSlow ?
"TooSlow" : "TooLarge");
540 LOG
.warn("(response" + tag
+ "): " + GSON
.toJson(responseInfo
));
545 * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
546 * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
547 * @param strParam stringifiedParam to be truncated
548 * @return truncated trace log string
550 String
truncateTraceLog(String strParam
) {
551 if (LOG
.isTraceEnabled()) {
552 int traceLogMaxLength
= getConf().getInt(TRACE_LOG_MAX_LENGTH
, DEFAULT_TRACE_LOG_MAX_LENGTH
);
553 int truncatedLength
=
554 strParam
.length() < traceLogMaxLength ? strParam
.length() : traceLogMaxLength
;
555 String truncatedFlag
= truncatedLength
== strParam
.length() ?
"" : KEY_WORD_TRUNCATED
;
556 return strParam
.subSequence(0, truncatedLength
) + truncatedFlag
;
558 return strParam
.subSequence(0, 150) + KEY_WORD_TRUNCATED
;
562 * Set the handler for calling out of RPC for error conditions.
563 * @param handler the handler implementation
566 public void setErrorHandler(HBaseRPCErrorHandler handler
) {
567 this.errorHandler
= handler
;
571 public HBaseRPCErrorHandler
getErrorHandler() {
572 return this.errorHandler
;
576 * Returns the metrics instance for reporting RPC call statistics
579 public MetricsHBaseServer
getMetrics() {
584 public void addCallSize(final long diff
) {
585 this.callQueueSizeInBytes
.add(diff
);
589 * Authorize the incoming client connection.
590 * @param user client user
591 * @param connection incoming connection
592 * @param addr InetAddress of incoming connection
593 * @throws AuthorizationException when the client isn't authorized to talk the protocol
595 public synchronized void authorize(UserGroupInformation user
, ConnectionHeader connection
,
596 InetAddress addr
) throws AuthorizationException
{
598 Class
<?
> c
= getServiceInterface(services
, connection
.getServiceName());
599 authManager
.authorize(user
, c
, getConf(), addr
);
604 * When the read or write buffer size is larger than this limit, i/o will be
605 * done in chunks of this size. Most RPC requests and responses would be
608 protected static final int NIO_BUFFER_LIMIT
= 64 * 1024; //should not be more than 64KB.
611 * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
612 * If the amount of data is large, it writes to channel in smaller chunks.
613 * This is to avoid jdk from creating many direct buffers as the size of
614 * ByteBuffer increases. There should not be any performance degredation.
616 * @param channel writable byte channel to write on
617 * @param buffer buffer to write
618 * @return number of bytes written
619 * @throws java.io.IOException e
620 * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
622 protected int channelRead(ReadableByteChannel channel
,
623 ByteBuffer buffer
) throws IOException
{
625 int count
= (buffer
.remaining() <= NIO_BUFFER_LIMIT
) ?
626 channel
.read(buffer
) : channelIO(channel
, null, buffer
);
628 metrics
.receivedBytes(count
);
634 * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
635 * Only one of readCh or writeCh should be non-null.
637 * @param readCh read channel
638 * @param writeCh write channel
639 * @param buf buffer to read or write into/out of
640 * @return bytes written
641 * @throws java.io.IOException e
642 * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
644 private static int channelIO(ReadableByteChannel readCh
,
645 WritableByteChannel writeCh
,
646 ByteBuffer buf
) throws IOException
{
648 int originalLimit
= buf
.limit();
649 int initialRemaining
= buf
.remaining();
652 while (buf
.remaining() > 0) {
654 int ioSize
= Math
.min(buf
.remaining(), NIO_BUFFER_LIMIT
);
655 buf
.limit(buf
.position() + ioSize
);
657 ret
= (readCh
== null) ? writeCh
.write(buf
) : readCh
.read(buf
);
664 buf
.limit(originalLimit
);
668 int nBytes
= initialRemaining
- buf
.remaining();
669 return (nBytes
> 0) ? nBytes
: ret
;
673 * Needed for features such as delayed calls. We need to be able to store the current call
674 * so that we can complete it later or ask questions of what is supported by the current ongoing
676 * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
678 public static Optional
<RpcCall
> getCurrentCall() {
679 return Optional
.ofNullable(CurCall
.get());
682 public static boolean isInRpcCallContext() {
683 return CurCall
.get() != null;
687 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
688 * master's rpc call, it may generate new procedure and mutate the region which store procedure.
689 * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
690 * call to avoid the rpc check.
691 * @return the currently ongoing rpc call
693 public static Optional
<RpcCall
> unsetCurrentCall() {
694 Optional
<RpcCall
> rpcCall
= getCurrentCall();
700 * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
701 * rpc call back after mutate region.
703 public static void setCurrentCall(RpcCall rpcCall
) {
704 CurCall
.set(rpcCall
);
708 * Returns the user credentials associated with the current RPC request or not present if no
709 * credentials were provided.
712 public static Optional
<User
> getRequestUser() {
713 Optional
<RpcCall
> ctx
= getCurrentCall();
714 return ctx
.isPresent() ? ctx
.get().getRequestUser() : Optional
.empty();
718 * The number of open RPC conections
719 * @return the number of open rpc connections
721 abstract public int getNumOpenConnections();
724 * Returns the username for any user associated with the current RPC
725 * request or not present if no user is set.
727 public static Optional
<String
> getRequestUserName() {
728 return getRequestUser().map(User
::getShortName
);
732 * @return Address of remote client if a request is ongoing, else null
734 public static Optional
<InetAddress
> getRemoteAddress() {
735 return getCurrentCall().map(RpcCall
::getRemoteAddress
);
739 * @param serviceName Some arbitrary string that represents a 'service'.
740 * @param services Available service instances
741 * @return Matching BlockingServiceAndInterface pair
743 protected static BlockingServiceAndInterface
getServiceAndInterface(
744 final List
<BlockingServiceAndInterface
> services
, final String serviceName
) {
745 for (BlockingServiceAndInterface bs
: services
) {
746 if (bs
.getBlockingService().getDescriptorForType().getName().equals(serviceName
)) {
754 * @param serviceName Some arbitrary string that represents a 'service'.
755 * @param services Available services and their service interfaces.
756 * @return Service interface class for <code>serviceName</code>
758 protected static Class
<?
> getServiceInterface(
759 final List
<BlockingServiceAndInterface
> services
,
760 final String serviceName
) {
761 BlockingServiceAndInterface bsasi
=
762 getServiceAndInterface(services
, serviceName
);
763 return bsasi
== null?
null: bsasi
.getServiceInterface();
767 * @param serviceName Some arbitrary string that represents a 'service'.
768 * @param services Available services and their service interfaces.
769 * @return BlockingService that goes with the passed <code>serviceName</code>
771 protected static BlockingService
getService(
772 final List
<BlockingServiceAndInterface
> services
,
773 final String serviceName
) {
774 BlockingServiceAndInterface bsasi
=
775 getServiceAndInterface(services
, serviceName
);
776 return bsasi
== null?
null: bsasi
.getBlockingService();
779 protected static MonitoredRPCHandler
getStatus() {
780 // It is ugly the way we park status up in RpcServer. Let it be for now. TODO.
781 MonitoredRPCHandler status
= RpcServer
.MONITORED_RPC
.get();
782 if (status
!= null) {
785 status
= TaskMonitor
.get().createRPCStatus(Thread
.currentThread().getName());
786 status
.pause("Waiting for a call");
787 RpcServer
.MONITORED_RPC
.set(status
);
791 /** Returns the remote side ip address when invoked inside an RPC
792 * Returns null incase of an error.
793 * @return InetAddress
795 public static InetAddress
getRemoteIp() {
796 RpcCall call
= CurCall
.get();
798 return call
.getRemoteAddress();
804 public RpcScheduler
getScheduler() {
809 public ByteBuffAllocator
getByteBuffAllocator() {
810 return this.bbAllocator
;
814 public void setRsRpcServices(RSRpcServices rsRpcServices
) {
815 this.rsRpcServices
= rsRpcServices
;
819 public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder
) {
820 this.namedQueueRecorder
= namedQueueRecorder
;
823 protected boolean needAuthorization() {