HBASE-26811 Secondary replica may be disabled for read forever (#4182)
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.regionserver;
20 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
21 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
22 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
23 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
24 import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
26 import java.io.IOException;
27 import java.io.PrintWriter;
28 import java.lang.management.MemoryUsage;
29 import java.lang.reflect.Constructor;
30 import java.net.InetSocketAddress;
31 import java.time.Duration;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.Comparator;
36 import java.util.HashSet;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Objects;
42 import java.util.Optional;
43 import java.util.Set;
44 import java.util.SortedMap;
45 import java.util.Timer;
46 import java.util.TimerTask;
47 import java.util.TreeMap;
48 import java.util.TreeSet;
49 import java.util.concurrent.ConcurrentHashMap;
50 import java.util.concurrent.ConcurrentMap;
51 import java.util.concurrent.ConcurrentSkipListMap;
52 import java.util.concurrent.ThreadLocalRandom;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import java.util.concurrent.locks.ReentrantReadWriteLock;
56 import java.util.stream.Collectors;
57 import javax.management.MalformedObjectNameException;
58 import javax.servlet.http.HttpServlet;
59 import org.apache.commons.lang3.StringUtils;
60 import org.apache.hadoop.conf.Configuration;
61 import org.apache.hadoop.fs.FileSystem;
62 import org.apache.hadoop.fs.Path;
63 import org.apache.hadoop.hbase.Abortable;
64 import org.apache.hadoop.hbase.CacheEvictionStats;
65 import org.apache.hadoop.hbase.CallQueueTooBigException;
66 import org.apache.hadoop.hbase.ClockOutOfSyncException;
67 import org.apache.hadoop.hbase.DoNotRetryIOException;
68 import org.apache.hadoop.hbase.ExecutorStatusChore;
69 import org.apache.hadoop.hbase.HBaseConfiguration;
70 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
71 import org.apache.hadoop.hbase.HBaseServerBase;
72 import org.apache.hadoop.hbase.HConstants;
73 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
74 import org.apache.hadoop.hbase.HRegionLocation;
75 import org.apache.hadoop.hbase.HealthCheckChore;
76 import org.apache.hadoop.hbase.MetaTableAccessor;
77 import org.apache.hadoop.hbase.NotServingRegionException;
78 import org.apache.hadoop.hbase.PleaseHoldException;
79 import org.apache.hadoop.hbase.ScheduledChore;
80 import org.apache.hadoop.hbase.ServerName;
81 import org.apache.hadoop.hbase.Stoppable;
82 import org.apache.hadoop.hbase.TableName;
83 import org.apache.hadoop.hbase.YouAreDeadException;
84 import org.apache.hadoop.hbase.ZNodeClearer;
85 import org.apache.hadoop.hbase.client.ConnectionUtils;
86 import org.apache.hadoop.hbase.client.RegionInfo;
87 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
88 import org.apache.hadoop.hbase.client.locking.EntityLock;
89 import org.apache.hadoop.hbase.client.locking.LockServiceClient;
90 import org.apache.hadoop.hbase.conf.ConfigurationManager;
91 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
92 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
93 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
94 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
95 import org.apache.hadoop.hbase.executor.ExecutorType;
96 import org.apache.hadoop.hbase.http.InfoServer;
97 import org.apache.hadoop.hbase.io.hfile.BlockCache;
98 import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
99 import org.apache.hadoop.hbase.io.hfile.HFile;
100 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
101 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
102 import org.apache.hadoop.hbase.ipc.RpcClient;
103 import org.apache.hadoop.hbase.ipc.RpcServer;
104 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
105 import org.apache.hadoop.hbase.ipc.ServerRpcController;
106 import org.apache.hadoop.hbase.log.HBaseMarkers;
107 import org.apache.hadoop.hbase.mob.MobFileCache;
108 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
109 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
110 import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
111 import org.apache.hadoop.hbase.net.Address;
112 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
113 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
114 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
115 import org.apache.hadoop.hbase.quotas.QuotaUtil;
116 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
117 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
118 import org.apache.hadoop.hbase.quotas.RegionSize;
119 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
120 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
121 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
122 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
123 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
124 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
125 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
126 import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
127 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
128 import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
129 import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
130 import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
131 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
132 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
133 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
134 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
135 import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
136 import org.apache.hadoop.hbase.security.SecurityConstants;
137 import org.apache.hadoop.hbase.security.Superusers;
138 import org.apache.hadoop.hbase.security.User;
139 import org.apache.hadoop.hbase.security.UserProvider;
140 import org.apache.hadoop.hbase.util.Bytes;
141 import org.apache.hadoop.hbase.util.CompressionTest;
142 import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
143 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
144 import org.apache.hadoop.hbase.util.FSUtils;
145 import org.apache.hadoop.hbase.util.FutureUtils;
146 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
147 import org.apache.hadoop.hbase.util.Pair;
148 import org.apache.hadoop.hbase.util.RetryCounter;
149 import org.apache.hadoop.hbase.util.RetryCounterFactory;
150 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
151 import org.apache.hadoop.hbase.util.Threads;
152 import org.apache.hadoop.hbase.util.VersionInfo;
153 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
154 import org.apache.hadoop.hbase.wal.WAL;
155 import org.apache.hadoop.hbase.wal.WALFactory;
156 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
157 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
158 import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
159 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
160 import org.apache.hadoop.ipc.RemoteException;
161 import org.apache.hadoop.util.ReflectionUtils;
162 import org.apache.yetus.audience.InterfaceAudience;
163 import org.apache.zookeeper.KeeperException;
164 import org.slf4j.Logger;
165 import org.slf4j.LoggerFactory;
167 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
168 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
169 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
170 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
171 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
172 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
173 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
174 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
175 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
176 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
177 import org.apache.hbase.thirdparty.com.google.protobuf.Service;
178 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
179 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
180 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
182 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
183 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
184 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
185 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
186 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
187 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
188 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
189 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
190 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
191 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
192 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
193 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
194 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
195 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
196 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
197 import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
198 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
199 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
200 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
201 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
202 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
203 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
204 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
205 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
206 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
207 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
208 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
209 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
210 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
211 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
212 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
214 /**
215 * HRegionServer makes a set of HRegions available to clients. It checks in with
216 * the HMaster. There are many HRegionServers in a single HBase deployment.
217 */
218 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
219 @SuppressWarnings({ "deprecation"})
220 public class HRegionServer extends HBaseServerBase<RSRpcServices>
221 implements RegionServerServices, LastSequenceId {
223 private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
226 * For testing only! Set to true to skip notifying region assignment to master.
228 @InterfaceAudience.Private
229 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
230 public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
233 * A map from RegionName to current action in progress. Boolean value indicates:
234 * true - if open region action in progress
235 * false - if close region action in progress
237 private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
238 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
241 * Used to cache the open/close region procedures which have already been submitted.
242 * See {@link #submitRegionProcedure(long)}.
244 private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
246 * Used to cache the open/close region procedures which have already been executed.
247 * See {@link #submitRegionProcedure(long)}.
249 private final Cache<Long, Long> executedRegionProcedures =
250 CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
253 * Used to cache the moved-out regions
255 private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
256 CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
257 TimeUnit.MILLISECONDS).build();
259 private MemStoreFlusher cacheFlusher;
261 private HeapMemoryManager hMemManager;
263 // Replication services. If no replication, this handler will be null.
264 private ReplicationSourceService replicationSourceHandler;
265 private ReplicationSinkService replicationSinkHandler;
266 private boolean sameReplicationSourceAndSink;
268 // Compactions
269 private CompactSplit compactSplitThread;
272 * Map of regions currently being served by this region server. Key is the
273 * encoded region name. All access should be synchronized.
275 private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
277 * Lock for gating access to {@link #onlineRegions}.
278 * TODO: If this map is gated by a lock, does it need to be a ConcurrentHashMap?
280 private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();
283 * Map of encoded region names to the DataNode locations they should be hosted on
284 * We store the value as Address since InetSocketAddress is required by the HDFS
285 * API (create() that takes favored nodes as hints for placing file blocks).
286 * We could have used ServerName here as the value class, but we'd need to
287 * convert it to InetSocketAddress at some point before the HDFS API call, and
288 * it seems a bit weird to store ServerName since ServerName refers to RegionServers
289 * and here we really mean DataNode locations. We don't store it as InetSocketAddress
290 * here because the conversion on demand from Address to InetSocketAddress will
291 * guarantee the resolution results will be fresh when we need it.
293 private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();
295 private LeaseManager leaseManager;
297 private volatile boolean dataFsOk;
299 static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
300 // Default abort timeout is 1200 seconds for safety
301 private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
302 // Will run this task when the abort times out
303 static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";
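// Illustrative override sketch (not part of the original source; values and the class name are
// examples only): in hbase-site.xml an operator could shorten the abort timeout and plug in a
// custom task, e.g.
//   hbase.regionserver.abort.timeout      = 600000    (10 minutes instead of the 20 minute default)
//   hbase.regionserver.abort.timeout.task = org.example.MyAbortTimeoutTask    (hypothetical class)
// The task is assumed here to be a Runnable that the abort monitor runs when the timeout fires.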
305 // A state before we go into stopped state. At this stage we're closing user
306 // space regions.
307 private boolean stopping = false;
308 private volatile boolean killed = false;
310 private final int threadWakeFrequency;
312 private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
313 private final int compactionCheckFrequency;
314 private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
315 private final int flushCheckFrequency;
317 // Stub to do region server status calls against the master.
318 private volatile RegionServerStatusService.BlockingInterface rssStub;
319 private volatile LockService.BlockingInterface lockStub;
320 // RPC client. Used to make the stub above that does region server status checking.
321 private RpcClient rpcClient;
323 private UncaughtExceptionHandler uncaughtExceptionHandler;
325 private JvmPauseMonitor pauseMonitor;
327 private RSSnapshotVerifier rsSnapshotVerifier;
329 /** region server process name */
330 public static final String REGIONSERVER = "regionserver";
333 private MetricsRegionServer metricsRegionServer;
334 MetricsRegionServerWrapperImpl metricsRegionServerImpl;
337 * Check for compaction requests.
339 private ScheduledChore compactionChecker;
342 * Check for flushes
344 private ScheduledChore periodicFlusher;
346 private volatile WALFactory walFactory;
348 private LogRoller walRoller;
350 // A thread which calls reportProcedureDone
351 private RemoteProcedureResultReporter procedureResultReporter;
353 // flag set after we're done setting up server threads
354 final AtomicBoolean online = new AtomicBoolean(false);
356 // master address tracker
357 private final MasterAddressTracker masterAddressTracker;
359 // Log Splitting Worker
360 private SplitLogWorker splitLogWorker;
362 private final int shortOperationTimeout;
364 // Time to pause if master says 'please hold'
365 private final long retryPauseTime;
367 private final RegionServerAccounting regionServerAccounting;
369 private SlowLogTableOpsChore slowLogTableOpsChore = null;
371 // Block cache
372 private BlockCache blockCache;
373 // The cache for mob files
374 private MobFileCache mobFileCache;
376 /** The health check chore. */
377 private HealthCheckChore healthCheckChore;
379 /** The Executor status collect chore. */
380 private ExecutorStatusChore executorStatusChore;
382 /** The nonce manager chore. */
383 private ScheduledChore nonceManagerChore;
385 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
388 * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
389 * {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
390 * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
392 @Deprecated
393 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
394 final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
395 "hbase.regionserver.hostname.disable.master.reversedns";
398 * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
399 * An exception will be thrown if both are used.
401 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
402 final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
403 "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
406 * Unique identifier for the cluster we are a part of.
408 private String clusterId;
410 // chore for refreshing store files for secondary regions
411 private StorefileRefresherChore storefileRefresher;
413 private volatile RegionServerCoprocessorHost rsHost;
415 private RegionServerProcedureManagerHost rspmHost;
417 private RegionServerRpcQuotaManager rsQuotaManager;
418 private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
421 * Nonce manager. Nonces are used to make operations like increment and append idempotent
422 * in the case where client doesn't receive the response from a successful operation and
423 * retries. We track the successful ops for some time via a nonce sent by client and handle
424 * duplicate operations (currently, by failing them; in future we might use MVCC to return
425 * result). Nonces are also recovered from WAL during recovery; however, the caveats (from
426 * HBASE-3787) are:
427 * - WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth
428 * of past records. If we don't read the records, we don't read and recover the nonces.
429 * Some WALs within nonce-timeout at recovery may not even be present due to rolling/cleanup.
430 * - There's no WAL recovery during normal region move, so nonces will not be transferred.
431 * We can have a separate additional "Nonce WAL". It will just contain a bunch of numbers and
432 * won't be flushed on main path - because WAL itself also contains nonces, if we only flush
433 * it before memstore flush, for a given nonce we will either see it in the WAL (if it was
434 * never flushed to disk, it will be part of recovery), or we'll see it as part of the nonce
435 * log (or both occasionally, which doesn't matter). Nonce log file can be deleted after the
436 * latest nonce in it expired. It can also be recovered during move.
438 final ServerNonceManager nonceManager;
440 private BrokenStoreFileCleaner brokenStoreFileCleaner;
442 @InterfaceAudience.Private
443 CompactedHFilesDischarger compactedFileDischarger;
445 private volatile ThroughputController flushThroughputController;
447 private SecureBulkLoadManager secureBulkLoadManager;
449 private FileSystemUtilizationChore fsUtilizationChore;
451 private BootstrapNodeManager bootstrapNodeManager;
454 * True if this RegionServer is coming up in a cluster where there is no Master;
455 * this means it needs to just come up and make do without a Master to talk to, e.g. in tests or when
456 * HRegionServer is doing something other than its usual duties, e.g. as a hollowed-out host whose only
457 * purpose is as a Replication-stream sink; see HBASE-18846 for more.
458 * TODO: can this replace {@link #TEST_SKIP_REPORTING_TRANSITION} ?
460 private final boolean masterless;
461 private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";
463 /**regionserver codec list **/
464 private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";
466 // A timer to shutdown the process if abort takes too long
467 private Timer abortMonitor;
469 private RegionReplicationBufferManager regionReplicationBufferManager;
471 * Starts a HRegionServer at the default location.
472 * <p/>
473 * Don't start any services or managers in here in the Constructor.
474 * Defer till after we register with the Master as much as possible. See {@link #startServices}.
476 public HRegionServer(final Configuration conf) throws IOException {
477 super(conf, "RegionServer"); // thread name
478 try {
479 this.dataFsOk = true;
480 this.masterless = !clusterMode();
481 MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
482 HFile.checkHFileVersion(this.conf);
483 checkCodecs(this.conf);
484 FSUtils.setupShortCircuitRead(this.conf);
486 // Disable usage of meta replicas in the regionserver
487 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
488 // Config'ed params
489 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
490 this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
491 this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
493 boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
494 this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;
496 this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
497 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
499 this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
500 HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
502 regionServerAccounting = new RegionServerAccounting(conf);
504 blockCache = BlockCacheFactory.createBlockCache(conf);
505 mobFileCache = new MobFileCache(conf);
507 rsSnapshotVerifier = new RSSnapshotVerifier(conf);
509 uncaughtExceptionHandler =
510 (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);
512 // If no master in cluster, skip trying to track one or look for a cluster status.
513 if (!this.masterless) {
514 masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
515 masterAddressTracker.start();
516 } else {
517 masterAddressTracker = null;
519 this.rpcServices.start(zooKeeper);
520 } catch (Throwable t) {
521 // Make sure we log the exception. HRegionServer is often started via reflection and the
522 // cause of failed startup is lost.
523 LOG.error("Failed construction RegionServer", t);
524 throw t;
528 // HMaster should override this method to load the specific config for master
529 @Override
530 protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
531 String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
532 if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
533 if (!StringUtils.isBlank(hostname)) {
534 String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " +
535 UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " +
536 UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " +
537 UNSAFE_RS_HOSTNAME_KEY + " is used";
538 throw new IOException(msg);
539 } else {
540 return rpcServices.getSocketAddress().getHostName();
542 } else {
543 return hostname;
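// Illustrative configuration sketch (not from the original source; the hostname value is
// hypothetical): the two properties checked in getUseThisHostnameInstead() above are mutually
// exclusive, e.g. set either
//   hbase.unsafe.regionserver.hostname = rs1.example.com
// or
//   hbase.unsafe.regionserver.hostname.disable.master.reversedns = true
// Setting both makes the method throw an IOException; with only the disable flag set, the hostname
// of the RPC server socket is used instead of the name obtained via the master's reverse DNS lookup.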
547 @Override
548 protected void login(UserProvider user, String host) throws IOException {
549 user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
550 SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
553 @Override
554 protected String getProcessName() {
555 return REGIONSERVER;
558 @Override
559 protected boolean canCreateBaseZNode() {
560 return !clusterMode();
563 @Override
564 protected boolean canUpdateTableDescriptor() {
565 return false;
568 @Override
569 protected boolean cacheTableDescriptor() {
570 return false;
573 protected RSRpcServices createRpcServices() throws IOException {
574 return new RSRpcServices(this);
577 @Override
578 protected void configureInfoServer(InfoServer infoServer) {
579 infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
580 infoServer.setAttribute(REGIONSERVER, this);
583 @Override
584 protected Class<? extends HttpServlet> getDumpServlet() {
585 return RSDumpServlet.class;
589 * Used by {@link RSDumpServlet} to generate debugging information.
591 public void dumpRowLocks(final PrintWriter out) {
592 StringBuilder sb = new StringBuilder();
593 for (HRegion region : getRegions()) {
594 if (region.getLockedRows().size() > 0) {
595 for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
596 sb.setLength(0);
597 sb.append(region.getTableDescriptor().getTableName()).append(",")
598 .append(region.getRegionInfo().getEncodedName()).append(",");
599 sb.append(rowLockContext.toString());
600 out.println(sb);
606 @Override
607 public boolean registerService(Service instance) {
608 // No stacking of instances is allowed for a single executorService name
609 ServiceDescriptor serviceDesc = instance.getDescriptorForType();
610 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
611 if (coprocessorServiceHandlers.containsKey(serviceName)) {
612 LOG.error("Coprocessor executorService " + serviceName +
613 " already registered, rejecting request from " + instance);
614 return false;
617 coprocessorServiceHandlers.put(serviceName, instance);
618 if (LOG.isDebugEnabled()) {
619 LOG.debug(
620 "Registered regionserver coprocessor executorService: executorService=" + serviceName);
622 return true;
626 * Run test on configured codecs to make sure supporting libs are in place.
628 private static void checkCodecs(final Configuration c) throws IOException {
629 // check to see if the codec list is available:
630 String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
631 if (codecs == null) {
632 return;
634 for (String codec : codecs) {
635 if (!CompressionTest.testCompression(codec)) {
636 throw new IOException("Compression codec " + codec +
637 " not supported, aborting RS construction");
642 public String getClusterId() {
643 return this.clusterId;
647 * All initialization needed before we go register with Master.<br>
648 * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
649 * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
651 private void preRegistrationInitialization() {
652 try {
653 initializeZooKeeper();
654 setupClusterConnection();
655 bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
656 regionReplicationBufferManager = new RegionReplicationBufferManager(this);
657 // Setup RPC client for master communication
658 this.rpcClient = asyncClusterConnection.getRpcClient();
659 } catch (Throwable t) {
660 // Call stop if error, or the process will stick around forever since the server
661 // puts up non-daemon threads.
662 this.rpcServices.stop();
663 abort("Initialization of RS failed. Hence aborting RS.", t);
668 * Bring up the connection to the zk ensemble, then wait until a master is available for this cluster,
669 * and after that wait until the cluster 'up' flag has been set. This is the order in which the master does things.
670 * <p>
671 * Finally open long-living server short-circuit connection.
673 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
674 justification="cluster Id znode read would give us correct response")
675 private void initializeZooKeeper() throws IOException, InterruptedException {
676 // Nothing to do in here if no Master in the mix.
677 if (this.masterless) {
678 return;
681 // Create the master address tracker, register with zk, and start it. Then
682 // block until a master is available. No point in starting up if no master
683 // running.
684 blockAndCheckIfStopped(this.masterAddressTracker);
686 // Wait on cluster being up. Master will set this flag up in zookeeper
687 // when ready.
688 blockAndCheckIfStopped(this.clusterStatusTracker);
690 // If we are HMaster then the cluster id should have already been set.
691 if (clusterId == null) {
692 // Retrieve clusterId
693 // Since cluster status is now up
694 // ID should have already been set by HMaster
695 try {
696 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
697 if (clusterId == null) {
698 this.abort("Cluster ID has not been set");
700 LOG.info("ClusterId : " + clusterId);
701 } catch (KeeperException e) {
702 this.abort("Failed to retrieve Cluster ID", e);
706 if (isStopped() || isAborted()) {
707 return; // No need for further initialization
710 // watch for snapshots and other procedures
711 try {
712 rspmHost = new RegionServerProcedureManagerHost();
713 rspmHost.loadProcedures(conf);
714 rspmHost.initialize(this);
715 } catch (KeeperException e) {
716 this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
721 * Utility method to wait indefinitely on znode availability while checking
722 * whether the region server is shut down
723 * @param tracker znode tracker to use
724 * @throws IOException any IO exception, plus if the RS is stopped
725 * @throws InterruptedException if the waiting thread is interrupted
727 private void blockAndCheckIfStopped(ZKNodeTracker tracker)
728 throws IOException, InterruptedException {
729 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
730 if (this.stopped) {
731 throw new IOException("Received the shutdown message while waiting.");
737 * @return True if the cluster is up.
739 @Override
740 public boolean isClusterUp() {
741 return this.masterless ||
742 (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
746 * The HRegionServer sticks in this loop until closed.
748 @Override
749 public void run() {
750 if (isStopped()) {
751 LOG.info("Skipping run; stopped");
752 return;
754 try {
755 // Do pre-registration initializations; zookeeper, lease threads, etc.
756 preRegistrationInitialization();
757 } catch (Throwable e) {
758 abort("Fatal exception during initialization", e);
761 try {
762 if (!isStopped() && !isAborted()) {
763 ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
764 // Initialize the RegionServerCoprocessorHost now that our ephemeral
765 // node was created, in case any coprocessors want to use ZooKeeper
766 this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
768 // Try and register with the Master; tell it we are here. Break if server is stopped or
769 // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
770 // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
771 // come up.
772 LOG.debug("About to register with Master.");
773 RetryCounterFactory rcf =
774 new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
775 RetryCounter rc = rcf.create();
776 while (keepLooping()) {
777 RegionServerStartupResponse w = reportForDuty();
778 if (w == null) {
779 long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
780 LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
781 this.sleeper.sleep(sleepTime);
782 } else {
783 handleReportForDutyResponse(w);
784 break;
789 if (!isStopped() && isHealthy()) {
790 // start the snapshot handler and other procedure handlers,
791 // since the server is ready to run
792 if (this.rspmHost != null) {
793 this.rspmHost.start();
795 // Start the Quota Manager
796 if (this.rsQuotaManager != null) {
797 rsQuotaManager.start(getRpcServer().getScheduler());
799 if (this.rsSpaceQuotaManager != null) {
800 this.rsSpaceQuotaManager.start();
804 // We registered with the Master. Go into run mode.
805 long lastMsg = EnvironmentEdgeManager.currentTime();
806 long oldRequestCount = -1;
807 // The main run loop.
808 while (!isStopped() && isHealthy()) {
809 if (!isClusterUp()) {
810 if (onlineRegions.isEmpty()) {
811 stop("Exiting; cluster shutdown set and not carrying any regions");
812 } else if (!this.stopping) {
813 this.stopping = true;
814 LOG.info("Closing user regions");
815 closeUserRegions(isAborted());
816 } else {
817 boolean allUserRegionsOffline = areAllUserRegionsOffline();
818 if (allUserRegionsOffline) {
819 // Set stopped if no more write requests to meta tables
820 // since last time we went around the loop. Any open
821 // meta regions will be closed on our way out.
822 if (oldRequestCount == getWriteRequestCount()) {
823 stop("Stopped; only catalog regions remaining online");
824 break;
826 oldRequestCount = getWriteRequestCount();
827 } else {
828 // Make sure all regions have been closed -- some regions may
829 // have not got it because we were splitting at the time of
830 // the call to closeUserRegions.
831 closeUserRegions(this.abortRequested.get());
833 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
836 long now = EnvironmentEdgeManager.currentTime();
837 if ((now - lastMsg) >= msgInterval) {
838 tryRegionServerReport(lastMsg, now);
839 lastMsg = EnvironmentEdgeManager.currentTime();
841 if (!isStopped() && !isAborted()) {
842 this.sleeper.sleep();
844 } // while
845 } catch (Throwable t) {
846 if (!rpcServices.checkOOME(t)) {
847 String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
848 abort(prefix + t.getMessage(), t);
852 if (this.leaseManager != null) {
853 this.leaseManager.closeAfterLeasesExpire();
855 if (this.splitLogWorker != null) {
856 splitLogWorker.stop();
858 stopInfoServer();
859 // Send cache a shutdown.
860 if (blockCache != null) {
861 blockCache.shutdown();
863 if (mobFileCache != null) {
864 mobFileCache.shutdown();
867 // Send interrupts to wake up threads if sleeping so they notice shutdown.
868 // TODO: Should we check they are alive? If OOME, they could have exited already
869 if (this.hMemManager != null) {
870 this.hMemManager.stop();
872 if (this.cacheFlusher != null) {
873 this.cacheFlusher.interruptIfNecessary();
875 if (this.compactSplitThread != null) {
876 this.compactSplitThread.interruptIfNecessary();
879 // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
880 if (rspmHost != null) {
881 rspmHost.stop(this.abortRequested.get() || this.killed);
884 if (this.killed) {
885 // Just skip out w/o closing regions. Used when testing.
886 } else if (abortRequested.get()) {
887 if (this.dataFsOk) {
888 closeUserRegions(abortRequested.get()); // Don't leave any open file handles
890 LOG.info("aborting server " + this.serverName);
891 } else {
892 closeUserRegions(abortRequested.get());
893 LOG.info("stopping server " + this.serverName);
895 regionReplicationBufferManager.stop();
896 closeClusterConnection();
897 // Closing the compactSplit thread before closing meta regions
898 if (!this.killed && containsMetaTableRegions()) {
899 if (!abortRequested.get() || this.dataFsOk) {
900 if (this.compactSplitThread != null) {
901 this.compactSplitThread.join();
902 this.compactSplitThread = null;
904 closeMetaTableRegions(abortRequested.get());
908 if (!this.killed && this.dataFsOk) {
909 waitOnAllRegionsToClose(abortRequested.get());
910 LOG.info("stopping server " + this.serverName + "; all regions closed.");
913 // Stop the quota manager
914 if (rsQuotaManager != null) {
915 rsQuotaManager.stop();
917 if (rsSpaceQuotaManager != null) {
918 rsSpaceQuotaManager.stop();
919 rsSpaceQuotaManager = null;
922 // flag may be changed when closing regions throws exception.
923 if (this.dataFsOk) {
924 shutdownWAL(!abortRequested.get());
927 // Make sure the proxy is down.
928 if (this.rssStub != null) {
929 this.rssStub = null;
931 if (this.lockStub != null) {
932 this.lockStub = null;
934 if (this.rpcClient != null) {
935 this.rpcClient.close();
937 if (this.leaseManager != null) {
938 this.leaseManager.close();
940 if (this.pauseMonitor != null) {
941 this.pauseMonitor.stop();
944 if (!killed) {
945 stopServiceThreads();
948 if (this.rpcServices != null) {
949 this.rpcServices.stop();
952 try {
953 deleteMyEphemeralNode();
954 } catch (KeeperException.NoNodeException nn) {
955 // pass
956 } catch (KeeperException e) {
957 LOG.warn("Failed deleting my ephemeral node", e);
959 // We may have failed to delete the znode at the previous step, but
960 // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
961 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
963 closeZooKeeper();
964 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
967 private boolean containsMetaTableRegions() {
968 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
971 private boolean areAllUserRegionsOffline() {
972 if (getNumberOfOnlineRegions() > 2) {
973 return false;
975 boolean allUserRegionsOffline = true;
976 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
977 if (!e.getValue().getRegionInfo().isMetaRegion()) {
978 allUserRegionsOffline = false;
979 break;
982 return allUserRegionsOffline;
986 * @return Current write count for all online regions.
988 private long getWriteRequestCount() {
989 long writeCount = 0;
990 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
991 writeCount += e.getValue().getWriteRequestsCount();
993 return writeCount;
996 @InterfaceAudience.Private
997 protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
998 throws IOException {
999 RegionServerStatusService.BlockingInterface rss = rssStub;
1000 if (rss == null) {
1001 // the current server could be stopping.
1002 return;
1004 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1005 try {
1006 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1007 request.setServer(ProtobufUtil.toServerName(this.serverName));
1008 request.setLoad(sl);
1009 rss.regionServerReport(null, request.build());
1010 } catch (ServiceException se) {
1011 IOException ioe = ProtobufUtil.getRemoteException(se);
1012 if (ioe instanceof YouAreDeadException) {
1013 // This will be caught and handled as a fatal error in run()
1014 throw ioe;
1016 if (rssStub == rss) {
1017 rssStub = null;
1019 // Couldn't connect to the master, get location from zk and reconnect
1020 // Method blocks until new master is found or we are stopped
1021 createRegionServerStatusStub(true);
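// Flow note (descriptive comment added for clarity, grounded in the code above):
// tryRegionServerReport() is called from the main run() loop roughly every msgInterval
// milliseconds. On a ServiceException that is not a YouAreDeadException, the cached rssStub is
// cleared and createRegionServerStatusStub(true) is invoked, which blocks until a new master is
// found via ZooKeeper or this server is stopped.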
1026 * Reports the given map of Regions and their size on the filesystem to the active Master.
1028 * @param regionSizeStore The store containing region sizes
1029 * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1031 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1032 RegionServerStatusService.BlockingInterface rss = rssStub;
1033 if (rss == null) {
1034 // the current server could be stopping.
1035 LOG.trace("Skipping Region size report to HMaster as stub is null");
1036 return true;
1038 try {
1039 buildReportAndSend(rss, regionSizeStore);
1040 } catch (ServiceException se) {
1041 IOException ioe = ProtobufUtil.getRemoteException(se);
1042 if (ioe instanceof PleaseHoldException) {
1043 LOG.trace("Failed to report region sizes to Master because it is initializing."
1044 + " This will be retried.", ioe);
1045 // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1046 return true;
1048 if (rssStub == rss) {
1049 rssStub = null;
1051 createRegionServerStatusStub(true);
1052 if (ioe instanceof DoNotRetryIOException) {
1053 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1054 if (doNotRetryEx.getCause() != null) {
1055 Throwable t = doNotRetryEx.getCause();
1056 if (t instanceof UnsupportedOperationException) {
1057 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1058 return false;
1062 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1064 return true;
1068 * Builds the region size report and sends it to the master. Upon successful sending of the
1069 * report, the region sizes that were sent are marked as sent.
1071 * @param rss The stub to send to the Master
1072 * @param regionSizeStore The store containing region sizes
1074 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1075 RegionSizeStore regionSizeStore) throws ServiceException {
1076 RegionSpaceUseReportRequest request =
1077 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1078 rss.reportRegionSpaceUse(null, request);
1079 // Record the number of size reports sent
1080 if (metricsRegionServer != null) {
1081 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1086 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1088 * @param regionSizes The size in bytes of regions
1089 * @return The corresponding protocol buffer message.
1091 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1092 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1093 for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1094 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1096 return request.build();
1100 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse}
1101 * protobuf message.
1103 * @param regionInfo The RegionInfo
1104 * @param sizeInBytes The size in bytes of the Region
1105 * @return The protocol buffer
1107 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1108 return RegionSpaceUse.newBuilder()
1109 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1110 .setRegionSize(Objects.requireNonNull(sizeInBytes))
1111 .build();
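// Flow note (descriptive comment added for clarity, grounded in the code above): the space-quota
// size report travels as reportRegionSizesForQuotas(regionSizeStore)
//   -> buildReportAndSend(rss, regionSizeStore)
//   -> buildRegionSpaceUseReportRequest(regionSizeStore)
//   -> rss.reportRegionSpaceUse(null, request),
// with one RegionSpaceUse entry per region produced by convertRegionSize(regionInfo, size).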
1114 private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1115 throws IOException {
1116 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
1117 // per second, and other metrics. As long as metrics are part of ServerLoad it's best to use
1118 // the wrapper to compute those numbers in one place.
1119 // In the long term most of these should be moved off of ServerLoad and the heart beat.
1120 // Instead they should be stored in an HBase table so that external visibility into HBase is
1121 // improved; Additionally the load balancer will be able to take advantage of a more complete
1122 // history.
1123 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1124 Collection<HRegion> regions = getOnlineRegionsLocalContext();
1125 long usedMemory = -1L;
1126 long maxMemory = -1L;
1127 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1128 if (usage != null) {
1129 usedMemory = usage.getUsed();
1130 maxMemory = usage.getMax();
1133 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1134 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1135 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1136 serverLoad.setUsedHeapMB((int)(usedMemory / 1024 / 1024));
1137 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
1138 serverLoad.setReadRequestsCount(this.metricsRegionServerImpl.getReadRequestsCount());
1139 serverLoad.setWriteRequestsCount(this.metricsRegionServerImpl.getWriteRequestsCount());
1140 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1141 Builder coprocessorBuilder = Coprocessor.newBuilder();
1142 for (String coprocessor : coprocessors) {
1143 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1145 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1146 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1147 for (HRegion region : regions) {
1148 if (region.getCoprocessorHost() != null) {
1149 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1150 for (String regionCoprocessor : regionCoprocessors) {
1151 serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1154 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1155 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1156 .getCoprocessors()) {
1157 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1160 serverLoad.setReportStartTime(reportStartTime);
1161 serverLoad.setReportEndTime(reportEndTime);
1162 if (this.infoServer != null) {
1163 serverLoad.setInfoServerPort(this.infoServer.getPort());
1164 } else {
1165 serverLoad.setInfoServerPort(-1);
1167 MetricsUserAggregateSource userSource =
1168 metricsRegionServer.getMetricsUserAggregate().getSource();
1169 if (userSource != null) {
1170 Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1171 for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1172 serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1176 if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
1177 // always refresh first to get the latest value
1178 ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1179 if (rLoad != null) {
1180 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1181 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1182 .getReplicationLoadSourceEntries()) {
1183 serverLoad.addReplLoadSource(rLS);
1186 } else {
1187 if (replicationSourceHandler != null) {
1188 ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
1189 if (rLoad != null) {
1190 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1191 .getReplicationLoadSourceEntries()) {
1192 serverLoad.addReplLoadSource(rLS);
1196 if (replicationSinkHandler != null) {
1197 ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
1198 if (rLoad != null) {
1199 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1204 TaskMonitor.get().getTasks().forEach(task ->
1205 serverLoad.addTasks(ClusterStatusProtos.ServerTask.newBuilder()
1206 .setDescription(task.getDescription())
1207 .setStatus(task.getStatus() != null ? task.getStatus() : "")
1208 .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1209 .setStartTime(task.getStartTime())
1210 .setCompletionTime(task.getCompletionTimestamp())
1211 .build()));
1213 return serverLoad.build();
1216 private String getOnlineRegionsAsPrintableString() {
1217 StringBuilder sb = new StringBuilder();
1218 for (Region r: this.onlineRegions.values()) {
1219 if (sb.length() > 0) {
1220 sb.append(", ");
1222 sb.append(r.getRegionInfo().getEncodedName());
1224 return sb.toString();
1228 * Wait on regions close.
1230 private void waitOnAllRegionsToClose(final boolean abort) {
1231 // Wait till all regions are closed before going out.
1232 int lastCount = -1;
1233 long previousLogTime = 0;
1234 Set<String> closedRegions = new HashSet<>();
1235 boolean interrupted = false;
1236 try {
1237 while (!onlineRegions.isEmpty()) {
1238 int count = getNumberOfOnlineRegions();
1239 // Only print a message if the count of regions has changed.
1240 if (count != lastCount) {
1241 // Log every second at most
1242 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1243 previousLogTime = EnvironmentEdgeManager.currentTime();
1244 lastCount = count;
1245 LOG.info("Waiting on " + count + " regions to close");
1246 // Only print out regions still closing if a small number else will
1247 // swamp the log.
1248 if (count < 10 && LOG.isDebugEnabled()) {
1249 LOG.debug("Online Regions=" + this.onlineRegions);
1253 // Ensure all user regions have been sent a close. Use this to
1254 // protect against the case where an open comes in after we start the
1255 // iterator of onlineRegions to close all user regions.
1256 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1257 RegionInfo hri = e.getValue().getRegionInfo();
1258 if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) &&
1259 !closedRegions.contains(hri.getEncodedName())) {
1260 closedRegions.add(hri.getEncodedName());
1261 // Don't update zk with this close transition; pass false.
1262 closeRegionIgnoreErrors(hri, abort);
1265 // No regions in RIT, we could stop waiting now.
1266 if (this.regionsInTransitionInRS.isEmpty()) {
1267 if (!onlineRegions.isEmpty()) {
1268 LOG.info("We were exiting though online regions are not empty," +
1269 " because some regions failed closing");
1271 break;
1272 } else {
1273 LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream().
1274 map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1276 if (sleepInterrupted(200)) {
1277 interrupted = true;
1280 } finally {
1281 if (interrupted) {
1282 Thread.currentThread().interrupt();
1287 private static boolean sleepInterrupted(long millis) {
1288 boolean interrupted = false;
1289 try {
1290 Thread.sleep(millis);
1291 } catch (InterruptedException e) {
1292 LOG.warn("Interrupted while sleeping");
1293 interrupted = true;
1295 return interrupted;
1298 private void shutdownWAL(final boolean close) {
1299 if (this.walFactory != null) {
1300 try {
1301 if (close) {
1302 walFactory.close();
1303 } else {
1304 walFactory.shutdown();
1306 } catch (Throwable e) {
1307 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1308 LOG.error("Shutdown / close of WAL failed: " + e);
1309 LOG.debug("Shutdown / close exception details:", e);
1315 * Run init. Sets up wal and starts up all server threads.
1317 * @param c Extra configuration.
1319 protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1320 throws IOException {
1321 try {
1322 boolean updateRootDir = false;
1323 for (NameStringPair e : c.getMapEntriesList()) {
1324 String key = e.getName();
1325 // The hostname the master sees us as.
1326 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1327 String hostnameFromMasterPOV = e.getValue();
1328 this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1329 rpcServices.getSocketAddress().getPort(), this.startcode);
1330 if (!StringUtils.isBlank(useThisHostnameInstead) &&
1331 !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
1332 String msg = "Master passed us a different hostname to use; was=" +
1333 this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
1334 LOG.error(msg);
1335 throw new IOException(msg);
1337 if (StringUtils.isBlank(useThisHostnameInstead) &&
1338 !hostnameFromMasterPOV.equals(rpcServices.getSocketAddress().getHostName())) {
1339 String msg = "Master passed us a different hostname to use; was=" +
1340 rpcServices.getSocketAddress().getHostName() + ", but now=" + hostnameFromMasterPOV;
1341 LOG.error(msg);
1343 continue;
1346 String value = e.getValue();
1347 if (key.equals(HConstants.HBASE_DIR)) {
1348 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1349 updateRootDir = true;
1353 if (LOG.isDebugEnabled()) {
1354 LOG.debug("Config from master: " + key + "=" + value);
1356 this.conf.set(key, value);
1358 // Set our ephemeral znode up in zookeeper now we have a name.
1359 createMyEphemeralNode();
1361 if (updateRootDir) {
1362 // initialize file system by the config fs.defaultFS and hbase.rootdir from master
1363 initializeFileSystem();
1366 // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1367 // config param for task trackers, but we can piggyback off of it.
1368 if (this.conf.get("mapreduce.task.attempt.id") == null) {
1369 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1372 // Save it in a file; this will allow us to see if we crashed
1373 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1375 // This call sets up an initialized replication and WAL. Later we start it up.
1376 setupWALAndReplication();
1377 // Init in here rather than in constructor after thread name has been set
1378 final MetricsTable metricsTable =
1379 new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1380 this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1381 this.metricsRegionServer = new MetricsRegionServer(
1382 metricsRegionServerImpl, conf, metricsTable);
1383 // Now that we have a metrics source, start the pause monitor
1384 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1385 pauseMonitor.start();
1387 // There is a rare case where we do NOT want services to start. Check config.
1388 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1389 startServices();
1391 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile.
1392 // or make sense of it.
1393 startReplicationService();
1395 // Set up ZK
1396 LOG.info("Serving as " + this.serverName + ", RpcServer on " +
1397 rpcServices.getSocketAddress() + ", sessionid=0x" +
1398 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1400 // Wake up anyone waiting for this server to online
1401 synchronized (online) {
1402 online.set(true);
1403 online.notifyAll();
1405 } catch (Throwable e) {
1406 stop("Failed initialization");
1407 throw convertThrowableToIOE(cleanup(e, "Failed init"),
1408 "Region server startup failed");
1409 } finally {
1410 sleeper.skipSleepCycle();
1414 private void startHeapMemoryManager() {
1415 if (this.blockCache != null) {
1416 this.hMemManager =
1417 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1418 this.hMemManager.start(getChoreService());
1422 private void createMyEphemeralNode() throws KeeperException {
1423 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1424 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1425 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1426 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1427 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1430 private void deleteMyEphemeralNode() throws KeeperException {
1431 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1434 @Override
1435 public RegionServerAccounting getRegionServerAccounting() {
1436 return regionServerAccounting;
1439   // Round the size to KB or MB.
1440   // A trick here is that if sizeInBytes is less than sizeUnit but not 0, we round the size up to 1
1441   // instead of down to 0, to avoid some schedulers thinking the region has no data. See
1442   // HBASE-26340 for more details on why this is important.
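// For example, with sizeUnit = 1 MB (1024 * 1024), a 512 KB value rounds up to 1 while a 5 GB value becomes 5120.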
1443 private static int roundSize(long sizeInByte, int sizeUnit) {
1444 if (sizeInByte == 0) {
1445 return 0;
1446 } else if (sizeInByte < sizeUnit) {
1447 return 1;
1448 } else {
1449 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE);
1454 * @param r Region to get RegionLoad for.
1455 * @param regionLoadBldr the RegionLoad.Builder, can be null
1456 * @param regionSpecifier the RegionSpecifier.Builder, can be null
1457 * @return RegionLoad instance.
1459 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1460 RegionSpecifier.Builder regionSpecifier) throws IOException {
1461 byte[] name = r.getRegionInfo().getRegionName();
1462 int stores = 0;
1463 int storefiles = 0;
1464 int storeRefCount = 0;
1465 int maxCompactedStoreFileRefCount = 0;
1466 long storeUncompressedSize = 0L;
1467 long storefileSize = 0L;
1468 long storefileIndexSize = 0L;
1469 long rootLevelIndexSize = 0L;
1470 long totalStaticIndexSize = 0L;
1471 long totalStaticBloomSize = 0L;
1472 long totalCompactingKVs = 0L;
1473 long currentCompactedKVs = 0L;
1474 List<HStore> storeList = r.getStores();
1475 stores += storeList.size();
1476 for (HStore store : storeList) {
1477 storefiles += store.getStorefilesCount();
1478 int currentStoreRefCount = store.getStoreRefCount();
1479 storeRefCount += currentStoreRefCount;
1480 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1481 maxCompactedStoreFileRefCount = Math.max(maxCompactedStoreFileRefCount,
1482 currentMaxCompactedStoreFileRefCount);
1483 storeUncompressedSize += store.getStoreSizeUncompressed();
1484 storefileSize += store.getStorefilesSize();
1485       // TODO: storefileIndexSizeKB is the same as rootLevelIndexSizeKB?
1486 storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1487 CompactionProgress progress = store.getCompactionProgress();
1488 if (progress != null) {
1489 totalCompactingKVs += progress.getTotalCompactingKVs();
1490 currentCompactedKVs += progress.currentCompactedKVs;
1492 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1493 totalStaticIndexSize += store.getTotalStaticIndexSize();
1494 totalStaticBloomSize += store.getTotalStaticBloomSize();
1497 int unitMB = 1024 * 1024;
1498 int unitKB = 1024;
1500 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1501 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1502 int storefileSizeMB = roundSize(storefileSize, unitMB);
1503 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1504 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1505 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1506 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1508 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1509 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1510 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1511 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1512 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1513 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
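// The locality indexes above are the fraction (0.0-1.0) of HDFS block weight hosted on this server; the *Weight values are the raw weights behind them.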
1514 if (regionLoadBldr == null) {
1515 regionLoadBldr = RegionLoad.newBuilder();
1517 if (regionSpecifier == null) {
1518 regionSpecifier = RegionSpecifier.newBuilder();
1521 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1522 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1523 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1524 .setStores(stores)
1525 .setStorefiles(storefiles)
1526 .setStoreRefCount(storeRefCount)
1527 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1528 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1529 .setStorefileSizeMB(storefileSizeMB)
1530 .setMemStoreSizeMB(memstoreSizeMB)
1531 .setStorefileIndexSizeKB(storefileIndexSizeKB)
1532 .setRootIndexSizeKB(rootLevelIndexSizeKB)
1533 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1534 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1535 .setReadRequestsCount(r.getReadRequestsCount())
1536 .setCpRequestsCount(r.getCpRequestsCount())
1537 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1538 .setWriteRequestsCount(r.getWriteRequestsCount())
1539 .setTotalCompactingKVs(totalCompactingKVs)
1540 .setCurrentCompactedKVs(currentCompactedKVs)
1541 .setDataLocality(dataLocality)
1542 .setDataLocalityForSsd(dataLocalityForSsd)
1543 .setBlocksLocalWeight(blocksLocalWeight)
1544 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight)
1545 .setBlocksTotalWeight(blocksTotalWeight)
1546 .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1547 .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1548 r.setCompleteSequenceId(regionLoadBldr);
1549 return regionLoadBldr.build();
1552 private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1553 UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1554 userLoadBldr.setUserName(user);
1555 userSource.getClientMetrics().values().stream().map(
1556 clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1557 .setHostName(clientMetrics.getHostName())
1558 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1559 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1560 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1561 .forEach(userLoadBldr::addClientMetrics);
1562 return userLoadBldr.build();
1565 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1566 HRegion r = onlineRegions.get(encodedRegionName);
1567 return r != null ? createRegionLoad(r, null, null) : null;
1571    * Inner class that runs on a long period, checking whether regions need compaction.
1573 private static class CompactionChecker extends ScheduledChore {
1574 private final HRegionServer instance;
1575 private final int majorCompactPriority;
1576 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1577 //Iteration is 1-based rather than 0-based so we don't check for compaction
1578 // immediately upon region server startup
1579 private long iteration = 1;
1581 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1582 super("CompactionChecker", stopper, sleepTime);
1583 this.instance = h;
1584 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1586 /* MajorCompactPriority is configurable.
1587 * If not set, the compaction will use default priority.
1589 this.majorCompactPriority = this.instance.conf.
1590 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1591 DEFAULT_PRIORITY);
1594 @Override
1595 protected void chore() {
1596 for (Region r : this.instance.onlineRegions.values()) {
1597 // Skip compaction if region is read only
1598 if (r == null || r.isReadOnly()) {
1599 continue;
1602 HRegion hr = (HRegion) r;
1603 for (HStore s : hr.stores.values()) {
1604 try {
1605 long multiplier = s.getCompactionCheckMultiplier();
1606 assert multiplier > 0;
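// Each store dictates how often it should be checked: only examine it every 'multiplier' iterations of this chore, spreading compaction checks out over time.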
1607 if (iteration % multiplier != 0) {
1608 continue;
1610 if (s.needsCompaction()) {
1611 // Queue a compaction. Will recognize if major is needed.
1612 this.instance.compactSplitThread.requestSystemCompaction(hr, s,
1613 getName() + " requests compaction");
1614 } else if (s.shouldPerformMajorCompaction()) {
1615 s.triggerMajorCompaction();
1616 if (majorCompactPriority == DEFAULT_PRIORITY ||
1617 majorCompactPriority > hr.getCompactPriority()) {
1618 this.instance.compactSplitThread.requestCompaction(hr, s,
1619 getName() + " requests major compaction; use default priority",
1620 Store.NO_PRIORITY,
1621 CompactionLifeCycleTracker.DUMMY, null);
1622 } else {
1623 this.instance.compactSplitThread.requestCompaction(hr, s,
1624 getName() + " requests major compaction; use configured priority",
1625 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
1628 } catch (IOException e) {
1629 LOG.warn("Failed major compaction check on " + r, e);
1633 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1637 private static class PeriodicMemStoreFlusher extends ScheduledChore {
1638 private final HRegionServer server;
1639 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
1640 private final static int MIN_DELAY_TIME = 0; // millisec
1641 private final long rangeOfDelayMs;
1643 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1644 super("MemstoreFlusherChore", server, cacheFlushInterval);
1645 this.server = server;
1647 final long configuredRangeOfDelay = server.getConfiguration().getInt(
1648 "hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
1649 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
1652 @Override
1653 protected void chore() {
1654 final StringBuilder whyFlush = new StringBuilder();
1655 for (HRegion r : this.server.onlineRegions.values()) {
1656 if (r == null) {
1657 continue;
1659 if (r.shouldFlush(whyFlush)) {
1660 FlushRequester requester = server.getFlushRequester();
1661 if (requester != null) {
1662 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
1663 //Throttle the flushes by putting a delay. If we don't throttle, and there
1664 //is a balanced write-load on the regions in a table, we might end up
1665 //overwhelming the filesystem with too many flushes at once.
1666 if (requester.requestDelayedFlush(r, delay)) {
1667 LOG.info("{} requesting flush of {} because {} after random delay {} ms",
1668 getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(),
1669 delay);
1678    * Report the status of the server. A server is online once all of its startup has
1679    * completed (setting up filesystem, starting executorService threads, etc.). This
1680    * method is designed mostly to be useful in tests.
1682 * @return true if online, false if not.
1684 public boolean isOnline() {
1685 return online.get();
1689    * Set up the WAL and, if enabled, replication. Replication setup is done in here because it needs
1690    * to be hooked up to the WAL.
1692 private void setupWALAndReplication() throws IOException {
1693 WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
1694 // TODO Replication make assumptions here based on the default filesystem impl
1695 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1696 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
1698 Path logDir = new Path(walRootDir, logName);
1699 LOG.debug("logDir={}", logDir);
1700 if (this.walFs.exists(logDir)) {
1701 throw new RegionServerRunningException(
1702 "Region server has already created directory at " + this.serverName.toString());
1704 // Always create wal directory as now we need this when master restarts to find out the live
1705 // region servers.
1706 if (!this.walFs.mkdirs(logDir)) {
1707 throw new IOException("Can not create wal directory " + logDir);
1709 // Instantiate replication if replication enabled. Pass it the log directories.
1710 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
1711 this.walFactory = factory;
1715 * Start up replication source and sink handlers.
1717 private void startReplicationService() throws IOException {
1718 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
1719 this.replicationSourceHandler.startReplicationService();
1720 } else {
1721 if (this.replicationSourceHandler != null) {
1722 this.replicationSourceHandler.startReplicationService();
1724 if (this.replicationSinkHandler != null) {
1725 this.replicationSinkHandler.startReplicationService();
1731 * @return Master address tracker instance.
1733 public MasterAddressTracker getMasterAddressTracker() {
1734 return this.masterAddressTracker;
1738 * Start maintenance Threads, Server, Worker and lease checker threads.
1739 * Start all threads we need to run. This is called after we've successfully
1740 * registered with the Master.
1741    * Install an UncaughtExceptionHandler that calls abort on the RegionServer if we
1742 * get an unhandled exception. We cannot set the handler on all threads.
1743 * Server's internal Listener thread is off limits. For Server, if an OOME, it
1744 * waits a while then retries. Meantime, a flush or a compaction that tries to
1745    * run should trigger the same critical condition and the shutdown will run. On
1746    * its way out, this server will shut down Server. Leases are sort of in
1747    * between: the lease manager has an internal thread that, while it inherits from Chore,
1748    * keeps its own internal stop mechanism and so needs to be stopped by this
1749    * hosting server. Worker logs the exception and exits.
1751 private void startServices() throws IOException {
1752 if (!isStopped() && !isAborted()) {
1753 initializeThreads();
1755 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, asyncClusterConnection);
1756 this.secureBulkLoadManager.start();
1758 // Health checker thread.
1759 if (isHealthCheckerConfigured()) {
1760 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
1761 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
1762 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
1764 // Executor status collect thread.
1765 if (this.conf.getBoolean(HConstants.EXECUTOR_STATUS_COLLECT_ENABLED,
1766 HConstants.DEFAULT_EXECUTOR_STATUS_COLLECT_ENABLED)) {
1767 int sleepTime = this.conf.getInt(ExecutorStatusChore.WAKE_FREQ,
1768 ExecutorStatusChore.DEFAULT_WAKE_FREQ);
1769 executorStatusChore = new ExecutorStatusChore(sleepTime, this, this.getExecutorService(),
1770 this.metricsRegionServer.getMetricsSource());
1773 this.walRoller = new LogRoller(this);
1774 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
1775 this.procedureResultReporter = new RemoteProcedureResultReporter(this);
1777 // Create the CompactedFileDischarger chore executorService. This chore helps to
1778 // remove the compacted files that will no longer be used in reads.
1779 // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
1780 // 2 mins so that compacted files can be archived before the TTLCleaner runs
1781 int cleanerInterval =
1782 conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
1783 this.compactedFileDischarger =
1784 new CompactedHFilesDischarger(cleanerInterval, this, this);
1785 choreService.scheduleChore(compactedFileDischarger);
1787 // Start executor services
1788 final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
1789 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1790 ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
1791 final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
1792 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1793 ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
1794 final int openPriorityRegionThreads =
1795 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
1796 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1797 ExecutorType.RS_OPEN_PRIORITY_REGION).setCorePoolSize(openPriorityRegionThreads));
1798 final int closeRegionThreads =
1799 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
1800 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1801 ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
1802 final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
1803 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1804 ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
1805 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
1806 final int storeScannerParallelSeekThreads =
1807 conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
1808 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1809 ExecutorType.RS_PARALLEL_SEEK).setCorePoolSize(storeScannerParallelSeekThreads)
1810 .setAllowCoreThreadTimeout(true));
1812 final int logReplayOpsThreads = conf.getInt(
1813 HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
1814 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1815 ExecutorType.RS_LOG_REPLAY_OPS).setCorePoolSize(logReplayOpsThreads)
1816 .setAllowCoreThreadTimeout(true));
1817 // Start the threads for compacted files discharger
1818 final int compactionDischargerThreads =
1819 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
1820 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1821 ExecutorType.RS_COMPACTED_FILES_DISCHARGER).setCorePoolSize(compactionDischargerThreads));
1822 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
1823 final int regionReplicaFlushThreads = conf.getInt(
1824 "hbase.regionserver.region.replica.flusher.threads", conf.getInt(
1825 "hbase.regionserver.executor.openregion.threads", 3));
1826 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1827 ExecutorType.RS_REGION_REPLICA_FLUSH_OPS).setCorePoolSize(regionReplicaFlushThreads));
1829 final int refreshPeerThreads =
1830 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
1831 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1832 ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
1833 final int replaySyncReplicationWALThreads =
1834 conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
1835 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1836 ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL).setCorePoolSize(
1837 replaySyncReplicationWALThreads));
1838 final int switchRpcThrottleThreads =
1839 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
1840 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1841 ExecutorType.RS_SWITCH_RPC_THROTTLE).setCorePoolSize(switchRpcThrottleThreads));
1842 final int claimReplicationQueueThreads =
1843 conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
1844 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1845 ExecutorType.RS_CLAIM_REPLICATION_QUEUE).setCorePoolSize(claimReplicationQueueThreads));
1846 final int rsSnapshotOperationThreads =
1847 conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
1848 executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType(
1849 ExecutorType.RS_SNAPSHOT_OPERATIONS).setCorePoolSize(rsSnapshotOperationThreads));
1851 Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
1852 uncaughtExceptionHandler);
1853 if (this.cacheFlusher != null) {
1854 this.cacheFlusher.start(uncaughtExceptionHandler);
1856 Threads.setDaemonThreadRunning(this.procedureResultReporter,
1857 getName() + ".procedureResultReporter", uncaughtExceptionHandler);
1859 if (this.compactionChecker != null) {
1860 choreService.scheduleChore(compactionChecker);
1862 if (this.periodicFlusher != null) {
1863 choreService.scheduleChore(periodicFlusher);
1865 if (this.healthCheckChore != null) {
1866 choreService.scheduleChore(healthCheckChore);
1868 if (this.executorStatusChore != null) {
1869 choreService.scheduleChore(executorStatusChore);
1871 if (this.nonceManagerChore != null) {
1872 choreService.scheduleChore(nonceManagerChore);
1874 if (this.storefileRefresher != null) {
1875 choreService.scheduleChore(storefileRefresher);
1877 if (this.fsUtilizationChore != null) {
1878 choreService.scheduleChore(fsUtilizationChore);
1880 if (this.slowLogTableOpsChore != null) {
1881 choreService.scheduleChore(slowLogTableOpsChore);
1883 if (this.brokenStoreFileCleaner != null) {
1884 choreService.scheduleChore(brokenStoreFileCleaner);
1887     // The LeaseManager is not a Thread. Internally it runs a daemon thread. If that thread gets
1888     // an unhandled exception, it will just exit.
1889 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
1890 uncaughtExceptionHandler);
1892 // Create the log splitting worker and start it
1893     // Set a smaller retry count to fail fast, otherwise the splitlogworker could be blocked for
1894     // quite a while inside the Connection layer. The worker won't be available for other
1895     // tasks even after the current task is preempted when a split task times out.
1896 Configuration sinkConf = HBaseConfiguration.create(conf);
1897 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
1898 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
1899 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1900 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
1901 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
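// Keep the server-side retries multiplier at 1 so the effective retry count stays at the small value configured above.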
1902 if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
1903 DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
1904 // SplitLogWorker needs csm. If none, don't start this.
1905 this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
1906 splitLogWorker.start();
1907 LOG.debug("SplitLogWorker started");
1910 // Memstore services.
1911 startHeapMemoryManager();
1912 // Call it after starting HeapMemoryManager.
1913 initializeMemStoreChunkCreator(hMemManager);
1916 private void initializeThreads() {
1917 // Cache flushing thread.
1918 this.cacheFlusher = new MemStoreFlusher(conf, this);
1920 // Compaction thread
1921 this.compactSplitThread = new CompactSplit(this);
1923     // Background thread to check for compactions; needed if a region has not gotten updates
1924     // in a while. It will take care of not checking too frequently, on a store-by-store basis.
1925 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
1926 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
1927 this.leaseManager = new LeaseManager(this.threadWakeFrequency);
1929 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
1930 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
1931 if (isSlowLogTableEnabled) {
1932 // default chore duration: 10 min
1933 final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
1934 slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
1937 if (this.nonceManager != null) {
1938 // Create the scheduled chore that cleans up nonces.
1939 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
1942 // Setup the Quota Manager
1943 rsQuotaManager = new RegionServerRpcQuotaManager(this);
1944 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
1946 if (QuotaUtil.isQuotaEnabled(conf)) {
1947 this.fsUtilizationChore = new FileSystemUtilizationChore(this);
1951 boolean onlyMetaRefresh = false;
1952 int storefileRefreshPeriod = conf.getInt(
1953 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
1954 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1955 if (storefileRefreshPeriod == 0) {
1956 storefileRefreshPeriod = conf.getInt(
1957 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
1958 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
1959 onlyMetaRefresh = true;
1961 if (storefileRefreshPeriod > 0) {
1962 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
1963 onlyMetaRefresh, this, this);
1966 int brokenStoreFileCleanerPeriod = conf.getInt(
1967 BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
1968 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
1969 int brokenStoreFileCleanerDelay = conf.getInt(
1970 BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
1971 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
1972 double brokenStoreFileCleanerDelayJitter = conf.getDouble(
1973 BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
1974 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
1975 double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) *
1976 brokenStoreFileCleanerDelayJitter;
1977 long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
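// jitterRate falls in [-jitter/2, +jitter/2), so the configured delay is perturbed by at most half of the jitter fraction in either direction.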
1978 this.brokenStoreFileCleaner =
1979 new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
1980 brokenStoreFileCleanerPeriod, this, conf, this);
1982 registerConfigurationObservers();
1985 private void registerConfigurationObservers() {
1986 // Registering the compactSplitThread object with the ConfigurationManager.
1987 configurationManager.registerObserver(this.compactSplitThread);
1988 configurationManager.registerObserver(this.rpcServices);
1989 configurationManager.registerObserver(this);
1993    * Verify that the server is healthy.
1995 private boolean isHealthy() {
1996 if (!dataFsOk) {
1997 // File system problem
1998 return false;
2000 // Verify that all threads are alive
2001 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2002 && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2003 && (this.walRoller == null || this.walRoller.isAlive())
2004 && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2005 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2006 if (!healthy) {
2007 stop("One or more threads are no longer alive -- stop");
2009 return healthy;
2012 @Override
2013 public List<WAL> getWALs() {
2014 return walFactory.getWALs();
2017 @Override
2018 public WAL getWAL(RegionInfo regionInfo) throws IOException {
2019 WAL wal = walFactory.getWAL(regionInfo);
2020 if (this.walRoller != null) {
2021 this.walRoller.addWAL(wal);
2023 return wal;
2026 public LogRoller getWalRoller() {
2027 return walRoller;
2030 WALFactory getWalFactory() {
2031 return walFactory;
2034 @Override
2035 public void stop(final String msg) {
2036 stop(msg, false, RpcServer.getRequestUser().orElse(null));
2040 * Stops the regionserver.
2041 * @param msg Status message
2042 * @param force True if this is a regionserver abort
2043 * @param user The user executing the stop request, or null if no user is associated
2045 public void stop(final String msg, final boolean force, final User user) {
2046 if (!this.stopped) {
2047 LOG.info("***** STOPPING region server '" + this + "' *****");
2048 if (this.rsHost != null) {
2049 // when forced via abort don't allow CPs to override
2050 try {
2051 this.rsHost.preStop(msg, user);
2052 } catch (IOException ioe) {
2053 if (!force) {
2054 LOG.warn("The region server did not stop", ioe);
2055 return;
2057 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2060 this.stopped = true;
2061 LOG.info("STOPPED: " + msg);
2062 // Wakes run() if it is sleeping
2063 sleeper.skipSleepCycle();
2067 public void waitForServerOnline(){
2068 while (!isStopped() && !isOnline()) {
2069 synchronized (online) {
2070 try {
2071 online.wait(msgInterval);
2072 } catch (InterruptedException ie) {
2073 Thread.currentThread().interrupt();
2074 break;
2080 @Override
2081 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2082 HRegion r = context.getRegion();
2083 long openProcId = context.getOpenProcId();
2084 long masterSystemTime = context.getMasterSystemTime();
2085 rpcServices.checkOpen();
2086 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2087 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2088 // Do checks to see if we need to compact (references or too many files)
2089 // Skip compaction check if region is read only
2090 if (!r.isReadOnly()) {
2091 for (HStore s : r.stores.values()) {
2092 if (s.hasReferences() || s.needsCompaction()) {
2093 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2097 long openSeqNum = r.getOpenSeqNum();
2098 if (openSeqNum == HConstants.NO_SEQNUM) {
2099 // If we opened a region, we should have read some sequence number from it.
2100 LOG.error(
2101 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2102 openSeqNum = 0;
2105 // Notify master
2106 if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2107 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
2108 throw new IOException(
2109 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2112 triggerFlushInPrimaryRegion(r);
2114 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2118 * Helper method for use in tests. Skip the region transition report when there's no master
2119 * around to receive it.
2121 private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2122 final TransitionCode code = context.getCode();
2123 final long openSeqNum = context.getOpenSeqNum();
2124 long masterSystemTime = context.getMasterSystemTime();
2125 final RegionInfo[] hris = context.getHris();
2127 if (code == TransitionCode.OPENED) {
2128 Preconditions.checkArgument(hris != null && hris.length == 1);
2129 if (hris[0].isMetaRegion()) {
2130 LOG.warn(
2131 "meta table location is stored in master local store, so we can not skip reporting");
2132 return false;
2133 } else {
2134 try {
2135 MetaTableAccessor.updateRegionLocation(asyncClusterConnection.toConnection(), hris[0],
2136 serverName, openSeqNum, masterSystemTime);
2137 } catch (IOException e) {
2138 LOG.info("Failed to update meta", e);
2139 return false;
2143 return true;
2146 private ReportRegionStateTransitionRequest createReportRegionStateTransitionRequest(
2147 final RegionStateTransitionContext context) {
2148 final TransitionCode code = context.getCode();
2149 final long openSeqNum = context.getOpenSeqNum();
2150 final RegionInfo[] hris = context.getHris();
2151 final long[] procIds = context.getProcIds();
2153 ReportRegionStateTransitionRequest.Builder builder =
2154 ReportRegionStateTransitionRequest.newBuilder();
2155 builder.setServer(ProtobufUtil.toServerName(serverName));
2156 RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2157 transition.setTransitionCode(code);
2158 if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2159 transition.setOpenSeqNum(openSeqNum);
2161 for (RegionInfo hri: hris) {
2162 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2164 for (long procId: procIds) {
2165 transition.addProcId(procId);
2168 return builder.build();
2171 @Override
2172 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2173 if (TEST_SKIP_REPORTING_TRANSITION) {
2174 return skipReportingTransition(context);
2176 final ReportRegionStateTransitionRequest request =
2177 createReportRegionStateTransitionRequest(context);
2179 int tries = 0;
2180 long pauseTime = this.retryPauseTime;
2181 // Keep looping till we get an error. We want to send reports even though server is going down.
2183     // Only go down if clusterConnection is null. It is set to null almost as the last thing as the
2184     // HRegionServer goes down.
2184 while (this.asyncClusterConnection != null && !this.asyncClusterConnection.isClosed()) {
2185 RegionServerStatusService.BlockingInterface rss = rssStub;
2186 try {
2187 if (rss == null) {
2188 createRegionServerStatusStub();
2189 continue;
2191 ReportRegionStateTransitionResponse response =
2192 rss.reportRegionStateTransition(null, request);
2193 if (response.hasErrorMessage()) {
2194 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2195 break;
2197         // Log if we had to retry, else don't log unless TRACE. We want to
2198         // know if we were successful after an attempt showed in the logs as failed.
2199 if (tries > 0 || LOG.isTraceEnabled()) {
2200 LOG.info("TRANSITION REPORTED " + request);
2202 // NOTE: Return mid-method!!!
2203 return true;
2204 } catch (ServiceException se) {
2205 IOException ioe = ProtobufUtil.getRemoteException(se);
2206 boolean pause =
2207 ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException
2208 || ioe instanceof CallQueueTooBigException;
2209 if (pause) {
2210 // Do backoff else we flood the Master with requests.
2211 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2212 } else {
2213 pauseTime = this.retryPauseTime; // Reset.
2215 LOG.info("Failed report transition " +
2216 TextFormat.shortDebugString(request) + "; retry (#" + tries + ")" +
2217 (pause?
2218 " after " + pauseTime + "ms delay (Master is coming online...).":
2219 " immediately."),
2220 ioe);
2221 if (pause) {
2222 Threads.sleep(pauseTime);
2224 tries++;
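// Drop the cached master stub (unless another thread already replaced it) so the next attempt re-creates the connection.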
2225 if (rssStub == rss) {
2226 rssStub = null;
2230 return false;
2234 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2235 * block this thread. See RegionReplicaFlushHandler for details.
2237 private void triggerFlushInPrimaryRegion(final HRegion region) {
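// Nothing to do for the primary (default) replica; only secondary replicas wait for a flush from the primary before serving reads.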
2238 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2239 return;
2241 TableName tn = region.getTableDescriptor().getTableName();
2242 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
2243 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2244       // If memstore replication is not set up, we do not have to wait for observing a flush event
2245       // from the primary before starting to serve reads, because gaps from replication are not
2246       // applicable. This logic is from
2247       // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2248       // HBASE-13063.
2249 !region.getTableDescriptor().hasRegionMemStoreReplication()) {
2250 region.setReadsEnabled(true);
2251 return;
2254 region.setReadsEnabled(false); // disable reads before marking the region as opened.
2255 // RegionReplicaFlushHandler might reset this.
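// (The handler triggers a flush on the primary and is expected to re-enable reads on this secondary once that flush is observed.)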
2257 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2258 if (this.executorService != null) {
2259 this.executorService.submit(new RegionReplicaFlushHandler(this, region));
2260 } else {
2261 LOG.info("Executor is null; not running flush of primary region replica for {}",
2262 region.getRegionInfo());
2266 @InterfaceAudience.Private
2267 public RSRpcServices getRSRpcServices() {
2268 return rpcServices;
2272    * Cause the server to exit without closing the regions it is serving, without closing the log
2273    * it is using, and without notifying the master. Used for unit testing and on
2274    * catastrophic events such as HDFS being yanked out from under hbase or an OOME.
2276 * @param reason
2277 * the reason we are aborting
2278 * @param cause
2279 * the exception that caused the abort, or null
2281 @Override
2282 public void abort(String reason, Throwable cause) {
2283 if (!setAbortRequested()) {
2284 // Abort already in progress, ignore the new request.
2285 LOG.debug(
2286 "Abort already in progress. Ignoring the current request with reason: {}", reason);
2287 return;
2289 String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2290 if (cause != null) {
2291 LOG.error(HBaseMarkers.FATAL, msg, cause);
2292 } else {
2293 LOG.error(HBaseMarkers.FATAL, msg);
2295 // HBASE-4014: show list of coprocessors that were loaded to help debug
2297     // regionserver crashes. Note that we're implicitly using
2297 // java.util.HashSet's toString() method to print the coprocessor names.
2298 LOG.error(HBaseMarkers.FATAL, "RegionServer abort: loaded coprocessors are: " +
2299 CoprocessorHost.getLoadedCoprocessors());
2301     // Try and dump metrics on abort -- might give a clue as to how the fatal error came about...
2301 try {
2302 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2303 } catch (MalformedObjectNameException | IOException e) {
2304 LOG.warn("Failed dumping metrics", e);
2307 // Do our best to report our abort to the master, but this may not work
2308 try {
2309 if (cause != null) {
2310 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2312 // Report to the master but only if we have already registered with the master.
2313 RegionServerStatusService.BlockingInterface rss = rssStub;
2314 if (rss != null && this.serverName != null) {
2315 ReportRSFatalErrorRequest.Builder builder =
2316 ReportRSFatalErrorRequest.newBuilder();
2317 builder.setServer(ProtobufUtil.toServerName(this.serverName));
2318 builder.setErrorMessage(msg);
2319 rss.reportRSFatalError(null, builder.build());
2321 } catch (Throwable t) {
2322 LOG.warn("Unable to report fatal error to master", t);
2325 scheduleAbortTimer();
2326 // shutdown should be run as the internal user
2327 stop(reason, true, null);
2331    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up
2332    * logs, but it does close the socket in case we want to bring up a server on the old
2333    * hostname+port immediately.
2335 @InterfaceAudience.Private
2336 protected void kill() {
2337 this.killed = true;
2338 abort("Simulated kill");
2341 // Limits the time spent in the shutdown process.
2342 private void scheduleAbortTimer() {
2343 if (this.abortMonitor == null) {
2344 this.abortMonitor = new Timer("Abort regionserver monitor", true);
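// Watchdog: the configured task (SystemExitWhenAbortTimeout by default) is meant to terminate the process if abort hangs past the abort timeout.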
2345 TimerTask abortTimeoutTask = null;
2346 try {
2347 Constructor<? extends TimerTask> timerTaskCtor =
2348 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2349 .asSubclass(TimerTask.class).getDeclaredConstructor();
2350 timerTaskCtor.setAccessible(true);
2351 abortTimeoutTask = timerTaskCtor.newInstance();
2352 } catch (Exception e) {
2353 LOG.warn("Initialize abort timeout task failed", e);
2355 if (abortTimeoutTask != null) {
2356 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2362 * Wait on all threads to finish. Presumption is that all closes and stops
2363 * have already been called.
2365 protected void stopServiceThreads() {
2366 // clean up the scheduled chores
2367 stopChoreService();
2368 if (bootstrapNodeManager != null) {
2369 bootstrapNodeManager.stop();
2371 if (this.cacheFlusher != null) {
2372 this.cacheFlusher.join();
2374 if (this.walRoller != null) {
2375 this.walRoller.close();
2377 if (this.compactSplitThread != null) {
2378 this.compactSplitThread.join();
2380 stopExecutorService();
2381 if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
2382 this.replicationSourceHandler.stopReplicationService();
2383 } else {
2384 if (this.replicationSourceHandler != null) {
2385 this.replicationSourceHandler.stopReplicationService();
2387 if (this.replicationSinkHandler != null) {
2388 this.replicationSinkHandler.stopReplicationService();
2394    * @return The object that implements the replication source service.
2396 @Override
2397 public ReplicationSourceService getReplicationSourceService() {
2398 return replicationSourceHandler;
2402    * @return The object that implements the replication sink service.
2404 public ReplicationSinkService getReplicationSinkService() {
2405 return replicationSinkHandler;
2409 * Get the current master from ZooKeeper and open the RPC connection to it.
2410 * To get a fresh connection, the current rssStub must be null.
2411 * Method will block until a master is available. You can break from this
2412 * block by requesting the server stop.
2414 * @return master + port, or null if server has been stopped
2416 private synchronized ServerName createRegionServerStatusStub() {
2417 // Create RS stub without refreshing the master node from ZK, use cached data
2418 return createRegionServerStatusStub(false);
2422 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2423 * connection, the current rssStub must be null. Method will block until a master is available.
2424 * You can break from this block by requesting the server stop.
2425 * @param refresh If true then master address will be read from ZK, otherwise use cached data
2426 * @return master + port, or null if server has been stopped
2428 @InterfaceAudience.Private
2429 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2430 if (rssStub != null) {
2431 return masterAddressTracker.getMasterAddress();
2433 ServerName sn = null;
2434 long previousLogTime = 0;
2435 RegionServerStatusService.BlockingInterface intRssStub = null;
2436 LockService.BlockingInterface intLockStub = null;
2437 boolean interrupted = false;
2438 try {
2439 while (keepLooping()) {
2440 sn = this.masterAddressTracker.getMasterAddress(refresh);
2441 if (sn == null) {
2442 if (!keepLooping()) {
2443 // give up with no connection.
2444 LOG.debug("No master found and cluster is stopped; bailing out");
2445 return null;
2447 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2448 LOG.debug("No master found; retry");
2449 previousLogTime = EnvironmentEdgeManager.currentTime();
2451 refresh = true; // let's try pull it from ZK directly
2452 if (sleepInterrupted(200)) {
2453 interrupted = true;
2455 continue;
2457 try {
2458 BlockingRpcChannel channel =
2459 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2460 shortOperationTimeout);
2461 intRssStub = RegionServerStatusService.newBlockingStub(channel);
2462 intLockStub = LockService.newBlockingStub(channel);
2463 break;
2464 } catch (IOException e) {
2465 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2466 e = e instanceof RemoteException ?
2467 ((RemoteException)e).unwrapRemoteException() : e;
2468 if (e instanceof ServerNotRunningYetException) {
2469 LOG.info("Master isn't available yet, retrying");
2470 } else {
2471 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2473 previousLogTime = EnvironmentEdgeManager.currentTime();
2475 if (sleepInterrupted(200)) {
2476 interrupted = true;
2480 } finally {
2481 if (interrupted) {
2482 Thread.currentThread().interrupt();
2485 this.rssStub = intRssStub;
2486 this.lockStub = intLockStub;
2487 return sn;
2491 * @return True if we should break loop because cluster is going down or this server has been
2492 * stopped or hdfs has gone bad.
2494 private boolean keepLooping() {
2495 return !this.stopped && isClusterUp();
2499    * Let the master know we're here. Run initialization using parameters passed
2500    * to us by the master.
2501    * @return A Map of key/value configurations we got from the Master, else
2502    * null if we failed to register.
2503 * @throws IOException
2505 private RegionServerStartupResponse reportForDuty() throws IOException {
2506 if (this.masterless) {
2507 return RegionServerStartupResponse.getDefaultInstance();
2509 ServerName masterServerName = createRegionServerStatusStub(true);
2510 RegionServerStatusService.BlockingInterface rss = rssStub;
2511 if (masterServerName == null || rss == null) {
2512 return null;
2514 RegionServerStartupResponse result = null;
2515 try {
2516 rpcServices.requestCount.reset();
2517 rpcServices.rpcGetRequestCount.reset();
2518 rpcServices.rpcScanRequestCount.reset();
2519 rpcServices.rpcFullScanRequestCount.reset();
2520 rpcServices.rpcMultiRequestCount.reset();
2521 rpcServices.rpcMutateRequestCount.reset();
2522 LOG.info("reportForDuty to master=" + masterServerName + " with port="
2523 + rpcServices.getSocketAddress().getPort() + ", startcode=" + this.startcode);
2524 long now = EnvironmentEdgeManager.currentTime();
2525 int port = rpcServices.getSocketAddress().getPort();
2526 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2527 if (!StringUtils.isBlank(useThisHostnameInstead)) {
2528 request.setUseThisHostnameInstead(useThisHostnameInstead);
2530 request.setPort(port);
2531 request.setServerStartCode(this.startcode);
2532 request.setServerCurrentTime(now);
2533 result = rss.regionServerStartup(null, request.build());
2534 } catch (ServiceException se) {
2535 IOException ioe = ProtobufUtil.getRemoteException(se);
2536 if (ioe instanceof ClockOutOfSyncException) {
2537 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
2538 ioe);
2539 // Re-throw IOE will cause RS to abort
2540 throw ioe;
2541 } else if (ioe instanceof ServerNotRunningYetException) {
2542 LOG.debug("Master is not running yet");
2543 } else {
2544 LOG.warn("error telling master we are up", se);
2546 rssStub = null;
2548 return result;
2551 @Override
2552 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2553 try {
2554 GetLastFlushedSequenceIdRequest req =
2555 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2556 RegionServerStatusService.BlockingInterface rss = rssStub;
2557 if (rss == null) { // Try to connect one more time
2558 createRegionServerStatusStub();
2559 rss = rssStub;
2560 if (rss == null) {
2561 // Still no luck, we tried
2562 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2563 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2564 .build();
2567 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2568 return RegionStoreSequenceIds.newBuilder()
2569 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2570 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2571 } catch (ServiceException e) {
2572 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2573 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2574 .build();
2579 * Close meta region if we carry it
2580 * @param abort Whether we're running an abort.
2582 private void closeMetaTableRegions(final boolean abort) {
2583 HRegion meta = null;
2584 this.onlineRegionsLock.writeLock().lock();
2585 try {
2586 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2587 RegionInfo hri = e.getValue().getRegionInfo();
2588 if (hri.isMetaRegion()) {
2589 meta = e.getValue();
2591 if (meta != null) {
2592 break;
2595 } finally {
2596 this.onlineRegionsLock.writeLock().unlock();
2598 if (meta != null) {
2599 closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2604 * Schedule closes on all user regions.
2605    * Should be safe to call multiple times because it won't close regions
2606    * that are already closed or that are closing.
2607 * @param abort Whether we're running an abort.
2609 private void closeUserRegions(final boolean abort) {
2610 this.onlineRegionsLock.writeLock().lock();
2611 try {
2612 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2613 HRegion r = e.getValue();
2614 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
2615 // Don't update zk with this close transition; pass false.
2616 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2619 } finally {
2620 this.onlineRegionsLock.writeLock().unlock();
2624 protected Map<String, HRegion> getOnlineRegions() {
2625 return this.onlineRegions;
2628 public int getNumberOfOnlineRegions() {
2629 return this.onlineRegions.size();
2633 * For tests, web ui and metrics.
2634 * This method will only work if HRegionServer is in the same JVM as client;
2635 * HRegion cannot be serialized to cross an rpc.
2637 public Collection<HRegion> getOnlineRegionsLocalContext() {
2638 Collection<HRegion> regions = this.onlineRegions.values();
2639 return Collections.unmodifiableCollection(regions);
2642 @Override
2643 public void addRegion(HRegion region) {
2644 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2645 configurationManager.registerObserver(region);
2648 private void addRegion(SortedMap<Long, Collection<HRegion>> sortedRegions, HRegion region,
2649 long size) {
2650 if (!sortedRegions.containsKey(size)) {
2651 sortedRegions.put(size, new ArrayList<>());
2653 sortedRegions.get(size).add(region);
2656 * @return A new Map of online regions sorted by region off-heap size with the first entry being
2657 * the biggest.
2659 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOffHeapSize() {
2660 // we'll sort the regions in reverse
2661 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2662 // Copy over all regions. Regions are sorted by size with biggest first.
2663 for (HRegion region : this.onlineRegions.values()) {
2664 addRegion(sortedRegions, region, region.getMemStoreOffHeapSize());
2666 return sortedRegions;
2670 * @return A new Map of online regions sorted by region heap size with the first entry being the
2671 * biggest.
2673 SortedMap<Long, Collection<HRegion>> getCopyOfOnlineRegionsSortedByOnHeapSize() {
2674 // we'll sort the regions in reverse
2675 SortedMap<Long, Collection<HRegion>> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
2676 // Copy over all regions. Regions are sorted by size with biggest first.
2677 for (HRegion region : this.onlineRegions.values()) {
2678 addRegion(sortedRegions, region, region.getMemStoreHeapSize());
2680 return sortedRegions;
2683 /** @return reference to FlushRequester */
2684 @Override
2685 public FlushRequester getFlushRequester() {
2686 return this.cacheFlusher;
2689 @Override
2690 public CompactionRequester getCompactionRequestor() {
2691 return this.compactSplitThread;
2694 @Override
2695 public LeaseManager getLeaseManager() {
2696 return leaseManager;
2700 * @return {@code true} when the data file system is available, {@code false} otherwise.
2702 boolean isDataFileSystemOk() {
2703 return this.dataFsOk;
2706 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2707 return this.rsHost;
2710 @Override
2711 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2712 return this.regionsInTransitionInRS;
2715 @Override
2716 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2717 return rsQuotaManager;
2721 // Main program and support routines
2724 * Load the replication executorService objects, if any
2726 private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
2727 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
2728 // read in the name of the source replication class from the config file.
2729 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2730 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2732 // read in the name of the sink replication class from the config file.
2733 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2734 HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
2736 // If both the sink and the source class names are the same, then instantiate
2737 // only one object.
2738 if (sourceClassname.equals(sinkClassname)) {
2739 server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2740 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2741 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
2742 server.sameReplicationSourceAndSink = true;
2743 } else {
2744 server.replicationSourceHandler = newReplicationInstance(sourceClassname,
2745 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2746 server.replicationSinkHandler = newReplicationInstance(sinkClassname,
2747 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
2748 server.sameReplicationSourceAndSink = false;
2752 private static <T extends ReplicationService> T newReplicationInstance(String classname,
2753 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
2754 Path oldLogDir, WALFactory walFactory) throws IOException {
2755 final Class<? extends T> clazz;
2756 try {
2757 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2758 clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
2759 } catch (java.lang.ClassNotFoundException nfe) {
2760 throw new IOException("Could not find class for " + classname);
2762 T service = ReflectionUtils.newInstance(clazz, conf);
2763 service.initialize(server, walFs, logDir, oldLogDir, walFactory);
2764 return service;
2767 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
2768 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
2769 if(!this.isOnline()){
2770 return walGroupsReplicationStatus;
2772 List<ReplicationSourceInterface> allSources = new ArrayList<>();
2773 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
2774 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
2775 for(ReplicationSourceInterface source: allSources){
2776 walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
2778 return walGroupsReplicationStatus;
2782 * Utility for constructing an instance of the passed HRegionServer class.
2784 static HRegionServer constructRegionServer(
2785 final Class<? extends HRegionServer> regionServerClass,
2786 final Configuration conf
2788 try {
2789 Constructor<? extends HRegionServer> c =
2790 regionServerClass.getConstructor(Configuration.class);
2791 return c.newInstance(conf);
2792 } catch (Exception e) {
2793 throw new RuntimeException("Failed construction of " + "Regionserver: "
2794 + regionServerClass.toString(), e);
2799 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2801 public static void main(String[] args) {
2802 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
2803 VersionInfo.logVersion();
2804 Configuration conf = HBaseConfiguration.create();
2805 @SuppressWarnings("unchecked")
2806 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2807 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2809 new HRegionServerCommandLine(regionServerClass).doMain(args);
2813 * Gets the online regions of the specified table.
2814 * This method looks at the in-memory onlineRegions. It does not go to <code>hbase:meta</code>.
2815    * Only returns <em>online</em> regions. If a region of this table has been
2816    * closed during a disable, etc., it will not be included in the returned list.
2817    * So, the returned list may not necessarily be ALL regions in this table; it is
2818    * all the ONLINE regions in the table.
2819 * @param tableName table to limit the scope of the query
2820 * @return Online regions from <code>tableName</code>
2822 @Override
2823 public List<HRegion> getRegions(TableName tableName) {
2824 List<HRegion> tableRegions = new ArrayList<>();
2825 synchronized (this.onlineRegions) {
2826 for (HRegion region: this.onlineRegions.values()) {
2827 RegionInfo regionInfo = region.getRegionInfo();
2828 if(regionInfo.getTable().equals(tableName)) {
2829 tableRegions.add(region);
2833 return tableRegions;
2836 @Override
2837 public List<HRegion> getRegions() {
2838 List<HRegion> allRegions;
2839 synchronized (this.onlineRegions) {
2840 // Return a clone copy of the onlineRegions
2841 allRegions = new ArrayList<>(onlineRegions.values());
2843 return allRegions;
2847 * Gets the online tables in this RS.
2848 * This method looks at the in-memory onlineRegions.
2849 * @return all the online tables in this RS
2851 public Set<TableName> getOnlineTables() {
2852 Set<TableName> tables = new HashSet<>();
2853 synchronized (this.onlineRegions) {
2854 for (Region region: this.onlineRegions.values()) {
2855 tables.add(region.getTableDescriptor().getTableName());
2858 return tables;
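/**
 * Returns the sorted, de-duplicated names of all coprocessors loaded on this region server:
 * those on the common WAL, on each online region, on each region's WAL, and on the region
 * server host itself.
 */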
2861 public String[] getRegionServerCoprocessors() {
2862 TreeSet<String> coprocessors = new TreeSet<>();
2863 try {
2864 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2865 } catch (IOException exception) {
2866 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2867 "skipping.");
2868 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2870 Collection<HRegion> regions = getOnlineRegionsLocalContext();
2871 for (HRegion region: regions) {
2872 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2873 try {
2874 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2875 } catch (IOException exception) {
2876 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2877 "; skipping.");
2878 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2881 coprocessors.addAll(rsHost.getCoprocessors());
2882 return coprocessors.toArray(new String[0]);
2886 * Tries to close the region; logs a warning on failure and continues.
2887 * @param region Region to close
2889 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
2890 try {
2891 if (!closeRegion(region.getEncodedName(), abort, null)) {
2892 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2893 " - ignoring and continuing");
2895 } catch (IOException e) {
2896 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2897 " - ignoring and continuing", e);
2902 * Closes a region asynchronously. Can be called from the master or internally by the
2903 * regionserver when stopping. If called from the master, the region status will be updated.
2905 * <p>
2906 * If an opening was in progress, this method will cancel it, but will not start a new close. The
2907 * coprocessors are not called in this case. A NotServingRegionException is thrown.
2908 * </p>
2910 * <p>
2911 * If a close was in progress, this new request will be ignored, and an exception thrown.
2912 * </p>
2914 * @param encodedName Region to close
2915 * @param abort True if we are aborting
2916 * @param destination Where the region is being moved to; may be null if unknown.
2917 * @return True if we closed a region.
2918 * @throws NotServingRegionException if the region is not online
2920 protected boolean closeRegion(String encodedName, final boolean abort,
2921 final ServerName destination)
2922 throws NotServingRegionException {
2923 //Check for permissions to close.
2924 HRegion actualRegion = this.getRegion(encodedName);
2925 // Can be null if we're calling close on a region that's not online
2926 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2927 try {
2928 actualRegion.getCoprocessorHost().preClose(false);
2929 } catch (IOException exp) {
2930 LOG.warn("Unable to close region: the coprocessor threw an exception ", exp);
2931 return false;
2935 // previous can come back 'null' if not in map.
2936 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
2937 Boolean.FALSE);
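// In regionsInTransitionInRS, Boolean.TRUE marks a region that is opening and Boolean.FALSE
// one that is closing; the putIfAbsent above tells us which transition, if any, was already
// in flight for this region.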
2939 if (Boolean.TRUE.equals(previous)) {
2940 LOG.info("Received CLOSE for the region: " + encodedName + ", which we are already " +
2941 "trying to OPEN. Cancelling OPENING.");
2942 if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
2943 // The replace failed. That should be an exceptional case, but theoretically it can happen.
2944 // We're going to try to do a standard close then.
2945 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2946 " Doing a standard close now");
2947 return closeRegion(encodedName, abort, destination);
2949 // Let's get the region from the online region list again
2950 actualRegion = this.getRegion(encodedName);
2951 if (actualRegion == null) { // The cancelled open never brought the region online; nothing to close.
2952 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2953 // The master deletes the znode when it receives this exception.
2954 throw new NotServingRegionException("The region " + encodedName +
2955 " was opening but not yet served. Opening is cancelled.");
2957 } else if (previous == null) {
2958 LOG.info("Received CLOSE for {}", encodedName);
2959 } else if (Boolean.FALSE.equals(previous)) {
2960 LOG.info("Received CLOSE for the region: " + encodedName +
2961 ", which we are already trying to CLOSE but have not yet completed");
2962 return true;
2965 if (actualRegion == null) {
2966 LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
2967 this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
2968 // The master deletes the znode when it receives this exception.
2969 throw new NotServingRegionException("The region " + encodedName +
2970 " is not online, and is not opening.");
2973 CloseRegionHandler crh;
2974 final RegionInfo hri = actualRegion.getRegionInfo();
2975 if (hri.isMetaRegion()) {
2976 crh = new CloseMetaHandler(this, this, hri, abort);
2977 } else {
2978 crh = new CloseRegionHandler(this, this, hri, abort, destination);
2980 this.executorService.submit(crh);
2981 return true;
2985 * @return HRegion for the passed binary <code>regionName</code> or null if the
2986 * named region is not a member of the online regions.
2988 public HRegion getOnlineRegion(final byte[] regionName) {
2989 String encodedRegionName = RegionInfo.encodeRegionName(regionName);
2990 return this.onlineRegions.get(encodedRegionName);
2993 @Override
2994 public HRegion getRegion(final String encodedRegionName) {
2995 return this.onlineRegions.get(encodedRegionName);
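// Removes the region from the online set and the per-region request metrics. If a destination
// server is given, the close sequence id is recorded in the moved-regions cache so that clients
// still asking for this region can be redirected via RegionMovedException. The region is also
// deregistered from configuration change notifications.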
2999 @Override
3000 public boolean removeRegion(final HRegion r, ServerName destination) {
3001 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3002 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3003 if (destination != null) {
3004 long closeSeqNum = r.getMaxFlushedSeqId();
3005 if (closeSeqNum == HConstants.NO_SEQNUM) {
3006 // No edits in WAL for this region; get the sequence number when the region was opened.
3007 closeSeqNum = r.getOpenSeqNum();
3008 if (closeSeqNum == HConstants.NO_SEQNUM) {
3009 closeSeqNum = 0;
3012 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3013 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3014 if (selfMove) {
3015 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3016 r.getRegionInfo().getEncodedName(),
3017 new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3020 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3021 configurationManager.deregisterObserver(r);
3022 return toReturn != null;
3026 * Protected utility method for safely obtaining an HRegion handle.
3028 * @param regionName Name of online {@link HRegion} to return
3029 * @return {@link HRegion} for <code>regionName</code>
3031 protected HRegion getRegion(final byte[] regionName)
3032 throws NotServingRegionException {
3033 String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3034 return getRegionByEncodedName(regionName, encodedRegionName);
3037 public HRegion getRegionByEncodedName(String encodedRegionName)
3038 throws NotServingRegionException {
3039 return getRegionByEncodedName(null, encodedRegionName);
3042 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3043 throws NotServingRegionException {
3044 HRegion region = this.onlineRegions.get(encodedRegionName);
3045 if (region == null) {
3046 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3047 if (moveInfo != null) {
3048 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3050 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3051 String regionNameStr = regionName == null ?
3052 encodedRegionName : Bytes.toStringBinary(regionName);
3053 if (isOpening != null && isOpening) {
3054 throw new RegionOpeningException("Region " + regionNameStr +
3055 " is opening on " + this.serverName);
3057 throw new NotServingRegionException(regionNameStr +
3058 " is not online on " + this.serverName);
3060 return region;
3064 * Cleanup after a Throwable is caught while invoking a method: logs the problem and, unless it
3065 * was an OOME, checks that the filesystem is still available.
3067 * @param t Throwable
3068 * @param msg Message to log as an error. Can be null.
3069 * @return The passed <code>t</code>; callers can only let out IOEs, so convert it before rethrowing.
3071 private Throwable cleanup(final Throwable t, final String msg) {
3072 // Don't log as error if NSRE; NSRE is 'normal' operation.
3073 if (t instanceof NotServingRegionException) {
3074 LOG.debug("NotServingRegionException; " + t.getMessage());
3075 return t;
3077 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3078 if (msg == null) {
3079 LOG.error("", e);
3080 } else {
3081 LOG.error(msg, e);
3083 if (!rpcServices.checkOOME(t)) {
3084 checkFileSystem();
3086 return t;
3090 * @param msg Message to put in a new IOE if the passed <code>t</code> is not an IOE
3091 * @return <code>t</code> as an IOException, wrapped in a new IOException if it is not one already.
3093 private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3094 return (t instanceof IOException ? (IOException) t : msg == null
3095 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3099 * Checks to see if the file system is still accessible. If not, sets
3100 * abortRequested and stopRequested.
3102 * @return false if file system is not available
3104 boolean checkFileSystem() {
3105 if (this.dataFsOk && this.dataFs != null) {
3106 try {
3107 FSUtils.checkFileSystemAvailable(this.dataFs);
3108 } catch (IOException e) {
3109 abort("File System not available", e);
3110 this.dataFsOk = false;
3113 return this.dataFsOk;
3116 @Override
3117 public void updateRegionFavoredNodesMapping(String encodedRegionName,
3118 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3119 Address[] addr = new Address[favoredNodes.size()];
3120 // Refer to the comment on the declaration of regionFavoredNodesMap on why
3121 // it is a map of region name to Address[]
3122 for (int i = 0; i < favoredNodes.size(); i++) {
3123 addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(),
3124 favoredNodes.get(i).getPort());
3126 regionFavoredNodesMap.put(encodedRegionName, addr);
3130 * Return the favored nodes for a region given its encoded name. Look at the
3131 * comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
3132 * here.
3133 * @param encodedRegionName the encoded region name.
3134 * @return array of favored locations
3136 @Override
3137 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3138 return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3141 @Override
3142 public ServerNonceManager getNonceManager() {
3143 return this.nonceManager;
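/**
 * Lightweight record of where a region was moved to and the sequence id it was closed at, kept
 * in the movedRegionInfoCache so that stale clients can be redirected to the new location.
 */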
3146 private static class MovedRegionInfo {
3147 private final ServerName serverName;
3148 private final long seqNum;
3150 MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3151 this.serverName = serverName;
3152 this.seqNum = closeSeqNum;
3155 public ServerName getServerName() {
3156 return serverName;
3159 public long getSeqNum() {
3160 return seqNum;
3165 * We need a timeout on this cache; without one we risk handing out stale move information,
3166 * which would double the number of network calls instead of reducing them.
3168 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3170 private void addToMovedRegions(String encodedName, ServerName destination,
3171 long closeSeqNum, boolean selfMove) {
3172 if (selfMove) {
3173 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3174 return;
3176 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
3177 closeSeqNum);
3178 movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3181 void removeFromMovedRegions(String encodedName) {
3182 movedRegionInfoCache.invalidate(encodedName);
3185 @InterfaceAudience.Private
3186 public MovedRegionInfo getMovedRegion(String encodedRegionName) {
3187 return movedRegionInfoCache.getIfPresent(encodedRegionName);
3190 @InterfaceAudience.Private
3191 public int movedRegionCacheExpiredTime() {
3192 return TIMEOUT_REGION_MOVED;
3195 private String getMyEphemeralNodePath() {
3196 return zooKeeper.getZNodePaths().getRsPath(serverName);
3199 private boolean isHealthCheckerConfigured() {
3200 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3201 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
3205 * @return the underlying {@link CompactSplit} for the server
3207 public CompactSplit getCompactSplitThread() {
3208 return this.compactSplitThread;
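/**
 * Dispatches a region-server-level coprocessor endpoint call: looks up the registered Service by
 * name, resolves the requested method, invokes it, and returns the serialized response, rethrowing
 * any controller exception as a ServiceException.
 */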
3211 CoprocessorServiceResponse execRegionServerService(
3212 @SuppressWarnings("UnusedParameters") final RpcController controller,
3213 final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3214 try {
3215 ServerRpcController serviceController = new ServerRpcController();
3216 CoprocessorServiceCall call = serviceRequest.getCall();
3217 String serviceName = call.getServiceName();
3218 Service service = coprocessorServiceHandlers.get(serviceName);
3219 if (service == null) {
3220 throw new UnknownProtocolException(null,
3221 "No registered coprocessor service found for " + serviceName);
3223 ServiceDescriptor serviceDesc =
3224 service.getDescriptorForType();
3226 String methodName = call.getMethodName();
3227 MethodDescriptor methodDesc =
3228 serviceDesc.findMethodByName(methodName);
3229 if (methodDesc == null) {
3230 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName +
3231 " called on service " + serviceName);
3234 Message request =
3235 CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3236 final Message.Builder responseBuilder =
3237 service.getResponsePrototype(methodDesc).newBuilderForType();
3238 service.callMethod(methodDesc, serviceController, request, message -> {
3239 if (message != null) {
3240 responseBuilder.mergeFrom(message);
3243 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3244 if (exception != null) {
3245 throw exception;
3247 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3248 } catch (IOException ie) {
3249 throw new ServiceException(ie);
3254 * May be empty if this is a master which does not carry tables.
3256 * @return The block cache instance used by the regionserver.
3258 @Override
3259 public Optional<BlockCache> getBlockCache() {
3260 return Optional.ofNullable(this.blockCache);
3264 * May be empty if this is a master which does not carry tables.
3266 * @return The cache for mob files used by the regionserver.
3268 @Override
3269 public Optional<MobFileCache> getMobFileCache() {
3270 return Optional.ofNullable(this.mobFileCache);
3274 * @return The ConfigurationManager object, exposed for testing purposes.
3276 ConfigurationManager getConfigurationManager() {
3277 return configurationManager;
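/**
 * Evicts every cached block belonging to the given region's store files from the block cache and
 * reports how many blocks were evicted.
 */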
3280 CacheEvictionStats clearRegionBlockCache(Region region) {
3281 long evictedBlocks = 0;
3283 for(Store store : region.getStores()) {
3284 for(StoreFile hFile : store.getStorefiles()) {
3285 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
3289 return CacheEvictionStats.builder()
3290 .withEvictedBlocks(evictedBlocks)
3291 .build();
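// The server-wide compaction pressure is defined as the maximum pressure reported by any store
// of any online region, so a single badly backed-up store dominates the result.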
3294 @Override
3295 public double getCompactionPressure() {
3296 double max = 0;
3297 for (Region region : onlineRegions.values()) {
3298 for (Store store : region.getStores()) {
3299 double normCount = store.getCompactionPressure();
3300 if (normCount > max) {
3301 max = normCount;
3305 return max;
3308 @Override
3309 public HeapMemoryManager getHeapMemoryManager() {
3310 return hMemManager;
3313 public MemStoreFlusher getMemStoreFlusher() {
3314 return cacheFlusher;
3318 * For testing.
3319 * @return whether all WAL roll requests have finished for this regionserver
3321 @InterfaceAudience.Private
3322 public boolean walRollRequestFinished() {
3323 return this.walRoller.walRollFinished();
3326 @Override
3327 public ThroughputController getFlushThroughputController() {
3328 return flushThroughputController;
3331 @Override
3332 public double getFlushPressure() {
3333 if (getRegionServerAccounting() == null || cacheFlusher == null) {
3334 // return 0 during RS initialization
3335 return 0.0;
3337 return getRegionServerAccounting().getFlushPressure();
3340 @Override
3341 public void onConfigurationChange(Configuration newConf) {
3342 ThroughputController old = this.flushThroughputController;
3343 if (old != null) {
3344 old.stop("configuration change");
3346 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
3347 try {
3348 Superusers.initialize(newConf);
3349 } catch (IOException e) {
3350 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration");
3353 // update region server coprocessor if the configuration has changed.
3354 if (CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
3355 CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)) {
3356 LOG.info("Update region server coprocessors because the configuration has changed");
3357 this.rsHost = new RegionServerCoprocessorHost(this, newConf);
3361 @Override
3362 public MetricsRegionServer getMetrics() {
3363 return metricsRegionServer;
3366 @Override
3367 public SecureBulkLoadManager getSecureBulkLoadManager() {
3368 return this.secureBulkLoadManager;
3371 @Override
3372 public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
3373 final Abortable abort) {
3374 final LockServiceClient client =
3375 new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator());
3376 return client.regionLock(regionInfo, description, abort);
3379 @Override
3380 public void unassign(byte[] regionName) throws IOException {
3381 FutureUtils.get(asyncClusterConnection.getAdmin().unassign(regionName, false));
3384 @Override
3385 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3386 return this.rsSpaceQuotaManager;
3389 @Override
3390 public boolean reportFileArchivalForQuotas(TableName tableName,
3391 Collection<Entry<String, Long>> archivedFiles) {
3392 if (TEST_SKIP_REPORTING_TRANSITION) {
3393 return false;
3395 RegionServerStatusService.BlockingInterface rss = rssStub;
3396 if (rss == null || rsSpaceQuotaManager == null) {
3397 // the current server could be stopping.
3398 LOG.trace("Skipping file archival reporting to HMaster as stub is null");
3399 return false;
3401 try {
3402 RegionServerStatusProtos.FileArchiveNotificationRequest request =
3403 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
3404 rss.reportFileArchival(null, request);
3405 } catch (ServiceException se) {
3406 IOException ioe = ProtobufUtil.getRemoteException(se);
3407 if (ioe instanceof PleaseHoldException) {
3408 if (LOG.isTraceEnabled()) {
3409 LOG.trace("Failed to report file archival(s) to Master because it is initializing."
3410 + " This will be retried.", ioe);
3412 // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
3413 return false;
3415 if (rssStub == rss) {
3416 rssStub = null;
3418 // re-create the stub if we failed to report the archival
3419 createRegionServerStatusStub(true);
3420 LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
3421 return false;
3423 return true;
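/**
 * Wraps a procedure sent by the master in an RSProcedureHandler and hands it to the executor
 * service; the result is reported back asynchronously through remoteProcedureComplete(long,
 * Throwable).
 */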
3426 void executeProcedure(long procId, RSProcedureCallable callable) {
3427 executorService.submit(new RSProcedureHandler(this, procId, callable));
3430 public void remoteProcedureComplete(long procId, Throwable error) {
3431 procedureResultReporter.complete(procId, error);
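/**
 * Reports a finished remote procedure to the active master, looping until a status stub is
 * available. If the RPC fails, the cached stub is cleared (so a fresh one is created next time)
 * and the unwrapped remote exception is rethrown.
 */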
3434 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
3435 RegionServerStatusService.BlockingInterface rss;
3436 // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
3437 for (;;) {
3438 rss = rssStub;
3439 if (rss != null) {
3440 break;
3442 createRegionServerStatusStub();
3444 try {
3445 rss.reportProcedureDone(null, request);
3446 } catch (ServiceException se) {
3447 if (rssStub == rss) {
3448 rssStub = null;
3450 throw ProtobufUtil.getRemoteException(se);
3455 * Ignores open/close region procedures which have already been submitted or executed.
3457 * When a master with unfinished open/close region procedures restarts, the new active master may
3458 * send duplicate open/close region requests to the regionserver. The requests are submitted to a
3459 * thread pool and executed asynchronously, so we first need a cache of the submitted procedures.
3461 * After an open/close request has been executed and the region transition reported successfully,
3462 * it is cached in the executed region procedures cache; see {@link #finishRegionProcedure(long)}.
3463 * Once the report succeeds, the master will not send the open/close request to the regionserver
3464 * again. We assume an in-flight duplicate open/close request will not be delayed by more than
3465 * 600 seconds, so the executed region procedures cache expires after 600 seconds.
3467 * See HBASE-22404 for more details.
3469 * @param procId the id of the open/close region procedure
3470 * @return true if the procedure can be submitted
3472 boolean submitRegionProcedure(long procId) {
3473 if (procId == -1) {
3474 return true;
3476 // Ignore the region procedures which already submitted.
3477 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
3478 if (previous != null) {
3479 LOG.warn("Received procedure pid={}, which has already been submitted; ignoring it", procId);
3480 return false;
3482 // Ignore the region procedures which already executed.
3483 if (executedRegionProcedures.getIfPresent(procId) != null) {
3484 LOG.warn("Received procedure pid={}, which has already been executed; ignoring it", procId);
3485 return false;
3487 return true;
3491 * See {@link #submitRegionProcedure(long)}.
3492 * @param procId the id of the open/close region procedure
3494 public void finishRegionProcedure(long procId) {
3495 executedRegionProcedures.put(procId, procId);
3496 submittedRegionProcedures.remove(procId);
3500 * Forcibly terminate the region server when an abort times out.
3502 private static class SystemExitWhenAbortTimeout extends TimerTask {
3504 public SystemExitWhenAbortTimeout() {
3507 @Override
3508 public void run() {
3509 LOG.warn("Aborting region server timed out; terminating forcibly without waiting" +
3510 " for any running shutdown hooks or finalizers to finish their work." +
3511 " Thread dump to stdout.");
3512 Threads.printThreadInfo(System.out, "Zombie HRegionServer");
3513 Runtime.getRuntime().halt(1);
3517 @InterfaceAudience.Private
3518 public CompactedHFilesDischarger getCompactedHFilesDischarger() {
3519 return compactedFileDischarger;
3523 * Return the pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
3524 * @return pause time
3526 @InterfaceAudience.Private
3527 public long getRetryPauseTime() {
3528 return this.retryPauseTime;
3531 @Override
3532 public Optional<ServerName> getActiveMaster() {
3533 return Optional.ofNullable(masterAddressTracker.getMasterAddress());
3536 @Override
3537 public List<ServerName> getBackupMasters() {
3538 return masterAddressTracker.getBackupMasters();
3541 @Override
3542 public Iterator<ServerName> getBootstrapNodes() {
3543 return bootstrapNodeManager.getBootstrapNodes().iterator();
3546 @Override
3547 public List<HRegionLocation> getMetaLocations() {
3548 return metaRegionLocationCache.getMetaRegionLocations();
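// Creates the NamedQueueRecorder (which backs the slow-log and similar in-memory ring buffers)
// only when the online log provider is enabled in the configuration; otherwise returns null so
// no recorder is kept.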
3551 @Override
3552 protected NamedQueueRecorder createNamedQueueRecord() {
3553 final boolean isOnlineLogProviderEnabled = conf.getBoolean(
3554 HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
3555 if (isOnlineLogProviderEnabled) {
3556 return NamedQueueRecorder.getInstance(conf);
3557 } else {
3558 return null;
3562 @Override
3563 protected boolean clusterMode() {
3564 // This method is called from the constructor of the super class, so we cannot return the
3565 // masterless field directly here, as it would still always be false.
3566 return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
3569 @InterfaceAudience.Private
3570 public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
3571 return brokenStoreFileCleaner;
3574 RSSnapshotVerifier getRsSnapshotVerifier() {
3575 return rsSnapshotVerifier;
3578 @Override
3579 protected void stopChores() {
3580 shutdownChore(nonceManagerChore);
3581 shutdownChore(compactionChecker);
3582 shutdownChore(compactedFileDischarger);
3583 shutdownChore(periodicFlusher);
3584 shutdownChore(healthCheckChore);
3585 shutdownChore(executorStatusChore);
3586 shutdownChore(storefileRefresher);
3587 shutdownChore(fsUtilizationChore);
3588 shutdownChore(slowLogTableOpsChore);
3589 shutdownChore(brokenStoreFileCleaner);
3592 @Override
3593 public RegionReplicationBufferManager getRegionReplicationBufferManager() {
3594 return regionReplicationBufferManager;