/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpServlet;

import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

import sun.misc.Signal;
import sun.misc.SignalHandler;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation", "restriction" })
public class HRegionServer extends HasThread implements
    RegionServerServices, LastSequenceId, ConfigurationObserver {

  public static final String REGION_LOCK_AWAIT_TIME_SEC =
      "hbase.regionserver.region.lock.await.time.sec";
  public static final int DEFAULT_REGION_LOCK_AWAIT_TIME_SEC = 300; // 5 min
  private static final Log LOG = LogFactory.getLog(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to master.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /*
   * Strings to be used in forming the exception message for
   * RegionsAlreadyInTransitionException.
   */
  protected static final String OPEN = "OPEN";
  protected static final String CLOSE = "CLOSE";

  //RegionName vs current action in progress
  //true - if open region action in progress
  //false - if close region action in progress
  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  // Cache flushing
  protected MemStoreFlusher cacheFlusher;

  protected HeapMemoryManager hMemManager;
  protected CountDownLatch initLatch = null;

  /**
   * Cluster connection to be shared by services.
   * Initialized at server startup and closed when server shuts down.
   * Clients must never close it explicitly.
   */
  protected ClusterConnection clusterConnection;

  /*
   * Long-living meta table locator, which is created when the server is started and stopped
   * when server shuts down. References to this locator shall be used to perform according
   * operations in EventHandlers. Primary reason for this decision is to make it mockable
   * for tests.
   */
  protected MetaTableLocator metaTableLocator;

  // Watch if a region is out of recovering state from ZooKeeper
  @SuppressWarnings("unused")
  private RecoveringRegionWatcher recoveringRegionWatcher;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  protected ReplicationSourceService replicationSourceHandler;
  protected ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplitThread compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name. All access should be synchronized.
   */
  protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<>();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on
   * We store the value as InetSocketAddress since this is used only in HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
      new ConcurrentHashMap<>();

  /**
   * Set of regions currently being in recovering state which means it can accept writes(edits from
   * previous failed region server) but not reads. A recovering region is also an online region.
   */
  protected final Map<String, Region> recoveringRegions = Collections
      .synchronizedMap(new HashMap<String, Region>());

  // Leases
  protected Leases leases;

  // Instance of the hbase executor service.
  protected ExecutorService service;

  // If false, the file system has become unavailable
  protected volatile boolean fsOk;
  protected HFileSystem fs;
  protected HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private volatile boolean abortRequested;

  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;

  volatile boolean killed = false;

  protected final Configuration conf;

  private Path rootDir;
  private Path walRootDir;

  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  final int numRetries;
  protected final int threadWakeFrequency;
  protected final int msgInterval;

  protected final int numRegionsToReport;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is name of the webapp and the attribute name used stuffing this instance
  // into web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  MetricsRegionServer metricsRegionServer;
  MetricsTable metricsTable;
  private SpanReceiverHost spanReceiverHost;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private final ChoreService choreService;

  /*
   * Check for compaction requests.
   */
  ScheduledChore compactionChecker;

  /*
   * Check for flushes.
   */
  ScheduledChore periodicFlusher;

  protected volatile WALFactory walFactory;

  // WAL roller. log is protected rather than private to avoid
  // eclipse warning when accessed by inner classes
  final LogRoller walRoller;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected ZooKeeperWatcher zooKeeper;

  // master address tracker
  private MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  // Cache configuration and block cache reference
  protected CacheConfig cacheConfig;
  // Cache configuration for mob
  final MobCacheConfig mobCacheConfig;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the
   * master passes us, port, and server startcode. Gets set after registration
   * against the Master.
   */
  protected ServerName serverName;

  /*
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  // key to the config parameter of server hostname
  // the specification of server hostname is optional. The hostname should be resolvable from
  // both master and region server
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  /**
   * MX Bean for RegionServerInfo
   */
  private ObjectName mxBean = null;

  /**
   * Chore to clean periodically the moved region list
   */
  private MovedRegionsCleaner movedRegionsCleaner;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerQuotaManager rsQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by client and handle
   * duplicate operations (currently, by failing them; in future we might use MVCC to return
   * result). Nonces are also recovered from WAL during recovery; however, the caveats are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
   * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
   * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
   * latest nonce in it expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  protected BaseCoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  private CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  protected SecureBulkLoadManager secureBulkLoadManager;

  /**
   * Starts a HRegionServer at the default location.
   */
  public HRegionServer(Configuration conf) throws IOException, InterruptedException {
    this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
  }

  /**
   * Starts a HRegionServer at the default location
   * @param csm implementation of CoordinatedStateManager to be used
   */
  public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException {
    super("RegionServer"); // thread name
    this.fsOk = true;
    this.conf = conf;
    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
    HFile.checkHFileVersion(this.conf);
    checkCodecs(this.conf);
    this.userProvider = UserProvider.instantiate(conf);
    FSUtils.setupShortCircuitRead(this.conf);
    Replication.decorateRegionServerConfiguration(this.conf);

    // Disable usage of meta replicas in the regionserver
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

    this.sleeper = new Sleeper(this.msgInterval, this);

    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

    this.numRegionsToReport = conf.getInt(
        "hbase.regionserver.numregionstoreport", 10);

    this.operationTimeout = conf.getInt(
        HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

    this.shortOperationTimeout = conf.getInt(
        HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

    this.abortRequested = false;
    this.stopped = false;

    rpcServices = createRpcServices();
    this.startcode = System.currentTimeMillis();
    if (this instanceof HMaster) {
      useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
    } else {
      useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
    }
    String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
        rpcServices.isa.getHostName();
    serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

    rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
    rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

    // login the zookeeper client principal (if using security)
    ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
    // login the server principal (if using secure Hadoop)
    login(userProvider, hostName);
    // init superusers and add the server principal (if using security)
    // or process owner as default super user.
    Superusers.initialize(conf);

    regionServerAccounting = new RegionServerAccounting(conf);
    cacheConfig = new CacheConfig(conf);
    mobCacheConfig = new MobCacheConfig(conf);
    uncaughtExceptionHandler = new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        abort("Uncaught exception in service thread " + t.getName(), e);
      }
    };

    initializeFileSystem();

    service = new ExecutorService(getServerName().toShortString());
    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
          rpcServices.isa.getPort(), this, canCreateBaseZNode());

      this.csm = (BaseCoordinatedStateManager) csm;
      this.csm.initialize(this);

      masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
      masterAddressTracker.start();

      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
      clusterStatusTracker.start();
    }
    this.configurationManager = new ConfigurationManager();

    this.walRoller = new LogRoller(this, this);
    this.choreService = new ChoreService(getServerName().toString(), true);
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);

    if (!SystemUtils.IS_OS_WINDOWS) {
      Signal.handle(new Signal("HUP"), new SignalHandler() {
        @Override
        public void handle(Signal signal) {
          getConfiguration().reloadConfiguration();
          configurationManager.notifyAllObservers(getConfiguration());
        }
      });
    }
    // Create the CompactedFileDischarger chore service. This chore helps to
    // remove the compacted files
    // that will no longer be used in reads.
    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
    // 2 mins so that compacted files can be archived before the TTLCleaner runs
    int cleanerInterval =
        conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
    this.compactedFileDischarger =
        new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
    choreService.scheduleChore(compactedFileDischarger);
  }

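  /**
   * Set up the WAL and data filesystems for this server. Called from the constructor, and
   * again from handleReportForDutyResponse() if the Master hands us a different hbase.rootdir.
   */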
  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
    // checksum verification enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = FSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = getFsTableDescriptors();
  }

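  /**
   * Factory for the filesystem-backed TableDescriptors cache; protected so subclasses and
   * tests can supply a different implementation.
   */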
  protected TableDescriptors getFsTableDescriptors() throws IOException {
    return new FSTableDescriptors(this.conf,
      this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
  }

  protected void setInitLatch(CountDownLatch latch) {
    this.initLatch = latch;
  }

  /*
   * Returns true if configured hostname should be used
   */
  protected boolean shouldUseThisHostnameInstead() {
    return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
  }

  protected void login(UserProvider user, String host) throws IOException {
    user.login("hbase.regionserver.keytab.file",
      "hbase.regionserver.kerberos.principal", host);
  }

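  /**
   * Wait for an active Master. No-op here; HMaster overrides it so a colocated or backup
   * master does not come up as a regionserver before becoming active (see initializeZooKeeper()).
   */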
  protected void waitForMasterActive(){
  }

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return false;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

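  /**
   * Register a coprocessor endpoint Service at regionserver scope. Returns false if a service
   * with the same name has already been registered.
   */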
  @Override
  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single service name
     */
    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
        instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
          + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
    }
    return true;
  }

  /**
   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
   * the local server. Safe to use going to local or remote server.
   * Creating this instance in a method makes it possible to intercept and mock it in tests.
   * @throws IOException
   */
  protected ClusterConnection createClusterConnection() throws IOException {
    // Create a cluster connection that when appropriate, can short-circuit and go directly to the
    // local server if the request is to the local server bypassing RPC. Can be used for both local
    // and remote invocations.
    return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
      serverName, rpcServices, rpcServices);
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   * @param c
   * @throws IOException
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
          " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   * @throws IOException
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
      metaTableLocator = new MetaTableLocator();
    }
  }

  /**
   * All initialization needed before we go register with Master.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  private void preRegistrationInitialization(){
    try {
      setupClusterConnection();

      this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
      this.secureBulkLoadManager.start();

      // Health checker thread.
      if (isHealthCheckerConfigured()) {
        int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
          HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
        healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
      }

      initializeZooKeeper();
      if (!isStopped() && !isAborted()) {
        initializeThreads();
      }
    } catch (Throwable t) {
      // Call stop if error or process will stick around forever since server
      // puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    }
  }

  /**
   * Bring up connection to zk ensemble and then wait until a master for this
   * cluster and then after that, wait until cluster 'up' flag has been set.
   * This is the order in which master does things.
   * Finally open long-living server short-circuit connection.
   * @throws IOException
   * @throws InterruptedException
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
    justification="cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Create the master address tracker, register with zk, and start it.  Then
    // block until a master is available.  No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up.  Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    doLatch(this.initLatch);

    // Retrieve clusterId
    // Since cluster status is now up
    // ID should have already been set by HMaster
    try {
      clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
      if (clusterId == null) {
        this.abort("Cluster ID has not been set");
      }
      LOG.info("ClusterId : "+clusterId);
    } catch (KeeperException e) {
      this.abort("Failed to retrieve Cluster ID",e);
    }

    // In case colocated master, wait here till it's active.
    // So backup masters won't start as regionservers.
    // This is to avoid showing backup masters as regionservers
    // in master web UI, or assigning any region to them.
    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach zk cluster when creating procedure handler.", e);
    }
    // register watcher for recovering regions
    this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED",
    justification="We don't care about the return")
  private void doLatch(final CountDownLatch latch) throws InterruptedException {
    if (latch != null) {
      // Result is ignored intentionally but if I remove the below, findbugs complains (the
      // above justification on this method doesn't seem to suppress it).
      boolean result = latch.await(20, TimeUnit.SECONDS);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode availability while checking
   * if the region server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException
   */
  private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return False if cluster shutdown in progress
   */
  private boolean isClusterUp() {
    return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
  }

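  /**
   * Create the flusher, compaction, lease, and chore threads plus the RPC client used to
   * talk to the Master; called from preRegistrationInitialization() once ZooKeeper setup
   * has completed.
   */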
  private void initializeThreads() throws IOException {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplitThread(this);

    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently on store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
    this.leases = new Leases(this.threadWakeFrequency);

    // Create the thread to clean the moved regions list
    movedRegionsCleaner = MovedRegionsCleaner.create(this);

    if (this.nonceManager != null) {
      // Create the scheduled chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerQuotaManager(this);

    // Setup RPC client for master communication
    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
        rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());

    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
      storefileRefreshPeriod = conf.getInt(
          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
      onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
          onlyMetaRefresh, this, this);
    }
    registerConfigurationObservers();
  }

  private void registerConfigurationObservers() {
    // Registering the compactSplitThread object with the ConfigurationManager.
    configurationManager.registerObserver(this.compactSplitThread);
    configurationManager.registerObserver(this.rpcServices);
    configurationManager.registerObserver(this);
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, fs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
      }

      // Try and register with the Master; tell it we are here.  Break if
      // server is stopped or the clusterup flag is down or hdfs went wacky.
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
          handleReportForDutyResponse(w);
          break;
        }
      }

      if (!isStopped() && isHealthy()){
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        rspmHost.start();

        // Start the Quota Manager
        rsQuotaManager.start(getRpcServer().getScheduler());
      }

      // We registered with the Master.  Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (isOnlineRegionsEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else if (this.stopping) {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop.  Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      }
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    if (mxBean != null) {
      MBeans.unregister(mxBean);
      mxBean = null;
    }
    if (this.leases != null) this.leases.closeAfterLeasesExpire();
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
      cacheConfig.getBlockCache().shutdown();
    }
    mobCacheConfig.getMobFileCache().shutdown();

    if (movedRegionsCleaner != null) {
      movedRegionsCleaner.stop("Region Server stopping");
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
    if (this.compactionChecker != null) this.compactionChecker.cancel(true);
    if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
    if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
    if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
    sendShutdownInterrupt();

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions.  Used when testing.
    } else if (abortRequested) {
      if (this.fsOk) {
        closeUserRegions(abortRequested); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested);
      LOG.info("stopping server " + this.serverName);
    }

    // so callers waiting for meta without timeout can stop
    if (this.metaTableLocator != null) this.metaTableLocator.stop();
    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
      try {
        this.clusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
      }
    }

    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested || this.fsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested);
      }
    }

    if (!this.killed && this.fsOk) {
      waitOnAllRegionsToClose(abortRequested);
      LOG.info("stopping server " + this.serverName +
        "; all regions closed.");
    }

    //fsOk flag may be changed when closing regions throws exception.
    if (this.fsOk) {
      shutdownWAL(!abortRequested);
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leases != null) {
      this.leases.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    LOG.info("stopping server " + this.serverName +
      "; zookeeper connection closed.");

    LOG.info(Thread.currentThread().getName() + " exiting");
  }

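  /**
   * @return true if this regionserver is still carrying an hbase:meta region; used on the
   * way out of run() to decide whether meta regions must still be closed.
   */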
  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaTable()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

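  /**
   * Report this server's load to the active Master. If the report fails, the cached status
   * stub is dropped and re-created from the master location kept in ZooKeeper.
   */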
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    try {
      ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);

      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      ServerName sn = ServerName.parseVersionedServerName(
        this.serverName.getVersionedBytes());
      request.setServer(ProtobufUtil.toServerName(sn));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }

  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; Additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<Region> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad =
      ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (Region region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        Iterator<String> iterator = regionCoprocessors.iterator();
        while (iterator.hasNext()) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
          .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }

    // for the replicationLoad purpose. Only need to get from one service
    // either source or sink will get the same info
    ReplicationSourceService rsources = getReplicationSourceService();

    if (rsources != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    }

    return serverLoad.build();
  }

  String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r: this.onlineRegions.values()) {
      if (sb.length() > 0) sb.append(", ");
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!isOnlineRegionsEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            previousLogTime = System.currentTimeMillis();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug(this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
          HRegionInfo hri = e.getValue().getRegionInfo();
          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
              && !closedRegions.contains(hri.getEncodedName())) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!isOnlineRegionsEmpty()) {
            LOG.info("We were exiting though online regions are not empty," +
                " because some regions failed closing");
          }
          break;
        }
        if (sleep(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

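  /**
   * Sleep for millis, swallowing any interrupt.
   * @return true if the sleep was interrupted, so the caller can restore the interrupt status.
   */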
  private boolean sleep(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

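  /**
   * Shut the WAL down. Called as shutdownWAL(!abortRequested): on a clean stop the WALs are
   * closed; on abort they are only shut down.
   */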
  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }


  /**
   * Run init. Sets up wal and starts up all server threads.
   *
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
      throws IOException {
    try {
      boolean updateRootDir = false;
      for (NameStringPair e : c.getMapEntriesList()) {
        String key = e.getName();
        // The hostname the master sees us as.
        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
          String hostnameFromMasterPOV = e.getValue();
          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
              rpcServices.isa.getPort(), this.startcode);
          if (shouldUseThisHostnameInstead() &&
              !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
            String msg = "Master passed us a different hostname to use; was=" +
                this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
            throw new IOException(msg);
          }
          if (!shouldUseThisHostnameInstead() &&
              !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
            String msg = "Master passed us a different hostname to use; was=" +
                rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
            LOG.error(msg);
          }
          continue;
        }

        String value = e.getValue();
        if (key.equals(HConstants.HBASE_DIR)) {
          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
            updateRootDir = true;
          }
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Config from master: " + key + "=" + value);
        }
        this.conf.set(key, value);
      }

      // Set our ephemeral znode up in zookeeper now we have a name.
      createMyEphemeralNode();

      if (updateRootDir) {
        // initialize file system by the config fs.defaultFS and hbase.rootdir from master
        initializeFileSystem();
      }

      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
      }

      // Save it in a file, this will allow to see if we crash
      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

      this.walFactory = setupWALAndReplication();
      // Init in here rather than in constructor after thread name has been set
      this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
      this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
      // Now that we have a metrics source, start the pause monitor
      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
      pauseMonitor.start();

      startServiceThreads();
      startHeapMemoryManager();
      // Call it after starting HeapMemoryManager.
      initializeMemStoreChunkPool();
      LOG.info("Serving as " + this.serverName +
          ", RpcServer on " + rpcServices.isa +
          ", sessionid=0x" +
          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

      // Wake up anyone waiting for this server to online
      synchronized (online) {
        online.set(true);
        online.notifyAll();
      }
    } catch (Throwable e) {
      stop("Failed initialization");
      throw convertThrowableToIOE(cleanup(e, "Failed init"),
          "Region server startup failed");
    } finally {
      sleeper.skipSleepCycle();
    }
  }
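
  // A minimal sketch (illustrative only, not from the shipped code) of the
  // override behavior implemented above: every NameStringPair the master sends
  // back is applied onto the local Configuration, and a changed hbase.rootdir
  // additionally triggers initializeFileSystem(). Values here are hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(HConstants.HBASE_DIR, "hdfs://oldnn:8020/hbase");  // local value
  //   // master pushes a different value => updateRootDir becomes true:
  //   conf.set(HConstants.HBASE_DIR, "hdfs://newnn:8020/hbase");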

  private void initializeMemStoreChunkPool() {
    if (MemStoreLAB.isEnabled(conf)) {
      // MSLAB is enabled. So initialize MemStoreChunkPool
      // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
      // it.
      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
      long globalMemStoreSize = pair.getFirst();
      boolean offheap = this.regionServerAccounting.isOffheap();
      // When off heap memstore in use, take full area for chunk pool.
      float poolSizePercentage = offheap ? 1.0F
          : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
      MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
          initialCountPercentage, chunkSize, offheap);
      if (pool != null && this.hMemManager != null) {
        // Register with Heap Memory manager
        this.hMemManager.registerTuneObserver(pool);
      }
    }
  }
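
  // A minimal tuning sketch (illustrative; the numbers are hypothetical) using
  // the same MemStoreLAB keys read above. When the memstore is off-heap the
  // pool percentage is forced to 1.0F, so CHUNK_POOL_MAXSIZE_KEY only matters
  // for on-heap memstores.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.5f);      // cap pool at 50% of global memstore
  //   conf.setFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, 0.1f);  // pre-allocate 10% of the cap
  //   conf.setInt(MemStoreLAB.CHUNK_SIZE_KEY, 2 * 1024 * 1024);     // 2MB chunks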

  private void startHeapMemoryManager() {
    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
        this.regionServerAccounting);
    if (this.hMemManager != null) {
      this.hMemManager.start(getChoreService());
    }
  }

  private void createMyEphemeralNode() throws KeeperException, IOException {
    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
        getMyEphemeralNodePath(), data);
  }

  private void deleteMyEphemeralNode() throws KeeperException {
    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
  }

  @Override
  public RegionServerAccounting getRegionServerAccounting() {
    return regionServerAccounting;
  }

  /**
   * @param r Region to get RegionLoad for.
   * @param regionLoadBldr the RegionLoad.Builder, can be null
   * @param regionSpecifier the RegionSpecifier.Builder, can be null
   * @return RegionLoad instance.
   * @throws IOException
   */
  RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
      RegionSpecifier.Builder regionSpecifier) throws IOException {
    byte[] name = r.getRegionInfo().getRegionName();
    int stores = 0;
    int storefiles = 0;
    int storeUncompressedSizeMB = 0;
    int storefileSizeMB = 0;
    int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
    int storefileIndexSizeMB = 0;
    int rootIndexSizeKB = 0;
    int totalStaticIndexSizeKB = 0;
    int totalStaticBloomSizeKB = 0;
    long totalCompactingKVs = 0;
    long currentCompactedKVs = 0;
    List<Store> storeList = r.getStores();
    stores += storeList.size();
    for (Store store : storeList) {
      storefiles += store.getStorefilesCount();
      storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
      storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
      storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
      CompactionProgress progress = store.getCompactionProgress();
      if (progress != null) {
        totalCompactingKVs += progress.totalCompactingKVs;
        currentCompactedKVs += progress.currentCompactedKVs;
      }
      rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
      totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
      totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
    }

    float dataLocality =
        r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
    if (regionLoadBldr == null) {
      regionLoadBldr = RegionLoad.newBuilder();
    }
    if (regionSpecifier == null) {
      regionSpecifier = RegionSpecifier.newBuilder();
    }
    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
    regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
      .setStores(stores)
      .setStorefiles(storefiles)
      .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
      .setStorefileSizeMB(storefileSizeMB)
      .setMemstoreSizeMB(memstoreSizeMB)
      .setStorefileIndexSizeMB(storefileIndexSizeMB)
      .setRootIndexSizeKB(rootIndexSizeKB)
      .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
      .setReadRequestsCount(r.getReadRequestsCount())
      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
      .setWriteRequestsCount(r.getWriteRequestsCount())
      .setTotalCompactingKVs(totalCompactingKVs)
      .setCurrentCompactedKVs(currentCompactedKVs)
      .setDataLocality(dataLocality)
      .setLastMajorCompactionTs(r.getOldestHfileTs(true));
    ((HRegion)r).setCompleteSequenceId(regionLoadBldr);

    return regionLoadBldr.build();
  }
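
  // Usage sketch (illustrative; "rs" and "region" are hypothetical handles):
  // besides feeding the periodic load report to the master, the built message
  // can be inspected directly through its generated protobuf getters, e.g.:
  //
  //   RegionLoad load = rs.createRegionLoad(region, null, null);
  //   int storefiles = load.getStorefiles();    // counterpart of setStorefiles(...)
  //   float locality = load.getDataLocality();  // HDFS block locality in [0.0, 1.0]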

  /**
   * @param encodedRegionName
   * @return An instance of RegionLoad.
   */
  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
    Region r = onlineRegions.get(encodedRegionName);
    return r != null ? createRegionLoad(r, null, null) : null;
  }

  /**
   * Inner class that runs on a long period checking if regions need compaction.
   */
  private static class CompactionChecker extends ScheduledChore {
    private final HRegionServer instance;
    private final int majorCompactPriority;
    private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
    private long iteration = 0;

    CompactionChecker(final HRegionServer h, final int sleepTime,
        final Stoppable stopper) {
      super("CompactionChecker", stopper, sleepTime);
      this.instance = h;
      LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));

      /* MajorCompactPriority is configurable.
       * If not set, the compaction will use default priority.
       */
      this.majorCompactPriority = this.instance.conf.
          getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
              DEFAULT_PRIORITY);
    }

    @Override
    protected void chore() {
      for (Region r : this.instance.onlineRegions.values()) {
        if (r == null) continue;
        for (Store s : r.getStores()) {
          try {
            long multiplier = s.getCompactionCheckMultiplier();
            assert multiplier > 0;
            if (iteration % multiplier != 0) continue;
            if (s.needsCompaction()) {
              // Queue a compaction. Will recognize if major is needed.
              this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
                  + " requests compaction");
            } else if (s.isMajorCompaction()) {
              s.triggerMajorCompaction();
              if (majorCompactPriority == DEFAULT_PRIORITY
                  || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
                this.instance.compactSplitThread.requestCompaction(r, s, getName()
                    + " requests major compaction; use default priority", null);
              } else {
                this.instance.compactSplitThread.requestCompaction(r, s, getName()
                    + " requests major compaction; use configured priority",
                    this.majorCompactPriority, null, null);
              }
            }
          } catch (IOException e) {
            LOG.warn("Failed major compaction check on " + r, e);
          }
        }
      }
      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
    }
  }
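
  // Two knobs drive the chore above. The major-compact priority is read once in
  // the constructor; the per-Store multiplier gates how often a Store is even
  // examined, via the (iteration % multiplier != 0) check. A hypothetical
  // override (illustrative only):
  //
  //   conf.setInt("hbase.regionserver.compactionChecker.majorCompactPriority", 100);
  //   // a Store whose getCompactionCheckMultiplier() returns 10 is then looked
  //   // at on every 10th chore iteration only.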

  static class PeriodicMemstoreFlusher extends ScheduledChore {
    final HRegionServer server;
    final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
    final static int MIN_DELAY_TIME = 0; // millisec

    public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
      super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
      this.server = server;
    }

    @Override
    protected void chore() {
      final StringBuffer whyFlush = new StringBuffer();
      for (Region r : this.server.onlineRegions.values()) {
        if (r == null) continue;
        if (((HRegion)r).shouldFlush(whyFlush)) {
          FlushRequester requester = server.getFlushRequester();
          if (requester != null) {
            long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
            LOG.info(getName() + " requesting flush of " +
                r.getRegionInfo().getRegionNameAsString() + " because " +
                whyFlush.toString() +
                " after random delay " + randomDelay + "ms");
            // Throttle the flushes by putting a delay. If we don't throttle, and there
            // is a balanced write-load on the regions in a table, we might end up
            // overwhelming the filesystem with too many flushes at once.
            requester.requestDelayedFlush(r, randomDelay, false);
          }
        }
      }
    }
  }
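
  // Note on the delay computed above: each flush request is pushed out by a
  // uniformly random delay in [MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY),
  // i.e. up to five minutes, so regions carrying similar write loads do not all
  // hit the filesystem in the same instant:
  //
  //   long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME; // 0..299999 ms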

  /**
   * Report the status of the server. A server is online once all the startup is
   * completed (setting up filesystem, starting service threads, etc.). This
   * method is designed mostly to be useful in tests.
   *
   * @return true if online, false if not.
   */
  public boolean isOnline() {
    return online.get();
  }

  /**
   * Setup WAL log and replication if enabled.
   * Replication setup is done in here because it wants to be hooked up to WAL.
   * @return A WAL instance.
   * @throws IOException
   */
  private WALFactory setupWALAndReplication() throws IOException {
    // TODO Replication make assumptions here based on the default filesystem impl
    final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());

    Path logDir = new Path(walRootDir, logName);
    if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
    if (this.walFs.exists(logDir)) {
      throw new RegionServerRunningException("Region server has already " +
          "created directory at " + this.serverName.toString());
    }

    // Instantiate replication manager if replication enabled. Pass it the
    // log directories.
    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);

    // listeners the wal factory will add to wals it creates.
    final List<WALActionsListener> listeners = new ArrayList<>();
    listeners.add(new MetricsWAL());
    if (this.replicationSourceHandler != null &&
        this.replicationSourceHandler.getWALActionsListener() != null) {
      // Replication handler is an implementation of WALActionsListener.
      listeners.add(this.replicationSourceHandler.getWALActionsListener());
    }

    return new WALFactory(conf, listeners, serverName.toString());
  }

  public MetricsRegionServer getRegionServerMetrics() {
    return this.metricsRegionServer;
  }

  /**
   * @return Master address tracker instance.
   */
  public MasterAddressTracker getMasterAddressTracker() {
    return this.masterAddressTracker;
  }

  /*
   * Start maintenance Threads, Server, Worker and lease checker threads.
   * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
   * get an unhandled exception. We cannot set the handler on all threads.
   * Server's internal Listener thread is off limits. For Server, if an OOME, it
   * waits a while then retries. Meantime, a flush or a compaction that tries to
   * run should trigger the same critical condition and the shutdown will run. On
   * its way out, this server will shut down Server. Leases is sort of in
   * between: it has an internal thread that, while it inherits from Chore,
   * keeps its own internal stop mechanism and so needs to be stopped by this
   * hosting server. Worker logs the exception and exits.
   */
  private void startServiceThreads() throws IOException {
    // Start executor services
    this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
        conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
    this.service.startExecutorService(ExecutorType.RS_OPEN_META,
        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
    this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
        conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
    this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
        conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
    this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
      this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
          conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
    }
    this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
        "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
    // Start the threads for compacted files discharger
    this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
        conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
      this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
          conf.getInt("hbase.regionserver.region.replica.flusher.threads",
              conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
    }

    Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
        uncaughtExceptionHandler);
    this.cacheFlusher.start(uncaughtExceptionHandler);

    if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
    if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
    if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
    if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
    if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
    if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);

    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
    // an unhandled exception, it will just exit.
    Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
        uncaughtExceptionHandler);

    if (this.replicationSourceHandler == this.replicationSinkHandler &&
        this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.startReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.startReplicationService();
      }
    }

    // Create the log splitting worker and start it
    // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
    // quite a while inside Connection layer. The worker won't be available for other
    // tasks even after current task is preempted after a split task times out.
    Configuration sinkConf = HBaseConfiguration.create(conf);
    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
    sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
    this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
    splitLogWorker.start();
  }
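
  // Sizing sketch (illustrative; values hypothetical): every executor pool
  // started above is sized purely by configuration, so a server expected to
  // open and close many regions concurrently might be tuned like:
  //
  //   conf.setInt("hbase.regionserver.executor.openregion.threads", 10);
  //   conf.setInt("hbase.regionserver.executor.closeregion.threads", 10);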

  /**
   * Puts up the webui.
   * @return Returns final port -- maybe different from what we started with.
   * @throws IOException
   */
  private int putUpWebUI() throws IOException {
    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
        HConstants.DEFAULT_REGIONSERVER_INFOPORT);
    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");

    if(this instanceof HMaster) {
      port = conf.getInt(HConstants.MASTER_INFO_PORT,
          HConstants.DEFAULT_MASTER_INFOPORT);
      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    }
    // -1 is for disabling info server
    if (port < 0) return port;

    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg =
          "Failed to start http info server. Address " + addr
          + " does not belong to this host. Correct configuration parameter: "
          + "hbase.regionserver.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }
    // check if auto port bind enabled
    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
        false);
    while (true) {
      try {
        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
        infoServer.addServlet("dump", "/dump", getDumpServlet());
        configureInfoServer();
        this.infoServer.start();
        break;
      } catch (BindException e) {
        if (!auto) {
          // auto bind disabled throw BindException
          LOG.error("Failed binding http info server to port: " + port);
          throw e;
        }
        // auto bind enabled, try to use another port
        LOG.info("Failed binding http info server to port: " + port);
        port++;
      }
    }
    port = this.infoServer.getPort();
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
    int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
        HConstants.DEFAULT_MASTER_INFOPORT);
    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
    conf.setInt(HConstants.MASTER_INFO_PORT, port);
    return port;
  }
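
  // Configuration sketch for the web UI behavior above (illustrative values):
  //
  //   conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);             // negative port disables the info server
  //   conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);  // on BindException, retry with port + 1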

  /*
   * Verify that server is healthy
   */
  private boolean isHealthy() {
    if (!fsOk) {
      // File system problem
      return false;
    }
    // Verify that all threads are alive
    if (!(leases.isAlive()
        && cacheFlusher.isAlive() && walRoller.isAlive()
        && this.compactionChecker.isScheduled()
        && this.periodicFlusher.isScheduled())) {
      stop("One or more threads are no longer alive -- stop");
      return false;
    }
    return true;
  }

  private static final byte[] UNSPECIFIED_REGION = new byte[]{};

  @Override
  public List<WAL> getWALs() throws IOException {
    return walFactory.getWALs();
  }

  @Override
  public WAL getWAL(HRegionInfo regionInfo) throws IOException {
    WAL wal;
    // _ROOT_ and hbase:meta regions have separate WAL.
    if (regionInfo != null && regionInfo.isMetaTable()
        && regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
      wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
    } else if (regionInfo == null) {
      wal = walFactory.getWAL(UNSPECIFIED_REGION, null);
    } else {
      byte[] namespace = regionInfo.getTable().getNamespace();
      wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace);
    }
    walRoller.addWAL(wal);
    return wal;
  }

  @Override
  public Connection getConnection() {
    return getClusterConnection();
  }

  @Override
  public ClusterConnection getClusterConnection() {
    return this.clusterConnection;
  }

  @Override
  public MetaTableLocator getMetaTableLocator() {
    return this.metaTableLocator;
  }

  @Override
  public void stop(final String msg) {
    if (!this.stopped) {
      LOG.info("***** STOPPING region server '" + this + "' *****");
      try {
        if (this.rsHost != null) {
          this.rsHost.preStop(msg);
        }
        this.stopped = true;
        LOG.info("STOPPED: " + msg);
        // Wakes run() if it is sleeping
        sleeper.skipSleepCycle();
      } catch (IOException exp) {
        LOG.warn("The region server did not stop", exp);
      }
    }
  }

  public void waitForServerOnline(){
    while (!isStopped() && !isOnline()) {
      synchronized (online) {
        try {
          online.wait(msgInterval);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  @Override
  public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
    postOpenDeployTasks(new PostOpenDeployContext(r, -1));
  }

  @Override
  public void postOpenDeployTasks(final PostOpenDeployContext context)
      throws KeeperException, IOException {
    Region r = context.getRegion();
    long masterSystemTime = context.getMasterSystemTime();
    Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
    rpcServices.checkOpen();
    LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
    // Do checks to see if we need to compact (references or too many files)
    for (Store s : r.getStores()) {
      if (s.hasReferences() || s.needsCompaction()) {
        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
      }
    }
    long openSeqNum = r.getOpenSeqNum();
    if (openSeqNum == HConstants.NO_SEQNUM) {
      // If we opened a region, we should have read some sequence number from it.
      LOG.error("No sequence number found when opening " +
          r.getRegionInfo().getRegionNameAsString());
      openSeqNum = 0;
    }

    // Update flushed sequence id of a recovering region in ZK
    updateRecoveringRegionLastFlushedSequenceId(r);

    // Notify master
    if (!reportRegionStateTransition(new RegionStateTransitionContext(
        TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
      throw new IOException("Failed to report opened region to master: "
          + r.getRegionInfo().getRegionNameAsString());
    }

    triggerFlushInPrimaryRegion((HRegion)r);

    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
  }

  @Override
  public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
    return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
  }

  @Override
  public boolean reportRegionStateTransition(
      TransitionCode code, long openSeqNum, HRegionInfo... hris) {
    return reportRegionStateTransition(
        new RegionStateTransitionContext(code, HConstants.NO_SEQNUM, -1, hris));
  }

  @Override
  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
    TransitionCode code = context.getCode();
    long openSeqNum = context.getOpenSeqNum();
    long masterSystemTime = context.getMasterSystemTime();
    HRegionInfo[] hris = context.getHris();

    if (TEST_SKIP_REPORTING_TRANSITION) {
      // This is for testing only in case there is no master
      // to handle the region transition report at all.
      if (code == TransitionCode.OPENED) {
        Preconditions.checkArgument(hris != null && hris.length == 1);
        if (hris[0].isMetaRegion()) {
          try {
            MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
                hris[0].getReplicaId(), State.OPEN);
          } catch (KeeperException e) {
            LOG.info("Failed to update meta location", e);
            return false;
          }
        } else {
          try {
            MetaTableAccessor.updateRegionLocation(clusterConnection,
                hris[0], serverName, openSeqNum, masterSystemTime);
          } catch (IOException e) {
            LOG.info("Failed to update meta", e);
            return false;
          }
        }
      }
      return true;
    }

    ReportRegionStateTransitionRequest.Builder builder =
        ReportRegionStateTransitionRequest.newBuilder();
    builder.setServer(ProtobufUtil.toServerName(serverName));
    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
    transition.setTransitionCode(code);
    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
      transition.setOpenSeqNum(openSeqNum);
    }
    for (HRegionInfo hri : hris) {
      transition.addRegionInfo(HRegionInfo.convert(hri));
    }

    ReportRegionStateTransitionRequest request = builder.build();
    while (keepLooping()) {
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          createRegionServerStatusStub();
          continue;
        }
        ReportRegionStateTransitionResponse response =
            rss.reportRegionStateTransition(null, request);
        if (response.hasErrorMessage()) {
          LOG.info("Failed to transition " + hris[0]
              + " to " + code + ": " + response.getErrorMessage());
          return false;
        }
        return true;
      } catch (ServiceException se) {
        IOException ioe = ProtobufUtil.getRemoteException(se);
        LOG.info("Failed to report region transition, will retry", ioe);
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return false;
  }
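
  // Note on the retry idiom above: the shared rssStub field is snapshotted into
  // a local before each RPC, and on failure it is nulled out only if no other
  // thread has already replaced it, forcing a reconnect on the next pass:
  //
  //   RegionServerStatusService.BlockingInterface rss = rssStub;  // snapshot
  //   ...
  //   } catch (ServiceException se) {
  //     if (rssStub == rss) rssStub = null;  // drop only our stale snapshot
  //   }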

  @Override
  public long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow) {
    NonceGenerator ng = clusterConnection.getNonceGenerator();
    final long nonceGroup = ng.getNonceGroup();
    final long nonce = ng.newNonce();
    long procId = -1;
    SplitTableRegionRequest request =
        RequestConverter.buildSplitTableRegionRequest(regionInfo, splitRow, nonceGroup, nonce);

    while (keepLooping()) {
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          createRegionServerStatusStub();
          continue;
        }
        SplitTableRegionResponse response = rss.splitRegion(null, request);

        //TODO: should we limit the retry number before quitting?
        if (response == null || (procId = response.getProcId()) == -1) {
          LOG.warn("Failed to split " + regionInfo + " retrying...");
          continue;
        }

        break;
      } catch (ServiceException se) {
        // TODO: retry or just fail
        IOException ioe = ProtobufUtil.getRemoteException(se);
        LOG.info("Failed to split region, will retry", ioe);
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return procId;
  }

  @Override
  public boolean isProcedureFinished(final long procId) throws IOException {
    GetProcedureResultRequest request =
        GetProcedureResultRequest.newBuilder().setProcId(procId).build();

    while (keepLooping()) {
      RegionServerStatusService.BlockingInterface rss = rssStub;
      try {
        if (rss == null) {
          createRegionServerStatusStub();
          continue;
        }
        // TODO: find a way to get proc result
        GetProcedureResultResponse response = rss.getProcedureResult(null, request);

        if (response == null) {
          LOG.warn("Failed to get procedure (id=" + procId + ") status.");
          return false;
        } else if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
          return false;
        } else if (response.hasException()) {
          // Procedure failed.
          throw ForeignExceptionUtil.toIOException(response.getException());
        }
        // Procedure completes successfully
        break;
      } catch (ServiceException se) {
        // TODO: retry or just fail
        IOException ioe = ProtobufUtil.getRemoteException(se);
        LOG.warn("Failed to get split region procedure result. Retrying", ioe);
        if (rssStub == rss) {
          rssStub = null;
        }
      }
    }
    return true;
  }

  /**
   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
   * block this thread. See RegionReplicaFlushHandler for details.
   */
  void triggerFlushInPrimaryRegion(final HRegion region) {
    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
      return;
    }
    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
            region.conf)) {
      region.setReadsEnabled(true);
      return;
    }

    region.setReadsEnabled(false); // disable reads before marking the region as opened.
    // RegionReplicaFlushHandler might reset this.

    // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
    this.service.submit(
        new RegionReplicaFlushHandler(this, clusterConnection,
            rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
  }

  @Override
  public RpcServerInterface getRpcServer() {
    return rpcServices.rpcServer;
  }

  public RSRpcServices getRSRpcServices() {
    return rpcServices;
  }

  /**
   * Cause the server to exit without closing the regions it is serving, the log
   * it is using and without notifying the master. Used for unit testing and on
   * catastrophic events, such as HDFS being yanked out from under hbase or an OOME.
   *
   * @param reason
   *          the reason we are aborting
   * @param cause
   *          the exception that caused the abort, or null
   */
  @Override
  public void abort(String reason, Throwable cause) {
    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
    if (cause != null) {
      LOG.fatal(msg, cause);
    } else {
      LOG.fatal(msg);
    }
    this.abortRequested = true;
    // HBASE-4014: show list of coprocessors that were loaded to help debug
    // regionserver crashes. Note that we're implicitly using
    // java.util.HashSet's toString() method to print the coprocessor names.
    LOG.fatal("RegionServer abort: loaded coprocessors are: " +
        CoprocessorHost.getLoadedCoprocessors());
    // Try and dump metrics if abort -- might give clue as to how fatal came about....
    try {
      LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
    } catch (MalformedObjectNameException | IOException e) {
      LOG.warn("Failed dumping metrics", e);
    }

    // Do our best to report our abort to the master, but this may not work
    try {
      if (cause != null) {
        msg += "\nCause:\n" + StringUtils.stringifyException(cause);
      }
      // Report to the master but only if we have already registered with the master.
      if (rssStub != null && this.serverName != null) {
        ReportRSFatalErrorRequest.Builder builder =
            ReportRSFatalErrorRequest.newBuilder();
        ServerName sn =
            ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
        builder.setServer(ProtobufUtil.toServerName(sn));
        builder.setErrorMessage(msg);
        rssStub.reportRSFatalError(null, builder.build());
      }
    } catch (Throwable t) {
      LOG.warn("Unable to report fatal error to master", t);
    }
    stop(reason);
  }

  /**
   * @see HRegionServer#abort(String, Throwable)
   */
  public void abort(String reason) {
    abort(reason, null);
  }

  @Override
  public boolean isAborted() {
    return this.abortRequested;
  }

  /*
   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
   * logs but it does close the socket in case we want to bring up a server on the
   * old hostname+port immediately.
   */
  protected void kill() {
    this.killed = true;
    abort("Simulated kill");
  }

  /**
   * Called on stop/abort before closing the cluster connection and meta locator.
   */
  protected void sendShutdownInterrupt() {
  }

  /**
   * Wait on all threads to finish. Presumption is that all closes and stops
   * have already been called.
   */
  protected void stopServiceThreads() {
    // clean up the scheduled chores
    if (this.choreService != null) choreService.shutdown();
    if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
    if (this.compactionChecker != null) compactionChecker.cancel(true);
    if (this.periodicFlusher != null) periodicFlusher.cancel(true);
    if (this.healthCheckChore != null) healthCheckChore.cancel(true);
    if (this.storefileRefresher != null) storefileRefresher.cancel(true);
    if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);

    if (this.cacheFlusher != null) {
      this.cacheFlusher.join();
    }

    if (this.spanReceiverHost != null) {
      this.spanReceiverHost.closeReceivers();
    }
    if (this.walRoller != null) {
      this.walRoller.close();
    }
    if (this.compactSplitThread != null) {
      this.compactSplitThread.join();
    }
    if (this.service != null) this.service.shutdown();
    if (this.replicationSourceHandler != null &&
        this.replicationSourceHandler == this.replicationSinkHandler) {
      this.replicationSourceHandler.stopReplicationService();
    } else {
      if (this.replicationSourceHandler != null) {
        this.replicationSourceHandler.stopReplicationService();
      }
      if (this.replicationSinkHandler != null) {
        this.replicationSinkHandler.stopReplicationService();
      }
    }
  }

  /**
   * @return Return the object that implements the replication
   * source service.
   */
  public ReplicationSourceService getReplicationSourceService() {
    return replicationSourceHandler;
  }

  /**
   * @return Return the object that implements the replication
   * sink service.
   */
  ReplicationSinkService getReplicationSinkService() {
    return replicationSinkHandler;
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it.
   * To get a fresh connection, the current rssStub must be null.
   * Method will block until a master is available. You can break from this
   * block by requesting the server stop.
   *
   * @return master + port, or null if server has been stopped
   */
  protected synchronized ServerName createRegionServerStatusStub() {
    // Create RS stub without refreshing the master node from ZK, use cached data
    return createRegionServerStatusStub(false);
  }

  /**
   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
   * connection, the current rssStub must be null. Method will block until a master is available.
   * You can break from this block by requesting the server stop.
   * @param refresh If true then master address will be read from ZK, otherwise use cached data
   * @return master + port, or null if server has been stopped
   */
  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
    if (rssStub != null) {
      return masterAddressTracker.getMasterAddress();
    }
    ServerName sn = null;
    long previousLogTime = 0;
    RegionServerStatusService.BlockingInterface intRssStub = null;
    LockService.BlockingInterface intLockStub = null;
    boolean interrupted = false;
    try {
      while (keepLooping()) {
        sn = this.masterAddressTracker.getMasterAddress(refresh);
        if (sn == null) {
          if (!keepLooping()) {
            // give up with no connection.
            LOG.debug("No master found and cluster is stopped; bailing out");
            return null;
          }
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            LOG.debug("No master found; retry");
            previousLogTime = System.currentTimeMillis();
          }
          refresh = true; // let's try pull it from ZK directly
          if (sleep(200)) {
            interrupted = true;
          }
          continue;
        }

        // If we are on the active master, use the shortcut
        if (this instanceof HMaster && sn.equals(getServerName())) {
          intRssStub = ((HMaster)this).getMasterRpcServices();
          intLockStub = ((HMaster)this).getMasterRpcServices();
          break;
        }
        try {
          BlockingRpcChannel channel =
              this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
                  shortOperationTimeout);
          intRssStub = RegionServerStatusService.newBlockingStub(channel);
          intLockStub = LockService.newBlockingStub(channel);
          break;
        } catch (IOException e) {
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            e = e instanceof RemoteException ?
                ((RemoteException)e).unwrapRemoteException() : e;
            if (e instanceof ServerNotRunningYetException) {
              LOG.info("Master isn't available yet, retrying");
            } else {
              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
            }
            previousLogTime = System.currentTimeMillis();
          }
          if (sleep(200)) {
            interrupted = true;
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    this.rssStub = intRssStub;
    this.lockStub = intLockStub;
    return sn;
  }

  /**
   * @return True if we should break loop because cluster is going down or
   * this server has been stopped or hdfs has gone bad.
   */
  private boolean keepLooping() {
    return !this.stopped && isClusterUp();
  }

  /*
   * Let the master know we're here. Run initialization using parameters passed
   * to us by the master.
   * @return A Map of key/value configurations we got from the Master else
   * null if we failed to register.
   * @throws IOException
   */
  private RegionServerStartupResponse reportForDuty() throws IOException {
    ServerName masterServerName = createRegionServerStatusStub(true);
    if (masterServerName == null) return null;
    RegionServerStartupResponse result = null;
    try {
      rpcServices.requestCount.reset();
      rpcServices.rpcGetRequestCount.reset();
      rpcServices.rpcScanRequestCount.reset();
      rpcServices.rpcMultiRequestCount.reset();
      rpcServices.rpcMutateRequestCount.reset();
      LOG.info("reportForDuty to master=" + masterServerName + " with port="
          + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
      long now = EnvironmentEdgeManager.currentTime();
      int port = rpcServices.isa.getPort();
      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
      if (shouldUseThisHostnameInstead()) {
        request.setUseThisHostnameInstead(useThisHostnameInstead);
      }
      request.setPort(port);
      request.setServerStartCode(this.startcode);
      request.setServerCurrentTime(now);
      result = this.rssStub.regionServerStartup(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof ClockOutOfSyncException) {
        LOG.fatal("Master rejected startup because clock is out of sync", ioe);
        // Re-throw IOE will cause RS to abort
        throw ioe;
      } else if (ioe instanceof ServerNotRunningYetException) {
        LOG.debug("Master is not running yet");
      } else {
        LOG.warn("error telling master we are up", se);
      }
      rssStub = null;
    }
    return result;
  }

  @Override
  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
    try {
      GetLastFlushedSequenceIdRequest req =
          RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
      RegionServerStatusService.BlockingInterface rss = rssStub;
      if (rss == null) { // Try to connect one more time
        createRegionServerStatusStub();
        rss = rssStub;
        if (rss == null) {
          // Still no luck, we tried
          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
              .build();
        }
      }
      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
      return RegionStoreSequenceIds.newBuilder()
          .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
          .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
    } catch (ServiceException e) {
      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
          .build();
    }
  }

  /**
   * Closes all regions. Called on our way out.
   * Assumes that it's not possible for new regions to be added to onlineRegions
   * while this method runs.
   */
  protected void closeAllRegions(final boolean abort) {
    closeUserRegions(abort);
    closeMetaTableRegions(abort);
  }

  /**
   * Close meta region if we carry it
   * @param abort Whether we're running an abort.
   */
  void closeMetaTableRegions(final boolean abort) {
    Region meta = null;
    this.lock.writeLock().lock();
    try {
      for (Map.Entry<String, Region> e : onlineRegions.entrySet()) {
        HRegionInfo hri = e.getValue().getRegionInfo();
        if (hri.isMetaRegion()) {
          meta = e.getValue();
        }
        if (meta != null) break;
      }
    } finally {
      this.lock.writeLock().unlock();
    }
    if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
  }

  /**
   * Schedule closes on all user regions.
   * Should be safe calling multiple times because it won't close regions
   * that are already closed or that are closing.
   * @param abort Whether we're running an abort.
   */
  void closeUserRegions(final boolean abort) {
    this.lock.writeLock().lock();
    try {
      for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
        Region r = e.getValue();
        if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
          // Don't update zk with this close transition; pass false.
          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
        }
      }
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /** @return the info server */
  public InfoServer getInfoServer() {
    return infoServer;
  }

  /**
   * @return true if a stop has been requested.
   */
  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isStopping() {
    return this.stopping;
  }

  @Override
  public Map<String, Region> getRecoveringRegions() {
    return this.recoveringRegions;
  }

  /**
   * @return the configuration
   */
  @Override
  public Configuration getConfiguration() {
    return conf;
  }

  /** @return the write lock for the server */
  ReentrantReadWriteLock.WriteLock getWriteLock() {
    return lock.writeLock();
  }

  public int getNumberOfOnlineRegions() {
    return this.onlineRegions.size();
  }

  boolean isOnlineRegionsEmpty() {
    return this.onlineRegions.isEmpty();
  }

  /**
   * For tests, web ui and metrics.
   * This method will only work if HRegionServer is in the same JVM as client;
   * HRegion cannot be serialized to cross an rpc.
   */
  public Collection<Region> getOnlineRegionsLocalContext() {
    Collection<Region> regions = this.onlineRegions.values();
    return Collections.unmodifiableCollection(regions);
  }

  @Override
  public void addToOnlineRegions(Region region) {
    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
    configurationManager.registerObserver(region);
  }

  /**
   * @return A new Map of online regions sorted by region size with the first entry being the
   * biggest. If two regions are the same size, then the last one found wins; i.e. this method
   * may NOT return all regions.
   */
  SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
    // we'll sort the regions in reverse
    SortedMap<Long, Region> sortedRegions = new TreeMap<>(
        new Comparator<Long>() {
          @Override
          public int compare(Long a, Long b) {
            return -1 * a.compareTo(b);
          }
        });
    // Copy over all regions. Regions are sorted by size with biggest first.
    for (Region region : this.onlineRegions.values()) {
      sortedRegions.put(region.getMemstoreSize(), region);
    }
    return sortedRegions;
  }
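
  // Note: because the map above is keyed by memstore size, two regions of
  // identical size collide and only the later one is kept, exactly as the
  // javadoc warns. The hand-written comparator is equivalent to the stock
  // reversed natural ordering, i.e.:
  //
  //   SortedMap<Long, Region> sortedRegions = new TreeMap<>(Collections.reverseOrder());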

  /**
   * @return time stamp in millis of when this region server was started
   */
  public long getStartcode() {
    return this.startcode;
  }

  /** @return reference to FlushRequester */
  @Override
  public FlushRequester getFlushRequester() {
    return this.cacheFlusher;
  }

  /**
   * Get the top N most loaded regions this server is serving so we can tell the
   * master which regions it can reallocate if we're overloaded. TODO: actually
   * calculate which regions are most loaded. (Right now, we're just grabbing
   * the first N regions being served regardless of load.)
   */
  protected HRegionInfo[] getMostLoadedRegions() {
    ArrayList<HRegionInfo> regions = new ArrayList<>();
    for (Region r : onlineRegions.values()) {
      if (!r.isAvailable()) {
        continue;
      }
      if (regions.size() < numRegionsToReport) {
        regions.add(r.getRegionInfo());
      } else {
        break;
      }
    }
    return regions.toArray(new HRegionInfo[regions.size()]);
  }

  @Override
  public Leases getLeases() {
    return leases;
  }

  /**
   * @return Return the rootDir.
   */
  protected Path getRootDir() {
    return rootDir;
  }

  /**
   * @return Return the fs.
   */
  @Override
  public FileSystem getFileSystem() {
    return fs;
  }

  /**
   * @return Return the walRootDir.
   */
  protected Path getWALRootDir() {
    return walRootDir;
  }

  /**
   * @return Return the walFs.
   */
  protected FileSystem getWALFileSystem() {
    return walFs;
  }

  @Override
  public String toString() {
    return getServerName().toString();
  }

  /**
   * Interval at which threads should run
   *
   * @return the interval
   */
  public int getThreadWakeFrequency() {
    return threadWakeFrequency;
  }

  @Override
  public ZooKeeperWatcher getZooKeeper() {
    return zooKeeper;
  }

  @Override
  public BaseCoordinatedStateManager getCoordinatedStateManager() {
    return csm;
  }

  @Override
  public ServerName getServerName() {
    return serverName;
  }

  @Override
  public CompactionRequestor getCompactionRequester() {
    return this.compactSplitThread;
  }

  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
    return this.rsHost;
  }

  @Override
  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
    return this.regionsInTransitionInRS;
  }

  @Override
  public ExecutorService getExecutorService() {
    return service;
  }

  @Override
  public ChoreService getChoreService() {
    return choreService;
  }

  @Override
  public RegionServerQuotaManager getRegionServerQuotaManager() {
    return rsQuotaManager;
  }

  //
  // Main program and support routines
  //

  /**
   * Load the replication service objects, if any
   */
  static private void createNewReplicationInstance(Configuration conf,
      HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException {

    if ((server instanceof HMaster) &&
        (!BaseLoadBalancer.userTablesOnMaster(conf))) {
      return;
    }

    // read in the name of the source replication class from the config file.
    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

    // read in the name of the sink replication class from the config file.
    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);

    // If both the sink and the source class names are the same, then instantiate
    // only one object.
    if (sourceClassname.equals(sinkClassname)) {
      server.replicationSourceHandler = (ReplicationSourceService)
          newReplicationInstance(sourceClassname,
              conf, server, walFs, walDir, oldWALDir);
      server.replicationSinkHandler = (ReplicationSinkService)
          server.replicationSourceHandler;
    } else {
      server.replicationSourceHandler = (ReplicationSourceService)
          newReplicationInstance(sourceClassname,
              conf, server, walFs, walDir, oldWALDir);
      server.replicationSinkHandler = (ReplicationSinkService)
          newReplicationInstance(sinkClassname,
              conf, server, walFs, walDir, oldWALDir);
    }
  }

  static private ReplicationService newReplicationInstance(String classname,
      Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
      Path oldLogDir) throws IOException {

    Class<?> clazz = null;
    try {
      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
      clazz = Class.forName(classname, true, classLoader);
    } catch (java.lang.ClassNotFoundException nfe) {
      throw new IOException("Could not find class for " + classname);
    }

    // create an instance of the replication object.
    ReplicationService service = (ReplicationService)
        ReflectionUtils.newInstance(clazz, conf);
    service.initialize(server, walFs, logDir, oldLogDir);
    return service;
  }

  /**
   * Utility for constructing an instance of the passed HRegionServer class.
   *
   * @param regionServerClass
   * @param conf2
   * @return HRegionServer instance.
   */
  public static HRegionServer constructRegionServer(
      Class<? extends HRegionServer> regionServerClass,
      final Configuration conf2, CoordinatedStateManager cp) {
    try {
      Constructor<? extends HRegionServer> c = regionServerClass
          .getConstructor(Configuration.class, CoordinatedStateManager.class);
      return c.newInstance(conf2, cp);
    } catch (Exception e) {
      throw new RuntimeException("Failed construction of " + "Regionserver: "
          + regionServerClass.toString(), e);
    }
  }

  /**
   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
   */
  public static void main(String[] args) throws Exception {
    LOG.info("***** STARTING service '" + HRegionServer.class.getSimpleName() + "' *****");
    VersionInfo.logVersion();
    Configuration conf = HBaseConfiguration.create();
    @SuppressWarnings("unchecked")
    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

    new HRegionServerCommandLine(regionServerClass).doMain(args);
  }
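
  // Launch sketch (illustrative): in a normal deployment this main() is reached
  // through the hbase wrapper script rather than invoked directly, along the
  // lines of:
  //
  //   $ bin/hbase regionserver start
  //
  // A custom subclass can be substituted without touching the script by setting
  // HConstants.REGION_SERVER_IMPL ("hbase.regionserver.impl") to its class name.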

  /**
   * Gets the online regions of the specified table.
   * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
   * Only returns <em>online</em> regions. If a region on this table has been
   * closed during a disable, etc., it will not be included in the returned list.
   * So, the returned list may not necessarily be ALL regions in this table, it's
   * all the ONLINE regions in the table.
   * @param tableName
   * @return Online regions from <code>tableName</code>
   */
  @Override
  public List<Region> getOnlineRegions(TableName tableName) {
    List<Region> tableRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      for (Region region : this.onlineRegions.values()) {
        HRegionInfo regionInfo = region.getRegionInfo();
        if (regionInfo.getTable().equals(tableName)) {
          tableRegions.add(region);
        }
      }
    }
    return tableRegions;
  }

  @Override
  public List<Region> getOnlineRegions() {
    List<Region> allRegions = new ArrayList<>();
    synchronized (this.onlineRegions) {
      // Return a clone copy of the onlineRegions
      allRegions.addAll(onlineRegions.values());
    }
    return allRegions;
  }

  /**
   * Gets the online tables in this RS.
   * This method looks at the in-memory onlineRegions.
   * @return all the online tables in this RS
   */
  @Override
  public Set<TableName> getOnlineTables() {
    Set<TableName> tables = new HashSet<>();
    synchronized (this.onlineRegions) {
      for (Region region : this.onlineRegions.values()) {
        tables.add(region.getTableDesc().getTableName());
      }
    }
    return tables;
  }

  // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
  public String[] getRegionServerCoprocessors() {
    TreeSet<String> coprocessors = new TreeSet<>();
    try {
      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
    } catch (IOException exception) {
      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
          "skipping.");
      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
    }
    Collection<Region> regions = getOnlineRegionsLocalContext();
    for (Region region : regions) {
      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
      try {
        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
      } catch (IOException exception) {
        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
            "; skipping.");
        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
      }
    }
    coprocessors.addAll(rsHost.getCoprocessors());
    return coprocessors.toArray(new String[coprocessors.size()]);
  }

  /**
   * Try to close the region, logs a warning on failure but continues.
   * @param region Region to close
   */
  private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
    try {
      if (!closeRegion(region.getEncodedName(), abort, null)) {
        LOG.warn("Failed to close " + region.getRegionNameAsString() +
            " - ignoring and continuing");
      }
    } catch (IOException e) {
      LOG.warn("Failed to close " + region.getRegionNameAsString() +
          " - ignoring and continuing", e);
    }
  }

  /**
   * Close asynchronously a region, can be called from the master or internally by the regionserver
   * when stopping. If called from the master, the region will update the znode status.
   *
   * <p>
   * If an opening was in progress, this method will cancel it, but will not start a new close. The
   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
   * </p>
   *
   * <p>
   * If a close was in progress, this new request will be ignored, and an exception thrown.
   * </p>
   *
   * @param encodedName Region to close
   * @param abort True if we are aborting
   * @return True if closed a region.
   * @throws NotServingRegionException if the region is not online
   */
  protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
      throws NotServingRegionException {
    //Check for permissions to close.
    Region actualRegion = this.getFromOnlineRegions(encodedName);
    // Can be null if we're calling close on a region that's not online
    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
      try {
        actualRegion.getCoprocessorHost().preClose(false);
      } catch (IOException exp) {
        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
        return false;
      }
    }

    final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
        Boolean.FALSE);

    if (Boolean.TRUE.equals(previous)) {
      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
          "trying to OPEN. Cancelling OPENING.");
      if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)) {
        // The replace failed. That should be an exceptional case, but theoretically it can happen.
        // We're going to try to do a standard close then.
        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
            " Doing a standard close now");
        return closeRegion(encodedName, abort, sn);
      }
      // Let's get the region from the online region list again
      actualRegion = this.getFromOnlineRegions(encodedName);
      if (actualRegion == null) { // If already online, we still need to close it.
        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
        // The master deletes the znode when it receives this exception.
        throw new NotServingRegionException("The region " + encodedName +
            " was opening but not yet served. Opening is cancelled.");
      }
    } else if (Boolean.FALSE.equals(previous)) {
      LOG.info("Received CLOSE for the region: " + encodedName +
          ", which we are already trying to CLOSE, but not completed yet");
      return true;
    }

    if (actualRegion == null) {
      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
      this.regionsInTransitionInRS.remove(encodedName.getBytes());
      // The master deletes the znode when it receives this exception.
      throw new NotServingRegionException("The region " + encodedName +
          " is not online, and is not opening.");
    }

    CloseRegionHandler crh;
    final HRegionInfo hri = actualRegion.getRegionInfo();
    if (hri.isMetaRegion()) {
      crh = new CloseMetaHandler(this, this, hri, abort);
    } else {
      crh = new CloseRegionHandler(this, this, hri, abort, sn);
    }
    this.service.submit(crh);
    return true;
  }
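
  // Note on the regionsInTransitionInRS protocol used above: the map value
  // encodes the direction of the in-flight transition,
  //   Boolean.TRUE  => the region is OPENING on this server,
  //   Boolean.FALSE => the region is CLOSING.
  // putIfAbsent(encodedName.getBytes(), Boolean.FALSE) therefore both claims
  // the close and, via its return value, exposes any concurrent open or close
  // already in flight.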

  /**
   * Close and offline the region for split or merge
   *
   * @param regionEncodedName the name of the region(s) to close
   * @return true if closed the region successfully.
   * @throws IOException
   */
  protected boolean closeAndOfflineRegionForSplitOrMerge(
      final List<String> regionEncodedName) throws IOException {
    for (int i = 0; i < regionEncodedName.size(); ++i) {
      Region regionToClose = this.getFromOnlineRegions(regionEncodedName.get(i));
      if (regionToClose != null) {
        Map<byte[], List<StoreFile>> hstoreFiles = null;
        Exception exceptionToThrow = null;
        try {
          hstoreFiles = ((HRegion)regionToClose).close(false);
        } catch (Exception e) {
          exceptionToThrow = e;
        }
        if (exceptionToThrow == null && hstoreFiles == null) {
          // The region was closed by someone else
          exceptionToThrow =
              new IOException("Failed to close region: already closed by another thread");
        }

        if (exceptionToThrow != null) {
          if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
          throw new IOException(exceptionToThrow);
        }
        if (regionToClose.getTableDesc().hasSerialReplicationScope()) {
          // For serial replication, we need to add a final barrier on this region. But the
          // splitting or merging may be reverted, so we should make sure if we reopen this region,
          // the open barrier is same as this final barrier
          long seq = regionToClose.getMaxFlushedSeqId();
          if (seq == HConstants.NO_SEQNUM) {
            // No edits in WAL for this region; get the sequence number when the region was opened.
            seq = regionToClose.getOpenSeqNum();
            if (seq == HConstants.NO_SEQNUM) {
              // This region has no data
              seq = 0;
            }
          } else {
            seq++;
          }
          Put finalBarrier = MetaTableAccessor.makeBarrierPut(
              Bytes.toBytes(regionEncodedName.get(i)),
              seq,
              regionToClose.getTableDesc().getTableName().getName());
          MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier);
        }
        // Offline the region
        this.removeFromOnlineRegions(regionToClose, null);
      }
    }
    return true;
  }

  /**
   * @return HRegion for the passed binary <code>regionName</code> or null if
   *         named region is not member of the online regions.
   */
  public Region getOnlineRegion(final byte[] regionName) {
    String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
    return this.onlineRegions.get(encodedRegionName);
  }

  public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
    return this.regionFavoredNodesMap.get(encodedRegionName);
  }

  @Override
  public Region getFromOnlineRegions(final String encodedRegionName) {
    return this.onlineRegions.get(encodedRegionName);
  }
  public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
    Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
    if (destination != null) {
      long closeSeqNum = r.getMaxFlushedSeqId();
      if (closeSeqNum == HConstants.NO_SEQNUM) {
        // No edits in WAL for this region; get the sequence number when the region was opened.
        closeSeqNum = r.getOpenSeqNum();
        if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
      }
      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
    }
    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
    return toReturn != null;
  }
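  // What the destination bookkeeping above buys us: a client still holding the stale location
  // and calling this server gets a RegionMovedException carrying the new ServerName and the
  // close sequence number (see getRegionByEncodedName below), so it can update its region cache
  // and retry against the new server without another round trip to META.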
  /**
   * Protected utility method for safely obtaining an HRegion handle.
   *
   * @param regionName Name of online {@link HRegion} to return
   * @return {@link HRegion} for <code>regionName</code>
   * @throws NotServingRegionException
   */
  protected Region getRegion(final byte[] regionName)
      throws NotServingRegionException {
    String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
    return getRegionByEncodedName(regionName, encodedRegionName);
  }

  public Region getRegionByEncodedName(String encodedRegionName)
      throws NotServingRegionException {
    return getRegionByEncodedName(null, encodedRegionName);
  }
  protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
      throws NotServingRegionException {
    Region region = this.onlineRegions.get(encodedRegionName);
    if (region == null) {
      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
      if (moveInfo != null) {
        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
      }
      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
      String regionNameStr = regionName == null ?
          encodedRegionName : Bytes.toStringBinary(regionName);
      if (isOpening != null && isOpening.booleanValue()) {
        throw new RegionOpeningException("Region " + regionNameStr +
            " is opening on " + this.serverName);
      }
      throw new NotServingRegionException("Region " + regionNameStr +
          " is not online on " + this.serverName);
    }
    return region;
  }
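  // The three failure modes above give callers distinct retry hints: RegionMovedException
  // carries the new location, RegionOpeningException invites a retry against this same server
  // shortly, and NotServingRegionException sends the client back to look the region up again.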
  /*
   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
   * IOE if it isn't already.
   *
   * @param t Throwable
   *
   * @param msg Message to log in error. Can be null.
   *
   * @return Throwable converted to an IOE; methods can only let out IOEs.
   */
  private Throwable cleanup(final Throwable t, final String msg) {
    // Don't log as error if NSRE; NSRE is 'normal' operation.
    if (t instanceof NotServingRegionException) {
      LOG.debug("NotServingRegionException; " + t.getMessage());
      return t;
    }
    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
    if (msg == null) {
      LOG.error("", e);
    } else {
      LOG.error(msg, e);
    }
    if (!rpcServices.checkOOME(t)) {
      checkFileSystem();
    }
    return t;
  }

  /*
   * @param t Throwable
   *
   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
   *
   * @return Make <code>t</code> an IOE if it isn't already.
   */
  protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
    return (t instanceof IOException ? (IOException) t : msg == null
        || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
  }

  /**
   * Checks to see if the file system is still accessible. If not, sets
   * abortRequested and stopRequested
   *
   * @return false if file system is not available
   */
  public boolean checkFileSystem() {
    if (this.fsOk && this.fs != null) {
      try {
        FSUtils.checkFileSystemAvailable(this.fs);
      } catch (IOException e) {
        abort("File System not available", e);
        this.fsOk = false;
      }
    }
    return this.fsOk;
  }
  public void updateRegionFavoredNodesMapping(String encodedRegionName,
      List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
    InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
    // Refer to the comment on the declaration of regionFavoredNodesMap on why
    // it is a map of region name to InetSocketAddress[]
    for (int i = 0; i < favoredNodes.size(); i++) {
      addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
          favoredNodes.get(i).getPort());
    }
    regionFavoredNodesMap.put(encodedRegionName, addr);
  }
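  // InetSocketAddress.createUnresolved is deliberate above: it records host and port without
  // doing a DNS lookup on this path; resolution is left to whoever consumes the favored-node
  // addresses later.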
  /**
   * Return the favored nodes for a region given its encoded name. Look at the
   * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
   * @param encodedRegionName
   * @return array of favored locations
   */
  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
    return regionFavoredNodesMap.get(encodedRegionName);
  }

  public ServerNonceManager getNonceManager() {
    return this.nonceManager;
  }
  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;
    private final long ts;

    public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
      ts = EnvironmentEdgeManager.currentTime();
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }

    public long getMoveTime() {
      return ts;
    }
  }

  // This map contains all the regions that we closed for a move.
  // We add the time the region was moved as we don't want to keep too old information
  protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);

  // We need a timeout. If not, there is a risk of giving wrong information: this would double
  // the number of network calls instead of reducing them.
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

  protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
    if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding moved region record: "
        + encodedName + " to " + destination + " as of " + closeSeqNum);
    movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }
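  // Rough lifecycle of a moved-region record, as used by the methods around here:
  //   removeFromOnlineRegions(r, destination) -> addToMovedRegions(name, destination, seqNum)
  //   client hits the stale location          -> getMovedRegion(name) != null
  //                                           -> RegionMovedException(destination, seqNum)
  //   after TIMEOUT_REGION_MOVED (2 minutes)  -> the entry lapses, either lazily in
  //                                              getMovedRegion or via MovedRegionsCleaner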
  void removeFromMovedRegions(String encodedName) {
    movedRegions.remove(encodedName);
  }

  private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
    MovedRegionInfo dest = movedRegions.get(encodedRegionName);

    long now = EnvironmentEdgeManager.currentTime();
    if (dest != null) {
      if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
        return dest;
      } else {
        movedRegions.remove(encodedRegionName);
      }
    }
    return null;
  }

  /**
   * Remove the expired entries from the moved regions list.
   */
  protected void cleanMovedRegions() {
    final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
    Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();

    while (it.hasNext()) {
      Map.Entry<String, MovedRegionInfo> e = it.next();
      if (e.getValue().getMoveTime() < cutOff) {
        it.remove();
      }
    }
  }

  /*
   * Use this to allow tests to override and schedule more frequently.
   */
  protected int movedRegionCleanerPeriod() {
    return TIMEOUT_REGION_MOVED;
  }
  /**
   * Creates a Chore thread to clean the moved region cache.
   */
  protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
    private HRegionServer regionServer;
    Stoppable stoppable;

    private MovedRegionsCleaner(
        HRegionServer regionServer, Stoppable stoppable) {
      super("MovedRegionsCleaner for region " + regionServer, stoppable,
          regionServer.movedRegionCleanerPeriod());
      this.regionServer = regionServer;
      this.stoppable = stoppable;
    }

    static MovedRegionsCleaner create(HRegionServer rs) {
      Stoppable stoppable = new Stoppable() {
        private volatile boolean isStopped = false;
        @Override public void stop(String why) { isStopped = true; }
        @Override public boolean isStopped() { return isStopped; }
      };
      return new MovedRegionsCleaner(rs, stoppable);
    }

    @Override
    protected void chore() {
      regionServer.cleanMovedRegions();
    }

    @Override
    public void stop(String why) {
      stoppable.stop(why);
    }

    @Override
    public boolean isStopped() {
      return stoppable.isStopped();
    }
  }
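  // The cleaner supplies its own private Stoppable rather than handing the chore the region
  // server itself, so the chore's lifecycle can be ended independently of the server's overall
  // stop/abort state.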
  private String getMyEphemeralNodePath() {
    return ZKUtil.joinZNode(this.zooKeeper.znodePaths.rsZNode, getServerName().toString());
  }

  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
  }

  /**
   * @return the underlying {@link CompactSplitThread} for the server
   */
  public CompactSplitThread getCompactSplitThread() {
    return this.compactSplitThread;
  }
  /**
   * A helper function to store the last flushed sequence Id with the previous failed RS for a
   * recovering region. The Id is used to skip WAL edits which have already been flushed. Since
   * the flushed sequence id is only valid for each RS, we associate the Id with the
   * corresponding failed RS.
   * @throws KeeperException
   * @throws IOException
   */
  private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
      IOException {
    if (!r.isRecovering()) {
      // return immediately for non-recovering regions
      return;
    }

    HRegionInfo regionInfo = r.getRegionInfo();
    ZooKeeperWatcher zkw = getZooKeeper();
    String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
    Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
    long minSeqIdForLogReplay = -1;
    for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
      if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
        minSeqIdForLogReplay = storeSeqIdForReplay;
      }
    }

    try {
      long lastRecordedFlushedSequenceId = -1;
      String nodePath = ZKUtil.joinZNode(this.zooKeeper.znodePaths.recoveringRegionsZNode,
          regionInfo.getEncodedName());
      // recovering-region level
      byte[] data;
      try {
        data = ZKUtil.getData(zkw, nodePath);
      } catch (InterruptedException e) {
        throw new InterruptedIOException();
      }
      if (data != null) {
        lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
      }
      if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
        ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
      }
      if (previousRSName != null) {
        // one level deeper for the failed RS
        nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
        ZKUtil.setData(zkw, nodePath,
            ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
        LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
            " for " + previousRSName);
      } else {
        LOG.warn("Can't find failed region server for recovering region " +
            regionInfo.getEncodedName());
      }
    } catch (NoNodeException ignore) {
      LOG.debug("Region " + regionInfo.getEncodedName() +
          " must have completed recovery because its recovery znode has been removed", ignore);
    }
  }
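  // Znode layout used above, as reflected in the javadoc of getLastFailedRSFromZK below:
  // /hbase/recovering-regions/<encodedRegionName> holds the region's last flushed sequence id,
  // and one child per previously failed region server holds the per-store sequence ids recorded
  // for that server.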
  /**
   * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
   * @param encodedRegionName
   * @throws KeeperException
   */
  private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
    String result = null;
    long maxZxid = 0;
    ZooKeeperWatcher zkw = this.getZooKeeper();
    String nodePath = ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode, encodedRegionName);
    List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
    if (failedServers == null || failedServers.isEmpty()) {
      return result;
    }
    for (String failedServer : failedServers) {
      String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
      Stat stat = new Stat();
      ZKUtil.getDataNoWatch(zkw, rsPath, stat);
      if (maxZxid < stat.getCzxid()) {
        maxZxid = stat.getCzxid();
        result = failedServer;
      }
    }
    return result;
  }
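  // Stat.getCzxid() is the id of the ZooKeeper transaction that created the znode. Zxids grow
  // monotonically, so the child with the largest czxid is the most recently created entry,
  // i.e. the last region server to have failed while hosting this region.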
  public CoprocessorServiceResponse execRegionServerService(
      @SuppressWarnings("UnusedParameters") final RpcController controller,
      final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null, "No registered coprocessor service found for " +
            serviceName);
      }
      com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
          service.getDescriptorForType();

      String methodName = call.getMethodName();
      com.google.protobuf.Descriptors.MethodDescriptor methodDesc =
          serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
            " called on service " + serviceName);
      }

      com.google.protobuf.Message request =
          CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final com.google.protobuf.Message.Builder responseBuilder =
          service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request,
          new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() {
            @Override
            public void run(com.google.protobuf.Message message) {
              if (message != null) {
                responseBuilder.mergeFrom(message);
              }
            }
          });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
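  // Client-side counterpart, roughly (the generated service names are illustrative only):
  //   CoprocessorRpcChannel channel = ...; // bound to this region server
  //   MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
  //   MyResponse resp = stub.myMethod(null, MyRequest.getDefaultInstance());
  // The call arrives here as a CoprocessorServiceRequest naming a service and method that were
  // previously registered in coprocessorServiceHandlers.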
  /**
   * @return The cache config instance used by the regionserver.
   */
  public CacheConfig getCacheConfig() {
    return this.cacheConfig;
  }

  /**
   * @return the ConfigurationManager object, exposed for testing purposes.
   */
  protected ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }

  /**
   * @return the table descriptors implementation.
   */
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }

  /**
   * Reload the configuration from disk.
   */
  public void updateConfiguration() {
    LOG.info("Reloading the configuration from disk.");
    conf.reloadConfiguration();
    configurationManager.notifyAllObservers(conf);
  }
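  // notifyAllObservers fans the reloaded Configuration out to the registered configuration
  // observers; the onConfigurationChange hook below is one such callback, rebuilding the flush
  // throughput controller against the new settings.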
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }
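  // Store.getCompactionPressure() is, roughly, a normalized score: 0 means no compaction is
  // needed, and values at or above 1 mean the store has reached its blocking store-file count.
  // Taking the max across all stores reports the single most backlogged store on this server.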
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  /**
   * @return whether all WAL roll requests have finished for this regionserver
   */
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }

  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }

  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // return 0 during RS initialization
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
  }
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }

  public EntityLock regionLock(List<HRegionInfo> regionInfos, String description,
      Abortable abort) throws IOException {
    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
        .regionLock(regionInfos, description, abort);
  }