2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER
;
22 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_SPLIT_WAL_COORDINATED_BY_ZK
;
23 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_SPLIT_WAL_MAX_SPLITTER
;
25 import java
.io
.IOException
;
26 import java
.lang
.Thread
.UncaughtExceptionHandler
;
27 import java
.lang
.management
.MemoryType
;
28 import java
.lang
.management
.MemoryUsage
;
29 import java
.lang
.reflect
.Constructor
;
30 import java
.net
.BindException
;
31 import java
.net
.InetAddress
;
32 import java
.net
.InetSocketAddress
;
33 import java
.time
.Duration
;
34 import java
.util
.ArrayList
;
35 import java
.util
.Collection
;
36 import java
.util
.Collections
;
37 import java
.util
.Comparator
;
38 import java
.util
.HashSet
;
39 import java
.util
.Iterator
;
40 import java
.util
.List
;
42 import java
.util
.Map
.Entry
;
43 import java
.util
.Objects
;
44 import java
.util
.Optional
;
46 import java
.util
.SortedMap
;
47 import java
.util
.Timer
;
48 import java
.util
.TimerTask
;
49 import java
.util
.TreeMap
;
50 import java
.util
.TreeSet
;
51 import java
.util
.concurrent
.ConcurrentHashMap
;
52 import java
.util
.concurrent
.ConcurrentMap
;
53 import java
.util
.concurrent
.ConcurrentSkipListMap
;
54 import java
.util
.concurrent
.TimeUnit
;
55 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
56 import java
.util
.concurrent
.locks
.ReentrantReadWriteLock
;
57 import java
.util
.function
.Function
;
58 import javax
.management
.MalformedObjectNameException
;
59 import javax
.servlet
.http
.HttpServlet
;
60 import org
.apache
.commons
.lang3
.RandomUtils
;
61 import org
.apache
.commons
.lang3
.StringUtils
;
62 import org
.apache
.commons
.lang3
.SystemUtils
;
63 import org
.apache
.hadoop
.conf
.Configuration
;
64 import org
.apache
.hadoop
.fs
.FileSystem
;
65 import org
.apache
.hadoop
.fs
.Path
;
66 import org
.apache
.hadoop
.hbase
.Abortable
;
67 import org
.apache
.hadoop
.hbase
.CacheEvictionStats
;
68 import org
.apache
.hadoop
.hbase
.ChoreService
;
69 import org
.apache
.hadoop
.hbase
.ClockOutOfSyncException
;
70 import org
.apache
.hadoop
.hbase
.CoordinatedStateManager
;
71 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
72 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
73 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
74 import org
.apache
.hadoop
.hbase
.HConstants
;
75 import org
.apache
.hadoop
.hbase
.HealthCheckChore
;
76 import org
.apache
.hadoop
.hbase
.MetaTableAccessor
;
77 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
78 import org
.apache
.hadoop
.hbase
.PleaseHoldException
;
79 import org
.apache
.hadoop
.hbase
.ScheduledChore
;
80 import org
.apache
.hadoop
.hbase
.ServerName
;
81 import org
.apache
.hadoop
.hbase
.Stoppable
;
82 import org
.apache
.hadoop
.hbase
.TableDescriptors
;
83 import org
.apache
.hadoop
.hbase
.TableName
;
84 import org
.apache
.hadoop
.hbase
.YouAreDeadException
;
85 import org
.apache
.hadoop
.hbase
.ZNodeClearer
;
86 import org
.apache
.hadoop
.hbase
.client
.AsyncClusterConnection
;
87 import org
.apache
.hadoop
.hbase
.client
.ClusterConnection
;
88 import org
.apache
.hadoop
.hbase
.client
.ClusterConnectionFactory
;
89 import org
.apache
.hadoop
.hbase
.client
.Connection
;
90 import org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
;
91 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
92 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
93 import org
.apache
.hadoop
.hbase
.client
.RpcRetryingCallerFactory
;
94 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
95 import org
.apache
.hadoop
.hbase
.client
.locking
.EntityLock
;
96 import org
.apache
.hadoop
.hbase
.client
.locking
.LockServiceClient
;
97 import org
.apache
.hadoop
.hbase
.conf
.ConfigurationManager
;
98 import org
.apache
.hadoop
.hbase
.conf
.ConfigurationObserver
;
99 import org
.apache
.hadoop
.hbase
.coordination
.ZkCoordinatedStateManager
;
100 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
101 import org
.apache
.hadoop
.hbase
.exceptions
.RegionMovedException
;
102 import org
.apache
.hadoop
.hbase
.exceptions
.RegionOpeningException
;
103 import org
.apache
.hadoop
.hbase
.exceptions
.UnknownProtocolException
;
104 import org
.apache
.hadoop
.hbase
.executor
.ExecutorService
;
105 import org
.apache
.hadoop
.hbase
.executor
.ExecutorType
;
106 import org
.apache
.hadoop
.hbase
.fs
.HFileSystem
;
107 import org
.apache
.hadoop
.hbase
.http
.InfoServer
;
108 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCache
;
109 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCacheFactory
;
110 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFile
;
111 import org
.apache
.hadoop
.hbase
.io
.util
.MemorySizeUtil
;
112 import org
.apache
.hadoop
.hbase
.ipc
.CoprocessorRpcUtils
;
113 import org
.apache
.hadoop
.hbase
.ipc
.NettyRpcClientConfigHelper
;
114 import org
.apache
.hadoop
.hbase
.ipc
.RpcClient
;
115 import org
.apache
.hadoop
.hbase
.ipc
.RpcControllerFactory
;
116 import org
.apache
.hadoop
.hbase
.ipc
.RpcServer
;
117 import org
.apache
.hadoop
.hbase
.ipc
.RpcServerInterface
;
118 import org
.apache
.hadoop
.hbase
.ipc
.ServerNotRunningYetException
;
119 import org
.apache
.hadoop
.hbase
.ipc
.ServerRpcController
;
120 import org
.apache
.hadoop
.hbase
.log
.HBaseMarkers
;
121 import org
.apache
.hadoop
.hbase
.master
.HMaster
;
122 import org
.apache
.hadoop
.hbase
.master
.LoadBalancer
;
123 import org
.apache
.hadoop
.hbase
.master
.MasterRpcServicesVersionWrapper
;
124 import org
.apache
.hadoop
.hbase
.master
.RegionState
.State
;
125 import org
.apache
.hadoop
.hbase
.mob
.MobFileCache
;
126 import org
.apache
.hadoop
.hbase
.procedure
.RegionServerProcedureManagerHost
;
127 import org
.apache
.hadoop
.hbase
.procedure2
.RSProcedureCallable
;
128 import org
.apache
.hadoop
.hbase
.quotas
.FileSystemUtilizationChore
;
129 import org
.apache
.hadoop
.hbase
.quotas
.QuotaUtil
;
130 import org
.apache
.hadoop
.hbase
.quotas
.RegionServerRpcQuotaManager
;
131 import org
.apache
.hadoop
.hbase
.quotas
.RegionServerSpaceQuotaManager
;
132 import org
.apache
.hadoop
.hbase
.quotas
.RegionSize
;
133 import org
.apache
.hadoop
.hbase
.quotas
.RegionSizeStore
;
134 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionConfiguration
;
135 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionLifeCycleTracker
;
136 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionProgress
;
137 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionRequester
;
138 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.CloseMetaHandler
;
139 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.CloseRegionHandler
;
140 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.RSProcedureHandler
;
141 import org
.apache
.hadoop
.hbase
.regionserver
.handler
.RegionReplicaFlushHandler
;
142 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.FlushThroughputControllerFactory
;
143 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.ThroughputController
;
144 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationLoad
;
145 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationSourceInterface
;
146 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationStatus
;
147 import org
.apache
.hadoop
.hbase
.security
.SecurityConstants
;
148 import org
.apache
.hadoop
.hbase
.security
.Superusers
;
149 import org
.apache
.hadoop
.hbase
.security
.User
;
150 import org
.apache
.hadoop
.hbase
.security
.UserProvider
;
151 import org
.apache
.hadoop
.hbase
.security
.access
.AccessChecker
;
152 import org
.apache
.hadoop
.hbase
.security
.access
.ZKPermissionWatcher
;
153 import org
.apache
.hadoop
.hbase
.trace
.SpanReceiverHost
;
154 import org
.apache
.hadoop
.hbase
.trace
.TraceUtil
;
155 import org
.apache
.hadoop
.hbase
.util
.Addressing
;
156 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
157 import org
.apache
.hadoop
.hbase
.util
.CompressionTest
;
158 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
159 import org
.apache
.hadoop
.hbase
.util
.FSTableDescriptors
;
160 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
161 import org
.apache
.hadoop
.hbase
.util
.HasThread
;
162 import org
.apache
.hadoop
.hbase
.util
.JvmPauseMonitor
;
163 import org
.apache
.hadoop
.hbase
.util
.NettyEventLoopGroupConfig
;
164 import org
.apache
.hadoop
.hbase
.util
.Pair
;
165 import org
.apache
.hadoop
.hbase
.util
.RetryCounter
;
166 import org
.apache
.hadoop
.hbase
.util
.RetryCounterFactory
;
167 import org
.apache
.hadoop
.hbase
.util
.ServerRegionReplicaUtil
;
168 import org
.apache
.hadoop
.hbase
.util
.Sleeper
;
169 import org
.apache
.hadoop
.hbase
.util
.Threads
;
170 import org
.apache
.hadoop
.hbase
.util
.VersionInfo
;
171 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
172 import org
.apache
.hadoop
.hbase
.wal
.NettyAsyncFSWALConfigHelper
;
173 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
174 import org
.apache
.hadoop
.hbase
.wal
.WALFactory
;
175 import org
.apache
.hadoop
.hbase
.wal
.WALProvider
;
176 import org
.apache
.hadoop
.hbase
.zookeeper
.ClusterStatusTracker
;
177 import org
.apache
.hadoop
.hbase
.zookeeper
.MasterAddressTracker
;
178 import org
.apache
.hadoop
.hbase
.zookeeper
.MetaTableLocator
;
179 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKClusterId
;
180 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKNodeTracker
;
181 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
182 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
183 import org
.apache
.hadoop
.hbase
.zookeeper
.ZNodePaths
;
184 import org
.apache
.hadoop
.ipc
.RemoteException
;
185 import org
.apache
.hadoop
.util
.ReflectionUtils
;
186 import org
.apache
.yetus
.audience
.InterfaceAudience
;
187 import org
.apache
.zookeeper
.KeeperException
;
188 import org
.slf4j
.Logger
;
189 import org
.slf4j
.LoggerFactory
;
190 import sun
.misc
.Signal
;
191 import sun
.misc
.SignalHandler
;
193 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.annotations
.VisibleForTesting
;
194 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
195 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Throwables
;
196 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.cache
.Cache
;
197 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.cache
.CacheBuilder
;
198 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Maps
;
199 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.BlockingRpcChannel
;
200 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
201 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
202 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.TextFormat
;
203 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.UnsafeByteOperations
;
205 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
206 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.RequestConverter
;
207 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.CoprocessorServiceCall
;
208 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.CoprocessorServiceRequest
;
209 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.CoprocessorServiceResponse
;
210 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
;
211 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
.RegionLoad
;
212 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
.RegionStoreSequenceIds
;
213 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.Coprocessor
;
214 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.Coprocessor
.Builder
;
215 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.NameStringPair
;
216 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.RegionServerInfo
;
217 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.RegionSpecifier
;
218 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.RegionSpecifier
.RegionSpecifierType
;
219 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.LockServiceProtos
.LockService
;
220 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
;
221 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.GetLastFlushedSequenceIdRequest
;
222 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.GetLastFlushedSequenceIdResponse
;
223 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerReportRequest
;
224 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerStartupRequest
;
225 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerStartupResponse
;
226 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionServerStatusService
;
227 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionSpaceUse
;
228 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionSpaceUseReportRequest
;
229 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionStateTransition
;
230 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.RegionStateTransition
.TransitionCode
;
231 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.ReportProcedureDoneRequest
;
232 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.ReportRSFatalErrorRequest
;
233 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.ReportRegionStateTransitionRequest
;
234 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
.ReportRegionStateTransitionResponse
;
237 * HRegionServer makes a set of HRegions available to clients. It checks in with
238 * the HMaster. There are many HRegionServers in a single HBase deployment.
240 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.TOOLS
)
241 @SuppressWarnings({ "deprecation"})
242 public class HRegionServer
extends HasThread
implements
243 RegionServerServices
, LastSequenceId
, ConfigurationObserver
{
244 // Time to pause if master says 'please hold'. Make configurable if needed.
245 private static final int INIT_PAUSE_TIME_MS
= 1000;
247 private static final Logger LOG
= LoggerFactory
.getLogger(HRegionServer
.class);
250 * For testing only! Set to true to skip notifying region assignment to master.
253 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings(value
="MS_SHOULD_BE_FINAL")
254 public static boolean TEST_SKIP_REPORTING_TRANSITION
= false;
256 //RegionName vs current action in progress
257 //true - if open region action in progress
258 //false - if close region action in progress
259 protected final ConcurrentMap
<byte[], Boolean
> regionsInTransitionInRS
=
260 new ConcurrentSkipListMap
<>(Bytes
.BYTES_COMPARATOR
);
263 * Used to cache the open/close region procedures which already submitted.
264 * See {@link #submitRegionProcedure(long)}.
266 private final ConcurrentMap
<Long
, Long
> submittedRegionProcedures
= new ConcurrentHashMap
<>();
268 * Used to cache the open/close region procedures which already executed.
269 * See {@link #submitRegionProcedure(long)}.
271 private final Cache
<Long
, Long
> executedRegionProcedures
=
272 CacheBuilder
.newBuilder().expireAfterAccess(600, TimeUnit
.SECONDS
).build();
275 protected MemStoreFlusher cacheFlusher
;
277 protected HeapMemoryManager hMemManager
;
280 * Cluster connection to be shared by services.
281 * Initialized at server startup and closed when server shuts down.
282 * Clients must never close it explicitly.
283 * Clients hosted by this Server should make use of this clusterConnection rather than create
284 * their own; if they create their own, there is no way for the hosting server to shutdown
285 * ongoing client RPCs.
287 protected ClusterConnection clusterConnection
;
290 * The asynchronous cluster connection to be shared by services.
292 protected AsyncClusterConnection asyncClusterConnection
;
295 * Go here to get table descriptors.
297 protected TableDescriptors tableDescriptors
;
299 // Replication services. If no replication, this handler will be null.
300 protected ReplicationSourceService replicationSourceHandler
;
301 protected ReplicationSinkService replicationSinkHandler
;
304 public CompactSplit compactSplitThread
;
307 * Map of regions currently being served by this region server. Key is the
308 * encoded region name. All access should be synchronized.
310 protected final Map
<String
, HRegion
> onlineRegions
= new ConcurrentHashMap
<>();
313 * Map of encoded region names to the DataNode locations they should be hosted on
314 * We store the value as InetSocketAddress since this is used only in HDFS
315 * API (create() that takes favored nodes as hints for placing file blocks).
316 * We could have used ServerName here as the value class, but we'd need to
317 * convert it to InetSocketAddress at some point before the HDFS API call, and
318 * it seems a bit weird to store ServerName since ServerName refers to RegionServers
319 * and here we really mean DataNode locations.
321 protected final Map
<String
, InetSocketAddress
[]> regionFavoredNodesMap
=
322 new ConcurrentHashMap
<>();
325 protected Leases leases
;
327 // Instance of the hbase executor executorService.
328 protected ExecutorService executorService
;
330 // If false, the file system has become unavailable
331 protected volatile boolean fsOk
;
332 protected HFileSystem fs
;
333 protected HFileSystem walFs
;
335 // Set when a report to the master comes back with a message asking us to
336 // shutdown. Also set by call to stop when debugging or running unit tests
337 // of HRegionServer in isolation.
338 private volatile boolean stopped
= false;
340 // Go down hard. Used if file system becomes unavailable and also in
341 // debugging and unit tests.
342 private volatile boolean abortRequested
;
343 public static final String ABORT_TIMEOUT
= "hbase.regionserver.abort.timeout";
344 // Default abort timeout is 1200 seconds, for safety
345 private static final long DEFAULT_ABORT_TIMEOUT
= 1200000;
346 // This task will be run when the abort times out
347 public static final String ABORT_TIMEOUT_TASK
= "hbase.regionserver.abort.timeout.task";
349 ConcurrentMap
<String
, Integer
> rowlocks
= new ConcurrentHashMap
<>();
351 // A state before we go into stopped state. At this stage we're closing user
353 private boolean stopping
= false;
355 volatile boolean killed
= false;
357 private volatile boolean shutDown
= false;
359 protected final Configuration conf
;
361 private Path rootDir
;
362 private Path walRootDir
;
364 protected final ReentrantReadWriteLock lock
= new ReentrantReadWriteLock();
366 final int numRetries
;
367 protected final int threadWakeFrequency
;
368 protected final int msgInterval
;
370 protected final int numRegionsToReport
;
372 // Stub to do region server status calls against the master.
373 private volatile RegionServerStatusService
.BlockingInterface rssStub
;
374 private volatile LockService
.BlockingInterface lockStub
;
375 // RPC client. Used to make the stub above that does region server status checking.
378 private RpcRetryingCallerFactory rpcRetryingCallerFactory
;
379 private RpcControllerFactory rpcControllerFactory
;
381 private UncaughtExceptionHandler uncaughtExceptionHandler
;
383 // Info server. Default access so can be used by unit tests. REGIONSERVER
384 // is name of the webapp and the attribute name used stuffing this instance
386 protected InfoServer infoServer
;
387 private JvmPauseMonitor pauseMonitor
;
389 /** region server process name */
390 public static final String REGIONSERVER
= "regionserver";
392 MetricsRegionServer metricsRegionServer
;
393 MetricsTable metricsTable
;
394 private SpanReceiverHost spanReceiverHost
;
397 * ChoreService used to schedule tasks that we want to run periodically
399 private ChoreService choreService
;
402 * Check for compactions requests.
404 ScheduledChore compactionChecker
;
409 ScheduledChore periodicFlusher
;
411 protected volatile WALFactory walFactory
;
413 // WAL roller. log is protected rather than private to avoid
414 // eclipse warning when accessed by inner classes
415 protected LogRoller walRoller
;
417 // A thread which calls reportProcedureDone
418 private RemoteProcedureResultReporter procedureResultReporter
;
420 // flag set after we're done setting up server threads
421 final AtomicBoolean online
= new AtomicBoolean(false);
423 // zookeeper connection and watcher
424 protected final ZKWatcher zooKeeper
;
426 // master address tracker
427 private final MasterAddressTracker masterAddressTracker
;
429 // Cluster Status Tracker
430 protected final ClusterStatusTracker clusterStatusTracker
;
432 // Log Splitting Worker
433 private SplitLogWorker splitLogWorker
;
435 // A sleeper that sleeps for msgInterval.
436 protected final Sleeper sleeper
;
438 private final int operationTimeout
;
439 private final int shortOperationTimeout
;
441 private final RegionServerAccounting regionServerAccounting
;
444 private BlockCache blockCache
;
445 // The cache for mob files
446 private MobFileCache mobFileCache
;
448 /** The health check chore. */
449 private HealthCheckChore healthCheckChore
;
451 /** The nonce manager chore. */
452 private ScheduledChore nonceManagerChore
;
454 private Map
<String
, com
.google
.protobuf
.Service
> coprocessorServiceHandlers
= Maps
.newHashMap();
457 * The server name the Master sees us as. It is made from the hostname the
458 * master passes us, port, and server startcode. Gets set after registration
461 protected ServerName serverName
;
464 * hostname specified by hostname config
466 protected String useThisHostnameInstead
;
468 // key to the config parameter of server hostname
469 // the specification of server hostname is optional. The hostname should be resolvable from
470 // both master and region server
471 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.CONFIG
)
472 final static String RS_HOSTNAME_KEY
= "hbase.regionserver.hostname";
473 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.CONFIG
)
474 protected final static String MASTER_HOSTNAME_KEY
= "hbase.master.hostname";
476 // HBASE-18226: This config and hbase.regionserver.hostname are mutually exclusive.
477 // Exception will be thrown if both are used.
478 final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
=
479 "hbase.regionserver.hostname.disable.master.reversedns";
482 * This server's startcode.
484 protected final long startcode
;
487 * Unique identifier for the cluster we are a part of.
489 protected String clusterId
;
492 * Chore to periodically clean the moved region list
494 private MovedRegionsCleaner movedRegionsCleaner
;
496 // chore for refreshing store files for secondary regions
497 private StorefileRefresherChore storefileRefresher
;
499 private RegionServerCoprocessorHost rsHost
;
501 private RegionServerProcedureManagerHost rspmHost
;
503 private RegionServerRpcQuotaManager rsQuotaManager
;
504 private RegionServerSpaceQuotaManager rsSpaceQuotaManager
;
507 * Nonce manager. Nonces are used to make operations like increment and append idempotent
508 * in the case where client doesn't receive the response from a successful operation and
509 * retries. We track the successful ops for some time via a nonce sent by client and handle
510 * duplicate operations (currently, by failing them; in future we might use MVCC to return
511 * result). Nonces are also recovered from WAL during recovery; however, the caveats (from
513 * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
514 * of past records. If we don't read the records, we don't read and recover the nonces.
515 * Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
516 * - There's no WAL recovery during normal region move, so nonces will not be transferred.
517 * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
518 * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
519 * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
520 * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
521 * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
522 * latest nonce in it expired. It can also be recovered during move.
524 final ServerNonceManager nonceManager
;
526 private UserProvider userProvider
;
528 protected final RSRpcServices rpcServices
;
530 protected CoordinatedStateManager csm
;
533 * Configuration manager is used to register/deregister and notify the configuration observers
534 * when the regionserver is notified that there was a change in the on disk configs.
536 protected final ConfigurationManager configurationManager
;
539 CompactedHFilesDischarger compactedFileDischarger
;
541 private volatile ThroughputController flushThroughputController
;
543 protected SecureBulkLoadManager secureBulkLoadManager
;
545 protected FileSystemUtilizationChore fsUtilizationChore
;
547 private final NettyEventLoopGroupConfig eventLoopGroupConfig
;
550 * True if this RegionServer is coming up in a cluster where there is no Master;
551 * means it needs to just come up and make do without a Master to talk to: e.g. in test or
552 * HRegionServer is doing other than its usual duties: e.g. as a hollowed-out host whose only
553 * purpose is as a Replication-stream sink; see HBASE-18846 for more.
555 private final boolean masterless
;
556 static final String MASTERLESS_CONFIG_NAME
= "hbase.masterless";
558 /**regionserver codec list **/
559 public static final String REGIONSERVER_CODEC
= "hbase.regionserver.codecs";
561 // A timer to shutdown the process if abort takes too long
562 private Timer abortMonitor
;
565 * Starts a HRegionServer at the default location
567 // Don't start any services or managers in here in the Constructor.
568 // Defer till after we register with the Master as much as possible. See #startServices.
// Constructs the region server from the given configuration.
// In the order visible here it: initializes tracing, records the start code, reads
// configuration-driven settings (masterless flag, retries, wake frequency, message interval,
// nonce support, timeouts), creates the RPC services, derives this server's ServerName,
// performs the ZooKeeper-client and server logins, creates the block/mob caches when needed,
// initializes the file system, optionally opens the ZooKeeper watcher and its trackers,
// starts the RPC services and finally creates the chore and executor services.
// @param conf configuration this server reads its settings from
// @throws IOException declared by the signature; construction failures are logged in the
//         catch block at the end before leaving the constructor
569 public HRegionServer(Configuration conf
) throws IOException
{
570 super("RegionServer"); // thread name
571 TraceUtil
.initTracer(conf
);
// The start code is taken from the wall clock at construction time; it is later passed to
// ServerName.valueOf(...) together with the host name and port.
573 this.startcode
= System
.currentTimeMillis();
576 this.masterless
= conf
.getBoolean(MASTERLESS_CONFIG_NAME
, false);
577 this.eventLoopGroupConfig
= setupNetty(this.conf
);
// Up-front checks against the configuration, performed before any service is created.
578 MemorySizeUtil
.checkForClusterFreeHeapMemoryLimit(this.conf
);
579 HFile
.checkHFileVersion(this.conf
);
580 checkCodecs(this.conf
);
581 this.userProvider
= UserProvider
.instantiate(conf
);
582 FSUtils
.setupShortCircuitRead(this.conf
);
584 // Disable usage of meta replicas in the regionserver
585 this.conf
.setBoolean(HConstants
.USE_META_REPLICAS
, false);
587 this.numRetries
= this.conf
.getInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
,
588 HConstants
.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER
);
// Defaults are 10 * 1000 and 3 * 1000 (presumably milliseconds -- TODO confirm units).
589 this.threadWakeFrequency
= conf
.getInt(HConstants
.THREAD_WAKE_FREQUENCY
, 10 * 1000);
590 this.msgInterval
= conf
.getInt("hbase.regionserver.msginterval", 3 * 1000);
592 this.sleeper
= new Sleeper(this.msgInterval
, this);
// nonceManager is left null when nonces are disabled through configuration.
594 boolean isNoncesEnabled
= conf
.getBoolean(HConstants
.HBASE_RS_NONCES_ENABLED
, true);
595 this.nonceManager
= isNoncesEnabled ?
new ServerNonceManager(this.conf
) : null;
597 this.numRegionsToReport
= conf
.getInt("hbase.regionserver.numregionstoreport", 10);
599 this.operationTimeout
= conf
.getInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
,
600 HConstants
.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
);
602 this.shortOperationTimeout
= conf
.getInt(HConstants
.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY
,
603 HConstants
.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT
);
605 this.abortRequested
= false;
606 this.stopped
= false;
// The RPC services are created before the host name is resolved because the host name
// falls back to rpcServices.isa when no explicit hostname is configured.
608 rpcServices
= createRpcServices();
609 useThisHostnameInstead
= getUseThisHostnameInstead(conf
);
611 StringUtils
.isBlank(useThisHostnameInstead
) ?
this.rpcServices
.isa
.getHostName()
612 : this.useThisHostnameInstead
;
613 serverName
= ServerName
.valueOf(hostName
, this.rpcServices
.isa
.getPort(), this.startcode
);
615 rpcControllerFactory
= RpcControllerFactory
.instantiate(this.conf
);
616 rpcRetryingCallerFactory
= RpcRetryingCallerFactory
.instantiate(this.conf
);
618 // login the zookeeper client principal (if using security)
619 ZKUtil
.loginClient(this.conf
, HConstants
.ZK_CLIENT_KEYTAB_FILE
,
620 HConstants
.ZK_CLIENT_KERBEROS_PRINCIPAL
, hostName
);
621 // login the server principal (if using secure Hadoop)
622 login(userProvider
, hostName
);
623 // init superusers and add the server principal (if using security)
624 // or process owner as default super user.
625 Superusers
.initialize(conf
);
626 regionServerAccounting
= new RegionServerAccounting(conf
);
// True only when this instance is an HMaster and tables are not hosted on the master.
628 boolean isMasterNotCarryTable
=
629 this instanceof HMaster
&& !LoadBalancer
.isTablesOnMaster(conf
);
631 // no need to instantiate block cache and mob file cache when the master does not carry tables
632 if (!isMasterNotCarryTable
) {
633 blockCache
= BlockCacheFactory
.createBlockCache(conf
);
634 mobFileCache
= new MobFileCache(conf
);
// Handler that aborts this server when a thread terminates with an uncaught exception.
637 uncaughtExceptionHandler
= new UncaughtExceptionHandler() {
639 public void uncaughtException(Thread t
, Throwable e
) {
640 abort("Uncaught exception in executorService thread " + t
.getName(), e
);
644 initializeFileSystem();
645 spanReceiverHost
= SpanReceiverHost
.getInstance(getConfiguration());
647 this.configurationManager
= new ConfigurationManager();
648 setupWindows(getConfiguration(), getConfigurationManager());
650 // Some unit tests don't need a cluster, so no zookeeper at all
651 if (!conf
.getBoolean("hbase.testing.nocluster", false)) {
652 // Open connection to zookeeper and set primary watcher
653 zooKeeper
= new ZKWatcher(conf
, getProcessName() + ":" +
654 rpcServices
.isa
.getPort(), this, canCreateBaseZNode());
655 // If no master in cluster, skip trying to track one or look for a cluster status.
656 if (!this.masterless
) {
// The ZK-based coordinated state manager is created only when WAL splitting is
// configured to be coordinated by ZooKeeper.
657 if (conf
.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK
,
658 DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK
)) {
659 this.csm
= new ZkCoordinatedStateManager(this);
662 masterAddressTracker
= new MasterAddressTracker(getZooKeeper(), this);
663 masterAddressTracker
.start();
665 clusterStatusTracker
= new ClusterStatusTracker(zooKeeper
, this);
666 clusterStatusTracker
.start();
// The trackers are final fields, so the paths that do not create them assign null explicitly.
668 masterAddressTracker
= null;
669 clusterStatusTracker
= null;
673 masterAddressTracker
= null;
674 clusterStatusTracker
= null;
676 this.rpcServices
.start(zooKeeper
);
677 // This violates 'no starting stuff in Constructor' but Master depends on the below chore
678 // and executor being created and takes a different startup route. Lots of overlap between HRS
679 // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
680 // Master expects Constructor to put up web servers. Ugh.
682 this.choreService
= new ChoreService(getName(), true);
683 this.executorService
= new ExecutorService(getName());
685 } catch (Throwable t
) {
686 // Make sure we log the exception. HRegionServer is often started via reflection and the
687 // cause of failed startup is lost.
688 LOG
.error("Failed construction RegionServer", t
);
693 // HMaster should override this method to load the specific config for master
694 protected String
getUseThisHostnameInstead(Configuration conf
) throws IOException
{
695 String hostname
= conf
.get(RS_HOSTNAME_KEY
);
696 if (conf
.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
, false)) {
697 if (!StringUtils
.isBlank(hostname
)) {
698 String msg
= RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
+ " and " + RS_HOSTNAME_KEY
+
699 " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY
+
700 " to true while " + RS_HOSTNAME_KEY
+ " is used";
701 throw new IOException(msg
);
703 return rpcServices
.isa
.getHostName();
711 * If running on Windows, do windows-specific setup.
713 private static void setupWindows(final Configuration conf
, ConfigurationManager cm
) {
714 if (!SystemUtils
.IS_OS_WINDOWS
) {
715 Signal
.handle(new Signal("HUP"), new SignalHandler() {
717 public void handle(Signal signal
) {
718 conf
.reloadConfiguration();
719 cm
.notifyAllObservers(conf
);
725 private static NettyEventLoopGroupConfig
setupNetty(Configuration conf
) {
726 // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
727 NettyEventLoopGroupConfig nelgc
=
728 new NettyEventLoopGroupConfig(conf
, "RS-EventLoopGroup");
729 NettyRpcClientConfigHelper
.setEventLoopConfig(conf
, nelgc
.group(), nelgc
.clientChannelClass());
730 NettyAsyncFSWALConfigHelper
.setEventLoopConfig(conf
, nelgc
.group(), nelgc
.clientChannelClass());
734 private void initializeFileSystem() throws IOException
{
735 // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
736 // checksum verification enabled, then automatically switch off hdfs checksum verification.
737 boolean useHBaseChecksum
= conf
.getBoolean(HConstants
.HBASE_CHECKSUM_VERIFICATION
, true);
738 FSUtils
.setFsDefault(this.conf
, FSUtils
.getWALRootDir(this.conf
));
739 this.walFs
= new HFileSystem(this.conf
, useHBaseChecksum
);
740 this.walRootDir
= FSUtils
.getWALRootDir(this.conf
);
741 // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
742 // underlying hadoop hdfs accessors will be going against wrong filesystem
743 // (unless all is set to defaults).
744 FSUtils
.setFsDefault(this.conf
, FSUtils
.getRootDir(this.conf
));
745 this.fs
= new HFileSystem(this.conf
, useHBaseChecksum
);
746 this.rootDir
= FSUtils
.getRootDir(this.conf
);
747 this.tableDescriptors
= getFsTableDescriptors();
750 protected TableDescriptors
getFsTableDescriptors() throws IOException
{
751 return new FSTableDescriptors(this.conf
,
752 this.fs
, this.rootDir
, !canUpdateTableDescriptor(), false, getMetaTableObserver());
755 protected Function
<TableDescriptorBuilder
, TableDescriptorBuilder
> getMetaTableObserver() {
759 protected void login(UserProvider user
, String host
) throws IOException
{
760 user
.login(SecurityConstants
.REGIONSERVER_KRB_KEYTAB_FILE
,
761 SecurityConstants
.REGIONSERVER_KRB_PRINCIPAL
, host
);
766 * Wait for an active Master.
767 * See override in Master superclass for how it is used.
769 protected void waitForMasterActive() {}
771 protected String
getProcessName() {
775 protected boolean canCreateBaseZNode() {
776 return this.masterless
;
779 protected boolean canUpdateTableDescriptor() {
783 protected RSRpcServices
createRpcServices() throws IOException
{
784 return new RSRpcServices(this);
787 protected void configureInfoServer() {
788 infoServer
.addServlet("rs-status", "/rs-status", RSStatusServlet
.class);
789 infoServer
.setAttribute(REGIONSERVER
, this);
792 protected Class
<?
extends HttpServlet
> getDumpServlet() {
793 return RSDumpServlet
.class;
797 public boolean registerService(com
.google
.protobuf
.Service instance
) {
799 * No stacking of instances is allowed for a single executorService name
801 com
.google
.protobuf
.Descriptors
.ServiceDescriptor serviceDesc
=
802 instance
.getDescriptorForType();
803 String serviceName
= CoprocessorRpcUtils
.getServiceName(serviceDesc
);
804 if (coprocessorServiceHandlers
.containsKey(serviceName
)) {
805 LOG
.error("Coprocessor executorService " + serviceName
806 + " already registered, rejecting request from " + instance
);
810 coprocessorServiceHandlers
.put(serviceName
, instance
);
811 if (LOG
.isDebugEnabled()) {
812 LOG
.debug("Registered regionserver coprocessor executorService: executorService=" + serviceName
);
817 private Configuration
unsetClientZookeeperQuorum() {
818 Configuration conf
= this.conf
;
819 if (conf
.get(HConstants
.CLIENT_ZOOKEEPER_QUORUM
) != null) {
820 // Use server ZK cluster for server-issued connections, so we clone
821 // the conf and unset the client ZK related properties
822 conf
= new Configuration(this.conf
);
823 conf
.unset(HConstants
.CLIENT_ZOOKEEPER_QUORUM
);
829 * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to the
830 * local server; i.e. a short-circuit Connection. Safe to use going to local or remote server.
832 private ClusterConnection
createClusterConnection() throws IOException
{
833 // Create a cluster connection that when appropriate, can short-circuit and go directly to the
834 // local server if the request is to the local server bypassing RPC. Can be used for both local
835 // and remote invocations.
836 ClusterConnection conn
=
837 ConnectionUtils
.createShortCircuitConnection(unsetClientZookeeperQuorum(), null,
838 userProvider
.getCurrent(), serverName
, rpcServices
, rpcServices
);
839 // This is used to initialize the batch thread pool inside the connection implementation.
840 // When deploy a fresh cluster, we may first use the cluster connection in InitMetaProcedure,
841 // which will be executed inside the PEWorker, and then the batch thread pool will inherit the
842 // thread group of PEWorker, which will be destroy when shutting down the ProcedureExecutor. It
843 // will cause lots of procedure related UTs to fail, so here let's initialize it first, no harm.
844 conn
.getTable(TableName
.META_TABLE_NAME
).close();
849 * Run test on configured codecs to make sure supporting libs are in place.
851 * @throws IOException
853 private static void checkCodecs(final Configuration c
) throws IOException
{
854 // check to see if the codec list is available:
855 String
[] codecs
= c
.getStrings(REGIONSERVER_CODEC
, (String
[])null);
856 if (codecs
== null) return;
857 for (String codec
: codecs
) {
858 if (!CompressionTest
.testCompression(codec
)) {
859 throw new IOException("Compression codec " + codec
+
860 " not supported, aborting RS construction");
865 public String
getClusterId() {
866 return this.clusterId
;
870 * Setup our cluster connection if not already initialized.
872 protected final synchronized void setupClusterConnection() throws IOException
{
873 if (clusterConnection
== null) {
874 clusterConnection
= createClusterConnection();
875 asyncClusterConnection
=
876 ClusterConnectionFactory
.createAsyncClusterConnection(unsetClientZookeeperQuorum(),
877 new InetSocketAddress(this.rpcServices
.isa
.getAddress(), 0), userProvider
.getCurrent());
882 * All initialization needed before we go register with Master.<br>
883 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
884 * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
886 private void preRegistrationInitialization() {
888 initializeZooKeeper();
889 setupClusterConnection();
890 // Setup RPC client for master communication
891 this.rpcClient
= asyncClusterConnection
.getRpcClient();
892 } catch (Throwable t
) {
893 // Call stop if error or process will stick around for ever since server
894 // puts up non-daemon threads.
895 this.rpcServices
.stop();
896 abort("Initialization of RS failed. Hence aborting RS.", t
);
901 * Bring up connection to zk ensemble and then wait until a master for this cluster and then after
902 * that, wait until cluster 'up' flag has been set. This is the order in which master does things.
904 * Finally open long-living server short-circuit connection.
906 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings(value
="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
907 justification
="cluster Id znode read would give us correct response")
908 private void initializeZooKeeper() throws IOException
, InterruptedException
{
909 // Nothing to do in here if no Master in the mix.
910 if (this.masterless
) {
914 // Create the master address tracker, register with zk, and start it. Then
915 // block until a master is available. No point in starting up if no master
917 blockAndCheckIfStopped(this.masterAddressTracker
);
919 // Wait on cluster being up. Master will set this flag up in zookeeper
921 blockAndCheckIfStopped(this.clusterStatusTracker
);
923 // If we are HMaster then the cluster id should have already been set.
924 if (clusterId
== null) {
925 // Retrieve clusterId
926 // Since cluster status is now up
927 // ID should have already been set by HMaster
929 clusterId
= ZKClusterId
.readClusterIdZNode(this.zooKeeper
);
930 if (clusterId
== null) {
931 this.abort("Cluster ID has not been set");
933 LOG
.info("ClusterId : " + clusterId
);
934 } catch (KeeperException e
) {
935 this.abort("Failed to retrieve Cluster ID", e
);
939 waitForMasterActive();
940 if (isStopped() || isAborted()) {
941 return; // No need for further initialization
944 // watch for snapshots and other procedures
946 rspmHost
= new RegionServerProcedureManagerHost();
947 rspmHost
.loadProcedures(conf
);
948 rspmHost
.initialize(this);
949 } catch (KeeperException e
) {
950 this.abort("Failed to reach coordination cluster when creating procedure handler.", e
);
955 * Utility method to wait indefinitely on a znode availability while checking
956 * if the region server is shut down
957 * @param tracker znode tracker to use
958 * @throws IOException any IO exception, plus if the RS is stopped
959 * @throws InterruptedException
961 private void blockAndCheckIfStopped(ZKNodeTracker tracker
)
962 throws IOException
, InterruptedException
{
963 while (tracker
.blockUntilAvailable(this.msgInterval
, false) == null) {
965 throw new IOException("Received the shutdown message while waiting.");
971 * @return True if the cluster is up.
974 public boolean isClusterUp() {
975 return this.masterless
||
976 (this.clusterStatusTracker
!= null && this.clusterStatusTracker
.isClusterUp());
980 * The HRegionServer sticks in this loop until closed.
985 // Do pre-registration initializations; zookeeper, lease threads, etc.
986 preRegistrationInitialization();
987 } catch (Throwable e
) {
988 abort("Fatal exception during initialization", e
);
992 if (!isStopped() && !isAborted()) {
993 ShutdownHook
.install(conf
, fs
, this, Thread
.currentThread());
994 // Initialize the RegionServerCoprocessorHost now that our ephemeral
995 // node was created, in case any coprocessors want to use ZooKeeper
996 this.rsHost
= new RegionServerCoprocessorHost(this, this.conf
);
999 // Try and register with the Master; tell it we are here. Break if server is stopped or the
1000 // clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and start
1001 // up all Services. Use RetryCounter to get backoff in case Master is struggling to come up.
1002 LOG
.debug("About to register with Master.");
1003 RetryCounterFactory rcf
= new RetryCounterFactory(Integer
.MAX_VALUE
,
1004 this.sleeper
.getPeriod(), 1000 * 60 * 5);
1005 RetryCounter rc
= rcf
.create();
1006 while (keepLooping()) {
1007 RegionServerStartupResponse w
= reportForDuty();
1009 long sleepTime
= rc
.getBackoffTimeAndIncrementAttempts();
1010 LOG
.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime
);
1011 this.sleeper
.sleep(sleepTime
);
1013 handleReportForDutyResponse(w
);
1018 if (!isStopped() && isHealthy()) {
1019 // start the snapshot handler and other procedure handlers,
1020 // since the server is ready to run
1021 if (this.rspmHost
!= null) {
1022 this.rspmHost
.start();
1024 // Start the Quota Manager
1025 if (this.rsQuotaManager
!= null) {
1026 rsQuotaManager
.start(getRpcServer().getScheduler());
1028 if (this.rsSpaceQuotaManager
!= null) {
1029 this.rsSpaceQuotaManager
.start();
1033 // We registered with the Master. Go into run mode.
1034 long lastMsg
= System
.currentTimeMillis();
1035 long oldRequestCount
= -1;
1036 // The main run loop.
1037 while (!isStopped() && isHealthy()) {
1038 if (!isClusterUp()) {
1039 if (isOnlineRegionsEmpty()) {
1040 stop("Exiting; cluster shutdown set and not carrying any regions");
1041 } else if (!this.stopping
) {
1042 this.stopping
= true;
1043 LOG
.info("Closing user regions");
1044 closeUserRegions(this.abortRequested
);
1045 } else if (this.stopping
) {
1046 boolean allUserRegionsOffline
= areAllUserRegionsOffline();
1047 if (allUserRegionsOffline
) {
1048 // Set stopped if no more write requests to meta tables
1049 // since last time we went around the loop. Any open
1050 // meta regions will be closed on our way out.
1051 if (oldRequestCount
== getWriteRequestCount()) {
1052 stop("Stopped; only catalog regions remaining online");
1055 oldRequestCount
= getWriteRequestCount();
1057 // Make sure all regions have been closed -- some regions may
1058 // have not got it because we were splitting at the time of
1059 // the call to closeUserRegions.
1060 closeUserRegions(this.abortRequested
);
1062 LOG
.debug("Waiting on " + getOnlineRegionsAsPrintableString());
1065 long now
= System
.currentTimeMillis();
1066 if ((now
- lastMsg
) >= msgInterval
) {
1067 tryRegionServerReport(lastMsg
, now
);
1068 lastMsg
= System
.currentTimeMillis();
1070 if (!isStopped() && !isAborted()) {
1071 this.sleeper
.sleep();
1074 } catch (Throwable t
) {
1075 if (!rpcServices
.checkOOME(t
)) {
1076 String prefix
= t
instanceof YouAreDeadException?
"": "Unhandled: ";
1077 abort(prefix
+ t
.getMessage(), t
);
1081 if (this.leases
!= null) {
1082 this.leases
.closeAfterLeasesExpire();
1084 if (this.splitLogWorker
!= null) {
1085 splitLogWorker
.stop();
1087 if (this.infoServer
!= null) {
1088 LOG
.info("Stopping infoServer");
1090 this.infoServer
.stop();
1091 } catch (Exception e
) {
1092 LOG
.error("Failed to stop infoServer", e
);
1095 // Send cache a shutdown.
1096 if (blockCache
!= null) {
1097 blockCache
.shutdown();
1099 if (mobFileCache
!= null) {
1100 mobFileCache
.shutdown();
1103 if (movedRegionsCleaner
!= null) {
1104 movedRegionsCleaner
.stop("Region Server stopping");
1107 // Send interrupts to wake up threads if sleeping so they notice shutdown.
1108 // TODO: Should we check they are alive? If OOME could have exited already
1109 if (this.hMemManager
!= null) this.hMemManager
.stop();
1110 if (this.cacheFlusher
!= null) this.cacheFlusher
.interruptIfNecessary();
1111 if (this.compactSplitThread
!= null) this.compactSplitThread
.interruptIfNecessary();
1112 sendShutdownInterrupt();
1114 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
1115 if (rspmHost
!= null) {
1116 rspmHost
.stop(this.abortRequested
|| this.killed
);
1120 // Just skip out w/o closing regions. Used when testing.
1121 } else if (abortRequested
) {
1123 closeUserRegions(abortRequested
); // Don't leave any open file handles
1125 LOG
.info("aborting server " + this.serverName
);
1127 closeUserRegions(abortRequested
);
1128 LOG
.info("stopping server " + this.serverName
);
1131 if (this.clusterConnection
!= null && !clusterConnection
.isClosed()) {
1133 this.clusterConnection
.close();
1134 } catch (IOException e
) {
1135 // Although the {@link Closeable} interface throws an {@link
1136 // IOException}, in reality, the implementation would never do that.
1137 LOG
.warn("Attempt to close server's short circuit ClusterConnection failed.", e
);
1140 if (this.asyncClusterConnection
!= null) {
1142 this.asyncClusterConnection
.close();
1143 } catch (IOException e
) {
1144 // Although the {@link Closeable} interface throws an {@link
1145 // IOException}, in reality, the implementation would never do that.
1146 LOG
.warn("Attempt to close server's AsyncClusterConnection failed.", e
);
1149 // Closing the compactSplit thread before closing meta regions
1150 if (!this.killed
&& containsMetaTableRegions()) {
1151 if (!abortRequested
|| this.fsOk
) {
1152 if (this.compactSplitThread
!= null) {
1153 this.compactSplitThread
.join();
1154 this.compactSplitThread
= null;
1156 closeMetaTableRegions(abortRequested
);
1160 if (!this.killed
&& this.fsOk
) {
1161 waitOnAllRegionsToClose(abortRequested
);
1162 LOG
.info("stopping server " + this.serverName
+ "; all regions closed.");
1165 // Stop the quota manager
1166 if (rsQuotaManager
!= null) {
1167 rsQuotaManager
.stop();
1169 if (rsSpaceQuotaManager
!= null) {
1170 rsSpaceQuotaManager
.stop();
1171 rsSpaceQuotaManager
= null;
1174 //fsOk flag may be changed when closing regions throws exception.
1176 shutdownWAL(!abortRequested
);
1179 // Make sure the proxy is down.
1180 if (this.rssStub
!= null) {
1181 this.rssStub
= null;
1183 if (this.lockStub
!= null) {
1184 this.lockStub
= null;
1186 if (this.rpcClient
!= null) {
1187 this.rpcClient
.close();
1189 if (this.leases
!= null) {
1190 this.leases
.close();
1192 if (this.pauseMonitor
!= null) {
1193 this.pauseMonitor
.stop();
1197 stopServiceThreads();
1200 if (this.rpcServices
!= null) {
1201 this.rpcServices
.stop();
1205 deleteMyEphemeralNode();
1206 } catch (KeeperException
.NoNodeException nn
) {
1207 } catch (KeeperException e
) {
1208 LOG
.warn("Failed deleting my ephemeral node", e
);
1210 // We may have failed to delete the znode at the previous step, but
1211 // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1212 ZNodeClearer
.deleteMyEphemeralNodeOnDisk();
1214 if (this.zooKeeper
!= null) {
1215 this.zooKeeper
.close();
1217 this.shutDown
= true;
1218 LOG
.info("Exiting; stopping=" + this.serverName
+ "; zookeeper connection closed.");
1221 private boolean containsMetaTableRegions() {
1222 return onlineRegions
.containsKey(RegionInfoBuilder
.FIRST_META_REGIONINFO
.getEncodedName());
1225 private boolean areAllUserRegionsOffline() {
1226 if (getNumberOfOnlineRegions() > 2) return false;
1227 boolean allUserRegionsOffline
= true;
1228 for (Map
.Entry
<String
, HRegion
> e
: this.onlineRegions
.entrySet()) {
1229 if (!e
.getValue().getRegionInfo().isMetaRegion()) {
1230 allUserRegionsOffline
= false;
1234 return allUserRegionsOffline
;
1238 * @return Current write count for all online regions.
1240 private long getWriteRequestCount() {
1241 long writeCount
= 0;
1242 for (Map
.Entry
<String
, HRegion
> e
: this.onlineRegions
.entrySet()) {
1243 writeCount
+= e
.getValue().getWriteRequestsCount();
1249 protected void tryRegionServerReport(long reportStartTime
, long reportEndTime
)
1250 throws IOException
{
1251 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
1253 // the current server could be stopping.
1256 ClusterStatusProtos
.ServerLoad sl
= buildServerLoad(reportStartTime
, reportEndTime
);
1258 RegionServerReportRequest
.Builder request
= RegionServerReportRequest
.newBuilder();
1259 request
.setServer(ProtobufUtil
.toServerName(this.serverName
));
1260 request
.setLoad(sl
);
1261 rss
.regionServerReport(null, request
.build());
1262 } catch (ServiceException se
) {
1263 IOException ioe
= ProtobufUtil
.getRemoteException(se
);
1264 if (ioe
instanceof YouAreDeadException
) {
1265 // This will be caught and handled as a fatal error in run()
1268 if (rssStub
== rss
) {
1271 // Couldn't connect to the master, get location from zk and reconnect
1272 // Method blocks until new master is found or we are stopped
1273 createRegionServerStatusStub(true);
1278 * Reports the given map of Regions and their size on the filesystem to the active Master.
1280 * @param regionSizeStore The store containing region sizes
1281 * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1283 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore
) {
1284 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
1286 // the current server could be stopping.
1287 LOG
.trace("Skipping Region size report to HMaster as stub is null");
1291 buildReportAndSend(rss
, regionSizeStore
);
1292 } catch (ServiceException se
) {
1293 IOException ioe
= ProtobufUtil
.getRemoteException(se
);
1294 if (ioe
instanceof PleaseHoldException
) {
1295 LOG
.trace("Failed to report region sizes to Master because it is initializing."
1296 + " This will be retried.", ioe
);
1297 // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1300 if (rssStub
== rss
) {
1303 createRegionServerStatusStub(true);
1304 if (ioe
instanceof DoNotRetryIOException
) {
1305 DoNotRetryIOException doNotRetryEx
= (DoNotRetryIOException
) ioe
;
1306 if (doNotRetryEx
.getCause() != null) {
1307 Throwable t
= doNotRetryEx
.getCause();
1308 if (t
instanceof UnsupportedOperationException
) {
1309 LOG
.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1314 LOG
.debug("Failed to report region sizes to Master. This will be retried.", ioe
);
1320 * Builds the region size report and sends it to the master. Upon successful sending of the
1321 * report, the region sizes that were sent are marked as sent.
1323 * @param rss The stub to send to the Master
1324 * @param regionSizeStore The store containing region sizes
1326 void buildReportAndSend(RegionServerStatusService
.BlockingInterface rss
,
1327 RegionSizeStore regionSizeStore
) throws ServiceException
{
1328 RegionSpaceUseReportRequest request
=
1329 buildRegionSpaceUseReportRequest(Objects
.requireNonNull(regionSizeStore
));
1330 rss
.reportRegionSpaceUse(null, request
);
1331 // Record the number of size reports sent
1332 if (metricsRegionServer
!= null) {
1333 metricsRegionServer
.incrementNumRegionSizeReportsSent(regionSizeStore
.size());
1338 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1340 * @param regionSizes The size in bytes of regions
1341 * @return The corresponding protocol buffer message.
1343 RegionSpaceUseReportRequest
buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes
) {
1344 RegionSpaceUseReportRequest
.Builder request
= RegionSpaceUseReportRequest
.newBuilder();
1345 for (Entry
<RegionInfo
, RegionSize
> entry
: regionSizes
) {
1346 request
.addSpaceUse(convertRegionSize(entry
.getKey(), entry
.getValue().getSize()));
1348 return request
.build();
1352 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1355 * @param regionInfo The RegionInfo
1356 * @param sizeInBytes The size in bytes of the Region
1357 * @return The protocol buffer
1359 RegionSpaceUse
convertRegionSize(RegionInfo regionInfo
, Long sizeInBytes
) {
1360 return RegionSpaceUse
.newBuilder()
1361 .setRegionInfo(ProtobufUtil
.toRegionInfo(Objects
.requireNonNull(regionInfo
)))
1362 .setRegionSize(Objects
.requireNonNull(sizeInBytes
))
1366 ClusterStatusProtos
.ServerLoad
buildServerLoad(long reportStartTime
, long reportEndTime
)
1367 throws IOException
{
1368 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1369 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use
1370 // the wrapper to compute those numbers in one place.
1371 // In the long term most of these should be moved off of ServerLoad and the heart beat.
1372 // Instead they should be stored in an HBase table so that external visibility into HBase is
1373 // improved; Additionally the load balancer will be able to take advantage of a more complete
1375 MetricsRegionServerWrapper regionServerWrapper
= metricsRegionServer
.getRegionServerWrapper();
1376 Collection
<HRegion
> regions
= getOnlineRegionsLocalContext();
1377 long usedMemory
= -1L;
1378 long maxMemory
= -1L;
1379 final MemoryUsage usage
= MemorySizeUtil
.safeGetHeapMemoryUsage();
1380 if (usage
!= null) {
1381 usedMemory
= usage
.getUsed();
1382 maxMemory
= usage
.getMax();
1385 ClusterStatusProtos
.ServerLoad
.Builder serverLoad
= ClusterStatusProtos
.ServerLoad
.newBuilder();
1386 serverLoad
.setNumberOfRequests((int) regionServerWrapper
.getRequestsPerSecond());
1387 serverLoad
.setTotalNumberOfRequests(regionServerWrapper
.getTotalRequestCount());
1388 serverLoad
.setUsedHeapMB((int)(usedMemory
/ 1024 / 1024));
1389 serverLoad
.setMaxHeapMB((int) (maxMemory
/ 1024 / 1024));
1390 Set
<String
> coprocessors
= getWAL(null).getCoprocessorHost().getCoprocessors();
1391 Builder coprocessorBuilder
= Coprocessor
.newBuilder();
1392 for (String coprocessor
: coprocessors
) {
1393 serverLoad
.addCoprocessors(coprocessorBuilder
.setName(coprocessor
).build());
1395 RegionLoad
.Builder regionLoadBldr
= RegionLoad
.newBuilder();
1396 RegionSpecifier
.Builder regionSpecifier
= RegionSpecifier
.newBuilder();
1397 for (HRegion region
: regions
) {
1398 if (region
.getCoprocessorHost() != null) {
1399 Set
<String
> regionCoprocessors
= region
.getCoprocessorHost().getCoprocessors();
1400 Iterator
<String
> iterator
= regionCoprocessors
.iterator();
1401 while (iterator
.hasNext()) {
1402 serverLoad
.addCoprocessors(coprocessorBuilder
.setName(iterator
.next()).build());
1405 serverLoad
.addRegionLoads(createRegionLoad(region
, regionLoadBldr
, regionSpecifier
));
1406 for (String coprocessor
: getWAL(region
.getRegionInfo()).getCoprocessorHost()
1407 .getCoprocessors()) {
1408 serverLoad
.addCoprocessors(coprocessorBuilder
.setName(coprocessor
).build());
1411 serverLoad
.setReportStartTime(reportStartTime
);
1412 serverLoad
.setReportEndTime(reportEndTime
);
1413 if (this.infoServer
!= null) {
1414 serverLoad
.setInfoServerPort(this.infoServer
.getPort());
1416 serverLoad
.setInfoServerPort(-1);
1419 // for the replicationLoad purpose. Only need to get from one executorService
1420 // either source or sink will get the same info
1421 ReplicationSourceService rsources
= getReplicationSourceService();
1423 if (rsources
!= null) {
1424 // always refresh first to get the latest value
1425 ReplicationLoad rLoad
= rsources
.refreshAndGetReplicationLoad();
1426 if (rLoad
!= null) {
1427 serverLoad
.setReplLoadSink(rLoad
.getReplicationLoadSink());
1428 for (ClusterStatusProtos
.ReplicationLoadSource rLS
:
1429 rLoad
.getReplicationLoadSourceEntries()) {
1430 serverLoad
.addReplLoadSource(rLS
);
1436 return serverLoad
.build();
1439 String
getOnlineRegionsAsPrintableString() {
1440 StringBuilder sb
= new StringBuilder();
1441 for (Region r
: this.onlineRegions
.values()) {
1442 if (sb
.length() > 0) sb
.append(", ");
1443 sb
.append(r
.getRegionInfo().getEncodedName());
1445 return sb
.toString();
1449 * Wait on regions close.
1451 private void waitOnAllRegionsToClose(final boolean abort
) {
1452 // Wait till all regions are closed before going out.
1454 long previousLogTime
= 0;
1455 Set
<String
> closedRegions
= new HashSet
<>();
1456 boolean interrupted
= false;
1458 while (!isOnlineRegionsEmpty()) {
1459 int count
= getNumberOfOnlineRegions();
1460 // Only print a message if the count of regions has changed.
1461 if (count
!= lastCount
) {
1462 // Log every second at most
1463 if (System
.currentTimeMillis() > (previousLogTime
+ 1000)) {
1464 previousLogTime
= System
.currentTimeMillis();
1466 LOG
.info("Waiting on " + count
+ " regions to close");
1467 // Only print out regions still closing if a small number else will
1469 if (count
< 10 && LOG
.isDebugEnabled()) {
1470 LOG
.debug("Online Regions=" + this.onlineRegions
);
1474 // Ensure all user regions have been sent a close. Use this to
1475 // protect against the case where an open comes in after we start the
1476 // iterator of onlineRegions to close all user regions.
1477 for (Map
.Entry
<String
, HRegion
> e
: this.onlineRegions
.entrySet()) {
1478 RegionInfo hri
= e
.getValue().getRegionInfo();
1479 if (!this.regionsInTransitionInRS
.containsKey(hri
.getEncodedNameAsBytes()) &&
1480 !closedRegions
.contains(hri
.getEncodedName())) {
1481 closedRegions
.add(hri
.getEncodedName());
1482 // Don't update zk with this close transition; pass false.
1483 closeRegionIgnoreErrors(hri
, abort
);
1486 // No regions in RIT, we could stop waiting now.
1487 if (this.regionsInTransitionInRS
.isEmpty()) {
1488 if (!isOnlineRegionsEmpty()) {
1489 LOG
.info("We were exiting though online regions are not empty," +
1490 " because some regions failed closing");
1500 Thread
.currentThread().interrupt();
1505 private boolean sleep(long millis
) {
1506 boolean interrupted
= false;
1508 Thread
.sleep(millis
);
1509 } catch (InterruptedException e
) {
1510 LOG
.warn("Interrupted while sleeping");
1516 private void shutdownWAL(final boolean close
) {
1517 if (this.walFactory
!= null) {
1522 walFactory
.shutdown();
1524 } catch (Throwable e
) {
1525 e
= e
instanceof RemoteException ?
((RemoteException
) e
).unwrapRemoteException() : e
;
1526 LOG
.error("Shutdown / close of WAL failed: " + e
);
1527 LOG
.debug("Shutdown / close exception details:", e
);
1533 * Run init. Sets up wal and starts up all server threads.
1535 * @param c Extra configuration.
// Applies the master's RegionServerStartupResponse to this server.
// Adopts the hostname the master reports for us, copies the remaining name/value pairs into
// conf, then creates the ephemeral znode, sets up WAL/replication, the metrics objects and the
// JVM pause monitor. The catch clause near the end stops the server with
// "Failed initialization" and rethrows the failure converted to an IOException.
// NOTE(review): several original lines (braces, the try header, some statements) are not
// visible in this span; the comments below only describe what is shown.
1537 protected void handleReportForDutyResponse(final RegionServerStartupResponse c
)
1538 throws IOException
{
1540 boolean updateRootDir
= false;
1541 for (NameStringPair e
: c
.getMapEntriesList()) {
1542 String key
= e
.getName();
1543 // The hostname the master sees us as.
1544 if (key
.equals(HConstants
.KEY_FOR_HOSTNAME_SEEN_BY_MASTER
)) {
1545 String hostnameFromMasterPOV
= e
.getValue();
1546 this.serverName
= ServerName
.valueOf(hostnameFromMasterPOV
, rpcServices
.isa
.getPort(),
// A configured hostname override (useThisHostnameInstead) must match what the master
// reports; a mismatch fails startup with an IOException.
1548 if (!StringUtils
.isBlank(useThisHostnameInstead
) &&
1549 !hostnameFromMasterPOV
.equals(useThisHostnameInstead
)) {
1550 String msg
= "Master passed us a different hostname to use; was=" +
1551 this.useThisHostnameInstead
+ ", but now=" + hostnameFromMasterPOV
;
1553 throw new IOException(msg
);
// Without an override, the master's view is compared with the hostname of our RPC address.
1555 if (StringUtils
.isBlank(useThisHostnameInstead
) &&
1556 !hostnameFromMasterPOV
.equals(rpcServices
.isa
.getHostName())) {
1557 String msg
= "Master passed us a different hostname to use; was=" +
1558 rpcServices
.isa
.getHostName() + ", but now=" + hostnameFromMasterPOV
;
// Every other entry is a configuration key/value supplied by the master.
1564 String value
= e
.getValue();
// Remember when the master's HBASE_DIR differs from ours so the file system is
// re-initialized further down.
1565 if (key
.equals(HConstants
.HBASE_DIR
)) {
1566 if (value
!= null && !value
.equals(conf
.get(HConstants
.HBASE_DIR
))) {
1567 updateRootDir
= true;
1571 if (LOG
.isDebugEnabled()) {
1572 LOG
.debug("Config from master: " + key
+ "=" + value
);
1574 this.conf
.set(key
, value
);
1576 // Set our ephemeral znode up in zookeeper now we have a name.
1577 createMyEphemeralNode();
1579 if (updateRootDir
) {
1580 // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1581 initializeFileSystem();
1584 // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1585 // config param for task trackers, but we can piggyback off of it.
1586 if (this.conf
.get("mapreduce.task.attempt.id") == null) {
1587 this.conf
.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName
.toString());
1590 // Save it in a file, this will allow to see if we crash
1591 ZNodeClearer
.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1593 // This call sets up an initialized replication and WAL. Later we start it up.
1594 setupWALAndReplication();
1595 // Init in here rather than in constructor after thread name has been set
1596 this.metricsTable
= new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1597 this.metricsRegionServer
= new MetricsRegionServer(
1598 new MetricsRegionServerWrapperImpl(this), conf
, metricsTable
);
1599 // Now that we have a metrics source, start the pause monitor
1600 this.pauseMonitor
= new JvmPauseMonitor(conf
, getMetrics().getMetricsSource());
1601 pauseMonitor
.start();
1603 // There is a rare case where we do NOT want services to start. Check config.
1604 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1607 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1608 // or make sense of it.
1609 startReplicationService();
// The ZooKeeper session id is logged in hexadecimal.
1613 LOG
.info("Serving as " + this.serverName
+ ", RpcServer on " + rpcServices
.isa
+
1615 Long
.toHexString(this.zooKeeper
.getRecoverableZooKeeper().getSessionId()));
1617 // Wake up anyone waiting for this server to online
1618 synchronized (online
) {
// Any failure during initialization stops the server and is rethrown as an IOException.
1622 } catch (Throwable e
) {
1623 stop("Failed initialization");
1624 throw convertThrowableToIOE(cleanup(e
, "Failed init"),
1625 "Region server startup failed");
1627 sleeper
.skipSleepCycle();
1631 protected void initializeMemStoreChunkCreator() {
1632 if (MemStoreLAB
.isEnabled(conf
)) {
1633 // MSLAB is enabled. So initialize MemStoreChunkPool
1634 // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
1636 Pair
<Long
, MemoryType
> pair
= MemorySizeUtil
.getGlobalMemStoreSize(conf
);
1637 long globalMemStoreSize
= pair
.getFirst();
1638 boolean offheap
= this.regionServerAccounting
.isOffheap();
1639 // When off heap memstore in use, take full area for chunk pool.
1640 float poolSizePercentage
= offheap?
1.0F
:
1641 conf
.getFloat(MemStoreLAB
.CHUNK_POOL_MAXSIZE_KEY
, MemStoreLAB
.POOL_MAX_SIZE_DEFAULT
);
1642 float initialCountPercentage
= conf
.getFloat(MemStoreLAB
.CHUNK_POOL_INITIALSIZE_KEY
,
1643 MemStoreLAB
.POOL_INITIAL_SIZE_DEFAULT
);
1644 int chunkSize
= conf
.getInt(MemStoreLAB
.CHUNK_SIZE_KEY
, MemStoreLAB
.CHUNK_SIZE_DEFAULT
);
1645 // init the chunkCreator
1646 ChunkCreator
.initialize(chunkSize
, offheap
, globalMemStoreSize
, poolSizePercentage
,
1647 initialCountPercentage
, this.hMemManager
);
1651 private void startHeapMemoryManager() {
1652 if (this.blockCache
!= null) {
1654 new HeapMemoryManager(this.blockCache
, this.cacheFlusher
, this, regionServerAccounting
);
1655 this.hMemManager
.start(getChoreService());
1659 private void createMyEphemeralNode() throws KeeperException
, IOException
{
1660 RegionServerInfo
.Builder rsInfo
= RegionServerInfo
.newBuilder();
1661 rsInfo
.setInfoPort(infoServer
!= null ? infoServer
.getPort() : -1);
1662 rsInfo
.setVersionInfo(ProtobufUtil
.getVersionInfo());
1663 byte[] data
= ProtobufUtil
.prependPBMagic(rsInfo
.build().toByteArray());
1664 ZKUtil
.createEphemeralNodeAndWatch(this.zooKeeper
, getMyEphemeralNodePath(), data
);
1667 private void deleteMyEphemeralNode() throws KeeperException
{
1668 ZKUtil
.deleteNode(this.zooKeeper
, getMyEphemeralNodePath());
1672 public RegionServerAccounting
getRegionServerAccounting() {
1673 return regionServerAccounting
;
// Builds the RegionLoad message for one region.
// Sums, over all stores of the region: store file count, uncompressed and on-disk sizes (MB),
// root-level / static index and bloom sizes (KB) and compaction progress counters. The totals
// are written into the builder together with the region's request counters, its data
// locality for this server's hostname and the oldest HFile timestamp.
// NOTE(review): `stores` and `storefiles` are accumulated below but their declarations are
// not visible in this span.
1677 * @param r Region to get RegionLoad for.
1678 * @param regionLoadBldr the RegionLoad.Builder, can be null
1679 * @param regionSpecifier the RegionSpecifier.Builder, can be null
1680 * @return RegionLoad instance.
1682 * @throws IOException
1684 RegionLoad
createRegionLoad(final HRegion r
, RegionLoad
.Builder regionLoadBldr
,
1685 RegionSpecifier
.Builder regionSpecifier
) throws IOException
{
1686 byte[] name
= r
.getRegionInfo().getRegionName();
1689 int storeUncompressedSizeMB
= 0;
1690 int storefileSizeMB
= 0;
// Byte counts are converted to whole MB / KB with integer division, so fractions are dropped.
1691 int memstoreSizeMB
= (int) (r
.getMemStoreDataSize() / 1024 / 1024);
1692 long storefileIndexSizeKB
= 0;
1693 int rootLevelIndexSizeKB
= 0;
1694 int totalStaticIndexSizeKB
= 0;
1695 int totalStaticBloomSizeKB
= 0;
1696 long totalCompactingKVs
= 0;
1697 long currentCompactedKVs
= 0;
1698 List
<HStore
> storeList
= r
.getStores();
1699 stores
+= storeList
.size();
1700 for (HStore store
: storeList
) {
1701 storefiles
+= store
.getStorefilesCount();
1702 storeUncompressedSizeMB
+= (int) (store
.getStoreSizeUncompressed() / 1024 / 1024);
1703 storefileSizeMB
+= (int) (store
.getStorefilesSize() / 1024 / 1024);
1704 //TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB?
1705 storefileIndexSizeKB
+= store
.getStorefilesRootLevelIndexSize() / 1024;
// Compaction progress is only counted for stores that report one.
1706 CompactionProgress progress
= store
.getCompactionProgress();
1707 if (progress
!= null) {
1708 totalCompactingKVs
+= progress
.getTotalCompactingKVs();
1709 currentCompactedKVs
+= progress
.currentCompactedKVs
;
1711 rootLevelIndexSizeKB
+= (int) (store
.getStorefilesRootLevelIndexSize() / 1024);
1712 totalStaticIndexSizeKB
+= (int) (store
.getTotalStaticIndexSize() / 1024);
1713 totalStaticBloomSizeKB
+= (int) (store
.getTotalStaticBloomSize() / 1024);
1716 float dataLocality
=
1717 r
.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName
.getHostname());
// Callers may pass null builders; fresh ones are created in that case.
1718 if (regionLoadBldr
== null) {
1719 regionLoadBldr
= RegionLoad
.newBuilder();
1721 if (regionSpecifier
== null) {
1722 regionSpecifier
= RegionSpecifier
.newBuilder();
// The region is identified in the message by its full region name.
1724 regionSpecifier
.setType(RegionSpecifierType
.REGION_NAME
);
1725 regionSpecifier
.setValue(UnsafeByteOperations
.unsafeWrap(name
));
1726 regionLoadBldr
.setRegionSpecifier(regionSpecifier
.build())
1728 .setStorefiles(storefiles
)
1729 .setStoreUncompressedSizeMB(storeUncompressedSizeMB
)
1730 .setStorefileSizeMB(storefileSizeMB
)
1731 .setMemStoreSizeMB(memstoreSizeMB
)
1732 .setStorefileIndexSizeKB(storefileIndexSizeKB
)
1733 .setRootIndexSizeKB(rootLevelIndexSizeKB
)
1734 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB
)
1735 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB
)
1736 .setReadRequestsCount(r
.getReadRequestsCount())
1737 .setCpRequestsCount(r
.getCpRequestsCount())
1738 .setFilteredReadRequestsCount(r
.getFilteredReadRequestsCount())
1739 .setWriteRequestsCount(r
.getWriteRequestsCount())
1740 .setTotalCompactingKVs(totalCompactingKVs
)
1741 .setCurrentCompactedKVs(currentCompactedKVs
)
1742 .setDataLocality(dataLocality
)
1743 .setLastMajorCompactionTs(r
.getOldestHfileTs(true));
// The region itself fills in its complete-sequence-id information on the builder.
1744 r
.setCompleteSequenceId(regionLoadBldr
);
1746 return regionLoadBldr
.build();
1750 * @param encodedRegionName
1751 * @return An instance of RegionLoad.
1753 public RegionLoad
createRegionLoad(final String encodedRegionName
) throws IOException
{
1754 HRegion r
= onlineRegions
.get(encodedRegionName
);
1755 return r
!= null ?
createRegionLoad(r
, null, null) : null;
// Chore that walks the stores of every online region and requests compactions.
// A store is checked against the `iteration` counter and its own compaction-check multiplier
// (iteration % multiplier). Stores that need compaction get a system compaction request;
// otherwise, stores due for a major compaction get a major compaction request, with either the
// default or the configured priority.
// NOTE(review): the assignment of `instance`, the try header and the bodies of the skip
// branches are not visible in this span.
1759 * Inner class that runs on a long period checking if regions need compaction.
1761 private static class CompactionChecker
extends ScheduledChore
{
1762 private final HRegionServer instance
;
1763 private final int majorCompactPriority
;
1764 private final static int DEFAULT_PRIORITY
= Integer
.MAX_VALUE
;
1765 //Iteration is 1-based rather than 0-based so we don't check for compaction
1766 // immediately upon region server startup
1767 private long iteration
= 1;
1769 CompactionChecker(final HRegionServer h
, final int sleepTime
, final Stoppable stopper
) {
1770 super("CompactionChecker", stopper
, sleepTime
);
1772 LOG
.info(this.getName() + " runs every " + Duration
.ofMillis(sleepTime
));
1774 /* MajorCompactPriority is configurable.
1775 * If not set, the compaction will use default priority.
1777 this.majorCompactPriority
= this.instance
.conf
.
1778 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1783 protected void chore() {
1784 for (Region r
: this.instance
.onlineRegions
.values()) {
1788 HRegion hr
= (HRegion
) r
;
1789 for (HStore s
: hr
.stores
.values()) {
// Each store has its own check multiplier, compared against the shared iteration counter.
1791 long multiplier
= s
.getCompactionCheckMultiplier();
1792 assert multiplier
> 0;
1793 if (iteration
% multiplier
!= 0) {
1796 if (s
.needsCompaction()) {
1797 // Queue a compaction. Will recognize if major is needed.
1798 this.instance
.compactSplitThread
.requestSystemCompaction(hr
, s
,
1799 getName() + " requests compaction");
1800 } else if (s
.shouldPerformMajorCompaction()) {
1801 s
.triggerMajorCompaction();
// This branch applies when the priority was left at DEFAULT_PRIORITY or is numerically
// greater than the region's current compact priority.
1802 if (majorCompactPriority
== DEFAULT_PRIORITY
||
1803 majorCompactPriority
> hr
.getCompactPriority()) {
1804 this.instance
.compactSplitThread
.requestCompaction(hr
, s
,
1805 getName() + " requests major compaction; use default priority",
1807 CompactionLifeCycleTracker
.DUMMY
, null);
1809 this.instance
.compactSplitThread
.requestCompaction(hr
, s
,
1810 getName() + " requests major compaction; use configured priority",
1811 this.majorCompactPriority
, CompactionLifeCycleTracker
.DUMMY
, null);
1814 } catch (IOException e
) {
1815 LOG
.warn("Failed major compaction check on " + r
, e
);
// Advance the counter, wrapping to 0 instead of overflowing past Long.MAX_VALUE.
1819 iteration
= (iteration
== Long
.MAX_VALUE
) ?
0 : (iteration
+ 1);
// Chore that periodically asks the server's FlushRequester to flush online regions whose
// shouldFlush() returns true. Each request is delayed by a random amount below rangeOfDelay
// so that flushes are spread out.
// NOTE(review): the tail of the LOG.info call and the closing braces are not visible in this
// span.
1823 static class PeriodicMemStoreFlusher
extends ScheduledChore
{
1824 final HRegionServer server
;
1825 final static int RANGE_OF_DELAY
= 5 * 60; // 5 min in seconds
1826 final static int MIN_DELAY_TIME
= 0; // millisec
1828 final int rangeOfDelay
;
1829 public PeriodicMemStoreFlusher(int cacheFlushInterval
, final HRegionServer server
) {
1830 super("MemstoreFlusherChore", server
, cacheFlushInterval
);
1831 this.server
= server
;
// The setting is read in seconds (default RANGE_OF_DELAY) and multiplied by 1000 to get ms.
1833 this.rangeOfDelay
= this.server
.conf
.getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds",
1834 RANGE_OF_DELAY
)*1000;
1838 protected void chore() {
// shouldFlush() receives this builder; its content is logged as the flush reason below.
1839 final StringBuilder whyFlush
= new StringBuilder();
1840 for (HRegion r
: this.server
.onlineRegions
.values()) {
1841 if (r
== null) continue;
1842 if (r
.shouldFlush(whyFlush
)) {
1843 FlushRequester requester
= server
.getFlushRequester();
1844 if (requester
!= null) {
1845 long randomDelay
= (long) RandomUtils
.nextInt(0, rangeOfDelay
) + MIN_DELAY_TIME
;
1846 //Throttle the flushes by putting a delay. If we don't throttle, and there
1847 //is a balanced write-load on the regions in a table, we might end up
1848 //overwhelming the filesystem with too many flushes at once.
// The message is only logged when requestDelayedFlush returns true.
1849 if (requester
.requestDelayedFlush(r
, randomDelay
, false)) {
1850 LOG
.info("{} requesting flush of {} because {} after random delay {} ms",
1851 getName(), r
.getRegionInfo().getRegionNameAsString(), whyFlush
.toString(),
1861 * Report the status of the server. A server is online once all the startup is
1862 * completed (setting up filesystem, starting executorService threads, etc.). This
1863 * method is designed mostly to be useful in tests.
1865 * @return true if online, false if not.
1867 public boolean isOnline() {
1868 return online
.get();
1872 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
1873 * be hooked up to WAL.
1875 private void setupWALAndReplication() throws IOException
{
1876 boolean isMasterNoTableOrSystemTableOnly
= this instanceof HMaster
&&
1877 !LoadBalancer
.isMasterCanHostUserRegions(conf
);
1878 WALFactory factory
=
1879 new WALFactory(conf
, serverName
.toString(), !isMasterNoTableOrSystemTableOnly
);
1880 if (!isMasterNoTableOrSystemTableOnly
) {
1881 // TODO Replication make assumptions here based on the default filesystem impl
1882 Path oldLogDir
= new Path(walRootDir
, HConstants
.HREGION_OLDLOGDIR_NAME
);
1883 String logName
= AbstractFSWALProvider
.getWALDirectoryName(this.serverName
.toString());
1885 Path logDir
= new Path(walRootDir
, logName
);
1886 LOG
.debug("logDir={}", logDir
);
1887 if (this.walFs
.exists(logDir
)) {
1888 throw new RegionServerRunningException(
1889 "Region server has already created directory at " + this.serverName
.toString());
1891 // Always create wal directory as now we need this when master restarts to find out the live
1893 if (!this.walFs
.mkdirs(logDir
)) {
1894 throw new IOException("Can not create wal directory " + logDir
);
1896 // Instantiate replication if replication enabled. Pass it the log directories.
1897 createNewReplicationInstance(conf
, this, this.walFs
, logDir
, oldLogDir
,
1898 factory
.getWALProvider());
1900 this.walFactory
= factory
;
1904 * Start up replication source and sink handlers.
1905 * @throws IOException
1907 private void startReplicationService() throws IOException
{
1908 if (this.replicationSourceHandler
== this.replicationSinkHandler
&&
1909 this.replicationSourceHandler
!= null) {
1910 this.replicationSourceHandler
.startReplicationService();
1912 if (this.replicationSourceHandler
!= null) {
1913 this.replicationSourceHandler
.startReplicationService();
1915 if (this.replicationSinkHandler
!= null) {
1916 this.replicationSinkHandler
.startReplicationService();
1922 public MetricsRegionServer
getRegionServerMetrics() {
1923 return this.metricsRegionServer
;
1927 * @return Master address tracker instance.
1929 public MasterAddressTracker
getMasterAddressTracker() {
1930 return this.masterAddressTracker
;
1934 * Start maintenance Threads, Server, Worker and lease checker threads.
1935 * Start all threads we need to run. This is called after we've successfully
1936 * registered with the Master.
1937 * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1938 * get an unhandled exception. We cannot set the handler on all threads.
1939 * Server's internal Listener thread is off limits. For Server, if an OOME, it
1940 * waits a while then retries. Meantime, a flush or a compaction that tries to
1941 * run should trigger same critical condition and the shutdown will run. On
1942 * its way out, this server will shut down Server. Leases are sort of
1943 * inbetween. It has an internal thread that while it inherits from Chore, it
1944 * keeps its own internal stop mechanism so needs to be stopped by this
1945 * hosting server. Worker logs the exception and exits.
1947 private void startServices() throws IOException
{
1948 if (!isStopped() && !isAborted()) {
1949 initializeThreads();
1951 this.secureBulkLoadManager
= new SecureBulkLoadManager(this.conf
, asyncClusterConnection
);
1952 this.secureBulkLoadManager
.start();
1954 // Health checker thread.
1955 if (isHealthCheckerConfigured()) {
1956 int sleepTime
= this.conf
.getInt(HConstants
.HEALTH_CHORE_WAKE_FREQ
,
1957 HConstants
.DEFAULT_THREAD_WAKE_FREQUENCY
);
1958 healthCheckChore
= new HealthCheckChore(sleepTime
, this, getConfiguration());
1961 this.walRoller
= new LogRoller(this, this);
1962 this.flushThroughputController
= FlushThroughputControllerFactory
.create(this, conf
);
1963 this.procedureResultReporter
= new RemoteProcedureResultReporter(this);
1965 // Create the CompactedFileDischarger chore executorService. This chore helps to
1966 // remove the compacted files
1967 // that will no longer be used in reads.
1968 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1969 // 2 mins so that compacted files can be archived before the TTLCleaner runs
1970 int cleanerInterval
=
1971 conf
.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1972 this.compactedFileDischarger
=
1973 new CompactedHFilesDischarger(cleanerInterval
, this, this);
1974 choreService
.scheduleChore(compactedFileDischarger
);
1976 // Start executor services
1977 this.executorService
.startExecutorService(ExecutorType
.RS_OPEN_REGION
,
1978 conf
.getInt("hbase.regionserver.executor.openregion.threads", 3));
1979 this.executorService
.startExecutorService(ExecutorType
.RS_OPEN_META
,
1980 conf
.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1981 this.executorService
.startExecutorService(ExecutorType
.RS_OPEN_PRIORITY_REGION
,
1982 conf
.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
1983 this.executorService
.startExecutorService(ExecutorType
.RS_CLOSE_REGION
,
1984 conf
.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1985 this.executorService
.startExecutorService(ExecutorType
.RS_CLOSE_META
,
1986 conf
.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1987 if (conf
.getBoolean(StoreScanner
.STORESCANNER_PARALLEL_SEEK_ENABLE
, false)) {
1988 this.executorService
.startExecutorService(ExecutorType
.RS_PARALLEL_SEEK
,
1989 conf
.getInt("hbase.storescanner.parallel.seek.threads", 10));
1991 this.executorService
.startExecutorService(ExecutorType
.RS_LOG_REPLAY_OPS
, conf
.getInt(
1992 HBASE_SPLIT_WAL_MAX_SPLITTER
, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER
));
1993 // Start the threads for compacted files discharger
1994 this.executorService
.startExecutorService(ExecutorType
.RS_COMPACTED_FILES_DISCHARGER
,
1995 conf
.getInt(CompactionConfiguration
.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT
, 10));
1996 if (ServerRegionReplicaUtil
.isRegionReplicaWaitForPrimaryFlushEnabled(conf
)) {
1997 this.executorService
.startExecutorService(ExecutorType
.RS_REGION_REPLICA_FLUSH_OPS
,
1998 conf
.getInt("hbase.regionserver.region.replica.flusher.threads",
1999 conf
.getInt("hbase.regionserver.executor.openregion.threads", 3)));
2001 this.executorService
.startExecutorService(ExecutorType
.RS_REFRESH_PEER
,
2002 conf
.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
2003 this.executorService
.startExecutorService(ExecutorType
.RS_REPLAY_SYNC_REPLICATION_WAL
,
2004 conf
.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1));
2005 this.executorService
.startExecutorService(ExecutorType
.RS_SWITCH_RPC_THROTTLE
,
2006 conf
.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
2008 Threads
.setDaemonThreadRunning(this.walRoller
.getThread(), getName() + ".logRoller",
2009 uncaughtExceptionHandler
);
2010 if (this.cacheFlusher
!= null) {
2011 this.cacheFlusher
.start(uncaughtExceptionHandler
);
2013 Threads
.setDaemonThreadRunning(this.procedureResultReporter
,
2014 getName() + ".procedureResultReporter", uncaughtExceptionHandler
);
2016 if (this.compactionChecker
!= null) choreService
.scheduleChore(compactionChecker
);
2017 if (this.periodicFlusher
!= null) choreService
.scheduleChore(periodicFlusher
);
2018 if (this.healthCheckChore
!= null) choreService
.scheduleChore(healthCheckChore
);
2019 if (this.nonceManagerChore
!= null) choreService
.scheduleChore(nonceManagerChore
);
2020 if (this.storefileRefresher
!= null) choreService
.scheduleChore(storefileRefresher
);
2021 if (this.movedRegionsCleaner
!= null) choreService
.scheduleChore(movedRegionsCleaner
);
2022 if (this.fsUtilizationChore
!= null) choreService
.scheduleChore(fsUtilizationChore
);
2024 // Leases is not a Thread. Internally it runs a daemon thread. If it gets
2025 // an unhandled exception, it will just exit.
2026 Threads
.setDaemonThreadRunning(this.leases
.getThread(), getName() + ".leaseChecker",
2027 uncaughtExceptionHandler
);
2029 // Create the log splitting worker and start it
2030 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
2031 // quite a while inside Connection layer. The worker won't be available for other
2032 // tasks even after current task is preempted after a split task times out.
2033 Configuration sinkConf
= HBaseConfiguration
.create(conf
);
2034 sinkConf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
,
2035 conf
.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2036 sinkConf
.setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
,
2037 conf
.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2038 sinkConf
.setInt(HConstants
.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER
, 1);
2039 if (this.csm
!= null && conf
.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK
,
2040 DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK
)) {
2041 // SplitLogWorker needs csm. If none, don't start this.
2042 this.splitLogWorker
= new SplitLogWorker(sinkConf
, this, this, walFactory
);
2043 splitLogWorker
.start();
2045 LOG
.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
2048 // Memstore services.
2049 startHeapMemoryManager();
2050 // Call it after starting HeapMemoryManager.
2051 initializeMemStoreChunkCreator();
2054 private void initializeThreads() throws IOException
{
2055 // Cache flushing thread.
2056 this.cacheFlusher
= new MemStoreFlusher(conf
, this);
2058 // Compaction thread
2059 this.compactSplitThread
= new CompactSplit(this);
2061 // Background thread to check for compactions; needed if region has not gotten updates
2062 // in a while. It will take care of not checking too frequently on store-by-store basis.
2063 this.compactionChecker
= new CompactionChecker(this, this.threadWakeFrequency
, this);
2064 this.periodicFlusher
= new PeriodicMemStoreFlusher(this.threadWakeFrequency
, this);
2065 this.leases
= new Leases(this.threadWakeFrequency
);
2067 // Create the thread to clean the moved regions list
2068 movedRegionsCleaner
= MovedRegionsCleaner
.create(this);
2070 if (this.nonceManager
!= null) {
2071 // Create the scheduled chore that cleans up nonces.
2072 nonceManagerChore
= this.nonceManager
.createCleanupScheduledChore(this);
2075 // Setup the Quota Manager
2076 rsQuotaManager
= new RegionServerRpcQuotaManager(this);
2077 rsSpaceQuotaManager
= new RegionServerSpaceQuotaManager(this);
2079 if (QuotaUtil
.isQuotaEnabled(conf
)) {
2080 this.fsUtilizationChore
= new FileSystemUtilizationChore(this);
2084 boolean onlyMetaRefresh
= false;
2085 int storefileRefreshPeriod
= conf
.getInt(
2086 StorefileRefresherChore
.REGIONSERVER_STOREFILE_REFRESH_PERIOD
,
2087 StorefileRefresherChore
.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD
);
2088 if (storefileRefreshPeriod
== 0) {
2089 storefileRefreshPeriod
= conf
.getInt(
2090 StorefileRefresherChore
.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD
,
2091 StorefileRefresherChore
.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD
);
2092 onlyMetaRefresh
= true;
2094 if (storefileRefreshPeriod
> 0) {
2095 this.storefileRefresher
= new StorefileRefresherChore(storefileRefreshPeriod
,
2096 onlyMetaRefresh
, this, this);
2098 registerConfigurationObservers();
2101 private void registerConfigurationObservers() {
2102 // Registering the compactSplitThread object with the ConfigurationManager.
2103 configurationManager
.registerObserver(this.compactSplitThread
);
2104 configurationManager
.registerObserver(this.rpcServices
);
2105 configurationManager
.registerObserver(this);
// Starts the info (web UI) server.
// Port and bind address come from the region server settings, or from the master settings
// when this instance is an HMaster. A negative port is returned as-is (info server
// disabled). After the server is up, the actual port is written back into conf; the previous
// master info port is saved under "hbase.master.info.port.orig" before MASTER_INFO_PORT is
// overwritten.
// NOTE(review): the declaration of `msg`, the retry loop and the try header are not visible
// in this span.
2109 * Puts up the webui.
2110 * @return Returns final port -- maybe different from what we started with.
2111 * @throws IOException
2113 private int putUpWebUI() throws IOException
{
2114 int port
= this.conf
.getInt(HConstants
.REGIONSERVER_INFO_PORT
,
2115 HConstants
.DEFAULT_REGIONSERVER_INFOPORT
);
2116 String addr
= this.conf
.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
// A master uses its own info port and bind address settings instead.
2118 if(this instanceof HMaster
) {
2119 port
= conf
.getInt(HConstants
.MASTER_INFO_PORT
,
2120 HConstants
.DEFAULT_MASTER_INFOPORT
);
2121 addr
= this.conf
.get("hbase.master.info.bindAddress", "0.0.0.0");
2123 // -1 is for disabling info server
2124 if (port
< 0) return port
;
// The bind address must be an address of this host, otherwise startup fails.
2126 if (!Addressing
.isLocalAddress(InetAddress
.getByName(addr
))) {
2128 "Failed to start http info server. Address " + addr
2129 + " does not belong to this host. Correct configuration parameter: "
2130 + "hbase.regionserver.info.bindAddress";
2132 throw new IOException(msg
);
2134 // check if auto port bind enabled
2135 boolean auto
= this.conf
.getBoolean(HConstants
.REGIONSERVER_INFO_PORT_AUTO
,
2139 this.infoServer
= new InfoServer(getProcessName(), addr
, port
, false, this.conf
);
2140 infoServer
.addServlet("dump", "/dump", getDumpServlet());
2141 configureInfoServer();
2142 this.infoServer
.start();
2144 } catch (BindException e
) {
2146 // auto bind disabled throw BindException
2147 LOG
.error("Failed binding http info server to port: " + port
);
2150 // auto bind enabled, try to use another port
2151 LOG
.info("Failed binding http info server to port: " + port
);
// Publish the port the info server actually bound to.
2155 port
= this.infoServer
.getPort();
2156 conf
.setInt(HConstants
.REGIONSERVER_INFO_PORT
, port
);
2157 int masterInfoPort
= conf
.getInt(HConstants
.MASTER_INFO_PORT
,
2158 HConstants
.DEFAULT_MASTER_INFOPORT
);
2159 conf
.setInt("hbase.master.info.port.orig", masterInfoPort
);
2160 conf
.setInt(HConstants
.MASTER_INFO_PORT
, port
);
// Health check over the background workers: leases, cacheFlusher and walRoller must be alive
// and the compactionChecker / periodicFlusher chores must be scheduled. A component that is
// null counts as healthy.
// NOTE(review): the file system check and the return statements are not visible in this span.
2165 * Verify that server is healthy
2167 private boolean isHealthy() {
2169 // File system problem
2172 // Verify that all threads are alive
2173 boolean healthy
= (this.leases
== null || this.leases
.isAlive())
2174 && (this.cacheFlusher
== null || this.cacheFlusher
.isAlive())
2175 && (this.walRoller
== null || this.walRoller
.isAlive())
2176 && (this.compactionChecker
== null || this.compactionChecker
.isScheduled())
2177 && (this.periodicFlusher
== null || this.periodicFlusher
.isScheduled());
2179 stop("One or more threads are no longer alive -- stop");
2185 public List
<WAL
> getWALs() throws IOException
{
2186 return walFactory
.getWALs();
2190 public WAL
getWAL(RegionInfo regionInfo
) throws IOException
{
2191 WAL wal
= walFactory
.getWAL(regionInfo
);
2192 if (this.walRoller
!= null) {
2193 this.walRoller
.addWAL(wal
);
// Accessor returning a LogRoller.
// NOTE(review): the method body is not visible in this span; presumably it returns the
// walRoller field -- confirm.
2198 public LogRoller
getWalRoller() {
2203 public Connection
getConnection() {
2204 return getClusterConnection();
2208 public ClusterConnection
getClusterConnection() {
2209 return this.clusterConnection
;
2213 public void stop(final String msg
) {
2214 stop(msg
, false, RpcServer
.getRequestUser().orElse(null));
// Stops the region server if it is not already stopped: runs the coprocessor preStop hook
// when a coprocessor host is present, sets the stopped flag and wakes the sleeper.
// NOTE(review): the try header and the branches that tell a forced stop from a normal one
// are not visible in this span.
2218 * Stops the regionserver.
2219 * @param msg Status message
2220 * @param force True if this is a regionserver abort
2221 * @param user The user executing the stop request, or null if no user is associated
2223 public void stop(final String msg
, final boolean force
, final User user
) {
2224 if (!this.stopped
) {
2225 LOG
.info("***** STOPPING region server '" + this + "' *****");
2226 if (this.rsHost
!= null) {
2227 // when forced via abort don't allow CPs to override
2229 this.rsHost
.preStop(msg
, user
);
2230 } catch (IOException ioe
) {
// Two outcomes are logged for a failing preStop: the stop is not carried out, or the
// exception is skipped because the shutdown is forced.
2232 LOG
.warn("The region server did not stop", ioe
);
2235 LOG
.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe
);
2238 this.stopped
= true;
2239 LOG
.info("STOPPED: " + msg
);
2240 // Wakes run() if it is sleeping
2241 sleeper
.skipSleepCycle();
2245 public void waitForServerOnline(){
2246 while (!isStopped() && !isOnline()) {
2247 synchronized (online
) {
2249 online
.wait(msgInterval
);
2250 } catch (InterruptedException ie
) {
2251 Thread
.currentThread().interrupt();
// Work done after a region has been opened on this server:
//  1. verify the RPC services are open,
//  2. request a system compaction for every store that has references or needs compaction,
//  3. report the OPENED transition (with the region's open sequence number) to the master,
//     failing with an IOException when the report is not accepted,
//  4. trigger a flush in the primary region.
// NOTE(review): some statements (e.g. what wraps the "No sequence number found" message)
// are not visible in this span.
2259 public void postOpenDeployTasks(final PostOpenDeployContext context
) throws IOException
{
2260 HRegion r
= context
.getRegion();
2261 long openProcId
= context
.getOpenProcId();
2262 long masterSystemTime
= context
.getMasterSystemTime();
2263 rpcServices
.checkOpen();
2264 LOG
.info("Post open deploy tasks for {}, openProcId={}, masterSystemTime={}",
2265 r
.getRegionInfo().getRegionNameAsString(), openProcId
, masterSystemTime
);
2266 // Do checks to see if we need to compact (references or too many files)
2267 for (HStore s
: r
.stores
.values()) {
2268 if (s
.hasReferences() || s
.needsCompaction()) {
2269 this.compactSplitThread
.requestSystemCompaction(r
, s
, "Opening Region");
2272 long openSeqNum
= r
.getOpenSeqNum();
2273 if (openSeqNum
== HConstants
.NO_SEQNUM
) {
2274 // If we opened a region, we should have read some sequence number from it.
2276 "No sequence number found when opening " + r
.getRegionInfo().getRegionNameAsString());
// A false return from the report is turned into an IOException.
2281 if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode
.OPENED
,
2282 openSeqNum
, openProcId
, masterSystemTime
, r
.getRegionInfo()))) {
2283 throw new IOException(
2284 "Failed to report opened region to master: " + r
.getRegionInfo().getRegionNameAsString());
2287 triggerFlushInPrimaryRegion(r
);
2289 LOG
.debug("Finished post open deploy task for " + r
.getRegionInfo().getRegionNameAsString());
// Reports a region state transition to the master.
// When TEST_SKIP_REPORTING_TRANSITION is set, an OPENED transition is recorded directly
// instead: the meta location in ZooKeeper for a meta region, the meta table otherwise.
// Normally a ReportRegionStateTransitionRequest is built from the context and sent through
// the region server status stub in a loop that runs while the cluster connection is open.
// A ServerNotRunningYetException or PleaseHoldException from the master causes a pause
// before the next attempt.
// NOTE(review): the declaration of `tries`, the return statements and several branch bodies
// are not visible in this span.
2293 public boolean reportRegionStateTransition(final RegionStateTransitionContext context
) {
2294 TransitionCode code
= context
.getCode();
2295 long openSeqNum
= context
.getOpenSeqNum();
2296 long masterSystemTime
= context
.getMasterSystemTime();
2297 RegionInfo
[] hris
= context
.getHris();
2298 long[] procIds
= context
.getProcIds();
2300 if (TEST_SKIP_REPORTING_TRANSITION
) {
2301 // This is for testing only in case there is no master
2302 // to handle the region transition report at all.
2303 if (code
== TransitionCode
.OPENED
) {
// Exactly one region is expected for an OPENED transition on this path.
2304 Preconditions
.checkArgument(hris
!= null && hris
.length
== 1);
2305 if (hris
[0].isMetaRegion()) {
2307 MetaTableLocator
.setMetaLocation(getZooKeeper(), serverName
,
2308 hris
[0].getReplicaId(),State
.OPEN
);
2309 } catch (KeeperException e
) {
2310 LOG
.info("Failed to update meta location", e
);
2315 MetaTableAccessor
.updateRegionLocation(clusterConnection
,
2316 hris
[0], serverName
, openSeqNum
, masterSystemTime
);
2317 } catch (IOException e
) {
2318 LOG
.info("Failed to update meta", e
);
// Build the request: server name, transition code, optional open sequence number, regions
// and procedure ids.
2326 ReportRegionStateTransitionRequest
.Builder builder
=
2327 ReportRegionStateTransitionRequest
.newBuilder();
2328 builder
.setServer(ProtobufUtil
.toServerName(serverName
));
2329 RegionStateTransition
.Builder transition
= builder
.addTransitionBuilder();
2330 transition
.setTransitionCode(code
);
// The open sequence number is only sent for OPENED transitions with a non-negative value.
2331 if (code
== TransitionCode
.OPENED
&& openSeqNum
>= 0) {
2332 transition
.setOpenSeqNum(openSeqNum
);
2334 for (RegionInfo hri
: hris
) {
2335 transition
.addRegionInfo(ProtobufUtil
.toRegionInfo(hri
));
2337 for (long procId
: procIds
) {
2338 transition
.addProcId(procId
);
2340 ReportRegionStateTransitionRequest request
= builder
.build();
2342 long pauseTime
= INIT_PAUSE_TIME_MS
;
2343 // Keep looping till we get an error. We want to send reports even though server is going down.
2344 // Only go down if clusterConnection is null. It is set to null almost as last thing as the
2345 // HRegionServer does down.
2346 while (this.clusterConnection
!= null && !this.clusterConnection
.isClosed()) {
// Work on a local copy of the stub reference; it is compared with the field again near the
// end of the loop body.
2347 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
2350 createRegionServerStatusStub();
2353 ReportRegionStateTransitionResponse response
=
2354 rss
.reportRegionStateTransition(null, request
);
2355 if (response
.hasErrorMessage()) {
2356 LOG
.info("TRANSITION FAILED " + request
+ ": " + response
.getErrorMessage());
2359 // Log if we had to retry else don't log unless TRACE. We want to
2360 // know if were successful after an attempt showed in logs as failed.
2361 if (tries
> 0 || LOG
.isTraceEnabled()) {
2362 LOG
.info("TRANSITION REPORTED " + request
);
2364 // NOTE: Return mid-method!!!
2366 } catch (ServiceException se
) {
2367 IOException ioe
= ProtobufUtil
.getRemoteException(se
);
// Only "master not ready yet" style failures lead to a pause before retrying.
2368 boolean pause
= ioe
instanceof ServerNotRunningYetException
||
2369 ioe
instanceof PleaseHoldException
;
2371 // Do backoff else we flood the Master with requests.
2372 pauseTime
= ConnectionUtils
.getPauseTime(INIT_PAUSE_TIME_MS
, tries
);
2374 pauseTime
= INIT_PAUSE_TIME_MS
; // Reset.
2376 LOG
.info("Failed report transition " +
2377 TextFormat
.shortDebugString(request
) + "; retry (#" + tries
+ ")" +
2379 " after " + pauseTime
+ "ms delay (Master is coming online...).":
2382 if (pause
) Threads
.sleep(pauseTime
);
2384 if (rssStub
== rss
) {
// Decides, from the region's replica id and the region-replica settings, whether reads are
// enabled on the region right away or disabled until a RegionReplicaFlushHandler, submitted
// to the executor service, has run.
// NOTE(review): the return statements and part of the second condition are not visible in
// this span.
2393 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2394 * block this thread. See RegionReplicaFlushHandler for details.
2396 void triggerFlushInPrimaryRegion(final HRegion region
) {
// The first check tests whether the region is the default replica.
2397 if (ServerRegionReplicaUtil
.isDefaultReplica(region
.getRegionInfo())) {
// When region replica replication or wait-for-primary-flush is off, reads are enabled here.
2400 if (!ServerRegionReplicaUtil
.isRegionReplicaReplicationEnabled(region
.conf
) ||
2401 !ServerRegionReplicaUtil
.isRegionReplicaWaitForPrimaryFlushEnabled(
2403 region
.setReadsEnabled(true);
2407 region
.setReadsEnabled(false); // disable reads before marking the region as opened.
2408 // RegionReplicaFlushHandler might reset this.
2410 // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2411 if (this.executorService
!= null) {
2412 this.executorService
.submit(new RegionReplicaFlushHandler(this, region
));
2417 public RpcServerInterface
getRpcServer() {
2418 return rpcServices
.rpcServer
;
2422 public RSRpcServices
getRSRpcServices() {
2427 * Cause the server to exit without closing the regions it is serving, the log
2428 * it is using and without notifying the master. Used unit testing and on
2429 * catastrophic events such as HDFS is yanked out from under hbase or we OOME.
2432 * the reason we are aborting
2434 * the exception that caused the abort, or null
2437 public void abort(String reason
, Throwable cause
) {
2438 String msg
= "***** ABORTING region server " + this + ": " + reason
+ " *****";
2439 if (cause
!= null) {
2440 LOG
.error(HBaseMarkers
.FATAL
, msg
, cause
);
2442 LOG
.error(HBaseMarkers
.FATAL
, msg
);
2444 setAbortRequested();
2445 // HBASE-4014: show list of coprocessors that were loaded to help debug
2446 // regionserver crashes.Note that we're implicitly using
2447 // java.util.HashSet's toString() method to print the coprocessor names.
2448 LOG
.error(HBaseMarkers
.FATAL
, "RegionServer abort: loaded coprocessors are: " +
2449 CoprocessorHost
.getLoadedCoprocessors());
2450 // Try and dump metrics if abort -- might give clue as to how fatal came about....
2452 LOG
.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics
.dumpMetrics());
2453 } catch (MalformedObjectNameException
| IOException e
) {
2454 LOG
.warn("Failed dumping metrics", e
);
2457 // Do our best to report our abort to the master, but this may not work
2459 if (cause
!= null) {
2460 msg
+= "\nCause:\n" + Throwables
.getStackTraceAsString(cause
);
2462 // Report to the master but only if we have already registered with the master.
2463 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
2464 if (rss
!= null && this.serverName
!= null) {
2465 ReportRSFatalErrorRequest
.Builder builder
=
2466 ReportRSFatalErrorRequest
.newBuilder();
2467 builder
.setServer(ProtobufUtil
.toServerName(this.serverName
));
2468 builder
.setErrorMessage(msg
);
2469 rss
.reportRSFatalError(null, builder
.build());
2471 } catch (Throwable t
) {
2472 LOG
.warn("Unable to report fatal error to master", t
);
2475 scheduleAbortTimer();
2476 // shutdown should be run as the internal user
2477 stop(reason
, true, null);
2480 protected final void setAbortRequested() {
2481 this.abortRequested
= true;
2485 * @see HRegionServer#abort(String, Throwable)
2487 public void abort(String reason
) {
2488 abort(reason
, null);
2492 public boolean isAborted() {
2493 return this.abortRequested
;
2497 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup
2498 * logs but it does close socket in case want to bring up server on old
2499 * hostname+port immediately.
2502 protected void kill() {
2504 abort("Simulated kill");
2508 * Called on stop/abort before closing the cluster connection and meta locator.
2510 protected void sendShutdownInterrupt() {
2513 // Limits the time spent in the shutdown process.
2514 private void scheduleAbortTimer() {
2515 if (this.abortMonitor
== null) {
2516 this.abortMonitor
= new Timer("Abort regionserver monitor", true);
2517 TimerTask abortTimeoutTask
= null;
2520 Class
.forName(conf
.get(ABORT_TIMEOUT_TASK
, SystemExitWhenAbortTimeout
.class.getName()))
2521 .asSubclass(TimerTask
.class).getDeclaredConstructor().newInstance();
2522 } catch (Exception e
) {
2523 LOG
.warn("Initialize abort timeout task failed", e
);
2525 if (abortTimeoutTask
!= null) {
2526 abortMonitor
.schedule(abortTimeoutTask
, conf
.getLong(ABORT_TIMEOUT
, DEFAULT_ABORT_TIMEOUT
));
2532 * Wait on all threads to finish. Presumption is that all closes and stops
2533 * have already been called.
2535 protected void stopServiceThreads() {
2536 // clean up the scheduled chores
2537 if (this.choreService
!= null) {
2538 choreService
.cancelChore(nonceManagerChore
);
2539 choreService
.cancelChore(compactionChecker
);
2540 choreService
.cancelChore(periodicFlusher
);
2541 choreService
.cancelChore(healthCheckChore
);
2542 choreService
.cancelChore(storefileRefresher
);
2543 choreService
.cancelChore(movedRegionsCleaner
);
2544 choreService
.cancelChore(fsUtilizationChore
);
2545 // clean up the remaining scheduled chores (in case we missed out any)
2546 choreService
.shutdown();
2549 if (this.cacheFlusher
!= null) {
2550 this.cacheFlusher
.join();
2553 if (this.spanReceiverHost
!= null) {
2554 this.spanReceiverHost
.closeReceivers();
2556 if (this.walRoller
!= null) {
2557 this.walRoller
.close();
2559 if (this.compactSplitThread
!= null) {
2560 this.compactSplitThread
.join();
2562 if (this.executorService
!= null) this.executorService
.shutdown();
2563 if (this.replicationSourceHandler
!= null &&
2564 this.replicationSourceHandler
== this.replicationSinkHandler
) {
2565 this.replicationSourceHandler
.stopReplicationService();
2567 if (this.replicationSourceHandler
!= null) {
2568 this.replicationSourceHandler
.stopReplicationService();
2570 if (this.replicationSinkHandler
!= null) {
2571 this.replicationSinkHandler
.stopReplicationService();
2577 * @return Return the object that implements the replication
2578 * source executorService.
2581 public ReplicationSourceService
getReplicationSourceService() {
2582 return replicationSourceHandler
;
2586 * @return Return the object that implements the replication sink executorService.
2588 public ReplicationSinkService
getReplicationSinkService() {
2589 return replicationSinkHandler
;
2593 * Get the current master from ZooKeeper and open the RPC connection to it.
2594 * To get a fresh connection, the current rssStub must be null.
2595 * Method will block until a master is available. You can break from this
2596 * block by requesting the server stop.
2598 * @return master + port, or null if server has been stopped
2601 protected synchronized ServerName
createRegionServerStatusStub() {
2602 // Create RS stub without refreshing the master node from ZK, use cached data
2603 return createRegionServerStatusStub(false);
2607 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2608 * connection, the current rssStub must be null. Method will block until a master is available.
2609 * You can break from this block by requesting the server stop.
2610 * @param refresh If true then master address will be read from ZK, otherwise use cached data
2611 * @return master + port, or null if server has been stopped
2614 protected synchronized ServerName
createRegionServerStatusStub(boolean refresh
) {
2615 if (rssStub
!= null) {
2616 return masterAddressTracker
.getMasterAddress();
2618 ServerName sn
= null;
2619 long previousLogTime
= 0;
2620 RegionServerStatusService
.BlockingInterface intRssStub
= null;
2621 LockService
.BlockingInterface intLockStub
= null;
2622 boolean interrupted
= false;
2624 while (keepLooping()) {
2625 sn
= this.masterAddressTracker
.getMasterAddress(refresh
);
2627 if (!keepLooping()) {
2628 // give up with no connection.
2629 LOG
.debug("No master found and cluster is stopped; bailing out");
2632 if (System
.currentTimeMillis() > (previousLogTime
+ 1000)) {
2633 LOG
.debug("No master found; retry");
2634 previousLogTime
= System
.currentTimeMillis();
2636 refresh
= true; // let's try pull it from ZK directly
2643 // If we are on the active master, use the shortcut
2644 if (this instanceof HMaster
&& sn
.equals(getServerName())) {
2645 // Wrap the shortcut in a class providing our version to the calls where it's relevant.
2646 // Normally, RpcServer-based threadlocals do that.
2647 intRssStub
= new MasterRpcServicesVersionWrapper(((HMaster
)this).getMasterRpcServices());
2648 intLockStub
= ((HMaster
)this).getMasterRpcServices();
2652 BlockingRpcChannel channel
=
2653 this.rpcClient
.createBlockingRpcChannel(sn
, userProvider
.getCurrent(),
2654 shortOperationTimeout
);
2655 intRssStub
= RegionServerStatusService
.newBlockingStub(channel
);
2656 intLockStub
= LockService
.newBlockingStub(channel
);
2658 } catch (IOException e
) {
2659 if (System
.currentTimeMillis() > (previousLogTime
+ 1000)) {
2660 e
= e
instanceof RemoteException ?
2661 ((RemoteException
)e
).unwrapRemoteException() : e
;
2662 if (e
instanceof ServerNotRunningYetException
) {
2663 LOG
.info("Master isn't available yet, retrying");
2665 LOG
.warn("Unable to connect to master. Retrying. Error was:", e
);
2667 previousLogTime
= System
.currentTimeMillis();
2676 Thread
.currentThread().interrupt();
2679 this.rssStub
= intRssStub
;
2680 this.lockStub
= intLockStub
;
2685 * @return True if we should break loop because cluster is going down or
2686 * this server has been stopped or hdfs has gone bad.
2688 private boolean keepLooping() {
2689 return !this.stopped
&& isClusterUp();
2693 * Let the master know we're here Run initialization using parameters passed
2695 * @return A Map of key/value configurations we got from the Master else
2696 * null if we failed to register.
2697 * @throws IOException
2699 private RegionServerStartupResponse
reportForDuty() throws IOException
{
2700 if (this.masterless
) return RegionServerStartupResponse
.getDefaultInstance();
2701 ServerName masterServerName
= createRegionServerStatusStub(true);
2702 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
2703 if (masterServerName
== null || rss
== null) return null;
2704 RegionServerStartupResponse result
= null;
2706 rpcServices
.requestCount
.reset();
2707 rpcServices
.rpcGetRequestCount
.reset();
2708 rpcServices
.rpcScanRequestCount
.reset();
2709 rpcServices
.rpcMultiRequestCount
.reset();
2710 rpcServices
.rpcMutateRequestCount
.reset();
2711 LOG
.info("reportForDuty to master=" + masterServerName
+ " with port="
2712 + rpcServices
.isa
.getPort() + ", startcode=" + this.startcode
);
2713 long now
= EnvironmentEdgeManager
.currentTime();
2714 int port
= rpcServices
.isa
.getPort();
2715 RegionServerStartupRequest
.Builder request
= RegionServerStartupRequest
.newBuilder();
2716 if (!StringUtils
.isBlank(useThisHostnameInstead
)) {
2717 request
.setUseThisHostnameInstead(useThisHostnameInstead
);
2719 request
.setPort(port
);
2720 request
.setServerStartCode(this.startcode
);
2721 request
.setServerCurrentTime(now
);
2722 result
= rss
.regionServerStartup(null, request
.build());
2723 } catch (ServiceException se
) {
2724 IOException ioe
= ProtobufUtil
.getRemoteException(se
);
2725 if (ioe
instanceof ClockOutOfSyncException
) {
2726 LOG
.error(HBaseMarkers
.FATAL
, "Master rejected startup because clock is out of sync",
2728 // Re-throw IOE will cause RS to abort
2730 } else if (ioe
instanceof ServerNotRunningYetException
) {
2731 LOG
.debug("Master is not running yet");
2733 LOG
.warn("error telling master we are up", se
);
2741 public RegionStoreSequenceIds
getLastSequenceId(byte[] encodedRegionName
) {
2743 GetLastFlushedSequenceIdRequest req
=
2744 RequestConverter
.buildGetLastFlushedSequenceIdRequest(encodedRegionName
);
2745 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
2746 if (rss
== null) { // Try to connect one more time
2747 createRegionServerStatusStub();
2750 // Still no luck, we tried
2751 LOG
.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2752 return RegionStoreSequenceIds
.newBuilder().setLastFlushedSequenceId(HConstants
.NO_SEQNUM
)
2756 GetLastFlushedSequenceIdResponse resp
= rss
.getLastFlushedSequenceId(null, req
);
2757 return RegionStoreSequenceIds
.newBuilder()
2758 .setLastFlushedSequenceId(resp
.getLastFlushedSequenceId())
2759 .addAllStoreSequenceId(resp
.getStoreLastFlushedSequenceIdList()).build();
2760 } catch (ServiceException e
) {
2761 LOG
.warn("Unable to connect to the master to check the last flushed sequence id", e
);
2762 return RegionStoreSequenceIds
.newBuilder().setLastFlushedSequenceId(HConstants
.NO_SEQNUM
)
2768 * Closes all regions. Called on our way out.
2769 * Assumes that its not possible for new regions to be added to onlineRegions
2770 * while this method runs.
2772 protected void closeAllRegions(final boolean abort
) {
2773 closeUserRegions(abort
);
2774 closeMetaTableRegions(abort
);
2778 * Close meta region if we carry it
2779 * @param abort Whether we're running an abort.
2781 void closeMetaTableRegions(final boolean abort
) {
2782 HRegion meta
= null;
2783 this.lock
.writeLock().lock();
2785 for (Map
.Entry
<String
, HRegion
> e
: onlineRegions
.entrySet()) {
2786 RegionInfo hri
= e
.getValue().getRegionInfo();
2787 if (hri
.isMetaRegion()) {
2788 meta
= e
.getValue();
2790 if (meta
!= null) break;
2793 this.lock
.writeLock().unlock();
2795 if (meta
!= null) closeRegionIgnoreErrors(meta
.getRegionInfo(), abort
);
2799 * Schedule closes on all user regions.
2800 * Should be safe calling multiple times because it wont' close regions
2801 * that are already closed or that are closing.
2802 * @param abort Whether we're running an abort.
2804 void closeUserRegions(final boolean abort
) {
2805 this.lock
.writeLock().lock();
2807 for (Map
.Entry
<String
, HRegion
> e
: this.onlineRegions
.entrySet()) {
2808 HRegion r
= e
.getValue();
2809 if (!r
.getRegionInfo().isMetaRegion() && r
.isAvailable()) {
2810 // Don't update zk with this close transition; pass false.
2811 closeRegionIgnoreErrors(r
.getRegionInfo(), abort
);
2815 this.lock
.writeLock().unlock();
2819 /** @return the info server */
2820 public InfoServer
getInfoServer() {
2825 * @return true if a stop has been requested.
2828 public boolean isStopped() {
2829 return this.stopped
;
2833 public boolean isStopping() {
2834 return this.stopping
;
2839 * @return the configuration
2842 public Configuration
getConfiguration() {
2846 /** @return the write lock for the server */
2847 ReentrantReadWriteLock
.WriteLock
getWriteLock() {
2848 return lock
.writeLock();
2851 public int getNumberOfOnlineRegions() {
2852 return this.onlineRegions
.size();
2855 boolean isOnlineRegionsEmpty() {
2856 return this.onlineRegions
.isEmpty();
2860 * For tests, web ui and metrics.
2861 * This method will only work if HRegionServer is in the same JVM as client;
2862 * HRegion cannot be serialized to cross an rpc.
2864 public Collection
<HRegion
> getOnlineRegionsLocalContext() {
2865 Collection
<HRegion
> regions
= this.onlineRegions
.values();
2866 return Collections
.unmodifiableCollection(regions
);
2870 public void addRegion(HRegion region
) {
2871 this.onlineRegions
.put(region
.getRegionInfo().getEncodedName(), region
);
2872 configurationManager
.registerObserver(region
);
2875 private void addRegion(SortedMap
<Long
, Collection
<HRegion
>> sortedRegions
, HRegion region
,
2877 if (!sortedRegions
.containsKey(size
)) {
2878 sortedRegions
.put(size
, new ArrayList
<>());
2880 sortedRegions
.get(size
).add(region
);
2883 * @return A new Map of online regions sorted by region off-heap size with the first entry being
2886 SortedMap
<Long
, Collection
<HRegion
>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2887 // we'll sort the regions in reverse
2888 SortedMap
<Long
, Collection
<HRegion
>> sortedRegions
= new TreeMap
<>(
2889 new Comparator
<Long
>() {
2891 public int compare(Long a
, Long b
) {
2892 return -1 * a
.compareTo(b
);
2895 // Copy over all regions. Regions are sorted by size with biggest first.
2896 for (HRegion region
: this.onlineRegions
.values()) {
2897 addRegion(sortedRegions
, region
, region
.getMemStoreOffHeapSize());
2899 return sortedRegions
;
2903 * @return A new Map of online regions sorted by region heap size with the first entry being the
2906 SortedMap
<Long
, Collection
<HRegion
>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2907 // we'll sort the regions in reverse
2908 SortedMap
<Long
, Collection
<HRegion
>> sortedRegions
= new TreeMap
<>(
2909 new Comparator
<Long
>() {
2911 public int compare(Long a
, Long b
) {
2912 return -1 * a
.compareTo(b
);
2915 // Copy over all regions. Regions are sorted by size with biggest first.
2916 for (HRegion region
: this.onlineRegions
.values()) {
2917 addRegion(sortedRegions
, region
, region
.getMemStoreHeapSize());
2919 return sortedRegions
;
2923 * @return time stamp in millis of when this region server was started
2925 public long getStartcode() {
2926 return this.startcode
;
2929 /** @return reference to FlushRequester */
2931 public FlushRequester
getFlushRequester() {
2932 return this.cacheFlusher
;
2936 public CompactionRequester
getCompactionRequestor() {
2937 return this.compactSplitThread
;
2941 public Leases
getLeases() {
2946 * @return Return the rootDir.
2948 protected Path
getRootDir() {
2953 * @return Return the fs.
2956 public FileSystem
getFileSystem() {
2961 * @return Return the walRootDir.
2963 public Path
getWALRootDir() {
2968 * @return Return the walFs.
2970 public FileSystem
getWALFileSystem() {
2975 public String
toString() {
2976 return getServerName().toString();
2980 * Interval at which threads should run
2982 * @return the interval
2984 public int getThreadWakeFrequency() {
2985 return threadWakeFrequency
;
2989 public ZKWatcher
getZooKeeper() {
2994 public CoordinatedStateManager
getCoordinatedStateManager() {
2999 public ServerName
getServerName() {
3003 public RegionServerCoprocessorHost
getRegionServerCoprocessorHost(){
3008 public ConcurrentMap
<byte[], Boolean
> getRegionsInTransitionInRS() {
3009 return this.regionsInTransitionInRS
;
3013 public ExecutorService
getExecutorService() {
3014 return executorService
;
3018 public ChoreService
getChoreService() {
3019 return choreService
;
3023 public RegionServerRpcQuotaManager
getRegionServerRpcQuotaManager() {
3024 return rsQuotaManager
;
3028 // Main program and support routines
3031 * Load the replication executorService objects, if any
3033 private static void createNewReplicationInstance(Configuration conf
, HRegionServer server
,
3034 FileSystem walFs
, Path walDir
, Path oldWALDir
, WALProvider walProvider
) throws IOException
{
3035 // read in the name of the source replication class from the config file.
3036 String sourceClassname
= conf
.get(HConstants
.REPLICATION_SOURCE_SERVICE_CLASSNAME
,
3037 HConstants
.REPLICATION_SERVICE_CLASSNAME_DEFAULT
);
3039 // read in the name of the sink replication class from the config file.
3040 String sinkClassname
= conf
.get(HConstants
.REPLICATION_SINK_SERVICE_CLASSNAME
,
3041 HConstants
.REPLICATION_SERVICE_CLASSNAME_DEFAULT
);
3043 // If both the sink and the source class names are the same, then instantiate
3045 if (sourceClassname
.equals(sinkClassname
)) {
3046 server
.replicationSourceHandler
= newReplicationInstance(sourceClassname
,
3047 ReplicationSourceService
.class, conf
, server
, walFs
, walDir
, oldWALDir
, walProvider
);
3048 server
.replicationSinkHandler
= (ReplicationSinkService
) server
.replicationSourceHandler
;
3050 server
.replicationSourceHandler
= newReplicationInstance(sourceClassname
,
3051 ReplicationSourceService
.class, conf
, server
, walFs
, walDir
, oldWALDir
, walProvider
);
3052 server
.replicationSinkHandler
= newReplicationInstance(sinkClassname
,
3053 ReplicationSinkService
.class, conf
, server
, walFs
, walDir
, oldWALDir
, walProvider
);
3057 private static <T
extends ReplicationService
> T
newReplicationInstance(String classname
,
3058 Class
<T
> xface
, Configuration conf
, HRegionServer server
, FileSystem walFs
, Path logDir
,
3059 Path oldLogDir
, WALProvider walProvider
) throws IOException
{
3060 Class
<?
extends T
> clazz
= null;
3062 ClassLoader classLoader
= Thread
.currentThread().getContextClassLoader();
3063 clazz
= Class
.forName(classname
, true, classLoader
).asSubclass(xface
);
3064 } catch (java
.lang
.ClassNotFoundException nfe
) {
3065 throw new IOException("Could not find class for " + classname
);
3067 T service
= ReflectionUtils
.newInstance(clazz
, conf
);
3068 service
.initialize(server
, walFs
, logDir
, oldLogDir
, walProvider
);
3072 public Map
<String
, ReplicationStatus
> getWalGroupsReplicationStatus(){
3073 Map
<String
, ReplicationStatus
> walGroupsReplicationStatus
= new TreeMap
<>();
3074 if(!this.isOnline()){
3075 return walGroupsReplicationStatus
;
3077 List
<ReplicationSourceInterface
> allSources
= new ArrayList
<>();
3078 allSources
.addAll(replicationSourceHandler
.getReplicationManager().getSources());
3079 allSources
.addAll(replicationSourceHandler
.getReplicationManager().getOldSources());
3080 for(ReplicationSourceInterface source
: allSources
){
3081 walGroupsReplicationStatus
.putAll(source
.getWalGroupStatus());
3083 return walGroupsReplicationStatus
;
3087 * Utility for constructing an instance of the passed HRegionServer class.
3089 * @param regionServerClass
3091 * @return HRegionServer instance.
3093 public static HRegionServer
constructRegionServer(
3094 Class
<?
extends HRegionServer
> regionServerClass
,
3095 final Configuration conf2
) {
3097 Constructor
<?
extends HRegionServer
> c
= regionServerClass
3098 .getConstructor(Configuration
.class);
3099 return c
.newInstance(conf2
);
3100 } catch (Exception e
) {
3101 throw new RuntimeException("Failed construction of " + "Regionserver: "
3102 + regionServerClass
.toString(), e
);
3107 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
3109 public static void main(String
[] args
) throws Exception
{
3110 LOG
.info("STARTING executorService " + HRegionServer
.class.getSimpleName());
3111 VersionInfo
.logVersion();
3112 Configuration conf
= HBaseConfiguration
.create();
3113 @SuppressWarnings("unchecked")
3114 Class
<?
extends HRegionServer
> regionServerClass
= (Class
<?
extends HRegionServer
>) conf
3115 .getClass(HConstants
.REGION_SERVER_IMPL
, HRegionServer
.class);
3117 new HRegionServerCommandLine(regionServerClass
).doMain(args
);
3121 * Gets the online regions of the specified table.
3122 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
3123 * Only returns <em>online</em> regions. If a region on this table has been
3124 * closed during a disable, etc., it will not be included in the returned list.
3125 * So, the returned list may not necessarily be ALL regions in this table, its
3126 * all the ONLINE regions in the table.
3128 * @return Online regions from <code>tableName</code>
3131 public List
<HRegion
> getRegions(TableName tableName
) {
3132 List
<HRegion
> tableRegions
= new ArrayList
<>();
3133 synchronized (this.onlineRegions
) {
3134 for (HRegion region
: this.onlineRegions
.values()) {
3135 RegionInfo regionInfo
= region
.getRegionInfo();
3136 if(regionInfo
.getTable().equals(tableName
)) {
3137 tableRegions
.add(region
);
3141 return tableRegions
;
3145 public List
<HRegion
> getRegions() {
3146 List
<HRegion
> allRegions
= new ArrayList
<>();
3147 synchronized (this.onlineRegions
) {
3148 // Return a clone copy of the onlineRegions
3149 allRegions
.addAll(onlineRegions
.values());
3155 * Gets the online tables in this RS.
3156 * This method looks at the in-memory onlineRegions.
3157 * @return all the online tables in this RS
3159 public Set
<TableName
> getOnlineTables() {
3160 Set
<TableName
> tables
= new HashSet
<>();
3161 synchronized (this.onlineRegions
) {
3162 for (Region region
: this.onlineRegions
.values()) {
3163 tables
.add(region
.getTableDescriptor().getTableName());
3169 // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
3170 public String
[] getRegionServerCoprocessors() {
3171 TreeSet
<String
> coprocessors
= new TreeSet
<>();
3173 coprocessors
.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3174 } catch (IOException exception
) {
3175 LOG
.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
3177 LOG
.debug("Exception details for failure to fetch wal coprocessor information.", exception
);
3179 Collection
<HRegion
> regions
= getOnlineRegionsLocalContext();
3180 for (HRegion region
: regions
) {
3181 coprocessors
.addAll(region
.getCoprocessorHost().getCoprocessors());
3183 coprocessors
.addAll(getWAL(region
.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3184 } catch (IOException exception
) {
3185 LOG
.warn("Exception attempting to fetch wal coprocessor information for region " + region
+
3187 LOG
.debug("Exception details for failure to fetch wal coprocessor information.", exception
);
3190 coprocessors
.addAll(rsHost
.getCoprocessors());
3191 return coprocessors
.toArray(new String
[coprocessors
.size()]);
3195 * Try to close the region, logs a warning on failure but continues.
3196 * @param region Region to close
3198 private void closeRegionIgnoreErrors(RegionInfo region
, final boolean abort
) {
3200 if (!closeRegion(region
.getEncodedName(), abort
, null)) {
3201 LOG
.warn("Failed to close " + region
.getRegionNameAsString() +
3202 " - ignoring and continuing");
3204 } catch (IOException e
) {
3205 LOG
.warn("Failed to close " + region
.getRegionNameAsString() +
3206 " - ignoring and continuing", e
);
3211 * Close asynchronously a region, can be called from the master or internally by the regionserver
3212 * when stopping. If called from the master, the region will update the znode status.
3215 * If an opening was in progress, this method will cancel it, but will not start a new close. The
3216 * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3220 * If a close was in progress, this new request will be ignored, and an exception thrown.
3223 * @param encodedName Region to close
3224 * @param abort True if we are aborting
3225 * @return True if closed a region.
3226 * @throws NotServingRegionException if the region is not online
3228 protected boolean closeRegion(String encodedName
, final boolean abort
, final ServerName sn
)
3229 throws NotServingRegionException
{
3230 //Check for permissions to close.
3231 HRegion actualRegion
= this.getRegion(encodedName
);
3232 // Can be null if we're calling close on a region that's not online
3233 if ((actualRegion
!= null) && (actualRegion
.getCoprocessorHost() != null)) {
3235 actualRegion
.getCoprocessorHost().preClose(false);
3236 } catch (IOException exp
) {
3237 LOG
.warn("Unable to close region: the coprocessor launched an error ", exp
);
3242 final Boolean previous
= this.regionsInTransitionInRS
.putIfAbsent(Bytes
.toBytes(encodedName
),
3245 if (Boolean
.TRUE
.equals(previous
)) {
3246 LOG
.info("Received CLOSE for the region:" + encodedName
+ " , which we are already " +
3247 "trying to OPEN. Cancelling OPENING.");
3248 if (!regionsInTransitionInRS
.replace(Bytes
.toBytes(encodedName
), previous
, Boolean
.FALSE
)) {
3249 // The replace failed. That should be an exceptional case, but theoretically it can happen.
3250 // We're going to try to do a standard close then.
3251 LOG
.warn("The opening for region " + encodedName
+ " was done before we could cancel it." +
3252 " Doing a standard close now");
3253 return closeRegion(encodedName
, abort
, sn
);
3255 // Let's get the region from the online region list again
3256 actualRegion
= this.getRegion(encodedName
);
3257 if (actualRegion
== null) { // If already online, we still need to close it.
3258 LOG
.info("The opening previously in progress has been cancelled by a CLOSE request.");
3259 // The master deletes the znode when it receives this exception.
3260 throw new NotServingRegionException("The region " + encodedName
+
3261 " was opening but not yet served. Opening is cancelled.");
3263 } else if (Boolean
.FALSE
.equals(previous
)) {
3264 LOG
.info("Received CLOSE for the region: " + encodedName
+
3265 ", which we are already trying to CLOSE, but not completed yet");
3269 if (actualRegion
== null) {
3270 LOG
.debug("Received CLOSE for a region which is not online, and we're not opening.");
3271 this.regionsInTransitionInRS
.remove(Bytes
.toBytes(encodedName
));
3272 // The master deletes the znode when it receives this exception.
3273 throw new NotServingRegionException("The region " + encodedName
+
3274 " is not online, and is not opening.");
3277 CloseRegionHandler crh
;
3278 final RegionInfo hri
= actualRegion
.getRegionInfo();
3279 if (hri
.isMetaRegion()) {
3280 crh
= new CloseMetaHandler(this, this, hri
, abort
);
3282 crh
= new CloseRegionHandler(this, this, hri
, abort
, sn
);
3284 this.executorService
.submit(crh
);
3289 * Close and offline the region for split or merge
3291 * @param regionEncodedName the name of the region(s) to close
3292 * @return true if closed the region successfully.
3293 * @throws IOException
3295 protected boolean closeAndOfflineRegionForSplitOrMerge(final List
<String
> regionEncodedName
)
3296 throws IOException
{
3297 for (int i
= 0; i
< regionEncodedName
.size(); ++i
) {
3298 HRegion regionToClose
= this.getRegion(regionEncodedName
.get(i
));
3299 if (regionToClose
!= null) {
3300 Map
<byte[], List
<HStoreFile
>> hstoreFiles
= null;
3301 Exception exceptionToThrow
= null;
3303 hstoreFiles
= regionToClose
.close(false);
3304 } catch (Exception e
) {
3305 exceptionToThrow
= e
;
3307 if (exceptionToThrow
== null && hstoreFiles
== null) {
3308 // The region was closed by someone else
3310 new IOException("Failed to close region: already closed by another thread");
3312 if (exceptionToThrow
!= null) {
3313 if (exceptionToThrow
instanceof IOException
) {
3314 throw (IOException
) exceptionToThrow
;
3316 throw new IOException(exceptionToThrow
);
3318 // Offline the region
3319 this.removeRegion(regionToClose
, null);
3327 * @return HRegion for the passed binary <code>regionName</code> or null if
3328 * named region is not member of the online regions.
3330 public HRegion
getOnlineRegion(final byte[] regionName
) {
3331 String encodedRegionName
= RegionInfo
.encodeRegionName(regionName
);
3332 return this.onlineRegions
.get(encodedRegionName
);
3335 public InetSocketAddress
[] getRegionBlockLocations(final String encodedRegionName
) {
3336 return this.regionFavoredNodesMap
.get(encodedRegionName
);
3340 public HRegion
getRegion(final String encodedRegionName
) {
3341 return this.onlineRegions
.get(encodedRegionName
);
3346 public boolean removeRegion(final HRegion r
, ServerName destination
) {
3347 HRegion toReturn
= this.onlineRegions
.remove(r
.getRegionInfo().getEncodedName());
3348 if (destination
!= null) {
3349 long closeSeqNum
= r
.getMaxFlushedSeqId();
3350 if (closeSeqNum
== HConstants
.NO_SEQNUM
) {
3351 // No edits in WAL for this region; get the sequence number when the region was opened.
3352 closeSeqNum
= r
.getOpenSeqNum();
3353 if (closeSeqNum
== HConstants
.NO_SEQNUM
) closeSeqNum
= 0;
3355 addToMovedRegions(r
.getRegionInfo().getEncodedName(), destination
, closeSeqNum
);
3357 this.regionFavoredNodesMap
.remove(r
.getRegionInfo().getEncodedName());
3358 return toReturn
!= null;
3362 * Protected Utility method for safely obtaining an HRegion handle.
3365 * Name of online {@link HRegion} to return
3366 * @return {@link HRegion} for <code>regionName</code>
3367 * @throws NotServingRegionException
3369 protected HRegion
getRegion(final byte[] regionName
)
3370 throws NotServingRegionException
{
3371 String encodedRegionName
= RegionInfo
.encodeRegionName(regionName
);
3372 return getRegionByEncodedName(regionName
, encodedRegionName
);
3375 public HRegion
getRegionByEncodedName(String encodedRegionName
)
3376 throws NotServingRegionException
{
3377 return getRegionByEncodedName(null, encodedRegionName
);
3380 protected HRegion
getRegionByEncodedName(byte[] regionName
, String encodedRegionName
)
3381 throws NotServingRegionException
{
3382 HRegion region
= this.onlineRegions
.get(encodedRegionName
);
3383 if (region
== null) {
3384 MovedRegionInfo moveInfo
= getMovedRegion(encodedRegionName
);
3385 if (moveInfo
!= null) {
3386 throw new RegionMovedException(moveInfo
.getServerName(), moveInfo
.getSeqNum());
3388 Boolean isOpening
= this.regionsInTransitionInRS
.get(Bytes
.toBytes(encodedRegionName
));
3389 String regionNameStr
= regionName
== null?
3390 encodedRegionName
: Bytes
.toStringBinary(regionName
);
3391 if (isOpening
!= null && isOpening
.booleanValue()) {
3392 throw new RegionOpeningException("Region " + regionNameStr
+
3393 " is opening on " + this.serverName
);
3395 throw new NotServingRegionException("" + regionNameStr
+
3396 " is not online on " + this.serverName
);
3402 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
3403 * IOE if it isn't already.
3405 * @param t Throwable
3407 * @param msg Message to log in error. Can be null.
3409 * @return Throwable converted to an IOE; methods can only let out IOEs.
3411 private Throwable
cleanup(final Throwable t
, final String msg
) {
3412 // Don't log as error if NSRE; NSRE is 'normal' operation.
3413 if (t
instanceof NotServingRegionException
) {
3414 LOG
.debug("NotServingRegionException; " + t
.getMessage());
3417 Throwable e
= t
instanceof RemoteException ?
((RemoteException
) t
).unwrapRemoteException() : t
;
3423 if (!rpcServices
.checkOOME(t
)) {
3432 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3434 * @return Make <code>t</code> an IOE if it isn't already.
3436 protected IOException
convertThrowableToIOE(final Throwable t
, final String msg
) {
3437 return (t
instanceof IOException ?
(IOException
) t
: msg
== null
3438 || msg
.length() == 0 ?
new IOException(t
) : new IOException(msg
, t
));
3442 * Checks to see if the file system is still accessible. If not, sets
3443 * abortRequested and stopRequested
3445 * @return false if file system is not available
3447 public boolean checkFileSystem() {
3448 if (this.fsOk
&& this.fs
!= null) {
3450 FSUtils
.checkFileSystemAvailable(this.fs
);
3451 } catch (IOException e
) {
3452 abort("File System not available", e
);
3460 public void updateRegionFavoredNodesMapping(String encodedRegionName
,
3461 List
<org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
.ServerName
> favoredNodes
) {
3462 InetSocketAddress
[] addr
= new InetSocketAddress
[favoredNodes
.size()];
3463 // Refer to the comment on the declaration of regionFavoredNodesMap on why
3464 // it is a map of region name to InetSocketAddress[]
3465 for (int i
= 0; i
< favoredNodes
.size(); i
++) {
3466 addr
[i
] = InetSocketAddress
.createUnresolved(favoredNodes
.get(i
).getHostName(),
3467 favoredNodes
.get(i
).getPort());
3469 regionFavoredNodesMap
.put(encodedRegionName
, addr
);
3473 * Return the favored nodes for a region given its encoded name. Look at the
3474 * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
3475 * @param encodedRegionName
3476 * @return array of favored locations
3479 public InetSocketAddress
[] getFavoredNodesForRegion(String encodedRegionName
) {
3480 return regionFavoredNodesMap
.get(encodedRegionName
);
3484 public ServerNonceManager
getNonceManager() {
3485 return this.nonceManager
;
3488 private static class MovedRegionInfo
{
3489 private final ServerName serverName
;
3490 private final long seqNum
;
3491 private final long ts
;
3493 public MovedRegionInfo(ServerName serverName
, long closeSeqNum
) {
3494 this.serverName
= serverName
;
3495 this.seqNum
= closeSeqNum
;
3496 ts
= EnvironmentEdgeManager
.currentTime();
3499 public ServerName
getServerName() {
3503 public long getSeqNum() {
3507 public long getMoveTime() {
3512 // This map will contains all the regions that we closed for a move.
3513 // We add the time it was moved as we don't want to keep too old information
3514 protected Map
<String
, MovedRegionInfo
> movedRegions
= new ConcurrentHashMap
<>(3000);
3516 // We need a timeout. If not there is a risk of giving a wrong information: this would double
3517 // the number of network calls instead of reducing them.
3518 private static final int TIMEOUT_REGION_MOVED
= (2 * 60 * 1000);
3520 protected void addToMovedRegions(String encodedName
, ServerName destination
, long closeSeqNum
) {
3521 if (ServerName
.isSameAddress(destination
, this.getServerName())) {
3522 LOG
.warn("Not adding moved region record: " + encodedName
+ " to self.");
3525 LOG
.info("Adding " + encodedName
+ " move to " + destination
+ " record at close sequenceid=" +
3527 movedRegions
.put(encodedName
, new MovedRegionInfo(destination
, closeSeqNum
));
3530 void removeFromMovedRegions(String encodedName
) {
3531 movedRegions
.remove(encodedName
);
3534 private MovedRegionInfo
getMovedRegion(final String encodedRegionName
) {
3535 MovedRegionInfo dest
= movedRegions
.get(encodedRegionName
);
3537 long now
= EnvironmentEdgeManager
.currentTime();
3539 if (dest
.getMoveTime() > (now
- TIMEOUT_REGION_MOVED
)) {
3542 movedRegions
.remove(encodedRegionName
);
3550 * Remove the expired entries from the moved regions list.
3552 protected void cleanMovedRegions() {
3553 final long cutOff
= System
.currentTimeMillis() - TIMEOUT_REGION_MOVED
;
3554 Iterator
<Entry
<String
, MovedRegionInfo
>> it
= movedRegions
.entrySet().iterator();
3556 while (it
.hasNext()){
3557 Map
.Entry
<String
, MovedRegionInfo
> e
= it
.next();
3558 if (e
.getValue().getMoveTime() < cutOff
) {
3565 * Use this to allow tests to override and schedule more frequently.
3568 protected int movedRegionCleanerPeriod() {
3569 return TIMEOUT_REGION_MOVED
;
3573 * Creates a Chore thread to clean the moved region cache.
3575 protected final static class MovedRegionsCleaner
extends ScheduledChore
implements Stoppable
{
3576 private HRegionServer regionServer
;
3577 Stoppable stoppable
;
3579 private MovedRegionsCleaner(
3580 HRegionServer regionServer
, Stoppable stoppable
){
3581 super("MovedRegionsCleaner for region " + regionServer
, stoppable
,
3582 regionServer
.movedRegionCleanerPeriod());
3583 this.regionServer
= regionServer
;
3584 this.stoppable
= stoppable
;
3587 static MovedRegionsCleaner
create(HRegionServer rs
){
3588 Stoppable stoppable
= new Stoppable() {
3589 private volatile boolean isStopped
= false;
3590 @Override public void stop(String why
) { isStopped
= true;}
3591 @Override public boolean isStopped() {return isStopped
;}
3594 return new MovedRegionsCleaner(rs
, stoppable
);
3598 protected void chore() {
3599 regionServer
.cleanMovedRegions();
3603 public void stop(String why
) {
3604 stoppable
.stop(why
);
3608 public boolean isStopped() {
3609 return stoppable
.isStopped();
3613 private String
getMyEphemeralNodePath() {
3614 return ZNodePaths
.joinZNode(this.zooKeeper
.getZNodePaths().rsZNode
, getServerName().toString());
3617 private boolean isHealthCheckerConfigured() {
3618 String healthScriptLocation
= this.conf
.get(HConstants
.HEALTH_SCRIPT_LOC
);
3619 return org
.apache
.commons
.lang3
.StringUtils
.isNotBlank(healthScriptLocation
);
3623 * @return the underlying {@link CompactSplit} for the servers
3625 public CompactSplit
getCompactSplitThread() {
3626 return this.compactSplitThread
;
3629 public CoprocessorServiceResponse
execRegionServerService(
3630 @SuppressWarnings("UnusedParameters") final RpcController controller
,
3631 final CoprocessorServiceRequest serviceRequest
) throws ServiceException
{
3633 ServerRpcController serviceController
= new ServerRpcController();
3634 CoprocessorServiceCall call
= serviceRequest
.getCall();
3635 String serviceName
= call
.getServiceName();
3636 com
.google
.protobuf
.Service service
= coprocessorServiceHandlers
.get(serviceName
);
3637 if (service
== null) {
3638 throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " +
3641 com
.google
.protobuf
.Descriptors
.ServiceDescriptor serviceDesc
=
3642 service
.getDescriptorForType();
3644 String methodName
= call
.getMethodName();
3645 com
.google
.protobuf
.Descriptors
.MethodDescriptor methodDesc
=
3646 serviceDesc
.findMethodByName(methodName
);
3647 if (methodDesc
== null) {
3648 throw new UnknownProtocolException(service
.getClass(), "Unknown method " + methodName
+
3649 " called on executorService " + serviceName
);
3652 com
.google
.protobuf
.Message request
=
3653 CoprocessorRpcUtils
.getRequest(service
, methodDesc
, call
.getRequest());
3654 final com
.google
.protobuf
.Message
.Builder responseBuilder
=
3655 service
.getResponsePrototype(methodDesc
).newBuilderForType();
3656 service
.callMethod(methodDesc
, serviceController
, request
,
3657 new com
.google
.protobuf
.RpcCallback
<com
.google
.protobuf
.Message
>() {
3659 public void run(com
.google
.protobuf
.Message message
) {
3660 if (message
!= null) {
3661 responseBuilder
.mergeFrom(message
);
3665 IOException exception
= CoprocessorRpcUtils
.getControllerException(serviceController
);
3666 if (exception
!= null) {
3669 return CoprocessorRpcUtils
.getResponse(responseBuilder
.build(), HConstants
.EMPTY_BYTE_ARRAY
);
3670 } catch (IOException ie
) {
3671 throw new ServiceException(ie
);
3676 * May be null if this is a master which not carry table.
3678 * @return The block cache instance used by the regionserver.
3681 public Optional
<BlockCache
> getBlockCache() {
3682 return Optional
.ofNullable(this.blockCache
);
3686 * May be null if this is a master which not carry table.
3688 * @return The cache for mob files used by the regionserver.
3691 public Optional
<MobFileCache
> getMobFileCache() {
3692 return Optional
.ofNullable(this.mobFileCache
);
3696 public AccessChecker
getAccessChecker() {
3697 return rpcServices
.getAccessChecker();
3701 public ZKPermissionWatcher
getZKPermissionWatcher() {
3702 return rpcServices
.getZkPermissionWatcher();
3706 * @return : Returns the ConfigurationManager object for testing purposes.
3708 protected ConfigurationManager
getConfigurationManager() {
3709 return configurationManager
;
3713 * @return Return table descriptors implementation.
3716 public TableDescriptors
getTableDescriptors() {
3717 return this.tableDescriptors
;
3721 * Reload the configuration from disk.
3723 public void updateConfiguration() {
3724 LOG
.info("Reloading the configuration from disk.");
3725 // Reload the configuration from disk.
3726 conf
.reloadConfiguration();
3727 configurationManager
.notifyAllObservers(conf
);
3730 public CacheEvictionStats
clearRegionBlockCache(Region region
) {
3731 long evictedBlocks
= 0;
3733 for(Store store
: region
.getStores()) {
3734 for(StoreFile hFile
: store
.getStorefiles()) {
3735 evictedBlocks
+= blockCache
.evictBlocksByHfileName(hFile
.getPath().getName());
3739 return CacheEvictionStats
.builder()
3740 .withEvictedBlocks(evictedBlocks
)
3745 public double getCompactionPressure() {
3747 for (Region region
: onlineRegions
.values()) {
3748 for (Store store
: region
.getStores()) {
3749 double normCount
= store
.getCompactionPressure();
3750 if (normCount
> max
) {
3759 public HeapMemoryManager
getHeapMemoryManager() {
3765 * @return whether all wal roll request finished for this regionserver
3768 public boolean walRollRequestFinished() {
3769 return this.walRoller
.walRollFinished();
3773 public ThroughputController
getFlushThroughputController() {
3774 return flushThroughputController
;
3778 public double getFlushPressure() {
3779 if (getRegionServerAccounting() == null || cacheFlusher
== null) {
3780 // return 0 during RS initialization
3783 return getRegionServerAccounting().getFlushPressure();
3787 public void onConfigurationChange(Configuration newConf
) {
3788 ThroughputController old
= this.flushThroughputController
;
3790 old
.stop("configuration change");
3792 this.flushThroughputController
= FlushThroughputControllerFactory
.create(this, newConf
);
3794 Superusers
.initialize(newConf
);
3795 } catch (IOException e
) {
3796 LOG
.warn("Failed to initialize SuperUsers on reloading of the configuration");
3801 public MetricsRegionServer
getMetrics() {
3802 return metricsRegionServer
;
3806 public SecureBulkLoadManager
getSecureBulkLoadManager() {
3807 return this.secureBulkLoadManager
;
3811 public EntityLock
regionLock(List
<RegionInfo
> regionInfos
, String description
, Abortable abort
)
3812 throws IOException
{
3813 return new LockServiceClient(conf
, lockStub
, asyncClusterConnection
.getNonceGenerator())
3814 .regionLock(regionInfos
, description
, abort
);
3818 public void unassign(byte[] regionName
) throws IOException
{
3819 clusterConnection
.getAdmin().unassign(regionName
, false);
3823 public RegionServerSpaceQuotaManager
getRegionServerSpaceQuotaManager() {
3824 return this.rsSpaceQuotaManager
;
3828 public boolean reportFileArchivalForQuotas(TableName tableName
,
3829 Collection
<Entry
<String
, Long
>> archivedFiles
) {
3830 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
3831 if (rss
== null || rsSpaceQuotaManager
== null) {
3832 // the current server could be stopping.
3833 LOG
.trace("Skipping file archival reporting to HMaster as stub is null");
3837 RegionServerStatusProtos
.FileArchiveNotificationRequest request
=
3838 rsSpaceQuotaManager
.buildFileArchiveRequest(tableName
, archivedFiles
);
3839 rss
.reportFileArchival(null, request
);
3840 } catch (ServiceException se
) {
3841 IOException ioe
= ProtobufUtil
.getRemoteException(se
);
3842 if (ioe
instanceof PleaseHoldException
) {
3843 if (LOG
.isTraceEnabled()) {
3844 LOG
.trace("Failed to report file archival(s) to Master because it is initializing."
3845 + " This will be retried.", ioe
);
3847 // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3850 if (rssStub
== rss
) {
3853 // re-create the stub if we failed to report the archival
3854 createRegionServerStatusStub(true);
3855 LOG
.debug("Failed to report file archival(s) to Master. This will be retried.", ioe
);
3861 public NettyEventLoopGroupConfig
getEventLoopGroupConfig() {
3862 return eventLoopGroupConfig
;
3866 public Connection
createConnection(Configuration conf
) throws IOException
{
3867 User user
= UserProvider
.instantiate(conf
).getCurrent();
3868 return ConnectionUtils
.createShortCircuitConnection(conf
, null, user
, this.serverName
,
3869 this.rpcServices
, this.rpcServices
);
3872 public void executeProcedure(long procId
, RSProcedureCallable callable
) {
3873 executorService
.submit(new RSProcedureHandler(this, procId
, callable
));
3876 public void remoteProcedureComplete(long procId
, Throwable error
) {
3877 procedureResultReporter
.complete(procId
, error
);
3880 void reportProcedureDone(ReportProcedureDoneRequest request
) throws IOException
{
3881 RegionServerStatusService
.BlockingInterface rss
= rssStub
;
3887 createRegionServerStatusStub();
3890 rss
.reportProcedureDone(null, request
);
3891 } catch (ServiceException se
) {
3892 if (rssStub
== rss
) {
3895 throw ProtobufUtil
.getRemoteException(se
);
3900 * Ignores open/close region procedures that have already been submitted or executed.
3902 * When a master restarts with unfinished open/close region procedures, the new active master may
3903 * send duplicate open/close region requests to the regionserver. Each request is submitted to a
3904 * thread pool for execution, so a cache of submitted open/close region procedures is needed first.
3906 * After the open/close region request executed and report region transition succeed, cache it in
3907 * executed region procedures cache. See {@link #finishRegionProcedure(long)}. After report region
3908 * transition succeed, master will not send the open/close region request to regionserver again.
3909 * And we thought that the ongoing duplicate open/close region request should not be delayed more
3910 * than 600 seconds. So the executed region procedures cache will expire after 600 seconds.
3912 * See HBASE-22404 for more details.
3914 * @param procId the id of the open/close region procedure
3915 * @return true if the procedure can be submitted.
3917 boolean submitRegionProcedure(long procId
) {
3921 // Ignore the region procedures which already submitted.
3922 Long previous
= submittedRegionProcedures
.putIfAbsent(procId
, procId
);
3923 if (previous
!= null) {
3924 LOG
.warn("Received procedure pid={}, which already submitted, just ignore it", procId
);
3927 // Ignore the region procedures which already executed.
3928 if (executedRegionProcedures
.getIfPresent(procId
) != null) {
3929 LOG
.warn("Received procedure pid={}, which already executed, just ignore it", procId
);
3936 * See {@link #submitRegionProcedure(long)}.
3937 * @param procId the id of the open/close region procedure
3939 public void finishRegionProcedure(long procId
) {
3940 executedRegionProcedures
.put(procId
, procId
);
3941 submittedRegionProcedures
.remove(procId
);
3944 public boolean isShutDown() {
3949 * Force to terminate region server when abort timeout.
3951 private static class SystemExitWhenAbortTimeout
extends TimerTask
{
3953 public SystemExitWhenAbortTimeout() {
3958 LOG
.warn("Aborting region server timed out, terminating forcibly" +
3959 " and does not wait for any running shutdown hooks or finalizers to finish their work." +
3960 " Thread dump to stdout.");
3961 Threads
.printThreadInfo(System
.out
, "Zombie HRegionServer");
3962 Runtime
.getRuntime().halt(1);
3967 public AsyncClusterConnection
getAsyncClusterConnection() {
3968 return asyncClusterConnection
;