/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExecutorStatusChore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServicesVersionWrapper;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
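// Illustrative sketch only (not part of this class): a region server is normally launched via
// the hbase scripts / HRegionServerCommandLine, which boils down to roughly:
//   Configuration conf = HBaseConfiguration.create();
//   HRegionServer rs = new HRegionServer(conf); // the constructor defers most service startup
//   rs.start();  // HRegionServer extends Thread; run() drives the full lifecycle loop
//   rs.join();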
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends Thread
    implements RegionServerServices, LastSequenceId, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to current action in progress. Boolean value indicates:
   * true - if open region action in progress
   * false - if close region action in progress
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which already submitted.
   * See {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();

  /**
   * Used to cache the open/close region procedures which already executed.
   * See {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions.
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
    CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
      TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  /**
   * The asynchronous cluster connection to be shared by services.
   */
  protected AsyncClusterConnection asyncClusterConnection;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name. All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();

  /**
   * Lock for gating access to {@link #onlineRegions}.
   * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as Address since InetSocketAddress is required by the HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations. We don't store it as InetSocketAddress
   * here because the conversion on demand from Address to InetSocketAddress will
   * guarantee the resolution results will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  // Instance of the hbase executor executorService.
  protected ExecutorService executorService;

  private volatile boolean dataFsOk;
  private HFileSystem dataFs;
  private HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private AtomicBoolean abortRequested;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds (1200000 ms), to be safe.
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when abort timeout
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
  // A state before we go into stopped state. At this stage we're closing user regions.
  private boolean stopping = false;
  private volatile boolean killed = false;
  private volatile boolean shutDown = false;

  protected final Configuration conf;

  private Path dataRootDir;
  private Path walRootDir;

  private final int threadWakeFrequency;
  final int msgInterval;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is name of the webapp and the attribute name used stuffing this instance
  // into web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * ChoreService used to schedule tasks that we want to run periodically.
   */
  private ChoreService choreService;

  /**
   * Check for compactions requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes.
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected final ZKWatcher zooKeeper;

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the
   * master passes us, port, and server startcode. Gets set after registration
   * against the Master.
   */
  protected ServerName serverName;

  /**
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0.
   * Use {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * Exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by client and handle
   * duplicate operations (currently, by failing them; in future we might use MVCC to return
   * result). Nonces are also recovered from WAL during recovery; however, the caveats are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We can have a separate additional "Nonce WAL". It will just contain bunch of numbers and
   * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
   * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
   * latest nonce in it expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;
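  // Worked example (illustrative): a client sends Increment with a (nonceGroup, nonce) pair. If
  // the response is lost and the client retries with the same nonce, the ServerNonceManager
  // detects the duplicate and the retry is rejected rather than applied a second time, keeping
  // the increment idempotent.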
  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  private CoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * Provide online slow log responses from ringbuffer.
   */
  private NamedQueueRecorder namedQueueRecorder = null;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master;
   * means it needs to just come up and make do without a Master to talk to: e.g. in test or
   * HRegionServer is doing other than its usual duties: e.g. as a hollowed-out host whose only
   * purpose is as a Replication-stream sink; see HBASE-18846 for more.
   * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION}?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;
  /**
   * Starts a HRegionServer at the default location.
   * <p>
   * Don't start any services or managers in here in the Constructor.
   * Defer till after we register with the Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super("RegionServer"); // thread name
    try {
      this.startcode = EnvironmentEdgeManager.currentTime();
      this.conf = conf;
      this.dataFsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      this.abortRequested = new AtomicBoolean(false);
      this.stopped = false;

      initNamedQueueRecorder(conf);
      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);
      String hostName =
        StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName()
          : this.useThisHostnameInstead;
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      // login the zookeeper client principal (if using security)
      ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
        this instanceof HMaster && !((HMaster) this).isInMaintenanceMode();

      // no need to instantiate block cache and mob file cache when master not carry table
      if (!isMasterNotCarryTable) {
        blockCache = BlockCacheFactory.createBlockCache(conf);
        mobFileCache = new MobFileCache(conf);
      }

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      initializeFileSystem();

      this.configurationManager = new ConfigurationManager();
      setupWindows(getConfiguration(), getConfigurationManager());

      // Some unit tests don't need a cluster, so no zookeeper at all
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this,
        canCreateBaseZNode());
      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
          DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
          this.csm = new ZkCoordinatedStateManager(this);
        }

        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();

        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        clusterStatusTracker.start();
      } else {
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between HRS
      // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
      // class. Master expects Constructor to put up web servers. Ugh.
      this.choreService = new ChoreService(getName(), true);
      this.executorService = new ExecutorService(getName());
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction RegionServer", t);
      throw t;
    }
  }
  private void initNamedQueueRecorder(Configuration conf) {
    if (!(this instanceof HMaster)) {
      final boolean isOnlineLogProviderEnabled = conf.getBoolean(
        HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
        HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
      if (isOnlineLogProviderEnabled) {
        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      }
    } else {
      final boolean isBalancerDecisionRecording = conf
        .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
          BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
      final boolean isBalancerRejectionRecording = conf
        .getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
          BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
      if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      }
    }
  }
  // HMaster should override this method to load the specific config for master
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " +
          UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " +
          UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
          " to true while " + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.isa.getHostName();
      }
    } else {
      return hostname;
    }
  }
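  // Example (illustrative): either pin the hostname reported to the Master,
  //   hbase.unsafe.regionserver.hostname = rs1.example.com   (rs1.example.com is a placeholder)
  // or let the RS report its own resolved hostname instead of the Master's reverse-DNS view,
  //   hbase.unsafe.regionserver.hostname.disable.master.reversedns = true
  // Setting both at once is rejected above with an IOException.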
  /**
   * If running on Windows, do windows-specific setup.
   */
  private static void setupWindows(final Configuration conf, ConfigurationManager cm) {
    if (!SystemUtils.IS_OS_WINDOWS) {
      Signal.handle(new Signal("HUP"), signal -> {
        conf.reloadConfiguration();
        cm.notifyAllObservers(conf);
      });
    }
  }
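  // Usage note (illustrative): on POSIX systems an operator can trigger an on-disk config reload
  // without restarting the process, e.g. "kill -HUP <regionserver-pid>"; the handler above rereads
  // the Configuration and notifies all registered ConfigurationObservers.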
  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
    NettyEventLoopGroupConfig nelgc =
      new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    return nelgc;
  }
  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
    // checksum verification enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    String walDirUri = CommonFSUtils.getDirUri(this.conf,
      new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR))));
    if (walDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, walDirUri);
    }
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    String rootDirUri =
      CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR)));
    if (rootDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, rootDirUri);
    }
    // init the filesystem
    this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
      !canUpdateTableDescriptor(), cacheTableDescriptor());
  }
  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }
  /**
   * Wait for an active Master.
   * See override in Master superclass for how it is used.
   */
  protected void waitForMasterActive() {}

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected boolean cacheTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }
  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb.toString());
        }
      }
    }
  }
  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single executorService name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName +
        " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
    }
    return true;
  }
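  // Example (illustrative): a region server coprocessor can expose a protobuf Service this way,
  // e.g. from its start hook: rsServices.registerService(new MyRpcService());
  // where MyRpcService is a hypothetical generated com.google.protobuf.Service implementation.
  // A second registration under the same fully-qualified service name returns false.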
  private Configuration cleanupConfiguration() {
    Configuration conf = this.conf;
    // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
    // - Decouples RS and master life cycles. RegionServers can continue be up independent of
    //   masters' availability.
    // - Configuration management for region servers (cluster internal) is much simpler when adding
    //   new masters or removing existing masters, since only clients' config needs to be updated.
    // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
    //   other internal connections too.
    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
      // Use server ZK cluster for server-issued connections, so we clone
      // the conf and unset the client ZK related properties
      conf = new Configuration(this.conf);
      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    }
    return conf;
  }
  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
          " not supported, aborting RS construction");
      }
    }
  }
  @Override
  public String getClusterId() {
    return this.clusterId;
  }
  /**
   * Setup our cluster connection if not already initialized.
   */
  protected final synchronized void setupClusterConnection() throws IOException {
    if (asyncClusterConnection == null) {
      Configuration conf = cleanupConfiguration();
      InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
      User user = userProvider.getCurrent();
      asyncClusterConnection =
        ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
    }
  }
  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    try {
      initializeZooKeeper();
      setupClusterConnection();
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
    } catch (Throwable t) {
      // Call stop if error or process will stick around for ever since server
      // puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    }
  }
  /**
   * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
   * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
   * <p>
   * Finally open long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
    justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }
  /**
   * Utility method to wait indefinitely on a znode availability while checking
   * if the region server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }
  /**
   * @return True if the cluster is up.
   */
  @Override
  public boolean isClusterUp() {
    return this.masterless ||
      (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }
  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
        // come up.
        LOG.debug("About to register with Master.");
        RetryCounterFactory rcf =
          new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
        RetryCounter rc = rcf.create();
        while (keepLooping()) {
          RegionServerStartupResponse w = reportForDuty();
          if (w == null) {
            long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
            LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
            this.sleeper.sleep(sleepTime);
          } else {
            handleReportForDutyResponse(w);
            break;
          }
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        if (this.rspmHost != null) {
          this.rspmHost.start();
        }
        // Start the Quota Manager
        if (this.rsQuotaManager != null) {
          rsQuotaManager.start(getRpcServer().getScheduler());
        }
        if (this.rsSpaceQuotaManager != null) {
          this.rsSpaceQuotaManager.start();
        }
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested.get());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      }
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    if (this.leaseManager != null) {
      this.leaseManager.closeAfterLeasesExpire();
    }
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (blockCache != null) {
      blockCache.shutdown();
    }
    if (mobFileCache != null) {
      mobFileCache.shutdown();
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested.get() || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions. Used when testing.
    } else if (abortRequested.get()) {
      if (this.dataFsOk) {
        closeUserRegions(abortRequested.get()); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested.get());
      LOG.info("stopping server " + this.serverName);
    }

    if (this.asyncClusterConnection != null) {
      try {
        this.asyncClusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e);
      }
    }
    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested.get() || this.dataFsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested.get());
      }
    }

    if (!this.killed && this.dataFsOk) {
      waitOnAllRegionsToClose(abortRequested.get());
      LOG.info("stopping server " + this.serverName + "; all regions closed.");
    }

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }
    if (rsSpaceQuotaManager != null) {
      rsSpaceQuotaManager.stop();
      rsSpaceQuotaManager = null;
    }

    // flag may be changed when closing regions throws exception.
    if (this.dataFsOk) {
      shutdownWAL(!abortRequested.get());
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leaseManager != null) {
      this.leaseManager.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
      // Ignore; the znode is already gone.
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    this.shutDown = true;
    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
  }
  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }
  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaRegion()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }
  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }
  @InterfaceAudience.Private
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }
  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   *
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
          + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }
  /**
   * Builds the region size report and sends it to the master. Upon successful sending of the
   * report, the region sizes that were sent are marked as sent.
   *
   * @param rss The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
      RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }
  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   *
   * @param regionSizes The size in bytes of regions
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
    }
    return request.build();
  }
  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
   * protobuf message.
   *
   * @param regionInfo The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
      .setRegionSize(Objects.requireNonNull(sizeInBytes))
      .build();
  }
  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; Additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
          .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
        metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
              .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    return serverLoad.build();
  }

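  // Note: the heap and storefile figures above are truncating integer divisions
  // (bytes / 1024 / 1024), so any sub-megabyte value reports as 0 MB in ServerLoad.
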
  private String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r : this.onlineRegions.values()) {
      if (sb.length() > 0) sb.append(", ");
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!onlineRegions.isEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            previousLogTime = EnvironmentEdgeManager.currentTime();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
              !closedRegions.contains(hri.getEncodedName())) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!onlineRegions.isEmpty()) {
            LOG.info("We were exiting though online regions are not empty," +
                " because some regions failed closing");
          }
          break;
        } else {
          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream().
              map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
        }
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private static boolean sleepInterrupted(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

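  // sleepInterrupted deliberately swallows the InterruptedException and reports it via the
  // return value; loop callers (waitOnAllRegionsToClose, createRegionServerStatusStub)
  // accumulate that flag and re-assert the thread's interrupt status once their loop exits.
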
  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }

  /**
   * Get the NamedQueue recorder used to add different logs to the ring buffer.
   *
   * @return NamedQueueRecorder
   */
  public NamedQueueRecorder getNamedQueueRecorder() {
    return this.namedQueueRecorder;
  }

  /**
   * Run init. Sets up wal and starts up all server threads.
   *
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV, rpcServices.isa.getPort(),
              this.startcode);
          if (!StringUtils.isBlank(useThisHostnameInstead) &&
              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
            String msg = "Master passed us a different hostname to use; was=" +
                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          if (StringUtils.isBlank(useThisHostnameInstead) &&
              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
            String msg = "Master passed us a different hostname to use; was=" +
                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
          }
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();

      // Init in here rather than in constructor after thread name has been set
      final MetricsTable metricsTable =
          new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer = new MetricsRegionServer(
          metricsRegionServerImpl, conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
      startReplicationService();

      LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa +
          ", sessionid=0x" +
          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"),
          "Region server startup failed");
    } finally {
      sleeper.skipSleepCycle();
    }
  }

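  // Every key/value pair the master hands back in the startup response is written into the
  // local conf verbatim; HConstants.HBASE_DIR is special only in that a changed root dir
  // forces initializeFileSystem() to run again before the WAL and replication are set up.
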
  protected void initializeMemStoreChunkCreator() {
    if (MemStoreLAB.isEnabled(conf)) {
      // MSLAB is enabled. So initialize MemStoreChunkPool
      // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
      // it.
      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
      long globalMemStoreSize = pair.getFirst();
      boolean offheap = this.regionServerAccounting.isOffheap();
      // When off heap memstore in use, take full area for chunk pool.
      float poolSizePercentage = offheap ? 1.0F :
          conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
      float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY,
          MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
      // init the chunkCreator
      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
          initialCountPercentage, this.hMemManager, indexChunkSizePercent);
    }
  }

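  // Rough arithmetic for the defaults: with a 4 GB global memstore, onheap MSLAB and 2 MB
  // chunks, the pool caps near poolSizePercentage * 4 GB / 2 MB chunks, with
  // indexChunkSizePercent of that reserved for index chunks. (Illustrative only; the
  // authoritative math lives in ChunkCreator.initialize.)
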
  private void startHeapMemoryManager() {
    if (this.blockCache != null) {
      this.hMemManager =
          new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
      this.hMemManager.start(getChoreService());
    }
  }

  private void createMyEphemeralNode() throws KeeperException {
    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
  }

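  // The znode payload follows the standard HBase ZooKeeper convention: a short "PBUF" magic
  // prefix followed by the serialized RegionServerInfo message, which lets readers tell
  // protobuf payloads apart from legacy formats.
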
  private void deleteMyEphemeralNode() throws KeeperException {
    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
  }

  @Override
  public RegionServerAccounting getRegionServerAccounting() {
    return regionServerAccounting;
  }

  /**
   * @param r Region to get RegionLoad for.
   * @param regionLoadBldr the RegionLoad.Builder, can be null
   * @param regionSpecifier the RegionSpecifier.Builder, can be null
   * @return RegionLoad instance.
   */
  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
      RegionSpecifier.Builder regionSpecifier) throws IOException {
    byte[] name = r.getRegionInfo().getRegionName();
    int stores = 0;
    int storefiles = 0;
    int storeRefCount = 0;
    int maxCompactedStoreFileRefCount = 0;
    int storeUncompressedSizeMB = 0;
    int storefileSizeMB = 0;
    int memstoreSizeMB = (int) (r.getMemStoreDataSize() / 1024 / 1024);
    long storefileIndexSizeKB = 0;
    int rootLevelIndexSizeKB = 0;
    int totalStaticIndexSizeKB = 0;
    int totalStaticBloomSizeKB = 0;
    long totalCompactingKVs = 0;
    long currentCompactedKVs = 0;
    List<HStore> storeList = r.getStores();
    stores += storeList.size();
    for (HStore store : storeList) {
      storefiles += store.getStorefilesCount();
      int currentStoreRefCount = store.getStoreRefCount();
      storeRefCount += currentStoreRefCount;
      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
      maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount,
          currentMaxCompactedStoreFileRefCount);
      storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
      //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
      storefileIndexSizeKB += store.getStorefilesRootLevelIndexSize() / 1024;
      CompactionProgress progress = store.getCompactionProgress();
      if (progress != null) {
        totalCompactingKVs += progress.getTotalCompactingKVs();
        currentCompactedKVs += progress.currentCompactedKVs;
      }
      rootLevelIndexSizeKB += (int) (store.getStorefilesRootLevelIndexSize() / 1024);
      totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
      totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
    }

    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
    if (regionLoadBldr == null) {
      regionLoadBldr = RegionLoad.newBuilder();
    }
    if (regionSpecifier == null) {
      regionSpecifier = RegionSpecifier.newBuilder();
    }
    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
        .setStores(stores)
        .setStorefiles(storefiles)
        .setStoreRefCount(storeRefCount)
        .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
        .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
        .setStorefileSizeMB(storefileSizeMB)
        .setMemStoreSizeMB(memstoreSizeMB)
        .setStorefileIndexSizeKB(storefileIndexSizeKB)
        .setRootIndexSizeKB(rootLevelIndexSizeKB)
        .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
        .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
        .setReadRequestsCount(r.getReadRequestsCount())
        .setCpRequestsCount(r.getCpRequestsCount())
        .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
        .setWriteRequestsCount(r.getWriteRequestsCount())
        .setTotalCompactingKVs(totalCompactingKVs)
        .setCurrentCompactedKVs(currentCompactedKVs)
        .setDataLocality(dataLocality)
        .setDataLocalityForSsd(dataLocalityForSsd)
        .setBlocksLocalWeight(blocksLocalWeight)
        .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight)
        .setBlocksTotalWeight(blocksTotalWeight)
        .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
        .setLastMajorCompactionTs(r.getOldestHfileTs(true));
    r.setCompleteSequenceId(regionLoadBldr);
    return regionLoadBldr.build();
  }

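  // The per-store sums above are taken without locking while iterating r.getStores(), so a
  // concurrent flush or compaction can skew an individual sample slightly; the figures are
  // heartbeat telemetry, not an atomic snapshot.
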
  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
    userLoadBldr.setUserName(user);
    userSource.getClientMetrics().values().stream().map(
        clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
            .setHostName(clientMetrics.getHostName())
            .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
            .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
            .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
        .forEach(userLoadBldr::addClientMetrics);
    return userLoadBldr.build();
  }

  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
    HRegion r = onlineRegions.get(encodedRegionName);
    return r != null ? createRegionLoad(r, null, null) : null;
  }

  /**
   * Inner class that runs on a long period checking if regions need compaction.
   */
  private static class CompactionChecker extends ScheduledChore {
    private final HRegionServer instance;
    private final int majorCompactPriority;
    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
    // Iteration is 1-based rather than 0-based so we don't check for compaction
    // immediately upon region server startup
    private long iteration = 1;

    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
      super("CompactionChecker", stopper, sleepTime);
      this.instance = h;
      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));

      /* MajorCompactPriority is configurable.
       * If not set, the compaction will use default priority.
       */
      this.majorCompactPriority = this.instance.conf.
          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
              DEFAULT_PRIORITY);
    }

    @Override
    protected void chore() {
      for (Region r : this.instance.onlineRegions.values()) {
        // Skip compaction if region is read only
        if (r == null || r.isReadOnly()) {
          continue;
        }

        HRegion hr = (HRegion) r;
        for (HStore s : hr.stores.values()) {
          try {
            long multiplier = s.getCompactionCheckMultiplier();
            assert multiplier > 0;
            if (iteration % multiplier != 0) {
              continue;
            }
            if (s.needsCompaction()) {
              // Queue a compaction. Will recognize if major is needed.
              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
                  getName() + " requests compaction");
            } else if (s.shouldPerformMajorCompaction()) {
              s.triggerMajorCompaction();
              if (majorCompactPriority == DEFAULT_PRIORITY ||
                  majorCompactPriority > hr.getCompactPriority()) {
                this.instance.compactSplitThread.requestCompaction(hr, s,
                    getName() + " requests major compaction; use default priority",
                    Store.NO_PRIORITY,
                    CompactionLifeCycleTracker.DUMMY, null);
              } else {
                this.instance.compactSplitThread.requestCompaction(hr, s,
                    getName() + " requests major compaction; use configured priority",
                    this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
              }
            }
          } catch (IOException e) {
            LOG.warn("Failed major compaction check on " + r, e);
          }
        }
      }
      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
    }
  }

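  // The multiplier makes the per-store check frequency tunable: a store reporting a
  // compaction check multiplier of k is only examined on every k-th run of this chore, so
  // cold stores can be probed far less often than the chore period itself.
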
  private static class PeriodicMemStoreFlusher extends ScheduledChore {
    private final HRegionServer server;
    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
    private final static int MIN_DELAY_TIME = 0; // millisec
    private final long rangeOfDelayMs;

    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
      super("MemstoreFlusherChore", server, cacheFlushInterval);
      this.server = server;

      final long configuredRangeOfDelay = server.getConfiguration().getInt(
          "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
    }

    @Override
    protected void chore() {
      final StringBuilder whyFlush = new StringBuilder();
      for (HRegion r : this.server.onlineRegions.values()) {
        if (r == null) continue;
        if (r.shouldFlush(whyFlush)) {
          FlushRequester requester = server.getFlushRequester();
          if (requester != null) {
            long randomDelay = RandomUtils.nextLong(0, rangeOfDelayMs) + MIN_DELAY_TIME;
            // Throttle the flushes by putting a delay. If we don't throttle, and there
            // is a balanced write-load on the regions in a table, we might end up
            // overwhelming the filesystem with too many flushes at once.
            if (requester.requestDelayedFlush(r, randomDelay)) {
              LOG.info("{} requesting flush of {} because {} after random delay {} ms",
                  getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(),
                  randomDelay);
            }
          }
        }
      }
    }
  }

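  // The jitter is uniform in [0, rangeOfDelayMs), so with the default five-minute range a
  // table whose regions all become flushable in the same pass spreads its flush requests
  // across roughly five minutes instead of issuing them all at once.
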
  /**
   * Report the status of the server. A server is online once all the startup is
   * completed (setting up filesystem, starting executorService threads, etc.). This
   * method is designed mostly to be useful in tests.
   *
   * @return true if online, false if not.
   */
  public boolean isOnline() {
    return online.get();
  }

  /**
   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants
   * to be hooked up to WAL.
   */
  private void setupWALAndReplication() throws IOException {
    boolean isMaster = this instanceof HMaster;
    WALFactory factory =
        new WALFactory(conf, serverName.toString(), this, !isMaster);

    // TODO Replication make assumptions here based on the default filesystem impl
    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());

    Path logDir = new Path(walRootDir, logName);
    LOG.debug("logDir={}", logDir);
    if (this.walFs.exists(logDir)) {
      throw new RegionServerRunningException(
          "Region server has already created directory at " + this.serverName.toString());
    }
    // Always create wal directory as now we need this when master restarts to find out the live
    // region servers.
    if (!this.walFs.mkdirs(logDir)) {
      throw new IOException("Can not create wal directory " + logDir);
    }
    // Instantiate replication if replication enabled. Pass it the log directories.
    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);

    this.walFactory = factory;
  }

  /**
   * Start up replication source and sink handlers.
   */
  private void startReplicationService() throws IOException {
    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.startReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.startReplicationService();
      }
    }
  }

  /**
   * @return Master address tracker instance.
   */
  public MasterAddressTracker getMasterAddressTracker() {
    return this.masterAddressTracker;
  }

  /**
   * Start maintenance Threads, Server, Worker and lease checker threads.
   * Start all threads we need to run. This is called after we've successfully
   * registered with the Master.
   * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
   * get an unhandled exception. We cannot set the handler on all threads.
   * Server's internal Listener thread is off limits. For Server, if an OOME, it
   * waits a while then retries. Meantime, a flush or a compaction that tries to
   * run should trigger same critical condition and the shutdown will run. On
   * its way out, this server will shut down Server. Leases are sort of
   * inbetween. It has an internal thread that while it inherits from Chore, it
   * keeps its own internal stop mechanism so needs to be stopped by this
   * hosting server. Worker logs the exception and exits.
   */
  private void startServices() throws IOException {
    if (!isStopped() && !isAborted()) {
      initializeThreads();
    }
    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
    this.secureBulkLoadManager.start();

    // Health checker thread.
    if (isHealthCheckerConfigured()) {
      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
    }
    // Executor status collect thread.
    if (this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)) {
      int sleepTime = this.conf.getInt(ExecutorStatusChore.WAKE_FREQ,
          ExecutorStatusChore.DEFAULT_WAKE_FREQ);
      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
          this.metricsRegionServer.getMetricsSource());
    }

    this.walRoller = new LogRoller(this);
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
    this.procedureResultReporter = new RemoteProcedureResultReporter(this);

    // Create the CompactedFileDischarger chore executorService. This chore helps to
    // remove the compacted files that will no longer be used in reads.
    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
    // 2 mins so that compacted files can be archived before the TTLCleaner runs
    int cleanerInterval =
        conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
    this.compactedFileDischarger =
        new CompactedHFilesDischarger(cleanerInterval, this, this);
    choreService.scheduleChore(compactedFileDischarger);

    // Start executor services
    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
    final int openPriorityRegionThreads =
        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_OPEN_PRIORITY_REGION).setCorePoolSize(openPriorityRegionThreads));
    final int closeRegionThreads =
        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
      final int storeScannerParallelSeekThreads =
          conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
      executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
          ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(storeScannerParallelSeekThreads)
          .setAllowCoreThreadTimeout(true));
    }
    final int logReplayOpsThreads = conf.getInt(
        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads)
        .setAllowCoreThreadTimeout(true));
    // Start the threads for compacted files discharger
    final int compactionDischargerThreads =
        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_COMPACTED_FILES_DISCHARGER).setCorePoolSize(compactionDischargerThreads));
    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
      final int regionReplicaFlushThreads = conf.getInt(
          "hbase.regionserver.region.replica.flusher.threads", conf.getInt(
              "hbase.regionserver.executor.openregion.threads", 3));
      executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
          ExecutorType.RS_REGION_REPLICA_FLUSH_OPS).setCorePoolSize(regionReplicaFlushThreads));
    }
    final int refreshPeerThreads =
        conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
    final int replaySyncReplicationWALThreads =
        conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL).setCorePoolSize(
            replaySyncReplicationWALThreads));
    final int switchRpcThrottleThreads =
        conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
    final int claimReplicationQueueThreads =
        conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
        ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads));

    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
        uncaughtExceptionHandler);
    if (this.cacheFlusher != null) {
      this.cacheFlusher.start(uncaughtExceptionHandler);
    }
    Threads.setDaemonThreadRunning(this.procedureResultReporter,
        getName() + ".procedureResultReporter", uncaughtExceptionHandler);

    if (this.compactionChecker != null) {
      choreService.scheduleChore(compactionChecker);
    }
    if (this.periodicFlusher != null) {
      choreService.scheduleChore(periodicFlusher);
    }
    if (this.healthCheckChore != null) {
      choreService.scheduleChore(healthCheckChore);
    }
    if (this.executorStatusChore != null) {
      choreService.scheduleChore(executorStatusChore);
    }
    if (this.nonceManagerChore != null) {
      choreService.scheduleChore(nonceManagerChore);
    }
    if (this.storefileRefresher != null) {
      choreService.scheduleChore(storefileRefresher);
    }
    if (this.fsUtilizationChore != null) {
      choreService.scheduleChore(fsUtilizationChore);
    }
    if (this.slowLogTableOpsChore != null) {
      choreService.scheduleChore(slowLogTableOpsChore);
    }

    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
    // an unhandled exception, it will just exit.
    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
        uncaughtExceptionHandler);

    // Create the log splitting worker and start it
    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
    // quite a while inside Connection layer. The worker won't be available for other
    // tasks even after current task is preempted after a split task times out.
    Configuration sinkConf = HBaseConfiguration.create(conf);
    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
    if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
        DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
      // SplitLogWorker needs csm. If none, don't start this.
      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
      splitLogWorker.start();
      LOG.debug("SplitLogWorker started");
    }

    // Memstore services.
    startHeapMemoryManager();
    // Call it after starting HeapMemoryManager.
    initializeMemStoreChunkCreator();
  }

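  // As written here, the hbase.regionserver.executor.*.threads pools are sized once at
  // startup; none of them are registered with the ConfigurationManager below, so resizing
  // them reliably means restarting the regionserver.
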
  private void initializeThreads() {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplit(this);

    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently on store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
    this.leaseManager = new LeaseManager(this.threadWakeFrequency);

    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
        HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
    if (isSlowLogTableEnabled) {
      // default chore duration: 10 min
      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
    }

    if (this.nonceManager != null) {
      // Create the scheduled chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerRpcQuotaManager(this);
    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);

    if (QuotaUtil.isQuotaEnabled(conf)) {
      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
    }

    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
      storefileRefreshPeriod = conf.getInt(
          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
      onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
          onlyMetaRefresh, this, this);
    }
    registerConfigurationObservers();
  }

  private void registerConfigurationObservers() {
    // Registering the compactSplitThread object with the ConfigurationManager.
    configurationManager.registerObserver(this.compactSplitThread);
    configurationManager.registerObserver(this.rpcServices);
    configurationManager.registerObserver(this);
  }

  /**
   * Puts up the webui.
   */
  private void putUpWebUI() throws IOException {
    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
        HConstants.DEFAULT_REGIONSERVER_INFOPORT);
    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");

    if (this instanceof HMaster) {
      port = conf.getInt(HConstants.MASTER_INFO_PORT,
          HConstants.DEFAULT_MASTER_INFOPORT);
      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    }
    // -1 is for disabling info server
    if (port < 0) {
      return;
    }

    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg =
          "Failed to start http info server. Address " + addr
              + " does not belong to this host. Correct configuration parameter: "
              + "hbase.regionserver.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }
    // check if auto port bind enabled
    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false);
    while (true) {
      try {
        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
        infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet());
        configureInfoServer();
        this.infoServer.start();
        break;
      } catch (BindException e) {
        if (!auto) {
          // auto bind disabled throw BindException
          LOG.error("Failed binding http info server to port: " + port);
          throw e;
        }
        // auto bind enabled, try to use another port
        LOG.info("Failed binding http info server to port: " + port);
        port++;
        LOG.info("Retry starting http info server with port: " + port);
      }
    }
    port = this.infoServer.getPort();
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
    int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
        HConstants.DEFAULT_MASTER_INFOPORT);
    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
    conf.setInt(HConstants.MASTER_INFO_PORT, port);
  }

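  // With REGIONSERVER_INFO_PORT_AUTO set, the bind loop above walks up one port at a time
  // until a bind succeeds, then writes the winning port back into the conf so UI links and
  // the recorded master info port stay consistent with what was actually bound.
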
  /**
   * Verify that server is healthy
   */
  private boolean isHealthy() {
    if (!dataFsOk) {
      // File system problem
      return false;
    }
    // Verify that all threads are alive
    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
        && (this.walRoller == null || this.walRoller.isAlive())
        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
    if (!healthy) {
      stop("One or more threads are no longer alive -- stop");
    }
    return healthy;
  }

  @Override
  public List<WAL> getWALs() {
    return walFactory.getWALs();
  }

  @Override
  public WAL getWAL(RegionInfo regionInfo) throws IOException {
    WAL wal = walFactory.getWAL(regionInfo);
    if (this.walRoller != null) {
      this.walRoller.addWAL(wal);
    }
    return wal;
  }

  public LogRoller getWalRoller() {
    return walRoller;
  }

  WALFactory getWalFactory() {
    return walFactory;
  }

  @Override
  public Connection getConnection() {
    return getAsyncConnection().toConnection();
  }

  @Override
  public void stop(final String msg) {
    stop(msg, false, RpcServer.getRequestUser().orElse(null));
  }

  /**
   * Stops the regionserver.
   * @param msg Status message
   * @param force True if this is a regionserver abort
   * @param user The user executing the stop request, or null if no user is associated
   */
  public void stop(final String msg, final boolean force, final User user) {
    if (!this.stopped) {
      LOG.info("***** STOPPING region server '" + this + "' *****");
      if (this.rsHost != null) {
        // when forced via abort don't allow CPs to override
        try {
          this.rsHost.preStop(msg, user);
        } catch (IOException ioe) {
          if (!force) {
            LOG.warn("The region server did not stop", ioe);
            return;
          }
          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
        }
      }
      this.stopped = true;
      LOG.info("STOPPED: " + msg);
      // Wakes run() if it is sleeping
      sleeper.skipSleepCycle();
    }
  }

  public void waitForServerOnline() {
    while (!isStopped() && !isOnline()) {
      synchronized (online) {
        try {
          online.wait(msgInterval);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  @Override
  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
    HRegion r = context.getRegion();
    long openProcId = context.getOpenProcId();
    long masterSystemTime = context.getMasterSystemTime();
    rpcServices.checkOpen();
    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
        r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
    // Do checks to see if we need to compact (references or too many files)
    // Skip compaction check if region is read only
    if (!r.isReadOnly()) {
      for (HStore s : r.stores.values()) {
        if (s.hasReferences() || s.needsCompaction()) {
          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
        }
      }
    }
    long openSeqNum = r.getOpenSeqNum();
    if (openSeqNum == HConstants.NO_SEQNUM) {
      // If we opened a region, we should have read some sequence number from it.
      LOG.error(
          "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
      openSeqNum = 0;
    }

    // Notify master
    if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
      throw new IOException(
          "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
    }

    triggerFlushInPrimaryRegion(r);

    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
  }

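  // If the OPENED report cannot be delivered, the IOException above fails the whole open:
  // the region is not treated as successfully deployed, rather than leaving the master's
  // view of the assignment out of sync with this server.
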
  /**
   * Helper method for use in tests. Skip the region transition report when there's no master
   * around to receive it.
   */
  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
    final TransitionCode code = context.getCode();
    final long openSeqNum = context.getOpenSeqNum();
    long masterSystemTime = context.getMasterSystemTime();
    final RegionInfo[] hris = context.getHris();

    if (code == TransitionCode.OPENED) {
      Preconditions.checkArgument(hris != null && hris.length == 1);
      if (hris[0].isMetaRegion()) {
        try {
          MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
              hris[0].getReplicaId(), RegionState.State.OPEN);
        } catch (KeeperException e) {
          LOG.info("Failed to update meta location", e);
          return false;
        }
      } else {
        try {
          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
              serverName, openSeqNum, masterSystemTime);
        } catch (IOException e) {
          LOG.info("Failed to update meta", e);
          return false;
        }
      }
    }
    return true;
  }

  private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest(
      final RegionStateTransitionContext context) {
    final TransitionCode code = context.getCode();
    final long openSeqNum = context.getOpenSeqNum();
    final RegionInfo[] hris = context.getHris();
    final long[] procIds = context.getProcIds();

    ReportRegionStateTransitionRequest.Builder builder =
        ReportRegionStateTransitionRequest.newBuilder();
    builder.setServer(ProtobufUtil.toServerName(serverName));
    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
    transition.setTransitionCode(code);
    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
      transition.setOpenSeqNum(openSeqNum);
    }
    for (RegionInfo hri : hris) {
      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
    }
    for (long procId : procIds) {
      transition.addProcId(procId);
    }

    return builder.build();
  }

  @Override
  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return skipReportingTransition(context);
    }
    final ReportRegionStateTransitionRequest request =
        createReportRegionStateTransitionRequest(context);

    int tries = 0;
    long pauseTime = this.retryPauseTime;
    // Keep looping till we get an error. We want to send reports even though server is going down.
    // Only go down if clusterConnection is null. It is set to null almost as last thing as the
    // HRegionServer goes down.
    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          createRegionServerStatusStub();
          continue;
        }
        ReportRegionStateTransitionResponse response =
            rss.reportRegionStateTransition(null, request);
        if (response.hasErrorMessage()) {
          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
          break;
        }
        // Log if we had to retry else don't log unless TRACE. We want to
        // know if were successful after an attempt showed in logs as failed.
        if (tries > 0 || LOG.isTraceEnabled()) {
          LOG.info("TRANSITION REPORTED " + request);
        }
        // NOTE: Return mid-method!!!
        return true;
      } catch (ServiceException se) {
        IOException ioe = ProtobufUtil.getRemoteException(se);
        boolean pause =
            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
                || ioe instanceof CallQueueTooBigException;
        if (pause) {
          // Do backoff else we flood the Master with requests.
          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
        } else {
          pauseTime = this.retryPauseTime; // Reset.
        }
        LOG.info("Failed report transition " +
            TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
            (pause ?
                " after " + pauseTime + "ms delay (Master is coming online...)." :
                " immediately."),
            ioe);
        if (pause) Threads.sleep(pauseTime);
        tries++;
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return false;
  }

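  // Backoff only applies to the "master is busy or not ready" family of exceptions; any
  // other ServiceException retries immediately at the base pause, and the loop as a whole
  // only gives up once the cluster connection has been closed on the way down.
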
  /**
   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
   * block this thread. See RegionReplicaFlushHandler for details.
   */
  private void triggerFlushInPrimaryRegion(final HRegion region) {
    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
      return;
    }
    TableName tn = region.getTableDescriptor().getTableName();
    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
      region.setReadsEnabled(true);
      return;
    }

    region.setReadsEnabled(false); // disable reads before marking the region as opened.
    // RegionReplicaFlushHandler might reset this.

    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
    if (this.executorService != null) {
      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
    } else {
      LOG.info("Executor is null; not running flush of primary region replica for {}",
          region.getRegionInfo());
    }
  }

  @Override
  public RpcServerInterface getRpcServer() {
    return rpcServices.rpcServer;
  }

  @InterfaceAudience.Private
  public RSRpcServices getRSRpcServices() {
    return rpcServices;
  }

  /**
   * Cause the server to exit without closing the regions it is serving, the log
   * it is using and without notifying the master. Used for unit testing and on
   * catastrophic events such as HDFS is yanked out from under hbase or we OOME.
   * @param reason
   *          the reason we are aborting
   * @param cause
   *          the exception that caused the abort, or null
   */
  @Override
  public void abort(String reason, Throwable cause) {
    if (!setAbortRequested()) {
      // Abort already in progress, ignore the new request.
      LOG.debug(
          "Abort already in progress. Ignoring the current request with reason: {}", reason);
      return;
    }
    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
    if (cause != null) {
      LOG.error(HBaseMarkers.FATAL, msg, cause);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }
    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
    // java.util.HashSet's toString() method to print the coprocessor names.
    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
        CoprocessorHost.getLoadedCoprocessors());
    // Try and dump metrics if abort -- might give clue as to how fatal came about....
    try {
      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
    } catch (MalformedObjectNameException | IOException e) {
      LOG.warn("Failed dumping metrics", e);
    }

    // Do our best to report our abort to the master, but this may not work
    try {
      if (cause != null) {
        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
      }
      // Report to the master but only if we have already registered with the master.
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss != null && this.serverName != null) {
        ReportRSFatalErrorRequest.Builder builder =
            ReportRSFatalErrorRequest.newBuilder();
        builder.setServer(ProtobufUtil.toServerName(this.serverName));
        builder.setErrorMessage(msg);
        rss.reportRSFatalError(null, builder.build());
      }
    } catch (Throwable t) {
      LOG.warn("Unable to report fatal error to master", t);
    }

    scheduleAbortTimer();
    // shutdown should be run as the internal user
    stop(reason, true, null);
  }

  /**
   * Sets the abort state if not already set.
   * @return True if abortRequested set to True successfully, false if an abort is already in
   *         progress.
   */
  protected boolean setAbortRequested() {
    return abortRequested.compareAndSet(false, true);
  }

  /**
   * @see HRegionServer#abort(String, Throwable)
   */
  public void abort(String reason) {
    abort(reason, null);
  }

  @Override
  public boolean isAborted() {
    return abortRequested.get();
  }

  /**
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
   * logs but it does close socket in case want to bring up server on old
   * hostname+port immediately.
   */
  @InterfaceAudience.Private
  protected void kill() {
    this.killed = true;
    abort("Simulated kill");
  }

  // Limits the time spent in the shutdown process.
  private void scheduleAbortTimer() {
    if (this.abortMonitor == null) {
      this.abortMonitor = new Timer("Abort regionserver monitor", true);
      TimerTask abortTimeoutTask = null;
      try {
        Constructor<? extends TimerTask> timerTaskCtor =
            Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
                .asSubclass(TimerTask.class).getDeclaredConstructor();
        timerTaskCtor.setAccessible(true);
        abortTimeoutTask = timerTaskCtor.newInstance();
      } catch (Exception e) {
        LOG.warn("Initialize abort timeout task failed", e);
      }
      if (abortTimeoutTask != null) {
        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
      }
    }
  }

  protected final void shutdownChore(ScheduledChore chore) {
    if (chore != null) {
      chore.shutdown();
    }
  }

  /**
   * Wait on all threads to finish. Presumption is that all closes and stops
   * have already been called.
   */
  protected void stopServiceThreads() {
    // clean up the scheduled chores
    if (this.choreService != null) {
      shutdownChore(nonceManagerChore);
      shutdownChore(compactionChecker);
      shutdownChore(periodicFlusher);
      shutdownChore(healthCheckChore);
      shutdownChore(executorStatusChore);
      shutdownChore(storefileRefresher);
      shutdownChore(fsUtilizationChore);
      shutdownChore(slowLogTableOpsChore);
      // cancel the remaining scheduled chores (in case we missed out any)
      // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
      choreService.shutdown();
    }

    if (this.cacheFlusher != null) {
      this.cacheFlusher.join();
    }
    if (this.walRoller != null) {
      this.walRoller.close();
    }
    if (this.compactSplitThread != null) {
      this.compactSplitThread.join();
    }
    if (this.executorService != null) {
      this.executorService.shutdown();
    }
    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
      this.replicationSourceHandler.stopReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.stopReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.stopReplicationService();
      }
    }
  }

  /**
   * @return Return the object that implements the replication
   *         source executorService.
   */
  @Override
  public ReplicationSourceService getReplicationSourceService() {
    return replicationSourceHandler;
  }

  /**
   * @return Return the object that implements the replication sink executorService.
   */
  public ReplicationSinkService getReplicationSinkService() {
    return replicationSinkHandler;
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it.
   * To get a fresh connection, the current rssStub must be null.
   * Method will block until a master is available. You can break from this
   * block by requesting the server stop.
   *
   * @return master + port, or null if server has been stopped
   */
  private synchronized ServerName createRegionServerStatusStub() {
    // Create RS stub without refreshing the master node from ZK, use cached data
    return createRegionServerStatusStub(false);
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
   */
  @InterfaceAudience.Private
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          refresh = true; // let's try pull it from ZK directly
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
          continue;
        }
        // If we are on the active master, use the shortcut
        if (this instanceof HMaster && sn.equals(getServerName())) {
          // Wrap the shortcut in a class providing our version to the calls where it's relevant.
          // Normally, RpcServer-based threadlocals do that.
          intRssStub = new MasterRpcServicesVersionWrapper(((HMaster)this).getMasterRpcServices());
          intLockStub = ((HMaster)this).getMasterRpcServices();
          break;
        }
        try {
          BlockingRpcChannel channel =
              this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
                  shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException ?
                ((RemoteException)e).unwrapRemoteException() : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }

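  // The status stub and the lock stub are built over the same BlockingRpcChannel, so a
  // master failover invalidates both together; callers null out rssStub and come back
  // through here to re-resolve the new master from ZooKeeper.
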
  /**
   * @return True if we should keep looping: the cluster is up and this server
   *         has not been stopped.
   */
  private boolean keepLooping() {
    return !this.stopped && isClusterUp();
  }

  /*
   * Let the master know we're here. Run initialization using parameters passed to us.
   * @return A Map of key/value configurations we got from the Master else
   *   null if we failed to register.
   * @throws IOException
   */
  private RegionServerStartupResponse reportForDuty() throws IOException {
    if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
    ServerName masterServerName = createRegionServerStatusStub(true);
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (masterServerName == null || rss == null) return null;
    RegionServerStartupResponse result = null;
    try {
      rpcServices.requestCount.reset();
      rpcServices.rpcGetRequestCount.reset();
      rpcServices.rpcScanRequestCount.reset();
      rpcServices.rpcFullScanRequestCount.reset();
      rpcServices.rpcMultiRequestCount.reset();
      rpcServices.rpcMutateRequestCount.reset();
      LOG.info("reportForDuty to master=" + masterServerName + " with isa="
        + rpcServices.isa + ", startcode=" + this.startcode);
      long now = EnvironmentEdgeManager.currentTime();
      int port = rpcServices.isa.getPort();
      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
      if (!StringUtils.isBlank(useThisHostnameInstead)) {
        request.setUseThisHostnameInstead(useThisHostnameInstead);
      }
      request.setPort(port);
      request.setServerStartCode(this.startcode);
      request.setServerCurrentTime(now);
      result = rss.regionServerStartup(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof ClockOutOfSyncException) {
        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
        // Re-throw IOE will cause RS to abort
        throw ioe;
      } else if (ioe instanceof ServerNotRunningYetException) {
        LOG.debug("Master is not running yet");
      } else {
        LOG.warn("error telling master we are up", se);
      }
      rssStub = null;
    }
    return result;
  }
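  // Note: a successful reportForDuty() returns the master's RegionServerStartupResponse; its
  // key/value entries are applied to this server's configuration by the startup path (see
  // handleReportForDutyResponse). A null return just means registration failed and the caller is
  // expected to retry.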
  @Override
  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
    GetLastFlushedSequenceIdRequest req =
      RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) { // Try to connect one more time
      createRegionServerStatusStub();
      rss = rssStub;
      if (rss == null) {
        // Still no luck, we tried
        LOG.warn("Unable to connect to the master to check the last flushed sequence id");
        return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
          .build();
      }
    }
    try {
      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
      return RegionStoreSequenceIds.newBuilder()
        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
    } catch (ServiceException e) {
      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
        .build();
    }
  }
  /**
   * Close meta region if we carry it
   * @param abort Whether we're running an abort.
   */
  private void closeMetaTableRegions(final boolean abort) {
    HRegion meta = null;
    this.onlineRegionsLock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
        RegionInfo hri = e.getValue().getRegionInfo();
        if (hri.isMetaRegion()) {
          meta = e.getValue();
        }
        if (meta != null) break;
      }
    } finally {
      this.onlineRegionsLock.writeLock().unlock();
    }
    if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
  }
  /**
   * Schedule closes on all user regions.
   * Should be safe calling multiple times because it won't close regions
   * that are already closed or that are closing.
   * @param abort Whether we're running an abort.
   */
  private void closeUserRegions(final boolean abort) {
    this.onlineRegionsLock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
        HRegion r = e.getValue();
        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
          // Don't update zk with this close transition; pass false.
          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
        }
      }
    } finally {
      this.onlineRegionsLock.writeLock().unlock();
    }
  }
  /** @return the info server */
  public InfoServer getInfoServer() {
    return infoServer;
  }

  /**
   * @return true if a stop has been requested.
   */
  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isStopping() {
    return this.stopping;
  }

  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  protected Map<String, HRegion> getOnlineRegions() {
    return this.onlineRegions;
  }

  public int getNumberOfOnlineRegions() {
    return this.onlineRegions.size();
  }
  /**
   * For tests, web ui and metrics.
   * This method will only work if HRegionServer is in the same JVM as client;
   * HRegion cannot be serialized to cross an rpc.
   */
  public Collection<HRegion> getOnlineRegionsLocalContext() {
    Collection<HRegion> regions = this.onlineRegions.values();
    return Collections.unmodifiableCollection(regions);
  }
  @Override
  public void addRegion(HRegion region) {
    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
    configurationManager.registerObserver(region);
  }

  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
      long size) {
    if (!sortedRegions.containsKey(size)) {
      sortedRegions.put(size, new ArrayList<>());
    }
    sortedRegions.get(size).add(region);
  }
  /**
   * @return A new Map of online regions sorted by region off-heap size with the first entry being
   *   the biggest.
   */
  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
    }
    return sortedRegions;
  }
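  // A minimal usage sketch (hypothetical caller): because the map uses Comparator.reverseOrder(),
  // firstKey() is the largest memstore size, e.g. when picking a flush candidate under pressure:
  //
  //   SortedMap<Long, Collection<HRegion>> bySize = getCopyOfOnlineRegionsSortedByOffHeapSize();
  //   if (!bySize.isEmpty()) {
  //     HRegion biggest = bySize.get(bySize.firstKey()).iterator().next();
  //   }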
  /**
   * @return A new Map of online regions sorted by region heap size with the first entry being the
   *   biggest.
   */
  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
    }
    return sortedRegions;
  }
  /**
   * @return time stamp in millis of when this region server was started
   */
  public long getStartcode() {
    return this.startcode;
  }
  /** @return reference to FlushRequester */
  @Override
  public FlushRequester getFlushRequester() {
    return this.cacheFlusher;
  }

  @Override
  public CompactionRequester getCompactionRequestor() {
    return this.compactSplitThread;
  }

  @Override
  public LeaseManager getLeaseManager() {
    return leaseManager;
  }
  /**
   * @return Return the rootDir.
   */
  protected Path getDataRootDir() {
    return dataRootDir;
  }

  @Override
  public FileSystem getFileSystem() {
    return dataFs;
  }

  /**
   * @return {@code true} when the data file system is available, {@code false} otherwise.
   */
  boolean isDataFileSystemOk() {
    return this.dataFsOk;
  }

  /**
   * @return Return the walRootDir.
   */
  public Path getWALRootDir() {
    return walRootDir;
  }

  /**
   * @return Return the walFs.
   */
  public FileSystem getWALFileSystem() {
    return walFs;
  }
  @Override
  public String toString() {
    return getServerName().toString();
  }

  @Override
  public ZKWatcher getZooKeeper() {
    return zooKeeper;
  }

  @Override
  public CoordinatedStateManager getCoordinatedStateManager() {
    return csm;
  }

  @Override
  public ServerName getServerName() {
    return serverName;
  }

  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
    return this.rsHost;
  }

  @Override
  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
    return this.regionsInTransitionInRS;
  }

  @Override
  public ExecutorService getExecutorService() {
    return executorService;
  }

  @Override
  public ChoreService getChoreService() {
    return choreService;
  }

  @Override
  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
    return rsQuotaManager;
  }
  //////////////////////////////////////////////////////////////////////////////
  // Main program and support routines
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Load the replication executorService objects, if any
   */
  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
      FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
    // read in the name of the source replication class from the config file.
    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

    // read in the name of the sink replication class from the config file.
    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);

    // If both the sink and the source class names are the same, then instantiate
    // only one object.
    if (sourceClassname.equals(sinkClassname)) {
      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
      server.sameReplicationSourceAndSink = true;
    } else {
      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
      server.sameReplicationSourceAndSink = false;
    }
  }
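  // Note: the source/sink implementations are chosen via the
  // HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME and
  // HConstants.REPLICATION_SINK_SERVICE_CLASSNAME configuration keys. When both resolve to the
  // same class, a single instance is created and registered as both ReplicationSourceService and
  // ReplicationSinkService, and sameReplicationSourceAndSink is set accordingly.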
  private static <T extends ReplicationService> T newReplicationInstance(String classname,
      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
      Path oldLogDir, WALFactory walFactory) throws IOException {
    final Class<? extends T> clazz;
    try {
      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
    } catch (java.lang.ClassNotFoundException nfe) {
      throw new IOException("Could not find class for " + classname);
    }
    T service = ReflectionUtils.newInstance(clazz, conf);
    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
    return service;
  }
  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
    if (!this.isOnline()) {
      return walGroupsReplicationStatus;
    }
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
    for (ReplicationSourceInterface source : allSources) {
      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
    }
    return walGroupsReplicationStatus;
  }
  /**
   * Utility for constructing an instance of the passed HRegionServer class.
   */
  static HRegionServer constructRegionServer(
      final Class<? extends HRegionServer> regionServerClass,
      final Configuration conf) {
    try {
      Constructor<? extends HRegionServer> c =
        regionServerClass.getConstructor(Configuration.class);
      return c.newInstance(conf);
    } catch (Exception e) {
      throw new RuntimeException("Failed construction of Regionserver: "
        + regionServerClass.toString(), e);
    }
  }
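  // A minimal sketch (MyRegionServer is a hypothetical subclass, shown only for illustration) of
  // how a custom region server implementation is plugged in and constructed via this utility:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class, HRegionServer.class);
  //   HRegionServer rs = constructRegionServer(MyRegionServer.class, conf);
  //
  // The subclass must expose a public Configuration-only constructor, since construction here is
  // reflective.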
  /**
   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
   */
  public static void main(String[] args) {
    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
    VersionInfo.logVersion();
    Configuration conf = HBaseConfiguration.create();
    @SuppressWarnings("unchecked")
    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

    new HRegionServerCommandLine(regionServerClass).doMain(args);
  }
  /**
   * Gets the online regions of the specified table.
   * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
   * Only returns <em>online</em> regions. If a region on this table has been
   * closed during a disable, etc., it will not be included in the returned list.
   * So, the returned list may not necessarily be ALL regions in this table, it's
   * all the ONLINE regions in the table.
   * @param tableName table to limit the scope of the query
   * @return Online regions from <code>tableName</code>
   */
  @Override
  public List<HRegion> getRegions(TableName tableName) {
    List<HRegion> tableRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      for (HRegion region : this.onlineRegions.values()) {
        RegionInfo regionInfo = region.getRegionInfo();
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(region);
        }
      }
    }
    return tableRegions;
  }
  @Override
  public List<HRegion> getRegions() {
    List<HRegion> allRegions;
    synchronized (this.onlineRegions) {
      // Return a clone copy of the onlineRegions
      allRegions = new ArrayList<>(onlineRegions.values());
    }
    return allRegions;
  }
  /**
   * Gets the online tables in this RS.
   * This method looks at the in-memory onlineRegions.
   * @return all the online tables in this RS
   */
  public Set<TableName> getOnlineTables() {
    Set<TableName> tables = new HashSet<>();
    synchronized (this.onlineRegions) {
      for (Region region : this.onlineRegions.values()) {
        tables.add(region.getTableDescriptor().getTableName());
      }
    }
    return tables;
  }
  public String[] getRegionServerCoprocessors() {
    TreeSet<String> coprocessors = new TreeSet<>();
    try {
      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
        "skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    for (HRegion region : regions) {
      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
      try {
        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
      } catch (IOException exception) {
        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
          "; skipping.");
        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
      }
    }
    coprocessors.addAll(rsHost.getCoprocessors());
    return coprocessors.toArray(new String[0]);
  }
  /**
   * Try to close the region, logs a warning on failure but continues.
   * @param region Region to close
   */
  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
    try {
      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn("Failed to close " + region.getRegionNameAsString() +
          " - ignoring and continuing");
      }
    } catch (IOException e) {
      LOG.warn("Failed to close " + region.getRegionNameAsString() +
        " - ignoring and continuing", e);
    }
  }
  /**
   * Close asynchronously a region, can be called from the master or internally by the regionserver
   * when stopping. If called from the master, the region will update the status.
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close. The
   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
   * </p>
   * <p>
   * If a close was in progress, this new request will be ignored, and an exception thrown.
   * </p>
   * @param encodedName Region to close
   * @param abort True if we are aborting
   * @param destination Where the Region is being moved to... maybe null if unknown.
   * @return True if closed a region.
   * @throws NotServingRegionException if the region is not online
   */
  protected boolean closeRegion(String encodedName, final boolean abort,
      final ServerName destination) throws NotServingRegionException {
    // Check for permissions to close.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    // previous can come back 'null' if not in map.
    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
      Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
        "trying to OPEN. Cancelling OPENING.");
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
          " Doing a standard close now");
        return closeRegion(encodedName, abort, destination);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // If already online, we still need to close it.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException("The region " + encodedName +
          " was opening but not yet served. Opening is cancelled.");
      }
    } else if (previous == null) {
      LOG.info("Received CLOSE for {}", encodedName);
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName +
        ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException("The region " + encodedName +
        " is not online, and is not opening.");
    }

    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, destination);
    }
    this.executorService.submit(crh);
    return true;
  }
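  // Bookkeeping note for closeRegion(): regionsInTransitionInRS maps an encoded region name to
  // Boolean.TRUE while the region is OPENING and to Boolean.FALSE while it is CLOSING. That is
  // why a previous value of TRUE above means "cancel the open", FALSE means "a close is already
  // running", and null means no transition was in flight for this region.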
  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if
   *   named region is not member of the online regions.
   */
  public HRegion getOnlineRegion(final byte[] regionName) {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return this.onlineRegions.get(encodedRegionName);
  }

  @Override
  public HRegion getRegion(final String encodedRegionName) {
    return this.onlineRegions.get(encodedRegionName);
  }
  @Override
  public boolean removeRegion(final HRegion r, ServerName destination) {
    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
    if (destination != null) {
      long closeSeqNum = r.getMaxFlushedSeqId();
      if (closeSeqNum == HConstants.NO_SEQNUM) {
        // No edits in WAL for this region; get the sequence number when the region was opened.
        closeSeqNum = r.getOpenSeqNum();
        if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
      }
      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
      if (selfMove) {
        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
          r.getRegionInfo().getEncodedName(),
          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
      }
    }
    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
    configurationManager.deregisterObserver(r);
    return toReturn != null;
  }
  /**
   * Protected Utility method for safely obtaining an HRegion handle.
   * @param regionName Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   */
  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return getRegionByEncodedName(regionName, encodedRegionName);
  }
  public HRegion getRegionByEncodedName(String encodedRegionName)
      throws NotServingRegionException {
    return getRegionByEncodedName(null, encodedRegionName);
  }

  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
      throws NotServingRegionException {
    HRegion region = this.onlineRegions.get(encodedRegionName);
    if (region == null) {
      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
      if (moveInfo != null) {
        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
      }
      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr = regionName == null ?
        encodedRegionName : Bytes.toStringBinary(regionName);
      if (isOpening != null && isOpening) {
        throw new RegionOpeningException("Region " + regionNameStr +
          " is opening on " + this.serverName);
      }
      throw new NotServingRegionException("" + regionNameStr +
        " is not online on " + this.serverName);
    }
    return region;
  }
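  // Client-facing contract of the lookup above: RegionMovedException carries the new location and
  // close sequence id so a client can update its region cache directly, RegionOpeningException
  // tells it to retry the same server shortly, and NotServingRegionException makes it re-resolve
  // the region location via hbase:meta.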
  /**
   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
   * IOE if it isn't already.
   * @param t Throwable
   * @param msg Message to log in error. Can be null.
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
  private Throwable cleanup(final Throwable t, final String msg) {
    // Don't log as error if NSRE; NSRE is 'normal' operation.
    if (t instanceof NotServingRegionException) {
      LOG.debug("NotServingRegionException; " + t.getMessage());
      return t;
    }
    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
    if (msg == null) {
      LOG.error("", e);
    } else {
      LOG.error(msg, e);
    }
    if (!rpcServices.checkOOME(t)) {
      checkFileSystem();
    }
    return t;
  }
  /**
   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
   * @return Make <code>t</code> an IOE if it isn't already.
   */
  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
    return (t instanceof IOException ? (IOException) t : msg == null
      || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
  }
  /**
   * Checks to see if the file system is still accessible. If not, sets
   * abortRequested and stopRequested
   * @return false if file system is not available
   */
  boolean checkFileSystem() {
    if (this.dataFsOk && this.dataFs != null) {
      try {
        FSUtils.checkFileSystemAvailable(this.dataFs);
      } catch (IOException e) {
        abort("File System not available", e);
        this.dataFsOk = false;
      }
    }
    return this.dataFsOk;
  }
  @Override
  public void updateRegionFavoredNodesMapping(String encodedRegionName,
      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
    Address[] addr = new Address[favoredNodes.size()];
    // Refer to the comment on the declaration of regionFavoredNodesMap on why
    // it is a map of region name to Address[]
    for (int i = 0; i < favoredNodes.size(); i++) {
      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(),
        favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
  }
  /**
   * Return the favored nodes for a region given its encoded name. Look at the
   * comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
   * here.
   * @param encodedRegionName the encoded region name to look up
   * @return array of favored locations
   */
  @Override
  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
  }
  @Override
  public ServerNonceManager getNonceManager() {
    return this.nonceManager;
  }
  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;

    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }
  }
  /**
   * We need a timeout. If not, there is a risk of giving out wrong information: this would double
   * the number of network calls instead of reducing them.
   */
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
      boolean selfMove) {
    if (selfMove) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
      closeSeqNum);
    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }
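  // Note: movedRegionInfoCache entries back the RegionMovedException thrown from
  // getRegionByEncodedName(); they expire after TIMEOUT_REGION_MOVED ms (surfaced through
  // movedRegionCacheExpiredTime() below) because a stale "moved to" hint is worse than none:
  // the client would chase the wrong server and then still have to go back to hbase:meta.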
  void removeFromMovedRegions(String encodedName) {
    movedRegionInfoCache.invalidate(encodedName);
  }

  @InterfaceAudience.Private
  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
    return movedRegionInfoCache.getIfPresent(encodedRegionName);
  }

  @InterfaceAudience.Private
  public int movedRegionCacheExpiredTime() {
    return TIMEOUT_REGION_MOVED;
  }
  private String getMyEphemeralNodePath() {
    return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
  }
  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
  }
  /**
   * @return the underlying {@link CompactSplit} for this server
   */
  public CompactSplit getCompactSplitThread() {
    return this.compactSplitThread;
  }
  CoprocessorServiceResponse execRegionServerService(
      @SuppressWarnings("UnusedParameters") final RpcController controller,
      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor executorService found for " + serviceName);
      }
      ServiceDescriptor serviceDesc = service.getDescriptorForType();

      String methodName = call.getMethodName();
      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
          " called on executorService " + serviceName);
      }

      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request, message -> {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
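  // Dispatch note: the serviceName/methodName in the CoprocessorServiceRequest must match a
  // protobuf Service previously registered in coprocessorServiceHandlers (see registerService).
  // The payload is decoded against that method's request prototype, and the response is encoded
  // with HConstants.EMPTY_BYTE_ARRAY in place of a region name, since region-server-level
  // endpoints are not scoped to any region.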
  /**
   * May be null if this is a master which does not carry tables.
   * @return The block cache instance used by the regionserver.
   */
  @Override
  public Optional<BlockCache> getBlockCache() {
    return Optional.ofNullable(this.blockCache);
  }

  /**
   * May be null if this is a master which does not carry tables.
   * @return The cache for mob files used by the regionserver.
   */
  @Override
  public Optional<MobFileCache> getMobFileCache() {
    return Optional.ofNullable(this.mobFileCache);
  }
  @Override
  public AccessChecker getAccessChecker() {
    return rpcServices.getAccessChecker();
  }

  @Override
  public ZKPermissionWatcher getZKPermissionWatcher() {
    return rpcServices.getZkPermissionWatcher();
  }
  /**
   * @return the ConfigurationManager object, exposed for testing purposes.
   */
  @InterfaceAudience.Private
  ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }

  /**
   * @return Return table descriptors implementation.
   */
  @Override
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }
  /**
   * Reload the configuration from disk.
   */
  void updateConfiguration() {
    LOG.info("Reloading the configuration from disk.");
    // Reload the configuration from disk.
    conf.reloadConfiguration();
    configurationManager.notifyAllObservers(conf);
  }
  CacheEvictionStats clearRegionBlockCache(Region region) {
    long evictedBlocks = 0;

    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
      }
    }

    return CacheEvictionStats.builder()
      .withEvictedBlocks(evictedBlocks)
      .build();
  }
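  // A minimal usage sketch: this is the per-region worker behind the admin-level block cache
  // clearing RPC. Assuming an open client Connection, a caller would typically reach it through
  // something like:
  //
  //   try (Admin admin = connection.getAdmin()) {
  //     CacheEvictionStats stats = admin.clearBlockCache(TableName.valueOf("t1"));
  //   }
  //
  // Eviction matches by hfile name, so blocks of files already compacted away simply no longer
  // match anything in the cache.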
  @Override
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }
  @Override
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  public MemStoreFlusher getMemStoreFlusher() {
    return cacheFlusher;
  }
  /**
   * For testing.
   * @return whether all wal roll requests have finished for this regionserver
   */
  @InterfaceAudience.Private
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }

  @Override
  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }
  @Override
  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // return 0 during RS initialization
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }
  @Override
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
    try {
      Superusers.initialize(newConf);
    } catch (IOException e) {
      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
    }
  }
  @Override
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  @Override
  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }
  @Override
  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
      final Abortable abort) {
    final LockServiceClient client =
      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
    return client.regionLock(regionInfo, description, abort);
  }
  @Override
  public void unassign(byte[] regionName) throws IOException {
    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
  }
  @Override
  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
    return this.rsSpaceQuotaManager;
  }
  @Override
  public boolean reportFileArchivalForQuotas(TableName tableName,
      Collection<Entry<String, Long>> archivedFiles) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return false;
    }
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null || rsSpaceQuotaManager == null) {
      // the current server could be stopping.
      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
      return false;
    }
    try {
      RegionServerStatusProtos.FileArchiveNotificationRequest request =
        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
      rss.reportFileArchival(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
            + " This will be retried.", ioe);
        }
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return false;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // re-create the stub if we failed to report the archival
      createRegionServerStatusStub(true);
      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
      return false;
    }
    return true;
  }
  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
    return eventLoopGroupConfig;
  }
  @Override
  public Connection createConnection(Configuration conf) throws IOException {
    User user = UserProvider.instantiate(conf).getCurrent();
    return ConnectionFactory.createConnection(conf, null, user);
  }
  void executeProcedure(long procId, RSProcedureCallable callable) {
    executorService.submit(new RSProcedureHandler(this, procId, callable));
  }

  public void remoteProcedureComplete(long procId, Throwable error) {
    procedureResultReporter.complete(procId, error);
  }
  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
    RegionServerStatusService.BlockingInterface rss;
    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
    for (;;) {
      rss = rssStub;
      if (rss != null) {
        break;
      }
      createRegionServerStatusStub();
    }
    try {
      rss.reportProcedureDone(null, request);
    } catch (ServiceException se) {
      if (rssStub == rss) {
        rssStub = null;
      }
      throw ProtobufUtil.getRemoteException(se);
    }
  }
  /**
   * Will ignore the open/close region procedures which were already submitted or executed.
   * <p/>
   * When a master had unfinished open/close region procedures and restarted, the new active master
   * may send duplicate open/close region requests to the regionserver. Open/close requests are
   * submitted to a thread pool and executed, so we first need a cache of submitted open/close
   * region procedures.
   * <p/>
   * After an open/close region request has executed and the region transition has been reported
   * successfully, we cache it in the executed region procedures cache. See
   * {@link #finishRegionProcedure(long)}. Once the region transition has been reported
   * successfully, the master will not send the open/close region request to the regionserver
   * again. We assume an in-flight duplicate open/close region request will not be delayed by more
   * than 600 seconds, so the executed region procedures cache expires after 600 seconds.
   * <p/>
   * See HBASE-22404 for more details.
   * @param procId the id of the open/close region procedure
   * @return true if the procedure can be submitted.
   */
  boolean submitRegionProcedure(long procId) {
    if (procId == -1) {
      return true;
    }
    // Ignore the region procedures which already submitted.
    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
    if (previous != null) {
      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
      return false;
    }
    // Ignore the region procedures which already executed.
    if (executedRegionProcedures.getIfPresent(procId) != null) {
      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
      return false;
    }
    return true;
  }
  /**
   * See {@link #submitRegionProcedure(long)}.
   * @param procId the id of the open/close region procedure
   */
  public void finishRegionProcedure(long procId) {
    executedRegionProcedures.put(procId, procId);
    submittedRegionProcedures.remove(procId);
  }
  public boolean isShutDown() {
    return shutDown;
  }
  /**
   * Force to terminate region server when abort timeout.
   */
  private static class SystemExitWhenAbortTimeout extends TimerTask {

    public SystemExitWhenAbortTimeout() {
    }

    @Override
    public void run() {
      LOG.warn("Aborting region server timed out, terminating forcibly" +
        " and does not wait for any running shutdown hooks or finalizers to finish their work." +
        " Thread dump to stdout.");
      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
      Runtime.getRuntime().halt(1);
    }
  }
  @Override
  public AsyncClusterConnection getAsyncClusterConnection() {
    return asyncClusterConnection;
  }

  @InterfaceAudience.Private
  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
    return compactedFileDischarger;
  }
  /**
   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
   * @return pause time
   */
  @InterfaceAudience.Private
  public long getRetryPauseTime() {
    return this.retryPauseTime;
  }