/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.ipc
;
20 import java
.io
.IOException
;
21 import java
.io
.InterruptedIOException
;
22 import java
.net
.InetSocketAddress
;
23 import java
.util
.List
;
24 import java
.util
.concurrent
.CountDownLatch
;
25 import org
.apache
.hadoop
.conf
.Configuration
;
26 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
27 import org
.apache
.hadoop
.hbase
.HBaseServerBase
;
28 import org
.apache
.hadoop
.hbase
.Server
;
29 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
30 import org
.apache
.hadoop
.hbase
.security
.HBasePolicyProvider
;
31 import org
.apache
.hadoop
.hbase
.util
.NettyEventLoopGroupConfig
;
32 import org
.apache
.hadoop
.security
.authorize
.ServiceAuthorizationManager
;
33 import org
.apache
.yetus
.audience
.InterfaceAudience
;
34 import org
.slf4j
.Logger
;
35 import org
.slf4j
.LoggerFactory
;
37 import org
.apache
.hbase
.thirdparty
.io
.netty
.bootstrap
.ServerBootstrap
;
38 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.Channel
;
39 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelInitializer
;
40 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelOption
;
41 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ChannelPipeline
;
42 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.EventLoopGroup
;
43 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.ServerChannel
;
44 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.group
.ChannelGroup
;
45 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.group
.DefaultChannelGroup
;
46 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.nio
.NioEventLoopGroup
;
47 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.socket
.nio
.NioServerSocketChannel
;
48 import org
.apache
.hbase
.thirdparty
.io
.netty
.handler
.codec
.FixedLengthFrameDecoder
;
49 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.concurrent
.DefaultThreadFactory
;
50 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.concurrent
.GlobalEventExecutor
;
/**
 * An RPC server with Netty4 implementation.
 */
56 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience
.CONFIG
})
57 public class NettyRpcServer
extends RpcServer
{
58 public static final Logger LOG
= LoggerFactory
.getLogger(NettyRpcServer
.class);
61 * Name of property to change netty rpc server eventloop thread count. Default is 0.
62 * Tests may set this down from unlimited.
64 public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY
=
65 "hbase.netty.eventloop.rpcserver.thread.count";
66 private static final int EVENTLOOP_THREADCOUNT_DEFAULT
= 0;
68 private final InetSocketAddress bindAddress
;
70 private final CountDownLatch closed
= new CountDownLatch(1);
71 private final Channel serverChannel
;
72 private final ChannelGroup allChannels
=
73 new DefaultChannelGroup(GlobalEventExecutor
.INSTANCE
, true);
75 public NettyRpcServer(Server server
, String name
, List
<BlockingServiceAndInterface
> services
,
76 InetSocketAddress bindAddress
, Configuration conf
, RpcScheduler scheduler
,
77 boolean reservoirEnabled
) throws IOException
{
78 super(server
, name
, services
, bindAddress
, conf
, scheduler
, reservoirEnabled
);
79 this.bindAddress
= bindAddress
;
80 EventLoopGroup eventLoopGroup
;
81 Class
<?
extends ServerChannel
> channelClass
;
82 if (server
instanceof HRegionServer
) {
83 NettyEventLoopGroupConfig config
= ((HBaseServerBase
) server
).getEventLoopGroupConfig();
84 eventLoopGroup
= config
.group();
85 channelClass
= config
.serverChannelClass();
87 int threadCount
= server
== null? EVENTLOOP_THREADCOUNT_DEFAULT
:
88 server
.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY
,
89 EVENTLOOP_THREADCOUNT_DEFAULT
);
90 eventLoopGroup
= new NioEventLoopGroup(threadCount
,
91 new DefaultThreadFactory("NettyRpcServer", true, Thread
.MAX_PRIORITY
));
92 channelClass
= NioServerSocketChannel
.class;
94 ServerBootstrap bootstrap
= new ServerBootstrap().group(eventLoopGroup
).channel(channelClass
)
95 .childOption(ChannelOption
.TCP_NODELAY
, tcpNoDelay
)
96 .childOption(ChannelOption
.SO_KEEPALIVE
, tcpKeepAlive
)
97 .childOption(ChannelOption
.SO_REUSEADDR
, true)
98 .childHandler(new ChannelInitializer
<Channel
>() {
101 protected void initChannel(Channel ch
) throws Exception
{
102 ChannelPipeline pipeline
= ch
.pipeline();
103 FixedLengthFrameDecoder preambleDecoder
= new FixedLengthFrameDecoder(6);
104 preambleDecoder
.setSingleDecode(true);
105 pipeline
.addLast("preambleDecoder", preambleDecoder
);
106 pipeline
.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
107 pipeline
.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize
));
108 pipeline
.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels
, metrics
));
109 pipeline
.addLast("encoder", new NettyRpcServerResponseEncoder(metrics
));
113 serverChannel
= bootstrap
.bind(this.bindAddress
).sync().channel();
114 LOG
.info("Bind to {}", serverChannel
.localAddress());
115 } catch (InterruptedException e
) {
116 throw new InterruptedIOException(e
.getMessage());
118 initReconfigurable(conf
);
119 this.scheduler
.init(new RpcSchedulerContext(this));
122 @InterfaceAudience.Private
123 protected NettyRpcServerPreambleHandler
createNettyRpcServerPreambleHandler() {
124 return new NettyRpcServerPreambleHandler(NettyRpcServer
.this);
// Starts the server-side security pieces: obtains a secret manager from createSecretManager()
// and, when one is returned, registers and starts it; then installs a new
// ServiceAuthorizationManager and initializes HBasePolicyProvider with it.
// NOTE(review): the embedded line numbers jump (128 -> 132, 138 -> 141, 142 -> 148), so this
// extract has dropped lines -- closing braces at least, and possibly statements at the start
// and end of the method. Confirm against the full file before relying on this summary.
128 public synchronized void start() {
132 authTokenSecretMgr
= createSecretManager();
// A null secret manager is tolerated: registration and start are simply skipped.
133 if (authTokenSecretMgr
!= null) {
134 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
135 // LeaderElector start. See HBASE-25875
136 synchronized (authTokenSecretMgr
) {
137 setSecretManager(authTokenSecretMgr
);
138 authTokenSecretMgr
.start();
// The authorization manager is replaced regardless of whether a secret manager exists.
141 this.authManager
= new ServiceAuthorizationManager();
142 HBasePolicyProvider
.init(conf
, authManager
);
// Stops the server: logs the listening address, stops and clears the secret manager when one
// is set, closes every channel in allChannels (waiting uninterruptibly for that to finish), and
// then requests close of the listening channel without waiting on the returned future.
// NOTE(review): the embedded line numbers jump (148 -> 152, 155 -> 157, 158 -> 165), so this
// extract has dropped lines -- closing braces at least, and possibly statements at the start
// and end of the method. Confirm against the full file before relying on this summary.
148 public synchronized void stop() {
152 LOG
.info("Stopping server on " + this.serverChannel
.localAddress());
153 if (authTokenSecretMgr
!= null) {
154 authTokenSecretMgr
.stop();
// Cleared so the stopped manager is not referenced again from this field.
155 authTokenSecretMgr
= null;
// Tracked channels are closed first, and fully, before the listening channel is closed.
157 allChannels
.close().awaitUninterruptibly();
158 serverChannel
.close();
// Declared to throw InterruptedException, so the method is expected to block.
// NOTE(review): the body is not visible in this extract (embedded line numbers jump 165 -> 170);
// presumably it waits on the `closed` latch -- confirm against the full file.
165 public synchronized void join() throws InterruptedException
{
170 public synchronized InetSocketAddress
getListenerAddress() {
171 return ((InetSocketAddress
) serverChannel
.localAddress());
175 public void setSocketSendBufSize(int size
) {
179 public int getNumOpenConnections() {
180 int channelsCount
= allChannels
.size();
181 // allChannels also contains the server channel, so exclude that from the count.
182 return channelsCount
> 0 ? channelsCount
- 1 : channelsCount
;