/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExecutorStatusChore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseServerBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster.
 * There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends HBaseServerBase<RSRpcServices>
  implements RegionServerServices, LastSequenceId {

  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to current action in progress. Boolean value indicates: true - if open
   * region action in progress, false - if close region action in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

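  // Editorial note (not in the original source): byte[] keys cannot go into a plain hash map
  // because arrays use identity-based hashCode/equals, hence the sorted map with an explicit
  // byte-wise comparator above. A minimal standalone sketch of the same idiom:
  //
  //   ConcurrentMap<byte[], Boolean> m = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
  //   m.put(Bytes.toBytes("region-a"), true);
  //   m.containsKey(Bytes.toBytes("region-a")); // true: equal contents, different array instance
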
  /**
   * Used to cache the open/close region procedures which already submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();

  /**
   * Used to cache the open/close region procedures which already executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
    CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS)
      .build();

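  // Editorial note (not in the original source): entries in the Guava cache above are dropped
  // once they have lived past movedRegionCacheExpiredTime() since insertion. A minimal sketch of
  // the expire-after-write behavior, with illustrative values:
  //
  //   Cache<String, String> moved =
  //     CacheBuilder.newBuilder().expireAfterWrite(1000, TimeUnit.MILLISECONDS).build();
  //   moved.put("encodedRegionName", "destinationServer");
  //   moved.getIfPresent("encodedRegionName"); // non-null until ~1s after the put, then null
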
  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;
  private boolean sameReplicationSourceAndSink;

  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();

  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does
   * it need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS
   * API call, and it seems a bit weird to store ServerName since ServerName refers to
   * RegionServers and here we really mean DataNode locations. We don't store it as
   * InetSocketAddress here because the conversion on demand from Address to InetSocketAddress
   * will guarantee the resolution results will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  private volatile boolean dataFsOk;

  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds for safe
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when abort timeout
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;

  private final int threadWakeFrequency;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * Check for compactions requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes.
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private SlowLogTableOpsChore slowLogTableOpsChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The Executor status collect chore. */
  private ExecutorStatusChore executorStatusChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * Exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by client and handle duplicate
   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
   * are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and won't
   * be flushed on main path - because WAL itself also contains nonces, if we only flush it before
   * memstore flush, for a given nonce we will either see it in the WAL (if it was never flushed
   * to disk, it will be part of recovery), or we'll see it as part of the nonce log (or both
   * occasionally, which doesn't matter). Nonce log file can be deleted after the latest nonce in
   * it expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private BootstrapNodeManager bootstrapNodeManager;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs
   * to just come up and make do without a Master to talk to: e.g. in test or HRegionServer is
   * doing other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION}?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;

  private RegionReplicationBufferManager regionReplicationBufferManager;

  /**
   * Starts a HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the Constructor. Defer till after we register
   * with the Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super(conf, "RegionServer"); // thread name
    try {
      this.dataFsOk = true;
      this.masterless = !clusterMode();
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      regionServerAccounting = new RegionServerAccounting(conf);

      blockCache = BlockCacheFactory.createBlockCache(conf);
      mobFileCache = new MobFileCache(conf);

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();
      } else {
        masterAddressTracker = null;
      }
      this.rpcServices.start(zooKeeper);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction RegionServer", t);
      throw t;
    }
  }

  // HMaster should override this method to load the specific config for master
  @Override
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " +
          UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " +
          UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " +
          UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.getSocketAddress().getHostName();
      }
    } else {
      return hostname;
    }
  }

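  // Editorial note (not in the original source): a hedged configuration sketch of the two
  // mutually exclusive settings checked above. Either pin the hostname explicitly in
  // hbase-site.xml:
  //
  //   <property>
  //     <name>hbase.unsafe.regionserver.hostname</name>
  //     <value>rs1.example.com</value>
  //   </property>
  //
  // or have the RegionServer report the hostname its RPC server bound to, instead of letting the
  // Master reverse-DNS it:
  //
  //   <property>
  //     <name>hbase.unsafe.regionserver.hostname.disable.master.reversedns</name>
  //     <value>true</value>
  //   </property>
  //
  // Setting both triggers the IOException thrown above.
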
  @Override
  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  @Override
  protected String getProcessName() {
    return REGIONSERVER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return !clusterMode();
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  @Override
  protected boolean cacheTableDescriptor() {
    return false;
  }

  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  @Override
  protected void configureInfoServer(InfoServer infoServer) {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

  @Override
  public boolean registerService(Service instance) {
    // No stacking of instances is allowed for a single executorService name
    ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName +
        " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
    }
    return true;
  }

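  // Editorial note (not in the original source): a hedged sketch of how a coprocessor endpoint
  // would hand its Service to registerService(...) above. The endpoint/handler names are
  // hypothetical; only the registerService/CoprocessorRpcUtils calls come from this file:
  //
  //   Service service = MyEndpointProtos.MyService.newReflectiveService(new MyHandler());
  //   if (!regionServer.registerService(service)) {
  //     throw new IOException("Already registered: " +
  //       CoprocessorRpcUtils.getServiceName(service.getDescriptorForType()));
  //   }
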
  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
          " not supported, aborting RS construction");
      }
    }
  }

  @Override
  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    try {
      initializeZooKeeper();
      setupClusterConnection();
      bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
      // Setup RPC client for master communication
      this.rpcClient = asyncClusterConnection.getRpcClient();
    } catch (Throwable t) {
      // Call stop if error or process will stick around forever since server
      // puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    }
  }

  /**
   * Bring up connection to zk ensemble, then wait until a master for this cluster is available,
   * and after that, wait until the cluster 'up' flag has been set. This is the order in which
   * master does things.
   * <p>
   * Finally open long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode availability while checking if the region
   * server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return True if the cluster is up.
   */
  @Override
  public boolean isClusterUp() {
    return this.masterless ||
      (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the cluster up flag is down or hdfs went wacky. Once registered successfully, go ahead
        // and start up all Services. Use RetryCounter to get backoff in case Master is struggling
        // to come up.
        LOG.debug("About to register with Master.");
        RetryCounterFactory rcf =
          new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
        RetryCounter rc = rcf.create();
        while (keepLooping()) {
          RegionServerStartupResponse w = reportForDuty();
          if (w == null) {
            long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
            LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
            this.sleeper.sleep(sleepTime);
          } else {
            handleReportForDutyResponse(w);
            break;
          }
        }
      }

      if (!isStopped() && isHealthy()) {
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        if (this.rspmHost != null) {
          this.rspmHost.start();
        }
        // Start the Quota Manager
        if (this.rsQuotaManager != null) {
          rsQuotaManager.start(getRpcServer().getScheduler());
        }
        if (this.rsSpaceQuotaManager != null) {
          this.rsSpaceQuotaManager.start();
        }
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(isAborted());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      }
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    if (this.leaseManager != null) {
      this.leaseManager.closeAfterLeasesExpire();
    }
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    // Send cache a shutdown.
    if (blockCache != null) {
      blockCache.shutdown();
    }
    if (mobFileCache != null) {
      mobFileCache.shutdown();
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) {
      this.hMemManager.stop();
    }
    if (this.cacheFlusher != null) {
      this.cacheFlusher.interruptIfNecessary();
    }
    if (this.compactSplitThread != null) {
      this.compactSplitThread.interruptIfNecessary();
    }

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested.get() || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions. Used when testing.
    } else if (abortRequested.get()) {
      if (this.dataFsOk) {
        closeUserRegions(abortRequested.get()); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested.get());
      LOG.info("stopping server " + this.serverName);
    }
    regionReplicationBufferManager.stop();
    closeClusterConnection();
    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested.get() || this.dataFsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested.get());
      }
    }

    if (!this.killed && this.dataFsOk) {
      waitOnAllRegionsToClose(abortRequested.get());
      LOG.info("stopping server " + this.serverName + "; all regions closed.");
    }

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }
    if (rsSpaceQuotaManager != null) {
      rsSpaceQuotaManager.stop();
      rsSpaceQuotaManager = null;
    }

    // flag may be changed when closing regions throws exception.
    if (this.dataFsOk) {
      shutdownWAL(!abortRequested.get());
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leaseManager != null) {
      this.leaseManager.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
      // pass
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
  }

  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

972 if (getNumberOfOnlineRegions() > 2) {
975 boolean allUserRegionsOffline
= true;
976 for (Map
.Entry
<String
, HRegion
> e
: this.onlineRegions
.entrySet()) {
977 if (!e
.getValue().getRegionInfo().isMetaRegion()) {
978 allUserRegionsOffline
= false;
982 return allUserRegionsOffline
;
  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @InterfaceAudience.Private
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      request.setServer(ProtobufUtil.toServerName(this.serverName));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }

  /**
   * Reports the given map of Regions and their size on the filesystem to the active Master.
   * @param regionSizeStore The store containing region sizes
   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
   */
  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      LOG.trace("Skipping Region size report to HMaster as stub is null");
      return true;
    }
    try {
      buildReportAndSend(rss, regionSizeStore);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        LOG.trace("Failed to report region sizes to Master because it is initializing."
          + " This will be retried.", ioe);
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return true;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      createRegionServerStatusStub(true);
      if (ioe instanceof DoNotRetryIOException) {
        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
        if (doNotRetryEx.getCause() != null) {
          Throwable t = doNotRetryEx.getCause();
          if (t instanceof UnsupportedOperationException) {
            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
            return false;
          }
        }
      }
      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
    }
    return true;
  }

  /**
   * Builds the region size report and sends it to the master. Upon successful sending of the
   * report, the region sizes that were sent are marked as sent.
   * @param rss The stub to send to the Master
   * @param regionSizeStore The store containing region sizes
   */
  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
      RegionSizeStore regionSizeStore) throws ServiceException {
    RegionSpaceUseReportRequest request =
      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
    rss.reportRegionSpaceUse(null, request);
    // Record the number of size reports sent
    if (metricsRegionServer != null) {
      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
    }
  }

  /**
   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
   * @param regionSizes The size in bytes of regions
   * @return The corresponding protocol buffer message.
   */
  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
    }
    return request.build();
  }

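  // Editorial note (not in the original source): RegionSizeStore iterates
  // Entry<RegionInfo, RegionSize> pairs, so the request assembled above is just a repeated
  // RegionSpaceUse message; in protobuf text form it looks roughly like:
  //
  //   space_use { region_info { ... } region_size: 12345 }
  //   space_use { region_info { ... } region_size: 67890 }
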
  /**
   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
   * protobuf message.
   * @param regionInfo The RegionInfo
   * @param sizeInBytes The size in bytes of the Region
   * @return The protocol buffer
   */
  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
    return RegionSpaceUse.newBuilder()
      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
      .setRegionSize(Objects.requireNonNull(sizeInBytes))
      .build();
  }

  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
    serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (HRegion region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        for (String regionCoprocessor : regionCoprocessors) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
        .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }
    MetricsUserAggregateSource userSource =
      metricsRegionServer.getMetricsUserAggregate().getSource();
    if (userSource != null) {
      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
      }
    }

    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
          .getReplicationLoadSourceEntries()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    } else {
      if (replicationSourceHandler != null) {
        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
            .getReplicationLoadSourceEntries()) {
            serverLoad.addReplLoadSource(rLS);
          }
        }
      }
      if (replicationSinkHandler != null) {
        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
        if (rLoad != null) {
          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        }
      }
    }

    TaskMonitor.get().getTasks().forEach(task ->
      serverLoad.addTasks(ClusterStatusProtos.ServerTask.newBuilder()
        .setDescription(task.getDescription())
        .setStatus(task.getStatus() != null ? task.getStatus() : "")
        .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
        .setStartTime(task.getStartTime())
        .setCompletionTime(task.getCompletionTimestamp())
        .build()));

    return serverLoad.build();
  }

  private String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r : this.onlineRegions.values()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!onlineRegions.isEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            previousLogTime = EnvironmentEdgeManager.currentTime();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug("Online Regions=" + this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
          RegionInfo hri = e.getValue().getRegionInfo();
          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
            !closedRegions.contains(hri.getEncodedName())) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!onlineRegions.isEmpty()) {
            LOG.info("We were exiting though online regions are not empty," +
              " because some regions failed closing");
          }
          break;
        } else {
          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
        }
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private static boolean sleepInterrupted(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }

  /**
   * Run init. Sets up wal and starts up all server threads.
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
            rpcServices.getSocketAddress().getPort(), this.startcode);
          if (!StringUtils.isBlank(useThisHostnameInstead) &&
            !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
            String msg = "Master passed us a different hostname to use; was=" +
              this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          if (StringUtils.isBlank(useThisHostnameInstead) &&
            !hostnameFromMasterPOV.equals(rpcServices.getSocketAddress().getHostName())) {
            String msg = "Master passed us a different hostname to use; was=" +
              rpcServices.getSocketAddress().getHostName() + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
          }
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }
      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow us to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      // This call sets up an initialized replication and WAL. Later we start it up.
      setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      final MetricsTable metricsTable =
        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
      this.metricsRegionServer =
        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      // There is a rare case where we do NOT want services to start. Check config.
      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
        startServices();
      }
      // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
      // or make sense of it.
      startReplicationService();

      LOG.info("Serving as " + this.serverName + ", RpcServer on " +
        rpcServices.getSocketAddress() + ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to come online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"),
        "Region server startup failed");
    } finally {
      sleeper.skipSleepCycle();
    }
  }

  private void startHeapMemoryManager() {
    if (this.blockCache != null) {
      this.hMemManager =
        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
      this.hMemManager.start(getChoreService());
    }
  }

  private void createMyEphemeralNode() throws KeeperException {
    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
  }

  private void deleteMyEphemeralNode() throws KeeperException {
    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
  }

  public RegionServerAccounting getRegionServerAccounting() {
    return regionServerAccounting;
  }
  // Round the size with KB or MB.
  // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1
  // instead of 0 if it is not 0, to avoid some schedulers thinking the region has no data. See
  // HBASE-26340 for more details on why this is important.
  private static int roundSize(long sizeInByte, int sizeUnit) {
    if (sizeInByte == 0) {
      return 0;
    } else if (sizeInByte < sizeUnit) {
      return 1;
    } else {
      return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
    }
  }
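  // Worked example of the rounding above (illustrative values, not from the original source):
  // with sizeUnit = 1 MB, roundSize(0, 1MB) -> 0, roundSize(512 KB, 1MB) -> 1 (a non-empty
  // region never reports 0), and roundSize(5 MB + 1 byte, 1MB) -> 5 (plain integer division
  // once the size is at least one unit).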
  /**
   * @param r Region to get RegionLoad for.
   * @param regionLoadBldr the RegionLoad.Builder, can be null
   * @param regionSpecifier the RegionSpecifier.Builder, can be null
   * @return RegionLoad instance.
   */
  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
      RegionSpecifier.Builder regionSpecifier) throws IOException {
    byte[] name = r.getRegionInfo().getRegionName();
    int stores = 0;
    int storefiles = 0;
    int storeRefCount = 0;
    int maxCompactedStoreFileRefCount = 0;
    long storeUncompressedSize = 0L;
    long storefileSize = 0L;
    long storefileIndexSize = 0L;
    long rootLevelIndexSize = 0L;
    long totalStaticIndexSize = 0L;
    long totalStaticBloomSize = 0L;
    long totalCompactingKVs = 0L;
    long currentCompactedKVs = 0L;
    List<HStore> storeList = r.getStores();
    stores += storeList.size();
    for (HStore store : storeList) {
      storefiles += store.getStorefilesCount();
      int currentStoreRefCount = store.getStoreRefCount();
      storeRefCount += currentStoreRefCount;
      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
      maxCompactedStoreFileRefCount =
          Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
      storeUncompressedSize += store.getStoreSizeUncompressed();
      storefileSize += store.getStorefilesSize();
      // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
      CompactionProgress progress = store.getCompactionProgress();
      if (progress != null) {
        totalCompactingKVs += progress.getTotalCompactingKVs();
        currentCompactedKVs += progress.currentCompactedKVs;
      }
      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
      totalStaticIndexSize += store.getTotalStaticIndexSize();
      totalStaticBloomSize += store.getTotalStaticBloomSize();
    }

    int unitMB = 1024 * 1024;
    int unitKB = 1024;

    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
    int storefileSizeMB = roundSize(storefileSize, unitMB);
    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);

    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
    if (regionLoadBldr == null) {
      regionLoadBldr = RegionLoad.newBuilder();
    }
    if (regionSpecifier == null) {
      regionSpecifier = RegionSpecifier.newBuilder();
    }

    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
        .setStores(stores)
        .setStorefiles(storefiles)
        .setStoreRefCount(storeRefCount)
        .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
        .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
        .setStorefileSizeMB(storefileSizeMB)
        .setMemStoreSizeMB(memstoreSizeMB)
        .setStorefileIndexSizeKB(storefileIndexSizeKB)
        .setRootIndexSizeKB(rootLevelIndexSizeKB)
        .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
        .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
        .setReadRequestsCount(r.getReadRequestsCount())
        .setCpRequestsCount(r.getCpRequestsCount())
        .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
        .setWriteRequestsCount(r.getWriteRequestsCount())
        .setTotalCompactingKVs(totalCompactingKVs)
        .setCurrentCompactedKVs(currentCompactedKVs)
        .setDataLocality(dataLocality)
        .setDataLocalityForSsd(dataLocalityForSsd)
        .setBlocksLocalWeight(blocksLocalWeight)
        .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight)
        .setBlocksTotalWeight(blocksTotalWeight)
        .setCompactionState(
            ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
        .setLastMajorCompactionTs(r.getOldestHfileTs(true));
    r.setCompleteSequenceId(regionLoadBldr);
    return regionLoadBldr.build();
  }
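  // Note on units in the RegionLoad built above: data sizes (memstore, uncompressed and
  // compressed store files) are reported in MB, while index and bloom filter sizes are reported
  // in KB, all via roundSize(), so a tiny but non-empty region still shows up as 1 rather than 0.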
  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
    userLoadBldr.setUserName(user);
    userSource.getClientMetrics().values().stream()
        .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
            .setHostName(clientMetrics.getHostName())
            .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
            .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
            .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
        .forEach(userLoadBldr::addClientMetrics);
    return userLoadBldr.build();
  }
  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
    HRegion r = onlineRegions.get(encodedRegionName);
    return r != null ? createRegionLoad(r, null, null) : null;
  }
  /**
   * Inner class that runs on a long period checking if regions need compaction.
   */
  private static class CompactionChecker extends ScheduledChore {
    private final HRegionServer instance;
    private final int majorCompactPriority;
    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
    // Iteration is 1-based rather than 0-based so we don't check for compaction
    // immediately upon region server startup
    private long iteration = 1;

    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
      super("CompactionChecker", stopper, sleepTime);
      this.instance = h;
      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));

      /*
       * MajorCompactPriority is configurable. If not set, the compaction will use default
       * priority.
       */
      this.majorCompactPriority = this.instance.conf
          .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
    }

    @Override
    protected void chore() {
      for (Region r : this.instance.onlineRegions.values()) {
        // Skip compaction if region is read only
        if (r == null || r.isReadOnly()) {
          continue;
        }

        HRegion hr = (HRegion) r;
        for (HStore s : hr.stores.values()) {
          try {
            long multiplier = s.getCompactionCheckMultiplier();
            assert multiplier > 0;
            if (iteration % multiplier != 0) {
              continue;
            }
            if (s.needsCompaction()) {
              // Queue a compaction. Will recognize if major is needed.
              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
                  getName() + " requests compaction");
            } else if (s.shouldPerformMajorCompaction()) {
              s.triggerMajorCompaction();
              if (majorCompactPriority == DEFAULT_PRIORITY
                  || majorCompactPriority > hr.getCompactPriority()) {
                this.instance.compactSplitThread.requestCompaction(hr, s,
                    getName() + " requests major compaction; use default priority",
                    Store.NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
              } else {
                this.instance.compactSplitThread.requestCompaction(hr, s,
                    getName() + " requests major compaction; use configured priority",
                    this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
              }
            }
          } catch (IOException e) {
            LOG.warn("Failed major compaction check on " + r, e);
          }
        }
      }
      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
    }
  }
  private static class PeriodicMemStoreFlusher extends ScheduledChore {
    private final HRegionServer server;
    private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
    private final static int MIN_DELAY_TIME = 0; // millisec
    private final long rangeOfDelayMs;

    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
      super("MemstoreFlusherChore", server, cacheFlushInterval);
      this.server = server;

      final long configuredRangeOfDelay = server.getConfiguration().getInt(
          "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
    }

    @Override
    protected void chore() {
      final StringBuilder whyFlush = new StringBuilder();
      for (HRegion r : this.server.onlineRegions.values()) {
        if (r == null) {
          continue;
        }
        whyFlush.setLength(0);
        if (r.shouldFlush(whyFlush)) {
          FlushRequester requester = server.getFlushRequester();
          if (requester != null) {
            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
            // Throttle the flushes by putting a delay. If we don't throttle, and there
            // is a balanced write-load on the regions in a table, we might end up
            // overwhelming the filesystem with too many flushes at once.
            if (requester.requestDelayedFlush(r, delay)) {
              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
                  r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
            }
          }
        }
      }
    }
  }
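  // The random delay above spreads periodic flushes over a window of up to
  // "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds" (default 5 minutes), so
  // regions with similar write patterns do not all flush at the same instant.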
  /**
   * Report the status of the server. A server is online once all the startup is completed
   * (setting up filesystem, starting executorService threads, etc.). This method is designed
   * mostly to be useful in tests.
   * @return true if online, false if not.
   */
  public boolean isOnline() {
    return online.get();
  }
  /**
   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants
   * to be hooked up to WAL.
   */
  private void setupWALAndReplication() throws IOException {
    WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
    // TODO Replication make assumptions here based on the default filesystem impl
    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());

    Path logDir = new Path(walRootDir, logName);
    LOG.debug("logDir={}", logDir);
    if (this.walFs.exists(logDir)) {
      throw new RegionServerRunningException(
          "Region server has already created directory at " + this.serverName.toString());
    }
    // Always create wal directory as now we need this when master restarts to find out the live
    // region servers.
    if (!this.walFs.mkdirs(logDir)) {
      throw new IOException("Can not create wal directory " + logDir);
    }
    // Instantiate replication if replication enabled. Pass it the log directories.
    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
    this.walFactory = factory;
  }
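  // The existence check on the WAL directory above doubles as a crude liveness guard: a second
  // process starting with the same server name finds the directory already present and fails
  // fast with RegionServerRunningException instead of interfering with the live server's WALs.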
  /**
   * Start up replication source and sink handlers.
   */
  private void startReplicationService() throws IOException {
    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.startReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.startReplicationService();
      }
    }
  }
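  // When source and sink are backed by the same object (sameReplicationSourceAndSink is true),
  // the service is started once through the source handler only; going through both handles
  // would invoke startReplicationService() twice on the same instance.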
  /**
   * @return Master address tracker instance.
   */
  public MasterAddressTracker getMasterAddressTracker() {
    return this.masterAddressTracker;
  }
  /**
   * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we
   * need to run. This is called after we've successfully registered with the Master. Install an
   * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception.
   * We cannot set the handler on all threads. Server's internal Listener thread is off limits.
   * For Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that
   * tries to run should trigger the same critical condition and the shutdown will run. On its way
   * out, this server will shut down Server. Leases are sort of inbetween. It has an internal
   * thread that, while it inherits from Chore, keeps its own internal stop mechanism so needs to
   * be stopped by this hosting server. Worker logs the exception and exits.
   */
  private void startServices() throws IOException {
    if (!isStopped() && !isAborted()) {
      initializeThreads();
    }
    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
    this.secureBulkLoadManager.start();

    // Health checker thread.
    if (isHealthCheckerConfigured()) {
      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
    }
    // Executor status collect thread.
    if (this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
        HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)) {
      int sleepTime =
          this.conf.getInt(ExecutorStatusChore.WAKE_FREQ, ExecutorStatusChore.DEFAULT_WAKE_FREQ);
      executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
          this.metricsRegionServer.getMetricsSource());
    }

    this.walRoller = new LogRoller(this);
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
    this.procedureResultReporter = new RemoteProcedureResultReporter(this);

    // Create the CompactedFileDischarger chore executorService. This chore helps to
    // remove the compacted files that will no longer be used in reads.
    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
    // 2 mins so that compacted files can be archived before the TTLCleaner runs
    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
    choreService.scheduleChore(compactedFileDischarger);

    // Start executor services
    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
    final int openPriorityRegionThreads =
        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
        .setCorePoolSize(openPriorityRegionThreads));
    final int closeRegionThreads =
        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
      final int storeScannerParallelSeekThreads =
          conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
      executorService.startExecutorService(executorService.new ExecutorConfig()
          .setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
    }
    final int logReplayOpsThreads =
        conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads)
        .setAllowCoreThreadTimeout(true));
    // Start the threads for compacted files discharger
    final int compactionDischargerThreads =
        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
        .setCorePoolSize(compactionDischargerThreads));
    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
      final int regionReplicaFlushThreads =
          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
              conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
      executorService.startExecutorService(executorService.new ExecutorConfig()
          .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
          .setCorePoolSize(regionReplicaFlushThreads));
    }
    final int refreshPeerThreads =
        conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
    final int replaySyncReplicationWALThreads =
        conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL)
        .setCorePoolSize(replaySyncReplicationWALThreads));
    final int switchRpcThrottleThreads =
        conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
        .setCorePoolSize(switchRpcThrottleThreads));
    final int claimReplicationQueueThreads =
        conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
        .setCorePoolSize(claimReplicationQueueThreads));
    final int rsSnapshotOperationThreads =
        conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
    executorService.startExecutorService(executorService.new ExecutorConfig()
        .setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
        .setCorePoolSize(rsSnapshotOperationThreads));

    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
        uncaughtExceptionHandler);
    if (this.cacheFlusher != null) {
      this.cacheFlusher.start(uncaughtExceptionHandler);
    }
    Threads.setDaemonThreadRunning(this.procedureResultReporter,
        getName() + ".procedureResultReporter", uncaughtExceptionHandler);

    if (this.compactionChecker != null) {
      choreService.scheduleChore(compactionChecker);
    }
    if (this.periodicFlusher != null) {
      choreService.scheduleChore(periodicFlusher);
    }
    if (this.healthCheckChore != null) {
      choreService.scheduleChore(healthCheckChore);
    }
    if (this.executorStatusChore != null) {
      choreService.scheduleChore(executorStatusChore);
    }
    if (this.nonceManagerChore != null) {
      choreService.scheduleChore(nonceManagerChore);
    }
    if (this.storefileRefresher != null) {
      choreService.scheduleChore(storefileRefresher);
    }
    if (this.fsUtilizationChore != null) {
      choreService.scheduleChore(fsUtilizationChore);
    }
    if (this.slowLogTableOpsChore != null) {
      choreService.scheduleChore(slowLogTableOpsChore);
    }
    if (this.brokenStoreFileCleaner != null) {
      choreService.scheduleChore(brokenStoreFileCleaner);
    }

    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
    // an unhandled exception, it will just exit.
    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
        uncaughtExceptionHandler);

    // Create the log splitting worker and start it
    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
    // quite a while inside Connection layer. The worker won't be available for other
    // tasks even after current task is preempted after a split task times out.
    Configuration sinkConf = HBaseConfiguration.create(conf);
    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
    if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
        DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
      // SplitLogWorker needs csm. If none, don't start this.
      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
      splitLogWorker.start();
      LOG.debug("SplitLogWorker started");
    }

    // Memstore services.
    startHeapMemoryManager();
    // Call it after starting HeapMemoryManager.
    initializeMemStoreChunkCreator(hMemManager);
  }
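  // Summary of what startServices() wires up: the per-operation executor pools (open/close for
  // regions and meta, WAL replay, compacted-file discharge, replication peer refresh, snapshot
  // operations, rpc throttle switching), the long-lived daemon threads (log roller, cache
  // flusher, procedure result reporter, lease checker), and the scheduled chores. Pool sizes all
  // come from configuration with the defaults shown inline above.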
  private void initializeThreads() {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplit(this);

    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently on store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
    this.leaseManager = new LeaseManager(this.threadWakeFrequency);

    final boolean isSlowLogTableEnabled = conf.getBoolean(
        HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
        HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
    if (isSlowLogTableEnabled) {
      // default chore duration: 10 min
      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
    }

    if (this.nonceManager != null) {
      // Create the scheduled chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerRpcQuotaManager(this);
    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);

    if (QuotaUtil.isQuotaEnabled(conf)) {
      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
    }

    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod =
        conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
            StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
      storefileRefreshPeriod =
          conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
              StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
      onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher =
          new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
    }

    int brokenStoreFileCleanerPeriod =
        conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
            BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
    int brokenStoreFileCleanerDelay =
        conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
            BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
    double brokenStoreFileCleanerDelayJitter =
        conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
            BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
    double jitterRate =
        (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
    this.brokenStoreFileCleaner =
        new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
            brokenStoreFileCleanerPeriod, this, conf, this);

    registerConfigurationObservers();
  }
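  // Jitter math above (illustrative numbers, not from the original source): with delay = 1000
  // and delayJitter = 0.5, jitterRate is uniform in [-0.25, 0.25), so the effective initial delay
  // lands anywhere in roughly [750, 1250) instead of at a fixed point across the fleet.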
  private void registerConfigurationObservers() {
    // Registering the compactSplitThread object with the ConfigurationManager.
    configurationManager.registerObserver(this.compactSplitThread);
    configurationManager.registerObserver(this.rpcServices);
    configurationManager.registerObserver(this);
  }
  /**
   * Verify that server is healthy
   */
  private boolean isHealthy() {
    if (!dataFsOk) {
      // File system problem
      return false;
    }
    // Verify that all threads are alive
    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
        && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
        && (this.walRoller == null || this.walRoller.isAlive())
        && (this.compactionChecker == null || this.compactionChecker.isScheduled())
        && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
    if (!healthy) {
      stop("One or more threads are no longer alive -- stop");
    }
    return healthy;
  }
  public List<WAL> getWALs() {
    return walFactory.getWALs();
  }

  public WAL getWAL(RegionInfo regionInfo) throws IOException {
    WAL wal = walFactory.getWAL(regionInfo);
    if (this.walRoller != null) {
      this.walRoller.addWAL(wal);
    }
    return wal;
  }

  public LogRoller getWalRoller() {
    return walRoller;
  }

  WALFactory getWalFactory() {
    return walFactory;
  }
  public void stop(final String msg) {
    stop(msg, false, RpcServer.getRequestUser().orElse(null));
  }

  /**
   * Stops the regionserver.
   * @param msg Status message
   * @param force True if this is a regionserver abort
   * @param user The user executing the stop request, or null if no user is associated
   */
  public void stop(final String msg, final boolean force, final User user) {
    if (!this.stopped) {
      LOG.info("***** STOPPING region server '" + this + "' *****");
      if (this.rsHost != null) {
        // when forced via abort don't allow CPs to override
        try {
          this.rsHost.preStop(msg, user);
        } catch (IOException ioe) {
          if (!force) {
            LOG.warn("The region server did not stop", ioe);
            return;
          }
          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
        }
      }
      this.stopped = true;
      LOG.info("STOPPED: " + msg);
      // Wakes run() if it is sleeping
      sleeper.skipSleepCycle();
    }
  }
  public void waitForServerOnline() {
    while (!isStopped() && !isOnline()) {
      synchronized (online) {
        try {
          online.wait(msgInterval);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
    HRegion r = context.getRegion();
    long openProcId = context.getOpenProcId();
    long masterSystemTime = context.getMasterSystemTime();
    rpcServices.checkOpen();
    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
        r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
    // Do checks to see if we need to compact (references or too many files)
    // Skip compaction check if region is read only
    if (!r.isReadOnly()) {
      for (HStore s : r.stores.values()) {
        if (s.hasReferences() || s.needsCompaction()) {
          this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
        }
      }
    }
    long openSeqNum = r.getOpenSeqNum();
    if (openSeqNum == HConstants.NO_SEQNUM) {
      // If we opened a region, we should have read some sequence number from it.
      LOG.error(
          "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
      openSeqNum = 0;
    }

    // Notify master
    if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
      throw new IOException(
          "Failed to report opened region to master: "
              + r.getRegionInfo().getRegionNameAsString());
    }

    triggerFlushInPrimaryRegion(r);

    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
  }
  /**
   * Helper method for use in tests. Skip the region transition report when there's no master
   * around to receive it.
   */
  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
    final TransitionCode code = context.getCode();
    final long openSeqNum = context.getOpenSeqNum();
    long masterSystemTime = context.getMasterSystemTime();
    final RegionInfo[] hris = context.getHris();

    if (code == TransitionCode.OPENED) {
      Preconditions.checkArgument(hris != null && hris.length == 1);
      if (hris[0].isMetaRegion()) {
        LOG.warn(
            "meta table location is stored in master local store, so we can not skip reporting");
        return false;
      } else {
        try {
          MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
              serverName, openSeqNum, masterSystemTime);
        } catch (IOException e) {
          LOG.info("Failed to update meta", e);
          return false;
        }
      }
    }
    return true;
  }
  private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest(
      final RegionStateTransitionContext context) {
    final TransitionCode code = context.getCode();
    final long openSeqNum = context.getOpenSeqNum();
    final RegionInfo[] hris = context.getHris();
    final long[] procIds = context.getProcIds();

    ReportRegionStateTransitionRequest.Builder builder =
        ReportRegionStateTransitionRequest.newBuilder();
    builder.setServer(ProtobufUtil.toServerName(serverName));
    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
    transition.setTransitionCode(code);
    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
      transition.setOpenSeqNum(openSeqNum);
    }
    for (RegionInfo hri : hris) {
      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
    }
    for (long procId : procIds) {
      transition.addProcId(procId);
    }

    return builder.build();
  }
  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return skipReportingTransition(context);
    }
    final ReportRegionStateTransitionRequest request =
        createReportRegionStateTransitionRequest(context);

    int tries = 0;
    long pauseTime = this.retryPauseTime;
    // Keep looping till we get an error. We want to send reports even though server is going
    // down. Only go down if clusterConnection is null. It is set to null almost as the last thing
    // as the HRegionServer goes down.
    while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          createRegionServerStatusStub();
          continue;
        }
        ReportRegionStateTransitionResponse response =
            rss.reportRegionStateTransition(null, request);
        if (response.hasErrorMessage()) {
          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
          break;
        }
        // Log if we had to retry else don't log unless TRACE. We want to
        // know if we were successful after an attempt showed in logs as failed.
        if (tries > 0 || LOG.isTraceEnabled()) {
          LOG.info("TRANSITION REPORTED " + request);
        }
        // NOTE: Return mid-method!!!
        return true;
      } catch (ServiceException se) {
        IOException ioe = ProtobufUtil.getRemoteException(se);
        boolean pause = ioe instanceof ServerNotRunningYetException
            || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
        if (pause) {
          // Do backoff else we flood the Master with requests.
          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
        } else {
          pauseTime = this.retryPauseTime; // Reset.
        }
        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
            + tries + ")"
            + (pause
                ? " after " + pauseTime + "ms delay (Master is coming online...)."
                : " immediately."),
            ioe);
        if (pause) {
          Threads.sleep(pauseTime);
        }
        tries++;
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return false;
  }
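  // Retry behavior above: "master not ready" failures (ServerNotRunningYetException,
  // PleaseHoldException, CallQueueTooBigException) back off via
  // ConnectionUtils.getPauseTime(retryPauseTime, tries) before the next attempt, while other
  // ServiceExceptions retry immediately with the base pause reset. The loop ends only when the
  // report succeeds, the master rejects it, or the cluster connection is closed.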
  /**
   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
   * block this thread. See RegionReplicaFlushHandler for details.
   */
  private void triggerFlushInPrimaryRegion(final HRegion region) {
    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
      return;
    }
    TableName tn = region.getTableDescriptor().getTableName();
    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
        // If the memstore replication is not set up, we do not have to wait for observing a flush
        // event from primary before starting to serve reads, because gaps from replication are
        // not applicable; this logic is from
        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication
        !region.getTableDescriptor().hasRegionMemStoreReplication()) {
      region.setReadsEnabled(true);
      return;
    }

    region.setReadsEnabled(false); // disable reads before marking the region as opened.
    // RegionReplicaFlushHandler might reset this.

    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
    if (this.executorService != null) {
      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
    } else {
      LOG.info("Executor is null; not running flush of primary region replica for {}",
          region.getRegionInfo());
    }
  }
  @InterfaceAudience.Private
  public RSRpcServices getRSRpcServices() {
    return rpcServices;
  }
  /**
   * Cause the server to exit without closing the regions it is serving, the log it is using and
   * without notifying the master. Used in unit testing and on catastrophic events such as HDFS
   * being yanked out from under hbase or we OOME.
   * @param reason the reason we are aborting
   * @param cause the exception that caused the abort, or null
   */
  public void abort(String reason, Throwable cause) {
    if (!setAbortRequested()) {
      // Abort already in progress, ignore the new request.
      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}",
          reason);
      return;
    }
    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
    if (cause != null) {
      LOG.error(HBaseMarkers.FATAL, msg, cause);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }
    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
    // java.util.HashSet's toString() method to print the coprocessor names.
    LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: "
        + CoprocessorHost.getLoadedCoprocessors());
    // Try and dump metrics if abort -- might give clue as to how fatal came about....
    try {
      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
    } catch (MalformedObjectNameException | IOException e) {
      LOG.warn("Failed dumping metrics", e);
    }

    // Do our best to report our abort to the master, but this may not work
    try {
      if (cause != null) {
        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
      }
      // Report to the master but only if we have already registered with the master.
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss != null && this.serverName != null) {
        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
        builder.setServer(ProtobufUtil.toServerName(this.serverName));
        builder.setErrorMessage(msg);
        rss.reportRSFatalError(null, builder.build());
      }
    } catch (Throwable t) {
      LOG.warn("Unable to report fatal error to master", t);
    }

    scheduleAbortTimer();
    // shutdown should be run as the internal user
    stop(reason, true, null);
  }
  /**
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs but it does
   * close the socket in case we want to bring up a server on the old hostname+port immediately.
   */
  @InterfaceAudience.Private
  protected void kill() {
    this.killed = true;
    abort("Simulated kill");
  }
  // Limits the time spent in the shutdown process.
  private void scheduleAbortTimer() {
    if (this.abortMonitor == null) {
      this.abortMonitor = new Timer("Abort regionserver monitor", true);
      TimerTask abortTimeoutTask = null;
      try {
        Constructor<? extends TimerTask> timerTaskCtor =
            Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
                .asSubclass(TimerTask.class).getDeclaredConstructor();
        timerTaskCtor.setAccessible(true);
        abortTimeoutTask = timerTaskCtor.newInstance();
      } catch (Exception e) {
        LOG.warn("Initialize abort timeout task failed", e);
      }
      if (abortTimeoutTask != null) {
        abortMonitor.schedule(abortTimeoutTask,
            conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
      }
    }
  }
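  // The abort watchdog above is pluggable: ABORT_TIMEOUT_TASK names a TimerTask class to run if
  // shutdown exceeds ABORT_TIMEOUT; the default, SystemExitWhenAbortTimeout, presumably forces
  // the JVM down (judging by its name) so a wedged abort cannot hang the process forever.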
  /**
   * Wait on all threads to finish. Presumption is that all closes and stops have already been
   * called.
   */
  protected void stopServiceThreads() {
    // clean up the scheduled chores
    if (bootstrapNodeManager != null) {
      bootstrapNodeManager.stop();
    }
    if (this.cacheFlusher != null) {
      this.cacheFlusher.join();
    }
    if (this.walRoller != null) {
      this.walRoller.close();
    }
    if (this.compactSplitThread != null) {
      this.compactSplitThread.join();
    }
    stopExecutorService();
    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
      this.replicationSourceHandler.stopReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.stopReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.stopReplicationService();
      }
    }
  }
  /**
   * @return Return the object that implements the replication source executorService.
   */
  public ReplicationSourceService getReplicationSourceService() {
    return replicationSourceHandler;
  }

  /**
   * @return Return the object that implements the replication sink executorService.
   */
  public ReplicationSinkService getReplicationSinkService() {
    return replicationSinkHandler;
  }
  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * @return master + port, or null if server has been stopped
   */
  private synchronized ServerName createRegionServerStatusStub() {
    // Create RS stub without refreshing the master node from ZK, use cached data
    return createRegionServerStatusStub(false);
  }
  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
   */
  @InterfaceAudience.Private
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          refresh = true; // let's try pull it from ZK directly
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
          continue;
        }
        try {
          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
              userProvider.getCurrent(), shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException
                ? ((RemoteException) e).unwrapRemoteException()
                : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = EnvironmentEdgeManager.currentTime();
          }
          if (sleepInterrupted(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }
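  // Connection strategy above: the loop polls the MasterAddressTracker (cached address first,
  // then straight from ZK once refresh flips to true), sleeping 200ms between attempts, and only
  // gives up when keepLooping() says the server is stopping; interruption is remembered and
  // re-asserted in the finally block rather than aborting the handshake mid-way.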
  /**
   * @return True if we should keep looping: this server has not been stopped and the cluster is
   *         still up. Returns false once the cluster is going down or this server has been
   *         stopped.
   */
  private boolean keepLooping() {
    return !this.stopped && isClusterUp();
  }
  /**
   * Let the master know we're here. Run initialization using parameters passed to us by the
   * master.
   * @return A Map of key/value configurations we got from the Master else null if we failed to
   *         register.
   * @throws IOException
   */
  private RegionServerStartupResponse reportForDuty() throws IOException {
    if (this.masterless) {
      return RegionServerStartupResponse.getDefaultInstance();
    }
    ServerName masterServerName = createRegionServerStatusStub(true);
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (masterServerName == null || rss == null) {
      return null;
    }
    RegionServerStartupResponse result = null;
    try {
      rpcServices.requestCount.reset();
      rpcServices.rpcGetRequestCount.reset();
      rpcServices.rpcScanRequestCount.reset();
      rpcServices.rpcFullScanRequestCount.reset();
      rpcServices.rpcMultiRequestCount.reset();
      rpcServices.rpcMutateRequestCount.reset();
      LOG.info("reportForDuty to master=" + masterServerName + " with port="
          + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
      long now = EnvironmentEdgeManager.currentTime();
      int port = rpcServices.getSocketAddress().getPort();
      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
      if (!StringUtils.isBlank(useThisHostnameInstead)) {
        request.setUseThisHostnameInstead(useThisHostnameInstead);
      }
      request.setPort(port);
      request.setServerStartCode(this.startcode);
      request.setServerCurrentTime(now);
      result = rss.regionServerStartup(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof ClockOutOfSyncException) {
        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
            ioe);
        // Re-throw IOE will cause RS to abort
        throw ioe;
      } else if (ioe instanceof ServerNotRunningYetException) {
        LOG.debug("Master is not running yet");
      } else {
        LOG.warn("error telling master we are up", se);
      }
      rssStub = null;
    }
    return result;
  }
  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
    try {
      GetLastFlushedSequenceIdRequest req =
          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss == null) { // Try to connect one more time
        createRegionServerStatusStub();
        rss = rssStub;
        if (rss == null) {
          // Still no luck, we tried
          LOG.warn("Unable to connect to the master to check the last flushed sequence id");
          return RegionStoreSequenceIds.newBuilder()
              .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
        }
      }
      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
      return RegionStoreSequenceIds.newBuilder()
          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
    } catch (ServiceException e) {
      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
      return RegionStoreSequenceIds.newBuilder()
          .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
    }
  }
  /**
   * Close meta region if we carry it
   * @param abort Whether we're running an abort.
   */
  private void closeMetaTableRegions(final boolean abort) {
    HRegion meta = null;
    this.onlineRegionsLock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
        RegionInfo hri = e.getValue().getRegionInfo();
        if (hri.isMetaRegion()) {
          meta = e.getValue();
        }
        if (meta != null) {
          break;
        }
      }
    } finally {
      this.onlineRegionsLock.writeLock().unlock();
    }
    if (meta != null) {
      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
    }
  }
  /**
   * Schedule closes on all user regions. Should be safe calling multiple times because it won't
   * close regions that are already closed or that are closing.
   * @param abort Whether we're running an abort.
   */
  private void closeUserRegions(final boolean abort) {
    this.onlineRegionsLock.writeLock().lock();
    try {
      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
        HRegion r = e.getValue();
        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
          // Don't update zk with this close transition; pass false.
          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
        }
      }
    } finally {
      this.onlineRegionsLock.writeLock().unlock();
    }
  }
  protected Map<String, HRegion> getOnlineRegions() {
    return this.onlineRegions;
  }

  public int getNumberOfOnlineRegions() {
    return this.onlineRegions.size();
  }
  /**
   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
   * as client; HRegion cannot be serialized to cross an rpc.
   */
  public Collection<HRegion> getOnlineRegionsLocalContext() {
    Collection<HRegion> regions = this.onlineRegions.values();
    return Collections.unmodifiableCollection(regions);
  }
  public void addRegion(HRegion region) {
    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
    configurationManager.registerObserver(region);
  }

  private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
      long size) {
    if (!sortedRegions.containsKey(size)) {
      sortedRegions.put(size, new ArrayList<>());
    }
    sortedRegions.get(size).add(region);
  }
  /**
   * @return A new Map of online regions sorted by region off-heap size with the first entry being
   *         the biggest.
   */
  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
    }
    return sortedRegions;
  }
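  // Map shape above: the TreeMap is keyed by memstore size in reverse order, and regions with the
  // exact same size are grouped into one Collection under that key, so iteration walks sizes from
  // biggest to smallest.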
  /**
   * @return A new Map of online regions sorted by region heap size with the first entry being the
   *         biggest.
   */
  SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
    // we'll sort the regions in reverse
    SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (HRegion region : this.onlineRegions.values()) {
      addRegion(sortedRegions, region, region.getMemStoreHeapSize());
    }
    return sortedRegions;
  }
  /** @return reference to FlushRequester */
  public FlushRequester getFlushRequester() {
    return this.cacheFlusher;
  }

  public CompactionRequester getCompactionRequestor() {
    return this.compactSplitThread;
  }

  public LeaseManager getLeaseManager() {
    return leaseManager;
  }
  /**
   * @return {@code true} when the data file system is available, {@code false} otherwise.
   */
  boolean isDataFileSystemOk() {
    return this.dataFsOk;
  }

  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
    return this.rsHost;
  }

  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
    return this.regionsInTransitionInRS;
  }

  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
    return rsQuotaManager;
  }
  // Main program and support routines

  /**
   * Load the replication executorService objects, if any
   */
  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
      FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
    // read in the name of the source replication class from the config file.
    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

    // read in the name of the sink replication class from the config file.
    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
        HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);

    // If both the sink and the source class names are the same, then instantiate
    // only one object.
    if (sourceClassname.equals(sinkClassname)) {
      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
          ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
      server.sameReplicationSourceAndSink = true;
    } else {
      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
          ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
          ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
      server.sameReplicationSourceAndSink = false;
    }
  }
  private static <T extends ReplicationService> T newReplicationInstance(String classname,
      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
      Path oldLogDir, WALFactory walFactory) throws IOException {
    final Class<? extends T> clazz;
    try {
      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
    } catch (java.lang.ClassNotFoundException nfe) {
      throw new IOException("Could not find class for " + classname);
    }
    T service = ReflectionUtils.newInstance(clazz, conf);
    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
    return service;
  }
  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
    if (!this.isOnline()) {
      return walGroupsReplicationStatus;
    }
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
    for (ReplicationSourceInterface source : allSources) {
      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
    }
    return walGroupsReplicationStatus;
  }
  /**
   * Utility for constructing an instance of the passed HRegionServer class.
   */
  static HRegionServer constructRegionServer(
      final Class<? extends HRegionServer> regionServerClass, final Configuration conf) {
    try {
      Constructor<? extends HRegionServer> c =
          regionServerClass.getConstructor(Configuration.class);
      return c.newInstance(conf);
    } catch (Exception e) {
      throw new RuntimeException(
          "Failed construction of Regionserver: " + regionServerClass.toString(), e);
    }
  }
  /**
   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
   */
  public static void main(String[] args) {
    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
    VersionInfo.logVersion();
    Configuration conf = HBaseConfiguration.create();
    @SuppressWarnings("unchecked")
    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

    new HRegionServerCommandLine(regionServerClass).doMain(args);
  }
  /**
   * Gets the online regions of the specified table. This method looks at the in-memory
   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em>
   * regions. If a region on this table has been closed during a disable, etc., it will not be
   * included in the returned list. So, the returned list may not necessarily be ALL regions in
   * this table; it's all the ONLINE regions in the table.
   * @param tableName table to limit the scope of the query
   * @return Online regions from <code>tableName</code>
   */
  public List<HRegion> getRegions(TableName tableName) {
    List<HRegion> tableRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      for (HRegion region : this.onlineRegions.values()) {
        RegionInfo regionInfo = region.getRegionInfo();
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(region);
        }
      }
    }
    return tableRegions;
  }
  public List<HRegion> getRegions() {
    List<HRegion> allRegions;
    synchronized (this.onlineRegions) {
      // Return a clone copy of the onlineRegions
      allRegions = new ArrayList<>(onlineRegions.values());
    }
    return allRegions;
  }
  /**
   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
   * @return all the online tables in this RS
   */
  public Set<TableName> getOnlineTables() {
    Set<TableName> tables = new HashSet<>();
    synchronized (this.onlineRegions) {
      for (Region region : this.onlineRegions.values()) {
        tables.add(region.getTableDescriptor().getTableName());
      }
    }
    return tables;
  }
  public String[] getRegionServerCoprocessors() {
    TreeSet<String> coprocessors = new TreeSet<>();
    try {
      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
          + "skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
    Collection<HRegion> regions = getOnlineRegionsLocalContext();
    for (HRegion region : regions) {
      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
      try {
        coprocessors
            .addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
      } catch (IOException exception) {
        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
            + "; skipping.");
        LOG.debug("Exception details for failure to fetch wal coprocessor information.",
            exception);
      }
    }
    coprocessors.addAll(rsHost.getCoprocessors());
    return coprocessors.toArray(new String[0]);
  }
  /**
   * Try to close the region, logs a warning on failure but continues.
   * @param region Region to close
   */
  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
    try {
      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn(
            "Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
      }
    } catch (IOException e) {
      LOG.warn("Failed to close " + region.getRegionNameAsString()
          + " - ignoring and continuing", e);
    }
  }
  /**
   * Close asynchronously a region, can be called from the master or internally by the regionserver
   * when stopping. If called from the master, the region will update the status.
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close. The
   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
   * </p>
   * <p>
   * If a close was in progress, this new request will be ignored, and an exception thrown.
   * </p>
   * @param encodedName Region to close
   * @param abort       True if we are aborting
   * @param destination Where the Region is being moved to... may be null if unknown.
   * @return True if closed a region.
   * @throws NotServingRegionException if the region is not online
   */
  protected boolean closeRegion(String encodedName, final boolean abort,
    final ServerName destination) throws NotServingRegionException {
    // Check for permissions to close.
    HRegion actualRegion = this.getRegion(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    // previous can come back 'null' if not in map.
    final Boolean previous =
      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
        + "trying to OPEN. Cancelling OPENING.");
      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
          + " Doing a standard close now");
        return closeRegion(encodedName, abort, destination);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getRegion(encodedName);
      if (actualRegion == null) { // If already online, we still need to close it.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException(
          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
      }
    } else if (previous == null) {
      LOG.info("Received CLOSE for {}", encodedName);
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName
        + ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException(
        "The region " + encodedName + " is not online, and is not opening.");
    }

    CloseRegionHandler crh;
    final RegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, destination);
    }
    this.executorService.submit(crh);
    return true;
  }
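  // State encoding used by closeRegion() above in regionsInTransitionInRS (summary comment, no
  // new behavior): Boolean.TRUE means an OPEN of the region is in flight, Boolean.FALSE means a
  // CLOSE is in flight, and an absent key means no transition is in progress. The
  // putIfAbsent(..., Boolean.FALSE) call therefore both claims the close and detects a racing
  // open in a single atomic step.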
  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if named region is not
   *         member of the online regions.
   */
  public HRegion getOnlineRegion(final byte[] regionName) {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return this.onlineRegions.get(encodedRegionName);
  }
  @Override
  public HRegion getRegion(final String encodedRegionName) {
    return this.onlineRegions.get(encodedRegionName);
  }
  @Override
  public boolean removeRegion(final HRegion r, ServerName destination) {
    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
    if (destination != null) {
      long closeSeqNum = r.getMaxFlushedSeqId();
      if (closeSeqNum == HConstants.NO_SEQNUM) {
        // No edits in WAL for this region; get the sequence number when the region was opened.
        closeSeqNum = r.getOpenSeqNum();
        if (closeSeqNum == HConstants.NO_SEQNUM) {
          closeSeqNum = 0;
        }
      }
      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
      if (selfMove) {
        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
          r.getRegionInfo().getEncodedName(),
          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
      }
    }
    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
    configurationManager.deregisterObserver(r);
    return toReturn != null;
  }
  /**
   * Protected Utility method for safely obtaining an HRegion handle.
   * @param regionName Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   */
  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
    return getRegionByEncodedName(regionName, encodedRegionName);
  }
  public HRegion getRegionByEncodedName(String encodedRegionName)
    throws NotServingRegionException {
    return getRegionByEncodedName(null, encodedRegionName);
  }
  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
    throws NotServingRegionException {
    HRegion region = this.onlineRegions.get(encodedRegionName);
    if (region == null) {
      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
      if (moveInfo != null) {
        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
      }
      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr =
        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
      if (isOpening != null && isOpening) {
        throw new RegionOpeningException(
          "Region " + regionNameStr + " is opening on " + this.serverName);
      }
      throw new NotServingRegionException(
        regionNameStr + " is not online on " + this.serverName);
    }
    return region;
  }
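  /*
   * Caller-side sketch (illustrative only, not an API of this class): the three exceptions thrown
   * above carry different retry semantics. RegionMovedException and RegionOpeningException both
   * extend NotServingRegionException, so the subclasses must be caught first.
   *
   *   try {
   *     HRegion r = getRegionByEncodedName(regionName, encodedRegionName);
   *   } catch (RegionMovedException e) {
   *     // retarget to e.getServerName() directly; no hbase:meta lookup needed
   *   } catch (RegionOpeningException e) {
   *     // retry the same server after a short pause
   *   } catch (NotServingRegionException e) {
   *     // relocate the region via hbase:meta
   *   }
   */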
  /**
   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't
   * already.
   * @param t   Throwable
   * @param msg Message to log in error. Can be null.
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
  private Throwable cleanup(final Throwable t, final String msg) {
    // Don't log as error if NSRE; NSRE is 'normal' operation.
    if (t instanceof NotServingRegionException) {
      LOG.debug("NotServingRegionException; " + t.getMessage());
      return t;
    }
    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
    if (msg == null) {
      LOG.error("", e);
    } else {
      LOG.error(msg, e);
    }
    if (!rpcServices.checkOOME(t)) {
      checkFileSystem();
    }
    return t;
  }
  /**
   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
   * @return Make <code>t</code> an IOE if it isn't already.
   */
  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
    return (t instanceof IOException ? (IOException) t
      : msg == null || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
  }
  /**
   * Checks to see if the file system is still accessible. If not, sets abortRequested and
   * stopRequested.
   * @return false if file system is not available
   */
  boolean checkFileSystem() {
    if (this.dataFsOk && this.dataFs != null) {
      try {
        FSUtils.checkFileSystemAvailable(this.dataFs);
      } catch (IOException e) {
        abort("File System not available", e);
        this.dataFsOk = false;
      }
    }
    return this.dataFsOk;
  }
  @Override
  public void updateRegionFavoredNodesMapping(String encodedRegionName,
    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
    Address[] addr = new Address[favoredNodes.size()];
    // Refer to the comment on the declaration of regionFavoredNodesMap on why
    // it is a map of region name to Address[]
    for (int i = 0; i < favoredNodes.size(); i++) {
      addr[i] =
        Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
  }
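  /*
   * Illustrative sketch (assumed values): the conversion above is a plain host/port copy from the
   * protobuf ServerName into the lighter-weight Address type.
   *
   *   Address a = Address.fromParts("rs1.example.com", 16020);
   *   // a.getHostname() -> "rs1.example.com", a.getPort() -> 16020
   */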
  /**
   * Return the favored nodes for a region given its encoded name. Look at the comment around
   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
   * @param encodedRegionName the encoded region name.
   * @return array of favored locations
   */
  @Override
  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
  }

  @Override
  public ServerNonceManager getNonceManager() {
    return this.nonceManager;
  }
  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;

    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }
  }
  /**
   * We need a timeout. Without one, there is a risk of giving out wrong information: that would
   * double the number of network calls instead of reducing them.
   */
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
    boolean selfMove) {
    if (selfMove) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid="
      + closeSeqNum);
    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }
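  /*
   * Minimal sketch of a TTL cache like movedRegionInfoCache (the real field is declared elsewhere
   * in this class; this Guava-style builder is only an assumption for illustration):
   *
   *   Cache<String, MovedRegionInfo> cache = CacheBuilder.newBuilder()
   *     .expireAfterWrite(TIMEOUT_REGION_MOVED, TimeUnit.MILLISECONDS)
   *     .build();
   */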
  void removeFromMovedRegions(String encodedName) {
    movedRegionInfoCache.invalidate(encodedName);
  }

  @InterfaceAudience.Private
  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
    return movedRegionInfoCache.getIfPresent(encodedRegionName);
  }

  @InterfaceAudience.Private
  public int movedRegionCacheExpiredTime() {
    return TIMEOUT_REGION_MOVED;
  }
  private String getMyEphemeralNodePath() {
    return zooKeeper.getZNodePaths().getRsPath(serverName);
  }

  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return StringUtils.isNotBlank(healthScriptLocation);
  }
  /**
   * @return the underlying {@link CompactSplit} for the servers
   */
  public CompactSplit getCompactSplitThread() {
    return this.compactSplitThread;
  }
  CoprocessorServiceResponse execRegionServerService(
    @SuppressWarnings("UnusedParameters") final RpcController controller,
    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor executorService found for " + serviceName);
      }
      ServiceDescriptor serviceDesc = service.getDescriptorForType();

      String methodName = call.getMethodName();
      MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(),
          "Unknown method " + methodName + " called on executorService " + serviceName);
      }

      Message request = CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request, message -> {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
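  // Dispatch summary for execRegionServerService() above (descriptive comment, no new behavior):
  // resolve the Service by name, resolve the method on its descriptor, decode the request payload,
  // invoke the method with a ServerRpcController, then surface any controller-captured IOException
  // before serializing the response. All IOExceptions are rewrapped as ServiceException for the
  // RPC layer.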
  /**
   * May be null if this is a master which does not carry tables.
   * @return The block cache instance used by the regionserver.
   */
  @Override
  public Optional<BlockCache> getBlockCache() {
    return Optional.ofNullable(this.blockCache);
  }
  /**
   * May be null if this is a master which does not carry tables.
   * @return The cache for mob files used by the regionserver.
   */
  @Override
  public Optional<MobFileCache> getMobFileCache() {
    return Optional.ofNullable(this.mobFileCache);
  }
  /**
   * @return the ConfigurationManager object, for testing purposes.
   */
  ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }
  CacheEvictionStats clearRegionBlockCache(Region region) {
    long evictedBlocks = 0;

    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
      }
    }

    return CacheEvictionStats.builder()
      .withEvictedBlocks(evictedBlocks)
      .build();
  }
  @Override
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }
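  // Note (hedged): in typical HStore implementations the per-store pressure is roughly
  // (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact), so the value
  // returned here approaches 1.0 as any single store nears its blocking file count. See the
  // Store.getCompactionPressure() javadoc for the authoritative definition.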
  @Override
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  public MemStoreFlusher getMemStoreFlusher() {
    return cacheFlusher;
  }
  /**
   * @return whether all WAL roll requests have finished for this regionserver
   */
  @InterfaceAudience.Private
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }
  @Override
  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }

  @Override
  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // return 0 during RS initialization
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }
  @Override
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
    try {
      Superusers.initialize(newConf);
    } catch (IOException e) {
      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
    }

    // update region server coprocessor if the configuration has changed.
    if (CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
      CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)) {
      LOG.info("Update region server coprocessors because the configuration has changed");
      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
    }
  }
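  /*
   * Trigger sketch (illustrative): this callback is an observer hook; a configuration reload
   * typically fans out through the manager, e.g.
   *
   *   configurationManager.notifyAllObservers(newConf);
   */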
  @Override
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  @Override
  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }
  @Override
  public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
    final Abortable abort) {
    final LockServiceClient client =
      new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
    return client.regionLock(regionInfo, description, abort);
  }

  @Override
  public void unassign(byte[] regionName) throws IOException {
    FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
  }

  @Override
  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
    return this.rsSpaceQuotaManager;
  }
  @Override
  public boolean reportFileArchivalForQuotas(TableName tableName,
    Collection<Entry<String, Long>> archivedFiles) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return false;
    }
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null || rsSpaceQuotaManager == null) {
      // the current server could be stopping.
      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
      return false;
    }
    try {
      RegionServerStatusProtos.FileArchiveNotificationRequest request =
        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
      rss.reportFileArchival(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
            + " This will be retried.", ioe);
        }
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return false;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // re-create the stub if we failed to report the archival
      createRegionServerStatusStub(true);
      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
      return false;
    }
    return true;
  }
  void executeProcedure(long procId, RSProcedureCallable callable) {
    executorService.submit(new RSProcedureHandler(this, procId, callable));
  }

  public void remoteProcedureComplete(long procId, Throwable error) {
    procedureResultReporter.complete(procId, error);
  }
  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
    RegionServerStatusService.BlockingInterface rss;
    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
    for (;;) {
      rss = rssStub;
      if (rss != null) {
        break;
      }
      createRegionServerStatusStub();
    }
    try {
      rss.reportProcedureDone(null, request);
    } catch (ServiceException se) {
      if (rssStub == rss) {
        rssStub = null;
      }
      throw ProtobufUtil.getRemoteException(se);
    }
  }
  /**
   * Ignores open/close region procedures that have already been submitted or executed.
   * <p>
   * When the master has an unfinished open/close region procedure and restarts, the new active
   * master may send a duplicate open/close region request to the regionserver. Open/close requests
   * are submitted to a thread pool for execution, so we first need a cache of submitted open/close
   * region procedures.
   * <p>
   * After the open/close region request has executed and the region transition has been reported
   * successfully, the procedure is cached in the executed region procedures cache. See
   * {@link #finishRegionProcedure(long)}. After the region transition is reported successfully,
   * the master will not send the open/close region request to the regionserver again. An ongoing
   * duplicate open/close region request should not be delayed for more than 600 seconds, so the
   * executed region procedures cache expires after 600 seconds.
   * <p>
   * See HBASE-22404 for more details.
   * @param procId the id of the open/close region procedure
   * @return true if the procedure can be submitted.
   */
  boolean submitRegionProcedure(long procId) {
    if (procId == -1) {
      return true;
    }
    // Ignore the region procedures which already submitted.
    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
    if (previous != null) {
      LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
      return false;
    }
    // Ignore the region procedures which already executed.
    if (executedRegionProcedures.getIfPresent(procId) != null) {
      LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
      return false;
    }
    return true;
  }
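  /*
   * Dedup usage sketch (illustrative; mirrors the two checks above): a procedure id is accepted
   * only if it is in neither the submitted nor the executed cache.
   *
   *   if (submitRegionProcedure(procId)) {
   *     executeProcedure(procId, callable); // runs at most once per pid
   *   }
   */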
  /**
   * See {@link #submitRegionProcedure(long)}.
   * @param procId the id of the open/close region procedure
   */
  public void finishRegionProcedure(long procId) {
    executedRegionProcedures.put(procId, procId);
    submittedRegionProcedures.remove(procId);
  }
  /**
   * Forcibly terminate the region server when the abort times out.
   */
  private static class SystemExitWhenAbortTimeout extends TimerTask {

    public SystemExitWhenAbortTimeout() {
    }

    @Override
    public void run() {
      LOG.warn("Aborting region server timed out, terminating forcibly"
        + " and does not wait for any running shutdown hooks or finalizers to finish their work."
        + " Thread dump to stdout.");
      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
      Runtime.getRuntime().halt(1);
    }
  }
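  /*
   * Scheduling sketch (the actual timer wiring lives in the abort path elsewhere in this class;
   * the variable names here are assumptions):
   *
   *   Timer abortMonitor = new Timer("abort-timeout-monitor", true); // daemon thread
   *   abortMonitor.schedule(new SystemExitWhenAbortTimeout(), abortTimeoutMillis);
   */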
  @InterfaceAudience.Private
  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
    return compactedFileDischarger;
  }
  /**
   * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
   * @return pause time
   */
  @InterfaceAudience.Private
  public long getRetryPauseTime() {
    return this.retryPauseTime;
  }
  @Override
  public Optional<ServerName> getActiveMaster() {
    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
  }

  @Override
  public List<ServerName> getBackupMasters() {
    return masterAddressTracker.getBackupMasters();
  }

  @Override
  public Iterator<ServerName> getBootstrapNodes() {
    return bootstrapNodeManager.getBootstrapNodes().iterator();
  }

  @Override
  public List<HRegionLocation> getMetaLocations() {
    return metaRegionLocationCache.getMetaRegionLocations();
  }
  @Override
  protected NamedQueueRecorder createNamedQueueRecord() {
    final boolean isOnlineLogProviderEnabled = conf.getBoolean(
      HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
    if (isOnlineLogProviderEnabled) {
      return NamedQueueRecorder.getInstance(conf);
    } else {
      return null;
    }
  }
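  /*
   * Config sketch (illustrative): enabling the in-memory slow-log ring buffer so this factory
   * returns a recorder instead of null.
   *
   *   conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
   */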
  @Override
  protected boolean clusterMode() {
    // this method will be called in the constructor of the super class, so we cannot return
    // masterless directly here, as it will always be false.
    return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
  }
  @InterfaceAudience.Private
  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
    return brokenStoreFileCleaner;
  }

  RSSnapshotVerifier getRsSnapshotVerifier() {
    return rsSnapshotVerifier;
  }
  @Override
  protected void stopChores() {
    shutdownChore(nonceManagerChore);
    shutdownChore(compactionChecker);
    shutdownChore(compactedFileDischarger);
    shutdownChore(periodicFlusher);
    shutdownChore(healthCheckChore);
    shutdownChore(executorStatusChore);
    shutdownChore(storefileRefresher);
    shutdownChore(fsUtilizationChore);
    shutdownChore(slowLogTableOpsChore);
    shutdownChore(brokenStoreFileCleaner);
  }
  @InterfaceAudience.Private
  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
    return regionReplicationBufferManager;
  }
}