HBASE-17532 Replaced explicit type with diamond operator
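A minimal before/after sketch of what the change in the title does (illustrative only; the "before" form is reconstructed, the "after" form matches a field that appears later in this file):

    // before: explicit type arguments
    protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();
    // after: Java 7 diamond operator, type arguments inferred from the declared type
    protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<>();

Behavior is identical; the compiler infers the type arguments from the declared type.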
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpServlet;

import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

import sun.misc.Signal;
import sun.misc.SignalHandler;
/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation", "restriction" })
public class HRegionServer extends HasThread implements
    RegionServerServices, LastSequenceId, ConfigurationObserver {

  public static final String REGION_LOCK_AWAIT_TIME_SEC =
      "hbase.regionserver.region.lock.await.time.sec";
  public static final int DEFAULT_REGION_LOCK_AWAIT_TIME_SEC = 300; // 5 min
  private static final Log LOG = LogFactory.getLog(HRegionServer.class);

  /**
   * For testing only! Set to true to skip notifying region assignment to master.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * Strings to be used in forming the exception message for
   * RegionsAlreadyInTransitionException.
   */
  protected static final String OPEN = "OPEN";
  protected static final String CLOSE = "CLOSE";

  // RegionName vs current action in progress
  // true - if open region action in progress
  // false - if close region action in progress
  protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  // Cache flushing
  protected MemStoreFlusher cacheFlusher;

  protected HeapMemoryManager hMemManager;
  protected CountDownLatch initLatch = null;

  /**
   * Cluster connection to be shared by services.
   * Initialized at server startup and closed when server shuts down.
   * Clients must never close it explicitly.
   */
  protected ClusterConnection clusterConnection;

  /**
   * Long-living meta table locator, which is created when the server is started and stopped
   * when server shuts down. References to this locator shall be used to perform the
   * corresponding operations in EventHandlers. Primary reason for this decision is to make it
   * mockable for tests.
   */
  protected MetaTableLocator metaTableLocator;

  // Watch if a region is out of recovering state from ZooKeeper
  @SuppressWarnings("unused")
  private RecoveringRegionWatcher recoveringRegionWatcher;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  protected ReplicationSourceService replicationSourceHandler;
  protected ReplicationSinkService replicationSinkHandler;

  // Compactions
  public CompactSplitThread compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the
   * encoded region name. All access should be synchronized.
   */
  protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<>();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on.
   * We store the value as InetSocketAddress since this is used only in HDFS
   * API (create() that takes favored nodes as hints for placing file blocks).
   * We could have used ServerName here as the value class, but we'd need to
   * convert it to InetSocketAddress at some point before the HDFS API call, and
   * it seems a bit weird to store ServerName since ServerName refers to RegionServers
   * and here we really mean DataNode locations.
   */
  protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
      new ConcurrentHashMap<>();
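
  // Illustrative sketch (not from the original file): the favored nodes recorded above are
  // eventually handed to HDFS when store files are created, via an HDFS client overload along
  // the lines of
  //   DistributedFileSystem#create(path, permission, overwrite, bufferSize, replication,
  //       blockSize, progress, favoredNodes /* InetSocketAddress[] */)
  // which is why the value type here is InetSocketAddress[] rather than ServerName.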
  /**
   * Set of regions currently being in recovering state which means it can accept writes(edits from
   * previous failed region server) but not reads. A recovering region is also an online region.
   */
  protected final Map<String, Region> recoveringRegions = Collections
      .synchronizedMap(new HashMap<String, Region>());

  // Leases
  protected Leases leases;

  // Instance of the hbase executor service.
  protected ExecutorService service;

  // If false, the file system has become unavailable
  protected volatile boolean fsOk;
  protected HFileSystem fs;
  protected HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private volatile boolean abortRequested;

  ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<>();

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;

  volatile boolean killed = false;

  protected final Configuration conf;

  private Path rootDir;
  private Path walRootDir;

  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  final int numRetries;
  protected final int threadWakeFrequency;
  protected final int msgInterval;

  protected final int numRegionsToReport;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is name of the webapp and the attribute name used stuffing this instance
  // into web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  MetricsRegionServer metricsRegionServer;
  MetricsTable metricsTable;
  private SpanReceiverHost spanReceiverHost;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private final ChoreService choreService;

  /**
   * Check for compactions requests.
   */
  ScheduledChore compactionChecker;

  /**
   * Check for flushes
   */
  ScheduledChore periodicFlusher;

  protected volatile WALFactory walFactory;

  // WAL roller. log is protected rather than private to avoid
  // eclipse warning when accessed by inner classes
  final LogRoller walRoller;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected ZooKeeperWatcher zooKeeper;

  // master address tracker
  private MasterAddressTracker masterAddressTracker;

  // Cluster Status Tracker
  protected ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  private final RegionServerAccounting regionServerAccounting;

  // Cache configuration and block cache reference
  protected CacheConfig cacheConfig;
  // Cache configuration for mob
  final MobCacheConfig mobCacheConfig;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the
   * master passes us, port, and server startcode. Gets set after registration
   * against Master.
   */
  protected ServerName serverName;

  /**
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  // key to the config parameter of server hostname
  // the specification of server hostname is optional. The hostname should be resolvable from
  // both master and region server
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  private String clusterId;

  /**
   * MX Bean for RegionServerInfo
   */
  private ObjectName mxBean = null;

  /**
   * Chore to clean periodically the moved region list
   */
  private MovedRegionsCleaner movedRegionsCleaner;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerQuotaManager rsQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent
   * in the case where client doesn't receive the response from a successful operation and
   * retries. We track the successful ops for some time via a nonce sent by client and handle
   * duplicate operations (currently, by failing them; in future we might use MVCC to return
   * result). Nonces are also recovered from WAL during recovery; however, the caveats (from
   * HBASE-3787) are:
   * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
   *   of past records. If we don't read the records, we don't read and recover the nonces.
   *   Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
   * - There's no WAL recovery during normal region move, so nonces will not be transferred.
   * We can have separate additional "Nonce WAL". It will just contain bunch of numbers and
   * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
   * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
   * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
   * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
   * latest nonce in it expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  protected BaseCoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  private CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  protected SecureBulkLoadManager secureBulkLoadManager;
  /**
   * Starts a HRegionServer at the default location.
   */
  public HRegionServer(Configuration conf) throws IOException, InterruptedException {
    this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
  }

  /**
   * Starts a HRegionServer at the default location
   * @param csm implementation of CoordinatedStateManager to be used
   */
  public HRegionServer(Configuration conf, CoordinatedStateManager csm) throws IOException {
    super("RegionServer"); // thread name
    this.fsOk = true;
    this.conf = conf;
    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
    HFile.checkHFileVersion(this.conf);
    checkCodecs(this.conf);
    this.userProvider = UserProvider.instantiate(conf);
    FSUtils.setupShortCircuitRead(this.conf);

    Replication.decorateRegionServerConfiguration(this.conf);

    // Disable usage of meta replicas in the regionserver
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

    // Config'ed params
    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

    this.sleeper = new Sleeper(this.msgInterval, this);

    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

    this.numRegionsToReport = conf.getInt(
        "hbase.regionserver.numregionstoreport", 10);

    this.operationTimeout = conf.getInt(
        HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

    this.shortOperationTimeout = conf.getInt(
        HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

    this.abortRequested = false;
    this.stopped = false;

    rpcServices = createRpcServices();
    this.startcode = System.currentTimeMillis();
    if (this instanceof HMaster) {
      useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
    } else {
      useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
    }
    String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
        rpcServices.isa.getHostName();
    serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

    rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
    rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

    // login the zookeeper client principal (if using security)
    ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
    // login the server principal (if using secure Hadoop)
    login(userProvider, hostName);
    // init superusers and add the server principal (if using security)
    // or process owner as default super user.
    Superusers.initialize(conf);

    regionServerAccounting = new RegionServerAccounting(conf);
    cacheConfig = new CacheConfig(conf);
    mobCacheConfig = new MobCacheConfig(conf);
    uncaughtExceptionHandler = new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        abort("Uncaught exception in service thread " + t.getName(), e);
      }
    };

    initializeFileSystem();

    service = new ExecutorService(getServerName().toShortString());
    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
          rpcServices.isa.getPort(), this, canCreateBaseZNode());

      this.csm = (BaseCoordinatedStateManager) csm;
      this.csm.initialize(this);
      this.csm.start();

      masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
      masterAddressTracker.start();

      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
      clusterStatusTracker.start();
    }
    this.configurationManager = new ConfigurationManager();

    rpcServices.start();
    putUpWebUI();
    this.walRoller = new LogRoller(this, this);
    this.choreService = new ChoreService(getServerName().toString(), true);
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);

    if (!SystemUtils.IS_OS_WINDOWS) {
      Signal.handle(new Signal("HUP"), new SignalHandler() {
        @Override
        public void handle(Signal signal) {
          getConfiguration().reloadConfiguration();
          configurationManager.notifyAllObservers(getConfiguration());
        }
      });
    }
    // Create the CompactedFileDischarger chore service. This chore helps to
    // remove the compacted files that will no longer be used in reads.
    // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
    // 2 mins so that compacted files can be archived before the TTLCleaner runs
    int cleanerInterval =
        conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
    this.compactedFileDischarger =
        new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
    choreService.scheduleChore(compactedFileDischarger);
  }
  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
    // checksum verification enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = FSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = getFsTableDescriptors();
  }

  protected TableDescriptors getFsTableDescriptors() throws IOException {
    return new FSTableDescriptors(this.conf,
        this.fs, this.rootDir, !canUpdateTableDescriptor(), false);
  }

  protected void setInitLatch(CountDownLatch latch) {
    this.initLatch = latch;
  }

  /*
   * Returns true if configured hostname should be used
   */
  protected boolean shouldUseThisHostnameInstead() {
    return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
  }

  protected void login(UserProvider user, String host) throws IOException {
    user.login("hbase.regionserver.keytab.file",
        "hbase.regionserver.kerberos.principal", host);
  }

  protected void waitForMasterActive(){
  }

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return false;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }
  @Override
  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single service name
     */
    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
        instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
          + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
    }
    return true;
  }

  /**
   * Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
   * the local server. Safe to use going to local or remote server.
   * Creating this instance in a method allows it to be intercepted and mocked in tests.
   * @throws IOException
   */
  @VisibleForTesting
  protected ClusterConnection createClusterConnection() throws IOException {
    // Create a cluster connection that when appropriate, can short-circuit and go directly to the
    // local server if the request is to the local server bypassing RPC. Can be used for both local
    // and remote invocations.
    return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
        serverName, rpcServices, rpcServices);
  }
  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   * @param c
   * @throws IOException
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
    if (codecs == null) return;
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException("Compression codec " + codec +
            " not supported, aborting RS construction");
      }
    }
  }
  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   * @throws IOException
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
      metaTableLocator = new MetaTableLocator();
    }
  }
  /**
   * All initialization needed before we go register with Master.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  private void preRegistrationInitialization(){
    try {
      setupClusterConnection();

      this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
      this.secureBulkLoadManager.start();

      // Health checker thread.
      if (isHealthCheckerConfigured()) {
        int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
            HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
        healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
      }

      initializeZooKeeper();
      if (!isStopped() && !isAborted()) {
        initializeThreads();
      }
    } catch (Throwable t) {
      // Call stop if error or process will stick around for ever since server
      // puts up non-daemon threads.
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    }
  }

  /**
   * Bring up connection to zk ensemble, then wait until a master is available for this
   * cluster, and after that, wait until the cluster 'up' flag has been set.
   * This is the order in which master does things.
   * Finally open long-living server short-circuit connection.
   * @throws IOException
   * @throws InterruptedException
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
    justification="cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    doLatch(this.initLatch);

    // Retrieve clusterId
    // Since cluster status is now up
    // ID should have already been set by HMaster
    try {
      clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
      if (clusterId == null) {
        this.abort("Cluster ID has not been set");
      }
      LOG.info("ClusterId : " + clusterId);
    } catch (KeeperException e) {
      this.abort("Failed to retrieve Cluster ID", e);
    }

    // In case colocated master, wait here till it's active.
    // So backup masters won't start as regionservers.
    // This is to avoid showing backup masters as regionservers
    // in master web UI, or assigning any region to them.
    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach zk cluster when creating procedure handler.", e);
    }
    // register watcher for recovering regions
    this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED",
    justification="We don't care about the return")
  private void doLatch(final CountDownLatch latch) throws InterruptedException {
    if (latch != null) {
      // Result is ignored intentionally but if I remove the below, findbugs complains (the
      // above justification on this method doesn't seem to suppress it).
      boolean result = latch.await(20, TimeUnit.SECONDS);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode availability while checking
   * if the region server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException
   */
  private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
      throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /**
   * @return False if cluster shutdown in progress
   */
  private boolean isClusterUp() {
    return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
  }
  private void initializeThreads() throws IOException {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplitThread(this);

    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently on store-by-store basis.
    this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
    this.leases = new Leases(this.threadWakeFrequency);

    // Create the thread to clean the moved regions list
    movedRegionsCleaner = MovedRegionsCleaner.create(this);

    if (this.nonceManager != null) {
      // Create the scheduled chore that cleans up nonces.
      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerQuotaManager(this);

    // Setup RPC client for master communication
    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
        rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());

    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
      storefileRefreshPeriod = conf.getInt(
          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
      onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
      this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
          onlyMetaRefresh, this, this);
    }
    registerConfigurationObservers();
  }

  private void registerConfigurationObservers() {
    // Registering the compactSplitThread object with the ConfigurationManager.
    configurationManager.registerObserver(this.compactSplitThread);
    configurationManager.registerObserver(this.rpcServices);
    configurationManager.registerObserver(this);
  }
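
  // Illustrative note (not from the original file): the objects registered above implement
  // ConfigurationObserver. When the SIGHUP handler installed in the constructor calls
  // configurationManager.notifyAllObservers(getConfiguration()), each registered observer is
  // expected to receive an onConfigurationChange(Configuration) callback with the reloaded
  // configuration, which is how settings such as compaction tunables can be picked up without
  // a restart.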
  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        ShutdownHook.install(conf, fs, this, Thread.currentThread());
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
      }

      // Try and register with the Master; tell it we are here. Break if
      // server is stopped or the clusterup flag is down or hdfs went wacky.
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
          handleReportForDutyResponse(w);
          break;
        }
      }

      if (!isStopped() && isHealthy()){
        // start the snapshot handler and other procedure handlers,
        // since the server is ready to run
        rspmHost.start();

        // Start the Quota Manager
        rsQuotaManager.start(getRpcServer().getScheduler());
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = System.currentTimeMillis();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (isOnlineRegionsEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else if (this.stopping) {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = System.currentTimeMillis();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // for
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }
    // Run shutdown.
    if (mxBean != null) {
      MBeans.unregister(mxBean);
      mxBean = null;
    }
    if (this.leases != null) this.leases.closeAfterLeasesExpire();
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop infoServer", e);
      }
    }
    // Send cache a shutdown.
    if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
      cacheConfig.getBlockCache().shutdown();
    }
    mobCacheConfig.getMobFileCache().shutdown();

    if (movedRegionsCleaner != null) {
      movedRegionsCleaner.stop("Region Server stopping");
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.hMemManager != null) this.hMemManager.stop();
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
    if (this.compactionChecker != null) this.compactionChecker.cancel(true);
    if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
    if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
    if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
    sendShutdownInterrupt();

    // Stop the quota manager
    if (rsQuotaManager != null) {
      rsQuotaManager.stop();
    }

    // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
    if (rspmHost != null) {
      rspmHost.stop(this.abortRequested || this.killed);
    }

    if (this.killed) {
      // Just skip out w/o closing regions. Used when testing.
    } else if (abortRequested) {
      if (this.fsOk) {
        closeUserRegions(abortRequested); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverName);
    } else {
      closeUserRegions(abortRequested);
      LOG.info("stopping server " + this.serverName);
    }

    // so callers waiting for meta without timeout can stop
    if (this.metaTableLocator != null) this.metaTableLocator.stop();
    if (this.clusterConnection != null && !clusterConnection.isClosed()) {
      try {
        this.clusterConnection.close();
      } catch (IOException e) {
        // Although the {@link Closeable} interface throws an {@link
        // IOException}, in reality, the implementation would never do that.
        LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
      }
    }

    // Closing the compactSplit thread before closing meta regions
    if (!this.killed && containsMetaTableRegions()) {
      if (!abortRequested || this.fsOk) {
        if (this.compactSplitThread != null) {
          this.compactSplitThread.join();
          this.compactSplitThread = null;
        }
        closeMetaTableRegions(abortRequested);
      }
    }

    if (!this.killed && this.fsOk) {
      waitOnAllRegionsToClose(abortRequested);
      LOG.info("stopping server " + this.serverName +
          "; all regions closed.");
    }

    //fsOk flag may be changed when closing regions throws exception.
    if (this.fsOk) {
      shutdownWAL(!abortRequested);
    }

    // Make sure the proxy is down.
    if (this.rssStub != null) {
      this.rssStub = null;
    }
    if (this.lockStub != null) {
      this.lockStub = null;
    }
    if (this.rpcClient != null) {
      this.rpcClient.close();
    }
    if (this.leases != null) {
      this.leases.close();
    }
    if (this.pauseMonitor != null) {
      this.pauseMonitor.stop();
    }

    if (!killed) {
      stopServiceThreads();
    }

    if (this.rpcServices != null) {
      this.rpcServices.stop();
    }

    try {
      deleteMyEphemeralNode();
    } catch (KeeperException.NoNodeException nn) {
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    // We may have failed to delete the znode at the previous step, but
    // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
    ZNodeClearer.deleteMyEphemeralNodeOnDisk();

    if (this.zooKeeper != null) {
      this.zooKeeper.close();
    }
    LOG.info("stopping server " + this.serverName +
        "; zookeeper connection closed.");

    LOG.info(Thread.currentThread().getName() + " exiting");
  }
  private boolean containsMetaTableRegions() {
    return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
  }

  private boolean areAllUserRegionsOffline() {
    if (getNumberOfOnlineRegions() > 2) return false;
    boolean allUserRegionsOffline = true;
    for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
      if (!e.getValue().getRegionInfo().isMetaTable()) {
        allUserRegionsOffline = false;
        break;
      }
    }
    return allUserRegionsOffline;
  }

  /**
   * @return Current write count for all online regions.
   */
  private long getWriteRequestCount() {
    long writeCount = 0;
    for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
      writeCount += e.getValue().getWriteRequestsCount();
    }
    return writeCount;
  }

  @VisibleForTesting
  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null) {
      // the current server could be stopping.
      return;
    }
    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
    try {
      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
      ServerName sn = ServerName.parseVersionedServerName(
          this.serverName.getVersionedBytes());
      request.setServer(ProtobufUtil.toServerName(sn));
      request.setLoad(sl);
      rss.regionServerReport(null, request.build());
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof YouAreDeadException) {
        // This will be caught and handled as a fatal error in run()
        throw ioe;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // Couldn't connect to the master, get location from zk and reconnect
      // Method blocks until new master is found or we are stopped
      createRegionServerStatusStub(true);
    }
  }
  ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
      throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heart beat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally the load balancer will be able to take advantage of a more complete
    // history.
    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
    Collection<Region> regions = getOnlineRegionsLocalContext();
    long usedMemory = -1L;
    long maxMemory = -1L;
    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
    if (usage != null) {
      usedMemory = usage.getUsed();
      maxMemory = usage.getMax();
    }

    ClusterStatusProtos.ServerLoad.Builder serverLoad =
        ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
    serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
    serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
    Builder coprocessorBuilder = Coprocessor.newBuilder();
    for (String coprocessor : coprocessors) {
      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
    }
    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
    for (Region region : regions) {
      if (region.getCoprocessorHost() != null) {
        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
        Iterator<String> iterator = regionCoprocessors.iterator();
        while (iterator.hasNext()) {
          serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
        }
      }
      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
          .getCoprocessors()) {
        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
      }
    }
    serverLoad.setReportStartTime(reportStartTime);
    serverLoad.setReportEndTime(reportEndTime);
    if (this.infoServer != null) {
      serverLoad.setInfoServerPort(this.infoServer.getPort());
    } else {
      serverLoad.setInfoServerPort(-1);
    }

    // for the replicationLoad purpose. Only need to get from one service
    // either source or sink will get the same info
    ReplicationSourceService rsources = getReplicationSourceService();

    if (rsources != null) {
      // always refresh first to get the latest value
      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
      if (rLoad != null) {
        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
          serverLoad.addReplLoadSource(rLS);
        }
      }
    }

    return serverLoad.build();
  }
  String getOnlineRegionsAsPrintableString() {
    StringBuilder sb = new StringBuilder();
    for (Region r: this.onlineRegions.values()) {
      if (sb.length() > 0) sb.append(", ");
      sb.append(r.getRegionInfo().getEncodedName());
    }
    return sb.toString();
  }

  /**
   * Wait on regions close.
   */
  private void waitOnAllRegionsToClose(final boolean abort) {
    // Wait till all regions are closed before going out.
    int lastCount = -1;
    long previousLogTime = 0;
    Set<String> closedRegions = new HashSet<>();
    boolean interrupted = false;
    try {
      while (!isOnlineRegionsEmpty()) {
        int count = getNumberOfOnlineRegions();
        // Only print a message if the count of regions has changed.
        if (count != lastCount) {
          // Log every second at most
          if (System.currentTimeMillis() > (previousLogTime + 1000)) {
            previousLogTime = System.currentTimeMillis();
            lastCount = count;
            LOG.info("Waiting on " + count + " regions to close");
            // Only print out regions still closing if a small number else will
            // swamp the log.
            if (count < 10 && LOG.isDebugEnabled()) {
              LOG.debug(this.onlineRegions);
            }
          }
        }
        // Ensure all user regions have been sent a close. Use this to
        // protect against the case where an open comes in after we start the
        // iterator of onlineRegions to close all user regions.
        for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
          HRegionInfo hri = e.getValue().getRegionInfo();
          if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
              && !closedRegions.contains(hri.getEncodedName())) {
            closedRegions.add(hri.getEncodedName());
            // Don't update zk with this close transition; pass false.
            closeRegionIgnoreErrors(hri, abort);
          }
        }
        // No regions in RIT, we could stop waiting now.
        if (this.regionsInTransitionInRS.isEmpty()) {
          if (!isOnlineRegionsEmpty()) {
            LOG.info("We were exiting though online regions are not empty," +
                " because some regions failed closing");
          }
          break;
        }
        if (sleep(200)) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private boolean sleep(long millis) {
    boolean interrupted = false;
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while sleeping");
      interrupted = true;
    }
    return interrupted;
  }

  private void shutdownWAL(final boolean close) {
    if (this.walFactory != null) {
      try {
        if (close) {
          walFactory.close();
        } else {
          walFactory.shutdown();
        }
      } catch (Throwable e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error("Shutdown / close of WAL failed: " + e);
        LOG.debug("Shutdown / close exception details:", e);
      }
    }
  }
1404 * Run init. Sets up wal and starts up all server threads.
1406 * @param c Extra configuration.
1408 protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1409 throws IOException {
1410 try {
1411 boolean updateRootDir = false;
1412 for (NameStringPair e : c.getMapEntriesList()) {
1413 String key = e.getName();
1414 // The hostname the master sees us as.
1415 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1416 String hostnameFromMasterPOV = e.getValue();
1417 this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1418 rpcServices.isa.getPort(), this.startcode);
1419 if (shouldUseThisHostnameInstead() &&
1420 !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1421 String msg = "Master passed us a different hostname to use; was=" +
1422 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1423 LOG.error(msg);
1424 throw new IOException(msg);
1426 if (!shouldUseThisHostnameInstead() &&
1427 !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
1428 String msg = "Master passed us a different hostname to use; was=" +
1429 rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
1430 LOG.error(msg);
1432 continue;
1435 String value = e.getValue();
1436 if (key.equals(HConstants.HBASE_DIR)) {
1437 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1438 updateRootDir = true;
1442 if (LOG.isDebugEnabled()) {
1443 LOG.debug("Config from master: " + key + "=" + value);
1445 this.conf.set(key, value);
1447 // Set our ephemeral znode up in zookeeper now we have a name.
1448 createMyEphemeralNode();
1450 if (updateRootDir) {
1451 // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1452 initializeFileSystem();
1455 // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1456 // config param for task trackers, but we can piggyback off of it.
1457 if (this.conf.get("mapreduce.task.attempt.id") == null) {
1458 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
1459 this.serverName.toString());
1462 // Save it in a file; this will allow us to see if we crashed
1463 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1465 this.walFactory = setupWALAndReplication();
1466 // Init in here rather than in constructor after thread name has been set
1467 this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
1468 this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1469 // Now that we have a metrics source, start the pause monitor
1470 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1471 pauseMonitor.start();
1473 startServiceThreads();
1474 startHeapMemoryManager();
1475 // Call it after starting HeapMemoryManager.
1476 initializeMemStoreChunkPool();
1477 LOG.info("Serving as " + this.serverName +
1478 ", RpcServer on " + rpcServices.isa +
1479 ", sessionid=0x" +
1480 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1482 // Wake up anyone waiting for this server to come online
1483 synchronized (online) {
1484 online.set(true);
1485 online.notifyAll();
1487 } catch (Throwable e) {
1488 stop("Failed initialization");
1489 throw convertThrowableToIOE(cleanup(e, "Failed init"),
1490 "Region server startup failed");
1491 } finally {
1492 sleeper.skipSleepCycle();
1496 private void initializeMemStoreChunkPool() {
1497 if (MemStoreLAB.isEnabled(conf)) {
1498 // MSLAB is enabled. So initialize MemStoreChunkPool
1499 // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
1500 // it.
1501 Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
1502 long globalMemStoreSize = pair.getFirst();
1503 boolean offheap = this.regionServerAccounting.isOffheap();
1504 // When an off-heap memstore is in use, take the full area for the chunk pool.
1505 float poolSizePercentage = offheap ? 1.0F
1506 : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
1507 float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
1508 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
1509 int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
1510 MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
1511 initialCountPercentage, chunkSize, offheap);
1512 if (pool != null && this.hMemManager != null) {
1513 // Register with Heap Memory manager
1514 this.hMemManager.registerTuneObserver(pool);
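// Rough sizing sketch for the pool created above (the numbers are hypothetical and only meant to
// show how the knobs interact): with a 4 GB global memstore, a pool size percentage of 0.5 and,
// say, 2 MB chunks (MemStoreLAB.CHUNK_SIZE_KEY), the chunk pool can cache on the order of
//   4 GB * 0.5 / 2 MB = ~1024 chunks
// for reuse across MemStoreLABs. With an off-heap memstore the percentage is forced to 1.0F above,
// so the whole global memstore area is backed by pooled chunks.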
1519 private void startHeapMemoryManager() {
1520 this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
1521 this.regionServerAccounting);
1522 if (this.hMemManager != null) {
1523 this.hMemManager.start(getChoreService());
1527 private void createMyEphemeralNode() throws KeeperException, IOException {
1528 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1529 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1530 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1531 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1532 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1533 getMyEphemeralNodePath(), data);
1536 private void deleteMyEphemeralNode() throws KeeperException {
1537 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1540 @Override
1541 public RegionServerAccounting getRegionServerAccounting() {
1542 return regionServerAccounting;
1546 * @param r Region to get RegionLoad for.
1547 * @param regionLoadBldr the RegionLoad.Builder, can be null
1548 * @param regionSpecifier the RegionSpecifier.Builder, can be null
1549 * @return RegionLoad instance.
1551 * @throws IOException
1553 RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1554 RegionSpecifier.Builder regionSpecifier) throws IOException {
1555 byte[] name = r.getRegionInfo().getRegionName();
1556 int stores = 0;
1557 int storefiles = 0;
1558 int storeUncompressedSizeMB = 0;
1559 int storefileSizeMB = 0;
1560 int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1561 int storefileIndexSizeMB = 0;
1562 int rootIndexSizeKB = 0;
1563 int totalStaticIndexSizeKB = 0;
1564 int totalStaticBloomSizeKB = 0;
1565 long totalCompactingKVs = 0;
1566 long currentCompactedKVs = 0;
1567 List<Store> storeList = r.getStores();
1568 stores += storeList.size();
1569 for (Store store : storeList) {
1570 storefiles += store.getStorefilesCount();
1571 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1572 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1573 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1574 CompactionProgress progress = store.getCompactionProgress();
1575 if (progress != null) {
1576 totalCompactingKVs += progress.totalCompactingKVs;
1577 currentCompactedKVs += progress.currentCompactedKVs;
1579 rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1580 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1581 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1584 float dataLocality =
1585 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1586 if (regionLoadBldr == null) {
1587 regionLoadBldr = RegionLoad.newBuilder();
1589 if (regionSpecifier == null) {
1590 regionSpecifier = RegionSpecifier.newBuilder();
1592 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1593 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1594 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1595 .setStores(stores)
1596 .setStorefiles(storefiles)
1597 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1598 .setStorefileSizeMB(storefileSizeMB)
1599 .setMemstoreSizeMB(memstoreSizeMB)
1600 .setStorefileIndexSizeMB(storefileIndexSizeMB)
1601 .setRootIndexSizeKB(rootIndexSizeKB)
1602 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1603 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1604 .setReadRequestsCount(r.getReadRequestsCount())
1605 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1606 .setWriteRequestsCount(r.getWriteRequestsCount())
1607 .setTotalCompactingKVs(totalCompactingKVs)
1608 .setCurrentCompactedKVs(currentCompactedKVs)
1609 .setDataLocality(dataLocality)
1610 .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1611 ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1613 return regionLoadBldr.build();
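// Note on units in the load report built above: the MB/KB figures come from integer division, so
// for example a store whose files total under 1 MB contributes 0 to storefileSizeMB, and the
// KB-granularity index/bloom fields likewise drop anything under 1 KB.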
1617 * @param encodedRegionName
1618 * @return An instance of RegionLoad.
1620 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1621 Region r = onlineRegions.get(encodedRegionName);
1622 return r != null ? createRegionLoad(r, null, null) : null;
1626 * Inner class that runs on a long period checking if regions need compaction.
1628 private static class CompactionChecker extends ScheduledChore {
1629 private final HRegionServer instance;
1630 private final int majorCompactPriority;
1631 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1632 private long iteration = 0;
1634 CompactionChecker(final HRegionServer h, final int sleepTime,
1635 final Stoppable stopper) {
1636 super("CompactionChecker", stopper, sleepTime);
1637 this.instance = h;
1638 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1640 /* MajorCompactPriority is configurable.
1641 * If not set, the compaction will use default priority.
1643 this.majorCompactPriority = this.instance.conf.
1644 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1645 DEFAULT_PRIORITY);
1648 @Override
1649 protected void chore() {
1650 for (Region r : this.instance.onlineRegions.values()) {
1651 if (r == null)
1652 continue;
1653 for (Store s : r.getStores()) {
1654 try {
1655 long multiplier = s.getCompactionCheckMultiplier();
1656 assert multiplier > 0;
1657 if (iteration % multiplier != 0) continue;
1658 if (s.needsCompaction()) {
1659 // Queue a compaction. Will recognize if major is needed.
1660 this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1661 + " requests compaction");
1662 } else if (s.isMajorCompaction()) {
1663 s.triggerMajorCompaction();
1664 if (majorCompactPriority == DEFAULT_PRIORITY
1665 || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1666 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1667 + " requests major compaction; use default priority", null);
1668 } else {
1669 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1670 + " requests major compaction; use configured priority",
1671 this.majorCompactPriority, null, null);
1674 } catch (IOException e) {
1675 LOG.warn("Failed major compaction check on " + r, e);
1679 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
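// Worked example of the multiplier check above (hypothetical numbers): if this chore fires every
// 10 seconds and a store's getCompactionCheckMultiplier() returns 180, then iteration % 180 == 0
// only once every 180 runs, i.e. that store is considered for a system/major compaction roughly
// every 30 minutes, while a store with multiplier 1 is checked on every iteration.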
1683 static class PeriodicMemstoreFlusher extends ScheduledChore {
1684 final HRegionServer server;
1685 final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
1686 final static int MIN_DELAY_TIME = 0; // millisec
1687 public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1688 super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1689 this.server = server;
1692 @Override
1693 protected void chore() {
1694 final StringBuffer whyFlush = new StringBuffer();
1695 for (Region r : this.server.onlineRegions.values()) {
1696 if (r == null) continue;
1697 if (((HRegion)r).shouldFlush(whyFlush)) {
1698 FlushRequester requester = server.getFlushRequester();
1699 if (requester != null) {
1700 long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1701 LOG.info(getName() + " requesting flush of " +
1702 r.getRegionInfo().getRegionNameAsString() + " because " +
1703 whyFlush.toString() +
1704 " after random delay " + randomDelay + "ms");
1705 // Throttle the flushes by putting a delay. If we don't throttle, and there
1706 // is a balanced write-load on the regions in a table, we might end up
1707 // overwhelming the filesystem with too many flushes at once.
1708 requester.requestDelayedFlush(r, randomDelay, false);
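// The random delay above is drawn uniformly from [MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY),
// i.e. 0..5 minutes with the constants defined in this class. For example, if 50 regions of the
// same table all become flush-eligible in the same chore run, their flush requests are smeared
// over the next five minutes instead of all hitting the filesystem at once.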
1716 * Report the status of the server. A server is online once all of its startup has
1717 * completed (setting up filesystem, starting service threads, etc.). This
1718 * method is designed mostly to be useful in tests.
1720 * @return true if online, false if not.
1722 public boolean isOnline() {
1723 return online.get();
1727 * Setup WAL log and replication if enabled.
1728 * Replication setup is done in here because it wants to be hooked up to WAL.
1729 * @return A WAL instance.
1730 * @throws IOException
1732 private WALFactory setupWALAndReplication() throws IOException {
1733 // TODO Replication make assumptions here based on the default filesystem impl
1734 final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1735 final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1737 Path logDir = new Path(walRootDir, logName);
1738 if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
1739 if (this.walFs.exists(logDir)) {
1740 throw new RegionServerRunningException("Region server has already " +
1741 "created directory at " + this.serverName.toString());
1744 // Instantiate replication manager if replication enabled. Pass it the
1745 // log directories.
1746 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
1748 // listeners the wal factory will add to wals it creates.
1749 final List<WALActionsListener> listeners = new ArrayList<>();
1750 listeners.add(new MetricsWAL());
1751 if (this.replicationSourceHandler != null &&
1752 this.replicationSourceHandler.getWALActionsListener() != null) {
1753 // Replication handler is an implementation of WALActionsListener.
1754 listeners.add(this.replicationSourceHandler.getWALActionsListener());
1757 return new WALFactory(conf, listeners, serverName.toString());
1760 public MetricsRegionServer getRegionServerMetrics() {
1761 return this.metricsRegionServer;
1765 * @return Master address tracker instance.
1767 public MasterAddressTracker getMasterAddressTracker() {
1768 return this.masterAddressTracker;
1772 * Start maintenance threads, Server, Worker and lease checker threads.
1773 * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1774 * get an unhandled exception. We cannot set the handler on all threads.
1775 * Server's internal Listener thread is off limits. For Server, if an OOME, it
1776 * waits a while and then retries. Meantime, a flush or a compaction that tries to
1777 * run should trigger the same critical condition and the shutdown will run. On
1778 * its way out, this server will shut down Server. Leases sit somewhere in
1779 * between: it has an internal thread that, while it inherits from Chore,
1780 * keeps its own internal stop mechanism and so needs to be stopped by this
1781 * hosting server. Worker logs the exception and exits.
1783 private void startServiceThreads() throws IOException {
1784 // Start executor services
1785 this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1786 conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1787 this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1788 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1789 this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
1790 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
1791 this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1792 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1793 this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1794 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1795 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1796 this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
1797 conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
1799 this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
1800 "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
1801 // Start the threads for compacted files discharger
1802 this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
1803 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
1804 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1805 this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
1806 conf.getInt("hbase.regionserver.region.replica.flusher.threads",
1807 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
1810 Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
1811 uncaughtExceptionHandler);
1812 this.cacheFlusher.start(uncaughtExceptionHandler);
1814 if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
1815 if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
1816 if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
1817 if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
1818 if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
1819 if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
1821 // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1822 // an unhandled exception, it will just exit.
1823 Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
1824 uncaughtExceptionHandler);
1826 if (this.replicationSourceHandler == this.replicationSinkHandler &&
1827 this.replicationSourceHandler != null) {
1828 this.replicationSourceHandler.startReplicationService();
1829 } else {
1830 if (this.replicationSourceHandler != null) {
1831 this.replicationSourceHandler.startReplicationService();
1833 if (this.replicationSinkHandler != null) {
1834 this.replicationSinkHandler.startReplicationService();
1838 // Create the log splitting worker and start it
1839 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for
1840 // quite a while inside Connection layer. The worker won't be available for other
1841 // tasks even after current task is preempted after a split task times out.
1842 Configuration sinkConf = HBaseConfiguration.create(conf);
1843 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1844 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1845 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1846 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1847 sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
1848 this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
1849 splitLogWorker.start();
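// The executor pool sizes above are plain config knobs. A hypothetical tuning sketch (the keys and
// defaults are the ones read above; the values 6 and 4 are made up for illustration only):
//   Configuration conf = HBaseConfiguration.create();
//   conf.setInt("hbase.regionserver.executor.openregion.threads", 6);         // default 3
//   conf.setInt("hbase.regionserver.executor.closeregion.threads", 6);        // default 3
//   conf.setInt("hbase.regionserver.executor.openpriorityregion.threads", 4); // default 3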
1853 * Puts up the webui.
1854 * @return Returns final port -- maybe different from what we started with.
1855 * @throws IOException
1857 private int putUpWebUI() throws IOException {
1858 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1859 HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1860 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1862 if(this instanceof HMaster) {
1863 port = conf.getInt(HConstants.MASTER_INFO_PORT,
1864 HConstants.DEFAULT_MASTER_INFOPORT);
1865 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1867 // -1 is for disabling info server
1868 if (port < 0) return port;
1870 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1871 String msg =
1872 "Failed to start http info server. Address " + addr
1873 + " does not belong to this host. Correct configuration parameter: "
1874 + "hbase.regionserver.info.bindAddress";
1875 LOG.error(msg);
1876 throw new IOException(msg);
1878 // check if auto port bind enabled
1879 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1880 false);
1881 while (true) {
1882 try {
1883 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1884 infoServer.addServlet("dump", "/dump", getDumpServlet());
1885 configureInfoServer();
1886 this.infoServer.start();
1887 break;
1888 } catch (BindException e) {
1889 if (!auto) {
1890 // auto bind disabled, rethrow the BindException
1891 LOG.error("Failed binding http info server to port: " + port);
1892 throw e;
1894 // auto bind enabled, try to use another port
1895 LOG.info("Failed binding http info server to port: " + port);
1896 port++;
1899 port = this.infoServer.getPort();
1900 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1901 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1902 HConstants.DEFAULT_MASTER_INFOPORT);
1903 conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1904 conf.setInt(HConstants.MASTER_INFO_PORT, port);
1905 return port;
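// Example of the auto-bind loop above (port numbers hypothetical): with
// HConstants.REGIONSERVER_INFO_PORT_AUTO set to true and the configured info port already taken by
// another process, the BindException is swallowed and the port is bumped (e.g. 16030 -> 16031 -> ...)
// until the InfoServer starts; with auto-bind left at the default of false, the first BindException
// is rethrown and startup fails.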
1909 * Verify that server is healthy
1911 private boolean isHealthy() {
1912 if (!fsOk) {
1913 // File system problem
1914 return false;
1916 // Verify that all threads are alive
1917 if (!(leases.isAlive()
1918 && cacheFlusher.isAlive() && walRoller.isAlive()
1919 && this.compactionChecker.isScheduled()
1920 && this.periodicFlusher.isScheduled())) {
1921 stop("One or more threads are no longer alive -- stop");
1922 return false;
1924 return true;
1927 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1929 @Override
1930 public List<WAL> getWALs() throws IOException {
1931 return walFactory.getWALs();
1934 @Override
1935 public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1936 WAL wal;
1937 // _ROOT_ and hbase:meta regions have separate WAL.
1938 if (regionInfo != null && regionInfo.isMetaTable()
1939 && regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1940 wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1941 } else if (regionInfo == null) {
1942 wal = walFactory.getWAL(UNSPECIFIED_REGION, null);
1943 } else {
1944 byte[] namespace = regionInfo.getTable().getNamespace();
1945 wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace);
1947 walRoller.addWAL(wal);
1948 return wal;
1951 @Override
1952 public Connection getConnection() {
1953 return getClusterConnection();
1956 @Override
1957 public ClusterConnection getClusterConnection() {
1958 return this.clusterConnection;
1961 @Override
1962 public MetaTableLocator getMetaTableLocator() {
1963 return this.metaTableLocator;
1966 @Override
1967 public void stop(final String msg) {
1968 if (!this.stopped) {
1969 LOG.info("***** STOPPING region server '" + this + "' *****");
1970 try {
1971 if (this.rsHost != null) {
1972 this.rsHost.preStop(msg);
1974 this.stopped = true;
1975 LOG.info("STOPPED: " + msg);
1976 // Wakes run() if it is sleeping
1977 sleeper.skipSleepCycle();
1978 } catch (IOException exp) {
1979 LOG.warn("The region server did not stop", exp);
1984 public void waitForServerOnline(){
1985 while (!isStopped() && !isOnline()) {
1986 synchronized (online) {
1987 try {
1988 online.wait(msgInterval);
1989 } catch (InterruptedException ie) {
1990 Thread.currentThread().interrupt();
1991 break;
1997 @Override
1998 public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
1999 postOpenDeployTasks(new PostOpenDeployContext(r, -1));
2002 @Override
2003 public void postOpenDeployTasks(final PostOpenDeployContext context)
2004 throws KeeperException, IOException {
2005 Region r = context.getRegion();
2006 long masterSystemTime = context.getMasterSystemTime();
2007 Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
2008 rpcServices.checkOpen();
2009 LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
2010 // Do checks to see if we need to compact (references or too many files)
2011 for (Store s : r.getStores()) {
2012 if (s.hasReferences() || s.needsCompaction()) {
2013 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2016 long openSeqNum = r.getOpenSeqNum();
2017 if (openSeqNum == HConstants.NO_SEQNUM) {
2018 // If we opened a region, we should have read some sequence number from it.
2019 LOG.error("No sequence number found when opening " +
2020 r.getRegionInfo().getRegionNameAsString());
2021 openSeqNum = 0;
2024 // Update flushed sequence id of a recovering region in ZK
2025 updateRecoveringRegionLastFlushedSequenceId(r);
2027 // Notify master
2028 if (!reportRegionStateTransition(new RegionStateTransitionContext(
2029 TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
2030 throw new IOException("Failed to report opened region to master: "
2031 + r.getRegionInfo().getRegionNameAsString());
2034 triggerFlushInPrimaryRegion((HRegion)r);
2036 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2039 @Override
2040 public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
2041 return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
2044 @Override
2045 public boolean reportRegionStateTransition(
2046 TransitionCode code, long openSeqNum, HRegionInfo... hris) {
2047 return reportRegionStateTransition(
2048 new RegionStateTransitionContext(code, HConstants.NO_SEQNUM, -1, hris));
2051 @Override
2052 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2053 TransitionCode code = context.getCode();
2054 long openSeqNum = context.getOpenSeqNum();
2055 long masterSystemTime = context.getMasterSystemTime();
2056 HRegionInfo[] hris = context.getHris();
2058 if (TEST_SKIP_REPORTING_TRANSITION) {
2059 // This is for testing only in case there is no master
2060 // to handle the region transition report at all.
2061 if (code == TransitionCode.OPENED) {
2062 Preconditions.checkArgument(hris != null && hris.length == 1);
2063 if (hris[0].isMetaRegion()) {
2064 try {
2065 MetaTableLocator.setMetaLocation(getZooKeeper(), serverName,
2066 hris[0].getReplicaId(),State.OPEN);
2067 } catch (KeeperException e) {
2068 LOG.info("Failed to update meta location", e);
2069 return false;
2071 } else {
2072 try {
2073 MetaTableAccessor.updateRegionLocation(clusterConnection,
2074 hris[0], serverName, openSeqNum, masterSystemTime);
2075 } catch (IOException e) {
2076 LOG.info("Failed to update meta", e);
2077 return false;
2081 return true;
2084 ReportRegionStateTransitionRequest.Builder builder =
2085 ReportRegionStateTransitionRequest.newBuilder();
2086 builder.setServer(ProtobufUtil.toServerName(serverName));
2087 RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2088 transition.setTransitionCode(code);
2089 if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2090 transition.setOpenSeqNum(openSeqNum);
2092 for (HRegionInfo hri: hris) {
2093 transition.addRegionInfo(HRegionInfo.convert(hri));
2095 ReportRegionStateTransitionRequest request = builder.build();
2096 while (keepLooping()) {
2097 RegionServerStatusService.BlockingInterface rss = rssStub;
2098 try {
2099 if (rss == null) {
2100 createRegionServerStatusStub();
2101 continue;
2103 ReportRegionStateTransitionResponse response =
2104 rss.reportRegionStateTransition(null, request);
2105 if (response.hasErrorMessage()) {
2106 LOG.info("Failed to transition " + hris[0]
2107 + " to " + code + ": " + response.getErrorMessage());
2108 return false;
2110 return true;
2111 } catch (ServiceException se) {
2112 IOException ioe = ProtobufUtil.getRemoteException(se);
2113 LOG.info("Failed to report region transition, will retry", ioe);
2114 if (rssStub == rss) {
2115 rssStub = null;
2119 return false;
2122 @Override
2123 public long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow) {
2124 NonceGenerator ng = clusterConnection.getNonceGenerator();
2125 final long nonceGroup = ng.getNonceGroup();
2126 final long nonce = ng.newNonce();
2127 long procId = -1;
2128 SplitTableRegionRequest request =
2129 RequestConverter.buildSplitTableRegionRequest(regionInfo, splitRow, nonceGroup, nonce);
2131 while (keepLooping()) {
2132 RegionServerStatusService.BlockingInterface rss = rssStub;
2133 try {
2134 if (rss == null) {
2135 createRegionServerStatusStub();
2136 continue;
2138 SplitTableRegionResponse response = rss.splitRegion(null, request);
2140 //TODO: should we limit the retry number before quitting?
2141 if (response == null || (procId = response.getProcId()) == -1) {
2142 LOG.warn("Failed to split " + regionInfo + " retrying...");
2143 continue;
2146 break;
2147 } catch (ServiceException se) {
2148 // TODO: retry or just fail
2149 IOException ioe = ProtobufUtil.getRemoteException(se);
2150 LOG.info("Failed to split region, will retry", ioe);
2151 if (rssStub == rss) {
2152 rssStub = null;
2156 return procId;
2159 @Override
2160 public boolean isProcedureFinished(final long procId) throws IOException {
2161 GetProcedureResultRequest request =
2162 GetProcedureResultRequest.newBuilder().setProcId(procId).build();
2164 while (keepLooping()) {
2165 RegionServerStatusService.BlockingInterface rss = rssStub;
2166 try {
2167 if (rss == null) {
2168 createRegionServerStatusStub();
2169 continue;
2171 // TODO: find a way to get proc result
2172 GetProcedureResultResponse response = rss.getProcedureResult(null, request);
2174 if (response == null) {
2175 LOG.warn("Failed to get procedure (id=" + procId + ") status.");
2176 return false;
2177 } else if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2178 return false;
2179 } else if (response.hasException()) {
2180 // Procedure failed.
2181 throw ForeignExceptionUtil.toIOException(response.getException());
2183 // Procedure completes successfully
2184 break;
2185 } catch (ServiceException se) {
2186 // TODO: retry or just fail
2187 IOException ioe = ProtobufUtil.getRemoteException(se);
2188 LOG.warn("Failed to get split region procedure result. Retrying", ioe);
2189 if (rssStub == rss) {
2190 rssStub = null;
2194 return true;
2198 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2199 * block this thread. See RegionReplicaFlushHandler for details.
2201 void triggerFlushInPrimaryRegion(final HRegion region) {
2202 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2203 return;
2205 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2206 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2207 region.conf)) {
2208 region.setReadsEnabled(true);
2209 return;
2212 region.setReadsEnabled(false); // disable reads before marking the region as opened.
2213 // RegionReplicaFlushHandler might reset this.
2215 // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2216 this.service.submit(
2217 new RegionReplicaFlushHandler(this, clusterConnection,
2218 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2221 @Override
2222 public RpcServerInterface getRpcServer() {
2223 return rpcServices.rpcServer;
2226 @VisibleForTesting
2227 public RSRpcServices getRSRpcServices() {
2228 return rpcServices;
2232 * Cause the server to exit without closing the regions it is serving, the log
2233 * it is using and without notifying the master. Used for unit testing and on
2234 * catastrophic events such as HDFS being yanked out from under hbase, or when we OOME.
2236 * @param reason
2237 * the reason we are aborting
2238 * @param cause
2239 * the exception that caused the abort, or null
2241 @Override
2242 public void abort(String reason, Throwable cause) {
2243 String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2244 if (cause != null) {
2245 LOG.fatal(msg, cause);
2246 } else {
2247 LOG.fatal(msg);
2249 this.abortRequested = true;
2250 // HBASE-4014: show list of coprocessors that were loaded to help debug
2251 // regionserver crashes. Note that we're implicitly using
2252 // java.util.HashSet's toString() method to print the coprocessor names.
2253 LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2254 CoprocessorHost.getLoadedCoprocessors());
2255 // Try and dump metrics if abort -- might give clue as to how fatal came about....
2256 try {
2257 LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2258 } catch (MalformedObjectNameException | IOException e) {
2259 LOG.warn("Failed dumping metrics", e);
2262 // Do our best to report our abort to the master, but this may not work
2263 try {
2264 if (cause != null) {
2265 msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2267 // Report to the master but only if we have already registered with the master.
2268 if (rssStub != null && this.serverName != null) {
2269 ReportRSFatalErrorRequest.Builder builder =
2270 ReportRSFatalErrorRequest.newBuilder();
2271 ServerName sn =
2272 ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2273 builder.setServer(ProtobufUtil.toServerName(sn));
2274 builder.setErrorMessage(msg);
2275 rssStub.reportRSFatalError(null, builder.build());
2277 } catch (Throwable t) {
2278 LOG.warn("Unable to report fatal error to master", t);
2280 stop(reason);
2284 * @see HRegionServer#abort(String, Throwable)
2286 public void abort(String reason) {
2287 abort(reason, null);
2290 @Override
2291 public boolean isAborted() {
2292 return this.abortRequested;
2296 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2297 * logs, but it does close the socket in case we want to bring up a server on the old
2298 * hostname+port immediately.
2300 @VisibleForTesting
2301 protected void kill() {
2302 this.killed = true;
2303 abort("Simulated kill");
2307 * Called on stop/abort before closing the cluster connection and meta locator.
2309 protected void sendShutdownInterrupt() {
2313 * Wait on all threads to finish. Presumption is that all closes and stops
2314 * have already been called.
2316 protected void stopServiceThreads() {
2317 // clean up the scheduled chores
2318 if (this.choreService != null) choreService.shutdown();
2319 if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2320 if (this.compactionChecker != null) compactionChecker.cancel(true);
2321 if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2322 if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2323 if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2324 if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2326 if (this.cacheFlusher != null) {
2327 this.cacheFlusher.join();
2330 if (this.spanReceiverHost != null) {
2331 this.spanReceiverHost.closeReceivers();
2333 if (this.walRoller != null) {
2334 this.walRoller.close();
2336 if (this.compactSplitThread != null) {
2337 this.compactSplitThread.join();
2339 if (this.service != null) this.service.shutdown();
2340 if (this.replicationSourceHandler != null &&
2341 this.replicationSourceHandler == this.replicationSinkHandler) {
2342 this.replicationSourceHandler.stopReplicationService();
2343 } else {
2344 if (this.replicationSourceHandler != null) {
2345 this.replicationSourceHandler.stopReplicationService();
2347 if (this.replicationSinkHandler != null) {
2348 this.replicationSinkHandler.stopReplicationService();
2354 * @return Return the object that implements the replication
2355 * source service.
2357 @VisibleForTesting
2358 public ReplicationSourceService getReplicationSourceService() {
2359 return replicationSourceHandler;
2363 * @return Return the object that implements the replication
2364 * sink service.
2366 ReplicationSinkService getReplicationSinkService() {
2367 return replicationSinkHandler;
2371 * Get the current master from ZooKeeper and open the RPC connection to it.
2372 * To get a fresh connection, the current rssStub must be null.
2373 * Method will block until a master is available. You can break from this
2374 * block by requesting the server stop.
2376 * @return master + port, or null if server has been stopped
2378 @VisibleForTesting
2379 protected synchronized ServerName createRegionServerStatusStub() {
2380 // Create RS stub without refreshing the master node from ZK, use cached data
2381 return createRegionServerStatusStub(false);
2385 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2386 * connection, the current rssStub must be null. Method will block until a master is available.
2387 * You can break from this block by requesting the server stop.
2388 * @param refresh If true then master address will be read from ZK, otherwise use cached data
2389 * @return master + port, or null if server has been stopped
2391 @VisibleForTesting
2392 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2393 if (rssStub != null) {
2394 return masterAddressTracker.getMasterAddress();
2396 ServerName sn = null;
2397 long previousLogTime = 0;
2398 RegionServerStatusService.BlockingInterface intRssStub = null;
2399 LockService.BlockingInterface intLockStub = null;
2400 boolean interrupted = false;
2401 try {
2402 while (keepLooping()) {
2403 sn = this.masterAddressTracker.getMasterAddress(refresh);
2404 if (sn == null) {
2405 if (!keepLooping()) {
2406 // give up with no connection.
2407 LOG.debug("No master found and cluster is stopped; bailing out");
2408 return null;
2410 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2411 LOG.debug("No master found; retry");
2412 previousLogTime = System.currentTimeMillis();
2414 refresh = true; // let's try pull it from ZK directly
2415 if (sleep(200)) {
2416 interrupted = true;
2418 continue;
2421 // If we are on the active master, use the shortcut
2422 if (this instanceof HMaster && sn.equals(getServerName())) {
2423 intRssStub = ((HMaster)this).getMasterRpcServices();
2424 intLockStub = ((HMaster)this).getMasterRpcServices();
2425 break;
2427 try {
2428 BlockingRpcChannel channel =
2429 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2430 shortOperationTimeout);
2431 intRssStub = RegionServerStatusService.newBlockingStub(channel);
2432 intLockStub = LockService.newBlockingStub(channel);
2433 break;
2434 } catch (IOException e) {
2435 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2436 e = e instanceof RemoteException ?
2437 ((RemoteException)e).unwrapRemoteException() : e;
2438 if (e instanceof ServerNotRunningYetException) {
2439 LOG.info("Master isn't available yet, retrying");
2440 } else {
2441 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2443 previousLogTime = System.currentTimeMillis();
2445 if (sleep(200)) {
2446 interrupted = true;
2450 } finally {
2451 if (interrupted) {
2452 Thread.currentThread().interrupt();
2455 this.rssStub = intRssStub;
2456 this.lockStub = intLockStub;
2457 return sn;
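// Note on the loop above: while the cluster is up and this server is not stopped (keepLooping()),
// it retries every 200 ms and logs at most about once per second, resolving the master address
// from ZooKeeper (optionally refreshing it). When this process is itself the active HMaster, the
// RPC channel is skipped entirely and the in-process MasterRpcServices backs both stubs.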
2461 * @return True if we should break loop because cluster is going down or
2462 * this server has been stopped or hdfs has gone bad.
2464 private boolean keepLooping() {
2465 return !this.stopped && isClusterUp();
2469 * Let the master know we're here. Run initialization using parameters passed to
2470 * us by the master.
2471 * @return A Map of key/value configurations we got from the Master else
2472 * null if we failed to register.
2473 * @throws IOException
2475 private RegionServerStartupResponse reportForDuty() throws IOException {
2476 ServerName masterServerName = createRegionServerStatusStub(true);
2477 if (masterServerName == null) return null;
2478 RegionServerStartupResponse result = null;
2479 try {
2480 rpcServices.requestCount.reset();
2481 rpcServices.rpcGetRequestCount.reset();
2482 rpcServices.rpcScanRequestCount.reset();
2483 rpcServices.rpcMultiRequestCount.reset();
2484 rpcServices.rpcMutateRequestCount.reset();
2485 LOG.info("reportForDuty to master=" + masterServerName + " with port="
2486 + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2487 long now = EnvironmentEdgeManager.currentTime();
2488 int port = rpcServices.isa.getPort();
2489 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2490 if (shouldUseThisHostnameInstead()) {
2491 request.setUseThisHostnameInstead(useThisHostnameInstead);
2493 request.setPort(port);
2494 request.setServerStartCode(this.startcode);
2495 request.setServerCurrentTime(now);
2496 result = this.rssStub.regionServerStartup(null, request.build());
2497 } catch (ServiceException se) {
2498 IOException ioe = ProtobufUtil.getRemoteException(se);
2499 if (ioe instanceof ClockOutOfSyncException) {
2500 LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2501 // Re-throw IOE will cause RS to abort
2502 throw ioe;
2503 } else if (ioe instanceof ServerNotRunningYetException) {
2504 LOG.debug("Master is not running yet");
2505 } else {
2506 LOG.warn("error telling master we are up", se);
2508 rssStub = null;
2510 return result;
2513 @Override
2514 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2515 try {
2516 GetLastFlushedSequenceIdRequest req =
2517 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2518 RegionServerStatusService.BlockingInterface rss = rssStub;
2519 if (rss == null) { // Try to connect one more time
2520 createRegionServerStatusStub();
2521 rss = rssStub;
2522 if (rss == null) {
2523 // Still no luck, we tried
2524 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2525 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2526 .build();
2529 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2530 return RegionStoreSequenceIds.newBuilder()
2531 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2532 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2533 } catch (ServiceException e) {
2534 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2535 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2536 .build();
2541 * Closes all regions. Called on our way out.
2542 * Assumes that it's not possible for new regions to be added to onlineRegions
2543 * while this method runs.
2545 protected void closeAllRegions(final boolean abort) {
2546 closeUserRegions(abort);
2547 closeMetaTableRegions(abort);
2551 * Close meta region if we carry it
2552 * @param abort Whether we're running an abort.
2554 void closeMetaTableRegions(final boolean abort) {
2555 Region meta = null;
2556 this.lock.writeLock().lock();
2557 try {
2558 for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2559 HRegionInfo hri = e.getValue().getRegionInfo();
2560 if (hri.isMetaRegion()) {
2561 meta = e.getValue();
2563 if (meta != null) break;
2565 } finally {
2566 this.lock.writeLock().unlock();
2568 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2572 * Schedule closes on all user regions.
2573 * Should be safe calling multiple times because it won't close regions
2574 * that are already closed or that are closing.
2575 * @param abort Whether we're running an abort.
2577 void closeUserRegions(final boolean abort) {
2578 this.lock.writeLock().lock();
2579 try {
2580 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2581 Region r = e.getValue();
2582 if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2583 // Don't update zk with this close transition; pass false.
2584 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2587 } finally {
2588 this.lock.writeLock().unlock();
2592 /** @return the info server */
2593 public InfoServer getInfoServer() {
2594 return infoServer;
2598 * @return true if a stop has been requested.
2600 @Override
2601 public boolean isStopped() {
2602 return this.stopped;
2605 @Override
2606 public boolean isStopping() {
2607 return this.stopping;
2610 @Override
2611 public Map<String, Region> getRecoveringRegions() {
2612 return this.recoveringRegions;
2617 * @return the configuration
2619 @Override
2620 public Configuration getConfiguration() {
2621 return conf;
2624 /** @return the write lock for the server */
2625 ReentrantReadWriteLock.WriteLock getWriteLock() {
2626 return lock.writeLock();
2629 public int getNumberOfOnlineRegions() {
2630 return this.onlineRegions.size();
2633 boolean isOnlineRegionsEmpty() {
2634 return this.onlineRegions.isEmpty();
2638 * For tests, web ui and metrics.
2639 * This method will only work if HRegionServer is in the same JVM as client;
2640 * HRegion cannot be serialized to cross an rpc.
2642 public Collection<Region> getOnlineRegionsLocalContext() {
2643 Collection<Region> regions = this.onlineRegions.values();
2644 return Collections.unmodifiableCollection(regions);
2647 @Override
2648 public void addToOnlineRegions(Region region) {
2649 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2650 configurationManager.registerObserver(region);
2654 * @return A new Map of online regions sorted by region size with the first entry being the
2655 * biggest. If two regions are the same size, then the last one found wins; i.e. this method
2656 * may NOT return all regions.
2658 SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2659 // we'll sort the regions in reverse
2660 SortedMap<Long, Region> sortedRegions = new TreeMap<>(
2661 new Comparator<Long>() {
2662 @Override
2663 public int compare(Long a, Long b) {
2664 return -1 * a.compareTo(b);
2667 // Copy over all regions. Regions are sorted by size with biggest first.
2668 for (Region region : this.onlineRegions.values()) {
2669 sortedRegions.put(region.getMemstoreSize(), region);
2671 return sortedRegions;
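// Because the memstore size is used as the map key above, regions with identical sizes collide and
// only the last one found survives; e.g. three regions with memstore sizes {128 MB, 64 MB, 64 MB}
// come back as a two-entry map. That is the "may NOT return all regions" caveat in the javadoc.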
2675 * @return time stamp in millis of when this region server was started
2677 public long getStartcode() {
2678 return this.startcode;
2681 /** @return reference to FlushRequester */
2682 @Override
2683 public FlushRequester getFlushRequester() {
2684 return this.cacheFlusher;
2688 * Get the top N most loaded regions this server is serving so we can tell the
2689 * master which regions it can reallocate if we're overloaded. TODO: actually
2690 * calculate which regions are most loaded. (Right now, we're just grabbing
2691 * the first N regions being served regardless of load.)
2693 protected HRegionInfo[] getMostLoadedRegions() {
2694 ArrayList<HRegionInfo> regions = new ArrayList<>();
2695 for (Region r : onlineRegions.values()) {
2696 if (!r.isAvailable()) {
2697 continue;
2699 if (regions.size() < numRegionsToReport) {
2700 regions.add(r.getRegionInfo());
2701 } else {
2702 break;
2705 return regions.toArray(new HRegionInfo[regions.size()]);
2708 @Override
2709 public Leases getLeases() {
2710 return leases;
2714 * @return Return the rootDir.
2716 protected Path getRootDir() {
2717 return rootDir;
2721 * @return Return the fs.
2723 @Override
2724 public FileSystem getFileSystem() {
2725 return fs;
2729 * @return Return the walRootDir.
2731 protected Path getWALRootDir() {
2732 return walRootDir;
2736 * @return Return the walFs.
2738 protected FileSystem getWALFileSystem() {
2739 return walFs;
2742 @Override
2743 public String toString() {
2744 return getServerName().toString();
2748 * Interval at which threads should run
2750 * @return the interval
2752 public int getThreadWakeFrequency() {
2753 return threadWakeFrequency;
2756 @Override
2757 public ZooKeeperWatcher getZooKeeper() {
2758 return zooKeeper;
2761 @Override
2762 public BaseCoordinatedStateManager getCoordinatedStateManager() {
2763 return csm;
2766 @Override
2767 public ServerName getServerName() {
2768 return serverName;
2771 @Override
2772 public CompactionRequestor getCompactionRequester() {
2773 return this.compactSplitThread;
2776 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2777 return this.rsHost;
2780 @Override
2781 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2782 return this.regionsInTransitionInRS;
2785 @Override
2786 public ExecutorService getExecutorService() {
2787 return service;
2790 @Override
2791 public ChoreService getChoreService() {
2792 return choreService;
2795 @Override
2796 public RegionServerQuotaManager getRegionServerQuotaManager() {
2797 return rsQuotaManager;
2801 // Main program and support routines
2805 * Load the replication service objects, if any
2807 static private void createNewReplicationInstance(Configuration conf,
2808 HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
2810 if ((server instanceof HMaster) &&
2811 (!BaseLoadBalancer.userTablesOnMaster(conf))) {
2812 return;
2815 // read in the name of the source replication class from the config file.
2816 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2817 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2819 // read in the name of the sink replication class from the config file.
2820 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2821 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2823 // If both the sink and the source class names are the same, then instantiate
2824 // only one object.
2825 if (sourceClassname.equals(sinkClassname)) {
2826 server.replicationSourceHandler = (ReplicationSourceService)
2827 newReplicationInstance(sourceClassname,
2828 conf, server, walFs, walDir, oldWALDir);
2829 server.replicationSinkHandler = (ReplicationSinkService)
2830 server.replicationSourceHandler;
2831 } else {
2832 server.replicationSourceHandler = (ReplicationSourceService)
2833 newReplicationInstance(sourceClassname,
2834 conf, server, walFs, walDir, oldWALDir);
2835 server.replicationSinkHandler = (ReplicationSinkService)
2836 newReplicationInstance(sinkClassname,
2837 conf, server, walFs, walDir, oldWALDir);
2841 static private ReplicationService newReplicationInstance(String classname,
2842 Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2843 Path oldLogDir) throws IOException{
2845 Class<?> clazz = null;
2846 try {
2847 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2848 clazz = Class.forName(classname, true, classLoader);
2849 } catch (java.lang.ClassNotFoundException nfe) {
2850 throw new IOException("Could not find class for " + classname);
2853 // create an instance of the replication object.
2854 ReplicationService service = (ReplicationService)
2855 ReflectionUtils.newInstance(clazz, conf);
2856 service.initialize(server, walFs, logDir, oldLogDir);
2857 return service;
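// A custom replication service can be plugged in through the class-name configs read above. A
// hypothetical sketch (the class name is made up): a class implementing both
// ReplicationSourceService and ReplicationSinkService could be wired in with
//   conf.set(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, "com.example.MyReplicationService");
//   conf.set(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, "com.example.MyReplicationService");
// so that createNewReplicationInstance() instantiates a single object and uses it for both roles.
// The class typically needs a no-argument constructor for ReflectionUtils.newInstance(clazz, conf),
// and is then handed the server, WAL filesystem and log directories via initialize().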
2861 * Utility for constructing an instance of the passed HRegionServer class.
2863 * @param regionServerClass
2864 * @param conf2
2865 * @return HRegionServer instance.
2867 public static HRegionServer constructRegionServer(
2868 Class<? extends HRegionServer> regionServerClass,
2869 final Configuration conf2, CoordinatedStateManager cp) {
2870 try {
2871 Constructor<? extends HRegionServer> c = regionServerClass
2872 .getConstructor(Configuration.class, CoordinatedStateManager.class);
2873 return c.newInstance(conf2, cp);
2874 } catch (Exception e) {
2875 throw new RuntimeException("Failed construction of " + "Regionserver: "
2876 + regionServerClass.toString(), e);
2881 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2883 public static void main(String[] args) throws Exception {
2884 LOG.info("***** STARTING service '" + HRegionServer.class.getSimpleName() + "' *****");
2885 VersionInfo.logVersion();
2886 Configuration conf = HBaseConfiguration.create();
2887 @SuppressWarnings("unchecked")
2888 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2889 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2891 new HRegionServerCommandLine(regionServerClass).doMain(args);
2895 * Gets the online regions of the specified table.
2896 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
2897 * Only returns <em>online</em> regions. If a region on this table has been
2898 * closed during a disable, etc., it will not be included in the returned list.
2899 * So, the returned list may not necessarily be ALL regions in this table, it's
2900 * all the ONLINE regions in the table.
2901 * @param tableName
2902 * @return Online regions from <code>tableName</code>
2904 @Override
2905 public List<Region> getOnlineRegions(TableName tableName) {
2906 List<Region> tableRegions = new ArrayList<>();
2907 synchronized (this.onlineRegions) {
2908 for (Region region: this.onlineRegions.values()) {
2909 HRegionInfo regionInfo = region.getRegionInfo();
2910 if(regionInfo.getTable().equals(tableName)) {
2911 tableRegions.add(region);
2915 return tableRegions;
2918 @Override
2919 public List<Region> getOnlineRegions() {
2920 List<Region> allRegions = new ArrayList<>();
2921 synchronized (this.onlineRegions) {
2922 // Return a clone copy of the onlineRegions
2923 allRegions.addAll(onlineRegions.values());
2925 return allRegions;
2928 * Gets the online tables in this RS.
2929 * This method looks at the in-memory onlineRegions.
2930 * @return all the online tables in this RS
2932 @Override
2933 public Set<TableName> getOnlineTables() {
2934 Set<TableName> tables = new HashSet<>();
2935 synchronized (this.onlineRegions) {
2936 for (Region region: this.onlineRegions.values()) {
2937 tables.add(region.getTableDesc().getTableName());
2940 return tables;
2943 // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
2944 public String[] getRegionServerCoprocessors() {
2945 TreeSet<String> coprocessors = new TreeSet<>();
2946 try {
2947 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2948 } catch (IOException exception) {
2949 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2950 "skipping.");
2951 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2953 Collection<Region> regions = getOnlineRegionsLocalContext();
2954 for (Region region: regions) {
2955 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2956 try {
2957 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2958 } catch (IOException exception) {
2959 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2960 "; skipping.");
2961 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2964 coprocessors.addAll(rsHost.getCoprocessors());
2965 return coprocessors.toArray(new String[coprocessors.size()]);
2969 * Try to close the region, logs a warning on failure but continues.
2970 * @param region Region to close
2972 private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2973 try {
2974 if (!closeRegion(region.getEncodedName(), abort, null)) {
2975 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2976 " - ignoring and continuing");
2978 } catch (IOException e) {
2979 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2980 " - ignoring and continuing", e);
2985 * Asynchronously close a region; can be called from the master or internally by the regionserver
2986 * when stopping. If called from the master, the region will update the znode status.
2988 * <p>
2989 * If an opening was in progress, this method will cancel it, but will not start a new close. The
2990 * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
2991 * </p>
2993 * <p>
2994 * If a close was in progress, this new request will be ignored, and an exception thrown.
2995 * </p>
2997 * @param encodedName Region to close
2998 * @param abort True if we are aborting
2999 * @return True if closed a region.
3000 * @throws NotServingRegionException if the region is not online
3002 protected boolean closeRegion(String encodedName, final boolean abort, final ServerName sn)
3003 throws NotServingRegionException {
3004 //Check for permissions to close.
3005 Region actualRegion = this.getFromOnlineRegions(encodedName);
3006 // Can be null if we're calling close on a region that's not online
3007 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3008 try {
3009 actualRegion.getCoprocessorHost().preClose(false);
3010 } catch (IOException exp) {
3011 LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3012 return false;
3016 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
3017 Boolean.FALSE);
3019 if (Boolean.TRUE.equals(previous)) {
3020 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
3021 "trying to OPEN. Cancelling OPENING.");
3022 if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
3023 // The replace failed. That should be an exceptional case, but theoretically it can happen.
3024 // We're going to try to do a standard close then.
3025 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
3026 " Doing a standard close now");
3027 return closeRegion(encodedName, abort, sn);
3029 // Let's get the region from the online region list again
3030 actualRegion = this.getFromOnlineRegions(encodedName);
3031 if (actualRegion == null) { // If already online, we still need to close it.
3032 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3033 // The master deletes the znode when it receives this exception.
3034 throw new NotServingRegionException("The region " + encodedName +
3035 " was opening but not yet served. Opening is cancelled.");
3037 } else if (Boolean.FALSE.equals(previous)) {
3038 LOG.info("Received CLOSE for the region: " + encodedName +
3039 ", which we are already trying to CLOSE, but not completed yet");
3040 return true;
3043 if (actualRegion == null) {
3044 LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3045 this.regionsInTransitionInRS.remove(encodedName.getBytes());
3046 // The master deletes the znode when it receives this exception.
3047 throw new NotServingRegionException("The region " + encodedName +
3048 " is not online, and is not opening.");
3051 CloseRegionHandler crh;
3052 final HRegionInfo hri = actualRegion.getRegionInfo();
3053 if (hri.isMetaRegion()) {
3054 crh = new CloseMetaHandler(this, this, hri, abort);
3055 } else {
3056 crh = new CloseRegionHandler(this, this, hri, abort, sn);
3057 }
3058 this.service.submit(crh);
3059 return true;
3060 }
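// Illustrative sketch (not in the original source): how a caller such as the RPC layer might
// drive this close protocol; "hri" and "master" below are hypothetical.
//
//   try {
//     // true  -> a CloseRegionHandler/CloseMetaHandler was queued on this.service
//     // false -> a coprocessor preClose() hook vetoed the close
//     boolean submitted = closeRegion(hri.getEncodedName(), false /* abort */, master);
//   } catch (NotServingRegionException e) {
//     // The region is neither online nor opening, or a pending open was cancelled before the
//     // region was ever served; the master deletes the znode when it receives this exception.
//   }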
3061
3062 /**
3063 * Close and offline the region for split or merge.
3064 *
3065 * @param regionEncodedName the encoded names of the regions to close
3066 * @return true if the regions were closed successfully.
3067 * @throws IOException
3068 */
3069 protected boolean closeAndOfflineRegionForSplitOrMerge(
3070 final List<String> regionEncodedName) throws IOException {
3071 for (int i = 0; i < regionEncodedName.size(); ++i) {
3072 Region regionToClose = this.getFromOnlineRegions(regionEncodedName.get(i));
3073 if (regionToClose != null) {
3074 Map<byte[], List<StoreFile>> hstoreFiles = null;
3075 Exception exceptionToThrow = null;
3076 try {
3077 hstoreFiles = ((HRegion)regionToClose).close(false);
3078 } catch (Exception e) {
3079 exceptionToThrow = e;
3080 }
3081 if (exceptionToThrow == null && hstoreFiles == null) {
3082 // The region was closed by someone else
3083 exceptionToThrow =
3084 new IOException("Failed to close region: already closed by another thread");
3085 }
3086
3087 if (exceptionToThrow != null) {
3088 if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
3089 throw new IOException(exceptionToThrow);
3090 }
3091 if (regionToClose.getTableDesc().hasSerialReplicationScope()) {
3092 // For serial replication, we need to add a final barrier on this region. But the splitting
3093 // or merging may be reverted, so we should make sure that if we reopen this region, the open
3094 // barrier is the same as this final barrier.
3095 long seq = regionToClose.getMaxFlushedSeqId();
3096 if (seq == HConstants.NO_SEQNUM) {
3097 // No edits in WAL for this region; get the sequence number when the region was opened.
3098 seq = regionToClose.getOpenSeqNum();
3099 if (seq == HConstants.NO_SEQNUM) {
3100 // This region has no data
3101 seq = 0;
3102 }
3103 } else {
3104 seq++;
3105 }
3106 Put finalBarrier = MetaTableAccessor.makeBarrierPut(
3107 Bytes.toBytes(regionEncodedName.get(i)),
3108 seq,
3109 regionToClose.getTableDesc().getTableName().getName());
3110 MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier);
3111 }
3112 // Offline the region
3113 this.removeFromOnlineRegions(regionToClose, null);
3114 }
3115 }
3116 return true;
3117 }
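// Illustrative sketch (not in the original source): the barrier sequence id chosen above, for a
// hypothetical region with maxFlushedSeqId = 41 and openSeqNum = 30.
//
//   // maxFlushedSeqId != NO_SEQNUM, so the final barrier is maxFlushedSeqId + 1 = 42;
//   // with no flushed edits the openSeqNum (30) would be used instead, and 0 for an empty region.
//   // The barrier Put is written to hbase:meta so that a reverted split/merge reopens the region
//   // with an open barrier equal to this final barrier.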
3118
3119 /**
3120 * @param regionName
3121 * @return HRegion for the passed binary <code>regionName</code> or null if
3122 * the named region is not a member of the online regions.
3123 */
3124 public Region getOnlineRegion(final byte[] regionName) {
3125 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
3126 return this.onlineRegions.get(encodedRegionName);
3127 }
3129 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
3130 return this.regionFavoredNodesMap.get(encodedRegionName);
3131 }
3132
3133 @Override
3134 public Region getFromOnlineRegions(final String encodedRegionName) {
3135 return this.onlineRegions.get(encodedRegionName);
3136 }
3137
3139 @Override
3140 public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
3141 Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3142 if (destination != null) {
3143 long closeSeqNum = r.getMaxFlushedSeqId();
3144 if (closeSeqNum == HConstants.NO_SEQNUM) {
3145 // No edits in WAL for this region; get the sequence number when the region was opened.
3146 closeSeqNum = r.getOpenSeqNum();
3147 if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
3148 }
3149 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
3150 }
3151 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3152 return toReturn != null;
3153 }
3154
3155 /**
3156 * Protected utility method for safely obtaining an HRegion handle.
3157 *
3158 * @param regionName
3159 * Name of online {@link HRegion} to return
3160 * @return {@link HRegion} for <code>regionName</code>
3161 * @throws NotServingRegionException
3162 */
3163 protected Region getRegion(final byte[] regionName)
3164 throws NotServingRegionException {
3165 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
3166 return getRegionByEncodedName(regionName, encodedRegionName);
3167 }
3168
3169 public Region getRegionByEncodedName(String encodedRegionName)
3170 throws NotServingRegionException {
3171 return getRegionByEncodedName(null, encodedRegionName);
3172 }
3173
3174 protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3175 throws NotServingRegionException {
3176 Region region = this.onlineRegions.get(encodedRegionName);
3177 if (region == null) {
3178 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3179 if (moveInfo != null) {
3180 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3181 }
3182 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3183 String regionNameStr = regionName == null?
3184 encodedRegionName: Bytes.toStringBinary(regionName);
3185 if (isOpening != null && isOpening.booleanValue()) {
3186 throw new RegionOpeningException("Region " + regionNameStr +
3187 " is opening on " + this.serverName);
3189 throw new NotServingRegionException("Region " + regionNameStr +
3190 " is not online on " + this.serverName);
3192 return region;
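// Illustrative sketch (not in the original source): the three outcomes a caller has to handle;
// the encoded region name below is hypothetical.
//
//   try {
//     Region r = getRegionByEncodedName("1588230740");
//   } catch (RegionMovedException e) {
//     // Region was recently closed for a move; e carries the destination server and seqNum.
//   } catch (RegionOpeningException e) {
//     // Region is still opening on this server; the client should retry.
//   } catch (NotServingRegionException e) {
//     // Region is neither online nor opening here.
//   }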
3194
3195 /**
3196 * Cleanup after a Throwable is caught while invoking a method. Converts <code>t</code> to
3197 * an IOE if it isn't one already.
3198 *
3199 * @param t Throwable
3200 *
3201 * @param msg Message to log in error. Can be null.
3202 *
3203 * @return Throwable converted to an IOE; methods can only let out IOEs.
3204 */
3205 private Throwable cleanup(final Throwable t, final String msg) {
3206 // Don't log as error if NSRE; NSRE is 'normal' operation.
3207 if (t instanceof NotServingRegionException) {
3208 LOG.debug("NotServingRegionException; " + t.getMessage());
3209 return t;
3210 }
3211 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3212 if (msg == null) {
3213 LOG.error("", e);
3214 } else {
3215 LOG.error(msg, e);
3216 }
3217 if (!rpcServices.checkOOME(t)) {
3218 checkFileSystem();
3219 }
3220 return t;
3221 }
3222
3223 /**
3224 * @param t
3225 *
3226 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3227 *
3228 * @return <code>t</code> as an IOE, wrapping it if it isn't one already.
3229 */
3230 protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
3231 return (t instanceof IOException ? (IOException) t : msg == null
3232 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3233 }
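// Illustrative sketch (not in the original source): a typical way the two helpers above are
// combined inside an RPC method; the catch block and message are hypothetical.
//
//   } catch (Throwable t) {
//     // Log (unless it is a routine NotServingRegionException), check for OOME / FS loss,
//     // then surface the problem to the caller as an IOException.
//     throw convertThrowableToIOE(cleanup(t, "Failed request"), "Failed request");
//   }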
3234
3235 /**
3236 * Checks to see if the file system is still accessible. If not, sets
3237 * abortRequested and stopRequested.
3238 *
3239 * @return false if file system is not available
3240 */
3241 public boolean checkFileSystem() {
3242 if (this.fsOk && this.fs != null) {
3243 try {
3244 FSUtils.checkFileSystemAvailable(this.fs);
3245 } catch (IOException e) {
3246 abort("File System not available", e);
3247 this.fsOk = false;
3248 }
3249 }
3250 return this.fsOk;
3251 }
3253 @Override
3254 public void updateRegionFavoredNodesMapping(String encodedRegionName,
3255 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3256 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3257 // Refer to the comment on the declaration of regionFavoredNodesMap on why
3258 // it is a map of region name to InetSocketAddress[]
3259 for (int i = 0; i < favoredNodes.size(); i++) {
3260 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3261 favoredNodes.get(i).getPort());
3262 }
3263 regionFavoredNodesMap.put(encodedRegionName, addr);
3264 }
3265
3266 /**
3267 * Return the favored nodes for a region given its encoded name. Look at the
3268 * comment around {@link #regionFavoredNodesMap} on why it is InetSocketAddress[]
3269 * @param encodedRegionName
3270 * @return array of favored locations
3271 */
3272 @Override
3273 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3274 return regionFavoredNodesMap.get(encodedRegionName);
3275 }
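// Illustrative sketch (not in the original source): the mapping maintained by the two methods
// above, with hypothetical hosts and region name.
//
//   // updateRegionFavoredNodesMapping("abc123", [host1:16020, host2:16020, host3:16020])
//   //   stores InetSocketAddress.createUnresolved(...) entries, so no DNS lookup happens here;
//   // getFavoredNodesForRegion("abc123") later returns the same array, to be used as
//   //   favored-node hints when files for that region are written.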
3277 @Override
3278 public ServerNonceManager getNonceManager() {
3279 return this.nonceManager;
3280 }
3281
3282 private static class MovedRegionInfo {
3283 private final ServerName serverName;
3284 private final long seqNum;
3285 private final long ts;
3287 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3288 this.serverName = serverName;
3289 this.seqNum = closeSeqNum;
3290 ts = EnvironmentEdgeManager.currentTime();
3291 }
3292
3293 public ServerName getServerName() {
3294 return serverName;
3295 }
3296
3297 public long getSeqNum() {
3298 return seqNum;
3299 }
3300
3301 public long getMoveTime() {
3302 return ts;
3303 }
3304 }
3305
3306 // This map contains all the regions that we closed for a move.
3307 // We record the time of the move so that we do not keep stale entries for too long.
3308 protected Map<String, MovedRegionInfo> movedRegions =
3309 new ConcurrentHashMap<>(3000);
3310
3311 // We need a timeout. If not, there is a risk of giving out wrong information: that would
3312 // double the number of network calls instead of reducing them.
3313 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3314
3315 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3316 if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3317 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3318 return;
3319 }
3320 LOG.info("Adding moved region record: "
3321 + encodedName + " to " + destination + " as of " + closeSeqNum);
3322 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3323 }
3324
3325 void removeFromMovedRegions(String encodedName) {
3326 movedRegions.remove(encodedName);
3327 }
3328
3329 private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3330 MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3332 long now = EnvironmentEdgeManager.currentTime();
3333 if (dest != null) {
3334 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3335 return dest;
3336 } else {
3337 movedRegions.remove(encodedRegionName);
3338 }
3339 }
3340
3341 return null;
3342 }
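// Illustrative sketch (not in the original source): the lifecycle of a movedRegions entry,
// with hypothetical values.
//
//   addToMovedRegions("abc123", serverB, 42);   // recorded together with the current time
//   getMovedRegion("abc123");                   // within TIMEOUT_REGION_MOVED (2 min) -> the
//                                               // MovedRegionInfo(serverB, 42) used to throw
//                                               // RegionMovedException back to clients
//   // After the timeout, getMovedRegion() drops the entry and returns null, and
//   // cleanMovedRegions() (below) purges expired entries in the background.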
3343
3344 /**
3345 * Remove the expired entries from the moved regions list.
3346 */
3347 protected void cleanMovedRegions() {
3348 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
3349 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3351 while (it.hasNext()){
3352 Map.Entry<String, MovedRegionInfo> e = it.next();
3353 if (e.getValue().getMoveTime() < cutOff) {
3354 it.remove();
3355 }
3356 }
3357 }
3358
3359 /**
3360 * Use this to allow tests to override and schedule more frequently.
3361 */
3363 protected int movedRegionCleanerPeriod() {
3364 return TIMEOUT_REGION_MOVED;
3365 }
3366
3367 /**
3368 * Creates a Chore thread to clean the moved region cache.
3369 */
3371 protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3372 private HRegionServer regionServer;
3373 Stoppable stoppable;
3375 private MovedRegionsCleaner(
3376 HRegionServer regionServer, Stoppable stoppable){
3377 super("MovedRegionsCleaner for region " + regionServer, stoppable,
3378 regionServer.movedRegionCleanerPeriod());
3379 this.regionServer = regionServer;
3380 this.stoppable = stoppable;
3381 }
3382
3383 static MovedRegionsCleaner create(HRegionServer rs){
3384 Stoppable stoppable = new Stoppable() {
3385 private volatile boolean isStopped = false;
3386 @Override public void stop(String why) { isStopped = true;}
3387 @Override public boolean isStopped() {return isStopped;}
3388 };
3389
3390 return new MovedRegionsCleaner(rs, stoppable);
3391 }
3392
3393 @Override
3394 protected void chore() {
3395 regionServer.cleanMovedRegions();
3396 }
3397
3398 @Override
3399 public void stop(String why) {
3400 stoppable.stop(why);
3401 }
3402
3403 @Override
3404 public boolean isStopped() {
3405 return stoppable.isStopped();
3406 }
3407 }
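// Illustrative sketch (not in the original source): how this chore would typically be wired up
// at regionserver startup; the field name "movedRegionsCleaner" is hypothetical here.
//
//   // MovedRegionsCleaner movedRegionsCleaner = MovedRegionsCleaner.create(this);
//   // choreService.scheduleChore(movedRegionsCleaner);
//   // -> chore() then runs every movedRegionCleanerPeriod() ms and calls cleanMovedRegions().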
3409 private String getMyEphemeralNodePath() {
3410 return ZKUtil.joinZNode(this.zooKeeper.znodePaths.rsZNode, getServerName().toString());
3411 }
3412
3413 private boolean isHealthCheckerConfigured() {
3414 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3415 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3416 }
3417
3418 /**
3419 * @return the underlying {@link CompactSplitThread} for the server
3420 */
3421 public CompactSplitThread getCompactSplitThread() {
3422 return this.compactSplitThread;
3423 }
3424
3425 /**
3426 * A helper function to store the last flushed sequence Id with the previous failed RS for a
3427 * recovering region. The Id is used to skip wal edits which are flushed. Since the flushed
3428 * sequence id is only valid for each RS, we associate the Id with the corresponding failed RS.
3429 * @throws KeeperException
3430 * @throws IOException
3431 */
3432 private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3433 IOException {
3434 if (!r.isRecovering()) {
3435 // return immediately for non-recovering regions
3436 return;
3437 }
3438
3439 HRegionInfo regionInfo = r.getRegionInfo();
3440 ZooKeeperWatcher zkw = getZooKeeper();
3441 String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3442 Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3443 long minSeqIdForLogReplay = -1;
3444 for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3445 if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3446 minSeqIdForLogReplay = storeSeqIdForReplay;
3447 }
3448 }
3449
3450 try {
3451 long lastRecordedFlushedSequenceId = -1;
3452 String nodePath = ZKUtil.joinZNode(this.zooKeeper.znodePaths.recoveringRegionsZNode,
3453 regionInfo.getEncodedName());
3454 // recovering-region level
3455 byte[] data;
3456 try {
3457 data = ZKUtil.getData(zkw, nodePath);
3458 } catch (InterruptedException e) {
3459 throw new InterruptedIOException();
3460 }
3461 if (data != null) {
3462 lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3463 }
3464 if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3465 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3466 }
3467 if (previousRSName != null) {
3468 // one level deeper for the failed RS
3469 nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3470 ZKUtil.setData(zkw, nodePath,
3471 ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3472 LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3473 " for " + previousRSName);
3474 } else {
3475 LOG.warn("Can't find failed region server for recovering region " +
3476 regionInfo.getEncodedName());
3477 }
3478 } catch (NoNodeException ignore) {
3479 LOG.debug("Region " + regionInfo.getEncodedName() +
3480 " must have completed recovery because its recovery znode has been removed", ignore);
3483
3484 /**
3485 * Return the last failed RS name under /hbase/recovering-regions/encodedRegionName
3486 * @param encodedRegionName
3487 * @throws KeeperException
3488 */
3489 private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3490 String result = null;
3491 long maxZxid = 0;
3492 ZooKeeperWatcher zkw = this.getZooKeeper();
3493 String nodePath = ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode, encodedRegionName);
3494 List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3495 if (failedServers == null || failedServers.isEmpty()) {
3496 return result;
3497 }
3498 for (String failedServer : failedServers) {
3499 String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3500 Stat stat = new Stat();
3501 ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3502 if (maxZxid < stat.getCzxid()) {
3503 maxZxid = stat.getCzxid();
3504 result = failedServer;
3505 }
3506 }
3507 return result;
3508 }
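// Illustrative sketch (not in the original source): hypothetical children and result for the
// lookup above.
//
//   /hbase/recovering-regions/abc123/server1,16020,1   (czxid 100)
//   /hbase/recovering-regions/abc123/server2,16020,1   (czxid 250)
//   // getLastFailedRSFromZK("abc123") -> "server2,16020,1"
//   // (the child with the largest creation zxid, i.e. the most recently failed RS)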
3510 public CoprocessorServiceResponse execRegionServerService(
3511 @SuppressWarnings("UnusedParameters") final RpcController controller,
3512 final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3513 try {
3514 ServerRpcController serviceController = new ServerRpcController();
3515 CoprocessorServiceCall call = serviceRequest.getCall();
3516 String serviceName = call.getServiceName();
3517 com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
3518 if (service == null) {
3519 throw new UnknownProtocolException(null, "No registered coprocessor service found for " +
3520 serviceName);
3521 }
3522 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
3523 service.getDescriptorForType();
3525 String methodName = call.getMethodName();
3526 com.google.protobuf.Descriptors.MethodDescriptor methodDesc =
3527 serviceDesc.findMethodByName(methodName);
3528 if (methodDesc == null) {
3529 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
3530 " called on service " + serviceName);
3533 com.google.protobuf.Message request =
3534 CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3535 final com.google.protobuf.Message.Builder responseBuilder =
3536 service.getResponsePrototype(methodDesc).newBuilderForType();
3537 service.callMethod(methodDesc, serviceController, request,
3538 new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() {
3539 @Override
3540 public void run(com.google.protobuf.Message message) {
3541 if (message != null) {
3542 responseBuilder.mergeFrom(message);
3543 }
3544 }
3545 });
3546 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3547 if (exception != null) {
3548 throw exception;
3549 }
3550 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3551 } catch (IOException ie) {
3552 throw new ServiceException(ie);
3553 }
3554 }
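// Illustrative sketch (not in the original source): how a request flows through the dispatcher
// above; the service and method names are hypothetical, the lookup keys are the ones used above.
//
//   // CoprocessorServiceRequest { call { service_name: "MyRSService", method_name: "ping",
//   //                                    request: <serialized ping request> } }
//   // -> coprocessorServiceHandlers.get("MyRSService")   (a Service registered earlier with the RS)
//   // -> serviceDesc.findMethodByName("ping")
//   // -> service.callMethod(...) fills responseBuilder; errors surface via ServerRpcController
//   //    and are rethrown to the client as ServiceException(IOException).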
3555
3556 /**
3557 * @return The cache config instance used by the regionserver.
3558 */
3559 public CacheConfig getCacheConfig() {
3560 return this.cacheConfig;
3561 }
3562
3563 /**
3564 * @return the ConfigurationManager object, exposed for testing purposes.
3565 */
3566 protected ConfigurationManager getConfigurationManager() {
3567 return configurationManager;
3568 }
3569
3570 /**
3571 * @return the table descriptors implementation.
3572 */
3573 public TableDescriptors getTableDescriptors() {
3574 return this.tableDescriptors;
3575 }
3576
3577 /**
3578 * Reload the configuration from disk.
3579 */
3580 public void updateConfiguration() {
3581 LOG.info("Reloading the configuration from disk.");
3582 // Reload the configuration from disk.
3583 conf.reloadConfiguration();
3584 configurationManager.notifyAllObservers(conf);
3585 }
3587 @Override
3588 public double getCompactionPressure() {
3589 double max = 0;
3590 for (Region region : onlineRegions.values()) {
3591 for (Store store : region.getStores()) {
3592 double normCount = store.getCompactionPressure();
3593 if (normCount > max) {
3594 max = normCount;
3595 }
3596 }
3597 }
3598 return max;
3599 }
3601 @Override
3602 public HeapMemoryManager getHeapMemoryManager() {
3603 return hMemManager;
3604 }
3605
3606 /**
3607 * For testing.
3608 * @return whether all wal roll requests have finished for this regionserver
3609 */
3610 @VisibleForTesting
3611 public boolean walRollRequestFinished() {
3612 return this.walRoller.walRollFinished();
3613 }
3614
3615 @Override
3616 public ThroughputController getFlushThroughputController() {
3617 return flushThroughputController;
3618 }
3619
3620 @Override
3621 public double getFlushPressure() {
3622 if (getRegionServerAccounting() == null || cacheFlusher == null) {
3623 // return 0 during RS initialization
3624 return 0.0;
3625 }
3626 return getRegionServerAccounting().getFlushPressure();
3627 }
3628
3629 @Override
3630 public void onConfigurationChange(Configuration newConf) {
3631 ThroughputController old = this.flushThroughputController;
3632 if (old != null) {
3633 old.stop("configuration change");
3634 }
3635 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3636 }
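// Illustrative sketch (not in the original source): the online-configuration path that ends up
// in the method above.
//
//   // updateConfiguration()                      (e.g. triggered by an operator request)
//   //   -> conf.reloadConfiguration()
//   //   -> configurationManager.notifyAllObservers(conf)
//   //        -> onConfigurationChange(newConf) on each registered observer; for this class that
//   //           stops the old flush ThroughputController and builds a new one from the new conf.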
3638 @Override
3639 public MetricsRegionServer getMetrics() {
3640 return metricsRegionServer;
3641 }
3642
3643 @Override
3644 public SecureBulkLoadManager getSecureBulkLoadManager() {
3645 return this.secureBulkLoadManager;
3646 }
3647
3648 @Override
3649 public EntityLock regionLock(List<HRegionInfo> regionInfos, String description,
3650 Abortable abort) throws IOException {
3651 return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
3652 .regionLock(regionInfos, description, abort);