2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.ipc
;
21 import static org
.apache
.hadoop
.hbase
.ipc
.IPCUtil
.toIOE
;
22 import static org
.apache
.hadoop
.hbase
.ipc
.IPCUtil
.wrapException
;
23 import static org
.apache
.hadoop
.hbase
.trace
.HBaseSemanticAttributes
.REMOTE_HOST_KEY
;
24 import static org
.apache
.hadoop
.hbase
.trace
.HBaseSemanticAttributes
.REMOTE_PORT_KEY
;
25 import static org
.apache
.hadoop
.hbase
.trace
.HBaseSemanticAttributes
.RPC_METHOD_KEY
;
26 import static org
.apache
.hadoop
.hbase
.trace
.HBaseSemanticAttributes
.RPC_SERVICE_KEY
;
28 import io
.opentelemetry
.api
.trace
.Span
;
29 import io
.opentelemetry
.api
.trace
.StatusCode
;
30 import io
.opentelemetry
.context
.Scope
;
31 import java
.io
.IOException
;
32 import java
.net
.SocketAddress
;
33 import java
.util
.Collection
;
34 import java
.util
.concurrent
.Executors
;
35 import java
.util
.concurrent
.ScheduledExecutorService
;
36 import java
.util
.concurrent
.ScheduledFuture
;
37 import java
.util
.concurrent
.TimeUnit
;
38 import java
.util
.concurrent
.atomic
.AtomicInteger
;
39 import org
.apache
.hadoop
.conf
.Configuration
;
40 import org
.apache
.hadoop
.hbase
.HConstants
;
41 import org
.apache
.hadoop
.hbase
.ServerName
;
42 import org
.apache
.hadoop
.hbase
.client
.MetricsConnection
;
43 import org
.apache
.hadoop
.hbase
.codec
.Codec
;
44 import org
.apache
.hadoop
.hbase
.codec
.KeyValueCodec
;
45 import org
.apache
.hadoop
.hbase
.net
.Address
;
46 import org
.apache
.hadoop
.hbase
.security
.User
;
47 import org
.apache
.hadoop
.hbase
.security
.UserProvider
;
48 import org
.apache
.hadoop
.hbase
.trace
.TraceUtil
;
49 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
50 import org
.apache
.hadoop
.hbase
.util
.PoolMap
;
51 import org
.apache
.hadoop
.hbase
.util
.Threads
;
52 import org
.apache
.hadoop
.io
.compress
.CompressionCodec
;
53 import org
.apache
.hadoop
.ipc
.RemoteException
;
54 import org
.apache
.yetus
.audience
.InterfaceAudience
;
55 import org
.slf4j
.Logger
;
56 import org
.slf4j
.LoggerFactory
;
58 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
59 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.cache
.CacheBuilder
;
60 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.cache
.CacheLoader
;
61 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.cache
.LoadingCache
;
62 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
63 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.BlockingRpcChannel
;
64 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Descriptors
;
65 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
66 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcCallback
;
67 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcChannel
;
68 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
69 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
70 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.HashedWheelTimer
;
72 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
/**
 * Provides the basics for a RpcClient implementation like configuration and Logging.
 * <p>
 * Locking schema of the current IPC implementation
 * <ul>
 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating of a
 * connection.</li>
 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
 * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
 * of how to deal with cancel.</li>
 * <li>For connection implementation, the construction of a connection should be as fast as possible
 * because the creation is protected under a lock. Connect to remote side when needed. There is no
 * forced locking schema for a connection implementation.</li>
 * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held
 * at last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
 * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations
 * of the callbacks are free to hold any lock.</li>
 * </ul>
 */
95 @InterfaceAudience.Private
96 public abstract class AbstractRpcClient
<T
extends RpcConnection
> implements RpcClient
{
97 // Log level is being changed in tests
98 public static final Logger LOG
= LoggerFactory
.getLogger(AbstractRpcClient
.class);
100 protected static final HashedWheelTimer WHEEL_TIMER
= new HashedWheelTimer(
101 new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
102 .setUncaughtExceptionHandler(Threads
.LOGGING_EXCEPTION_HANDLER
).build(),
103 10, TimeUnit
.MILLISECONDS
);
105 private static final ScheduledExecutorService IDLE_CONN_SWEEPER
=
106 Executors
.newScheduledThreadPool(1,
107 new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
108 .setUncaughtExceptionHandler(Threads
.LOGGING_EXCEPTION_HANDLER
).build());
110 private boolean running
= true; // if client runs
112 protected final Configuration conf
;
113 protected final String clusterId
;
114 protected final SocketAddress localAddr
;
115 protected final MetricsConnection metrics
;
117 protected final UserProvider userProvider
;
118 protected final CellBlockBuilder cellBlockBuilder
;
120 protected final int minIdleTimeBeforeClose
; // if the connection is idle for more than this
121 // time (in ms), it will be closed at any moment.
122 protected final int maxRetries
; // the max. no. of retries for socket connections
123 protected final long failureSleep
; // Time to sleep before retry on failure.
124 protected final boolean tcpNoDelay
; // if T then disable Nagle's Algorithm
125 protected final boolean tcpKeepAlive
; // if T then use keepalives
126 protected final Codec codec
;
127 protected final CompressionCodec compressor
;
128 protected final boolean fallbackAllowed
;
130 protected final FailedServers failedServers
;
132 protected final int connectTO
;
133 protected final int readTO
;
134 protected final int writeTO
;
136 private final PoolMap
<ConnectionId
, T
> connections
;
138 private final AtomicInteger callIdCnt
= new AtomicInteger(0);
140 private final ScheduledFuture
<?
> cleanupIdleConnectionTask
;
142 private int maxConcurrentCallsPerServer
;
144 private static final LoadingCache
<Address
, AtomicInteger
> concurrentCounterCache
=
145 CacheBuilder
.newBuilder().expireAfterAccess(1, TimeUnit
.HOURS
).
146 build(new CacheLoader
<Address
, AtomicInteger
>() {
147 @Override public AtomicInteger
load(Address key
) throws Exception
{
148 return new AtomicInteger(0);
/**
 * Construct an IPC client for the cluster <code>clusterId</code>. Reads all client settings
 * from the configuration, creates the connection pool and schedules the periodic cleanup of
 * idle connections.
 * @param conf      configuration
 * @param clusterId the cluster id; {@link HConstants#CLUSTER_ID_DEFAULT} is used when null
 * @param localAddr client socket bind address.
 * @param metrics   the connection metrics
 */
public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
  MetricsConnection metrics) {
  this.userProvider = UserProvider.instantiate(conf);
  this.localAddr = localAddr;
  this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
  this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
  this.failureSleep =
    conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
  this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
  this.cellBlockBuilder = new CellBlockBuilder(conf);

  // Default idle time before close is 2 minutes.
  this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000);
  // Must be assigned before getCodec(), which reads this.conf.
  this.conf = conf;
  this.codec = getCodec();
  this.compressor = getCompressor(conf);
  this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
    IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
  this.failedServers = new FailedServers(conf);
  this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
  this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
  this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
  this.metrics = metrics;
  this.maxConcurrentCallsPerServer =
    conf.getInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
      HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);

  this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));

  // Sweep idle connections periodically; delay and period both equal the idle threshold.
  this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(
    this::cleanupIdleConnections, minIdleTimeBeforeClose, minIdleTimeBeforeClose,
    TimeUnit.MILLISECONDS);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
      + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
      + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
      + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
      + this.fallbackAllowed + ", bind address="
      + (this.localAddr != null ? this.localAddr : "null"));
  }
}
206 private void cleanupIdleConnections() {
207 long closeBeforeTime
= EnvironmentEdgeManager
.currentTime() - minIdleTimeBeforeClose
;
208 synchronized (connections
) {
209 for (T conn
: connections
.values()) {
210 // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
211 // connection itself has already shutdown. The latter check is because we may still
212 // have some pending calls on connection so we should not shutdown the connection outside.
213 // The connection itself will disconnect if there is no pending call for maxIdleTime.
214 if (conn
.getLastTouched() < closeBeforeTime
&& !conn
.isActive()) {
215 if (LOG
.isTraceEnabled()) {
216 LOG
.trace("Cleanup idle connection to {}", conn
.remoteId().getAddress());
218 connections
.remove(conn
.remoteId(), conn
);
219 conn
.cleanupConnection();
225 public static String
getDefaultCodec(final Configuration c
) {
226 // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
227 // Configuration will complain -- then no default codec (and we'll pb everything). Else
228 // default is KeyValueCodec
229 return c
.get(DEFAULT_CODEC_CLASS
, KeyValueCodec
.class.getCanonicalName());
233 * Encapsulate the ugly casting and RuntimeException conversion in private method.
234 * @return Codec to use on this client.
237 // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
238 // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
239 String className
= conf
.get(HConstants
.RPC_CODEC_CONF_KEY
, getDefaultCodec(this.conf
));
240 if (className
== null || className
.length() == 0) {
244 return (Codec
) Class
.forName(className
).getDeclaredConstructor().newInstance();
245 } catch (Exception e
) {
246 throw new RuntimeException("Failed getting codec " + className
, e
);
251 public boolean hasCellBlockSupport() {
252 return this.codec
!= null;
255 // for writing tests that want to throw exception when connecting.
256 boolean isTcpNoDelay() {
261 * Encapsulate the ugly casting and RuntimeException conversion in private method.
262 * @param conf configuration
263 * @return The compressor to use on this client.
265 private static CompressionCodec
getCompressor(final Configuration conf
) {
266 String className
= conf
.get("hbase.client.rpc.compressor", null);
267 if (className
== null || className
.isEmpty()) {
271 return (CompressionCodec
) Class
.forName(className
).getDeclaredConstructor().newInstance();
272 } catch (Exception e
) {
273 throw new RuntimeException("Failed getting compressor " + className
, e
);
278 * Return the pool type specified in the configuration, which must be set to either
279 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
280 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
281 * former. For applications with many user threads, use a small round-robin pool. For applications
282 * with few user threads, you may want to try using a thread-local pool. In any case, the number
283 * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
284 * system's hard limit on the number of connections.
285 * @param config configuration
286 * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
287 * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
289 private static PoolMap
.PoolType
getPoolType(Configuration config
) {
290 return PoolMap
.PoolType
.valueOf(config
.get(HConstants
.HBASE_CLIENT_IPC_POOL_TYPE
),
291 PoolMap
.PoolType
.RoundRobin
);
295 * Return the pool size specified in the configuration, which is applicable only if the pool type
296 * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
297 * @param config configuration
298 * @return the maximum pool size
300 private static int getPoolSize(Configuration config
) {
301 int poolSize
= config
.getInt(HConstants
.HBASE_CLIENT_IPC_POOL_SIZE
, 1);
304 LOG
.warn("{} must be positive. Using default value: 1", HConstants
.HBASE_CLIENT_IPC_POOL_SIZE
);
311 private int nextCallId() {
314 id
= callIdCnt
.get();
315 next
= id
< Integer
.MAX_VALUE ? id
+ 1 : 0;
316 } while (!callIdCnt
.compareAndSet(id
, next
));
321 * Make a blocking call. Throws exceptions if there are network problems or if the remote code
322 * threw an exception.
323 * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
324 * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
325 * new Connection each time.
326 * @return A pair with the Message response and the Cell data (if any).
328 private Message
callBlockingMethod(Descriptors
.MethodDescriptor md
, HBaseRpcController hrc
,
329 Message param
, Message returnType
, final User ticket
, final Address isa
)
330 throws ServiceException
{
331 BlockingRpcCallback
<Message
> done
= new BlockingRpcCallback
<>();
332 callMethod(md
, hrc
, param
, returnType
, ticket
, isa
, done
);
336 } catch (IOException e
) {
337 throw new ServiceException(e
);
340 throw new ServiceException(hrc
.getFailed());
347 * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
348 * given host/port are reused.
350 private T
getConnection(ConnectionId remoteId
) throws IOException
{
351 if (failedServers
.isFailedServer(remoteId
.getAddress())) {
352 if (LOG
.isDebugEnabled()) {
353 LOG
.debug("Not trying to connect to " + remoteId
.getAddress()
354 + " this server is in the failed servers list");
356 throw new FailedServerException(
357 "This server is in the failed servers list: " + remoteId
.getAddress());
360 synchronized (connections
) {
362 throw new StoppedRpcClientException();
364 conn
= connections
.getOrCreate(remoteId
, () -> createConnection(remoteId
));
365 conn
.setLastTouched(EnvironmentEdgeManager
.currentTime());
/**
 * Creates the connection object for the given remote id. It is the supplier passed to
 * {@code connections.getOrCreate(...)} in {@code getConnection}, so it is invoked while the
 * {@code connections} lock is held.
 * @param remoteId identifies the connection to create
 * @return the new connection
 * @throws IOException if the connection cannot be created
 */
373 protected abstract T
createConnection(ConnectionId remoteId
) throws IOException
;
375 private void onCallFinished(Call call
, HBaseRpcController hrc
, Address addr
,
376 RpcCallback
<Message
> callback
) {
377 call
.callStats
.setCallTimeMs(EnvironmentEdgeManager
.currentTime() - call
.getStartTime());
378 if (metrics
!= null) {
379 metrics
.updateRpc(call
.md
, call
.param
, call
.callStats
);
381 if (LOG
.isTraceEnabled()) {
383 "Call: " + call
.md
.getName() + ", callTime: " + call
.callStats
.getCallTimeMs() + "ms");
385 if (call
.error
!= null) {
386 if (call
.error
instanceof RemoteException
) {
387 call
.error
.fillInStackTrace();
388 hrc
.setFailed(call
.error
);
390 hrc
.setFailed(wrapException(addr
, hrc
.getRegionInfo(), call
.error
));
394 hrc
.setDone(call
.cells
);
395 callback
.run(call
.response
);
/**
 * Issues an asynchronous RPC to {@code addr}.
 * <p>
 * Opens a client span named "RpcClient.callMethod", collects call statistics, builds a
 * {@link Call} whose completion callback decrements the per-server in-flight counter and
 * delegates to {@code onCallFinished}, then obtains a connection and sends the request. Any
 * exception raised while checking the per-server limit, fetching the connection or sending is
 * converted with {@code toIOE} and set on the call instead of being thrown.
 * <p>
 * NOTE(review): several lines of this method are not visible in this view (the declaration of
 * {@code numActions}, the structure around the span status handling in the callback, and the
 * return statement) -- presumably the created {@link Call} is returned; confirm against the
 * full file.
 * @param md         descriptor of the method to invoke
 * @param hrc        controller supplying cell scanner, call timeout and priority
 * @param param      request message
 * @param returnType passed through to the {@link Call}
 * @param ticket     user on whose behalf the call is made; part of the connection id
 * @param addr       remote server address
 * @param callback   passed to {@code onCallFinished} when the call completes
 */
399 private Call
callMethod(final Descriptors
.MethodDescriptor md
, final HBaseRpcController hrc
,
400 final Message param
, Message returnType
, final User ticket
, final Address addr
,
401 final RpcCallback
<Message
> callback
) {
// Client span tagged with the rpc service, method, remote host and remote port.
402 Span span
= TraceUtil
.createClientSpan("RpcClient.callMethod")
403 .setAttribute(RPC_SERVICE_KEY
, md
.getService().getName())
404 .setAttribute(RPC_METHOD_KEY
, md
.getName())
405 .setAttribute(REMOTE_HOST_KEY
, addr
.getHostName())
406 .setAttribute(REMOTE_PORT_KEY
, addr
.getPort());
// The span is made current for the duration of this try-with-resources block.
407 try (Scope scope
= span
.makeCurrent()) {
// Per-call statistics; the start time is taken here.
408 final MetricsConnection
.CallStats cs
= MetricsConnection
.newCallStats();
409 cs
.setStartTime(EnvironmentEdgeManager
.currentTime());
// For a multi request, the action counts of all region actions are summed and recorded.
411 if (param
instanceof ClientProtos
.MultiRequest
) {
412 ClientProtos
.MultiRequest req
= (ClientProtos
.MultiRequest
) param
;
414 for (ClientProtos
.RegionAction regionAction
: req
.getRegionActionList()) {
415 numActions
+= regionAction
.getActionCount();
418 cs
.setNumActionsPerServer(numActions
);
// Counter of in-flight calls for this address, taken from the shared cache.
421 final AtomicInteger counter
= concurrentCounterCache
.getUnchecked(addr
);
// The anonymous callback below is handed to the Call together with a fresh call id.
422 Call call
= new Call(nextCallId(), md
, param
, hrc
.cellScanner(), returnType
,
423 hrc
.getCallTimeout(), hrc
.getPriority(), new RpcCallback
<Call
>() {
// On completion: make the call's own span current, release the in-flight slot, then
// run the common completion logic.
425 public void run(Call call
) {
426 try (Scope scope
= call
.span
.makeCurrent()) {
427 counter
.decrementAndGet();
428 onCallFinished(call
, hrc
, addr
, callback
);
// The outer span is either marked with the controller's failure or set to OK.
// NOTE(review): the condition selecting between the two is not visible here.
431 TraceUtil
.setError(span
, hrc
.getFailed());
433 span
.setStatus(StatusCode
.OK
);
// The connection id combines the user, the service name and the remote address.
439 ConnectionId remoteId
= new ConnectionId(ticket
, md
.getService().getName(), addr
);
// The counter is incremented before the limit check; the completion callback decrements it.
440 int count
= counter
.incrementAndGet();
// Reject the call when this server already has more than the allowed concurrent calls.
442 if (count
> maxConcurrentCallsPerServer
) {
443 throw new ServerTooBusyException(addr
, count
);
445 cs
.setConcurrentCallsPerServer(count
);
446 T connection
= getConnection(remoteId
);
447 connection
.sendRequest(call
, hrc
);
// Failures are reported through the call rather than propagated to the caller.
448 } catch (Exception e
) {
449 call
.setException(toIOE(e
));
456 private static Address
createAddr(ServerName sn
) {
457 return Address
.fromParts(sn
.getHostname(), sn
.getPort());
461 * Interrupt the connections to the given ip:port server. This should be called if the server is
462 * known as actually dead. This will not prevent current operation to be retried, and, depending
463 * on their own behavior, they may retry on the same server. This can be a feature, for example at
464 * startup. In any case, they're likely to get connection refused (if the process died) or no
465 * route to host: i.e. their next retries should be faster and with a safe exception.
468 public void cancelConnections(ServerName sn
) {
469 synchronized (connections
) {
470 for (T connection
: connections
.values()) {
471 ConnectionId remoteId
= connection
.remoteId();
472 if (remoteId
.getAddress().getPort() == sn
.getPort()
473 && remoteId
.getAddress().getHostName().equals(sn
.getHostname())) {
474 LOG
.info("The server on " + sn
.toString() + " is dead - stopping the connection "
475 + connection
.remoteId
);
476 connections
.remove(remoteId
, connection
);
477 connection
.shutdown();
478 connection
.cleanupConnection();
484 * Configure an hbase rpccontroller
485 * @param controller to configure
486 * @param channelOperationTimeout timeout for operation
487 * @return configured controller
489 static HBaseRpcController
configureHBaseRpcController(
490 RpcController controller
, int channelOperationTimeout
) {
491 HBaseRpcController hrc
;
492 if (controller
!= null && controller
instanceof HBaseRpcController
) {
493 hrc
= (HBaseRpcController
) controller
;
494 if (!hrc
.hasCallTimeout()) {
495 hrc
.setCallTimeout(channelOperationTimeout
);
498 hrc
= new HBaseRpcControllerImpl();
499 hrc
.setCallTimeout(channelOperationTimeout
);
/**
 * Implementation-specific close hook to be provided by subclasses.
 * NOTE(review): the call site is not visible in this view -- presumably invoked from
 * {@code close()}; confirm against the full file.
 */
504 protected abstract void closeInternal();
507 public void close() {
508 if (LOG
.isDebugEnabled()) {
509 LOG
.debug("Stopping rpc client");
511 Collection
<T
> connToClose
;
512 synchronized (connections
) {
517 connToClose
= connections
.values();
520 cleanupIdleConnectionTask
.cancel(true);
521 for (T conn
: connToClose
) {
525 for (T conn
: connToClose
) {
526 conn
.cleanupConnection();
531 public BlockingRpcChannel
createBlockingRpcChannel(final ServerName sn
, final User ticket
,
533 return new BlockingRpcChannelImplementation(this, createAddr(sn
), ticket
, rpcTimeout
);
537 public RpcChannel
createRpcChannel(ServerName sn
, User user
, int rpcTimeout
) {
538 return new RpcChannelImplementation(this, createAddr(sn
), user
, rpcTimeout
);
541 private static class AbstractRpcChannel
{
543 protected final Address addr
;
545 protected final AbstractRpcClient
<?
> rpcClient
;
547 protected final User ticket
;
549 protected final int rpcTimeout
;
551 protected AbstractRpcChannel(AbstractRpcClient
<?
> rpcClient
, Address addr
,
552 User ticket
, int rpcTimeout
) {
554 this.rpcClient
= rpcClient
;
555 this.ticket
= ticket
;
556 this.rpcTimeout
= rpcTimeout
;
560 * Configure an rpc controller
561 * @param controller to configure
562 * @return configured rpc controller
564 protected HBaseRpcController
configureRpcController(RpcController controller
) {
565 HBaseRpcController hrc
;
566 // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
567 // side. And now we may use ServerRpcController.
568 if (controller
!= null && controller
instanceof HBaseRpcController
) {
569 hrc
= (HBaseRpcController
) controller
;
570 if (!hrc
.hasCallTimeout()) {
571 hrc
.setCallTimeout(rpcTimeout
);
574 hrc
= new HBaseRpcControllerImpl();
575 hrc
.setCallTimeout(rpcTimeout
);
582 * Blocking rpc channel that goes via hbase rpc.
584 public static class BlockingRpcChannelImplementation
extends AbstractRpcChannel
585 implements BlockingRpcChannel
{
587 protected BlockingRpcChannelImplementation(AbstractRpcClient
<?
> rpcClient
,
588 Address addr
, User ticket
, int rpcTimeout
) {
589 super(rpcClient
, addr
, ticket
, rpcTimeout
);
593 public Message
callBlockingMethod(Descriptors
.MethodDescriptor md
, RpcController controller
,
594 Message param
, Message returnType
) throws ServiceException
{
595 return rpcClient
.callBlockingMethod(md
, configureRpcController(controller
), param
, returnType
,
601 * Async rpc channel that goes via hbase rpc.
603 public static class RpcChannelImplementation
extends AbstractRpcChannel
implements
606 protected RpcChannelImplementation(AbstractRpcClient
<?
> rpcClient
, Address addr
,
607 User ticket
, int rpcTimeout
) {
608 super(rpcClient
, addr
, ticket
, rpcTimeout
);
612 public void callMethod(Descriptors
.MethodDescriptor md
, RpcController controller
, Message param
,
613 Message returnType
, RpcCallback
<Message
> done
) {
614 HBaseRpcController configuredController
= configureRpcController(
615 Preconditions
.checkNotNull(controller
, "RpcController can not be null for async rpc call"));
616 // This method does not throw any exceptions, so the caller must provide a
617 // HBaseRpcController which is used to pass the exceptions.
618 this.rpcClient
.callMethod(md
, configuredController
, param
, returnType
, ticket
, addr
, done
);