2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import java
.io
.IOException
;
21 import java
.net
.BindException
;
22 import java
.net
.InetSocketAddress
;
23 import java
.net
.ServerSocket
;
24 import java
.net
.SocketException
;
25 import java
.net
.UnknownHostException
;
26 import java
.nio
.channels
.CancelledKeyException
;
27 import java
.nio
.channels
.GatheringByteChannel
;
28 import java
.nio
.channels
.SelectionKey
;
29 import java
.nio
.channels
.Selector
;
30 import java
.nio
.channels
.ServerSocketChannel
;
31 import java
.nio
.channels
.SocketChannel
;
32 import java
.util
.Collections
;
33 import java
.util
.Iterator
;
34 import java
.util
.List
;
36 import java
.util
.Timer
;
37 import java
.util
.TimerTask
;
38 import java
.util
.concurrent
.ConcurrentHashMap
;
39 import java
.util
.concurrent
.ExecutorService
;
40 import java
.util
.concurrent
.Executors
;
41 import java
.util
.concurrent
.LinkedBlockingQueue
;
42 import java
.util
.concurrent
.atomic
.AtomicInteger
;
43 import org
.apache
.hadoop
.conf
.Configuration
;
44 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
45 import org
.apache
.hadoop
.hbase
.HConstants
;
46 import org
.apache
.hadoop
.hbase
.Server
;
47 import org
.apache
.hadoop
.hbase
.security
.HBasePolicyProvider
;
48 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
49 import org
.apache
.hadoop
.hbase
.util
.Threads
;
50 import org
.apache
.hadoop
.io
.IOUtils
;
51 import org
.apache
.hadoop
.security
.authorize
.ServiceAuthorizationManager
;
52 import org
.apache
.yetus
.audience
.InterfaceAudience
;
54 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
57 * The RPC server with native java NIO implementation deriving from Hadoop to
58 * host protobuf described Services. It's the original one before HBASE-17262,
59 * and the default RPC server for now.
61 * An RpcServer instance has a Listener that hosts the socket. Listener has fixed number
62 * of Readers in an ExecutorPool, 10 by default. The Listener does an accept and then
63 * round robin a Reader is chosen to do the read. The reader is registered on Selector. Read does
64 * total read off the channel and the parse from which it makes a Call. The call is wrapped in a
65 * CallRunner and passed to the scheduler to be run. Reader goes back to see if more to be done
66 * and loops till done.
68 * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
69 * has given the queues into which calls (i.e. CallRunner instances) are inserted. Handlers run
70 * taking from the queue. They run the CallRunner#run method on each item gotten from queue
71 * and keep taking while the server is up.
73 * CallRunner#run executes the call. When done, asks the included Call to put itself on new
74 * queue for Responder to pull from and return result to client.
76 * @see BlockingRpcClient
78 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience
.CONFIG
})
79 public class SimpleRpcServer
extends RpcServer
{
81 protected int port
; // port we listen on
82 protected InetSocketAddress address
; // inet address we listen on
83 private int readThreads
; // number of read threads
85 protected int socketSendBufferSize
;
86 protected final long purgeTimeout
; // in milliseconds
88 // maintains the set of client connections and handles idle timeouts
89 private ConnectionManager connectionManager
;
90 private Listener listener
= null;
91 protected SimpleRpcServerResponder responder
= null;
93 /** Listens on the socket. Creates jobs for the handler threads*/
94 private class Listener
extends Thread
{
96 private ServerSocketChannel acceptChannel
= null; //the accept channel
97 private Selector selector
= null; //the selector that we use for the server
98 private Reader
[] readers
= null;
99 private int currentReader
= 0;
100 private final int readerPendingConnectionQueueLength
;
102 private ExecutorService readPool
;
104 public Listener(final String name
) throws IOException
{
106 // The backlog of requests that we will have the serversocket carry.
107 int backlogLength
= conf
.getInt("hbase.ipc.server.listen.queue.size", 128);
108 readerPendingConnectionQueueLength
=
109 conf
.getInt("hbase.ipc.server.read.connection-queue.size", 100);
110 // Create a new server socket and set to non blocking mode
111 acceptChannel
= ServerSocketChannel
.open();
112 acceptChannel
.configureBlocking(false);
114 // Bind the server socket to the binding addrees (can be different from the default interface)
115 bind(acceptChannel
.socket(), bindAddress
, backlogLength
);
116 port
= acceptChannel
.socket().getLocalPort(); //Could be an ephemeral port
117 address
= (InetSocketAddress
)acceptChannel
.socket().getLocalSocketAddress();
118 // create a selector;
119 selector
= Selector
.open();
121 readers
= new Reader
[readThreads
];
122 // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
123 // has an advantage in that it is easy to shutdown the pool.
124 readPool
= Executors
.newFixedThreadPool(readThreads
,
125 new ThreadFactoryBuilder().setNameFormat(
126 "Reader=%d,bindAddress=" + bindAddress
.getHostName() +
127 ",port=" + port
).setDaemon(true)
128 .setUncaughtExceptionHandler(Threads
.LOGGING_EXCEPTION_HANDLER
).build());
129 for (int i
= 0; i
< readThreads
; ++i
) {
130 Reader reader
= new Reader();
132 readPool
.execute(reader
);
134 LOG
.info(getName() + ": started " + readThreads
+ " reader(s) listening on port=" + port
);
136 // Register accepts on the server socket with the selector.
137 acceptChannel
.register(selector
, SelectionKey
.OP_ACCEPT
);
138 this.setName("Listener,port=" + port
);
139 this.setDaemon(true);
143 private class Reader
implements Runnable
{
144 final private LinkedBlockingQueue
<SimpleServerRpcConnection
> pendingConnections
;
145 private final Selector readSelector
;
147 Reader() throws IOException
{
148 this.pendingConnections
= new LinkedBlockingQueue
<>(readerPendingConnectionQueueLength
);
149 this.readSelector
= Selector
.open();
158 readSelector
.close();
159 } catch (IOException ioe
) {
160 LOG
.error(getName() + ": error closing read selector in " + getName(), ioe
);
165 private synchronized void doRunLoop() {
168 // Consume as many connections as currently queued to avoid
169 // unbridled acceptance of connections that starves the select
170 int size
= pendingConnections
.size();
171 for (int i
=size
; i
>0; i
--) {
172 SimpleServerRpcConnection conn
= pendingConnections
.take();
173 conn
.channel
.register(readSelector
, SelectionKey
.OP_READ
, conn
);
175 readSelector
.select();
176 Iterator
<SelectionKey
> iter
= readSelector
.selectedKeys().iterator();
177 while (iter
.hasNext()) {
178 SelectionKey key
= iter
.next();
181 if (key
.isReadable()) {
187 } catch (InterruptedException e
) {
188 if (running
) { // unexpected -- log it
189 LOG
.info(Thread
.currentThread().getName() + " unexpectedly interrupted", e
);
191 } catch (CancelledKeyException e
) {
192 LOG
.error(getName() + ": CancelledKeyException in Reader", e
);
193 } catch (IOException ex
) {
194 LOG
.info(getName() + ": IOException in Reader", ex
);
200 * Updating the readSelector while it's being used is not thread-safe,
201 * so the connection must be queued. The reader will drain the queue
202 * and update its readSelector before performing the next select
204 public void addConnection(SimpleServerRpcConnection conn
) throws IOException
{
205 pendingConnections
.add(conn
);
206 readSelector
.wakeup();
211 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings(value
="IS2_INCONSISTENT_SYNC",
212 justification
="selector access is not synchronized; seems fine but concerned changing " +
213 "it will have per impact")
215 LOG
.info(getName() + ": starting");
216 connectionManager
.startIdleScan();
218 SelectionKey key
= null;
220 selector
.select(); // FindBugs IS2_INCONSISTENT_SYNC
221 Iterator
<SelectionKey
> iter
= selector
.selectedKeys().iterator();
222 while (iter
.hasNext()) {
227 if (key
.isAcceptable())
230 } catch (IOException ignored
) {
231 if (LOG
.isTraceEnabled()) LOG
.trace("ignored", ignored
);
235 } catch (OutOfMemoryError e
) {
236 if (errorHandler
!= null) {
237 if (errorHandler
.checkOOME(e
)) {
238 LOG
.info(getName() + ": exiting on OutOfMemoryError");
239 closeCurrentConnection(key
, e
);
240 connectionManager
.closeIdle(true);
244 // we can run out of memory if we have too many threads
245 // log the event and sleep for a minute and give
246 // some thread(s) a chance to finish
247 LOG
.warn(getName() + ": OutOfMemoryError in server select", e
);
248 closeCurrentConnection(key
, e
);
249 connectionManager
.closeIdle(true);
252 } catch (InterruptedException ex
) {
253 LOG
.debug("Interrupted while sleeping");
256 } catch (Exception e
) {
257 closeCurrentConnection(key
, e
);
260 LOG
.info(getName() + ": stopping");
261 synchronized (this) {
263 acceptChannel
.close();
265 } catch (IOException ignored
) {
266 if (LOG
.isTraceEnabled()) LOG
.trace("ignored", ignored
);
272 // close all connections
273 connectionManager
.stopIdleScan();
274 connectionManager
.closeAll();
278 private void closeCurrentConnection(SelectionKey key
, Throwable e
) {
280 SimpleServerRpcConnection c
= (SimpleServerRpcConnection
)key
.attachment();
288 InetSocketAddress
getAddress() {
292 void doAccept(SelectionKey key
) throws InterruptedException
, IOException
, OutOfMemoryError
{
293 ServerSocketChannel server
= (ServerSocketChannel
) key
.channel();
294 SocketChannel channel
;
295 while ((channel
= server
.accept()) != null) {
296 channel
.configureBlocking(false);
297 channel
.socket().setTcpNoDelay(tcpNoDelay
);
298 channel
.socket().setKeepAlive(tcpKeepAlive
);
299 Reader reader
= getReader();
300 SimpleServerRpcConnection c
= connectionManager
.register(channel
);
301 // If the connectionManager can't take it, close the connection.
303 if (channel
.isOpen()) {
304 IOUtils
.cleanupWithLogger(LOG
, channel
);
308 key
.attach(c
); // so closeCurrentConnection can get the object
309 reader
.addConnection(c
);
313 void doRead(SelectionKey key
) throws InterruptedException
{
315 SimpleServerRpcConnection c
= (SimpleServerRpcConnection
) key
.attachment();
319 c
.setLastContact(EnvironmentEdgeManager
.currentTime());
321 count
= c
.readAndProcess();
322 } catch (InterruptedException ieo
) {
323 LOG
.info(Thread
.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo
);
325 } catch (Exception e
) {
326 if (LOG
.isDebugEnabled()) {
327 LOG
.debug("Caught exception while reading:", e
);
329 count
= -1; //so that the (count < 0) block is executed
335 c
.setLastContact(EnvironmentEdgeManager
.currentTime());
339 synchronized void doStop() {
340 if (selector
!= null) {
344 if (acceptChannel
!= null) {
346 acceptChannel
.socket().close();
347 } catch (IOException e
) {
348 LOG
.info(getName() + ": exception in closing listener socket. " + e
);
351 readPool
.shutdownNow();
354 // The method that will return the next reader to work with
355 // Simplistic implementation of round robin for now
357 currentReader
= (currentReader
+ 1) % readers
.length
;
358 return readers
[currentReader
];
363 * Constructs a server listening on the named port and address.
364 * @param server hosting instance of {@link Server}. We will do authentications if an
365 * instance else pass null for no authentication check.
366 * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
367 * @param services A list of services.
368 * @param bindAddress Where to listen
371 * @param reservoirEnabled Enable ByteBufferPool or not.
373 public SimpleRpcServer(final Server server
, final String name
,
374 final List
<BlockingServiceAndInterface
> services
,
375 final InetSocketAddress bindAddress
, Configuration conf
,
376 RpcScheduler scheduler
, boolean reservoirEnabled
) throws IOException
{
377 super(server
, name
, services
, bindAddress
, conf
, scheduler
, reservoirEnabled
);
378 this.socketSendBufferSize
= 0;
379 this.readThreads
= conf
.getInt("hbase.ipc.server.read.threadpool.size", 10);
380 this.purgeTimeout
= conf
.getLong("hbase.ipc.client.call.purge.timeout",
381 2 * HConstants
.DEFAULT_HBASE_RPC_TIMEOUT
);
383 // Start the listener here and let it bind to the port
384 listener
= new Listener(name
);
385 this.port
= listener
.getAddress().getPort();
387 // Create the responder here
388 responder
= new SimpleRpcServerResponder(this);
389 connectionManager
= new ConnectionManager();
390 initReconfigurable(conf
);
392 this.scheduler
.init(new RpcSchedulerContext(this));
396 * Subclasses of HBaseServer can override this to provide their own
397 * Connection implementations.
399 protected SimpleServerRpcConnection
getConnection(SocketChannel channel
, long time
) {
400 return new SimpleServerRpcConnection(this, channel
, time
);
403 protected void closeConnection(SimpleServerRpcConnection connection
) {
404 connectionManager
.close(connection
);
407 /** Sets the socket buffer size used for responding to RPCs.
408 * @param size send size
411 public void setSocketSendBufSize(int size
) { this.socketSendBufferSize
= size
; }
413 /** Starts the service. Must be called before any calls will be handled. */
415 public synchronized void start() {
419 authTokenSecretMgr
= createSecretManager();
420 if (authTokenSecretMgr
!= null) {
421 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
422 // LeaderElector start. See HBASE-25875
423 synchronized (authTokenSecretMgr
) {
424 setSecretManager(authTokenSecretMgr
);
425 authTokenSecretMgr
.start();
428 this.authManager
= new ServiceAuthorizationManager();
429 HBasePolicyProvider
.init(conf
, authManager
);
436 /** Stops the service. No new calls will be handled after this is called. */
438 public synchronized void stop() {
439 LOG
.info("Stopping server on " + port
);
441 if (authTokenSecretMgr
!= null) {
442 authTokenSecretMgr
.stop();
443 authTokenSecretMgr
= null;
445 listener
.interrupt();
447 responder
.interrupt();
453 * Wait for the server to be stopped. Does not wait for all subthreads to finish.
457 public synchronized void join() throws InterruptedException
{
464 * Return the socket (ip+port) on which the RPC server is listening to. May return null if
465 * the listener channel is closed.
466 * @return the socket (ip+port) on which the RPC server is listening to, or null if this
467 * information cannot be determined
470 public synchronized InetSocketAddress
getListenerAddress() {
471 if (listener
== null) {
474 return listener
.getAddress();
478 * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
479 * If the amount of data is large, it writes to channel in smaller chunks.
480 * This is to avoid jdk from creating many direct buffers as the size of
481 * buffer increases. This also minimizes extra copies in NIO layer
482 * as a result of multiple write operations required to write a large
485 * @param channel writable byte channel to write to
486 * @param bufferChain Chain of buffers to write
487 * @return number of bytes written
488 * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
490 protected long channelWrite(GatheringByteChannel channel
, BufferChain bufferChain
)
492 long count
= bufferChain
.write(channel
, NIO_BUFFER_LIMIT
);
494 this.metrics
.sentBytes(count
);
500 * A convenience method to bind to a given address and report
501 * better exceptions if the address is not a valid host.
502 * @param socket the socket to bind
503 * @param address the address to bind to
504 * @param backlog the number of connections allowed in the queue
505 * @throws BindException if the address can't be bound
506 * @throws UnknownHostException if the address isn't a valid host name
507 * @throws IOException other random errors from bind
509 public static void bind(ServerSocket socket
, InetSocketAddress address
, int backlog
)
512 socket
.bind(address
, backlog
);
513 } catch (BindException e
) {
514 BindException bindException
=
515 new BindException("Problem binding to " + address
+ " : " + e
.getMessage());
516 bindException
.initCause(e
);
518 } catch (SocketException e
) {
519 // If they try to bind to a different host's address, give a better
521 if ("Unresolved address".equals(e
.getMessage())) {
522 throw new UnknownHostException("Invalid hostname for server: " + address
.getHostName());
529 * The number of open RPC conections
530 * @return the number of open rpc connections
533 public int getNumOpenConnections() {
534 return connectionManager
.size();
537 private class ConnectionManager
{
538 final private AtomicInteger count
= new AtomicInteger();
539 final private Set
<SimpleServerRpcConnection
> connections
;
541 final private Timer idleScanTimer
;
542 final private int idleScanThreshold
;
543 final private int idleScanInterval
;
544 final private int maxIdleTime
;
545 final private int maxIdleToClose
;
547 ConnectionManager() {
548 this.idleScanTimer
= new Timer("RpcServer idle connection scanner for port " + port
, true);
549 this.idleScanThreshold
= conf
.getInt("hbase.ipc.client.idlethreshold", 4000);
550 this.idleScanInterval
=
551 conf
.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
552 this.maxIdleTime
= 2 * conf
.getInt("hbase.ipc.client.connection.maxidletime", 10000);
553 this.maxIdleToClose
= conf
.getInt("hbase.ipc.client.kill.max", 10);
554 int handlerCount
= conf
.getInt(HConstants
.REGION_SERVER_HANDLER_COUNT
,
555 HConstants
.DEFAULT_REGION_SERVER_HANDLER_COUNT
);
556 int maxConnectionQueueSize
=
557 handlerCount
* conf
.getInt("hbase.ipc.server.handler.queue.size", 100);
558 // create a set with concurrency -and- a thread-safe iterator, add 2
559 // for listener and idle closer threads
560 this.connections
= Collections
.newSetFromMap(
561 new ConcurrentHashMap
<SimpleServerRpcConnection
,Boolean
>(
562 maxConnectionQueueSize
, 0.75f
, readThreads
+2));
565 private boolean add(SimpleServerRpcConnection connection
) {
566 boolean added
= connections
.add(connection
);
568 count
.getAndIncrement();
573 private boolean remove(SimpleServerRpcConnection connection
) {
574 boolean removed
= connections
.remove(connection
);
576 count
.getAndDecrement();
585 SimpleServerRpcConnection
[] toArray() {
586 return connections
.toArray(new SimpleServerRpcConnection
[0]);
589 SimpleServerRpcConnection
register(SocketChannel channel
) {
590 SimpleServerRpcConnection connection
= getConnection(channel
,
591 EnvironmentEdgeManager
.currentTime());
593 if (LOG
.isTraceEnabled()) {
594 LOG
.trace("Connection from " + connection
+
595 "; connections=" + size() +
596 ", queued calls size (bytes)=" + callQueueSizeInBytes
.sum() +
597 ", general queued calls=" + scheduler
.getGeneralQueueLength() +
598 ", priority queued calls=" + scheduler
.getPriorityQueueLength() +
599 ", meta priority queued calls=" + scheduler
.getMetaPriorityQueueLength());
604 boolean close(SimpleServerRpcConnection connection
) {
605 boolean exists
= remove(connection
);
607 if (LOG
.isTraceEnabled()) {
608 LOG
.trace(Thread
.currentThread().getName() +
609 ": disconnecting client " + connection
+
610 ". Number of active connections: "+ size());
612 // only close if actually removed to avoid double-closing due
619 // synch'ed to avoid explicit invocation upon OOM from colliding with
621 synchronized void closeIdle(boolean scanAll
) {
622 long minLastContact
= EnvironmentEdgeManager
.currentTime() - maxIdleTime
;
623 // concurrent iterator might miss new connections added
624 // during the iteration, but that's ok because they won't
625 // be idle yet anyway and will be caught on next scan
627 for (SimpleServerRpcConnection connection
: connections
) {
628 // stop if connections dropped below threshold unless scanning all
629 if (!scanAll
&& size() < idleScanThreshold
) {
632 // stop if not scanning all and max connections are closed
633 if (connection
.isIdle() &&
634 connection
.getLastContact() < minLastContact
&&
636 !scanAll
&& (++closed
== maxIdleToClose
)) {
643 // use a copy of the connections to be absolutely sure the concurrent
644 // iterator doesn't miss a connection
645 for (SimpleServerRpcConnection connection
: toArray()) {
650 void startIdleScan() {
651 scheduleIdleScanTask();
654 void stopIdleScan() {
655 idleScanTimer
.cancel();
658 private void scheduleIdleScanTask() {
662 TimerTask idleScanTask
= new TimerTask(){
668 if (LOG
.isTraceEnabled()) {
669 LOG
.trace("running");
674 // explicitly reschedule so next execution occurs relative
675 // to the end of this scan, not the beginning
676 scheduleIdleScanTask();
680 idleScanTimer
.schedule(idleScanTask
, idleScanInterval
);