HBASE-26286: Add support for specifying store file tracker when restoring or cloning...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / ipc / NettyRpcServer.java
bloba3ee71fc6fb2734e6af5f60d7f460019d0096001
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.ipc;
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.net.InetSocketAddress;
23 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
27 import org.apache.hadoop.hbase.HBaseServerBase;
28 import org.apache.hadoop.hbase.Server;
29 import org.apache.hadoop.hbase.regionserver.HRegionServer;
30 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
31 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
32 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
33 import org.apache.yetus.audience.InterfaceAudience;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
38 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
39 import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
40 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
41 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
42 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
43 import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
44 import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
45 import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
46 import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
47 import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
48 import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
49 import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
50 import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
52 /**
53 * An RPC server with Netty4 implementation.
54 * @since 2.0.0
56 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
57 public class NettyRpcServer extends RpcServer {
58 public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
60 /**
61 * Name of property to change netty rpc server eventloop thread count. Default is 0.
62 * Tests may set this down from unlimited.
64 public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY =
65 "hbase.netty.eventloop.rpcserver.thread.count";
66 private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;
68 private final InetSocketAddress bindAddress;
70 private final CountDownLatch closed = new CountDownLatch(1);
71 private final Channel serverChannel;
72 private final ChannelGroup allChannels =
73 new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
75 public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
76 InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
77 boolean reservoirEnabled) throws IOException {
78 super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
79 this.bindAddress = bindAddress;
80 EventLoopGroup eventLoopGroup;
81 Class<? extends ServerChannel> channelClass;
82 if (server instanceof HRegionServer) {
83 NettyEventLoopGroupConfig config = ((HBaseServerBase) server).getEventLoopGroupConfig();
84 eventLoopGroup = config.group();
85 channelClass = config.serverChannelClass();
86 } else {
87 int threadCount = server == null? EVENTLOOP_THREADCOUNT_DEFAULT:
88 server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY,
89 EVENTLOOP_THREADCOUNT_DEFAULT);
90 eventLoopGroup = new NioEventLoopGroup(threadCount,
91 new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
92 channelClass = NioServerSocketChannel.class;
94 ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
95 .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
96 .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
97 .childOption(ChannelOption.SO_REUSEADDR, true)
98 .childHandler(new ChannelInitializer<Channel>() {
100 @Override
101 protected void initChannel(Channel ch) throws Exception {
102 ChannelPipeline pipeline = ch.pipeline();
103 FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
104 preambleDecoder.setSingleDecode(true);
105 pipeline.addLast("preambleDecoder", preambleDecoder);
106 pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
107 pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
108 pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
109 pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
112 try {
113 serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
114 LOG.info("Bind to {}", serverChannel.localAddress());
115 } catch (InterruptedException e) {
116 throw new InterruptedIOException(e.getMessage());
118 initReconfigurable(conf);
119 this.scheduler.init(new RpcSchedulerContext(this));
122 @InterfaceAudience.Private
123 protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
124 return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
127 @Override
128 public synchronized void start() {
129 if (started) {
130 return;
132 authTokenSecretMgr = createSecretManager();
133 if (authTokenSecretMgr != null) {
134 // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
135 // LeaderElector start. See HBASE-25875
136 synchronized (authTokenSecretMgr) {
137 setSecretManager(authTokenSecretMgr);
138 authTokenSecretMgr.start();
141 this.authManager = new ServiceAuthorizationManager();
142 HBasePolicyProvider.init(conf, authManager);
143 scheduler.start();
144 started = true;
147 @Override
148 public synchronized void stop() {
149 if (!running) {
150 return;
152 LOG.info("Stopping server on " + this.serverChannel.localAddress());
153 if (authTokenSecretMgr != null) {
154 authTokenSecretMgr.stop();
155 authTokenSecretMgr = null;
157 allChannels.close().awaitUninterruptibly();
158 serverChannel.close();
159 scheduler.stop();
160 closed.countDown();
161 running = false;
164 @Override
165 public synchronized void join() throws InterruptedException {
166 closed.await();
169 @Override
170 public synchronized InetSocketAddress getListenerAddress() {
171 return ((InetSocketAddress) serverChannel.localAddress());
174 @Override
175 public void setSocketSendBufSize(int size) {
178 @Override
179 public int getNumOpenConnections() {
180 int channelsCount = allChannels.size();
181 // allChannels also contains the server channel, so exclude that from the count.
182 return channelsCount > 0 ? channelsCount - 1 : channelsCount;