HBASE-26286: Add support for specifying store file tracker when restoring or cloning...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / ipc / SimpleRpcServer.java
blob20ea1f54418236e621140ea6578bb0c03438750b
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.ipc;
20 import java.io.IOException;
21 import java.net.BindException;
22 import java.net.InetSocketAddress;
23 import java.net.ServerSocket;
24 import java.net.SocketException;
25 import java.net.UnknownHostException;
26 import java.nio.channels.CancelledKeyException;
27 import java.nio.channels.GatheringByteChannel;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.Collections;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Set;
36 import java.util.Timer;
37 import java.util.TimerTask;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.LinkedBlockingQueue;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
45 import org.apache.hadoop.hbase.HConstants;
46 import org.apache.hadoop.hbase.Server;
47 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49 import org.apache.hadoop.hbase.util.Threads;
50 import org.apache.hadoop.io.IOUtils;
51 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
52 import org.apache.yetus.audience.InterfaceAudience;
54 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * The RPC server with native java NIO implementation deriving from Hadoop to
 * host protobuf described Services. It's the original one before HBASE-17262,
 * and the default RPC server for now.
 *
 * <p>An RpcServer instance has a Listener that hosts the socket. The Listener has a fixed number
 * of Readers in an ExecutorPool, 10 by default. The Listener does an accept and then a Reader is
 * chosen round robin to do the read. The reader is registered on a Selector. Read does a
 * total read off the channel and the parse, from which it makes a Call. The call is wrapped in a
 * CallRunner and passed to the scheduler to be run. The Reader goes back to see if there is more
 * to be done and loops till done.
 *
 * <p>Scheduler can be variously implemented but the default simple scheduler has handlers to which
 * it has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run
 * taking from the queue. They run the CallRunner#run method on each item gotten from the queue
 * and keep taking while the server is up.
 *
 * <p>CallRunner#run executes the call. When done, it asks the included Call to put itself on a new
 * queue for the Responder to pull from and return the result to the client.
 *
 * @see BlockingRpcClient
 */
78 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
79 public class SimpleRpcServer extends RpcServer {
81 protected int port; // port we listen on
82 protected InetSocketAddress address; // inet address we listen on
83 private int readThreads; // number of read threads
85 protected int socketSendBufferSize;
86 protected final long purgeTimeout; // in milliseconds
88 // maintains the set of client connections and handles idle timeouts
89 private ConnectionManager connectionManager;
90 private Listener listener = null;
91 protected SimpleRpcServerResponder responder = null;
93 /** Listens on the socket. Creates jobs for the handler threads*/
94 private class Listener extends Thread {
96 private ServerSocketChannel acceptChannel = null; //the accept channel
97 private Selector selector = null; //the selector that we use for the server
98 private Reader[] readers = null;
99 private int currentReader = 0;
100 private final int readerPendingConnectionQueueLength;
102 private ExecutorService readPool;
104 public Listener(final String name) throws IOException {
105 super(name);
106 // The backlog of requests that we will have the serversocket carry.
107 int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
108 readerPendingConnectionQueueLength =
109 conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
110 // Create a new server socket and set to non blocking mode
111 acceptChannel = ServerSocketChannel.open();
112 acceptChannel.configureBlocking(false);
114 // Bind the server socket to the binding addrees (can be different from the default interface)
115 bind(acceptChannel.socket(), bindAddress, backlogLength);
116 port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
117 address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
118 // create a selector;
119 selector = Selector.open();
121 readers = new Reader[readThreads];
122 // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
123 // has an advantage in that it is easy to shutdown the pool.
124 readPool = Executors.newFixedThreadPool(readThreads,
125 new ThreadFactoryBuilder().setNameFormat(
126 "Reader=%d,bindAddress=" + bindAddress.getHostName() +
127 ",port=" + port).setDaemon(true)
128 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
129 for (int i = 0; i < readThreads; ++i) {
130 Reader reader = new Reader();
131 readers[i] = reader;
132 readPool.execute(reader);
134 LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
136 // Register accepts on the server socket with the selector.
137 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
138 this.setName("Listener,port=" + port);
139 this.setDaemon(true);
143 private class Reader implements Runnable {
144 final private LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
145 private final Selector readSelector;
147 Reader() throws IOException {
148 this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
149 this.readSelector = Selector.open();
152 @Override
153 public void run() {
154 try {
155 doRunLoop();
156 } finally {
157 try {
158 readSelector.close();
159 } catch (IOException ioe) {
160 LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
165 private synchronized void doRunLoop() {
166 while (running) {
167 try {
168 // Consume as many connections as currently queued to avoid
169 // unbridled acceptance of connections that starves the select
170 int size = pendingConnections.size();
171 for (int i=size; i>0; i--) {
172 SimpleServerRpcConnection conn = pendingConnections.take();
173 conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
175 readSelector.select();
176 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
177 while (iter.hasNext()) {
178 SelectionKey key = iter.next();
179 iter.remove();
180 if (key.isValid()) {
181 if (key.isReadable()) {
182 doRead(key);
185 key = null;
187 } catch (InterruptedException e) {
188 if (running) { // unexpected -- log it
189 LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
191 } catch (CancelledKeyException e) {
192 LOG.error(getName() + ": CancelledKeyException in Reader", e);
193 } catch (IOException ex) {
194 LOG.info(getName() + ": IOException in Reader", ex);
200 * Updating the readSelector while it's being used is not thread-safe,
201 * so the connection must be queued. The reader will drain the queue
202 * and update its readSelector before performing the next select
204 public void addConnection(SimpleServerRpcConnection conn) throws IOException {
205 pendingConnections.add(conn);
206 readSelector.wakeup();
210 @Override
211 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
212 justification="selector access is not synchronized; seems fine but concerned changing " +
213 "it will have per impact")
214 public void run() {
215 LOG.info(getName() + ": starting");
216 connectionManager.startIdleScan();
217 while (running) {
218 SelectionKey key = null;
219 try {
220 selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
221 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
222 while (iter.hasNext()) {
223 key = iter.next();
224 iter.remove();
225 try {
226 if (key.isValid()) {
227 if (key.isAcceptable())
228 doAccept(key);
230 } catch (IOException ignored) {
231 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
233 key = null;
235 } catch (OutOfMemoryError e) {
236 if (errorHandler != null) {
237 if (errorHandler.checkOOME(e)) {
238 LOG.info(getName() + ": exiting on OutOfMemoryError");
239 closeCurrentConnection(key, e);
240 connectionManager.closeIdle(true);
241 return;
243 } else {
244 // we can run out of memory if we have too many threads
245 // log the event and sleep for a minute and give
246 // some thread(s) a chance to finish
247 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
248 closeCurrentConnection(key, e);
249 connectionManager.closeIdle(true);
250 try {
251 Thread.sleep(60000);
252 } catch (InterruptedException ex) {
253 LOG.debug("Interrupted while sleeping");
256 } catch (Exception e) {
257 closeCurrentConnection(key, e);
260 LOG.info(getName() + ": stopping");
261 synchronized (this) {
262 try {
263 acceptChannel.close();
264 selector.close();
265 } catch (IOException ignored) {
266 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
269 selector= null;
270 acceptChannel= null;
272 // close all connections
273 connectionManager.stopIdleScan();
274 connectionManager.closeAll();
278 private void closeCurrentConnection(SelectionKey key, Throwable e) {
279 if (key != null) {
280 SimpleServerRpcConnection c = (SimpleServerRpcConnection)key.attachment();
281 if (c != null) {
282 closeConnection(c);
283 key.attach(null);
288 InetSocketAddress getAddress() {
289 return address;
292 void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
293 ServerSocketChannel server = (ServerSocketChannel) key.channel();
294 SocketChannel channel;
295 while ((channel = server.accept()) != null) {
296 channel.configureBlocking(false);
297 channel.socket().setTcpNoDelay(tcpNoDelay);
298 channel.socket().setKeepAlive(tcpKeepAlive);
299 Reader reader = getReader();
300 SimpleServerRpcConnection c = connectionManager.register(channel);
301 // If the connectionManager can't take it, close the connection.
302 if (c == null) {
303 if (channel.isOpen()) {
304 IOUtils.cleanupWithLogger(LOG, channel);
306 continue;
308 key.attach(c); // so closeCurrentConnection can get the object
309 reader.addConnection(c);
313 void doRead(SelectionKey key) throws InterruptedException {
314 int count;
315 SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
316 if (c == null) {
317 return;
319 c.setLastContact(EnvironmentEdgeManager.currentTime());
320 try {
321 count = c.readAndProcess();
322 } catch (InterruptedException ieo) {
323 LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
324 throw ieo;
325 } catch (Exception e) {
326 if (LOG.isDebugEnabled()) {
327 LOG.debug("Caught exception while reading:", e);
329 count = -1; //so that the (count < 0) block is executed
331 if (count < 0) {
332 closeConnection(c);
333 c = null;
334 } else {
335 c.setLastContact(EnvironmentEdgeManager.currentTime());
339 synchronized void doStop() {
340 if (selector != null) {
341 selector.wakeup();
342 Thread.yield();
344 if (acceptChannel != null) {
345 try {
346 acceptChannel.socket().close();
347 } catch (IOException e) {
348 LOG.info(getName() + ": exception in closing listener socket. " + e);
351 readPool.shutdownNow();
354 // The method that will return the next reader to work with
355 // Simplistic implementation of round robin for now
356 Reader getReader() {
357 currentReader = (currentReader + 1) % readers.length;
358 return readers[currentReader];
363 * Constructs a server listening on the named port and address.
364 * @param server hosting instance of {@link Server}. We will do authentications if an
365 * instance else pass null for no authentication check.
366 * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
367 * @param services A list of services.
368 * @param bindAddress Where to listen
369 * @param conf
370 * @param scheduler
371 * @param reservoirEnabled Enable ByteBufferPool or not.
373 public SimpleRpcServer(final Server server, final String name,
374 final List<BlockingServiceAndInterface> services,
375 final InetSocketAddress bindAddress, Configuration conf,
376 RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
377 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
378 this.socketSendBufferSize = 0;
379 this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
380 this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
381 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
383 // Start the listener here and let it bind to the port
384 listener = new Listener(name);
385 this.port = listener.getAddress().getPort();
387 // Create the responder here
388 responder = new SimpleRpcServerResponder(this);
389 connectionManager = new ConnectionManager();
390 initReconfigurable(conf);
392 this.scheduler.init(new RpcSchedulerContext(this));
396 * Subclasses of HBaseServer can override this to provide their own
397 * Connection implementations.
399 protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
400 return new SimpleServerRpcConnection(this, channel, time);
403 protected void closeConnection(SimpleServerRpcConnection connection) {
404 connectionManager.close(connection);
407 /** Sets the socket buffer size used for responding to RPCs.
408 * @param size send size
410 @Override
411 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
413 /** Starts the service. Must be called before any calls will be handled. */
414 @Override
415 public synchronized void start() {
416 if (started) {
417 return;
419 authTokenSecretMgr = createSecretManager();
420 if (authTokenSecretMgr != null) {
421 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
422 // LeaderElector start. See HBASE-25875
423 synchronized (authTokenSecretMgr) {
424 setSecretManager(authTokenSecretMgr);
425 authTokenSecretMgr.start();
428 this.authManager = new ServiceAuthorizationManager();
429 HBasePolicyProvider.init(conf, authManager);
430 responder.start();
431 listener.start();
432 scheduler.start();
433 started = true;
436 /** Stops the service. No new calls will be handled after this is called. */
437 @Override
438 public synchronized void stop() {
439 LOG.info("Stopping server on " + port);
440 running = false;
441 if (authTokenSecretMgr != null) {
442 authTokenSecretMgr.stop();
443 authTokenSecretMgr = null;
445 listener.interrupt();
446 listener.doStop();
447 responder.interrupt();
448 scheduler.stop();
449 notifyAll();
453 * Wait for the server to be stopped. Does not wait for all subthreads to finish.
454 * @see #stop()
456 @Override
457 public synchronized void join() throws InterruptedException {
458 while (running) {
459 wait();
464 * Return the socket (ip+port) on which the RPC server is listening to. May return null if
465 * the listener channel is closed.
466 * @return the socket (ip+port) on which the RPC server is listening to, or null if this
467 * information cannot be determined
469 @Override
470 public synchronized InetSocketAddress getListenerAddress() {
471 if (listener == null) {
472 return null;
474 return listener.getAddress();
478 * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
479 * If the amount of data is large, it writes to channel in smaller chunks.
480 * This is to avoid jdk from creating many direct buffers as the size of
481 * buffer increases. This also minimizes extra copies in NIO layer
482 * as a result of multiple write operations required to write a large
483 * buffer.
485 * @param channel writable byte channel to write to
486 * @param bufferChain Chain of buffers to write
487 * @return number of bytes written
488 * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
490 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
491 throws IOException {
492 long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
493 if (count > 0) {
494 this.metrics.sentBytes(count);
496 return count;
500 * A convenience method to bind to a given address and report
501 * better exceptions if the address is not a valid host.
502 * @param socket the socket to bind
503 * @param address the address to bind to
504 * @param backlog the number of connections allowed in the queue
505 * @throws BindException if the address can't be bound
506 * @throws UnknownHostException if the address isn't a valid host name
507 * @throws IOException other random errors from bind
509 public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
510 throws IOException {
511 try {
512 socket.bind(address, backlog);
513 } catch (BindException e) {
514 BindException bindException =
515 new BindException("Problem binding to " + address + " : " + e.getMessage());
516 bindException.initCause(e);
517 throw bindException;
518 } catch (SocketException e) {
519 // If they try to bind to a different host's address, give a better
520 // error message.
521 if ("Unresolved address".equals(e.getMessage())) {
522 throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
524 throw e;
529 * The number of open RPC conections
530 * @return the number of open rpc connections
532 @Override
533 public int getNumOpenConnections() {
534 return connectionManager.size();
537 private class ConnectionManager {
538 final private AtomicInteger count = new AtomicInteger();
539 final private Set<SimpleServerRpcConnection> connections;
541 final private Timer idleScanTimer;
542 final private int idleScanThreshold;
543 final private int idleScanInterval;
544 final private int maxIdleTime;
545 final private int maxIdleToClose;
547 ConnectionManager() {
548 this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
549 this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
550 this.idleScanInterval =
551 conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
552 this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
553 this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
554 int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
555 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
556 int maxConnectionQueueSize =
557 handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
558 // create a set with concurrency -and- a thread-safe iterator, add 2
559 // for listener and idle closer threads
560 this.connections = Collections.newSetFromMap(
561 new ConcurrentHashMap<SimpleServerRpcConnection,Boolean>(
562 maxConnectionQueueSize, 0.75f, readThreads+2));
565 private boolean add(SimpleServerRpcConnection connection) {
566 boolean added = connections.add(connection);
567 if (added) {
568 count.getAndIncrement();
570 return added;
573 private boolean remove(SimpleServerRpcConnection connection) {
574 boolean removed = connections.remove(connection);
575 if (removed) {
576 count.getAndDecrement();
578 return removed;
581 int size() {
582 return count.get();
585 SimpleServerRpcConnection[] toArray() {
586 return connections.toArray(new SimpleServerRpcConnection[0]);
589 SimpleServerRpcConnection register(SocketChannel channel) {
590 SimpleServerRpcConnection connection = getConnection(channel,
591 EnvironmentEdgeManager.currentTime());
592 add(connection);
593 if (LOG.isTraceEnabled()) {
594 LOG.trace("Connection from " + connection +
595 "; connections=" + size() +
596 ", queued calls size (bytes)=" + callQueueSizeInBytes.sum() +
597 ", general queued calls=" + scheduler.getGeneralQueueLength() +
598 ", priority queued calls=" + scheduler.getPriorityQueueLength() +
599 ", meta priority queued calls=" + scheduler.getMetaPriorityQueueLength());
601 return connection;
604 boolean close(SimpleServerRpcConnection connection) {
605 boolean exists = remove(connection);
606 if (exists) {
607 if (LOG.isTraceEnabled()) {
608 LOG.trace(Thread.currentThread().getName() +
609 ": disconnecting client " + connection +
610 ". Number of active connections: "+ size());
612 // only close if actually removed to avoid double-closing due
613 // to possible races
614 connection.close();
616 return exists;
619 // synch'ed to avoid explicit invocation upon OOM from colliding with
620 // timer task firing
621 synchronized void closeIdle(boolean scanAll) {
622 long minLastContact = EnvironmentEdgeManager.currentTime() - maxIdleTime;
623 // concurrent iterator might miss new connections added
624 // during the iteration, but that's ok because they won't
625 // be idle yet anyway and will be caught on next scan
626 int closed = 0;
627 for (SimpleServerRpcConnection connection : connections) {
628 // stop if connections dropped below threshold unless scanning all
629 if (!scanAll && size() < idleScanThreshold) {
630 break;
632 // stop if not scanning all and max connections are closed
633 if (connection.isIdle() &&
634 connection.getLastContact() < minLastContact &&
635 close(connection) &&
636 !scanAll && (++closed == maxIdleToClose)) {
637 break;
642 void closeAll() {
643 // use a copy of the connections to be absolutely sure the concurrent
644 // iterator doesn't miss a connection
645 for (SimpleServerRpcConnection connection : toArray()) {
646 close(connection);
650 void startIdleScan() {
651 scheduleIdleScanTask();
654 void stopIdleScan() {
655 idleScanTimer.cancel();
658 private void scheduleIdleScanTask() {
659 if (!running) {
660 return;
662 TimerTask idleScanTask = new TimerTask(){
663 @Override
664 public void run() {
665 if (!running) {
666 return;
668 if (LOG.isTraceEnabled()) {
669 LOG.trace("running");
671 try {
672 closeIdle(false);
673 } finally {
674 // explicitly reschedule so next execution occurs relative
675 // to the end of this scan, not the beginning
676 scheduleIdleScanTask();
680 idleScanTimer.schedule(idleScanTask, idleScanInterval);