/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER
;
22 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_SPLIT_WAL_COORDINATED_BY_ZK
;
23 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_SPLIT_WAL_MAX_SPLITTER
;
24 import static org
.apache
.hadoop
.hbase
.util
.DNS
.UNSAFE_RS_HOSTNAME_KEY
;
26 import java
.io
.IOException
;
27 import java
.lang
.management
.MemoryType
;
28 import java
.lang
.management
.MemoryUsage
;
29 import java
.lang
.reflect
.Constructor
;
30 import java
.net
.BindException
;
31 import java
.net
.InetAddress
;
32 import java
.net
.InetSocketAddress
;
33 import java
.time
.Duration
;
34 import java
.util
.ArrayList
;
35 import java
.util
.Collection
;
36 import java
.util
.Collections
;
37 import java
.util
.Comparator
;
38 import java
.util
.HashSet
;
39 import java
.util
.List
;
41 import java
.util
.Map
.Entry
;
42 import java
.util
.Objects
;
43 import java
.util
.Optional
;
45 import java
.util
.SortedMap
;
46 import java
.util
.Timer
;
47 import java
.util
.TimerTask
;
48 import java
.util
.TreeMap
;
49 import java
.util
.TreeSet
;
50 import java
.util
.concurrent
.ConcurrentHashMap
;
51 import java
.util
.concurrent
.ConcurrentMap
;
52 import java
.util
.concurrent
.ConcurrentSkipListMap
;
53 import java
.util
.concurrent
.TimeUnit
;
54 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
55 import java
.util
.concurrent
.locks
.ReentrantReadWriteLock
;
56 import java
.util
.stream
.Collectors
;
57 import javax
.management
.MalformedObjectNameException
;
58 import javax
.servlet
.http
.HttpServlet
;
59 import org
.apache
.commons
.lang3
.RandomUtils
;
60 import org
.apache
.commons
.lang3
.StringUtils
;
61 import org
.apache
.commons
.lang3
.SystemUtils
;
62 import org
.apache
.hadoop
.conf
.Configuration
;
63 import org
.apache
.hadoop
.fs
.FileSystem
;
64 import org
.apache
.hadoop
.fs
.Path
;
65 import org
.apache
.hadoop
.hbase
.Abortable
;
66 import org
.apache
.hadoop
.hbase
.CacheEvictionStats
;
67 import org
.apache
.hadoop
.hbase
.CallQueueTooBigException
;
68 import org
.apache
.hadoop
.hbase
.ChoreService
;
69 import org
.apache
.hadoop
.hbase
.ClockOutOfSyncException
;
70 import org
.apache
.hadoop
.hbase
.CoordinatedStateManager
;
71 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
72 import org
.apache
.hadoop
.hbase
.ExecutorStatusChore
;
73 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
74 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
75 import org
.apache
.hadoop
.hbase
.HConstants
;
76 import org
.apache
.hadoop
.hbase
.HDFSBlocksDistribution
;
77 import org
.apache
.hadoop
.hbase
.HealthCheckChore
;
78 import org
.apache
.hadoop
.hbase
.MetaTableAccessor
;
79 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
80 import org
.apache
.hadoop
.hbase
.PleaseHoldException
;
81 import org
.apache
.hadoop
.hbase
.ScheduledChore
;
82 import org
.apache
.hadoop
.hbase
.ServerName
;
83 import org
.apache
.hadoop
.hbase
.Stoppable
;
84 import org
.apache
.hadoop
.hbase
.TableDescriptors
;
85 import org
.apache
.hadoop
.hbase
.TableName
;
86 import org
.apache
.hadoop
.hbase
.YouAreDeadException
;
87 import org
.apache
.hadoop
.hbase
.ZNodeClearer
;
88 import org
.apache
.hadoop
.hbase
.client
.AsyncClusterConnection
;
89 import org
.apache
.hadoop
.hbase
.client
.ClusterConnectionFactory
;
90 import org
.apache
.hadoop
.hbase
.client
.Connection
;
91 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
92 import org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
;
93 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
94 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
95 import org
.apache
.hadoop
.hbase
.client
.locking
.EntityLock
;
96 import org
.apache
.hadoop
.hbase
.client
.locking
.LockServiceClient
;
97 import org
.apache
.hadoop
.hbase
.conf
.ConfigurationManager
;
98 import org
.apache
.hadoop
.hbase
.conf
.ConfigurationObserver
;
99 import org
.apache
.hadoop
.hbase
.coordination
.ZkCoordinatedStateManager
;
100 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
101 import org
.apache
.hadoop
.hbase
.exceptions
.RegionMovedException
;
102 import org
.apache
.hadoop
.hbase
.exceptions
.RegionOpeningException
;
103 import org
.apache
.hadoop
.hbase
.exceptions
.UnknownProtocolException
;
104 import org
.apache
.hadoop
.hbase
.executor
.ExecutorService
;
105 import org
.apache
.hadoop
.hbase
.executor
.ExecutorService
.ExecutorConfig
;
106 import org
.apache
.hadoop
.hbase
.executor
.ExecutorType
;
107 import org
.apache
.hadoop
.hbase
.fs
.HFileSystem
;
108 import org
.apache
.hadoop
.hbase
.http
.InfoServer
;
109 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCache
;
110 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCacheFactory
;
111 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFile
;
112 import org
.apache
.hadoop
.hbase
.io
.util
.MemorySizeUtil
;
113 import org
.apache
.hadoop
.hbase
.ipc
.CoprocessorRpcUtils
;
114 import org
.apache
.hadoop
.hbase
.ipc
.NettyRpcClientConfigHelper
;
115 import org
.apache
.hadoop
.hbase
.ipc
.RpcClient
;
116 import org
.apache
.hadoop
.hbase
.ipc
.RpcServer
;
117 import org
.apache
.hadoop
.hbase
.ipc
.RpcServerInterface
;
118 import org
.apache
.hadoop
.hbase
.ipc
.ServerNotRunningYetException
;
119 import org
.apache
.hadoop
.hbase
.ipc
.ServerRpcController
;
120 import org
.apache
.hadoop
.hbase
.log
.HBaseMarkers
;
121 import org
.apache
.hadoop
.hbase
.master
.HMaster
;
122 import org
.apache
.hadoop
.hbase
.master
.LoadBalancer
;
123 import org
.apache
.hadoop
.hbase
.master
.MasterRpcServicesVersionWrapper
;
124 import org
.apache
.hadoop
.hbase
.master
.RegionState
;
125 import org
.apache
.hadoop
.hbase
.master
.balancer
.BaseLoadBalancer
;
126 import org
.apache
.hadoop
.hbase
.mob
.MobFileCache
;
127 import org
.apache
.hadoop
.hbase
.namequeues
.NamedQueueRecorder
;
128 import org
.apache
.hadoop
.hbase
.namequeues
.SlowLogTableOpsChore
;
129 import org
.apache
.hadoop
.hbase
.net
.Address
;
130 import org
.apache
.hadoop
.hbase
.procedure
.RegionServerProcedureManagerHost
;
131 import org
.apache
.hadoop
.hbase
.procedure2
.RSProcedureCallable
;
132 import org
.apache
.hadoop
.hbase
.quotas
.FileSystemUtilizationChore
;
133 import org
.apache
.hadoop
.hbase
.quotas
.QuotaUtil
;
134 import org
.apache
.hadoop
.hbase
.quotas
.RegionServerRpcQuotaManager
;
135 import org
.apache
.hadoop
.hbase
.quotas
.RegionServerSpaceQuotaManager
;
136 import org
.apache
.hadoop
.hbase
.quotas
.RegionSize
;
137 import org
.apache
.hadoop
.hbase
.quotas
.RegionSizeStore
;
138 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionConfiguration
;
139 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionLifeCycleTracker
;
140 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionProgress
;
141 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionRequester
;
142 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.CloseMetaHandler
;
143 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.CloseRegionHandler
;
144 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.RSProcedureHandler
;
145 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.RegionReplicaFlushHandler
;
146 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.FlushThroughputControllerFactory
;
147 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.ThroughputController
;
148 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationLoad
;
149 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationSourceInterface
;
150 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationStatus
;
151 import org
.apache
.hadoop
.hbase
.security
.SecurityConstants
;
152 import org
.apache
.hadoop
.hbase
.security
.Superusers
;
153 import org
.apache
.hadoop
.hbase
.security
.User
;
154 import org
.apache
.hadoop
.hbase
.security
.UserProvider
;
155 import org
.apache
.hadoop
.hbase
.security
.access
.AccessChecker
;
156 import org
.apache
.hadoop
.hbase
.security
.access
.ZKPermissionWatcher
;
157 import org
.apache
.hadoop
.hbase
.trace
.SpanReceiverHost
;
158 import org
.apache
.hadoop
.hbase
.trace
.TraceUtil
;
159 import org
.apache
.hadoop
.hbase
.util
.Addressing
;
160 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
161 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
162 import org
.apache
.hadoop
.hbase
.util
.CompressionTest
;
163 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
164 import org
.apache
.hadoop
.hbase
.util
.FSTableDescriptors
;
165 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
166 import org
.apache
.hadoop
.hbase
.util
.FutureUtils
;
167 import org
.apache
.hadoop
.hbase
.util
.JvmPauseMonitor
;
168 import org
.apache
.hadoop
.hbase
.util
.NettyEventLoopGroupConfig
;
169 import org
.apache
.hadoop
.hbase
.util
.Pair
;
170 import org
.apache
.hadoop
.hbase
.util
.RetryCounter
;
171 import org
.apache
.hadoop
.hbase
.util
.RetryCounterFactory
;
172 import org
.apache
.hadoop
.hbase
.util
.ServerRegionReplicaUtil
;
173 import org
.apache
.hadoop
.hbase
.util
.Sleeper
;
174 import org
.apache
.hadoop
.hbase
.util
.Threads
;
175 import org
.apache
.hadoop
.hbase
.util
.VersionInfo
;
176 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
177 import org
.apache
.hadoop
.hbase
.wal
.NettyAsyncFSWALConfigHelper
;
178 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
179 import org
.apache
.hadoop
.hbase
.wal
.WALFactory
;
180 import org
.apache
.hadoop
.hbase
.zookeeper
.ClusterStatusTracker
;
181 import org
.apache
.hadoop
.hbase
.zookeeper
.MasterAddressTracker
;
182 import org
.apache
.hadoop
.hbase
.zookeeper
.MetaTableLocator
;
183 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKClusterId
;
184 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKNodeTracker
;
185 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
186 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
187 import org
.apache
.hadoop
.hbase
.zookeeper
.ZNodePaths
;
188 import org
.apache
.hadoop
.ipc
.RemoteException
;
189 import org
.apache
.hadoop
.util
.ReflectionUtils
;
190 import org
.apache
.yetus
.audience
.InterfaceAudience
;
191 import org
.apache
.zookeeper
.KeeperException
;
192 import org
.slf4j
.Logger
;
193 import org
.slf4j
.LoggerFactory
;
194 import sun
.misc
.Signal
;
196 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
197 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Throwables
;
198 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.cache
.Cache
;
199 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.cache
.CacheBuilder
;
200 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Maps
;
201 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.BlockingRpcChannel
;
202 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Descriptors
.MethodDescriptor
;
203 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Descriptors
.ServiceDescriptor
;
204 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
205 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
206 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Service
;
207 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
208 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.TextFormat
;
209 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.UnsafeByteOperations
;
211 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
212 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.RequestConverter
;
213 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.CoprocessorServiceCall
;
214 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.CoprocessorServiceRequest
;
215 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.CoprocessorServiceResponse
;
216 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
;
217 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
.RegionLoad
;
218 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
.RegionStoreSequenceIds
;
219 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
.UserLoad
;
220 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.Coprocessor
;
221 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.Coprocessor
.Builder
;
222 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.NameStringPair
;
223 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.RegionServerInfo
;
224 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.RegionSpecifier
;
225 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.RegionSpecifier
.RegionSpecifierType
;
226 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.LockServiceProtos
.LockService
;
227 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
;
228 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.GetLastFlushedSequenceIdRequest
;
229 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.GetLastFlushedSequenceIdResponse
;
230 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerReportRequest
;
231 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerStartupRequest
;
232 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerStartupResponse
;
233 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerStatusService
;
234 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionSpaceUse
;
235 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionSpaceUseReportRequest
;
236 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionStateTransition
;
237 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionStateTransition
.TransitionCode
;
238 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.ReportProcedureDoneRequest
;
239 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.ReportRSFatalErrorRequest
;
240 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.ReportRegionStateTransitionRequest
;
241 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.ReportRegionStateTransitionResponse
;
/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
247 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.TOOLS
)
248 @SuppressWarnings({ "deprecation"})
249 public class HRegionServer
extends Thread
implements
250 RegionServerServices
, LastSequenceId
, ConfigurationObserver
{
251 private static final Logger LOG
= LoggerFactory
.getLogger(HRegionServer
.class);
254 * For testing only! Set to true to skip notifying region assignment to master .
256 @InterfaceAudience.Private
257 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings(value
="MS_SHOULD_BE_FINAL")
258 public static boolean TEST_SKIP_REPORTING_TRANSITION
= false;
261 * A map from RegionName to current action in progress. Boolean value indicates:
262 * true - if open region action in progress
263 * false - if close region action in progress
265 private final ConcurrentMap
<byte[], Boolean
> regionsInTransitionInRS
=
266 new ConcurrentSkipListMap
<>(Bytes
.BYTES_COMPARATOR
);
269 * Used to cache the open/close region procedures which already submitted.
270 * See {@link #submitRegionProcedure(long)}.
272 private final ConcurrentMap
<Long
, Long
> submittedRegionProcedures
= new ConcurrentHashMap
<>();
274 * Used to cache the open/close region procedures which already executed.
275 * See {@link #submitRegionProcedure(long)}.
277 private final Cache
<Long
, Long
> executedRegionProcedures
=
278 CacheBuilder
.newBuilder().expireAfterAccess(600, TimeUnit
.SECONDS
).build();
281 * Used to cache the moved-out regions
283 private final Cache
<String
, MovedRegionInfo
> movedRegionInfoCache
=
284 CacheBuilder
.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
285 TimeUnit
.MILLISECONDS
).build();
287 private MemStoreFlusher cacheFlusher
;
289 private HeapMemoryManager hMemManager
;
292 * The asynchronous cluster connection to be shared by services.
294 protected AsyncClusterConnection asyncClusterConnection
;
297 * Go here to get table descriptors.
299 protected TableDescriptors tableDescriptors
;
301 // Replication services. If no replication, this handler will be null.
302 private ReplicationSourceService replicationSourceHandler
;
303 private ReplicationSinkService replicationSinkHandler
;
304 private boolean sameReplicationSourceAndSink
;
307 public CompactSplit compactSplitThread
;
310 * Map of regions currently being served by this region server. Key is the
311 * encoded region name. All access should be synchronized.
313 private final Map
<String
, HRegion
> onlineRegions
= new ConcurrentHashMap
<>();
315 * Lock for gating access to {@link #onlineRegions}.
316 * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap?
318 private final ReentrantReadWriteLock onlineRegionsLock
= new ReentrantReadWriteLock();
321 * Map of encoded region names to the DataNode locations they should be hosted on
322 * We store the value as Address since InetSocketAddress is required by the HDFS
323 * API (create() that takes favored nodes as hints for placing file blocks).
324 * We could have used ServerName here as the value class, but we'd need to
325 * convert it to InetSocketAddress at some point before the HDFS API call, and
326 * it seems a bit weird to store ServerName since ServerName refers to RegionServers
327 * and here we really mean DataNode locations. We don't store it as InetSocketAddress
328 * here because the conversion on demand from Address to InetSocketAddress will
329 * guarantee the resolution results will be fresh when we need it.
331 private final Map
<String
, Address
[]> regionFavoredNodesMap
= new ConcurrentHashMap
<>();
333 private LeaseManager leaseManager
;
335 // Instance of the hbase executor executorService.
336 protected ExecutorService executorService
;
338 private volatile boolean dataFsOk
;
339 private HFileSystem dataFs
;
340 private HFileSystem walFs
;
342 // Set when a report to the master comes back with a message asking us to
343 // shutdown. Also set by call to stop when debugging or running unit tests
344 // of HRegionServer in isolation.
345 private volatile boolean stopped
= false;
347 // Go down hard. Used if file system becomes unavailable and also in
348 // debugging and unit tests.
349 private AtomicBoolean abortRequested
;
350 static final String ABORT_TIMEOUT
= "hbase.regionserver.abort.timeout";
351 // Default abort timeout is 1200 seconds for safe
352 private static final long DEFAULT_ABORT_TIMEOUT
= 1200000;
353 // Will run this task when abort timeout
354 static final String ABORT_TIMEOUT_TASK
= "hbase.regionserver.abort.timeout.task";
356 // A state before we go into stopped state. At this stage we're closing user
358 private boolean stopping
= false;
359 private volatile boolean killed
= false;
360 private volatile boolean shutDown
= false;
362 protected final Configuration conf
;
364 private Path dataRootDir
;
365 private Path walRootDir
;
367 private final int threadWakeFrequency
;
368 final int msgInterval
;
370 private static final String PERIOD_COMPACTION
= "hbase.regionserver.compaction.check.period";
371 private final int compactionCheckFrequency
;
372 private static final String PERIOD_FLUSH
= "hbase.regionserver.flush.check.period";
373 private final int flushCheckFrequency
;
375 // Stub to do region server status calls against the master.
376 private volatile RegionServerStatusService
.BlockingInterface rssStub
;
377 private volatile LockService
.BlockingInterface lockStub
;
378 // RPC client. Used to make the stub above that does region server status checking.
379 private RpcClient rpcClient
;
381 private UncaughtExceptionHandler uncaughtExceptionHandler
;
383 // Info server. Default access so can be used by unit tests. REGIONSERVER
384 // is name of the webapp and the attribute name used stuffing this instance
386 protected InfoServer infoServer
;
387 private JvmPauseMonitor pauseMonitor
;
389 /** region server process name */
390 public static final String REGIONSERVER
= "regionserver";
393 private MetricsRegionServer metricsRegionServer
;
394 MetricsRegionServerWrapperImpl metricsRegionServerImpl
;
395 private SpanReceiverHost spanReceiverHost
;
398 * ChoreService used to schedule tasks that we want to run periodically
400 private ChoreService choreService
;
403 * Check for compactions requests.
405 private ScheduledChore compactionChecker
;
410 private ScheduledChore periodicFlusher
;
412 private volatile WALFactory walFactory
;
414 private LogRoller walRoller
;
416 // A thread which calls reportProcedureDone
417 private RemoteProcedureResultReporter procedureResultReporter
;
419 // flag set after we're done setting up server threads
420 final AtomicBoolean online
= new AtomicBoolean(false);
422 // zookeeper connection and watcher
423 protected final ZKWatcher zooKeeper
;
425 // master address tracker
426 private final MasterAddressTracker masterAddressTracker
;
428 // Cluster Status Tracker
429 protected final ClusterStatusTracker clusterStatusTracker
;
431 // Log Splitting Worker
432 private SplitLogWorker splitLogWorker
;
434 // A sleeper that sleeps for msgInterval.
435 protected final Sleeper sleeper
;
437 private final int shortOperationTimeout
;
439 // Time to pause if master says 'please hold'
440 private final long retryPauseTime
;
442 private final RegionServerAccounting regionServerAccounting
;
444 private SlowLogTableOpsChore slowLogTableOpsChore
= null;
447 private BlockCache blockCache
;
448 // The cache for mob files
449 private MobFileCache mobFileCache
;
451 /** The health check chore. */
452 private HealthCheckChore healthCheckChore
;
454 /** The Executor status collect chore. */
455 private ExecutorStatusChore executorStatusChore
;
457 /** The nonce manager chore. */
458 private ScheduledChore nonceManagerChore
;
460 private Map
<String
, Service
> coprocessorServiceHandlers
= Maps
.newHashMap();
463 * The server name the Master sees us as. Its made from the hostname the
464 * master passes us, port, and server startcode. Gets set after registration
467 protected ServerName serverName
;
470 * hostname specified by hostname config
472 protected String useThisHostnameInstead
;
475 * @deprecated since 2.4.0 and will be removed in 4.0.0.
476 * Use {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
477 * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
480 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.CONFIG
)
481 final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
=
482 "hbase.regionserver.hostname.disable.master.reversedns";
485 * HBASE-18226: This config and hbase.unasfe.regionserver.hostname are mutually exclusive.
486 * Exception will be thrown if both are used.
488 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.CONFIG
)
489 final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
=
490 "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
493 * HBASE-24667: This config hbase.regionserver.hostname.disable.master.reversedns will be replaced by
494 * hbase.unsafe.regionserver.hostname.disable.master.reversedns. Keep the old config keys here for backward
498 Configuration
.addDeprecation(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
, UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
);
502 * This servers startcode.
504 protected final long startcode
;
507 * Unique identifier for the cluster we are a part of.
509 protected String clusterId
;
511 // chore for refreshing store files for secondary regions
512 private StorefileRefresherChore storefileRefresher
;
514 private RegionServerCoprocessorHost rsHost
;
516 private RegionServerProcedureManagerHost rspmHost
;
518 private RegionServerRpcQuotaManager rsQuotaManager
;
519 private RegionServerSpaceQuotaManager rsSpaceQuotaManager
;
522 * Nonce manager. Nonces are used to make operations like increment and append idempotent
523 * in the case where client doesn't receive the response from a successful operation and
524 * retries. We track the successful ops for some time via a nonce sent by client and handle
525 * duplicate operations (currently, by failing them; in future we might use MVCC to return
526 * result). Nonces are also recovered from WAL during, recovery; however, the caveats (from
528 * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
529 * of past records. If we don't read the records, we don't read and recover the nonces.
530 * Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
531 * - There's no WAL recovery during normal region move, so nonces will not be transfered.
532 * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
533 * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
534 * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
535 * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
536 * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
537 * latest nonce in it expired. It can also be recovered during move.
539 final ServerNonceManager nonceManager
;
541 private UserProvider userProvider
;
543 protected final RSRpcServices rpcServices
;
545 private CoordinatedStateManager csm
;
548 * Configuration manager is used to register/deregister and notify the configuration observers
549 * when the regionserver is notified that there was a change in the on disk configs.
551 protected final ConfigurationManager configurationManager
;
553 @InterfaceAudience.Private
554 CompactedHFilesDischarger compactedFileDischarger
;
556 private volatile ThroughputController flushThroughputController
;
558 private SecureBulkLoadManager secureBulkLoadManager
;
560 private FileSystemUtilizationChore fsUtilizationChore
;
562 private final NettyEventLoopGroupConfig eventLoopGroupConfig
;
565 * Provide online slow log responses from ringbuffer
567 private NamedQueueRecorder namedQueueRecorder
= null;
570 * True if this RegionServer is coming up in a cluster where there is no Master;
571 * means it needs to just come up and make do without a Master to talk to: e.g. in test or
572 * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only
573 * purpose is as a Replication-stream sink; see HBASE-18846 for more.
574 * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
576 private final boolean masterless
;
577 private static final String MASTERLESS_CONFIG_NAME
= "hbase.masterless";
579 /**regionserver codec list **/
580 private static final String REGIONSERVER_CODEC
= "hbase.regionserver.codecs";
582 // A timer to shutdown the process if abort takes too long
583 private Timer abortMonitor
;
586 * Starts a HRegionServer at the default location.
588 * Don't start any services or managers in here in the Constructor.
589 * Defer till after we register with the Master as much as possible. See {@link #startServices}.
591 public HRegionServer(final Configuration conf
) throws IOException
{
592 super("RegionServer"); // thread name
593 TraceUtil
.initTracer(conf
);
595 this.startcode
= System
.currentTimeMillis();
597 this.dataFsOk
= true;
598 this.masterless
= conf
.getBoolean(MASTERLESS_CONFIG_NAME
, false);
599 this.eventLoopGroupConfig
= setupNetty(this.conf
);
600 MemorySizeUtil
.checkForClusterFreeHeapMemoryLimit(this.conf
);
601 HFile
.checkHFileVersion(this.conf
);
602 checkCodecs(this.conf
);
603 this.userProvider
= UserProvider
.instantiate(conf
);
604 FSUtils
.setupShortCircuitRead(this.conf
);
606 // Disable usage of meta replicas in the regionserver
607 this.conf
.setBoolean(HConstants
.USE_META_REPLICAS
, false);
609 this.threadWakeFrequency
= conf
.getInt(HConstants
.THREAD_WAKE_FREQUENCY
, 10 * 1000);
610 this.compactionCheckFrequency
= conf
.getInt(PERIOD_COMPACTION
, this.threadWakeFrequency
);
611 this.flushCheckFrequency
= conf
.getInt(PERIOD_FLUSH
, this.threadWakeFrequency
);
612 this.msgInterval
= conf
.getInt("hbase.regionserver.msginterval", 3 * 1000);
614 this.sleeper
= new Sleeper(this.msgInterval
, this);
616 boolean isNoncesEnabled
= conf
.getBoolean(HConstants
.HBASE_RS_NONCES_ENABLED
, true);
617 this.nonceManager
= isNoncesEnabled ?
new ServerNonceManager(this.conf
) : null;
619 this.shortOperationTimeout
= conf
.getInt(HConstants
.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY
,
620 HConstants
.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT
);
622 this.retryPauseTime
= conf
.getLong(HConstants
.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME
,
623 HConstants
.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME
);
625 this.abortRequested
= new AtomicBoolean(false);
626 this.stopped
= false;
628 initNamedQueueRecorder(conf
);
629 rpcServices
= createRpcServices();
630 useThisHostnameInstead
= getUseThisHostnameInstead(conf
);
632 StringUtils
.isBlank(useThisHostnameInstead
) ?
this.rpcServices
.isa
.getHostName()
633 : this.useThisHostnameInstead
;
634 serverName
= ServerName
.valueOf(hostName
, this.rpcServices
.isa
.getPort(), this.startcode
);
636 // login the zookeeper client principal (if using security)
637 ZKUtil
.loginClient(this.conf
, HConstants
.ZK_CLIENT_KEYTAB_FILE
,
638 HConstants
.ZK_CLIENT_KERBEROS_PRINCIPAL
, hostName
);
639 // login the server principal (if using secure Hadoop)
640 login(userProvider
, hostName
);
641 // init superusers and add the server principal (if using security)
642 // or process owner as default super user.
643 Superusers
.initialize(conf
);
644 regionServerAccounting
= new RegionServerAccounting(conf
);
646 boolean isMasterNotCarryTable
=
647 this instanceof HMaster
&& !LoadBalancer
.isTablesOnMaster(conf
);
649 // no need to instantiate block cache and mob file cache when master not carry table
650 if (!isMasterNotCarryTable
) {
651 blockCache
= BlockCacheFactory
.createBlockCache(conf
);
652 mobFileCache
= new MobFileCache(conf
);
655 uncaughtExceptionHandler
=
656 (t
, e
) -> abort("Uncaught exception in executorService thread " + t
.getName(), e
);
658 initializeFileSystem();
659 spanReceiverHost
= SpanReceiverHost
.getInstance(getConfiguration());
661 this.configurationManager
= new ConfigurationManager();
662 setupWindows(getConfiguration(), getConfigurationManager());
664 // Some unit tests don't need a cluster, so no zookeeper at all
665 // Open connection to zookeeper and set primary watcher
666 zooKeeper
= new ZKWatcher(conf
, getProcessName() + ":" + rpcServices
.isa
.getPort(), this,
667 canCreateBaseZNode());
668 // If no master in cluster, skip trying to track one or look for a cluster status.
669 if (!this.masterless
) {
670 if (conf
.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK
,
671 DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK
)) {
672 this.csm
= new ZkCoordinatedStateManager(this);
675 masterAddressTracker
= new MasterAddressTracker(getZooKeeper(), this);
676 masterAddressTracker
.start();
678 clusterStatusTracker
= new ClusterStatusTracker(zooKeeper
, this);
679 clusterStatusTracker
.start();
681 masterAddressTracker
= null;
682 clusterStatusTracker
= null;
684 this.rpcServices
.start(zooKeeper
);
685 // This violates 'no starting stuff in Constructor' but Master depends on the below chore
686 // and executor being created and takes a different startup route. Lots of overlap between HRS
687 // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
688 // Master expects Constructor to put up web servers. Ugh.
690 this.choreService
= new ChoreService(getName(), true);
691 this.executorService
= new ExecutorService(getName());
693 } catch (Throwable t
) {
694 // Make sure we log the exception. HRegionServer is often started via reflection and the
695 // cause of failed startup is lost.
696 LOG
.error("Failed construction RegionServer", t
);
701 private void initNamedQueueRecorder(Configuration conf
) {
702 if (!(this instanceof HMaster
)) {
703 final boolean isOnlineLogProviderEnabled
= conf
.getBoolean(
704 HConstants
.SLOW_LOG_BUFFER_ENABLED_KEY
,
705 HConstants
.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED
);
706 if (isOnlineLogProviderEnabled
) {
707 this.namedQueueRecorder
= NamedQueueRecorder
.getInstance(this.conf
);
710 final boolean isBalancerDecisionRecording
= conf
711 .getBoolean(BaseLoadBalancer
.BALANCER_DECISION_BUFFER_ENABLED
,
712 BaseLoadBalancer
.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED
);
713 if (isBalancerDecisionRecording
) {
714 this.namedQueueRecorder
= NamedQueueRecorder
.getInstance(this.conf
);
719 // HMaster should override this method to load the specific config for master
720 protected String
getUseThisHostnameInstead(Configuration conf
) throws IOException
{
721 String hostname
= conf
.get(UNSAFE_RS_HOSTNAME_KEY
);
722 if (conf
.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
, false)) {
723 if (!StringUtils
.isBlank(hostname
)) {
724 String msg
= UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
+ " and " + UNSAFE_RS_HOSTNAME_KEY
+
725 " are mutually exclusive. Do not set " + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
+
726 " to true while " + UNSAFE_RS_HOSTNAME_KEY
+ " is used";
727 throw new IOException(msg
);
729 return rpcServices
.isa
.getHostName();
737 * If running on Windows, do windows-specific setup.
739 private static void setupWindows(final Configuration conf
, ConfigurationManager cm
) {
740 if (!SystemUtils
.IS_OS_WINDOWS
) {
741 Signal
.handle(new Signal("HUP"), signal
-> {
742 conf
.reloadConfiguration();
743 cm
.notifyAllObservers(conf
);
748 private static NettyEventLoopGroupConfig
setupNetty(Configuration conf
) {
749 // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
750 NettyEventLoopGroupConfig nelgc
=
751 new NettyEventLoopGroupConfig(conf
, "RS-EventLoopGroup");
752 NettyRpcClientConfigHelper
.setEventLoopConfig(conf
, nelgc
.group(), nelgc
.clientChannelClass());
753 NettyAsyncFSWALConfigHelper
.setEventLoopConfig(conf
, nelgc
.group(), nelgc
.clientChannelClass());
757 private void initializeFileSystem() throws IOException
{
758 // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
759 // checksum verification enabled, then automatically switch off hdfs checksum verification.
760 boolean useHBaseChecksum
= conf
.getBoolean(HConstants
.HBASE_CHECKSUM_VERIFICATION
, true);
761 String walDirUri
= CommonFSUtils
.getDirUri(this.conf
,
762 new Path(conf
.get(CommonFSUtils
.HBASE_WAL_DIR
, conf
.get(HConstants
.HBASE_DIR
))));
764 if (walDirUri
!= null) {
765 CommonFSUtils
.setFsDefault(this.conf
, walDirUri
);
768 this.walFs
= new HFileSystem(this.conf
, useHBaseChecksum
);
769 this.walRootDir
= CommonFSUtils
.getWALRootDir(this.conf
);
770 // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
771 // underlying hadoop hdfs accessors will be going against wrong filesystem
772 // (unless all is set to defaults).
774 CommonFSUtils
.getDirUri(this.conf
, new Path(conf
.get(HConstants
.HBASE_DIR
)));
775 if (rootDirUri
!= null) {
776 CommonFSUtils
.setFsDefault(this.conf
, rootDirUri
);
778 // init the filesystem
779 this.dataFs
= new HFileSystem(this.conf
, useHBaseChecksum
);
780 this.dataRootDir
= CommonFSUtils
.getRootDir(this.conf
);
781 this.tableDescriptors
= new FSTableDescriptors(this.dataFs
, this.dataRootDir
,
782 !canUpdateTableDescriptor(), cacheTableDescriptor());
785 protected void login(UserProvider user
, String host
) throws IOException
{
786 user
.login(SecurityConstants
.REGIONSERVER_KRB_KEYTAB_FILE
,
787 SecurityConstants
.REGIONSERVER_KRB_PRINCIPAL
, host
);
791 * Wait for an active Master.
792 * See override in Master superclass for how it is used.
794 protected void waitForMasterActive() {}
796 protected String
getProcessName() {
800 protected boolean canCreateBaseZNode() {
801 return this.masterless
;
804 protected boolean canUpdateTableDescriptor() {
808 protected boolean cacheTableDescriptor() {
812 protected RSRpcServices
createRpcServices() throws IOException
{
813 return new RSRpcServices(this);
816 protected void configureInfoServer() {
817 infoServer
.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet
.class);
818 infoServer
.setAttribute(REGIONSERVER
, this);
821 protected Class
<?
extends HttpServlet
> getDumpServlet() {
822 return RSDumpServlet
.class;
826 public boolean registerService(Service instance
) {
827 // No stacking of instances is allowed for a single executorService name
828 ServiceDescriptor serviceDesc
= instance
.getDescriptorForType();
829 String serviceName
= CoprocessorRpcUtils
.getServiceName(serviceDesc
);
830 if (coprocessorServiceHandlers
.containsKey(serviceName
)) {
831 LOG
.error("Coprocessor executorService " + serviceName
+
832 " already registered, rejecting request from " + instance
);
836 coprocessorServiceHandlers
.put(serviceName
, instance
);
837 if (LOG
.isDebugEnabled()) {
839 "Registered regionserver coprocessor executorService: executorService=" + serviceName
);
844 private Configuration
cleanupConfiguration() {
845 Configuration conf
= this.conf
;
846 // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
847 // - Decouples RS and master life cycles. RegionServers can continue be up independent of
848 // masters' availability.
849 // - Configuration management for region servers (cluster internal) is much simpler when adding
850 // new masters or removing existing masters, since only clients' config needs to be updated.
851 // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
852 // other internal connections too.
853 conf
.set(HConstants
.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY
,
854 HConstants
.ZK_CONNECTION_REGISTRY_CLASS
);
855 if (conf
.get(HConstants
.CLIENT_ZOOKEEPER_QUORUM
) != null) {
856 // Use server ZK cluster for server-issued connections, so we clone
857 // the conf and unset the client ZK related properties
858 conf
= new Configuration(this.conf
);
859 conf
.unset(HConstants
.CLIENT_ZOOKEEPER_QUORUM
);
865 * Run test on configured codecs to make sure supporting libs are in place.
867 private static void checkCodecs(final Configuration c
) throws IOException
{
868 // check to see if the codec list is available:
869 String
[] codecs
= c
.getStrings(REGIONSERVER_CODEC
, (String
[])null);
870 if (codecs
== null) return;
871 for (String codec
: codecs
) {
872 if (!CompressionTest
.testCompression(codec
)) {
873 throw new IOException("Compression codec " + codec
+
874 " not supported, aborting RS construction");
879 public String
getClusterId() {
880 return this.clusterId
;
884 * Setup our cluster connection if not already initialized.
886 protected final synchronized void setupClusterConnection() throws IOException
{
887 if (asyncClusterConnection
== null) {
888 Configuration conf
= cleanupConfiguration();
889 InetSocketAddress localAddress
= new InetSocketAddress(this.rpcServices
.isa
.getAddress(), 0);
890 User user
= userProvider
.getCurrent();
891 asyncClusterConnection
=
892 ClusterConnectionFactory
.createAsyncClusterConnection(conf
, localAddress
, user
);
897 * All initialization needed before we go register with Master.<br>
898 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
899 * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
901 private void preRegistrationInitialization() {
903 initializeZooKeeper();
904 setupClusterConnection();
905 // Setup RPC client for master communication
906 this.rpcClient
= asyncClusterConnection
.getRpcClient();
907 } catch (Throwable t
) {
908 // Call stop if error or process will stick around for ever since server
909 // puts up non-daemon threads.
910 this.rpcServices
.stop();
911 abort("Initialization of RS failed. Hence aborting RS.", t
);
916 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
917 * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
919 * Finally open long-living server short-circuit connection.
921 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings(value
="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
922 justification
="cluster Id znode read would give us correct response")
923 private void initializeZooKeeper() throws IOException
, InterruptedException
{
924 // Nothing to do in here if no Master in the mix.
925 if (this.masterless
) {
929 // Create the master address tracker, register with zk, and start it. Then
930 // block until a master is available. No point in starting up if no master
932 blockAndCheckIfStopped(this.masterAddressTracker
);
934 // Wait on cluster being up. Master will set this flag up in zookeeper
936 blockAndCheckIfStopped(this.clusterStatusTracker
);
938 // If we are HMaster then the cluster id should have already been set.
939 if (clusterId
== null) {
940 // Retrieve clusterId
941 // Since cluster status is now up
942 // ID should have already been set by HMaster
944 clusterId
= ZKClusterId
.readClusterIdZNode(this.zooKeeper
);
945 if (clusterId
== null) {
946 this.abort("Cluster ID has not been set");
948 LOG
.info("ClusterId : " + clusterId
);
949 } catch (KeeperException e
) {
950 this.abort("Failed to retrieve Cluster ID", e
);
954 waitForMasterActive();
955 if (isStopped() || isAborted()) {
956 return; // No need for further initialization
959 // watch for snapshots and other procedures
961 rspmHost
= new RegionServerProcedureManagerHost();
962 rspmHost
.loadProcedures(conf
);
963 rspmHost
.initialize(this);
964 } catch (KeeperException e
) {
965 this.abort("Failed to reach coordination cluster when creating procedure handler.", e
);
970 * Utilty method to wait indefinitely on a znode availability while checking
971 * if the region server is shut down
972 * @param tracker znode tracker to use
973 * @throws IOException any IO exception, plus if the RS is stopped
974 * @throws InterruptedException if the waiting thread is interrupted
976 private void blockAndCheckIfStopped(ZKNodeTracker tracker
)
977 throws IOException
, InterruptedException
{
978 while (tracker
.blockUntilAvailable(this.msgInterval
, false) == null) {
980 throw new IOException("Received the shutdown message while waiting.");
986 * @return True if the cluster is up.
989 public boolean isClusterUp() {
990 return this.masterless
||
991 (this.clusterStatusTracker
!= null && this.clusterStatusTracker
.isClusterUp());
995 * The HRegionServer sticks in this loop until closed.
1000 LOG
.info("Skipping run; stopped");
1004 // Do pre-registration initializations; zookeeper, lease threads, etc.
1005 preRegistrationInitialization();
1006 } catch (Throwable e
) {
1007 abort("Fatal exception during initialization", e
);
1011 if (!isStopped() && !isAborted()) {
1012 ShutdownHook
.install(conf
, dataFs
, this, Thread
.currentThread());
1013 // Initialize the RegionServerCoprocessorHost now that our ephemeral
1014 // node was created, in case any coprocessors want to use ZooKeeper
1015 this.rsHost
= new RegionServerCoprocessorHost(this, this.conf
);
1017 // Try and register with the Master; tell it we are here. Break if server is stopped or
1018 // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
1019 // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
1021 LOG
.debug("About to register with Master.");
1022 RetryCounterFactory rcf
=
1023 new RetryCounterFactory(Integer
.MAX_VALUE
, this.sleeper
.getPeriod(), 1000 * 60 * 5);
1024 RetryCounter rc
= rcf
.create();
1025 while (keepLooping()) {
1026 RegionServerStartupResponse w
= reportForDuty();
1028 long sleepTime
= rc
.getBackoffTimeAndIncrementAttempts();
1029 LOG
.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime
);
1030 this.sleeper
.sleep(sleepTime
);
1032 handleReportForDutyResponse(w
);
1038 if (!isStopped() && isHealthy()) {
1039 // start the snapshot handler and other procedure handlers,
1040 // since the server is ready to run
1041 if (this.rspmHost
!= null) {
1042 this.rspmHost
.start();
1044 // Start the Quota Manager
1045 if (this.rsQuotaManager
!= null) {
1046 rsQuotaManager
.start(getRpcServer().getScheduler());
1048 if (this.rsSpaceQuotaManager
!= null) {
1049 this.rsSpaceQuotaManager
.start();
1053 // We registered with the Master. Go into run mode.
1054 long lastMsg
= System
.currentTimeMillis();
1055 long oldRequestCount
= -1;
1056 // The main run loop.
1057 while (!isStopped() && isHealthy()) {
1058 if (!isClusterUp()) {
1059 if (onlineRegions
.isEmpty()) {
1060 stop("Exiting; cluster shutdown set and not carrying any regions");
1061 } else if (!this.stopping
) {
1062 this.stopping
= true;
1063 LOG
.info("Closing user regions");
1064 closeUserRegions(this.abortRequested
.get());
1066 boolean allUserRegionsOffline
= areAllUserRegionsOffline();
1067 if (allUserRegionsOffline
) {
1068 // Set stopped if no more write requests tp meta tables
1069 // since last time we went around the loop. Any open
1070 // meta regions will be closed on our way out.
1071 if (oldRequestCount
== getWriteRequestCount()) {
1072 stop("Stopped; only catalog regions remaining online");
1075 oldRequestCount
= getWriteRequestCount();
1077 // Make sure all regions have been closed -- some regions may
1078 // have not got it because we were splitting at the time of
1079 // the call to closeUserRegions.
1080 closeUserRegions(this.abortRequested
.get());
1082 LOG
.debug("Waiting on " + getOnlineRegionsAsPrintableString());
1085 long now
= System
.currentTimeMillis();
1086 if ((now
- lastMsg
) >= msgInterval
) {
1087 tryRegionServerReport(lastMsg
, now
);
1088 lastMsg
= System
.currentTimeMillis();
1090 if (!isStopped() && !isAborted()) {
1091 this.sleeper
.sleep();
1094 } catch (Throwable t
) {
1095 if (!rpcServices
.checkOOME(t
)) {
1096 String prefix
= t
instanceof YouAreDeadException?
"": "Unhandled: ";
1097 abort(prefix
+ t
.getMessage(), t
);
1101 if (this.leaseManager
!= null) {
1102 this.leaseManager
.closeAfterLeasesExpire();
1104 if (this.splitLogWorker
!= null) {
1105 splitLogWorker
.stop();
1107 if (this.infoServer
!= null) {
1108 LOG
.info("Stopping infoServer");
1110 this.infoServer
.stop();
1111 } catch (Exception e
) {
1112 LOG
.error("Failed to stop infoServer", e
);
1115 // Send cache a shutdown.
1116 if (blockCache
!= null) {
1117 blockCache
.shutdown();
1119 if (mobFileCache
!= null) {
1120 mobFileCache
.shutdown();
1123 // Send interrupts to wake up threads if sleeping so they notice shutdown.
1124 // TODO: Should we check they are alive? If OOME could have exited already
1125 if (this.hMemManager
!= null) this.hMemManager
.stop();
1126 if (this.cacheFlusher
!= null) this.cacheFlusher
.interruptIfNecessary();
1127 if (this.compactSplitThread
!= null) this.compactSplitThread
.interruptIfNecessary();
1129 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1130 if (rspmHost
!= null) {
1131 rspmHost
.stop(this.abortRequested
.get() || this.killed
);
1135 // Just skip out w/o closing regions. Used when testing.
1136 } else if (abortRequested
.get()) {
1137 if (this.dataFsOk
) {
1138 closeUserRegions(abortRequested
.get()); // Don't leave any open file handles
1140 LOG
.info("aborting server " + this.serverName
);
1142 closeUserRegions(abortRequested
.get());
1143 LOG
.info("stopping server " + this.serverName
);
1146 if (this.asyncClusterConnection
!= null) {
1148 this.asyncClusterConnection
.close();
1149 } catch (IOException e
) {
1150 // Although the {@link Closeable} interface throws an {@link
1151 // IOException}, in reality, the implementation would never do that.
1152 LOG
.warn("Attempt to close server's AsyncClusterConnection failed.", e
);
1155 // Closing the compactSplit thread before closing meta regions
1156 if (!this.killed
&& containsMetaTableRegions()) {
1157 if (!abortRequested
.get() || this.dataFsOk
) {
1158 if (this.compactSplitThread
!= null) {
1159 this.compactSplitThread
.join();
1160 this.compactSplitThread
= null;
1162 closeMetaTableRegions(abortRequested
.get());
1166 if (!this.killed
&& this.dataFsOk
) {
1167 waitOnAllRegionsToClose(abortRequested
.get());
1168 LOG
.info("stopping server " + this.serverName
+ "; all regions closed.");
1171 // Stop the quota manager
1172 if (rsQuotaManager
!= null) {
1173 rsQuotaManager
.stop();
1175 if (rsSpaceQuotaManager
!= null) {
1176 rsSpaceQuotaManager
.stop();
1177 rsSpaceQuotaManager
= null;
1180 // flag may be changed when closing regions throws exception.
1181 if (this.dataFsOk
) {
1182 shutdownWAL(!abortRequested
.get());
1185 // Make sure the proxy is down.
1186 if (this.rssStub
!= null) {
1187 this.rssStub
= null;
1189 if (this.lockStub
!= null) {
1190 this.lockStub
= null;
1192 if (this.rpcClient
!= null) {
1193 this.rpcClient
.close();
1195 if (this.leaseManager
!= null) {
1196 this.leaseManager
.close();
1198 if (this.pauseMonitor
!= null) {
1199 this.pauseMonitor
.stop();
1203 stopServiceThreads();
1206 if (this.rpcServices
!= null) {
1207 this.rpcServices
.stop();
1211 deleteMyEphemeralNode();
1212 } catch (KeeperException
.NoNodeException nn
) {
1214 } catch (KeeperException e
) {
1215 LOG
.warn("Failed deleting my ephemeral node", e
);
1217 // We may have failed to delete the znode at the previous step, but
1218 // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1219 ZNodeClearer
.deleteMyEphemeralNodeOnDisk();
1221 if (this.zooKeeper
!= null) {
1222 this.zooKeeper
.close();
1224 this.shutDown
= true;
1225 LOG
.info("Exiting; stopping=" + this.serverName
+ "; zookeeper connection closed.");
1228 private boolean containsMetaTableRegions() {
1229 return onlineRegions
.containsKey(RegionInfoBuilder
.FIRST_META_REGIONINFO
.getEncodedName());
1232 private boolean areAllUserRegionsOffline() {
1233 if (getNumberOfOnlineRegions() > 2) return false;
1234 boolean allUserRegionsOffline
= true;
1235 for (Map
.Entry
<String
, HRegion
> e
: this.onlineRegions
.entrySet()) {
1236 if (!e
.getValue().getRegionInfo().isMetaRegion()) {
1237 allUserRegionsOffline
= false;
1241 return allUserRegionsOffline
;
1245 * @return Current write count for all online regions.
1247 private long getWriteRequestCount() {
1248 long writeCount
= 0;
1249 for (Map
.Entry
<String
, HRegion
> e
: this.onlineRegions
.entrySet()) {
1250 writeCount
+= e
.getValue().getWriteRequestsCount();
1255 @InterfaceAudience.Private
1256 protected void tryRegionServerReport(long reportStartTime
, long reportEndTime
)
1257 throws IOException
{
1258 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
1260 // the current server could be stopping.
1263 ClusterStatusProtos
.ServerLoad sl
= buildServerLoad(reportStartTime
, reportEndTime
);
1265 RegionServerReportRequest
.Builder request
= RegionServerReportRequest
.newBuilder();
1266 request
.setServer(ProtobufUtil
.toServerName(this.serverName
));
1267 request
.setLoad(sl
);
1268 rss
.regionServerReport(null, request
.build());
1269 } catch (ServiceException se
) {
1270 IOException ioe
= ProtobufUtil
.getRemoteException(se
);
1271 if (ioe
instanceof YouAreDeadException
) {
1272 // This will be caught and handled as a fatal error in run()
1275 if (rssStub
== rss
) {
1278 // Couldn't connect to the master, get location from zk and reconnect
1279 // Method blocks until new master is found or we are stopped
1280 createRegionServerStatusStub(true);
1285 * Reports the given map of Regions and their size on the filesystem to the active Master.
1287 * @param regionSizeStore The store containing region sizes
1288 * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1290 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore
) {
1291 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
1293 // the current server could be stopping.
1294 LOG
.trace("Skipping Region size report to HMaster as stub is null");
1298 buildReportAndSend(rss
, regionSizeStore
);
1299 } catch (ServiceException se
) {
1300 IOException ioe
= ProtobufUtil
.getRemoteException(se
);
1301 if (ioe
instanceof PleaseHoldException
) {
1302 LOG
.trace("Failed to report region sizes to Master because it is initializing."
1303 + " This will be retried.", ioe
);
1304 // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1307 if (rssStub
== rss
) {
1310 createRegionServerStatusStub(true);
1311 if (ioe
instanceof DoNotRetryIOException
) {
1312 DoNotRetryIOException doNotRetryEx
= (DoNotRetryIOException
) ioe
;
1313 if (doNotRetryEx
.getCause() != null) {
1314 Throwable t
= doNotRetryEx
.getCause();
1315 if (t
instanceof UnsupportedOperationException
) {
1316 LOG
.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1321 LOG
.debug("Failed to report region sizes to Master. This will be retried.", ioe
);
1327 * Builds the region size report and sends it to the master. Upon successful sending of the
1328 * report, the region sizes that were sent are marked as sent.
1330 * @param rss The stub to send to the Master
1331 * @param regionSizeStore The store containing region sizes
1333 private void buildReportAndSend(RegionServerStatusService
.BlockingInterface rss
,
1334 RegionSizeStore regionSizeStore
) throws ServiceException
{
1335 RegionSpaceUseReportRequest request
=
1336 buildRegionSpaceUseReportRequest(Objects
.requireNonNull(regionSizeStore
));
1337 rss
.reportRegionSpaceUse(null, request
);
1338 // Record the number of size reports sent
1339 if (metricsRegionServer
!= null) {
1340 metricsRegionServer
.incrementNumRegionSizeReportsSent(regionSizeStore
.size());
1345 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1347 * @param regionSizes The size in bytes of regions
1348 * @return The corresponding protocol buffer message.
1350 RegionSpaceUseReportRequest
buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes
) {
1351 RegionSpaceUseReportRequest
.Builder request
= RegionSpaceUseReportRequest
.newBuilder();
1352 for (Entry
<RegionInfo
, RegionSize
> entry
: regionSizes
) {
1353 request
.addSpaceUse(convertRegionSize(entry
.getKey(), entry
.getValue().getSize()));
1355 return request
.build();
1359 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1362 * @param regionInfo The RegionInfo
1363 * @param sizeInBytes The size in bytes of the Region
1364 * @return The protocol buffer
1366 RegionSpaceUse
convertRegionSize(RegionInfo regionInfo
, Long sizeInBytes
) {
1367 return RegionSpaceUse
.newBuilder()
1368 .setRegionInfo(ProtobufUtil
.toRegionInfo(Objects
.requireNonNull(regionInfo
)))
1369 .setRegionSize(Objects
.requireNonNull(sizeInBytes
))
1373 private ClusterStatusProtos
.ServerLoad
buildServerLoad(long reportStartTime
, long reportEndTime
)
1374 throws IOException
{
1375 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1376 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
1377 // the wrapper to compute those numbers in one place.
1378 // In the long term most of these should be moved off of ServerLoad and the heart beat.
1379 // Instead they should be stored in an HBase table so that external visibility into HBase is
1380 // improved; Additionally the load balancer will be able to take advantage of a more complete
1382 MetricsRegionServerWrapper regionServerWrapper
= metricsRegionServer
.getRegionServerWrapper();
1383 Collection
<HRegion
> regions
= getOnlineRegionsLocalContext();
1384 long usedMemory
= -1L;
1385 long maxMemory
= -1L;
1386 final MemoryUsage usage
= MemorySizeUtil
.safeGetHeapMemoryUsage();
1387 if (usage
!= null) {
1388 usedMemory
= usage
.getUsed();
1389 maxMemory
= usage
.getMax();
1392 ClusterStatusProtos
.ServerLoad
.Builder serverLoad
= ClusterStatusProtos
.ServerLoad
.newBuilder();
1393 serverLoad
.setNumberOfRequests((int) regionServerWrapper
.getRequestsPerSecond());
1394 serverLoad
.setTotalNumberOfRequests(regionServerWrapper
.getTotalRequestCount());
1395 serverLoad
.setUsedHeapMB((int)(usedMemory
/ 1024 / 1024));
1396 serverLoad
.setMaxHeapMB((int) (maxMemory
/ 1024 / 1024));
1397 Set
<String
> coprocessors
= getWAL(null).getCoprocessorHost().getCoprocessors();
1398 Builder coprocessorBuilder
= Coprocessor
.newBuilder();
1399 for (String coprocessor
: coprocessors
) {
1400 serverLoad
.addCoprocessors(coprocessorBuilder
.setName(coprocessor
).build());
1402 RegionLoad
.Builder regionLoadBldr
= RegionLoad
.newBuilder();
1403 RegionSpecifier
.Builder regionSpecifier
= RegionSpecifier
.newBuilder();
1404 for (HRegion region
: regions
) {
1405 if (region
.getCoprocessorHost() != null) {
1406 Set
<String
> regionCoprocessors
= region
.getCoprocessorHost().getCoprocessors();
1407 for (String regionCoprocessor
: regionCoprocessors
) {
1408 serverLoad
.addCoprocessors(coprocessorBuilder
.setName(regionCoprocessor
).build());
1411 serverLoad
.addRegionLoads(createRegionLoad(region
, regionLoadBldr
, regionSpecifier
));
1412 for (String coprocessor
: getWAL(region
.getRegionInfo()).getCoprocessorHost()
1413 .getCoprocessors()) {
1414 serverLoad
.addCoprocessors(coprocessorBuilder
.setName(coprocessor
).build());
1417 serverLoad
.setReportStartTime(reportStartTime
);
1418 serverLoad
.setReportEndTime(reportEndTime
);
1419 if (this.infoServer
!= null) {
1420 serverLoad
.setInfoServerPort(this.infoServer
.getPort());
1422 serverLoad
.setInfoServerPort(-1);
1424 MetricsUserAggregateSource userSource
=
1425 metricsRegionServer
.getMetricsUserAggregate().getSource();
1426 if (userSource
!= null) {
1427 Map
<String
, MetricsUserSource
> userMetricMap
= userSource
.getUserSources();
1428 for (Entry
<String
, MetricsUserSource
> entry
: userMetricMap
.entrySet()) {
1429 serverLoad
.addUserLoads(createUserLoad(entry
.getKey(), entry
.getValue()));
1433 if (sameReplicationSourceAndSink
&& replicationSourceHandler
!= null) {
1434 // always refresh first to get the latest value
1435 ReplicationLoad rLoad
= replicationSourceHandler
.refreshAndGetReplicationLoad();
1436 if (rLoad
!= null) {
1437 serverLoad
.setReplLoadSink(rLoad
.getReplicationLoadSink());
1438 for (ClusterStatusProtos
.ReplicationLoadSource rLS
: rLoad
1439 .getReplicationLoadSourceEntries()) {
1440 serverLoad
.addReplLoadSource(rLS
);
1444 if (replicationSourceHandler
!= null) {
1445 ReplicationLoad rLoad
= replicationSourceHandler
.refreshAndGetReplicationLoad();
1446 if (rLoad
!= null) {
1447 for (ClusterStatusProtos
.ReplicationLoadSource rLS
: rLoad
1448 .getReplicationLoadSourceEntries()) {
1449 serverLoad
.addReplLoadSource(rLS
);
1453 if (replicationSinkHandler
!= null) {
1454 ReplicationLoad rLoad
= replicationSinkHandler
.refreshAndGetReplicationLoad();
1455 if (rLoad
!= null) {
1456 serverLoad
.setReplLoadSink(rLoad
.getReplicationLoadSink());
1461 return serverLoad
.build();
1464 private String
getOnlineRegionsAsPrintableString() {
1465 StringBuilder sb
= new StringBuilder();
1466 for (Region r
: this.onlineRegions
.values()) {
1467 if (sb
.length() > 0) sb
.append(", ");
1468 sb
.append(r
.getRegionInfo().getEncodedName());
1470 return sb
.toString();
1474 * Wait on regions close.
1476 private void waitOnAllRegionsToClose(final boolean abort
) {
1477 // Wait till all regions are closed before going out.
1479 long previousLogTime
= 0;
1480 Set
<String
> closedRegions
= new HashSet
<>();
1481 boolean interrupted
= false;
1483 while (!onlineRegions
.isEmpty()) {
1484 int count
= getNumberOfOnlineRegions();
1485 // Only print a message if the count of regions has changed.
1486 if (count
!= lastCount
) {
1487 // Log every second at most
1488 if (System
.currentTimeMillis() > (previousLogTime
+ 1000)) {
1489 previousLogTime
= System
.currentTimeMillis();
1491 LOG
.info("Waiting on " + count
+ " regions to close");
1492 // Only print out regions still closing if a small number else will
1494 if (count
< 10 && LOG
.isDebugEnabled()) {
1495 LOG
.debug("Online Regions=" + this.onlineRegions
);
1499 // Ensure all user regions have been sent a close. Use this to
1500 // protect against the case where an open comes in after we start the
1501 // iterator of onlineRegions to close all user regions.
1502 for (Map
.Entry
<String
, HRegion
> e
: this.onlineRegions
.entrySet()) {
1503 RegionInfo hri
= e
.getValue().getRegionInfo();
1504 if (!this.regionsInTransitionInRS
.containsKey(hri
.getEncodedNameAsBytes()) &&
1505 !closedRegions
.contains(hri
.getEncodedName())) {
1506 closedRegions
.add(hri
.getEncodedName());
1507 // Don't update zk with this close transition; pass false.
1508 closeRegionIgnoreErrors(hri
, abort
);
1511 // No regions in RIT, we could stop waiting now.
1512 if (this.regionsInTransitionInRS
.isEmpty()) {
1513 if (!onlineRegions
.isEmpty()) {
1514 LOG
.info("We were exiting though online regions are not empty," +
1515 " because some regions failed closing");
1519 LOG
.debug("Waiting on {}", this.regionsInTransitionInRS
.keySet().stream().
1520 map(e
-> Bytes
.toString(e
)).collect(Collectors
.joining(", ")));
1522 if (sleepInterrupted(200)) {
1528 Thread
.currentThread().interrupt();
1533 private static boolean sleepInterrupted(long millis
) {
1534 boolean interrupted
= false;
1536 Thread
.sleep(millis
);
1537 } catch (InterruptedException e
) {
1538 LOG
.warn("Interrupted while sleeping");
1544 private void shutdownWAL(final boolean close
) {
1545 if (this.walFactory
!= null) {
1550 walFactory
.shutdown();
1552 } catch (Throwable e
) {
1553 e
= e
instanceof RemoteException ?
((RemoteException
) e
).unwrapRemoteException() : e
;
1554 LOG
.error("Shutdown / close of WAL failed: " + e
);
1555 LOG
.debug("Shutdown / close exception details:", e
);
1561 * get NamedQueue Provider to add different logs to ringbuffer
1563 * @return NamedQueueRecorder
1565 public NamedQueueRecorder
getNamedQueueRecorder() {
1566 return this.namedQueueRecorder
;
1570 * Run init. Sets up wal and starts up all server threads.
1572 * @param c Extra configuration.
1574 protected void handleReportForDutyResponse(final RegionServerStartupResponse c
)
1575 throws IOException
{
1577 boolean updateRootDir
= false;
1578 for (NameStringPair e
: c
.getMapEntriesList()) {
1579 String key
= e
.getName();
1580 // The hostname the master sees us as.
1581 if (key
.equals(HConstants
.KEY_FOR_HOSTNAME_SEEN_BY_MASTER
)) {
1582 String hostnameFromMasterPOV
= e
.getValue();
1583 this.serverName
= ServerName
.valueOf(hostnameFromMasterPOV
, rpcServices
.isa
.getPort(),
1585 if (!StringUtils
.isBlank(useThisHostnameInstead
) &&
1586 !hostnameFromMasterPOV
.equals(useThisHostnameInstead
)) {
1587 String msg
= "Master passed us a different hostname to use; was=" +
1588 this.useThisHostnameInstead
+ ", but now=" + hostnameFromMasterPOV
;
1590 throw new IOException(msg
);
1592 if (StringUtils
.isBlank(useThisHostnameInstead
) &&
1593 !hostnameFromMasterPOV
.equals(rpcServices
.isa
.getHostName())) {
1594 String msg
= "Master passed us a different hostname to use; was=" +
1595 rpcServices
.isa
.getHostName() + ", but now=" + hostnameFromMasterPOV
;
1601 String value
= e
.getValue();
1602 if (key
.equals(HConstants
.HBASE_DIR
)) {
1603 if (value
!= null && !value
.equals(conf
.get(HConstants
.HBASE_DIR
))) {
1604 updateRootDir
= true;
1608 if (LOG
.isDebugEnabled()) {
1609 LOG
.debug("Config from master: " + key
+ "=" + value
);
1611 this.conf
.set(key
, value
);
1613 // Set our ephemeral znode up in zookeeper now we have a name.
1614 createMyEphemeralNode();
1616 if (updateRootDir
) {
1617 // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1618 initializeFileSystem();
1621 // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1622 // config param for task trackers, but we can piggyback off of it.
1623 if (this.conf
.get("mapreduce.task.attempt.id") == null) {
1624 this.conf
.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName
.toString());
1627 // Save it in a file, this will allow to see if we crash
1628 ZNodeClearer
.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1630 // This call sets up an initialized replication and WAL. Later we start it up.
1631 setupWALAndReplication();
1632 // Init in here rather than in constructor after thread name has been set
1633 final MetricsTable metricsTable
=
1634 new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1635 this.metricsRegionServerImpl
= new MetricsRegionServerWrapperImpl(this);
1636 this.metricsRegionServer
= new MetricsRegionServer(
1637 metricsRegionServerImpl
, conf
, metricsTable
);
1638 // Now that we have a metrics source, start the pause monitor
1639 this.pauseMonitor
= new JvmPauseMonitor(conf
, getMetrics().getMetricsSource());
1640 pauseMonitor
.start();
1642 // There is a rare case where we do NOT want services to start. Check config.
1643 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1646 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1647 // or make sense of it.
1648 startReplicationService();
1652 LOG
.info("Serving as " + this.serverName
+ ", RpcServer on " + rpcServices
.isa
+
1654 Long
.toHexString(this.zooKeeper
.getRecoverableZooKeeper().getSessionId()));
1656 // Wake up anyone waiting for this server to online
1657 synchronized (online
) {
1661 } catch (Throwable e
) {
1662 stop("Failed initialization");
1663 throw convertThrowableToIOE(cleanup(e
, "Failed init"),
1664 "Region server startup failed");
1666 sleeper
.skipSleepCycle();
1670 protected void initializeMemStoreChunkCreator() {
1671 if (MemStoreLAB
.isEnabled(conf
)) {
1672 // MSLAB is enabled. So initialize MemStoreChunkPool
1673 // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
1675 Pair
<Long
, MemoryType
> pair
= MemorySizeUtil
.getGlobalMemStoreSize(conf
);
1676 long globalMemStoreSize
= pair
.getFirst();
1677 boolean offheap
= this.regionServerAccounting
.isOffheap();
1678 // When off heap memstore in use, take full area for chunk pool.
1679 float poolSizePercentage
= offheap ?
1.0F
:
1680 conf
.getFloat(MemStoreLAB
.CHUNK_POOL_MAXSIZE_KEY
, MemStoreLAB
.POOL_MAX_SIZE_DEFAULT
);
1681 float initialCountPercentage
= conf
.getFloat(MemStoreLAB
.CHUNK_POOL_INITIALSIZE_KEY
,
1682 MemStoreLAB
.POOL_INITIAL_SIZE_DEFAULT
);
1683 int chunkSize
= conf
.getInt(MemStoreLAB
.CHUNK_SIZE_KEY
, MemStoreLAB
.CHUNK_SIZE_DEFAULT
);
1684 float indexChunkSizePercent
= conf
.getFloat(MemStoreLAB
.INDEX_CHUNK_SIZE_PERCENTAGE_KEY
,
1685 MemStoreLAB
.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT
);
1686 // init the chunkCreator
1687 ChunkCreator
.initialize(chunkSize
, offheap
, globalMemStoreSize
, poolSizePercentage
,
1688 initialCountPercentage
, this.hMemManager
, indexChunkSizePercent
);
1692 private void startHeapMemoryManager() {
1693 if (this.blockCache
!= null) {
1695 new HeapMemoryManager(this.blockCache
, this.cacheFlusher
, this, regionServerAccounting
);
1696 this.hMemManager
.start(getChoreService());
1700 private void createMyEphemeralNode() throws KeeperException
{
1701 RegionServerInfo
.Builder rsInfo
= RegionServerInfo
.newBuilder();
1702 rsInfo
.setInfoPort(infoServer
!= null ? infoServer
.getPort() : -1);
1703 rsInfo
.setVersionInfo(ProtobufUtil
.getVersionInfo());
1704 byte[] data
= ProtobufUtil
.prependPBMagic(rsInfo
.build().toByteArray());
1705 ZKUtil
.createEphemeralNodeAndWatch(this.zooKeeper
, getMyEphemeralNodePath(), data
);
1708 private void deleteMyEphemeralNode() throws KeeperException
{
1709 ZKUtil
.deleteNode(this.zooKeeper
, getMyEphemeralNodePath());
1713 public RegionServerAccounting
getRegionServerAccounting() {
1714 return regionServerAccounting
;
1718 * @param r Region to get RegionLoad for.
1719 * @param regionLoadBldr the RegionLoad.Builder, can be null
1720 * @param regionSpecifier the RegionSpecifier.Builder, can be null
1721 * @return RegionLoad instance.
1723 RegionLoad
createRegionLoad(final HRegion r
, RegionLoad
.Builder regionLoadBldr
,
1724 RegionSpecifier
.Builder regionSpecifier
) throws IOException
{
1725 byte[] name
= r
.getRegionInfo().getRegionName();
1728 int storeRefCount
= 0;
1729 int maxCompactedStoreFileRefCount
= 0;
1730 int storeUncompressedSizeMB
= 0;
1731 int storefileSizeMB
= 0;
1732 int memstoreSizeMB
= (int) (r
.getMemStoreDataSize() / 1024 / 1024);
1733 long storefileIndexSizeKB
= 0;
1734 int rootLevelIndexSizeKB
= 0;
1735 int totalStaticIndexSizeKB
= 0;
1736 int totalStaticBloomSizeKB
= 0;
1737 long totalCompactingKVs
= 0;
1738 long currentCompactedKVs
= 0;
1739 List
<HStore
> storeList
= r
.getStores();
1740 stores
+= storeList
.size();
1741 for (HStore store
: storeList
) {
1742 storefiles
+= store
.getStorefilesCount();
1743 int currentStoreRefCount
= store
.getStoreRefCount();
1744 storeRefCount
+= currentStoreRefCount
;
1745 int currentMaxCompactedStoreFileRefCount
= store
.getMaxCompactedStoreFileRefCount();
1746 maxCompactedStoreFileRefCount
= Math
.max(maxCompactedStoreFileRefCount
,
1747 currentMaxCompactedStoreFileRefCount
);
1748 storeUncompressedSizeMB
+= (int) (store
.getStoreSizeUncompressed() / 1024 / 1024);
1749 storefileSizeMB
+= (int) (store
.getStorefilesSize() / 1024 / 1024);
1750 //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1751 storefileIndexSizeKB
+= store
.getStorefilesRootLevelIndexSize() / 1024;
1752 CompactionProgress progress
= store
.getCompactionProgress();
1753 if (progress
!= null) {
1754 totalCompactingKVs
+= progress
.getTotalCompactingKVs();
1755 currentCompactedKVs
+= progress
.currentCompactedKVs
;
1757 rootLevelIndexSizeKB
+= (int) (store
.getStorefilesRootLevelIndexSize() / 1024);
1758 totalStaticIndexSizeKB
+= (int) (store
.getTotalStaticIndexSize() / 1024);
1759 totalStaticBloomSizeKB
+= (int) (store
.getTotalStaticBloomSize() / 1024);
1762 HDFSBlocksDistribution hdfsBd
= r
.getHDFSBlocksDistribution();
1763 float dataLocality
= hdfsBd
.getBlockLocalityIndex(serverName
.getHostname());
1764 float dataLocalityForSsd
= hdfsBd
.getBlockLocalityIndexForSsd(serverName
.getHostname());
1765 long blocksTotalWeight
= hdfsBd
.getUniqueBlocksTotalWeight();
1766 long blocksLocalWeight
= hdfsBd
.getBlocksLocalWeight(serverName
.getHostname());
1767 long blocksLocalWithSsdWeight
= hdfsBd
.getBlocksLocalWithSsdWeight(serverName
.getHostname());
1768 if (regionLoadBldr
== null) {
1769 regionLoadBldr
= RegionLoad
.newBuilder();
1771 if (regionSpecifier
== null) {
1772 regionSpecifier
= RegionSpecifier
.newBuilder();
1774 regionSpecifier
.setType(RegionSpecifierType
.REGION_NAME
);
1775 regionSpecifier
.setValue(UnsafeByteOperations
.unsafeWrap(name
));
1776 regionLoadBldr
.setRegionSpecifier(regionSpecifier
.build())
1778 .setStorefiles(storefiles
)
1779 .setStoreRefCount(storeRefCount
)
1780 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount
)
1781 .setStoreUncompressedSizeMB(storeUncompressedSizeMB
)
1782 .setStorefileSizeMB(storefileSizeMB
)
1783 .setMemStoreSizeMB(memstoreSizeMB
)
1784 .setStorefileIndexSizeKB(storefileIndexSizeKB
)
1785 .setRootIndexSizeKB(rootLevelIndexSizeKB
)
1786 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB
)
1787 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB
)
1788 .setReadRequestsCount(r
.getReadRequestsCount())
1789 .setCpRequestsCount(r
.getCpRequestsCount())
1790 .setFilteredReadRequestsCount(r
.getFilteredReadRequestsCount())
1791 .setWriteRequestsCount(r
.getWriteRequestsCount())
1792 .setTotalCompactingKVs(totalCompactingKVs
)
1793 .setCurrentCompactedKVs(currentCompactedKVs
)
1794 .setDataLocality(dataLocality
)
1795 .setDataLocalityForSsd(dataLocalityForSsd
)
1796 .setBlocksLocalWeight(blocksLocalWeight
)
1797 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight
)
1798 .setBlocksTotalWeight(blocksTotalWeight
)
1799 .setCompactionState(ProtobufUtil
.createCompactionStateForRegionLoad(r
.getCompactionState()))
1800 .setLastMajorCompactionTs(r
.getOldestHfileTs(true));
1801 r
.setCompleteSequenceId(regionLoadBldr
);
1802 return regionLoadBldr
.build();
1805 private UserLoad
createUserLoad(String user
, MetricsUserSource userSource
) {
1806 UserLoad
.Builder userLoadBldr
= UserLoad
.newBuilder();
1807 userLoadBldr
.setUserName(user
);
1808 userSource
.getClientMetrics().values().stream().map(
1809 clientMetrics
-> ClusterStatusProtos
.ClientMetrics
.newBuilder()
1810 .setHostName(clientMetrics
.getHostName())
1811 .setWriteRequestsCount(clientMetrics
.getWriteRequestsCount())
1812 .setFilteredRequestsCount(clientMetrics
.getFilteredReadRequests())
1813 .setReadRequestsCount(clientMetrics
.getReadRequestsCount()).build())
1814 .forEach(userLoadBldr
::addClientMetrics
);
1815 return userLoadBldr
.build();
1818 public RegionLoad
createRegionLoad(final String encodedRegionName
) throws IOException
{
1819 HRegion r
= onlineRegions
.get(encodedRegionName
);
1820 return r
!= null ?
createRegionLoad(r
, null, null) : null;
1824 * Inner class that runs on a long period checking if regions need compaction.
1826 private static class CompactionChecker
extends ScheduledChore
{
1827 private final HRegionServer instance
;
1828 private final int majorCompactPriority
;
1829 private final static int DEFAULT_PRIORITY
= Integer
.MAX_VALUE
;
1830 //Iteration is 1-based rather than 0-based so we don't check for compaction
1831 // immediately upon region server startup
1832 private long iteration
= 1;
1834 CompactionChecker(final HRegionServer h
, final int sleepTime
, final Stoppable stopper
) {
1835 super("CompactionChecker", stopper
, sleepTime
);
1837 LOG
.info(this.getName() + " runs every " + Duration
.ofMillis(sleepTime
));
1839 /* MajorCompactPriority is configurable.
1840 * If not set, the compaction will use default priority.
1842 this.majorCompactPriority
= this.instance
.conf
.
1843 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1848 protected void chore() {
1849 for (Region r
: this.instance
.onlineRegions
.values()) {
1850 // Skip compaction if region is read only
1851 if (r
== null || r
.isReadOnly()) {
1855 HRegion hr
= (HRegion
) r
;
1856 for (HStore s
: hr
.stores
.values()) {
1858 long multiplier
= s
.getCompactionCheckMultiplier();
1859 assert multiplier
> 0;
1860 if (iteration
% multiplier
!= 0) {
1863 if (s
.needsCompaction()) {
1864 // Queue a compaction. Will recognize if major is needed.
1865 this.instance
.compactSplitThread
.requestSystemCompaction(hr
, s
,
1866 getName() + " requests compaction");
1867 } else if (s
.shouldPerformMajorCompaction()) {
1868 s
.triggerMajorCompaction();
1869 if (majorCompactPriority
== DEFAULT_PRIORITY
||
1870 majorCompactPriority
> hr
.getCompactPriority()) {
1871 this.instance
.compactSplitThread
.requestCompaction(hr
, s
,
1872 getName() + " requests major compaction; use default priority",
1874 CompactionLifeCycleTracker
.DUMMY
, null);
1876 this.instance
.compactSplitThread
.requestCompaction(hr
, s
,
1877 getName() + " requests major compaction; use configured priority",
1878 this.majorCompactPriority
, CompactionLifeCycleTracker
.DUMMY
, null);
1881 } catch (IOException e
) {
1882 LOG
.warn("Failed major compaction check on " + r
, e
);
1886 iteration
= (iteration
== Long
.MAX_VALUE
) ?
0 : (iteration
+ 1);
1890 private static class PeriodicMemStoreFlusher
extends ScheduledChore
{
1891 private final HRegionServer server
;
1892 private final static int RANGE_OF_DELAY
= 5 * 60; // 5 min in seconds
1893 private final static int MIN_DELAY_TIME
= 0; // millisec
1894 private final long rangeOfDelayMs
;
1896 PeriodicMemStoreFlusher(int cacheFlushInterval
, final HRegionServer server
) {
1897 super("MemstoreFlusherChore", server
, cacheFlushInterval
);
1898 this.server
= server
;
1900 final long configuredRangeOfDelay
= server
.getConfiguration().getInt(
1901 "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY
);
1902 this.rangeOfDelayMs
= TimeUnit
.SECONDS
.toMillis(configuredRangeOfDelay
);
1906 protected void chore() {
1907 final StringBuilder whyFlush
= new StringBuilder();
1908 for (HRegion r
: this.server
.onlineRegions
.values()) {
1909 if (r
== null) continue;
1910 if (r
.shouldFlush(whyFlush
)) {
1911 FlushRequester requester
= server
.getFlushRequester();
1912 if (requester
!= null) {
1913 long randomDelay
= RandomUtils
.nextLong(0, rangeOfDelayMs
) + MIN_DELAY_TIME
;
1914 //Throttle the flushes by putting a delay. If we don't throttle, and there
1915 //is a balanced write-load on the regions in a table, we might end up
1916 //overwhelming the filesystem with too many flushes at once.
1917 if (requester
.requestDelayedFlush(r
, randomDelay
)) {
1918 LOG
.info("{} requesting flush of {} because {} after random delay {} ms",
1919 getName(), r
.getRegionInfo().getRegionNameAsString(), whyFlush
.toString(),
1929 * Report the status of the server. A server is online once all the startup is
1930 * completed (setting up filesystem, starting executorService threads, etc.). This
1931 * method is designed mostly to be useful in tests.
1933 * @return true if online, false if not.
1935 public boolean isOnline() {
1936 return online
.get();
1940 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1941 * be hooked up to WAL.
1943 private void setupWALAndReplication() throws IOException
{
1944 boolean isMasterNoTableOrSystemTableOnly
= this instanceof HMaster
&&
1945 !LoadBalancer
.isMasterCanHostUserRegions(conf
);
1946 WALFactory factory
=
1947 new WALFactory(conf
, serverName
.toString(), this, !isMasterNoTableOrSystemTableOnly
);
1948 if (!isMasterNoTableOrSystemTableOnly
) {
1949 // TODO Replication make assumptions here based on the default filesystem impl
1950 Path oldLogDir
= new Path(walRootDir
, HConstants
.HREGION_OLDLOGDIR_NAME
);
1951 String logName
= AbstractFSWALProvider
.getWALDirectoryName(this.serverName
.toString());
1953 Path logDir
= new Path(walRootDir
, logName
);
1954 LOG
.debug("logDir={}", logDir
);
1955 if (this.walFs
.exists(logDir
)) {
1956 throw new RegionServerRunningException(
1957 "Region server has already created directory at " + this.serverName
.toString());
1959 // Always create wal directory as now we need this when master restarts to find out the live
1961 if (!this.walFs
.mkdirs(logDir
)) {
1962 throw new IOException("Can not create wal directory " + logDir
);
1964 // Instantiate replication if replication enabled. Pass it the log directories.
1965 createNewReplicationInstance(conf
, this, this.walFs
, logDir
, oldLogDir
, factory
);
1967 this.walFactory
= factory
;
1971 * Start up replication source and sink handlers.
1973 private void startReplicationService() throws IOException
{
1974 if (sameReplicationSourceAndSink
&& this.replicationSourceHandler
!= null) {
1975 this.replicationSourceHandler
.startReplicationService();
1977 if (this.replicationSourceHandler
!= null) {
1978 this.replicationSourceHandler
.startReplicationService();
1980 if (this.replicationSinkHandler
!= null) {
1981 this.replicationSinkHandler
.startReplicationService();
1987 * @return Master address tracker instance.
1989 public MasterAddressTracker
getMasterAddressTracker() {
1990 return this.masterAddressTracker
;
1994 * Start maintenance Threads, Server, Worker and lease checker threads.
1995 * Start all threads we need to run. This is called after we've successfully
1996 * registered with the Master.
1997 * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1998 * get an unhandled exception. We cannot set the handler on all threads.
1999 * Server's internal Listener thread is off limits. For Server, if an OOME, it
2000 * waits a while then retries. Meantime, a flush or a compaction that tries to
2001 * run should trigger same critical condition and the shutdown will run. On
2002 * its way out, this server will shut down Server. Leases are sort of
2003 * inbetween. It has an internal thread that while it inherits from Chore, it
2004 * keeps its own internal stop mechanism so needs to be stopped by this
2005 * hosting server. Worker logs the exception and exits.
2007 private void startServices() throws IOException
{
2008 if (!isStopped() && !isAborted()) {
2009 initializeThreads();
2011 this.secureBulkLoadManager
= new SecureBulkLoadManager(this.conf
, asyncClusterConnection
);
2012 this.secureBulkLoadManager
.start();
2014 // Health checker thread.
2015 if (isHealthCheckerConfigured()) {
2016 int sleepTime
= this.conf
.getInt(HConstants
.HEALTH_CHORE_WAKE_FREQ
,
2017 HConstants
.DEFAULT_THREAD_WAKE_FREQUENCY
);
2018 healthCheckChore
= new HealthCheckChore(sleepTime
, this, getConfiguration());
2020 // Executor status collect thread.
2021 if (this.conf
.getBoolean(HConstants
.EXECUTOR_STATUS_COLLECT_ENABLED
,
2022 HConstants
.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED
)) {
2023 int sleepTime
= this.conf
.getInt(ExecutorStatusChore
.WAKE_FREQ
,
2024 ExecutorStatusChore
.DEFAULT_WAKE_FREQ
);
2025 executorStatusChore
= new ExecutorStatusChore(sleepTime
, this, this.getExecutorService(),
2026 this.metricsRegionServer
.getMetricsSource());
2029 this.walRoller
= new LogRoller(this);
2030 this.flushThroughputController
= FlushThroughputControllerFactory
.create(this, conf
);
2031 this.procedureResultReporter
= new RemoteProcedureResultReporter(this);
2033 // Create the CompactedFileDischarger chore executorService. This chore helps to
2034 // remove the compacted files that will no longer be used in reads.
2035 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
2036 // 2 mins so that compacted files can be archived before the TTLCleaner runs
2037 int cleanerInterval
=
2038 conf
.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
2039 this.compactedFileDischarger
=
2040 new CompactedHFilesDischarger(cleanerInterval
, this, this);
2041 choreService
.scheduleChore(compactedFileDischarger
);
2043 // Start executor services
2044 final int openRegionThreads
= conf
.getInt("hbase.regionserver.executor.openregion.threads", 3);
2045 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2046 ExecutorType
.RS_OPEN_REGION
).setCorePoolSize(openRegionThreads
));
2047 final int openMetaThreads
= conf
.getInt("hbase.regionserver.executor.openmeta.threads", 1);
2048 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2049 ExecutorType
.RS_OPEN_META
).setCorePoolSize(openMetaThreads
));
2050 final int openPriorityRegionThreads
=
2051 conf
.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
2052 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2053 ExecutorType
.RS_OPEN_PRIORITY_REGION
).setCorePoolSize(openPriorityRegionThreads
));
2054 final int closeRegionThreads
=
2055 conf
.getInt("hbase.regionserver.executor.closeregion.threads", 3);
2056 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2057 ExecutorType
.RS_CLOSE_REGION
).setCorePoolSize(closeRegionThreads
));
2058 final int closeMetaThreads
= conf
.getInt("hbase.regionserver.executor.closemeta.threads", 1);
2059 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2060 ExecutorType
.RS_CLOSE_META
).setCorePoolSize(closeMetaThreads
));
2061 if (conf
.getBoolean(StoreScanner
.STORESCANNER_PARALLEL_SEEK_ENABLE
, false)) {
2062 final int storeScannerParallelSeekThreads
=
2063 conf
.getInt("hbase.storescanner.parallel.seek.threads", 10);
2064 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2065 ExecutorType
.RS_PARALLEL_SEEK
).setCorePoolSize(storeScannerParallelSeekThreads
)
2066 .setAllowCoreThreadTimeout(true));
2068 final int logReplayOpsThreads
= conf
.getInt(
2069 HBASE_SPLIT_WAL_MAX_SPLITTER
, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER
);
2070 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2071 ExecutorType
.RS_LOG_REPLAY_OPS
).setCorePoolSize(logReplayOpsThreads
)
2072 .setAllowCoreThreadTimeout(true));
2073 // Start the threads for compacted files discharger
2074 final int compactionDischargerThreads
=
2075 conf
.getInt(CompactionConfiguration
.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT
, 10);
2076 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2077 ExecutorType
.RS_COMPACTED_FILES_DISCHARGER
).setCorePoolSize(compactionDischargerThreads
));
2078 if (ServerRegionReplicaUtil
.isRegionReplicaWaitForPrimaryFlushEnabled(conf
)) {
2079 final int regionReplicaFlushThreads
= conf
.getInt(
2080 "hbase.regionserver.region.replica.flusher.threads", conf
.getInt(
2081 "hbase.regionserver.executor.openregion.threads", 3));
2082 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2083 ExecutorType
.RS_REGION_REPLICA_FLUSH_OPS
).setCorePoolSize(regionReplicaFlushThreads
));
2085 final int refreshPeerThreads
=
2086 conf
.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
2087 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2088 ExecutorType
.RS_REFRESH_PEER
).setCorePoolSize(refreshPeerThreads
));
2089 final int replaySyncReplicationWALThreads
=
2090 conf
.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
2091 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2092 ExecutorType
.RS_REPLAY_SYNC_REPLICATION_WAL
).setCorePoolSize(
2093 replaySyncReplicationWALThreads
));
2094 final int switchRpcThrottleThreads
=
2095 conf
.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
2096 executorService
.startExecutorService(executorService
.new ExecutorConfig().setExecutorType(
2097 ExecutorType
.RS_SWITCH_RPC_THROTTLE
).setCorePoolSize(switchRpcThrottleThreads
));
2099 Threads
.setDaemonThreadRunning(this.walRoller
, getName() + ".logRoller",
2100 uncaughtExceptionHandler
);
2101 if (this.cacheFlusher
!= null) {
2102 this.cacheFlusher
.start(uncaughtExceptionHandler
);
2104 Threads
.setDaemonThreadRunning(this.procedureResultReporter
,
2105 getName() + ".procedureResultReporter", uncaughtExceptionHandler
);
2107 if (this.compactionChecker
!= null) {
2108 choreService
.scheduleChore(compactionChecker
);
2110 if (this.periodicFlusher
!= null) {
2111 choreService
.scheduleChore(periodicFlusher
);
2113 if (this.healthCheckChore
!= null) {
2114 choreService
.scheduleChore(healthCheckChore
);
2116 if (this.executorStatusChore
!= null) {
2117 choreService
.scheduleChore(executorStatusChore
);
2119 if (this.nonceManagerChore
!= null) {
2120 choreService
.scheduleChore(nonceManagerChore
);
2122 if (this.storefileRefresher
!= null) {
2123 choreService
.scheduleChore(storefileRefresher
);
2125 if (this.fsUtilizationChore
!= null) {
2126 choreService
.scheduleChore(fsUtilizationChore
);
2128 if (this.slowLogTableOpsChore
!= null) {
2129 choreService
.scheduleChore(slowLogTableOpsChore
);
2132 // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2133 // an unhandled exception, it will just exit.
2134 Threads
.setDaemonThreadRunning(this.leaseManager
, getName() + ".leaseChecker",
2135 uncaughtExceptionHandler
);
2137 // Create the log splitting worker and start it
2138 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
2139 // quite a while inside Connection layer. The worker won't be available for other
2140 // tasks even after current task is preempted after a split task times out.
2141 Configuration sinkConf
= HBaseConfiguration
.create(conf
);
2142 sinkConf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
,
2143 conf
.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2144 sinkConf
.setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
,
2145 conf
.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2146 sinkConf
.setInt(HConstants
.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER
, 1);
2147 if (this.csm
!= null && conf
.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK
,
2148 DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK
)) {
2149 // SplitLogWorker needs csm. If none, don't start this.
2150 this.splitLogWorker
= new SplitLogWorker(sinkConf
, this, this, walFactory
);
2151 splitLogWorker
.start();
2152 LOG
.debug("SplitLogWorker started");
2155 // Memstore services.
2156 startHeapMemoryManager();
2157 // Call it after starting HeapMemoryManager.
2158 initializeMemStoreChunkCreator();
2161 private void initializeThreads() {
2162 // Cache flushing thread.
2163 this.cacheFlusher
= new MemStoreFlusher(conf
, this);
2165 // Compaction thread
2166 this.compactSplitThread
= new CompactSplit(this);
2168 // Background thread to check for compactions; needed if region has not gotten updates
2169 // in a while. It will take care of not checking too frequently on store-by-store basis.
2170 this.compactionChecker
= new CompactionChecker(this, this.compactionCheckFrequency
, this);
2171 this.periodicFlusher
= new PeriodicMemStoreFlusher(this.flushCheckFrequency
, this);
2172 this.leaseManager
= new LeaseManager(this.threadWakeFrequency
);
2174 final boolean isSlowLogTableEnabled
= conf
.getBoolean(HConstants
.SLOW_LOG_SYS_TABLE_ENABLED_KEY
,
2175 HConstants
.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY
);
2176 if (isSlowLogTableEnabled
) {
2177 // default chore duration: 10 min
2178 final int duration
= conf
.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
2179 slowLogTableOpsChore
= new SlowLogTableOpsChore(this, duration
, this.namedQueueRecorder
);
2182 if (this.nonceManager
!= null) {
2183 // Create the scheduled chore that cleans up nonces.
2184 nonceManagerChore
= this.nonceManager
.createCleanupScheduledChore(this);
2187 // Setup the Quota Manager
2188 rsQuotaManager
= new RegionServerRpcQuotaManager(this);
2189 rsSpaceQuotaManager
= new RegionServerSpaceQuotaManager(this);
2191 if (QuotaUtil
.isQuotaEnabled(conf
)) {
2192 this.fsUtilizationChore
= new FileSystemUtilizationChore(this);
2196 boolean onlyMetaRefresh
= false;
2197 int storefileRefreshPeriod
= conf
.getInt(
2198 StorefileRefresherChore
.REGIONSERVER_STOREFILE_REFRESH_PERIOD
,
2199 StorefileRefresherChore
.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD
);
2200 if (storefileRefreshPeriod
== 0) {
2201 storefileRefreshPeriod
= conf
.getInt(
2202 StorefileRefresherChore
.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD
,
2203 StorefileRefresherChore
.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD
);
2204 onlyMetaRefresh
= true;
2206 if (storefileRefreshPeriod
> 0) {
2207 this.storefileRefresher
= new StorefileRefresherChore(storefileRefreshPeriod
,
2208 onlyMetaRefresh
, this, this);
2210 registerConfigurationObservers();
2213 private void registerConfigurationObservers() {
2214 // Registering the compactSplitThread object with the ConfigurationManager.
2215 configurationManager
.registerObserver(this.compactSplitThread
);
2216 configurationManager
.registerObserver(this.rpcServices
);
2217 configurationManager
.registerObserver(this);
2221 * Puts up the webui.
2223 private void putUpWebUI() throws IOException
{
2224 int port
= this.conf
.getInt(HConstants
.REGIONSERVER_INFO_PORT
,
2225 HConstants
.DEFAULT_REGIONSERVER_INFOPORT
);
2226 String addr
= this.conf
.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
2228 if(this instanceof HMaster
) {
2229 port
= conf
.getInt(HConstants
.MASTER_INFO_PORT
,
2230 HConstants
.DEFAULT_MASTER_INFOPORT
);
2231 addr
= this.conf
.get("hbase.master.info.bindAddress", "0.0.0.0");
2233 // -1 is for disabling info server
2238 if (!Addressing
.isLocalAddress(InetAddress
.getByName(addr
))) {
2240 "Failed to start http info server. Address " + addr
2241 + " does not belong to this host. Correct configuration parameter: "
2242 + "hbase.regionserver.info.bindAddress";
2244 throw new IOException(msg
);
2246 // check if auto port bind enabled
2247 boolean auto
= this.conf
.getBoolean(HConstants
.REGIONSERVER_INFO_PORT_AUTO
, false);
2250 this.infoServer
= new InfoServer(getProcessName(), addr
, port
, false, this.conf
);
2251 infoServer
.addPrivilegedServlet("dump", "/dump", getDumpServlet());
2252 configureInfoServer();
2253 this.infoServer
.start();
2255 } catch (BindException e
) {
2257 // auto bind disabled throw BindException
2258 LOG
.error("Failed binding http info server to port: " + port
);
2261 // auto bind enabled, try to use another port
2262 LOG
.info("Failed binding http info server to port: " + port
);
2266 port
= this.infoServer
.getPort();
2267 conf
.setInt(HConstants
.REGIONSERVER_INFO_PORT
, port
);
2268 int masterInfoPort
= conf
.getInt(HConstants
.MASTER_INFO_PORT
,
2269 HConstants
.DEFAULT_MASTER_INFOPORT
);
2270 conf
.setInt("hbase.master.info.port.orig", masterInfoPort
);
2271 conf
.setInt(HConstants
.MASTER_INFO_PORT
, port
);
2275 * Verify that server is healthy
2277 private boolean isHealthy() {
2279 // File system problem
2282 // Verify that all threads are alive
2283 boolean healthy
= (this.leaseManager
== null || this.leaseManager
.isAlive())
2284 && (this.cacheFlusher
== null || this.cacheFlusher
.isAlive())
2285 && (this.walRoller
== null || this.walRoller
.isAlive())
2286 && (this.compactionChecker
== null || this.compactionChecker
.isScheduled())
2287 && (this.periodicFlusher
== null || this.periodicFlusher
.isScheduled());
2289 stop("One or more threads are no longer alive -- stop");
2295 public List
<WAL
> getWALs() {
2296 return walFactory
.getWALs();
2300 public WAL
getWAL(RegionInfo regionInfo
) throws IOException
{
2301 WAL wal
= walFactory
.getWAL(regionInfo
);
2302 if (this.walRoller
!= null) {
2303 this.walRoller
.addWAL(wal
);
2308 public LogRoller
getWalRoller() {
2312 WALFactory
getWalFactory() {
2317 public Connection
getConnection() {
2318 return getAsyncConnection().toConnection();
/**
 * Stop this server, attributing the request to the current RPC user if any.
 * @param msg Status message describing why we are stopping
 */
@Override
public void stop(final String msg) {
  // Not forced; coprocessors may veto via preStop in the 3-arg overload.
  stop(msg, false, RpcServer.getRequestUser().orElse(null));
}
/**
 * Stops the regionserver.
 * @param msg Status message
 * @param force True if this is a regionserver abort
 * @param user The user executing the stop request, or null if no user is associated
 */
public void stop(final String msg, final boolean force, final User user) {
  if (!this.stopped) {
    LOG.info("***** STOPPING region server '" + this + "' *****");
    if (this.rsHost != null) {
      // when forced via abort don't allow CPs to override
      try {
        this.rsHost.preStop(msg, user);
      } catch (IOException ioe) {
        if (!force) {
          // Coprocessor vetoed the stop; honor it on a non-forced shutdown.
          LOG.warn("The region server did not stop", ioe);
          return;
        }
        LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
      }
    }
    this.stopped = true;
    LOG.info("STOPPED: " + msg);
    // Wakes run() if it is sleeping
    sleeper.skipSleepCycle();
  }
}
/**
 * Block the caller until this server is either online or stopped.
 * Waits in msgInterval slices on the 'online' monitor; run() notifies it.
 */
public void waitForServerOnline(){
  while (!isStopped() && !isOnline()) {
    synchronized (online) {
      try {
        online.wait(msgInterval);
      } catch (InterruptedException ie) {
        // Preserve interrupt status for the caller; loop will re-check conditions.
        Thread.currentThread().interrupt();
      }
    }
  }
}
/**
 * Tasks run after a region has been opened on this server: verify the RPC services are
 * open, request any needed compactions, report the OPENED transition to the master and
 * possibly trigger a primary-replica flush.
 * @param context carries the opened region, its open procedure id and master system time
 * @throws IOException if the master could not be told the region opened
 */
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
  HRegion r = context.getRegion();
  long openProcId = context.getOpenProcId();
  long masterSystemTime = context.getMasterSystemTime();
  rpcServices.checkOpen();
  LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
    r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
  // Do checks to see if we need to compact (references or too many files)
  // Skip compaction check if region is read only
  if (!r.isReadOnly()) {
    for (HStore s : r.stores.values()) {
      if (s.hasReferences() || s.needsCompaction()) {
        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
      }
    }
  }
  long openSeqNum = r.getOpenSeqNum();
  if (openSeqNum == HConstants.NO_SEQNUM) {
    // If we opened a region, we should have read some sequence number from it.
    // NOTE(review): error-log call reconstructed around this message — confirm exact handling.
    LOG.error(
      "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
    openSeqNum = 0;
  }
  // Notify master of our region's state change.
  if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
      openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
    throw new IOException(
      "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
  }
  triggerFlushInPrimaryRegion(r);
  LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
}
/**
 * Helper method for use in tests. Skip the region transition report when there's no master
 * around to receive it. Instead, for OPENED transitions, writes the location straight to
 * ZK (meta region) or to hbase:meta (user region).
 * @return true if the (skipped) report should be considered successful
 */
private boolean skipReportingTransition(final RegionStateTransitionContext context) {
  final TransitionCode code = context.getCode();
  final long openSeqNum = context.getOpenSeqNum();
  long masterSystemTime = context.getMasterSystemTime();
  final RegionInfo[] hris = context.getHris();

  if (code == TransitionCode.OPENED) {
    // OPENED transitions carry exactly one region.
    Preconditions.checkArgument(hris != null && hris.length == 1);
    if (hris[0].isMetaRegion()) {
      try {
        MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
            hris[0].getReplicaId(), RegionState.State.OPEN);
      } catch (KeeperException e) {
        LOG.info("Failed to update meta location", e);
        return false;
      }
    } else {
      try {
        MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
            serverName, openSeqNum, masterSystemTime);
      } catch (IOException e) {
        LOG.info("Failed to update meta", e);
        return false;
      }
    }
  }
  return true;
}
/**
 * Build the protobuf request describing a region state transition to send to the master.
 * Includes the open sequence number only for OPENED transitions with a valid seqnum.
 */
private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest(
    final RegionStateTransitionContext context) {
  final TransitionCode code = context.getCode();
  final long openSeqNum = context.getOpenSeqNum();
  final RegionInfo[] hris = context.getHris();
  final long[] procIds = context.getProcIds();

  ReportRegionStateTransitionRequest.Builder builder =
    ReportRegionStateTransitionRequest.newBuilder();
  builder.setServer(ProtobufUtil.toServerName(serverName));
  RegionStateTransition.Builder transition = builder.addTransitionBuilder();
  transition.setTransitionCode(code);
  if (code == TransitionCode.OPENED && openSeqNum >= 0) {
    transition.setOpenSeqNum(openSeqNum);
  }
  for (RegionInfo hri : hris) {
    transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
  }
  for (long procId : procIds) {
    transition.addProcId(procId);
  }

  return builder.build();
}
/**
 * Report a region state transition to the master, retrying with backoff while the
 * master is unavailable. Keeps trying even while this server is going down, until the
 * cluster connection is closed.
 * @return true if the transition was reported and accepted; false otherwise
 */
@Override
public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
  if (TEST_SKIP_REPORTING_TRANSITION) {
    // Test hook: bypass the master entirely.
    return skipReportingTransition(context);
  }
  final ReportRegionStateTransitionRequest request =
    createReportRegionStateTransitionRequest(context);

  int tries = 0;
  long pauseTime = this.retryPauseTime;
  // Keep looping till we get an error. We want to send reports even though server is going down.
  // Only go down if clusterConnection is null. It is set to null almost as last thing as the
  // HRegionServer does down.
  while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    try {
      if (rss == null) {
        // No stub yet; (re)create it and retry.
        createRegionServerStatusStub();
        continue;
      }
      ReportRegionStateTransitionResponse response =
        rss.reportRegionStateTransition(null, request);
      if (response.hasErrorMessage()) {
        LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
        break;
      }
      // Log if we had to retry else don't log unless TRACE. We want to
      // know if were successful after an attempt showed in logs as failed.
      if (tries > 0 || LOG.isTraceEnabled()) {
        LOG.info("TRANSITION REPORTED " + request);
      }
      // NOTE: Return mid-method!!!
      return true;
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      // These exceptions mean the master is coming up; back off instead of hammering it.
      boolean pause =
        ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
            || ioe instanceof CallQueueTooBigException;
      if (pause) {
        // Do backoff else we flood the Master with requests.
        pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
      } else {
        pauseTime = this.retryPauseTime; // Reset.
      }
      // NOTE(review): log-message tail reconstructed (extractor dropped lines) — confirm wording.
      LOG.info("Failed report transition " +
        TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
        (pause ?
            " after " + pauseTime + "ms delay (Master is coming online...)." :
            " immediately."),
        ioe);
      if (pause) Threads.sleep(pauseTime);
      tries++;
      if (rssStub == rss) {
        // Drop the broken stub so the next iteration recreates it.
        rssStub = null;
      }
    }
  }
  return false;
}
/**
 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
 * block this thread. See RegionReplicaFlushHandler for details.
 * @param region the just-opened region; no-op if it is the default (primary) replica
 */
private void triggerFlushInPrimaryRegion(final HRegion region) {
  if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
    // Primary replica: nothing to trigger.
    return;
  }
  TableName tn = region.getTableDescriptor().getTableName();
  if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
      !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
    // Feature disabled: secondary can serve reads immediately.
    region.setReadsEnabled(true);
    return;
  }

  region.setReadsEnabled(false); // disable reads before marking the region as opened.
  // RegionReplicaFlushHandler might reset this.

  // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
  if (this.executorService != null) {
    this.executorService.submit(new RegionReplicaFlushHandler(this, region));
  } else {
    LOG.info("Executor is null; not running flush of primary region replica for {}",
      region.getRegionInfo());
  }
}
/** @return the underlying RPC server transport. */
public RpcServerInterface getRpcServer() {
  return rpcServices.rpcServer;
}

/** @return the RPC services facade for this region server. */
@InterfaceAudience.Private
public RSRpcServices getRSRpcServices() {
  // NOTE(review): return reconstructed (extractor dropped the line) — confirm field name.
  return rpcServices;
}
/**
 * Cause the server to exit without closing the regions it is serving, the log
 * it is using and without notifying the master. Used unit testing and on
 * catastrophic events such as HDFS is yanked out from under hbase or we OOME.
 * @param reason
 *          the reason we are aborting
 * @param cause
 *          the exception that caused the abort, or null
 */
@Override
public void abort(String reason, Throwable cause) {
  if (!setAbortRequested()) {
    // Abort already in progress, ignore the new request.
    LOG.debug(
      "Abort already in progress. Ignoring the current request with reason: {}", reason);
    return;
  }
  String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
  if (cause != null) {
    LOG.error(HBaseMarkers.FATAL, msg, cause);
  } else {
    LOG.error(HBaseMarkers.FATAL, msg);
  }
  // HBASE-4014: show list of coprocessors that were loaded to help debug
  // regionserver crashes.Note that we're implicitly using
  // java.util.HashSet's toString() method to print the coprocessor names.
  LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
      CoprocessorHost.getLoadedCoprocessors());
  // Try and dump metrics if abort -- might give clue as to how fatal came about....
  try {
    LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
  } catch (MalformedObjectNameException | IOException e) {
    LOG.warn("Failed dumping metrics", e);
  }

  // Do our best to report our abort to the master, but this may not work
  try {
    if (cause != null) {
      msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
    }
    // Report to the master but only if we have already registered with the master.
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss != null && this.serverName != null) {
      ReportRSFatalErrorRequest.Builder builder =
        ReportRSFatalErrorRequest.newBuilder();
      builder.setServer(ProtobufUtil.toServerName(this.serverName));
      builder.setErrorMessage(msg);
      rss.reportRSFatalError(null, builder.build());
    }
  } catch (Throwable t) {
    // Best-effort only: aborting must proceed even if the master is unreachable.
    LOG.warn("Unable to report fatal error to master", t);
  }

  scheduleAbortTimer();
  // shutdown should be run as the internal user
  stop(reason, true, null);
}
/**
 * Sets the abort state if not already set.
 * @return True if abortRequested set to True successfully, false if an abort is already in
 *   progress.
 */
protected boolean setAbortRequested() {
  // CAS guarantees exactly one caller wins the transition false -> true.
  return abortRequested.compareAndSet(false, true);
}

/**
 * @see HRegionServer#abort(String, Throwable)
 */
public void abort(String reason) {
  abort(reason, null);
}

@Override
public boolean isAborted() {
  return abortRequested.get();
}
/**
 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup
 * logs but it does close socket in case want to bring up server on old
 * hostname+port immediately.
 */
@InterfaceAudience.Private
protected void kill() {
  // NOTE(review): 'killed' flag assignment reconstructed (extractor dropped it) — confirm.
  this.killed = true;
  abort("Simulated kill");
}
// Limits the time spent in the shutdown process.
// Schedules a daemon timer that runs a configurable TimerTask (default: System.exit)
// if the abort takes longer than ABORT_TIMEOUT.
private void scheduleAbortTimer() {
  if (this.abortMonitor == null) {
    this.abortMonitor = new Timer("Abort regionserver monitor", true);
    TimerTask abortTimeoutTask = null;
    try {
      // Task class is pluggable via ABORT_TIMEOUT_TASK; instantiated reflectively.
      Constructor<? extends TimerTask> timerTaskCtor =
        Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
          .asSubclass(TimerTask.class).getDeclaredConstructor();
      timerTaskCtor.setAccessible(true);
      abortTimeoutTask = timerTaskCtor.newInstance();
    } catch (Exception e) {
      // Non-fatal: without a task we simply lose the shutdown watchdog.
      LOG.warn("Initialize abort timeout task failed", e);
    }
    if (abortTimeoutTask != null) {
      abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
    }
  }
}
/** Null-safe shutdown of a scheduled chore. */
protected final void shutdownChore(ScheduledChore chore) {
  if (chore != null) {
    chore.shutdown();
  }
}
/**
 * Wait on all threads to finish. Presumption is that all closes and stops
 * have already been called.
 */
protected void stopServiceThreads() {
  // clean up the scheduled chores
  if (this.choreService != null) {
    shutdownChore(nonceManagerChore);
    shutdownChore(compactionChecker);
    shutdownChore(periodicFlusher);
    shutdownChore(healthCheckChore);
    shutdownChore(executorStatusChore);
    shutdownChore(storefileRefresher);
    shutdownChore(fsUtilizationChore);
    shutdownChore(slowLogTableOpsChore);
    // cancel the remaining scheduled chores (in case we missed out any)
    // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
    choreService.shutdown();
  }

  if (this.cacheFlusher != null) {
    this.cacheFlusher.join();
  }
  if (this.spanReceiverHost != null) {
    this.spanReceiverHost.closeReceivers();
  }
  if (this.walRoller != null) {
    this.walRoller.close();
  }
  if (this.compactSplitThread != null) {
    this.compactSplitThread.join();
  }
  if (this.executorService != null) {
    this.executorService.shutdown();
  }
  // When source and sink share one handler, stop it once; otherwise stop each separately.
  if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
    this.replicationSourceHandler.stopReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.stopReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.stopReplicationService();
    }
  }
}
/**
 * @return Return the object that implements the replication
 * source executorService.
 */
@Override
public ReplicationSourceService getReplicationSourceService() {
  return replicationSourceHandler;
}

/**
 * @return Return the object that implements the replication sink executorService.
 */
public ReplicationSinkService getReplicationSinkService() {
  return replicationSinkHandler;
}
/**
 * Get the current master from ZooKeeper and open the RPC connection to it.
 * To get a fresh connection, the current rssStub must be null.
 * Method will block until a master is available. You can break from this
 * block by requesting the server stop.
 *
 * @return master + port, or null if server has been stopped
 */
private synchronized ServerName createRegionServerStatusStub() {
  // Create RS stub without refreshing the master node from ZK, use cached data
  return createRegionServerStatusStub(false);
}
/**
 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
 * connection, the current rssStub must be null. Method will block until a master is available.
 * You can break from this block by requesting the server stop.
 * @param refresh If true then master address will be read from ZK, otherwise use cached data
 * @return master + port, or null if server has been stopped
 */
@InterfaceAudience.Private
protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
  if (rssStub != null) {
    // Stub already established; just report the current master address.
    return masterAddressTracker.getMasterAddress();
  }
  ServerName sn = null;
  long previousLogTime = 0;
  RegionServerStatusService.BlockingInterface intRssStub = null;
  LockService.BlockingInterface intLockStub = null;
  boolean interrupted = false;
  try {
    while (keepLooping()) {
      sn = this.masterAddressTracker.getMasterAddress(refresh);
      if (sn == null) {
        if (!keepLooping()) {
          // give up with no connection.
          LOG.debug("No master found and cluster is stopped; bailing out");
          return null;
        }
        // Rate-limit logging to once a second while waiting for a master.
        if (System.currentTimeMillis() > (previousLogTime + 1000)) {
          LOG.debug("No master found; retry");
          previousLogTime = System.currentTimeMillis();
        }
        refresh = true; // let's try pull it from ZK directly
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
        continue;
      }

      // If we are on the active master, use the shortcut
      if (this instanceof HMaster && sn.equals(getServerName())) {
        // Wrap the shortcut in a class providing our version to the calls where it's relevant.
        // Normally, RpcServer-based threadlocals do that.
        intRssStub = new MasterRpcServicesVersionWrapper(((HMaster)this).getMasterRpcServices());
        intLockStub = ((HMaster)this).getMasterRpcServices();
        break;
      }
      try {
        BlockingRpcChannel channel =
          this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
            shortOperationTimeout);
        intRssStub = RegionServerStatusService.newBlockingStub(channel);
        intLockStub = LockService.newBlockingStub(channel);
        break;
      } catch (IOException e) {
        if (System.currentTimeMillis() > (previousLogTime + 1000)) {
          e = e instanceof RemoteException ?
            ((RemoteException)e).unwrapRemoteException() : e;
          if (e instanceof ServerNotRunningYetException) {
            LOG.info("Master isn't available yet, retrying");
          } else {
            LOG.warn("Unable to connect to master. Retrying. Error was:", e);
          }
          previousLogTime = System.currentTimeMillis();
        }
        if (sleepInterrupted(200)) {
          interrupted = true;
        }
      }
    }
  } finally {
    // Restore the interrupt flag swallowed by sleepInterrupted.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  this.rssStub = intRssStub;
  this.lockStub = intLockStub;
  return sn;
}
/**
 * @return True if we should break loop because cluster is going down or
 * this server has been stopped or hdfs has gone bad.
 */
private boolean keepLooping() {
  return !this.stopped && isClusterUp();
}
/**
 * Let the master know we're here Run initialization using parameters passed
 * us by the master.
 * @return A Map of key/value configurations we got from the Master else
 * null if we failed to register.
 * @throws IOException
 */
private RegionServerStartupResponse reportForDuty() throws IOException {
  // Masterless mode (standalone tools/tests): pretend registration succeeded.
  if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
  ServerName masterServerName = createRegionServerStatusStub(true);
  RegionServerStatusService.BlockingInterface rss = rssStub;
  if (masterServerName == null || rss == null) return null;
  RegionServerStartupResponse result = null;
  try {
    // Reset request counters so post-registration metrics start clean.
    rpcServices.requestCount.reset();
    rpcServices.rpcGetRequestCount.reset();
    rpcServices.rpcScanRequestCount.reset();
    rpcServices.rpcFullScanRequestCount.reset();
    rpcServices.rpcMultiRequestCount.reset();
    rpcServices.rpcMutateRequestCount.reset();
    LOG.info("reportForDuty to master=" + masterServerName + " with port="
      + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
    long now = EnvironmentEdgeManager.currentTime();
    int port = rpcServices.isa.getPort();
    RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
    if (!StringUtils.isBlank(useThisHostnameInstead)) {
      // Operator pinned a hostname; pass it along so master registers us under it.
      request.setUseThisHostnameInstead(useThisHostnameInstead);
    }
    request.setPort(port);
    request.setServerStartCode(this.startcode);
    request.setServerCurrentTime(now);
    result = rss.regionServerStartup(null, request.build());
  } catch (ServiceException se) {
    IOException ioe = ProtobufUtil.getRemoteException(se);
    if (ioe instanceof ClockOutOfSyncException) {
      LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
        ioe);
      // Re-throw IOE will cause RS to abort
      throw ioe;
    } else if (ioe instanceof ServerNotRunningYetException) {
      LOG.debug("Master is not running yet");
    } else {
      LOG.warn("error telling master we are up", se);
    }
    // NOTE(review): stub reset reconstructed (extractor dropped line) — confirm.
    rssStub = null;
  }
  return result;
}
/**
 * Ask the master for the last flushed sequence id of a region (used during WAL split
 * to skip already-flushed edits). Falls back to NO_SEQNUM when the master is unreachable.
 * @param encodedRegionName region to query
 */
@Override
public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
  try {
    GetLastFlushedSequenceIdRequest req =
      RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) { // Try to connect one more time
      createRegionServerStatusStub();
      rss = rssStub;
      if (rss == null) {
        // Still no luck, we tried
        LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
        return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
            .build();
      }
    }
    GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
    return RegionStoreSequenceIds.newBuilder()
        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
  } catch (ServiceException e) {
    LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
    return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
        .build();
  }
}
/**
 * Close meta region if we carry it
 * @param abort Whether we're running an abort.
 */
private void closeMetaTableRegions(final boolean abort) {
  HRegion meta = null;
  // Write lock: prevent concurrent region add/remove while we scan for meta.
  this.onlineRegionsLock.writeLock().lock();
  try {
    for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
      RegionInfo hri = e.getValue().getRegionInfo();
      if (hri.isMetaRegion()) {
        meta = e.getValue();
      }
      if (meta != null) break;
    }
  } finally {
    this.onlineRegionsLock.writeLock().unlock();
  }
  // Close outside the lock; closing can be slow.
  if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
}
/**
 * Schedule closes on all user regions.
 * Should be safe calling multiple times because it wont' close regions
 * that are already closed or that are closing.
 * @param abort Whether we're running an abort.
 */
private void closeUserRegions(final boolean abort) {
  this.onlineRegionsLock.writeLock().lock();
  try {
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      HRegion r = e.getValue();
      if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
        // Don't update zk with this close transition; pass false.
        closeRegionIgnoreErrors(r.getRegionInfo(), abort);
      }
    }
  } finally {
    this.onlineRegionsLock.writeLock().unlock();
  }
}
/** @return the info server */
public InfoServer getInfoServer() {
  // NOTE(review): return reconstructed (extractor dropped the line) — confirm field name.
  return infoServer;
}

/**
 * @return true if a stop has been requested.
 */
@Override
public boolean isStopped() {
  return this.stopped;
}

@Override
public boolean isStopping() {
  return this.stopping;
}

@Override
public Configuration getConfiguration() {
  // NOTE(review): return reconstructed — confirm field name.
  return conf;
}

/** @return live view of the map of online regions keyed by encoded region name. */
protected Map<String, HRegion> getOnlineRegions() {
  return this.onlineRegions;
}

public int getNumberOfOnlineRegions() {
  return this.onlineRegions.size();
}

/**
 * For tests, web ui and metrics.
 * This method will only work if HRegionServer is in the same JVM as client;
 * HRegion cannot be serialized to cross an rpc.
 */
public Collection<HRegion> getOnlineRegionsLocalContext() {
  Collection<HRegion> regions = this.onlineRegions.values();
  return Collections.unmodifiableCollection(regions);
}
/**
 * Register a region as online on this server and subscribe it to dynamic
 * configuration updates.
 */
@Override
public void addRegion(HRegion region) {
  this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
  configurationManager.registerObserver(region);
}
3003 private void addRegion(SortedMap
<Long
, Collection
<HRegion
>> sortedRegions
, HRegion region
,
3005 if (!sortedRegions
.containsKey(size
)) {
3006 sortedRegions
.put(size
, new ArrayList
<>());
3008 sortedRegions
.get(size
).add(region
);
/**
 * @return A new Map of online regions sorted by region off-heap size with the first entry being
 *   the biggest.
 */
SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
  // we'll sort the regions in reverse
  SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
  // Copy over all regions. Regions are sorted by size with biggest first.
  for (HRegion region : this.onlineRegions.values()) {
    addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
  }
  return sortedRegions;
}

/**
 * @return A new Map of online regions sorted by region heap size with the first entry being the
 *   biggest.
 */
SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
  // we'll sort the regions in reverse
  SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
  // Copy over all regions. Regions are sorted by size with biggest first.
  for (HRegion region : this.onlineRegions.values()) {
    addRegion(sortedRegions, region, region.getMemStoreHeapSize());
  }
  return sortedRegions;
}
// NOTE(review): several single-line return statements in this accessor run were dropped by
// the extractor and are reconstructed below — confirm field names against the original file.

/**
 * @return time stamp in millis of when this region server was started
 */
public long getStartcode() {
  return this.startcode;
}

/** @return reference to FlushRequester */
@Override
public FlushRequester getFlushRequester() {
  return this.cacheFlusher;
}

@Override
public CompactionRequester getCompactionRequestor() {
  return this.compactSplitThread;
}

@Override
public LeaseManager getLeaseManager() {
  return leaseManager;
}

/**
 * @return Return the rootDir.
 */
protected Path getDataRootDir() {
  return dataRootDir;
}

@Override
public FileSystem getFileSystem() {
  return dataFs;
}

/**
 * @return {@code true} when the data file system is available, {@code false} otherwise.
 */
boolean isDataFileSystemOk() {
  return this.dataFsOk;
}

/**
 * @return Return the walRootDir.
 */
public Path getWALRootDir() {
  return walRootDir;
}

/**
 * @return Return the walFs.
 */
public FileSystem getWALFileSystem() {
  return walFs;
}

@Override
public String toString() {
  return getServerName().toString();
}

@Override
public ZKWatcher getZooKeeper() {
  return zooKeeper;
}

@Override
public CoordinatedStateManager getCoordinatedStateManager() {
  return csm;
}

@Override
public ServerName getServerName() {
  return serverName;
}

public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
  return this.rsHost;
}

@Override
public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
  return this.regionsInTransitionInRS;
}

@Override
public ExecutorService getExecutorService() {
  return executorService;
}

@Override
public ChoreService getChoreService() {
  return choreService;
}

@Override
public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
  return rsQuotaManager;
}
// Main program and support routines

/**
 * Load the replication executorService objects, if any
 * Instantiates the replication source and sink services from configured class names;
 * when both names are identical a single instance serves both roles.
 */
private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
  // read in the name of the source replication class from the config file.
  String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
    HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

  // read in the name of the sink replication class from the config file.
  String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
    HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);

  // If both the sink and the source class names are the same, then instantiate
  // only one object.
  if (sourceClassname.equals(sinkClassname)) {
    server.replicationSourceHandler = newReplicationInstance(sourceClassname,
      ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
    // Shared instance doubles as the sink.
    server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
    server.sameReplicationSourceAndSink = true;
  } else {
    server.replicationSourceHandler = newReplicationInstance(sourceClassname,
      ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
    server.replicationSinkHandler = newReplicationInstance(sinkClassname,
      ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
    server.sameReplicationSourceAndSink = false;
  }
}
3170 private static <T
extends ReplicationService
> T
newReplicationInstance(String classname
,
3171 Class
<T
> xface
, Configuration conf
, HRegionServer server
, FileSystem walFs
, Path logDir
,
3172 Path oldLogDir
, WALFactory walFactory
) throws IOException
{
3173 final Class
<?
extends T
> clazz
;
3175 ClassLoader classLoader
= Thread
.currentThread().getContextClassLoader();
3176 clazz
= Class
.forName(classname
, true, classLoader
).asSubclass(xface
);
3177 } catch (java
.lang
.ClassNotFoundException nfe
) {
3178 throw new IOException("Could not find class for " + classname
);
3180 T service
= ReflectionUtils
.newInstance(clazz
, conf
);
3181 service
.initialize(server
, walFs
, logDir
, oldLogDir
, walFactory
);
3185 public Map
<String
, ReplicationStatus
> getWalGroupsReplicationStatus(){
3186 Map
<String
, ReplicationStatus
> walGroupsReplicationStatus
= new TreeMap
<>();
3187 if(!this.isOnline()){
3188 return walGroupsReplicationStatus
;
3190 List
<ReplicationSourceInterface
> allSources
= new ArrayList
<>();
3191 allSources
.addAll(replicationSourceHandler
.getReplicationManager().getSources());
3192 allSources
.addAll(replicationSourceHandler
.getReplicationManager().getOldSources());
3193 for(ReplicationSourceInterface source
: allSources
){
3194 walGroupsReplicationStatus
.putAll(source
.getWalGroupStatus());
3196 return walGroupsReplicationStatus
;
/**
 * Utility for constructing an instance of the passed HRegionServer class.
 */
static HRegionServer constructRegionServer(
    final Class<? extends HRegionServer> regionServerClass,
    final Configuration conf) {
  try {
    // All HRegionServer subclasses are expected to expose a (Configuration) constructor.
    Constructor<? extends HRegionServer> c =
      regionServerClass.getConstructor(Configuration.class);
    return c.newInstance(conf);
  } catch (Exception e) {
    throw new RuntimeException("Failed construction of " + "Regionserver: "
      + regionServerClass.toString(), e);
  }
}
/**
 * Entry point: resolves the configured HRegionServer implementation class and
 * delegates startup to the command line driver.
 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
 */
public static void main(String[] args) {
  LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
  VersionInfo.logVersion();
  Configuration conf = HBaseConfiguration.create();
  @SuppressWarnings("unchecked")
  Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

  new HRegionServerCommandLine(regionServerClass).doMain(args);
}
3231 * Gets the online regions of the specified table.
3232 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
3233 * Only returns <em>online</em> regions. If a region on this table has been
3234 * closed during a disable, etc., it will not be included in the returned list.
3235 * So, the returned list may not necessarily be ALL regions in this table, its
3236 * all the ONLINE regions in the table.
3237 * @param tableName table to limit the scope of the query
3238 * @return Online regions from <code>tableName</code>
3241 public List
<HRegion
> getRegions(TableName tableName
) {
3242 List
<HRegion
> tableRegions
= new ArrayList
<>();
3243 synchronized (this.onlineRegions
) {
3244 for (HRegion region
: this.onlineRegions
.values()) {
3245 RegionInfo regionInfo
= region
.getRegionInfo();
3246 if(regionInfo
.getTable().equals(tableName
)) {
3247 tableRegions
.add(region
);
3251 return tableRegions
;
/** @return a defensive copy of all regions currently online on this server. */
@Override
public List<HRegion> getRegions() {
  List<HRegion> allRegions;
  synchronized (this.onlineRegions) {
    // Return a clone copy of the onlineRegions
    allRegions = new ArrayList<>(onlineRegions.values());
  }
  return allRegions;
}
3265 * Gets the online tables in this RS.
3266 * This method looks at the in-memory onlineRegions.
3267 * @return all the online tables in this RS
3269 public Set
<TableName
> getOnlineTables() {
3270 Set
<TableName
> tables
= new HashSet
<>();
3271 synchronized (this.onlineRegions
) {
3272 for (Region region
: this.onlineRegions
.values()) {
3273 tables
.add(region
.getTableDescriptor().getTableName());
/**
 * @return sorted, de-duplicated names of all coprocessors loaded on this server:
 *   common-WAL, per-region, per-region-WAL and region-server-host coprocessors.
 */
public String[] getRegionServerCoprocessors() {
  TreeSet<String> coprocessors = new TreeSet<>();
  try {
    coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
  } catch (IOException exception) {
    // Best-effort listing: log and keep going with whatever we can gather.
    LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
        "skipping.");
    LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
  }
  Collection<HRegion> regions = getOnlineRegionsLocalContext();
  for (HRegion region : regions) {
    coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
    try {
      coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
          "; skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
  }
  coprocessors.addAll(rsHost.getCoprocessors());
  return coprocessors.toArray(new String[0]);
}
3304 * Try to close the region, logs a warning on failure but continues.
3305 * @param region Region to close
3307 private void closeRegionIgnoreErrors(RegionInfo region
, final boolean abort
) {
3309 if (!closeRegion(region
.getEncodedName(), abort
, null)) {
3310 LOG
.warn("Failed to close " + region
.getRegionNameAsString() +
3311 " - ignoring and continuing");
3313 } catch (IOException e
) {
3314 LOG
.warn("Failed to close " + region
.getRegionNameAsString() +
3315 " - ignoring and continuing", e
);
3320 * Close asynchronously a region, can be called from the master or internally by the regionserver
3321 * when stopping. If called from the master, the region will update the status.
3324 * If an opening was in progress, this method will cancel it, but will not start a new close. The
3325 * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3329 * If a close was in progress, this new request will be ignored, and an exception thrown.
3332 * @param encodedName Region to close
3333 * @param abort True if we are aborting
3334 * @param destination Where the Region is being moved too... maybe null if unknown.
3335 * @return True if closed a region.
3336 * @throws NotServingRegionException if the region is not online
3338 protected boolean closeRegion(String encodedName
, final boolean abort
,
3339 final ServerName destination
)
3340 throws NotServingRegionException
{
3341 //Check for permissions to close.
3342 HRegion actualRegion
= this.getRegion(encodedName
);
3343 // Can be null if we're calling close on a region that's not online
3344 if ((actualRegion
!= null) && (actualRegion
.getCoprocessorHost() != null)) {
3346 actualRegion
.getCoprocessorHost().preClose(false);
3347 } catch (IOException exp
) {
3348 LOG
.warn("Unable to close region: the coprocessor launched an error ", exp
);
3353 // previous can come back 'null' if not in map.
3354 final Boolean previous
= this.regionsInTransitionInRS
.putIfAbsent(Bytes
.toBytes(encodedName
),
3357 if (Boolean
.TRUE
.equals(previous
)) {
3358 LOG
.info("Received CLOSE for the region:" + encodedName
+ " , which we are already " +
3359 "trying to OPEN. Cancelling OPENING.");
3360 if (!regionsInTransitionInRS
.replace(Bytes
.toBytes(encodedName
), previous
, Boolean
.FALSE
)) {
3361 // The replace failed. That should be an exceptional case, but theoretically it can happen.
3362 // We're going to try to do a standard close then.
3363 LOG
.warn("The opening for region " + encodedName
+ " was done before we could cancel it." +
3364 " Doing a standard close now");
3365 return closeRegion(encodedName
, abort
, destination
);
3367 // Let's get the region from the online region list again
3368 actualRegion
= this.getRegion(encodedName
);
3369 if (actualRegion
== null) { // If already online, we still need to close it.
3370 LOG
.info("The opening previously in progress has been cancelled by a CLOSE request.");
3371 // The master deletes the znode when it receives this exception.
3372 throw new NotServingRegionException("The region " + encodedName
+
3373 " was opening but not yet served. Opening is cancelled.");
3375 } else if (previous
== null) {
3376 LOG
.info("Received CLOSE for {}", encodedName
);
3377 } else if (Boolean
.FALSE
.equals(previous
)) {
3378 LOG
.info("Received CLOSE for the region: " + encodedName
+
3379 ", which we are already trying to CLOSE, but not completed yet");
3383 if (actualRegion
== null) {
3384 LOG
.debug("Received CLOSE for a region which is not online, and we're not opening.");
3385 this.regionsInTransitionInRS
.remove(Bytes
.toBytes(encodedName
));
3386 // The master deletes the znode when it receives this exception.
3387 throw new NotServingRegionException("The region " + encodedName
+
3388 " is not online, and is not opening.");
3391 CloseRegionHandler crh
;
3392 final RegionInfo hri
= actualRegion
.getRegionInfo();
3393 if (hri
.isMetaRegion()) {
3394 crh
= new CloseMetaHandler(this, this, hri
, abort
);
3396 crh
= new CloseRegionHandler(this, this, hri
, abort
, destination
);
3398 this.executorService
.submit(crh
);
3403 * @return HRegion for the passed binary <code>regionName</code> or null if
3404 * named region is not member of the online regions.
3406 public HRegion
getOnlineRegion(final byte[] regionName
) {
3407 String encodedRegionName
= RegionInfo
.encodeRegionName(regionName
);
3408 return this.onlineRegions
.get(encodedRegionName
);
3412 public HRegion
getRegion(final String encodedRegionName
) {
3413 return this.onlineRegions
.get(encodedRegionName
);
3418 public boolean removeRegion(final HRegion r
, ServerName destination
) {
3419 HRegion toReturn
= this.onlineRegions
.remove(r
.getRegionInfo().getEncodedName());
3420 metricsRegionServerImpl
.requestsCountCache
.remove(r
.getRegionInfo().getEncodedName());
3421 if (destination
!= null) {
3422 long closeSeqNum
= r
.getMaxFlushedSeqId();
3423 if (closeSeqNum
== HConstants
.NO_SEQNUM
) {
3424 // No edits in WAL for this region; get the sequence number when the region was opened.
3425 closeSeqNum
= r
.getOpenSeqNum();
3426 if (closeSeqNum
== HConstants
.NO_SEQNUM
) closeSeqNum
= 0;
3428 boolean selfMove
= ServerName
.isSameAddress(destination
, this.getServerName());
3429 addToMovedRegions(r
.getRegionInfo().getEncodedName(), destination
, closeSeqNum
, selfMove
);
3431 this.regionServerAccounting
.getRetainedRegionRWRequestsCnt().put(r
.getRegionInfo().getEncodedName()
3432 , new Pair
<>(r
.getReadRequestsCount(), r
.getWriteRequestsCount()));
3435 this.regionFavoredNodesMap
.remove(r
.getRegionInfo().getEncodedName());
3436 configurationManager
.deregisterObserver(r
);
3437 return toReturn
!= null;
3441 * Protected Utility method for safely obtaining an HRegion handle.
3443 * @param regionName Name of online {@link HRegion} to return
3444 * @return {@link HRegion} for <code>regionName</code>
3446 protected HRegion
getRegion(final byte[] regionName
)
3447 throws NotServingRegionException
{
3448 String encodedRegionName
= RegionInfo
.encodeRegionName(regionName
);
3449 return getRegionByEncodedName(regionName
, encodedRegionName
);
3452 public HRegion
getRegionByEncodedName(String encodedRegionName
)
3453 throws NotServingRegionException
{
3454 return getRegionByEncodedName(null, encodedRegionName
);
3457 private HRegion
getRegionByEncodedName(byte[] regionName
, String encodedRegionName
)
3458 throws NotServingRegionException
{
3459 HRegion region
= this.onlineRegions
.get(encodedRegionName
);
3460 if (region
== null) {
3461 MovedRegionInfo moveInfo
= getMovedRegion(encodedRegionName
);
3462 if (moveInfo
!= null) {
3463 throw new RegionMovedException(moveInfo
.getServerName(), moveInfo
.getSeqNum());
3465 Boolean isOpening
= this.regionsInTransitionInRS
.get(Bytes
.toBytes(encodedRegionName
));
3466 String regionNameStr
= regionName
== null?
3467 encodedRegionName
: Bytes
.toStringBinary(regionName
);
3468 if (isOpening
!= null && isOpening
) {
3469 throw new RegionOpeningException("Region " + regionNameStr
+
3470 " is opening on " + this.serverName
);
3472 throw new NotServingRegionException("" + regionNameStr
+
3473 " is not online on " + this.serverName
);
3479 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
3480 * IOE if it isn't already.
3482 * @param t Throwable
3483 * @param msg Message to log in error. Can be null.
3484 * @return Throwable converted to an IOE; methods can only let out IOEs.
3486 private Throwable
cleanup(final Throwable t
, final String msg
) {
3487 // Don't log as error if NSRE; NSRE is 'normal' operation.
3488 if (t
instanceof NotServingRegionException
) {
3489 LOG
.debug("NotServingRegionException; " + t
.getMessage());
3492 Throwable e
= t
instanceof RemoteException ?
((RemoteException
) t
).unwrapRemoteException() : t
;
3498 if (!rpcServices
.checkOOME(t
)) {
3505 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3506 * @return Make <code>t</code> an IOE if it isn't already.
3508 private IOException
convertThrowableToIOE(final Throwable t
, final String msg
) {
3509 return (t
instanceof IOException ?
(IOException
) t
: msg
== null
3510 || msg
.length() == 0 ?
new IOException(t
) : new IOException(msg
, t
));
3514 * Checks to see if the file system is still accessible. If not, sets
3515 * abortRequested and stopRequested
3517 * @return false if file system is not available
3519 boolean checkFileSystem() {
3520 if (this.dataFsOk
&& this.dataFs
!= null) {
3522 FSUtils
.checkFileSystemAvailable(this.dataFs
);
3523 } catch (IOException e
) {
3524 abort("File System not available", e
);
3525 this.dataFsOk
= false;
3528 return this.dataFsOk
;
3532 public void updateRegionFavoredNodesMapping(String encodedRegionName
,
3533 List
<org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.ServerName
> favoredNodes
) {
3534 Address
[] addr
= new Address
[favoredNodes
.size()];
3535 // Refer to the comment on the declaration of regionFavoredNodesMap on why
3536 // it is a map of region name to Address[]
3537 for (int i
= 0; i
< favoredNodes
.size(); i
++) {
3538 addr
[i
] = Address
.fromParts(favoredNodes
.get(i
).getHostName(),
3539 favoredNodes
.get(i
).getPort());
3541 regionFavoredNodesMap
.put(encodedRegionName
, addr
);
3545 * Return the favored nodes for a region given its encoded name. Look at the
3546 * comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
3548 * @param encodedRegionName
3549 * @return array of favored locations
3552 public InetSocketAddress
[] getFavoredNodesForRegion(String encodedRegionName
) {
3553 return Address
.toSocketAddress(regionFavoredNodesMap
.get(encodedRegionName
));
3557 public ServerNonceManager
getNonceManager() {
3558 return this.nonceManager
;
3561 private static class MovedRegionInfo
{
3562 private final ServerName serverName
;
3563 private final long seqNum
;
3565 MovedRegionInfo(ServerName serverName
, long closeSeqNum
) {
3566 this.serverName
= serverName
;
3567 this.seqNum
= closeSeqNum
;
3570 public ServerName
getServerName() {
3574 public long getSeqNum() {
/**
 * We need a timeout. If not there is a risk of giving a wrong information: this would double
 * the number of network calls instead of reducing them.
 */
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3585 private void addToMovedRegions(String encodedName
, ServerName destination
, long closeSeqNum
, boolean selfMove
) {
3587 LOG
.warn("Not adding moved region record: " + encodedName
+ " to self.");
3590 LOG
.info("Adding " + encodedName
+ " move to " + destination
+ " record at close sequenceid=" +
3592 movedRegionInfoCache
.put(encodedName
, new MovedRegionInfo(destination
, closeSeqNum
));
3595 void removeFromMovedRegions(String encodedName
) {
3596 movedRegionInfoCache
.invalidate(encodedName
);
3599 @InterfaceAudience.Private
3600 public MovedRegionInfo
getMovedRegion(String encodedRegionName
) {
3601 return movedRegionInfoCache
.getIfPresent(encodedRegionName
);
3604 @InterfaceAudience.Private
3605 public int movedRegionCacheExpiredTime() {
3606 return TIMEOUT_REGION_MOVED
;
3609 private String
getMyEphemeralNodePath() {
3610 return ZNodePaths
.joinZNode(this.zooKeeper
.getZNodePaths().rsZNode
, getServerName().toString());
3613 private boolean isHealthCheckerConfigured() {
3614 String healthScriptLocation
= this.conf
.get(HConstants
.HEALTH_SCRIPT_LOC
);
3615 return org
.apache
.commons
.lang3
.StringUtils
.isNotBlank(healthScriptLocation
);
3619 * @return the underlying {@link CompactSplit} for the servers
3621 public CompactSplit
getCompactSplitThread() {
3622 return this.compactSplitThread
;
3625 CoprocessorServiceResponse
execRegionServerService(
3626 @SuppressWarnings("UnusedParameters") final RpcController controller
,
3627 final CoprocessorServiceRequest serviceRequest
) throws ServiceException
{
3629 ServerRpcController serviceController
= new ServerRpcController();
3630 CoprocessorServiceCall call
= serviceRequest
.getCall();
3631 String serviceName
= call
.getServiceName();
3632 Service service
= coprocessorServiceHandlers
.get(serviceName
);
3633 if (service
== null) {
3634 throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " +
3637 ServiceDescriptor serviceDesc
=
3638 service
.getDescriptorForType();
3640 String methodName
= call
.getMethodName();
3641 MethodDescriptor methodDesc
=
3642 serviceDesc
.findMethodByName(methodName
);
3643 if (methodDesc
== null) {
3644 throw new UnknownProtocolException(service
.getClass(), "Unknown method " + methodName
+
3645 " called on executorService " + serviceName
);
3649 CoprocessorRpcUtils
.getRequest(service
, methodDesc
, call
.getRequest());
3650 final Message
.Builder responseBuilder
=
3651 service
.getResponsePrototype(methodDesc
).newBuilderForType();
3652 service
.callMethod(methodDesc
, serviceController
, request
, message
-> {
3653 if (message
!= null) {
3654 responseBuilder
.mergeFrom(message
);
3657 IOException exception
= CoprocessorRpcUtils
.getControllerException(serviceController
);
3658 if (exception
!= null) {
3661 return CoprocessorRpcUtils
.getResponse(responseBuilder
.build(), HConstants
.EMPTY_BYTE_ARRAY
);
3662 } catch (IOException ie
) {
3663 throw new ServiceException(ie
);
3668 * May be null if this is a master which not carry table.
3670 * @return The block cache instance used by the regionserver.
3673 public Optional
<BlockCache
> getBlockCache() {
3674 return Optional
.ofNullable(this.blockCache
);
3678 * May be null if this is a master which not carry table.
3680 * @return The cache for mob files used by the regionserver.
3683 public Optional
<MobFileCache
> getMobFileCache() {
3684 return Optional
.ofNullable(this.mobFileCache
);
3688 public AccessChecker
getAccessChecker() {
3689 return rpcServices
.getAccessChecker();
3693 public ZKPermissionWatcher
getZKPermissionWatcher() {
3694 return rpcServices
.getZkPermissionWatcher();
3698 * @return : Returns the ConfigurationManager object for testing purposes.
3700 @InterfaceAudience.Private
3701 ConfigurationManager
getConfigurationManager() {
3702 return configurationManager
;
3706 * @return Return table descriptors implementation.
3709 public TableDescriptors
getTableDescriptors() {
3710 return this.tableDescriptors
;
3714 * Reload the configuration from disk.
3716 void updateConfiguration() {
3717 LOG
.info("Reloading the configuration from disk.");
3718 // Reload the configuration from disk.
3719 conf
.reloadConfiguration();
3720 configurationManager
.notifyAllObservers(conf
);
3723 CacheEvictionStats
clearRegionBlockCache(Region region
) {
3724 long evictedBlocks
= 0;
3726 for(Store store
: region
.getStores()) {
3727 for(StoreFile hFile
: store
.getStorefiles()) {
3728 evictedBlocks
+= blockCache
.evictBlocksByHfileName(hFile
.getPath().getName());
3732 return CacheEvictionStats
.builder()
3733 .withEvictedBlocks(evictedBlocks
)
3738 public double getCompactionPressure() {
3740 for (Region region
: onlineRegions
.values()) {
3741 for (Store store
: region
.getStores()) {
3742 double normCount
= store
.getCompactionPressure();
3743 if (normCount
> max
) {
3752 public HeapMemoryManager
getHeapMemoryManager() {
3756 MemStoreFlusher
getMemStoreFlusher() {
3757 return cacheFlusher
;
3762 * @return whether all wal roll request finished for this regionserver
3764 @InterfaceAudience.Private
3765 public boolean walRollRequestFinished() {
3766 return this.walRoller
.walRollFinished();
3770 public ThroughputController
getFlushThroughputController() {
3771 return flushThroughputController
;
3775 public double getFlushPressure() {
3776 if (getRegionServerAccounting() == null || cacheFlusher
== null) {
3777 // return 0 during RS initialization
3780 return getRegionServerAccounting().getFlushPressure();
3784 public void onConfigurationChange(Configuration newConf
) {
3785 ThroughputController old
= this.flushThroughputController
;
3787 old
.stop("configuration change");
3789 this.flushThroughputController
= FlushThroughputControllerFactory
.create(this, newConf
);
3791 Superusers
.initialize(newConf
);
3792 } catch (IOException e
) {
3793 LOG
.warn("Failed to initialize SuperUsers on reloading of the configuration");
3798 public MetricsRegionServer
getMetrics() {
3799 return metricsRegionServer
;
3803 public SecureBulkLoadManager
getSecureBulkLoadManager() {
3804 return this.secureBulkLoadManager
;
3808 public EntityLock
regionLock(final List
<RegionInfo
> regionInfo
, final String description
,
3809 final Abortable abort
) {
3810 final LockServiceClient client
=
3811 new LockServiceClient(conf
, lockStub
, asyncClusterConnection
.getNonceGenerator());
3812 return client
.regionLock(regionInfo
, description
, abort
);
3816 public void unassign(byte[] regionName
) throws IOException
{
3817 FutureUtils
.get(asyncClusterConnection
.getAdmin().unassign(regionName
, false));
3821 public RegionServerSpaceQuotaManager
getRegionServerSpaceQuotaManager() {
3822 return this.rsSpaceQuotaManager
;
3826 public boolean reportFileArchivalForQuotas(TableName tableName
,
3827 Collection
<Entry
<String
, Long
>> archivedFiles
) {
3828 if (TEST_SKIP_REPORTING_TRANSITION
) {
3831 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
3832 if (rss
== null || rsSpaceQuotaManager
== null) {
3833 // the current server could be stopping.
3834 LOG
.trace("Skipping file archival reporting to HMaster as stub is null");
3838 RegionServerStatusProtos
.FileArchiveNotificationRequest request
=
3839 rsSpaceQuotaManager
.buildFileArchiveRequest(tableName
, archivedFiles
);
3840 rss
.reportFileArchival(null, request
);
3841 } catch (ServiceException se
) {
3842 IOException ioe
= ProtobufUtil
.getRemoteException(se
);
3843 if (ioe
instanceof PleaseHoldException
) {
3844 if (LOG
.isTraceEnabled()) {
3845 LOG
.trace("Failed to report file archival(s) to Master because it is initializing."
3846 + " This will be retried.", ioe
);
3848 // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3851 if (rssStub
== rss
) {
3854 // re-create the stub if we failed to report the archival
3855 createRegionServerStatusStub(true);
3856 LOG
.debug("Failed to report file archival(s) to Master. This will be retried.", ioe
);
3862 public NettyEventLoopGroupConfig
getEventLoopGroupConfig() {
3863 return eventLoopGroupConfig
;
3867 public Connection
createConnection(Configuration conf
) throws IOException
{
3868 User user
= UserProvider
.instantiate(conf
).getCurrent();
3869 return ConnectionFactory
.createConnection(conf
, null, user
);
3872 void executeProcedure(long procId
, RSProcedureCallable callable
) {
3873 executorService
.submit(new RSProcedureHandler(this, procId
, callable
));
3876 public void remoteProcedureComplete(long procId
, Throwable error
) {
3877 procedureResultReporter
.complete(procId
, error
);
3880 void reportProcedureDone(ReportProcedureDoneRequest request
) throws IOException
{
3881 RegionServerStatusService
.BlockingInterface rss
;
3882 // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3888 createRegionServerStatusStub();
3891 rss
.reportProcedureDone(null, request
);
3892 } catch (ServiceException se
) {
3893 if (rssStub
== rss
) {
3896 throw ProtobufUtil
.getRemoteException(se
);
3901 * Will ignore the open/close region procedures which already submitted or executed.
3903 * When master had unfinished open/close region procedure and restarted, new active master may
3904 * send duplicate open/close region request to regionserver. The open/close request is submitted
3905 * to a thread pool and execute. So first need a cache for submitted open/close region procedures.
3907 * After the open/close region request executed and report region transition succeed, cache it in
3908 * executed region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3909 * transition succeed, master will not send the open/close region request to regionserver again.
3910 * And we thought that the ongoing duplicate open/close region request should not be delayed more
3911 * than 600 seconds. So the executed region procedures cache will expire after 600 seconds.
3913 * See HBASE-22404 for more details.
3915 * @param procId the id of the open/close region procedure
3916 * @return true if the procedure can be submitted.
3918 boolean submitRegionProcedure(long procId
) {
3922 // Ignore the region procedures which already submitted.
3923 Long previous
= submittedRegionProcedures
.putIfAbsent(procId
, procId
);
3924 if (previous
!= null) {
3925 LOG
.warn("Received procedure pid={}, which already submitted, just ignore it", procId
);
3928 // Ignore the region procedures which already executed.
3929 if (executedRegionProcedures
.getIfPresent(procId
) != null) {
3930 LOG
.warn("Received procedure pid={}, which already executed, just ignore it", procId
);
3937 * See {@link #submitRegionProcedure(long)}.
3938 * @param procId the id of the open/close region procedure
3940 public void finishRegionProcedure(long procId
) {
3941 executedRegionProcedures
.put(procId
, procId
);
3942 submittedRegionProcedures
.remove(procId
);
3945 public boolean isShutDown() {
3950 * Force to terminate region server when abort timeout.
3952 private static class SystemExitWhenAbortTimeout
extends TimerTask
{
3954 public SystemExitWhenAbortTimeout() {
3959 LOG
.warn("Aborting region server timed out, terminating forcibly" +
3960 " and does not wait for any running shutdown hooks or finalizers to finish their work." +
3961 " Thread dump to stdout.");
3962 Threads
.printThreadInfo(System
.out
, "Zombie HRegionServer");
3963 Runtime
.getRuntime().halt(1);
3968 public AsyncClusterConnection
getAsyncClusterConnection() {
3969 return asyncClusterConnection
;
3972 @InterfaceAudience.Private
3973 public CompactedHFilesDischarger
getCompactedHFilesDischarger() {
3974 return compactedFileDischarger
;
3978 * Return pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}}
3979 * @return pause time
3981 @InterfaceAudience.Private
3982 public long getRetryPauseTime() {
3983 return this.retryPauseTime
;