2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseRpcServicesBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
49 * Manage the bootstrap node list at region server side.
 * It will request master first to get the initial set of bootstrap nodes (a subset of live region
 * servers), and then it will exchange the bootstrap nodes with other bootstrap nodes. In most
 * cases, if the cluster is stable, we do not need to request master again until we reach the
 * request master interval. And if the current number of bootstrap nodes is not enough, we will
 * request master soon.
 * The algorithm is very simple, as we will always fall back to requesting master. The trick here
 * is that, if we can not get enough bootstrap nodes from master, then the cluster must be small,
 * so it will not put too much pressure on master if we always request master. And for large
 * clusters, we will soon get enough bootstrap nodes and stop requesting master.
62 @InterfaceAudience.Private
63 public class BootstrapNodeManager
{
// Logger for this class.
private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class);

// Configuration key, in seconds. getFromRegionServer triggers a request to master once at least
// this much time has passed since lastRequestMasterTime; the value is also the maximum sleep time
// of the retry backoff configured in the constructor.
public static final String REQUEST_MASTER_INTERVAL_SECS =
  "hbase.server.bootstrap.request_master_interval.secs";

// default request every 10 minutes
public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10);

// Configuration key, in seconds. Used as the delay before the first request to master, as the
// delay between master requests while the node list is below the limit, and as the base sleep
// interval of the retry backoff.
public static final String REQUEST_MASTER_MIN_INTERVAL_SECS =
  "hbase.server.bootstrap.request_master_min_interval.secs";

// default 30 seconds
public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30;

// Configuration key, in seconds: delay between two exchanges with other region servers.
public static final String REQUEST_REGIONSERVER_INTERVAL_SECS =
  "hbase.server.bootstrap.request_regionserver_interval.secs";

// default request every 30 seconds
public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30;

// Jitter factor: getDelay adds a random share of up to this fraction of the delay, and the same
// value is passed to the retry configuration.
private static final float JITTER = 0.2f;

// Current bootstrap nodes. Starts empty; getFromMaster and getFromRegionServer never modify the
// list in place, they publish a new unmodifiable list through this volatile reference.
private volatile List<ServerName> nodes = Collections.emptyList();

// Connection used to reach master and other region servers; also the source of the configuration.
private final AsyncClusterConnection conn;

// Passed to conn.getLiveRegionServers when requesting master.
private final MasterAddressTracker masterAddrTracker;

// Single daemon thread, named after this class, on which getFromMaster and getFromRegionServer
// are scheduled.
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build());

// Value of REQUEST_MASTER_INTERVAL_SECS.
private final long requestMasterIntervalSecs;

// Value of REQUEST_MASTER_MIN_INTERVAL_SECS.
private final long requestMasterMinIntervalSecs;

// Value of REQUEST_REGIONSERVER_INTERVAL_SECS.
private final long requestRegionServerIntervalSecs;

// Value of HBaseRpcServicesBase.CLIENT_BOOTSTRAP_NODE_LIMIT. Master is asked for twice this many
// servers, and the list kept after an exchange is capped at twice this value.
private final int maxNodeCount;

// Creates the retry counters that pace repeated requests to master after a failure.
private final RetryCounterFactory retryCounterFactory;

// Backoff state for failed master requests; created by getFromMaster when it is still null at
// the time a request fails.
private RetryCounter retryCounter;

// EnvironmentEdgeManager.currentTime() as recorded by getFromMaster; getFromRegionServer compares
// the elapsed time against requestMasterIntervalSecs converted to milliseconds.
private long lastRequestMasterTime;
110 public BootstrapNodeManager(AsyncClusterConnection conn
, MasterAddressTracker masterAddrTracker
) {
112 this.masterAddrTracker
= masterAddrTracker
;
113 Configuration conf
= conn
.getConfiguration();
114 requestMasterIntervalSecs
=
115 conf
.getLong(REQUEST_MASTER_INTERVAL_SECS
, DEFAULT_REQUEST_MASTER_INTERVAL_SECS
);
116 requestMasterMinIntervalSecs
=
117 conf
.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS
, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS
);
118 requestRegionServerIntervalSecs
=
119 conf
.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS
, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS
);
120 maxNodeCount
= conf
.getInt(HBaseRpcServicesBase
.CLIENT_BOOTSTRAP_NODE_LIMIT
,
121 HBaseRpcServicesBase
.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT
);
122 retryCounterFactory
= new RetryCounterFactory(
123 new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER
)
124 .setSleepInterval(requestMasterMinIntervalSecs
).setMaxSleepTime(requestMasterIntervalSecs
)
125 .setTimeUnit(TimeUnit
.SECONDS
));
126 executor
.schedule(this::getFromMaster
, getDelay(requestMasterMinIntervalSecs
),
130 private long getDelay(long delay
) {
131 long jitterDelay
= (long) (delay
* ThreadLocalRandom
.current().nextFloat() * JITTER
);
132 return delay
+ jitterDelay
;
135 private void getFromMaster() {
136 List
<ServerName
> liveRegionServers
;
138 // get 2 times number of node
140 FutureUtils
.get(conn
.getLiveRegionServers(masterAddrTracker
, maxNodeCount
* 2));
141 } catch (IOException e
) {
142 LOG
.warn("failed to get live region servers from master", e
);
143 if (retryCounter
== null) {
144 retryCounter
= retryCounterFactory
.create();
146 executor
.schedule(this::getFromMaster
, retryCounter
.getBackoffTimeAndIncrementAttempts(),
151 lastRequestMasterTime
= EnvironmentEdgeManager
.currentTime();
152 this.nodes
= Collections
.unmodifiableList(liveRegionServers
);
153 if (liveRegionServers
.size() < maxNodeCount
) {
154 // If the number of live region servers is small, it means the cluster is small, so requesting
155 // master with a higher frequency will not be a big problem, so here we will always request
156 // master to get the live region servers as bootstrap nodes.
157 executor
.schedule(this::getFromMaster
, getDelay(requestMasterMinIntervalSecs
),
161 // schedule tasks to exchange the bootstrap nodes with other region servers.
162 executor
.schedule(this::getFromRegionServer
, getDelay(requestRegionServerIntervalSecs
),
166 // this method is also used to test whether a given region server is still alive.
167 private void getFromRegionServer() {
168 if (EnvironmentEdgeManager
.currentTime() - lastRequestMasterTime
>= TimeUnit
.SECONDS
169 .toMillis(requestMasterIntervalSecs
)) {
170 // schedule a get from master task immediately if haven't request master for more than
171 // requestMasterIntervalSecs
172 executor
.execute(this::getFromMaster
);
175 List
<ServerName
> currentList
= this.nodes
;
176 ServerName peer
= currentList
.get(ThreadLocalRandom
.current().nextInt(currentList
.size()));
177 List
<ServerName
> otherList
;
179 otherList
= FutureUtils
.get(conn
.getAllBootstrapNodes(peer
));
180 } catch (IOException e
) {
181 LOG
.warn("failed to request region server {}", peer
, e
);
182 // remove this region server from the list since it can not respond successfully
183 List
<ServerName
> newList
= currentList
.stream().filter(sn
-> sn
!= peer
)
184 .collect(Collectors
.collectingAndThen(Collectors
.toList(), Collections
::unmodifiableList
));
185 this.nodes
= newList
;
186 if (newList
.size() < maxNodeCount
) {
187 // schedule a get from master task immediately
188 executor
.execute(this::getFromMaster
);
190 executor
.schedule(this::getFromRegionServer
, getDelay(requestRegionServerIntervalSecs
),
195 // randomly select new live region server list
196 Set
<ServerName
> newRegionServers
= new HashSet
<ServerName
>(currentList
);
197 newRegionServers
.addAll(otherList
);
198 List
<ServerName
> newList
= new ArrayList
<ServerName
>(newRegionServers
);
199 Collections
.shuffle(newList
, ThreadLocalRandom
.current());
200 int expectedListSize
= maxNodeCount
* 2;
201 if (newList
.size() <= expectedListSize
) {
202 this.nodes
= Collections
.unmodifiableList(newList
);
205 Collections
.unmodifiableList(new ArrayList
<>(newList
.subList(0, expectedListSize
)));
207 // schedule a new get from region server task
208 executor
.schedule(this::getFromRegionServer
, requestRegionServerIntervalSecs
, TimeUnit
.SECONDS
);
212 executor
.shutdownNow();
215 public List
<ServerName
> getBootstrapNodes() {