HBASE-26474 Implement connection-level attributes (addendum)
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_HOST_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_PORT_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Provides the basics for an RpcClient implementation, such as configuration and logging.
 * <p>
 * Locking schema of the current IPC implementation
 * <ul>
 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating of
 * connections.</li>
 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
 * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
 * on how to deal with cancellation.</li>
 * <li>For connection implementations, the construction of a connection should be as fast as
 * possible because the creation is protected under a lock. Connect to the remote side only when
 * needed. There is no forced locking schema for a connection implementation.</li>
 * <li>For the locking order, the {@link Call} and {@link HBaseRpcController} locks should be held
 * last. The callbacks in {@link Call} and {@link HBaseRpcController} are therefore executed
 * outside the locks in {@link Call} and {@link HBaseRpcController}, which means the
 * implementations of the callbacks are free to hold any lock.</li>
 * </ul>
 * @since 2.0.0
 */
@InterfaceAudience.Private
public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
  // Log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);

  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
      new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
      10, TimeUnit.MILLISECONDS);
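
  // Shared single-threaded scheduler on which each client instance schedules its
  // cleanupIdleConnections() task (see the constructor); connections that have sat idle for
  // longer than minIdleTimeBeforeClose are removed and cleaned up.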
  private static final ScheduledExecutorService IDLE_CONN_SWEEPER =
      Executors.newScheduledThreadPool(1,
          new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
              .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

  private boolean running = true; // if client runs

  protected final Configuration conf;
  protected final String clusterId;
  protected final SocketAddress localAddr;
  protected final MetricsConnection metrics;

  protected final UserProvider userProvider;
  protected final CellBlockBuilder cellBlockBuilder;

  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
  // time (in ms), it will be closed at any moment.
  protected final int maxRetries; // the max. no. of retries for socket connections
  protected final long failureSleep; // Time to sleep before retry on failure.
  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives
  protected final Codec codec;
  protected final CompressionCodec compressor;
  protected final boolean fallbackAllowed;

  protected final FailedServers failedServers;

  protected final int connectTO;
  protected final int readTO;
  protected final int writeTO;

  private final PoolMap<ConnectionId, T> connections;

  private final AtomicInteger callIdCnt = new AtomicInteger(0);

  private final ScheduledFuture<?> cleanupIdleConnectionTask;

  private int maxConcurrentCallsPerServer;
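
  // Per-server count of in-flight calls, used to enforce maxConcurrentCallsPerServer;
  // callMethod() throws ServerTooBusyException when the counter for a server exceeds the
  // configured limit.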
  private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
      CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
          .build(new CacheLoader<Address, AtomicInteger>() {
            @Override public AtomicInteger load(Address key) throws Exception {
              return new AtomicInteger(0);
            }
          });

  /**
   * Construct an IPC client for the cluster <code>clusterId</code>
   * @param conf configuration
   * @param clusterId the cluster id
   * @param localAddr client socket bind address.
   * @param metrics the connection metrics
   */
  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
      MetricsConnection metrics) {
    this.userProvider = UserProvider.instantiate(conf);
    this.localAddr = localAddr;
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
    this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
    this.cellBlockBuilder = new CellBlockBuilder(conf);

    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
    this.conf = conf;
    this.codec = getCodec();
    this.compressor = getCompressor(conf);
    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
    this.failedServers = new FailedServers(conf);
    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
    this.metrics = metrics;
    this.maxConcurrentCallsPerServer = conf.getInt(
        HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);

    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));

    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {

      @Override
      public void run() {
        cleanupIdleConnections();
      }
    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
          + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
          + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
          + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
          + this.fallbackAllowed + ", bind address="
          + (this.localAddr != null ? this.localAddr : "null"));
    }
  }

  private void cleanupIdleConnections() {
    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
    synchronized (connections) {
      for (T conn : connections.values()) {
        // Remove the connection if it has not been chosen by anyone for more than maxIdleTime and
        // the connection itself has already shut down. The latter check is needed because we may
        // still have pending calls on the connection, so we should not shut it down from outside.
        // The connection itself will disconnect if there is no pending call for maxIdleTime.
        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress());
          }
          connections.remove(conn.remoteId(), conn);
          conn.cleanupConnection();
        }
      }
    }
  }

  public static String getDefaultCodec(final Configuration c) {
    // If "hbase.client.default.rpc.codec" is the empty string -- you can't set it to null because
    // Configuration will complain -- then there is no default codec (and we'll pb everything).
    // Otherwise the default is KeyValueCodec.
    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
  }

  /**
   * Encapsulate the ugly casting and RuntimeException conversion in a private method.
   * @return Codec to use on this client.
   */
  Codec getCodec() {
    // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
    // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
    if (className == null || className.length() == 0) {
      return null;
    }
    try {
      return (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting codec " + className, e);
    }
  }
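
  // Illustrative sketch (not part of this class): per the comments above, disabling cell block
  // encoding entirely requires setting both codec keys to the empty string, assuming the
  // DEFAULT_CODEC_CLASS constant exposed by RpcClient:
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(HConstants.RPC_CODEC_CONF_KEY, "");   // "hbase.client.rpc.codec"
  //   conf.set(RpcClient.DEFAULT_CODEC_CLASS, "");   // "hbase.client.default.rpc.codec"
  // With both empty, getCodec() returns null and hasCellBlockSupport() reports false.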

  @Override
  public boolean hasCellBlockSupport() {
    return this.codec != null;
  }

  // for writing tests that want to throw exception when connecting.
  boolean isTcpNoDelay() {
    return tcpNoDelay;
  }

  /**
   * Encapsulate the ugly casting and RuntimeException conversion in a private method.
   * @param conf configuration
   * @return The compressor to use on this client.
   */
  private static CompressionCodec getCompressor(final Configuration conf) {
    String className = conf.get("hbase.client.rpc.compressor", null);
    if (className == null || className.isEmpty()) {
      return null;
    }
    try {
      return (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting compressor " + className, e);
    }
  }

  /**
   * Return the pool type specified in the configuration, which must be set to either
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}; otherwise it defaults to
   * the former. For applications with many user threads, use a small round-robin pool. For
   * applications with few user threads, you may want to try using a thread-local pool. In any
   * case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed
   * the operating system's hard limit on the number of connections.
   * @param config configuration
   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
   */
  private static PoolMap.PoolType getPoolType(Configuration config) {
    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
        PoolMap.PoolType.RoundRobin);
  }

  /**
   * Return the pool size specified in the configuration, which is applicable only if the pool
   * type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
   * @param config configuration
   * @return the maximum pool size
   */
  private static int getPoolSize(Configuration config) {
    int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);

    if (poolSize <= 0) {
      LOG.warn("{} must be positive. Using default value: 1",
          HConstants.HBASE_CLIENT_IPC_POOL_SIZE);
      return 1;
    } else {
      return poolSize;
    }
  }
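
  // Illustrative sketch (not part of this class): the IPC pool is tuned via configuration, e.g.
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "RoundRobin"); // or "ThreadLocal"
  //   conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 5);         // only used for RoundRobin
  // getPoolType()/getPoolSize() above read exactly these keys when the client is constructed.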

  // Returns the next call id, wrapping around to 0 after Integer.MAX_VALUE.
  private int nextCallId() {
    int id, next;
    do {
      id = callIdCnt.get();
      next = id < Integer.MAX_VALUE ? id + 1 : 0;
    } while (!callIdCnt.compareAndSet(id, next));
    return id;
  }

  /**
   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
   *          {@link UserProvider#getCurrent()} makes a new instance of User each time, so it will
   *          be a new Connection each time.
   * @return A pair with the Message response and the Cell data (if any).
   */
  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
      Message param, Message returnType, final User ticket, final Address isa)
      throws ServiceException {
    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
    callMethod(md, hrc, param, returnType, ticket, isa, done);
    Message val;
    try {
      val = done.get();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    if (hrc.failed()) {
      throw new ServiceException(hrc.getFailed());
    } else {
      return val;
    }
  }

  /**
   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
   * given host/port are reused.
   */
  private T getConnection(ConnectionId remoteId) throws IOException {
    if (failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.getAddress()
            + "; this server is in the failed servers list");
      }
      throw new FailedServerException(
          "This server is in the failed servers list: " + remoteId.getAddress());
    }
    T conn;
    synchronized (connections) {
      if (!running) {
        throw new StoppedRpcClientException();
      }
      conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId));
      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
    }
    return conn;
  }

  /**
   * Not connected.
   */
  protected abstract T createConnection(ConnectionId remoteId) throws IOException;
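
  // Completes a finished call: records the call time (and connection metrics when enabled), then
  // either fails the controller -- wrapping non-remote errors with the remote address -- or marks
  // it done with the returned cells, before invoking the callback.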
  private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
      RpcCallback<Message> callback) {
    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
    if (metrics != null) {
      metrics.updateRpc(call.md, call.param, call.callStats);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(
          "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
    }
    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        hrc.setFailed(call.error);
      } else {
        hrc.setFailed(wrapException(addr, hrc.getRegionInfo(), call.error));
      }
      callback.run(null);
    } else {
      hrc.setDone(call.cells);
      callback.run(call.response);
    }
  }
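
  // Central asynchronous call path: opens a tracing span tagged with the rpc service/method and
  // the remote host/port, enforces the per-server concurrent-call limit, and sends the request on
  // a pooled connection. The callback decrements the in-flight counter, finishes the controller
  // via onCallFinished(), and ends the span with an OK or error status.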
  private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
      final Message param, Message returnType, final User ticket, final Address addr,
      final RpcCallback<Message> callback) {
    Span span = TraceUtil.createClientSpan("RpcClient.callMethod")
        .setAttribute(RPC_SERVICE_KEY, md.getService().getName())
        .setAttribute(RPC_METHOD_KEY, md.getName())
        .setAttribute(REMOTE_HOST_KEY, addr.getHostName())
        .setAttribute(REMOTE_PORT_KEY, addr.getPort());
    try (Scope scope = span.makeCurrent()) {
      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
      cs.setStartTime(EnvironmentEdgeManager.currentTime());

      if (param instanceof ClientProtos.MultiRequest) {
        ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
        int numActions = 0;
        for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
          numActions += regionAction.getActionCount();
        }

        cs.setNumActionsPerServer(numActions);
      }

      final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
      Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
          hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
            @Override
            public void run(Call call) {
              try (Scope scope = call.span.makeCurrent()) {
                counter.decrementAndGet();
                onCallFinished(call, hrc, addr, callback);
              } finally {
                if (hrc.failed()) {
                  TraceUtil.setError(span, hrc.getFailed());
                } else {
                  span.setStatus(StatusCode.OK);
                }
                span.end();
              }
            }
          }, cs);
      ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
      int count = counter.incrementAndGet();
      try {
        if (count > maxConcurrentCallsPerServer) {
          throw new ServerTooBusyException(addr, count);
        }
        cs.setConcurrentCallsPerServer(count);
        T connection = getConnection(remoteId);
        connection.sendRequest(call, hrc);
      } catch (Exception e) {
        call.setException(toIOE(e));
        span.end();
      }
      return call;
    }
  }

  private static Address createAddr(ServerName sn) {
    return Address.fromParts(sn.getHostname(), sn.getPort());
  }

  /**
   * Interrupt the connections to the given ip:port server. This should be called if the server is
   * known to be dead. This will not prevent current operations from being retried, and, depending
   * on their own behavior, they may retry on the same server. This can be a feature, for example
   * at startup. In any case, they are likely to get a connection refused (if the process died) or
   * no route to host; i.e. their next retries should be faster and with a safe exception.
   */
  @Override
  public void cancelConnections(ServerName sn) {
    synchronized (connections) {
      for (T connection : connections.values()) {
        ConnectionId remoteId = connection.remoteId();
        if (remoteId.getAddress().getPort() == sn.getPort()
            && remoteId.getAddress().getHostName().equals(sn.getHostname())) {
          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
              + connection.remoteId);
          connections.remove(remoteId, connection);
          connection.shutdown();
          connection.cleanupConnection();
        }
      }
    }
  }

  /**
   * Configure an HBase RPC controller.
   * @param controller to configure
   * @param channelOperationTimeout timeout for operation
   * @return configured controller
   */
  static HBaseRpcController configureHBaseRpcController(
      RpcController controller, int channelOperationTimeout) {
    HBaseRpcController hrc;
    if (controller != null && controller instanceof HBaseRpcController) {
      hrc = (HBaseRpcController) controller;
      if (!hrc.hasCallTimeout()) {
        hrc.setCallTimeout(channelOperationTimeout);
      }
    } else {
      hrc = new HBaseRpcControllerImpl();
      hrc.setCallTimeout(channelOperationTimeout);
    }
    return hrc;
  }

  protected abstract void closeInternal();

  @Override
  public void close() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stopping rpc client");
    }
    Collection<T> connToClose;
    synchronized (connections) {
      if (!running) {
        return;
      }
      running = false;
      connToClose = connections.values();
      connections.clear();
    }
    cleanupIdleConnectionTask.cancel(true);
    for (T conn : connToClose) {
      conn.shutdown();
    }
    closeInternal();
    for (T conn : connToClose) {
      conn.cleanupConnection();
    }
  }

  @Override
  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
      int rpcTimeout) {
    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
  }

  @Override
  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
  }
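
  // Illustrative usage (not part of this class): a blocking channel from this client is normally
  // wrapped in a generated protobuf stub, e.g. assuming the shaded ClientService in ClientProtos:
  //   BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
  //   ClientProtos.ClientService.BlockingInterface stub =
  //       ClientProtos.ClientService.newBlockingStub(channel);
  //   // stub.get(...), stub.scan(...), etc. then go through callBlockingMethod() above.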

  private static class AbstractRpcChannel {

    protected final Address addr;

    protected final AbstractRpcClient<?> rpcClient;

    protected final User ticket;

    protected final int rpcTimeout;

    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr,
        User ticket, int rpcTimeout) {
      this.addr = addr;
      this.rpcClient = rpcClient;
      this.ticket = ticket;
      this.rpcTimeout = rpcTimeout;
    }

    /**
     * Configure an rpc controller
     * @param controller to configure
     * @return configured rpc controller
     */
    protected HBaseRpcController configureRpcController(RpcController controller) {
      HBaseRpcController hrc;
      // TODO: Ideally we should not use an RpcController other than HBaseRpcController on the
      // client side. But for now we may still be handed a ServerRpcController.
      if (controller != null && controller instanceof HBaseRpcController) {
        hrc = (HBaseRpcController) controller;
        if (!hrc.hasCallTimeout()) {
          hrc.setCallTimeout(rpcTimeout);
        }
      } else {
        hrc = new HBaseRpcControllerImpl();
        hrc.setCallTimeout(rpcTimeout);
      }
      return hrc;
    }
  }

  /**
   * Blocking rpc channel that goes via hbase rpc.
   */
  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
      implements BlockingRpcChannel {

    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
        Address addr, User ticket, int rpcTimeout) {
      super(rpcClient, addr, ticket, rpcTimeout);
    }

    @Override
    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
        Message param, Message returnType) throws ServiceException {
      return rpcClient.callBlockingMethod(md, configureRpcController(controller), param,
          returnType, ticket, addr);
    }
  }

  /**
   * Async rpc channel that goes via hbase rpc.
   */
  public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel {

    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
        User ticket, int rpcTimeout) {
      super(rpcClient, addr, ticket, rpcTimeout);
    }

    @Override
    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
        Message param, Message returnType, RpcCallback<Message> done) {
      HBaseRpcController configuredController = configureRpcController(
          Preconditions.checkNotNull(controller,
              "RpcController can not be null for async rpc call"));
      // This method does not throw any exceptions, so the caller must provide an
      // HBaseRpcController which is used to pass the exceptions.
      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
    }
  }
}