servers =
@@ -132,6 +133,7 @@ public class RegionServerTracker extends ZKListener {
: ServerMetricsBuilder.of(serverName);
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
}
+ serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index cfbd52fc6e..201466e75c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -25,13 +25,11 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -180,41 +178,6 @@ public class ServerManager {
private final RpcControllerFactory rpcControllerFactory;
- /**
- * Set of region servers which are dead but not processed immediately. If one
- * server died before master enables ServerShutdownHandler, the server will be
- * added to this set and will be processed through calling
- * {@link ServerManager#processQueuedDeadServers()} by master.
- *
- * A dead server is a server instance known to be dead, not listed in the /hbase/rs
- * znode any more. It may have not been submitted to ServerShutdownHandler yet
- * because the handler is not enabled.
- *
- * A dead server, which has been submitted to ServerShutdownHandler while the
- * handler is not enabled, is queued up.
- *
- * So this is a set of region servers known to be dead but not submitted to
- * ServerShutdownHandler for processing yet.
- */
- private Set queuedDeadServers = new HashSet<>();
-
- /**
- * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
- * fully processed immediately.
- *
- * If one server died before assignment manager finished the failover cleanup, the server will be
- * added to this set and will be processed through calling
- * {@link ServerManager#processQueuedDeadServers()} by assignment manager.
- *
- * The Boolean value indicates whether log split is needed inside ServerShutdownHandler
- *
- * ServerShutdownHandler processes a dead server submitted to the handler after the handler is
- * enabled. It may not be able to complete the processing because meta is not yet online or master
- * is currently in startup mode. In this case, the dead server will be parked in this set
- * temporarily.
- */
- private Map requeuedDeadServers = new ConcurrentHashMap<>();
-
/** Listeners that are called on server events. */
private List listeners = new CopyOnWriteArrayList<>();
@@ -378,6 +341,26 @@ public class ServerManager {
}
/**
+ * Find out the region servers crashed between the crash of the previous master instance and the
+ * current master instance and schedule SCP for them.
+ *
+ * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
+ * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
+ * to find out whether there are servers which are already dead.
+ *
+ * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
+ * concurrency issue.
+ * @param deadServersFromPE the region servers which already have SCP associated.
+ * @param liveServersFromWALDir the live region servers from wal directory.
+ */
+ void findOutDeadServersAndProcess(Set deadServersFromPE,
+ Set liveServersFromWALDir) {
+ deadServersFromPE.forEach(deadservers::add);
+ liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
+ .forEach(this::expireServer);
+ }
+
+ /**
* Checks if the clock skew between the server and the master. If the clock skew exceeds the
* configured max, it will throw an exception; if it exceeds the configured warning threshold,
* it will log a warning but start normally.
@@ -386,7 +369,7 @@ public class ServerManager {
* @throws ClockOutOfSyncException if the skew exceeds the configured max value
*/
private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
- throws ClockOutOfSyncException {
+ throws ClockOutOfSyncException {
long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
if (skew > maxSkew) {
String message = "Server " + serverName + " has been " +
@@ -406,9 +389,7 @@ public class ServerManager {
* If this server is on the dead list, reject it with a YouAreDeadException.
* If it was dead but came back with a new start code, remove the old entry
* from the dead list.
- * @param serverName
* @param what START or REPORT
- * @throws org.apache.hadoop.hbase.YouAreDeadException
*/
private void checkIsDead(final ServerName serverName, final String what)
throws YouAreDeadException {
@@ -589,13 +570,12 @@ public class ServerManager {
return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
}
- /*
- * Expire the passed server. Add it to list of dead servers and queue a
- * shutdown processing.
- * @return True if we queued a ServerCrashProcedure else false if we did not (could happen
- * for many reasons including the fact that its this server that is going down or we already
- * have queued an SCP for this server or SCP processing is currently disabled because we are
- * in startup phase).
+ /**
+ * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
+ * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
+ * many reasons including the fact that its this server that is going down or we already
+ * have queued an SCP for this server or SCP processing is currently disabled because we
+ * are in startup phase).
*/
public synchronized boolean expireServer(final ServerName serverName) {
// THIS server is going down... can't handle our own expiration.
@@ -605,18 +585,6 @@ public class ServerManager {
}
return false;
}
- // No SCP handling during startup.
- if (!master.isServerCrashProcessingEnabled()) {
- LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
- + "delay expiring server " + serverName);
- // Even though we delay expire of this server, we still need to handle Meta's RIT
- // that are against the crashed server; since when we do RecoverMetaProcedure,
- // the SCP is not enabled yet and Meta's RIT may be suspend forever. See HBase-19287
- master.getAssignmentManager().handleMetaRITOnCrashedServer(serverName);
- this.queuedDeadServers.add(serverName);
- // Return true because though on SCP queued, there will be one queued later.
- return true;
- }
if (this.deadservers.isDeadServer(serverName)) {
LOG.warn("Expiration called on {} but crash processing already in progress", serverName);
return false;
@@ -665,52 +633,6 @@ public class ServerManager {
this.rsAdmins.remove(sn);
}
- public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
- // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
- // in-memory region states, region servers could be down. Meta table can and
- // should be re-assigned, log splitting can be done too. However, it is better to
- // wait till the cleanup is done before re-assigning user regions.
- //
- // We should not wait in the server shutdown handler thread since it can clog
- // the handler threads and meta table could not be re-assigned in case
- // the corresponding server is down. So we queue them up here instead.
- if (!master.getAssignmentManager().isFailoverCleanupDone()) {
- requeuedDeadServers.put(serverName, shouldSplitWal);
- return;
- }
-
- this.deadservers.add(serverName);
- master.getAssignmentManager().submitServerCrash(serverName, shouldSplitWal);
- }
-
- /**
- * Process the servers which died during master's initialization. It will be
- * called after HMaster#assignMeta and AssignmentManager#joinCluster.
- * */
- synchronized void processQueuedDeadServers() {
- if (!master.isServerCrashProcessingEnabled()) {
- LOG.info("Master hasn't enabled ServerShutdownHandler");
- }
- Iterator serverIterator = queuedDeadServers.iterator();
- while (serverIterator.hasNext()) {
- ServerName tmpServerName = serverIterator.next();
- expireServer(tmpServerName);
- serverIterator.remove();
- requeuedDeadServers.remove(tmpServerName);
- }
-
- if (!master.getAssignmentManager().isFailoverCleanupDone()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("AssignmentManager failover cleanup not done.");
- }
- }
-
- for (Map.Entry entry : requeuedDeadServers.entrySet()) {
- processDeadServer(entry.getKey(), entry.getValue());
- }
- requeuedDeadServers.clear();
- }
-
/*
* Remove the server from the drain list.
*/
@@ -975,13 +897,6 @@ public class ServerManager {
return new ArrayList<>(this.drainingServers);
}
- /**
- * @return A copy of the internal set of deadNotExpired servers.
- */
- Set getDeadNotExpiredServers() {
- return new HashSet<>(this.queuedDeadServers);
- }
-
public boolean isServerOnline(ServerName serverName) {
return serverName != null && onlineServers.containsKey(serverName);
}
@@ -993,9 +908,7 @@ public class ServerManager {
* master any more, for example, a very old previous instance).
*/
public synchronized boolean isServerDead(ServerName serverName) {
- return serverName == null || deadservers.isDeadServer(serverName)
- || queuedDeadServers.contains(serverName)
- || requeuedDeadServers.containsKey(serverName);
+ return serverName == null || deadservers.isDeadServer(serverName);
}
public void shutdownCluster() {
@@ -1061,8 +974,6 @@ public class ServerManager {
final List drainingServersCopy = getDrainingServersList();
destServers.removeAll(drainingServersCopy);
- // Remove the deadNotExpired servers from the server list.
- removeDeadNotExpiredServers(destServers);
return destServers;
}
@@ -1073,23 +984,6 @@ public class ServerManager {
return createDestinationServersList(null);
}
- /**
- * Loop through the deadNotExpired server list and remove them from the
- * servers.
- * This function should be used carefully outside of this class. You should use a high level
- * method such as {@link #createDestinationServersList()} instead of managing you own list.
- */
- void removeDeadNotExpiredServers(List servers) {
- Set deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
- if (!deadNotExpiredServersCopy.isEmpty()) {
- for (ServerName server : deadNotExpiredServersCopy) {
- LOG.debug("Removing dead but not expired server: " + server
- + " from eligible server pool.");
- servers.remove(server);
- }
- }
- }
-
/**
* To clear any dead server with same host name and port of any online server
*/
@@ -1259,7 +1153,6 @@ public class ServerManager {
}
}
-
private class FlushedSequenceIdFlusher extends ScheduledChore {
public FlushedSequenceIdFlusher(String name, int p) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 3412c82f2d..0d1fc16254 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
@@ -24,7 +23,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,7 +38,6 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RegionException;
-import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
@@ -66,7 +63,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState;
import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode;
import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
@@ -84,7 +80,10 @@ import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,26 +146,13 @@ public class AssignmentManager implements ServerListener {
"hbase.metrics.rit.stuck.warning.threshold";
private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
- private final ProcedureEvent> metaInitializedEvent = new ProcedureEvent<>("meta initialized");
+ private final ProcedureEvent> metaAssignEvent = new ProcedureEvent<>("meta assign");
private final ProcedureEvent> metaLoadEvent = new ProcedureEvent<>("meta load");
- /**
- * Indicator that AssignmentManager has recovered the region states so
- * that ServerCrashProcedure can be fully enabled and re-assign regions
- * of dead servers. So that when re-assignment happens, AssignmentManager
- * has proper region states.
- */
- private final ProcedureEvent> failoverCleanupDone = new ProcedureEvent<>("failover cleanup");
-
/** Listeners that are called on assignment events. */
private final CopyOnWriteArrayList listeners =
new CopyOnWriteArrayList();
- // TODO: why is this different from the listeners (carried over from the old AM)
- private RegionStateListener regionStateListener;
-
- private RegionNormalizer regionNormalizer;
-
private final MetricsAssignmentManager metrics;
private final RegionInTransitionChore ritChore;
private final MasterServices master;
@@ -210,12 +196,9 @@ public class AssignmentManager implements ServerListener {
int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
DEFAULT_RIT_CHORE_INTERVAL_MSEC);
this.ritChore = new RegionInTransitionChore(ritChoreInterval);
-
- // Used for region related procedure.
- setRegionNormalizer(master.getRegionNormalizer());
}
- public void start() throws IOException {
+ public void start() throws IOException, KeeperException {
if (!running.compareAndSet(false, true)) {
return;
}
@@ -227,6 +210,20 @@ public class AssignmentManager implements ServerListener {
// Start the Assignment Thread
startAssignmentThread();
+
+ // load meta region state
+ ZKWatcher zkw = master.getZooKeeper();
+ // it could be null in some tests
+ if (zkw != null) {
+ RegionState regionState = MetaTableLocator.getMetaRegionState(zkw);
+ RegionStateNode regionStateNode =
+ regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ synchronized (regionStateNode) {
+ regionStateNode.setRegionLocation(regionState.getServerName());
+ regionStateNode.setState(regionState.getState());
+ setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN);
+ }
+ }
}
public void stop() {
@@ -257,9 +254,8 @@ public class AssignmentManager implements ServerListener {
// Update meta events (for testing)
if (hasProcExecutor) {
metaLoadEvent.suspend();
- setFailoverCleanupDone(false);
for (RegionInfo hri: getMetaRegionSet()) {
- setMetaInitialized(hri, false);
+ setMetaAssigned(hri, false);
}
}
}
@@ -288,7 +284,7 @@ public class AssignmentManager implements ServerListener {
return getProcedureEnvironment().getProcedureScheduler();
}
- protected int getAssignMaxAttempts() {
+ int getAssignMaxAttempts() {
return assignMaxAttempts;
}
@@ -308,18 +304,6 @@ public class AssignmentManager implements ServerListener {
return this.listeners.remove(listener);
}
- public void setRegionStateListener(final RegionStateListener listener) {
- this.regionStateListener = listener;
- }
-
- public void setRegionNormalizer(final RegionNormalizer normalizer) {
- this.regionNormalizer = normalizer;
- }
-
- public RegionNormalizer getRegionNormalizer() {
- return regionNormalizer;
- }
-
public RegionStates getRegionStates() {
return regionStates;
}
@@ -371,12 +355,8 @@ public class AssignmentManager implements ServerListener {
}
public boolean isCarryingMeta(final ServerName serverName) {
- for (RegionInfo hri: getMetaRegionSet()) {
- if (isCarryingRegion(serverName, hri)) {
- return true;
- }
- }
- return false;
+ // TODO: handle multiple meta
+ return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
}
private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
@@ -402,49 +382,66 @@ public class AssignmentManager implements ServerListener {
// ============================================================================================
// META Event(s) helpers
// ============================================================================================
- public boolean isMetaInitialized() {
- return metaInitializedEvent.isReady();
+ /**
+ * Notice that, this only means the meta region is available on a RS, but the AM may still be
+ * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
+ * before checking this method, unless you can make sure that your piece of code can only be
+ * executed after AM builds the region states.
+ * @see #isMetaLoaded()
+ */
+ public boolean isMetaAssigned() {
+ return metaAssignEvent.isReady();
}
public boolean isMetaRegionInTransition() {
- return !isMetaInitialized();
- }
-
- public boolean waitMetaInitialized(final Procedure proc) {
- // TODO: handle multiple meta. should this wait on all meta?
- // this is used by the ServerCrashProcedure...
- return waitMetaInitialized(proc, RegionInfoBuilder.FIRST_META_REGIONINFO);
+ return !isMetaAssigned();
}
- public boolean waitMetaInitialized(final Procedure proc, final RegionInfo regionInfo) {
- return getMetaInitializedEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
+ /**
+ * Notice that this event does not mean the AM has already finished region state rebuilding. See
+ * the comment of {@link #isMetaAssigned()} for more details.
+ * @see #isMetaAssigned()
+ */
+ public boolean waitMetaAssigned(Procedure> proc, RegionInfo regionInfo) {
+ return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
}
- private void setMetaInitialized(final RegionInfo metaRegionInfo, final boolean isInitialized) {
+ private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
- final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo);
- if (isInitialized) {
- metaInitEvent.wake(getProcedureScheduler());
+ ProcedureEvent> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
+ if (assigned) {
+ metaAssignEvent.wake(getProcedureScheduler());
} else {
- metaInitEvent.suspend();
+ metaAssignEvent.suspend();
}
}
- private ProcedureEvent getMetaInitializedEvent(final RegionInfo metaRegionInfo) {
+ private ProcedureEvent> getMetaAssignEvent(RegionInfo metaRegionInfo) {
assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
// TODO: handle multiple meta.
- return metaInitializedEvent;
+ return metaAssignEvent;
}
- public boolean waitMetaLoaded(final Procedure proc) {
+ /**
+ * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
+ * @see #isMetaLoaded()
+ * @see #waitMetaAssigned(Procedure, RegionInfo)
+ */
+ public boolean waitMetaLoaded(Procedure> proc) {
return metaLoadEvent.suspendIfNotReady(proc);
}
- protected void wakeMetaLoadedEvent() {
+ @VisibleForTesting
+ void wakeMetaLoadedEvent() {
metaLoadEvent.wake(getProcedureScheduler());
assert isMetaLoaded() : "expected meta to be loaded";
}
+ /**
+ * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
+ * @see #isMetaAssigned()
+ * @see #waitMetaLoaded(Procedure)
+ */
public boolean isMetaLoaded() {
return metaLoadEvent.isReady();
}
@@ -849,7 +846,7 @@ public class AssignmentManager implements ServerListener {
private void updateRegionTransition(final ServerName serverName, final TransitionCode state,
final RegionInfo regionInfo, final long seqId)
throws PleaseHoldException, UnexpectedStateException {
- checkFailoverCleanupCompleted(regionInfo);
+ checkMetaLoaded(regionInfo);
final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
if (regionNode == null) {
@@ -890,7 +887,7 @@ public class AssignmentManager implements ServerListener {
private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state,
final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB)
throws IOException {
- checkFailoverCleanupCompleted(parent);
+ checkMetaLoaded(parent);
if (state != TransitionCode.READY_TO_SPLIT) {
throw new UnexpectedStateException("unsupported split regionState=" + state +
@@ -922,7 +919,7 @@ public class AssignmentManager implements ServerListener {
private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state,
final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException {
- checkFailoverCleanupCompleted(merged);
+ checkMetaLoaded(merged);
if (state != TransitionCode.READY_TO_MERGE) {
throw new UnexpectedStateException("Unsupported merge regionState=" + state +
@@ -1063,7 +1060,7 @@ public class AssignmentManager implements ServerListener {
}
}
- protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) {
+ protected boolean waitServerReportEvent(ServerName serverName, Procedure> proc) {
final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
if (serverNode == null) {
LOG.warn("serverName=null; {}", proc);
@@ -1152,7 +1149,7 @@ public class AssignmentManager implements ServerListener {
public Collection getRegionOverThreshold() {
Map m = this.ritsOverThreshold;
- return m != null? m.values(): Collections.EMPTY_SET;
+ return m != null? m.values(): Collections.emptySet();
}
public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
@@ -1209,27 +1206,44 @@ public class AssignmentManager implements ServerListener {
// TODO: Master load/bootstrap
// ============================================================================================
public void joinCluster() throws IOException {
- final long startTime = System.currentTimeMillis();
+ long startTime = System.nanoTime();
LOG.debug("Joining cluster...");
// Scan hbase:meta to build list of existing regions, servers, and assignment
// hbase:meta is online when we get to here and TableStateManager has been started.
loadMeta();
- for (int i = 0; master.getServerManager().countOfRegionServers() < 1; ++i) {
- LOG.info("Waiting for RegionServers to join; current count=" +
- master.getServerManager().countOfRegionServers());
+ while (master.getServerManager().countOfRegionServers() < 1) {
+ LOG.info("Waiting for RegionServers to join; current count={}",
+ master.getServerManager().countOfRegionServers());
Threads.sleep(250);
}
- LOG.info("Number of RegionServers=" + master.getServerManager().countOfRegionServers());
+ LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
- boolean failover = processofflineServersWithOnlineRegions();
+ processOfflineRegions();
// Start the RIT chore
master.getMasterProcedureExecutor().addChore(this.ritChore);
- LOG.info(String.format("Joined the cluster in %s, failover=%s",
- StringUtils.humanTimeDiff(System.currentTimeMillis() - startTime), failover));
+ long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+ LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
+ }
+
+ // Create assign procedure for offline regions.
+ // Just follow the old processofflineServersWithOnlineRegions method. Since now we do not need to
+ // deal with dead server any more, we only deal with the regions in OFFLINE state in this method.
+ // And this is a bit strange, that for new regions, we will add it in CLOSED state instead of
+ // OFFLINE state, and usually there will be a procedure to track them. The
+ // processofflineServersWithOnlineRegions is a legacy from long ago, as things are going really
+ // different now, maybe we do not need this method any more. Need to revisit later.
+ private void processOfflineRegions() {
+ List offlineRegions = regionStates.getRegionStates().stream()
+ .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable()))
+ .map(RegionState::getRegion).collect(Collectors.toList());
+ if (!offlineRegions.isEmpty()) {
+ master.getMasterProcedureExecutor().submitProcedures(
+ master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions));
+ }
}
private void loadMeta() throws IOException {
@@ -1286,117 +1300,21 @@ public class AssignmentManager implements ServerListener {
}
/**
- * Look at what is in meta and the list of servers that have checked in and make reconciliation.
- * We cannot tell definitively the difference between a clean shutdown and a cluster that has
- * been crashed down. At this stage of a Master startup, they look the same: they have the
- * same state in hbase:meta. We could do detective work probing ZK and the FS for old WALs to
- * split but SCP does this already so just let it do its job.
- * >The profiles of clean shutdown and cluster crash-down are the same because on clean
- * shutdown currently, we do not update hbase:meta with region close state (In AMv2, region
- * state is kept in hbse:meta). Usually the master runs all region transitions as of AMv2 but on
- * cluster controlled shutdown, the RegionServers close all their regions only reporting the
- * final change to the Master. Currently this report is ignored. Later we could take it and
- * update as many regions as we can before hbase:meta goes down or have the master run the
- * close of all regions out on the cluster but we may never be able to achieve the proper state on
- * all regions (at least not w/o lots of painful manipulations and waiting) so clean shutdown
- * might not be possible especially on big clusters.... And clean shutdown will take time. Given
- * this current state of affairs, we just run ServerCrashProcedure in both cases. It will always
- * do the right thing.
- * @return True if for sure this is a failover where a Master is starting up into an already
- * running cluster.
- */
- // The assumption here is that if RSs are crashing while we are executing this
- // they will be handled by the SSH that are put in the ServerManager deadservers "queue".
- private boolean processofflineServersWithOnlineRegions() {
- boolean deadServers = !master.getServerManager().getDeadServers().isEmpty();
- final Set offlineServersWithOnlineRegions = new HashSet<>();
- int size = regionStates.getRegionStateNodes().size();
- final List offlineRegionsToAssign = new ArrayList<>(size);
- // If deadservers then its a failover, else, we are not sure yet.
- boolean failover = deadServers;
- for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) {
- // Region State can be OPEN even if we did controlled cluster shutdown; Master does not close
- // the regions in this case. The RegionServer does the close so hbase:meta is state in
- // hbase:meta is not updated -- Master does all updates -- and is left with OPEN as region
- // state in meta. How to tell difference between ordered shutdown and crashed-down cluster
- // then? We can't. Not currently. Perhaps if we updated hbase:meta with CLOSED on ordered
- // shutdown. This would slow shutdown though and not all edits would make it in anyways.
- // TODO: Examine.
- // Because we can't be sure it an ordered shutdown, we run ServerCrashProcedure always.
- // ServerCrashProcedure will try to retain old deploy when it goes to assign.
- if (regionNode.getState() == State.OPEN) {
- final ServerName serverName = regionNode.getRegionLocation();
- if (!master.getServerManager().isServerOnline(serverName)) {
- offlineServersWithOnlineRegions.add(serverName);
- } else {
- // Server is online. This a failover. Master is starting into already-running cluster.
- failover = true;
- }
- } else if (regionNode.getState() == State.OFFLINE) {
- if (isTableEnabled(regionNode.getTable())) {
- offlineRegionsToAssign.add(regionNode.getRegionInfo());
- }
- }
- }
- // Kill servers with online regions just-in-case. Runs ServerCrashProcedure.
- for (ServerName serverName: offlineServersWithOnlineRegions) {
- if (!master.getServerManager().isServerOnline(serverName)) {
- LOG.info("KILL RegionServer=" + serverName + " hosting regions but not online.");
- killRegionServer(serverName);
- }
- }
- setFailoverCleanupDone(true);
-
- // Assign offline regions. Uses round-robin.
- if (offlineRegionsToAssign.size() > 0) {
- master.getMasterProcedureExecutor().submitProcedures(master.getAssignmentManager().
- createRoundRobinAssignProcedures(offlineRegionsToAssign));
- }
-
- return failover;
- }
-
- /**
- * Used by ServerCrashProcedure to make sure AssignmentManager has completed
- * the failover cleanup before re-assigning regions of dead servers. So that
- * when re-assignment happens, AssignmentManager has proper region states.
- */
- public boolean isFailoverCleanupDone() {
- return failoverCleanupDone.isReady();
- }
-
- /**
- * Used by ServerCrashProcedure tests verify the ability to suspend the
- * execution of the ServerCrashProcedure.
- */
- @VisibleForTesting
- public void setFailoverCleanupDone(final boolean b) {
- master.getMasterProcedureExecutor().getEnvironment()
- .setEventReady(failoverCleanupDone, b);
- }
-
- public ProcedureEvent getFailoverCleanupEvent() {
- return failoverCleanupDone;
- }
-
- /**
- * Used to check if the failover cleanup is done.
+ * Used to check if the meta loading is done.
+ *
* if not we throw PleaseHoldException since we are rebuilding the RegionStates
* @param hri region to check if it is already rebuild
- * @throws PleaseHoldException if the failover cleanup is not completed
+ * @throws PleaseHoldException if meta has not been loaded yet
*/
- private void checkFailoverCleanupCompleted(final RegionInfo hri) throws PleaseHoldException {
+ private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
if (!isRunning()) {
throw new PleaseHoldException("AssignmentManager not running");
}
-
- // TODO: can we avoid throwing an exception if hri is already loaded?
- // at the moment we bypass only meta
boolean meta = isMetaRegion(hri);
- boolean cleanup = isFailoverCleanupDone();
- if (!isMetaRegion(hri) && !isFailoverCleanupDone()) {
- String msg = "Master not fully online; hbase:meta=" + meta + ", failoverCleanup=" + cleanup;
- throw new PleaseHoldException(msg);
+ boolean metaLoaded = isMetaLoaded();
+ if (!meta && !metaLoaded) {
+ throw new PleaseHoldException(
+ "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
}
}
@@ -1539,7 +1457,7 @@ public class AssignmentManager implements ServerListener {
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
// on table that contains state.
- setMetaInitialized(hri, true);
+ setMetaAssigned(hri, true);
}
regionStates.addRegionToServer(regionNode);
// TODO: OPENING Updates hbase:meta too... we need to do both here and there?
@@ -1555,7 +1473,7 @@ public class AssignmentManager implements ServerListener {
regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE);
// Set meta has not initialized early. so people trying to create/edit tables will wait
if (isMetaRegion(hri)) {
- setMetaInitialized(hri, false);
+ setMetaAssigned(hri, false);
}
regionStates.addRegionToServer(regionNode);
regionStateStore.updateRegionLocation(regionNode);
@@ -1831,7 +1749,7 @@ public class AssignmentManager implements ServerListener {
private void acceptPlan(final HashMap regions,
final Map> plan) throws HBaseIOException {
- final ProcedureEvent[] events = new ProcedureEvent[regions.size()];
+ final ProcedureEvent>[] events = new ProcedureEvent[regions.size()];
final long st = System.currentTimeMillis();
if (plan == null) {
@@ -1883,7 +1801,7 @@ public class AssignmentManager implements ServerListener {
.map((s)->new Pair<>(s, master.getRegionServerVersion(s)))
.collect(Collectors.toList());
if (serverList.isEmpty()) {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
String highestVersion = Collections.max(serverList,
(o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond();
@@ -1909,11 +1827,6 @@ public class AssignmentManager implements ServerListener {
wakeServerReportEvent(serverNode);
}
- private void killRegionServer(final ServerName serverName) {
- final ServerStateNode serverNode = regionStates.getServerNode(serverName);
- killRegionServer(serverNode);
- }
-
private void killRegionServer(final ServerStateNode serverNode) {
master.getServerManager().expireServer(serverNode.getServerName());
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 57e71f897a..4d454d7cc9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -553,7 +553,7 @@ public class MergeTableRegionsProcedure
try {
env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion);
} catch (QuotaExceededException e) {
- env.getAssignmentManager().getRegionNormalizer().planSkipped(this.mergedRegion,
+ env.getMasterServices().getRegionNormalizer().planSkipped(this.mergedRegion,
NormalizationPlan.PlanType.MERGE);
throw e;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index fa252316d6..2124d84378 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -128,23 +128,24 @@ public class RegionStateStore {
public void updateRegionLocation(RegionStates.RegionStateNode regionStateNode)
throws IOException {
if (regionStateNode.getRegionInfo().isMetaRegion()) {
- updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
+ updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation(),
+ regionStateNode.getState());
} else {
- long openSeqNum = regionStateNode.getState() == State.OPEN ?
- regionStateNode.getOpenSeqNum() : HConstants.NO_SEQNUM;
+ long openSeqNum = regionStateNode.getState() == State.OPEN ? regionStateNode.getOpenSeqNum()
+ : HConstants.NO_SEQNUM;
updateUserRegionLocation(regionStateNode.getRegionInfo(), regionStateNode.getState(),
- regionStateNode.getRegionLocation(), regionStateNode.getLastHost(), openSeqNum,
- // The regionStateNode may have no procedure in a test scenario; allow for this.
- regionStateNode.getProcedure() != null?
- regionStateNode.getProcedure().getProcId(): Procedure.NO_PROC_ID);
+ regionStateNode.getRegionLocation(), regionStateNode.getLastHost(), openSeqNum,
+ // The regionStateNode may have no procedure in a test scenario; allow for this.
+ regionStateNode.getProcedure() != null ? regionStateNode.getProcedure().getProcId()
+ : Procedure.NO_PROC_ID);
}
}
- private void updateMetaLocation(final RegionInfo regionInfo, final ServerName serverName)
+ private void updateMetaLocation(RegionInfo regionInfo, ServerName serverName, State state)
throws IOException {
try {
- MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName,
- regionInfo.getReplicaId(), State.OPEN);
+ MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, regionInfo.getReplicaId(),
+ state);
} catch (KeeperException e) {
throw new IOException(e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index b96fb20d2d..c3b2458ddc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -421,9 +421,11 @@ public abstract class RegionTransitionProcedure
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
// Unless we are assigning meta, wait for meta to be available and loaded.
- if (!isMeta() && (env.waitFailoverCleanup(this) ||
- env.getAssignmentManager().waitMetaInitialized(this, getRegionInfo()))) {
- return LockState.LOCK_EVENT_WAIT;
+ if (!isMeta()) {
+ AssignmentManager am = env.getAssignmentManager();
+ if (am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo)) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
}
// TODO: Revisit this and move it to the executor
@@ -432,8 +434,7 @@ public abstract class RegionTransitionProcedure
LOG.debug(LockState.LOCK_EVENT_WAIT + " pid=" + getProcId() + " " +
env.getProcedureScheduler().dumpLocks());
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ // ignore, just for logging
}
return LockState.LOCK_EVENT_WAIT;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 341affb10a..230603721f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -498,7 +498,7 @@ public class SplitTableRegionProcedure
try {
env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion());
} catch (QuotaExceededException e) {
- env.getAssignmentManager().getRegionNormalizer().planSkipped(this.getParentRegion(),
+ env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(),
NormalizationPlan.PlanType.SPLIT);
throw e;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
index 5815255bb3..1af244590c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
@@ -31,7 +30,6 @@ import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionOfflineException;
import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
@@ -92,7 +90,9 @@ public abstract class AbstractStateMachineTableProcedure
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
- if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
+ if (env.waitInitialized(this)) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
new file mode 100644
index 0000000000..4736d6576f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.InitMetaState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.InitMetaStateData;
+
+/**
+ * This procedure is used to initialize meta table for a new hbase deploy. It will just schedule an
+ * {@link AssignProcedure} to assign meta.
+ */
+@InterfaceAudience.Private
+public class InitMetaProcedure extends AbstractStateMachineTableProcedure {
+
+ private CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public TableName getTableName() {
+ return TableName.META_TABLE_NAME;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.CREATE;
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env, InitMetaState state)
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ switch (state) {
+ case INIT_META_ASSIGN_META:
+ addChildProcedure(env.getAssignmentManager()
+ .createAssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO));
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ }
+
+ @Override
+ protected LockState acquireLock(MasterProcedureEnv env) {
+ // we do not need to wait for master initialized, we are part of the initialization.
+ if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, InitMetaState state)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected InitMetaState getState(int stateId) {
+ return InitMetaState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(InitMetaState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected InitMetaState getInitialState() {
+ return InitMetaState.INIT_META_ASSIGN_META;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ serializer.serialize(InitMetaStateData.getDefaultInstance());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ serializer.deserialize(InitMetaStateData.class);
+ }
+
+ @Override
+ protected void completionCleanup(MasterProcedureEnv env) {
+ latch.countDown();
+ }
+
+ public void await() throws InterruptedException {
+ latch.await();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 7fb187fe05..0ec932ce47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -19,13 +19,11 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -70,8 +68,7 @@ public class MasterProcedureEnv implements ConfigurationObserver {
}
private boolean isRunning() {
- return master.isActiveMaster() && !master.isStopped() &&
- !master.isStopping() && !master.isAborted();
+ return !master.isStopped() && !master.isStopping() && !master.isAborted();
}
}
@@ -155,17 +152,6 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return master.getInitializedEvent().suspendIfNotReady(proc);
}
- public boolean waitServerCrashProcessingEnabled(Procedure> proc) {
- if (master instanceof HMaster) {
- return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc);
- }
- return false;
- }
-
- public boolean waitFailoverCleanup(Procedure> proc) {
- return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc);
- }
-
public void setEventReady(ProcedureEvent> event, boolean isReady) {
if (isReady) {
event.wake(procSched);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 373a957258..8389961572 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
@@ -480,7 +479,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param table Table to lock
* @return true if the procedure has to wait for the table to be available
*/
- public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) {
+ public boolean waitTableExclusiveLock(final Procedure> procedure, final TableName table) {
schedLock();
try {
final String namespace = table.getNamespaceAsString();
@@ -509,7 +508,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure releasing the lock
* @param table the name of the table that has the exclusive lock
*/
- public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) {
+ public void wakeTableExclusiveLock(final Procedure> procedure, final TableName table) {
schedLock();
try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
@@ -537,7 +536,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param table Table to lock
* @return true if the procedure has to wait for the table to be available
*/
- public boolean waitTableSharedLock(final Procedure procedure, final TableName table) {
+ public boolean waitTableSharedLock(final Procedure> procedure, final TableName table) {
return waitTableQueueSharedLock(procedure, table) == null;
}
@@ -568,7 +567,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure releasing the lock
* @param table the name of the table that has the shared lock
*/
- public void wakeTableSharedLock(final Procedure procedure, final TableName table) {
+ public void wakeTableSharedLock(final Procedure> procedure, final TableName table) {
schedLock();
try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
@@ -629,7 +628,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param regionInfo the region we are trying to lock
* @return true if the procedure has to wait for the regions to be available
*/
- public boolean waitRegion(final Procedure procedure, final RegionInfo regionInfo) {
+ public boolean waitRegion(final Procedure> procedure, final RegionInfo regionInfo) {
return waitRegions(procedure, regionInfo.getTable(), regionInfo);
}
@@ -640,7 +639,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param regionInfo the list of regions we are trying to lock
* @return true if the procedure has to wait for the regions to be available
*/
- public boolean waitRegions(final Procedure procedure, final TableName table,
+ public boolean waitRegions(final Procedure> procedure, final TableName table,
final RegionInfo... regionInfo) {
Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
schedLock();
@@ -688,7 +687,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure that was holding the region
* @param regionInfo the region the procedure was holding
*/
- public void wakeRegion(final Procedure procedure, final RegionInfo regionInfo) {
+ public void wakeRegion(final Procedure> procedure, final RegionInfo regionInfo) {
wakeRegions(procedure, regionInfo.getTable(), regionInfo);
}
@@ -697,7 +696,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure that was holding the regions
* @param regionInfo the list of regions the procedure was holding
*/
- public void wakeRegions(final Procedure procedure,final TableName table,
+ public void wakeRegions(final Procedure> procedure,final TableName table,
final RegionInfo... regionInfo) {
Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
schedLock();
@@ -744,7 +743,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param namespace Namespace to lock
* @return true if the procedure has to wait for the namespace to be available
*/
- public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) {
+ public boolean waitNamespaceExclusiveLock(final Procedure> procedure, final String namespace) {
schedLock();
try {
final LockAndQueue systemNamespaceTableLock =
@@ -775,7 +774,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure releasing the lock
* @param namespace the namespace that has the exclusive lock
*/
- public void wakeNamespaceExclusiveLock(final Procedure procedure, final String namespace) {
+ public void wakeNamespaceExclusiveLock(final Procedure> procedure, final String namespace) {
schedLock();
try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
@@ -893,7 +892,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @see #wakeMetaExclusiveLock(Procedure)
* @param procedure the procedure trying to acquire the lock
* @return true if the procedure has to wait for meta to be available
+ * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+ * {@link RecoverMetaProcedure}.
*/
+ @Deprecated
public boolean waitMetaExclusiveLock(Procedure> procedure) {
schedLock();
try {
@@ -914,7 +916,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* Wake the procedures waiting for meta.
* @see #waitMetaExclusiveLock(Procedure)
* @param procedure the procedure releasing the lock
+ * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+ * {@link RecoverMetaProcedure}.
*/
+ @Deprecated
public void wakeMetaExclusiveLock(Procedure> procedure) {
schedLock();
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaProcedureInterface.java
index 39c271b9bf..39892acada 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaProcedureInterface.java
@@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.master.procedure;
import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+ * {@link RecoverMetaProcedure}.
+ */
+@Deprecated
@InterfaceAudience.Private
public interface MetaProcedureInterface {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaQueue.java
index 190c95654c..f4121e8ae3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaQueue.java
@@ -22,6 +22,11 @@ import org.apache.hadoop.hbase.procedure2.LockStatus;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+ * {@link RecoverMetaProcedure}.
+ */
+@Deprecated
@InterfaceAudience.Private
class MetaQueue extends Queue {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RecoverMetaProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RecoverMetaProcedure.java
index 97035f14fb..3b848fa33c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RecoverMetaProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RecoverMetaProcedure.java
@@ -48,7 +48,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
* This procedure recovers meta from prior shutdown/ crash of a server, and brings meta online by
* assigning meta region/s. Any place where meta is accessed and requires meta to be online, need to
* submit this procedure instead of duplicating steps to recover meta in the code.
+ *
+ * @deprecated Do not use any more, leave it here only for compatible. The recovery work will be
+ * done in {@link ServerCrashProcedure} directly, and the initial work for meta table
+ * will be done by {@link InitMetaProcedure}.
+ * @see ServerCrashProcedure
+ * @see InitMetaProcedure
*/
+@Deprecated
@InterfaceAudience.Private
public class RecoverMetaProcedure
extends StateMachineProcedure
@@ -281,7 +288,7 @@ public class RecoverMetaProcedure
* already initialized
*/
private boolean isRunRequired() {
- return failedMetaServer != null || !master.getAssignmentManager().isMetaInitialized();
+ return failedMetaServer != null || !master.getAssignmentManager().isMetaAssigned();
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
index 4bd35f3f25..9a5eb7e64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
@@ -78,6 +78,11 @@ class SchemaLocking {
return getLock(regionLocks, encodedRegionName);
}
+ /**
+ * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
+ * {@link RecoverMetaProcedure}.
+ */
+ @Deprecated
LockAndQueue getMetaLock() {
return metaLock;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 5a4c10fabc..775c8c2bea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterWalManager;
@@ -83,11 +85,8 @@ public class ServerCrashProcedure
* @param shouldSplitWal True if we should split WALs as part of crashed server processing.
* @param carryingMeta True if carrying hbase:meta table region.
*/
- public ServerCrashProcedure(
- final MasterProcedureEnv env,
- final ServerName serverName,
- final boolean shouldSplitWal,
- final boolean carryingMeta) {
+ public ServerCrashProcedure(final MasterProcedureEnv env, final ServerName serverName,
+ final boolean shouldSplitWal, final boolean carryingMeta) {
this.serverName = serverName;
this.shouldSplitWal = shouldSplitWal;
this.carryingMeta = carryingMeta;
@@ -119,18 +118,32 @@ public class ServerCrashProcedure
LOG.info("Start " + this);
// If carrying meta, process it first. Else, get list of regions on crashed server.
if (this.carryingMeta) {
- setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
+ setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
} else {
setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
}
break;
-
+ case SERVER_CRASH_SPLIT_META_LOGS:
+ splitMetaLogs(env);
+ setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+ break;
+ case SERVER_CRASH_ASSIGN_META:
+ handleRIT(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
+ addChildProcedure(env.getAssignmentManager()
+ .createAssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO));
+ setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+ break;
+ case SERVER_CRASH_PROCESS_META:
+ // not used any more but still leave it here to keep compatible as there maybe old SCP
+ // which is stored in ProcedureStore which has this state.
+ processMeta(env);
+ setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+ break;
case SERVER_CRASH_GET_REGIONS:
// If hbase:meta is not assigned, yield.
if (env.getAssignmentManager().waitMetaLoaded(this)) {
throw new ProcedureSuspendedException();
}
-
this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates()
.getServerRegionInfoSet(serverName);
// Where to go next? Depends on whether we should split logs at all or
@@ -141,17 +154,10 @@ public class ServerCrashProcedure
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
}
break;
-
- case SERVER_CRASH_PROCESS_META:
- processMeta(env);
- setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
- break;
-
case SERVER_CRASH_SPLIT_LOGS:
splitLogs(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
break;
-
case SERVER_CRASH_ASSIGN:
// If no regions to assign, skip assign and skip to the finish.
// Filter out meta regions. Those are handled elsewhere in this procedure.
@@ -177,18 +183,15 @@ public class ServerCrashProcedure
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
}
break;
-
case SERVER_CRASH_HANDLE_RIT2:
// Noop. Left in place because we used to call handleRIT here for a second time
// but no longer necessary since HBASE-20634.
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
-
case SERVER_CRASH_FINISH:
services.getAssignmentManager().getRegionStates().removeServer(serverName);
services.getServerManager().getDeadServers().finish(serverName);
return Flow.NO_MORE_STATE;
-
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
@@ -198,11 +201,6 @@ public class ServerCrashProcedure
return Flow.HAS_MORE_STATE;
}
-
- /**
- * @param env
- * @throws IOException
- */
private void processMeta(final MasterProcedureEnv env) throws IOException {
LOG.debug("{}; processing hbase:meta", this);
@@ -227,10 +225,18 @@ public class ServerCrashProcedure
RegionReplicaUtil.isDefaultReplica(hri);
}
+ private void splitMetaLogs(MasterProcedureEnv env) throws IOException {
+ LOG.debug("Splitting meta WALs {}", this);
+ MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
+ AssignmentManager am = env.getMasterServices().getAssignmentManager();
+ am.getRegionStates().metaLogSplitting(serverName);
+ mwm.splitMetaLog(serverName);
+ am.getRegionStates().metaLogSplit(serverName);
+ LOG.debug("Done splitting meta WALs {}", this);
+ }
+
private void splitLogs(final MasterProcedureEnv env) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Splitting WALs " + this);
- }
+ LOG.debug("Splitting WALs {}", this);
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
// TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
@@ -271,9 +277,6 @@ public class ServerCrashProcedure
@Override
protected LockState acquireLock(final MasterProcedureEnv env) {
- // TODO: Put this BACK AFTER AMv2 goes in!!!!
- // if (env.waitFailoverCleanup(this)) return LockState.LOCK_EVENT_WAIT;
- if (env.waitServerCrashProcessingEnabled(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitServerExclusiveLock(this, getServerName())) {
return LockState.LOCK_EVENT_WAIT;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 9161e255bb..f7865ee330 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -108,8 +108,6 @@ public class TestMetaTableAccessor {
* Does {@link MetaTableAccessor#getRegion(Connection, byte[])} and a write
* against hbase:meta while its hosted server is restarted to prove our retrying
* works.
- * @throws IOException
- * @throws InterruptedException
*/
@Test public void testRetrying()
throws IOException, InterruptedException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 02566605a0..34570d37b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -20,10 +20,8 @@ package org.apache.hadoop.hbase.master;
import static org.mockito.Mockito.mock;
import com.google.protobuf.Service;
-
import java.io.IOException;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
@@ -57,8 +55,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import com.google.protobuf.Service;
-
public class MockNoopMasterServices implements MasterServices {
private final Configuration conf;
private final MetricsMaster metricsMaster;
@@ -214,16 +210,6 @@ public class MockNoopMasterServices implements MasterServices {
return null;
}
- private boolean serverCrashProcessingEnabled = true;
-
- public void setServerCrashProcessingEnabled(boolean b) {
- serverCrashProcessingEnabled = b;
- }
- @Override
- public boolean isServerCrashProcessingEnabled() {
- return serverCrashProcessingEnabled;
- }
-
@Override
public boolean registerService(Service instance) {
return false;
@@ -453,11 +439,6 @@ public class MockNoopMasterServices implements MasterServices {
}
@Override
- public boolean recoverMeta() throws IOException {
- return false;
- }
-
- @Override
public String getClientIdAuditPrefix() {
return null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index b8a53b640c..27eec65274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Triple;
+import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -90,7 +91,7 @@ public class TestCatalogJanitor {
}
@Before
- public void setup() throws IOException {
+ public void setup() throws IOException, KeeperException {
setRootDirAndCleanIt(HTU, this.name.getMethodName());
NavigableMap> regionsToRegionServers =
new ConcurrentSkipListMap>();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
index 2272bec8b5..c4a2f0389e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedSet;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
@@ -56,11 +55,18 @@ import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.security.Superusers;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.zookeeper.KeeperException;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -70,10 +76,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
/**
@@ -174,7 +176,7 @@ public class MockMasterServices extends MockNoopMasterServices {
}
public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
- throws IOException {
+ throws IOException, KeeperException {
startProcedureExecutor(remoteDispatcher);
this.assignmentManager.start();
for (int i = 0; i < numServes; ++i) {
@@ -217,17 +219,14 @@ public class MockMasterServices extends MockNoopMasterServices {
private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
throws IOException {
final Configuration conf = getConfiguration();
- final Path logDir = new Path(fileSystemManager.getRootDir(),
- WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
-
this.procedureStore = new NoopProcedureStore();
this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
this.procedureEnv = new MasterProcedureEnv(this,
remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this));
- this.procedureExecutor = new ProcedureExecutor(conf, procedureEnv, procedureStore,
- procedureEnv.getProcedureScheduler());
+ this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
+ procedureEnv.getProcedureScheduler());
final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
Math.max(Runtime.getRuntime().availableProcessors(),
@@ -236,7 +235,7 @@ public class MockMasterServices extends MockNoopMasterServices {
MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
this.procedureStore.start(numThreads);
- this.procedureExecutor.start(numThreads, abortOnCorruption);
+ ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
this.procedureEnv.getRemoteDispatcher().start();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 0aac4c7dac..08ecb815ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -150,7 +150,6 @@ public class TestAssignmentManager {
rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
am.wakeMetaLoadedEvent();
- am.setFailoverCleanupDone(true);
}
@After
@@ -427,18 +426,15 @@ public class TestAssignmentManager {
am = master.getAssignmentManager();
// Assign meta
- master.setServerCrashProcessingEnabled(false);
rsDispatcher.setMockRsExecutor(new HangThenRSRestartExecutor());
am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
- assertEquals(true, am.isMetaInitialized());
+ assertEquals(true, am.isMetaAssigned());
// set it back as default, see setUpMeta()
- master.setServerCrashProcessingEnabled(true);
am.wakeMetaLoadedEvent();
- am.setFailoverCleanupDone(true);
}
- private Future submitProcedure(final Procedure proc) {
+ private Future submitProcedure(final Procedure> proc) {
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
}
@@ -449,7 +445,7 @@ public class TestAssignmentManager {
LOG.info("ExecutionException", e);
Exception ee = (Exception)e.getCause();
if (ee instanceof InterruptedIOException) {
- for (Procedure p: this.master.getMasterProcedureExecutor().getProcedures()) {
+ for (Procedure> p: this.master.getMasterProcedureExecutor().getProcedures()) {
LOG.info(p.toStringDetails());
}
}
@@ -493,13 +489,6 @@ public class TestAssignmentManager {
return proc;
}
- private UnassignProcedure createAndSubmitUnassign(TableName tableName, int regionId) {
- RegionInfo hri = createRegionInfo(tableName, regionId);
- UnassignProcedure proc = am.createUnassignProcedure(hri, null, false);
- master.getMasterProcedureExecutor().submitProcedure(proc);
- return proc;
- }
-
private RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
return RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes(regionId))
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
index 3f61de462f..1c1a36ea5b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.MasterMetaBootstrap;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -85,7 +84,6 @@ public class MasterProcedureTestingUtility {
env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
}
am.stop();
- master.setServerCrashProcessingEnabled(false);
master.setInitialized(false);
return null;
}
@@ -96,9 +94,6 @@ public class MasterProcedureTestingUtility {
public Void call() throws Exception {
final AssignmentManager am = env.getAssignmentManager();
am.start();
- MasterMetaBootstrap metaBootstrap = new MasterMetaBootstrap(master);
- metaBootstrap.recoverMeta();
- metaBootstrap.processDeadServers();
am.joinCluster();
master.setInitialized(true);
return null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
index 96bdbde83b..9a0e2f61ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -22,18 +22,18 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
@@ -84,7 +84,7 @@ public class TestMasterProcedureEvents {
@After
public void tearDown() throws Exception {
- for (HTableDescriptor htd: UTIL.getAdmin().listTables()) {
+ for (TableDescriptor htd: UTIL.getAdmin().listTableDescriptors()) {
LOG.info("Tear down, remove table=" + htd.getTableName());
UTIL.deleteTable(htd.getTableName());
}
@@ -96,58 +96,22 @@ public class TestMasterProcedureEvents {
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
ProcedureExecutor procExec = master.getMasterProcedureExecutor();
- HRegionInfo hri = new HRegionInfo(tableName);
- HTableDescriptor htd = new HTableDescriptor(tableName);
- htd.addFamily(new HColumnDescriptor("f"));
+ RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
+ TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
- while (!master.isInitialized()) Thread.sleep(250);
+ while (!master.isInitialized()) {
+ Thread.sleep(250);
+ }
master.setInitialized(false); // fake it, set back later
// check event wait/wake
testProcedureEventWaitWake(master, master.getInitializedEvent(),
- new CreateTableProcedure(procExec.getEnvironment(), htd, new HRegionInfo[] { hri }));
- }
-
- @Test
- public void testServerCrashProcedureEvent() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
- HMaster master = UTIL.getMiniHBaseCluster().getMaster();
- ProcedureExecutor procExec = master.getMasterProcedureExecutor();
-
- while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() ||
- master.getAssignmentManager().getRegionStates().hasRegionsInTransition()) {
- Thread.sleep(25);
- }
-
- UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]);
- try (Table t = UTIL.getConnection().getTable(tableName)) {
- // Load the table with a bit of data so some logs to split and some edits in each region.
- UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
- }
-
- master.setServerCrashProcessingEnabled(false); // fake it, set back later
-
- // Kill a server. Master will notice but do nothing other than add it to list of dead servers.
- HRegionServer hrs = getServerWithRegions();
- boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(hrs.getServerName());
- UTIL.getHBaseCluster().killRegionServer(hrs.getServerName());
- hrs.join();
-
- // Wait until the expiration of the server has arrived at the master. We won't process it
- // by queuing a ServerCrashProcedure because we have disabled crash processing... but wait
- // here so ServerManager gets notice and adds expired server to appropriate queues.
- while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10);
-
- // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
- master.getServerManager().moveFromOnlineToDeadServers(hrs.getServerName());
-
- // check event wait/wake
- testProcedureEventWaitWake(master, master.getServerCrashProcessingEnabledEvent(),
- new ServerCrashProcedure(procExec.getEnvironment(), hrs.getServerName(), true, carryingMeta));
+ new CreateTableProcedure(procExec.getEnvironment(), htd, new RegionInfo[] { hri }));
}
- private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent event,
- final Procedure proc) throws Exception {
+ private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent> event,
+ final Procedure> proc) throws Exception {
final ProcedureExecutor procExec = master.getMasterProcedureExecutor();
final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureScheduler();
@@ -188,14 +152,4 @@ public class TestMasterProcedureEvents {
" pollCalls=" + (procSched.getPollCalls() - startPollCalls) +
" nullPollCalls=" + (procSched.getNullPollCalls() - startNullPollCalls));
}
-
- private HRegionServer getServerWithRegions() {
- for (int i = 0; i < 3; ++i) {
- HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i);
- if (hrs.getNumberOfOnlineRegions() > 0) {
- return hrs;
- }
- }
- return null;
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
index 58d5d46d23..9f7fafeca6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
@@ -23,10 +23,10 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
@@ -102,13 +102,17 @@ public class TestServerCrashProcedure {
testRecoveryAndDoubleExecution(false, true);
}
+ private long getSCPProcId(ProcedureExecutor> procExec) {
+ util.waitFor(30000, () -> !procExec.getProcedures().isEmpty());
+ return procExec.getActiveProcIds().stream().mapToLong(Long::longValue).min().getAsLong();
+ }
+
/**
* Run server crash procedure steps twice to test idempotency and that we are persisting all
* needed state.
- * @throws Exception
*/
- private void testRecoveryAndDoubleExecution(final boolean carryingMeta,
- final boolean doubleExecution) throws Exception {
+ private void testRecoveryAndDoubleExecution(boolean carryingMeta, boolean doubleExecution)
+ throws Exception {
final TableName tableName = TableName.valueOf(
"testRecoveryAndDoubleExecution-carryingMeta-" + carryingMeta);
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
@@ -123,33 +127,29 @@ public class TestServerCrashProcedure {
// Master's running of the server crash processing.
final HMaster master = this.util.getHBaseCluster().getMaster();
final ProcedureExecutor procExec = master.getMasterProcedureExecutor();
- master.setServerCrashProcessingEnabled(false);
// find the first server that match the request and executes the test
ServerName rsToKill = null;
- for (HRegionInfo hri : util.getHBaseAdmin().getTableRegions(tableName)) {
+ for (RegionInfo hri : util.getAdmin().getRegions(tableName)) {
final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri);
if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName) == carryingMeta) {
rsToKill = serverName;
break;
}
}
- // kill the RS
- AssignmentTestingUtil.killRs(util, rsToKill);
- // Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
- master.setServerCrashProcessingEnabled(true);
- // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
- master.getServerManager().moveFromOnlineToDeadServers(rsToKill);
// Enable test flags and then queue the crash procedure.
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
- ServerCrashProcedure scp = new ServerCrashProcedure(procExec.getEnvironment(), rsToKill,
- true, carryingMeta);
if (doubleExecution) {
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
- long procId = procExec.submitProcedure(scp);
+ // kill the RS
+ AssignmentTestingUtil.killRs(util, rsToKill);
+ long procId = getSCPProcId(procExec);
// Now run through the procedure twice crashing the executor on each step...
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
} else {
- ProcedureTestingUtility.submitAndWait(procExec, scp);
+ // kill the RS
+ AssignmentTestingUtil.killRs(util, rsToKill);
+ long procId = getSCPProcId(procExec);
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
}
// Assert all data came back.
assertEquals(count, util.countRows(t));
--
2.11.4.GIT