From 6dbbd78aa00ed4877292d9cd48621803a175d51a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 19 Jun 2018 15:02:10 +0800 Subject: [PATCH] HBASE-20708 Remove the usage of RecoverMetaProcedure in master startup --- .../org/apache/hadoop/hbase/MetaTableAccessor.java | 10 +- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 45 ++-- .../hbase/procedure2/ProcedureTestingUtility.java | 10 +- .../hbase/procedure2/TestChildProcedures.java | 4 +- .../hbase/procedure2/TestProcedureEvents.java | 4 +- .../hbase/procedure2/TestProcedureExecution.java | 4 +- .../hbase/procedure2/TestProcedureExecutor.java | 5 +- .../procedure2/TestProcedureInMemoryChore.java | 5 +- .../hbase/procedure2/TestProcedureMetrics.java | 2 +- .../hbase/procedure2/TestProcedureNonce.java | 4 +- .../hbase/procedure2/TestProcedureRecovery.java | 4 +- .../hbase/procedure2/TestProcedureReplayOrder.java | 6 +- .../hbase/procedure2/TestProcedureSuspended.java | 4 +- .../procedure2/TestStateMachineProcedure.java | 4 +- .../hbase/procedure2/TestYieldProcedures.java | 6 +- .../src/main/protobuf/MasterProcedure.proto | 15 +- .../apache/hadoop/hbase/master/CatalogJanitor.java | 13 +- .../org/apache/hadoop/hbase/master/HMaster.java | 180 +++++++------ .../hadoop/hbase/master/MasterMetaBootstrap.java | 41 +-- .../apache/hadoop/hbase/master/MasterServices.java | 12 - .../hadoop/hbase/master/MasterWalManager.java | 28 +- .../hadoop/hbase/master/RegionServerTracker.java | 30 ++- .../apache/hadoop/hbase/master/ServerManager.java | 163 ++---------- .../hbase/master/assignment/AssignmentManager.java | 293 ++++++++------------- .../assignment/MergeTableRegionsProcedure.java | 2 +- .../hbase/master/assignment/RegionStateStore.java | 21 +- .../assignment/RegionTransitionProcedure.java | 11 +- .../assignment/SplitTableRegionProcedure.java | 2 +- .../AbstractStateMachineTableProcedure.java | 6 +- .../hbase/master/procedure/InitMetaProcedure.java | 115 ++++++++ .../hbase/master/procedure/MasterProcedureEnv.java | 16 +- .../master/procedure/MasterProcedureScheduler.java | 27 +- .../master/procedure/MetaProcedureInterface.java | 5 + .../hadoop/hbase/master/procedure/MetaQueue.java | 5 + .../master/procedure/RecoverMetaProcedure.java | 9 +- .../hbase/master/procedure/SchemaLocking.java | 5 + .../master/procedure/ServerCrashProcedure.java | 61 +++-- .../apache/hadoop/hbase/TestMetaTableAccessor.java | 2 - .../hbase/master/MockNoopMasterServices.java | 19 -- .../hadoop/hbase/master/TestCatalogJanitor.java | 3 +- .../master/assignment/MockMasterServices.java | 25 +- .../master/assignment/TestAssignmentManager.java | 17 +- .../procedure/MasterProcedureTestingUtility.java | 5 - .../procedure/TestMasterProcedureEvents.java | 76 ++---- .../master/procedure/TestServerCrashProcedure.java | 32 +-- 45 files changed, 610 insertions(+), 746 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 91f3cf7514..60afaca389 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -1346,9 +1346,17 @@ public class MetaTableAccessor { */ public static void putsToMetaTable(final Connection connection, final List ps) throws IOException { + if (ps.isEmpty()) { + return; + } try (Table t = getMetaHTable(connection)) { 
debugLogMutations(ps); - t.put(ps); + // the implementation for putting a single Put is much simpler so here we do a check first. + if (ps.size() == 1) { + t.put(ps.get(0)); + } else { + t.put(ps); + } } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 3a75d33dd2..bd0a191490 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -511,21 +511,16 @@ public class ProcedureExecutor { } /** - * Start the procedure executor. - * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to - * recover the lease, and ensure a single executor, and start the procedure - * replay to resume and recover the previous pending and in-progress perocedures. - * + * Initialize the procedure executor, but do not start workers. We will start them later. + *

+ * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and + * ensure a single executor, and start the procedure replay to resume and recover the previous + * pending and in-progress procedures. * @param numThreads number of threads available for procedure execution. - * @param abortOnCorruption true if you want to abort your service in case - * a corrupted procedure is found on replay. otherwise false. + * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure + * is found on replay. otherwise false. */ - public void start(int numThreads, boolean abortOnCorruption) throws IOException { - if (!running.compareAndSet(false, true)) { - LOG.warn("Already running"); - return; - } - + public void init(int numThreads, boolean abortOnCorruption) throws IOException { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. this.corePoolSize = numThreads; @@ -546,11 +541,11 @@ public class ProcedureExecutor { long st, et; // Acquire the store lease. - st = EnvironmentEdgeManager.currentTime(); + st = System.nanoTime(); store.recoverLease(); - et = EnvironmentEdgeManager.currentTime(); + et = System.nanoTime(); LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(), - StringUtils.humanTimeDiff(et - st)); + StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); // start the procedure scheduler scheduler.start(); @@ -560,12 +555,21 @@ public class ProcedureExecutor { // The first one will make sure that we have the latest id, // so we can start the threads and accept new procedures. // The second step will do the actual load of old procedures. - st = EnvironmentEdgeManager.currentTime(); + st = System.nanoTime(); load(abortOnCorruption); - et = EnvironmentEdgeManager.currentTime(); + et = System.nanoTime(); LOG.info("Loaded {} in {}", store.getClass().getSimpleName(), - StringUtils.humanTimeDiff(et - st)); + StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); + } + /** + * Start the workers. + */ + public void startWorkers() throws IOException { + if (!running.compareAndSet(false, true)) { + LOG.warn("Already running"); + return; + } // Start the executors. Here we must have the lastProcId set. LOG.trace("Start workers {}", workerThreads.size()); timeoutExecutor.start(); @@ -861,7 +865,6 @@ public class ProcedureExecutor { justification = "FindBugs is blind to the check-for-null") public long submitProcedure(final Procedure proc, final NonceKey nonceKey) { Preconditions.checkArgument(lastProcId.get() >= 0); - Preconditions.checkArgument(isRunning(), "executor not running"); prepareProcedure(proc); @@ -895,7 +898,6 @@ public class ProcedureExecutor { // TODO: Do we need to take nonces here? 
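The init()/startWorkers() split above is the core mechanism of this patch: init() recovers the store lease and replays persisted procedures without executing anything, so the master can inspect and schedule startup procedures before any worker runs. A minimal sketch of the intended call order, assuming conf, env, store, scheduler, numThreads and abortOnCorruption are already in scope (only the constructor, init() and startWorkers() are taken from this patch):

    ProcedureExecutor<MasterProcedureEnv> executor =
        new ProcedureExecutor<>(conf, env, store, scheduler);
    store.start(numThreads);
    // Phase 1: recover the store lease and replay persisted procedures. No
    // worker threads exist yet, so nothing executes, but procedures (e.g.
    // ServerCrashProcedures for servers that died while the master was down)
    // can now be inspected and submitted without racing the workers.
    executor.init(numThreads, abortOnCorruption);
    // ... schedule startup procedures here ...
    // Phase 2: start the worker and timeout threads; execution begins.
    executor.startWorkers();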
public void submitProcedures(final Procedure[] procs) { Preconditions.checkArgument(lastProcId.get() >= 0); - Preconditions.checkArgument(isRunning(), "executor not running"); if (procs == null || procs.length <= 0) { return; } @@ -919,7 +921,6 @@ public class ProcedureExecutor { private Procedure prepareProcedure(final Procedure proc) { Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); - Preconditions.checkArgument(isRunning(), "executor not running"); Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); if (this.checkOwnerSet) { Preconditions.checkArgument(proc.hasOwner(), "missing owner"); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index d29e37615b..e8d72f9d63 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -70,6 +70,12 @@ public class ProcedureTestingUtility { restart(procExecutor, false, true, null, null); } + public static void initAndStartWorkers(ProcedureExecutor procExecutor, int numThreads, + boolean abortOnCorruption) throws IOException { + procExecutor.init(numThreads, abortOnCorruption); + procExecutor.startWorkers(); + } + public static void restart(final ProcedureExecutor procExecutor, final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted, final Callable stopAction, final Callable startAction) @@ -98,7 +104,7 @@ public class ProcedureTestingUtility { // re-start LOG.info("RESTART - Start"); procStore.start(storeThreads); - procExecutor.start(execThreads, failOnCorrupted); + initAndStartWorkers(procExecutor, execThreads, failOnCorrupted); if (startAction != null) { startAction.call(); } @@ -183,7 +189,7 @@ public class ProcedureTestingUtility { NoopProcedureStore procStore = new NoopProcedureStore(); ProcedureExecutor procExecutor = new ProcedureExecutor<>(conf, env, procStore); procStore.start(1); - procExecutor.start(1, false); + initAndStartWorkers(procExecutor, 1, false); try { return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); } finally { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java index 3d99b31abd..cce4caf1ce 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java @@ -66,10 +66,10 @@ public class TestChildProcedures { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java index b7c59c80c6..8351e4cd32 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -67,9 +67,9 @@ public class TestProcedureEvents { procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procStore.start(1); - procExecutor.start(1, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java index 7e660e4d0b..a3cff582fd 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java @@ -71,9 +71,9 @@ public class TestProcedureExecution { logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java index 1c53098a19..7f130caf4a 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -71,8 +70,8 @@ public class TestProcedureExecutor { } private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception { - procExecutor = new ProcedureExecutor(conf, procEnv, procStore); - procExecutor.start(numThreads, true); + procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true); } @Test diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java index 86293e1347..75c8d16485 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java 
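Every test suite touched below migrates its setup the same way, so the pattern is worth showing once in isolation. This is a sketch with field declarations elided; only initAndStartWorkers() is new in this patch:

    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    // was: procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);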
@@ -53,17 +53,16 @@ public class TestProcedureInMemoryChore { private HBaseCommonTestingUtility htu; - @SuppressWarnings("rawtypes") @Before public void setUp() throws IOException { htu = new HBaseCommonTestingUtility(); procEnv = new TestProcEnv(); procStore = new NoopProcedureStore(); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java index 94a293d75f..2acb7ddcf3 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java @@ -75,7 +75,7 @@ public class TestProcedureMetrics { procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java index b702314dc7..2bf11fbaa0 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java @@ -72,10 +72,10 @@ public class TestProcedureNonce { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index aece1de4f4..532fcf3871 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -76,10 +76,10 @@ public class TestProcedureRecovery { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + 
ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); procSleepInterval = 0; } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index 7d0529e70a..319ddb294d 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -53,7 +53,7 @@ public class TestProcedureReplayOrder { private static final int NUM_THREADS = 16; - private ProcedureExecutor procExecutor; + private ProcedureExecutor procExecutor; private TestProcedureEnv procEnv; private ProcedureStore procStore; @@ -74,9 +74,9 @@ public class TestProcedureReplayOrder { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcedureEnv(); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procStore.start(NUM_THREADS); - procExecutor.start(1, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index 3da7c117a6..a9e919cb73 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -60,9 +60,9 @@ public class TestProcedureSuspended { htu = new HBaseCommonTestingUtility(); procStore = new NoopProcedureStore(); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java index 69b2208356..19ef4bb7e6 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java @@ -81,9 +81,9 @@ public class TestStateMachineProcedure { logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index 7fa768243b..8d9b325169 100644 --- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -71,10 +71,10 @@ public class TestYieldProcedures { logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procRunnables = new TestScheduler(); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), - procStore, procRunnables); + procExecutor = + new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index b65551f50d..0b4e1d799a 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -295,15 +295,17 @@ message RecoverMetaStateData { enum ServerCrashState { SERVER_CRASH_START = 1; - SERVER_CRASH_PROCESS_META = 2; + SERVER_CRASH_PROCESS_META = 2[deprecated=true]; SERVER_CRASH_GET_REGIONS = 3; - SERVER_CRASH_NO_SPLIT_LOGS = 4; + SERVER_CRASH_NO_SPLIT_LOGS = 4[deprecated=true]; SERVER_CRASH_SPLIT_LOGS = 5; // Removed SERVER_CRASH_PREPARE_LOG_REPLAY = 6; // Removed SERVER_CRASH_CALC_REGIONS_TO_ASSIGN = 7; SERVER_CRASH_ASSIGN = 8; SERVER_CRASH_WAIT_ON_ASSIGN = 9; - SERVER_CRASH_HANDLE_RIT2 = 20; + SERVER_CRASH_SPLIT_META_LOGS = 10; + SERVER_CRASH_ASSIGN_META = 11; + SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true]; SERVER_CRASH_FINISH = 100; } @@ -445,3 +447,10 @@ enum ReopenTableRegionsState { message ReopenTableRegionsStateData { required TableName table_name = 1; } + +enum InitMetaState { + INIT_META_ASSIGN_META = 1; +} + +message InitMetaStateData { +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index 23912d67c8..85150937e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -111,17 +111,14 @@ public class CatalogJanitor extends ScheduledChore { protected void chore() { try { AssignmentManager am = this.services.getAssignmentManager(); - if (this.enabled.get() - && !this.services.isInMaintenanceMode() - && am != null - && am.isFailoverCleanupDone() - && !am.hasRegionsInTransition()) { + if (this.enabled.get() && !this.services.isInMaintenanceMode() && am != null && + am.isMetaLoaded() && !am.hasRegionsInTransition()) { scan(); } else { LOG.warn("CatalogJanitor is disabled! 
Enabled=" + this.enabled.get() + - ", maintenanceMode=" + this.services.isInMaintenanceMode() + - ", am=" + am + ", failoverCleanupDone=" + (am != null && am.isFailoverCleanupDone()) + - ", hasRIT=" + (am != null && am.hasRegionsInTransition())); + ", maintenanceMode=" + this.services.isInMaintenanceMode() + ", am=" + am + + ", metaLoaded=" + (am != null && am.isMetaLoaded()) + ", hasRIT=" + + (am != null && am.hasRegionsInTransition())); } } catch (IOException e) { LOG.warn("Failed scan of catalog table", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 38aac50063..d6b793a2ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -122,13 +124,14 @@ import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; -import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; @@ -235,7 +238,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) @SuppressWarnings("deprecation") public class HMaster extends HRegionServer implements MasterServices { - private static Logger LOG = LoggerFactory.getLogger(HMaster.class.getName()); + private static Logger LOG = LoggerFactory.getLogger(HMaster.class); /** * Protection against zombie master. 
Started once Master accepts active responsibility and @@ -351,10 +354,6 @@ public class HMaster extends HRegionServer implements MasterServices { // initialization may have not completed yet. volatile boolean serviceStarted = false; - // flag set after we complete assignMeta. - private final ProcedureEvent serverCrashProcessingEnabled = - new ProcedureEvent<>("server crash processing"); - // Maximum time we should run balancer for private final int maxBlancingTime; // Maximum percent of regions in transition when balancing @@ -725,7 +724,8 @@ public class HMaster extends HRegionServer implements MasterServices { /** *

- * Initialize all ZK based system trackers.
+ * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
+ * should have already been initialized along with {@link ServerManager}.
 * </p>
 * <p>
* Will be overridden in tests. @@ -747,15 +747,8 @@ public class HMaster extends HRegionServer implements MasterServices { this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this); this.splitOrMergeTracker.start(); - // Create Assignment Manager - this.assignmentManager = new AssignmentManager(this); - this.assignmentManager.start(); - this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); - this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager); - this.regionServerTracker.start(); - this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); this.drainingServerTracker.start(); @@ -800,18 +793,40 @@ public class HMaster extends HRegionServer implements MasterServices { /** * Finish initialization of HMaster after becoming the primary master. - * + *

+ * The startup order is a bit complicated but very important, do not change it unless you know
+ * what you are doing.
 * <ol>
- * <li>Initialize master components - file system manager, server manager,
- * assignment manager, region server tracker, etc</li>
- * <li>Start necessary service threads - balancer, catalog janior,
- * executor services, etc</li>
- * <li>Set cluster as UP in ZooKeeper</li>
- * <li>Wait for RegionServers to check-in</li>
- * <li>Split logs and perform data recovery, if necessary</li>
- * <li>Ensure assignment of meta/namespace regions</li>
- * <li>Handle either fresh cluster start or master failover</li>
+ * <li>Initialize file system based components - file system manager, wal manager, table
+ * descriptors, etc</li>
+ * <li>Publish cluster id</li>
+ * <li>Here comes the most complicated part - initialize server manager, assignment manager and
+ * region server tracker
+ * <ol>
+ * <li>Create server manager</li>
+ * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
+ * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
+ * server</li>
+ * <li>Create assignment manager and start it, load the meta region state, but do not load data
+ * from meta region</li>
+ * <li>Start region server tracker, construct the online servers set and find out dead servers and
+ * schedule SCP for them. The online servers will be constructed by scanning zk, and we will also
+ * scan the wal directory to find out possible live region servers, and the differences between
+ * these two sets are the dead servers</li>
+ * </ol>
+ * </li>
+ * <li>If this is a new deploy, schedule an InitMetaProcedure to initialize meta</li>
+ * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
+ * the procedure executor, etc. Notice that the balancer must be created first as assignment
+ * manager may use it when assigning regions.</li>
+ * <li>Wait for meta to be initialized if necessary, start table state manager.</li>
+ * <li>Wait for enough region servers to check-in</li>
+ * <li>Let assignment manager load data from meta and construct region states</li>
+ * <li>Start all other things such as chore services, etc</li>
 * </ol>
+ * <p/>
+ * Notice that now we will not schedule a special procedure to make meta online (unless the first
+ * time, when meta has not been created yet); we will rely on SCP to bring meta online.
 */
 private void finishActiveMasterInitialization(MonitoredTask status) throws IOException,
     InterruptedException, KeeperException, ReplicationException {
@@ -849,10 +864,20 @@
     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
     this.clusterId = clusterId.toString();
-    this.serverManager = createServerManager(this);
-    // This manager is started AFTER hbase:meta is confirmed on line.
-    // See inside metaBootstrap.recoverMeta(); below. Shouldn't be so cryptic!
+
+    status.setStatus("Initialize ServerManager and schedule SCP for crashed servers");
+    this.serverManager = createServerManager(this);
+    createProcedureExecutor();
+    // Create Assignment Manager
+    this.assignmentManager = new AssignmentManager(this);
+    this.assignmentManager.start();
+    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
+    this.regionServerTracker.start(
+      procedureExecutor.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure)
+        .map(p -> ((ServerCrashProcedure) p).getServerName()).collect(Collectors.toSet()),
+      walManager.getLiveServersFromWALDir());
+    // This manager will be started AFTER hbase:meta is confirmed on line.
     // hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
     // state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
     this.tableStateManager =
@@ -888,10 +913,37 @@
     status.setStatus("Initializing master coprocessors");
     this.cpHost = new MasterCoprocessorHost(this, this.conf);
+    status.setStatus("Initializing meta table if this is a new deploy");
+    InitMetaProcedure initMetaProc = null;
+    if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO)
+      .isOffline()) {
+      Optional<Procedure<?>> optProc = procedureExecutor.getProcedures().stream()
+        .filter(p -> p instanceof InitMetaProcedure).findAny();
+      if (optProc.isPresent()) {
+        initMetaProc = (InitMetaProcedure) optProc.get();
+      } else {
+        // schedule an init meta procedure if meta has not been deployed yet
+        initMetaProc = new InitMetaProcedure();
+        procedureExecutor.submitProcedure(initMetaProc);
+      }
+    }
+    if (this.balancer instanceof FavoredNodesPromoter) {
+      favoredNodesManager = new FavoredNodesManager(this);
+    }
+
+    // initialize load balancer
+    this.balancer.setMasterServices(this);
+    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
+    this.balancer.initialize();
+
     // start up all service threads.
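The meta-initialization block above is deliberately idempotent across master restarts: if an InitMetaProcedure was already persisted in the procedure store, it is adopted rather than resubmitted. The same schedule-or-adopt shape, extracted as a sketch (built only from calls that appear in this patch; error handling omitted):

    InitMetaProcedure initMetaProc = procedureExecutor.getProcedures().stream()
        .filter(p -> p instanceof InitMetaProcedure)
        .map(p -> (InitMetaProcedure) p)
        .findAny()
        .orElseGet(() -> {
          // schedule an init meta procedure if meta has not been deployed yet
          InitMetaProcedure proc = new InitMetaProcedure();
          procedureExecutor.submitProcedure(proc);
          return proc;
        });
    // later, once the workers have been started:
    initMetaProc.await();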
status.setStatus("Initializing master service threads"); startServiceThreads(); - + // wait meta to be initialized after we start procedure executor + if (initMetaProc != null) { + initMetaProc.await(); + } + tableStateManager.start(); // Wake up this server to check in sleeper.skipSleepCycle(); @@ -903,28 +955,11 @@ public class HMaster extends HRegionServer implements MasterServices { LOG.info(Objects.toString(status)); waitForRegionServers(status); - if (this.balancer instanceof FavoredNodesPromoter) { - favoredNodesManager = new FavoredNodesManager(this); - } - - //initialize load balancer - this.balancer.setMasterServices(this); - this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor()); - this.balancer.initialize(); - - // Make sure meta assigned before proceeding. - status.setStatus("Recovering Meta Region"); - // Check if master is shutting down because issue initializing regionservers or balancer. if (isStopped()) { return; } - // Bring up hbase:meta. recoverMeta is a blocking call waiting until hbase:meta is deployed. - // It also starts the TableStateManager. - MasterMetaBootstrap metaBootstrap = createMetaBootstrap(); - metaBootstrap.recoverMeta(); - //Initialize after meta as it scans meta if (favoredNodesManager != null) { SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment = @@ -933,9 +968,6 @@ public class HMaster extends HRegionServer implements MasterServices { favoredNodesManager.initialize(snapshotOfRegionAssignment); } - status.setStatus("Submitting log splitting work for previously failed region servers"); - metaBootstrap.processDeadServers(); - // Fix up assignment manager status status.setStatus("Starting assignment manager"); this.assignmentManager.joinCluster(); @@ -977,6 +1009,7 @@ public class HMaster extends HRegionServer implements MasterServices { setInitialized(true); assignmentManager.checkIfShouldMoveSystemRegionAsync(); status.setStatus("Assign meta replicas"); + MasterMetaBootstrap metaBootstrap = createMetaBootstrap(); metaBootstrap.assignMetaReplicas(); status.setStatus("Starting quota manager"); initQuotaManager(); @@ -1119,7 +1152,6 @@ public class HMaster extends HRegionServer implements MasterServices { private void initQuotaManager() throws IOException { MasterQuotaManager quotaManager = new MasterQuotaManager(this); - this.assignmentManager.setRegionStateListener(quotaManager); quotaManager.start(); this.quotaManager = quotaManager; } @@ -1281,10 +1313,10 @@ public class HMaster extends HRegionServer implements MasterServices { } } - private void startProcedureExecutor() throws IOException { - final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); - procedureStore = new WALProcedureStore(conf, - new MasterProcedureEnv.WALStoreLeaseRecovery(this)); + private void createProcedureExecutor() throws IOException { + MasterProcedureEnv procEnv = new MasterProcedureEnv(this); + procedureStore = + new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler); @@ -1297,10 +1329,17 @@ public class HMaster extends HRegionServer implements MasterServices { conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); procedureStore.start(numThreads); - 
procedureExecutor.start(numThreads, abortOnCorruption); + // Just initialize it but do not start the workers, we will start the workers later by calling + // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more + // details. + procedureExecutor.init(numThreads, abortOnCorruption); procEnv.getRemoteDispatcher().start(); } + private void startProcedureExecutor() throws IOException { + procedureExecutor.startWorkers(); + } + private void stopProcedureExecutor() { if (procedureExecutor != null) { configurationManager.deregisterObserver(procedureExecutor.getEnvironment()); @@ -2858,25 +2897,6 @@ public class HMaster extends HRegionServer implements MasterServices { } /** - * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing - * of crashed servers. - * @return true if assignMeta has completed; - */ - @Override - public boolean isServerCrashProcessingEnabled() { - return serverCrashProcessingEnabled.isReady(); - } - - @VisibleForTesting - public void setServerCrashProcessingEnabled(final boolean b) { - procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b); - } - - public ProcedureEvent getServerCrashProcessingEnabledEvent() { - return serverCrashProcessingEnabled; - } - - /** * Compute the average load across all region servers. * Currently, this uses a very naive computation - just uses the number of * regions being served, ignoring stats about number of requests. @@ -3623,18 +3643,6 @@ public class HMaster extends HRegionServer implements MasterServices { return lockManager; } - @Override - public boolean recoverMeta() throws IOException { - // we need to block here so the latch should be greater than the current version to make sure - // that we will block. - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(Integer.MAX_VALUE, 0); - procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch)); - latch.await(); - LOG.info("hbase:meta deployed at={}", - getMetaTableLocator().getMetaRegionLocation(getZooKeeper())); - return assignmentManager.isMetaInitialized(); - } - public QuotaObserverChore getQuotaObserverChore() { return this.quotaObserverChore; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java index dd46e411ca..ce2146598a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory; * Used by the HMaster on startup to split meta logs and assign the meta table. */ @InterfaceAudience.Private -public class MasterMetaBootstrap { +class MasterMetaBootstrap { private static final Logger LOG = LoggerFactory.getLogger(MasterMetaBootstrap.class); private final HMaster master; @@ -48,35 +47,12 @@ public class MasterMetaBootstrap { this.master = master; } - public void recoverMeta() throws InterruptedException, IOException { - // This is a blocking call that waits until hbase:meta is deployed. - master.recoverMeta(); - // Now we can start the TableStateManager. It is backed by hbase:meta. 
- master.getTableStateManager().start(); - // Enable server crash procedure handling - enableCrashedServerProcessing(); - } - - public void processDeadServers() { - // get a list for previously failed RS which need log splitting work - // we recover hbase:meta region servers inside master initialization and - // handle other failed servers in SSH in order to start up master node ASAP - Set previouslyFailedServers = - master.getMasterWalManager().getFailedServersFromLogFolders(); - - // Master has recovered hbase:meta region server and we put - // other failed region servers in a queue to be handled later by SSH - for (ServerName tmpServer : previouslyFailedServers) { - master.getServerManager().processDeadServer(tmpServer, true); - } - } - /** * For assigning hbase:meta replicas only. * TODO: The way this assign runs, nothing but chance to stop all replicas showing up on same * server as the hbase:meta region. */ - protected void assignMetaReplicas() + void assignMetaReplicas() throws IOException, InterruptedException, KeeperException { int numReplicas = master.getConfiguration().getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM); @@ -85,7 +61,7 @@ public class MasterMetaBootstrap { return; } final AssignmentManager assignmentManager = master.getAssignmentManager(); - if (!assignmentManager.isMetaInitialized()) { + if (!assignmentManager.isMetaLoaded()) { throw new IllegalStateException("hbase:meta must be initialized first before we can " + "assign out its replicas"); } @@ -137,15 +113,4 @@ public class MasterMetaBootstrap { LOG.warn("Ignoring exception " + ex); } } - - private void enableCrashedServerProcessing() throws InterruptedException { - // If crashed server processing is disabled, we enable it and expire those dead but not expired - // servers. This is required so that if meta is assigning to a server which dies after - // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be - // stuck here waiting forever if waitForMeta is specified. - if (!master.isServerCrashProcessingEnabled()) { - master.setServerCrashProcessingEnabled(true); - master.getServerManager().processQueuedDeadServers(); - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 3d2b9af51c..ac521d5aba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -321,11 +321,6 @@ public interface MasterServices extends Server { TableDescriptors getTableDescriptors(); /** - * @return true if master enables ServerShutdownHandler; - */ - boolean isServerCrashProcessingEnabled(); - - /** * Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint. * *

@@ -494,13 +489,6 @@ public interface MasterServices extends Server { */ public void checkIfShouldMoveSystemRegionAsync(); - /** - * Recover meta table. Will result in no-op is meta is already initialized. Any code that has - * access to master and requires to access meta during process initialization can call this - * method to make sure meta is initialized. - */ - boolean recoverMeta() throws IOException; - String getClientIdAuditPrefix(); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 4070ed3818..2dc891872c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -26,7 +26,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -142,10 +143,34 @@ public class MasterWalManager { return this.fsOk; } + public Set getLiveServersFromWALDir() throws IOException { + Path walDirPath = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + FileStatus[] walDirForLiveServers = FSUtils.listStatus(fs, walDirPath, + p -> !p.getName().endsWith(AbstractFSWALProvider.SPLITTING_EXT)); + if (walDirForLiveServers == null) { + return Collections.emptySet(); + } + return Stream.of(walDirForLiveServers).map(s -> { + ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(s.getPath()); + if (serverName == null) { + LOG.warn("Log folder {} doesn't look like its name includes a " + + "region server name; leaving in place. If you see later errors about missing " + + "write ahead logs they may be saved in this location.", s.getPath()); + return null; + } + return serverName; + }).filter(s -> s != null).collect(Collectors.toSet()); + } + /** * Inspect the log directory to find dead servers which need recovery work * @return A set of ServerNames which aren't running but still have WAL files left in file system + * @deprecated With proc-v2, we can record the crash server with procedure store, so do not need + * to scan the wal directory to find out the splitting wal directory any more. Leave + * it here only because {@code RecoverMetaProcedure}(which is also deprecated) uses + * it. */ + @Deprecated public Set getFailedServersFromLogFolders() { boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); @@ -240,6 +265,7 @@ public class MasterWalManager { boolean needReleaseLock = false; if (!this.services.isInitialized()) { // during master initialization, we could have multiple places splitting a same wal + // XXX: Does this still exist after we move to proc-v2? 
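getLiveServersFromWALDir() above leans entirely on the WAL directory naming convention: each live region server owns a directory named after its ServerName under the WALs directory, and that directory gains the -splitting suffix (AbstractFSWALProvider.SPLITTING_EXT) once its logs are being split after a crash. For illustration, with hypothetical paths and server names:

    // /hbase/WALs/rs1.example.com,16020,1529390000000            -> live, parsed to a ServerName
    // /hbase/WALs/rs2.example.com,16020,1529380000000-splitting  -> filtered out by SPLITTING_EXT
    // /hbase/WALs/some-unrelated-dir                              -> no ServerName, warned and skipped
    Set<ServerName> liveServers = walManager.getLiveServersFromWALDir();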
this.splitLogLock.lock(); needReleaseLock = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java index 12c8e574f8..c599f78368 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java @@ -46,19 +46,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; /** - *

 * Tracks the online region servers via ZK.
- * </p>
- * <p>
+ * <p/>
 * Handling of new RSs checking in is done via RPC. This class is only responsible for watching for
 * expired nodes. It handles listening for changes in the RS node list. The only exception is when
 * master restart, we will use the list fetched from zk to construct the initial set of live region
 * servers.
- * </p>
- * <p>
+ * <p/>
 * If an RS node gets deleted, this automatically handles calling of
 * {@link ServerManager#expireServer(ServerName)}
- * </p>
*/ @InterfaceAudience.Private public class RegionServerTracker extends ZKListener { @@ -76,7 +72,7 @@ public class RegionServerTracker extends ZKListener { super(watcher); this.server = server; this.serverManager = serverManager; - executor = Executors.newSingleThreadExecutor( + this.executor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build()); } @@ -109,14 +105,19 @@ public class RegionServerTracker extends ZKListener { } /** - *

- * Starts the tracking of online RegionServers.
- * </p>
- * <p>
- * All RSs will be tracked after this method is called.
- * </p>
+ * Starts the tracking of online RegionServers. All RSes will be tracked after this method is
+ * called.
+ * <p/>
+ * In this method, we will also construct the region server sets in {@link ServerManager}. If a + * region server is dead between the crash of the previous master instance and the start of the + * current master instance, we will schedule a SCP for it. This is done in + * {@link ServerManager#findOutDeadServersAndProcess(Set, Set)}, we call it here under the lock + * protection to prevent concurrency issues with server expiration operation. + * @param deadServersFromPE the region servers which already have SCP associated. + * @param liveServersFromWALDir the live region servers from wal directory. */ - public void start() throws KeeperException, IOException { + public void start(Set deadServersFromPE, Set liveServersFromWALDir) + throws KeeperException, IOException { watcher.registerListener(this); synchronized (this) { List servers = @@ -132,6 +133,7 @@ public class RegionServerTracker extends ZKListener { : ServerMetricsBuilder.of(serverName); serverManager.checkAndRecordNewServer(serverName, serverMetrics); } + serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index cfbd52fc6e..201466e75c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -25,13 +25,11 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -180,41 +178,6 @@ public class ServerManager { private final RpcControllerFactory rpcControllerFactory; - /** - * Set of region servers which are dead but not processed immediately. If one - * server died before master enables ServerShutdownHandler, the server will be - * added to this set and will be processed through calling - * {@link ServerManager#processQueuedDeadServers()} by master. - *

- * A dead server is a server instance known to be dead, not listed in the /hbase/rs - * znode any more. It may have not been submitted to ServerShutdownHandler yet - * because the handler is not enabled. - *

- * A dead server, which has been submitted to ServerShutdownHandler while the - * handler is not enabled, is queued up. - *

- * So this is a set of region servers known to be dead but not submitted to - * ServerShutdownHandler for processing yet. - */ - private Set queuedDeadServers = new HashSet<>(); - - /** - * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not - * fully processed immediately. - *

- * If one server died before assignment manager finished the failover cleanup, the server will be - * added to this set and will be processed through calling - * {@link ServerManager#processQueuedDeadServers()} by assignment manager. - *

- * The Boolean value indicates whether log split is needed inside ServerShutdownHandler - *

- * ServerShutdownHandler processes a dead server submitted to the handler after the handler is - * enabled. It may not be able to complete the processing because meta is not yet online or master - * is currently in startup mode. In this case, the dead server will be parked in this set - * temporarily. - */ - private Map requeuedDeadServers = new ConcurrentHashMap<>(); - /** Listeners that are called on server events. */ private List listeners = new CopyOnWriteArrayList<>(); @@ -378,6 +341,26 @@ public class ServerManager { } /** + * Find out the region servers crashed between the crash of the previous master instance and the + * current master instance and schedule SCP for them. + *

+ * Since the {@code RegionServerTracker} has already helped us to construct the online servers set + * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir} + * to find out whether there are servers which are already dead. + *

+ * Must be called inside the initialization method of {@code RegionServerTracker} to avoid + * concurrency issue. + * @param deadServersFromPE the region servers which already have SCP associated. + * @param liveServersFromWALDir the live region servers from wal directory. + */ + void findOutDeadServersAndProcess(Set deadServersFromPE, + Set liveServersFromWALDir) { + deadServersFromPE.forEach(deadservers::add); + liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn)) + .forEach(this::expireServer); + } + + /** * Checks if the clock skew between the server and the master. If the clock skew exceeds the * configured max, it will throw an exception; if it exceeds the configured warning threshold, * it will log a warning but start normally. @@ -386,7 +369,7 @@ public class ServerManager { * @throws ClockOutOfSyncException if the skew exceeds the configured max value */ private void checkClockSkew(final ServerName serverName, final long serverCurrentTime) - throws ClockOutOfSyncException { + throws ClockOutOfSyncException { long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime); if (skew > maxSkew) { String message = "Server " + serverName + " has been " + @@ -406,9 +389,7 @@ public class ServerManager { * If this server is on the dead list, reject it with a YouAreDeadException. * If it was dead but came back with a new start code, remove the old entry * from the dead list. - * @param serverName * @param what START or REPORT - * @throws org.apache.hadoop.hbase.YouAreDeadException */ private void checkIsDead(final ServerName serverName, final String what) throws YouAreDeadException { @@ -589,13 +570,12 @@ public class ServerManager { return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode); } - /* - * Expire the passed server. Add it to list of dead servers and queue a - * shutdown processing. - * @return True if we queued a ServerCrashProcedure else false if we did not (could happen - * for many reasons including the fact that its this server that is going down or we already - * have queued an SCP for this server or SCP processing is currently disabled because we are - * in startup phase). + /** + * Expire the passed server. Add it to list of dead servers and queue a shutdown processing. + * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for + * many reasons including the fact that its this server that is going down or we already + * have queued an SCP for this server or SCP processing is currently disabled because we + * are in startup phase). */ public synchronized boolean expireServer(final ServerName serverName) { // THIS server is going down... can't handle our own expiration. @@ -605,18 +585,6 @@ public class ServerManager { } return false; } - // No SCP handling during startup. - if (!master.isServerCrashProcessingEnabled()) { - LOG.info("Master doesn't enable ServerShutdownHandler during initialization, " - + "delay expiring server " + serverName); - // Even though we delay expire of this server, we still need to handle Meta's RIT - // that are against the crashed server; since when we do RecoverMetaProcedure, - // the SCP is not enabled yet and Meta's RIT may be suspend forever. See HBase-19287 - master.getAssignmentManager().handleMetaRITOnCrashedServer(serverName); - this.queuedDeadServers.add(serverName); - // Return true because though on SCP queued, there will be one queued later. 
- return true; - } if (this.deadservers.isDeadServer(serverName)) { LOG.warn("Expiration called on {} but crash processing already in progress", serverName); return false; @@ -665,52 +633,6 @@ public class ServerManager { this.rsAdmins.remove(sn); } - public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) { - // When assignment manager is cleaning up the zookeeper nodes and rebuilding the - // in-memory region states, region servers could be down. Meta table can and - // should be re-assigned, log splitting can be done too. However, it is better to - // wait till the cleanup is done before re-assigning user regions. - // - // We should not wait in the server shutdown handler thread since it can clog - // the handler threads and meta table could not be re-assigned in case - // the corresponding server is down. So we queue them up here instead. - if (!master.getAssignmentManager().isFailoverCleanupDone()) { - requeuedDeadServers.put(serverName, shouldSplitWal); - return; - } - - this.deadservers.add(serverName); - master.getAssignmentManager().submitServerCrash(serverName, shouldSplitWal); - } - - /** - * Process the servers which died during master's initialization. It will be - * called after HMaster#assignMeta and AssignmentManager#joinCluster. - * */ - synchronized void processQueuedDeadServers() { - if (!master.isServerCrashProcessingEnabled()) { - LOG.info("Master hasn't enabled ServerShutdownHandler"); - } - Iterator serverIterator = queuedDeadServers.iterator(); - while (serverIterator.hasNext()) { - ServerName tmpServerName = serverIterator.next(); - expireServer(tmpServerName); - serverIterator.remove(); - requeuedDeadServers.remove(tmpServerName); - } - - if (!master.getAssignmentManager().isFailoverCleanupDone()) { - if (LOG.isTraceEnabled()) { - LOG.trace("AssignmentManager failover cleanup not done."); - } - } - - for (Map.Entry entry : requeuedDeadServers.entrySet()) { - processDeadServer(entry.getKey(), entry.getValue()); - } - requeuedDeadServers.clear(); - } - /* * Remove the server from the drain list. */ @@ -975,13 +897,6 @@ public class ServerManager { return new ArrayList<>(this.drainingServers); } - /** - * @return A copy of the internal set of deadNotExpired servers. - */ - Set getDeadNotExpiredServers() { - return new HashSet<>(this.queuedDeadServers); - } - public boolean isServerOnline(ServerName serverName) { return serverName != null && onlineServers.containsKey(serverName); } @@ -993,9 +908,7 @@ public class ServerManager { * master any more, for example, a very old previous instance). */ public synchronized boolean isServerDead(ServerName serverName) { - return serverName == null || deadservers.isDeadServer(serverName) - || queuedDeadServers.contains(serverName) - || requeuedDeadServers.containsKey(serverName); + return serverName == null || deadservers.isDeadServer(serverName); } public void shutdownCluster() { @@ -1061,8 +974,6 @@ public class ServerManager { final List drainingServersCopy = getDrainingServersList(); destServers.removeAll(drainingServersCopy); - // Remove the deadNotExpired servers from the server list. - removeDeadNotExpiredServers(destServers); return destServers; } @@ -1073,23 +984,6 @@ public class ServerManager { return createDestinationServersList(null); } - /** - * Loop through the deadNotExpired server list and remove them from the - * servers. - * This function should be used carefully outside of this class. 
You should use a high level - * method such as {@link #createDestinationServersList()} instead of managing you own list. - */ - void removeDeadNotExpiredServers(List servers) { - Set deadNotExpiredServersCopy = this.getDeadNotExpiredServers(); - if (!deadNotExpiredServersCopy.isEmpty()) { - for (ServerName server : deadNotExpiredServersCopy) { - LOG.debug("Removing dead but not expired server: " + server - + " from eligible server pool."); - servers.remove(server); - } - } - } - /** * To clear any dead server with same host name and port of any online server */ @@ -1259,7 +1153,6 @@ public class ServerManager { } } - private class FlushedSequenceIdFlusher extends ScheduledChore { public FlushedSequenceIdFlusher(String name, int p) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 3412c82f2d..0d1fc16254 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master.assignment; import java.io.IOException; @@ -24,7 +23,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -40,7 +38,6 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.RegionException; -import org.apache.hadoop.hbase.RegionStateListener; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; @@ -66,7 +63,6 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerState; import org.apache.hadoop.hbase.master.assignment.RegionStates.ServerStateNode; import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; -import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; @@ -84,7 +80,10 @@ import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,26 +146,13 @@ public class AssignmentManager implements ServerListener { "hbase.metrics.rit.stuck.warning.threshold"; private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; - private final ProcedureEvent metaInitializedEvent = new 
ProcedureEvent<>("meta initialized"); + private final ProcedureEvent metaAssignEvent = new ProcedureEvent<>("meta assign"); private final ProcedureEvent metaLoadEvent = new ProcedureEvent<>("meta load"); - /** - * Indicator that AssignmentManager has recovered the region states so - * that ServerCrashProcedure can be fully enabled and re-assign regions - * of dead servers. So that when re-assignment happens, AssignmentManager - * has proper region states. - */ - private final ProcedureEvent failoverCleanupDone = new ProcedureEvent<>("failover cleanup"); - /** Listeners that are called on assignment events. */ private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); - // TODO: why is this different from the listeners (carried over from the old AM) - private RegionStateListener regionStateListener; - - private RegionNormalizer regionNormalizer; - private final MetricsAssignmentManager metrics; private final RegionInTransitionChore ritChore; private final MasterServices master; @@ -210,12 +196,9 @@ public class AssignmentManager implements ServerListener { int ritChoreInterval = conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC); this.ritChore = new RegionInTransitionChore(ritChoreInterval); - - // Used for region related procedure. - setRegionNormalizer(master.getRegionNormalizer()); } - public void start() throws IOException { + public void start() throws IOException, KeeperException { if (!running.compareAndSet(false, true)) { return; } @@ -227,6 +210,20 @@ public class AssignmentManager implements ServerListener { // Start the Assignment Thread startAssignmentThread(); + + // load meta region state + ZKWatcher zkw = master.getZooKeeper(); + // it could be null in some tests + if (zkw != null) { + RegionState regionState = MetaTableLocator.getMetaRegionState(zkw); + RegionStateNode regionStateNode = + regionStates.getOrCreateRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO); + synchronized (regionStateNode) { + regionStateNode.setRegionLocation(regionState.getServerName()); + regionStateNode.setState(regionState.getState()); + setMetaAssigned(regionState.getRegion(), regionState.getState() == State.OPEN); + } + } } public void stop() { @@ -257,9 +254,8 @@ public class AssignmentManager implements ServerListener { // Update meta events (for testing) if (hasProcExecutor) { metaLoadEvent.suspend(); - setFailoverCleanupDone(false); for (RegionInfo hri: getMetaRegionSet()) { - setMetaInitialized(hri, false); + setMetaAssigned(hri, false); } } } @@ -288,7 +284,7 @@ public class AssignmentManager implements ServerListener { return getProcedureEnvironment().getProcedureScheduler(); } - protected int getAssignMaxAttempts() { + int getAssignMaxAttempts() { return assignMaxAttempts; } @@ -308,18 +304,6 @@ public class AssignmentManager implements ServerListener { return this.listeners.remove(listener); } - public void setRegionStateListener(final RegionStateListener listener) { - this.regionStateListener = listener; - } - - public void setRegionNormalizer(final RegionNormalizer normalizer) { - this.regionNormalizer = normalizer; - } - - public RegionNormalizer getRegionNormalizer() { - return regionNormalizer; - } - public RegionStates getRegionStates() { return regionStates; } @@ -371,12 +355,8 @@ public class AssignmentManager implements ServerListener { } public boolean isCarryingMeta(final ServerName serverName) { - for (RegionInfo hri: getMetaRegionSet()) { - if (isCarryingRegion(serverName, hri)) { - return true; - } - } - return false; + // 
TODO: handle multiple meta + return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); } private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { @@ -402,49 +382,66 @@ public class AssignmentManager implements ServerListener { // ============================================================================================ // META Event(s) helpers // ============================================================================================ - public boolean isMetaInitialized() { - return metaInitializedEvent.isReady(); + /** + * Notice that, this only means the meta region is available on a RS, but the AM may still be + * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first + * before checking this method, unless you can make sure that your piece of code can only be + * executed after AM builds the region states. + * @see #isMetaLoaded() + */ + public boolean isMetaAssigned() { + return metaAssignEvent.isReady(); } public boolean isMetaRegionInTransition() { - return !isMetaInitialized(); - } - - public boolean waitMetaInitialized(final Procedure proc) { - // TODO: handle multiple meta. should this wait on all meta? - // this is used by the ServerCrashProcedure... - return waitMetaInitialized(proc, RegionInfoBuilder.FIRST_META_REGIONINFO); + return !isMetaAssigned(); } - public boolean waitMetaInitialized(final Procedure proc, final RegionInfo regionInfo) { - return getMetaInitializedEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); + /** + * Notice that this event does not mean the AM has already finished region state rebuilding. See + * the comment of {@link #isMetaAssigned()} for more details. + * @see #isMetaAssigned() + */ + public boolean waitMetaAssigned(Procedure proc, RegionInfo regionInfo) { + return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); } - private void setMetaInitialized(final RegionInfo metaRegionInfo, final boolean isInitialized) { + private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; - final ProcedureEvent metaInitEvent = getMetaInitializedEvent(metaRegionInfo); - if (isInitialized) { - metaInitEvent.wake(getProcedureScheduler()); + ProcedureEvent metaAssignEvent = getMetaAssignEvent(metaRegionInfo); + if (assigned) { + metaAssignEvent.wake(getProcedureScheduler()); } else { - metaInitEvent.suspend(); + metaAssignEvent.suspend(); } } - private ProcedureEvent getMetaInitializedEvent(final RegionInfo metaRegionInfo) { + private ProcedureEvent getMetaAssignEvent(RegionInfo metaRegionInfo) { assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; // TODO: handle multiple meta. - return metaInitializedEvent; + return metaAssignEvent; } - public boolean waitMetaLoaded(final Procedure proc) { + /** + * Wait until AM finishes the meta loading, i.e, the region states rebuilding. + * @see #isMetaLoaded() + * @see #waitMetaAssigned(Procedure, RegionInfo) + */ + public boolean waitMetaLoaded(Procedure proc) { return metaLoadEvent.suspendIfNotReady(proc); } - protected void wakeMetaLoadedEvent() { + @VisibleForTesting + void wakeMetaLoadedEvent() { metaLoadEvent.wake(getProcedureScheduler()); assert isMetaLoaded() : "expected meta to be loaded"; } + /** + * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 
+ * @see #isMetaAssigned() + * @see #waitMetaLoaded(Procedure) + */ public boolean isMetaLoaded() { return metaLoadEvent.isReady(); } @@ -849,7 +846,7 @@ public class AssignmentManager implements ServerListener { private void updateRegionTransition(final ServerName serverName, final TransitionCode state, final RegionInfo regionInfo, final long seqId) throws PleaseHoldException, UnexpectedStateException { - checkFailoverCleanupCompleted(regionInfo); + checkMetaLoaded(regionInfo); final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); if (regionNode == null) { @@ -890,7 +887,7 @@ public class AssignmentManager implements ServerListener { private void updateRegionSplitTransition(final ServerName serverName, final TransitionCode state, final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB) throws IOException { - checkFailoverCleanupCompleted(parent); + checkMetaLoaded(parent); if (state != TransitionCode.READY_TO_SPLIT) { throw new UnexpectedStateException("unsupported split regionState=" + state + @@ -922,7 +919,7 @@ public class AssignmentManager implements ServerListener { private void updateRegionMergeTransition(final ServerName serverName, final TransitionCode state, final RegionInfo merged, final RegionInfo hriA, final RegionInfo hriB) throws IOException { - checkFailoverCleanupCompleted(merged); + checkMetaLoaded(merged); if (state != TransitionCode.READY_TO_MERGE) { throw new UnexpectedStateException("Unsupported merge regionState=" + state + @@ -1063,7 +1060,7 @@ public class AssignmentManager implements ServerListener { } } - protected boolean waitServerReportEvent(final ServerName serverName, final Procedure proc) { + protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) { final ServerStateNode serverNode = regionStates.getOrCreateServer(serverName); if (serverNode == null) { LOG.warn("serverName=null; {}", proc); @@ -1152,7 +1149,7 @@ public class AssignmentManager implements ServerListener { public Collection getRegionOverThreshold() { Map m = this.ritsOverThreshold; - return m != null? m.values(): Collections.EMPTY_SET; + return m != null? m.values(): Collections.emptySet(); } public boolean isRegionOverThreshold(final RegionInfo regionInfo) { @@ -1209,27 +1206,44 @@ public class AssignmentManager implements ServerListener { // TODO: Master load/bootstrap // ============================================================================================ public void joinCluster() throws IOException { - final long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); LOG.debug("Joining cluster..."); // Scan hbase:meta to build list of existing regions, servers, and assignment // hbase:meta is online when we get to here and TableStateManager has been started. 
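One small change worth calling out in the joinCluster rewrite just below: the elapsed time is now computed from System.nanoTime() instead of System.currentTimeMillis(). A sketch of the idiom (plain Java, nothing HBase-specific; class and method names are illustrative):

import java.util.concurrent.TimeUnit;

// Durations should come from the monotonic clock: wall-clock time can jump
// backwards or forwards (NTP adjustments, manual clock changes), which would
// yield negative or inflated "elapsed" values.
final class Elapsed {
  static long timeMs(Runnable work) {
    long startNs = System.nanoTime();
    work.run();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
  }
}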
loadMeta(); - for (int i = 0; master.getServerManager().countOfRegionServers() < 1; ++i) { - LOG.info("Waiting for RegionServers to join; current count=" + - master.getServerManager().countOfRegionServers()); + while (master.getServerManager().countOfRegionServers() < 1) { + LOG.info("Waiting for RegionServers to join; current count={}", + master.getServerManager().countOfRegionServers()); Threads.sleep(250); } - LOG.info("Number of RegionServers=" + master.getServerManager().countOfRegionServers()); + LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); - boolean failover = processofflineServersWithOnlineRegions(); + processOfflineRegions(); // Start the RIT chore master.getMasterProcedureExecutor().addChore(this.ritChore); - LOG.info(String.format("Joined the cluster in %s, failover=%s", - StringUtils.humanTimeDiff(System.currentTimeMillis() - startTime), failover)); + long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); + } + + // Create assign procedures for regions in OFFLINE state. + // This follows the old processofflineServersWithOnlineRegions method. Since we no longer need + // to deal with dead servers here, we only handle regions in the OFFLINE state in this method. + // It is a bit strange that new regions are added in the CLOSED state instead of OFFLINE, and + // usually there will be a procedure tracking them. processofflineServersWithOnlineRegions is a + // legacy from long ago; as things are quite different now, maybe we do not need this method + // any more. Need to revisit later. + private void processOfflineRegions() { + List<RegionInfo> offlineRegions = regionStates.getRegionStates().stream() + .filter(RegionState::isOffline).filter(s -> isTableEnabled(s.getRegion().getTable())) + .map(RegionState::getRegion).collect(Collectors.toList()); + if (!offlineRegions.isEmpty()) { + master.getMasterProcedureExecutor().submitProcedures( + master.getAssignmentManager().createRoundRobinAssignProcedures(offlineRegions)); + } } private void loadMeta() throws IOException { @@ -1286,117 +1300,21 @@ } /** - * Look at what is in meta and the list of servers that have checked in and make reconciliation. - * We cannot tell definitively the difference between a clean shutdown and a cluster that has - * been crashed down. At this stage of a Master startup, they look the same: they have the - * same state in hbase:meta. We could do detective work probing ZK and the FS for old WALs to - * split but SCP does this already so just let it do its job. - *

>The profiles of clean shutdown and cluster crash-down are the same because on clean - * shutdown currently, we do not update hbase:meta with region close state (In AMv2, region - * state is kept in hbse:meta). Usually the master runs all region transitions as of AMv2 but on - * cluster controlled shutdown, the RegionServers close all their regions only reporting the - * final change to the Master. Currently this report is ignored. Later we could take it and - * update as many regions as we can before hbase:meta goes down or have the master run the - * close of all regions out on the cluster but we may never be able to achieve the proper state on - * all regions (at least not w/o lots of painful manipulations and waiting) so clean shutdown - * might not be possible especially on big clusters.... And clean shutdown will take time. Given - * this current state of affairs, we just run ServerCrashProcedure in both cases. It will always - * do the right thing. - * @return True if for sure this is a failover where a Master is starting up into an already - * running cluster. - */ - // The assumption here is that if RSs are crashing while we are executing this - // they will be handled by the SSH that are put in the ServerManager deadservers "queue". - private boolean processofflineServersWithOnlineRegions() { - boolean deadServers = !master.getServerManager().getDeadServers().isEmpty(); - final Set offlineServersWithOnlineRegions = new HashSet<>(); - int size = regionStates.getRegionStateNodes().size(); - final List offlineRegionsToAssign = new ArrayList<>(size); - // If deadservers then its a failover, else, we are not sure yet. - boolean failover = deadServers; - for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) { - // Region State can be OPEN even if we did controlled cluster shutdown; Master does not close - // the regions in this case. The RegionServer does the close so hbase:meta is state in - // hbase:meta is not updated -- Master does all updates -- and is left with OPEN as region - // state in meta. How to tell difference between ordered shutdown and crashed-down cluster - // then? We can't. Not currently. Perhaps if we updated hbase:meta with CLOSED on ordered - // shutdown. This would slow shutdown though and not all edits would make it in anyways. - // TODO: Examine. - // Because we can't be sure it an ordered shutdown, we run ServerCrashProcedure always. - // ServerCrashProcedure will try to retain old deploy when it goes to assign. - if (regionNode.getState() == State.OPEN) { - final ServerName serverName = regionNode.getRegionLocation(); - if (!master.getServerManager().isServerOnline(serverName)) { - offlineServersWithOnlineRegions.add(serverName); - } else { - // Server is online. This a failover. Master is starting into already-running cluster. - failover = true; - } - } else if (regionNode.getState() == State.OFFLINE) { - if (isTableEnabled(regionNode.getTable())) { - offlineRegionsToAssign.add(regionNode.getRegionInfo()); - } - } - } - // Kill servers with online regions just-in-case. Runs ServerCrashProcedure. - for (ServerName serverName: offlineServersWithOnlineRegions) { - if (!master.getServerManager().isServerOnline(serverName)) { - LOG.info("KILL RegionServer=" + serverName + " hosting regions but not online."); - killRegionServer(serverName); - } - } - setFailoverCleanupDone(true); - - // Assign offline regions. Uses round-robin. 
- if (offlineRegionsToAssign.size() > 0) { - master.getMasterProcedureExecutor().submitProcedures(master.getAssignmentManager(). - createRoundRobinAssignProcedures(offlineRegionsToAssign)); - } - - return failover; - } - - /** - * Used by ServerCrashProcedure to make sure AssignmentManager has completed - * the failover cleanup before re-assigning regions of dead servers. So that - * when re-assignment happens, AssignmentManager has proper region states. - */ - public boolean isFailoverCleanupDone() { - return failoverCleanupDone.isReady(); - } - - /** - * Used by ServerCrashProcedure tests verify the ability to suspend the - * execution of the ServerCrashProcedure. - */ - @VisibleForTesting - public void setFailoverCleanupDone(final boolean b) { - master.getMasterProcedureExecutor().getEnvironment() - .setEventReady(failoverCleanupDone, b); - } - - public ProcedureEvent getFailoverCleanupEvent() { - return failoverCleanupDone; - } - - /** - * Used to check if the failover cleanup is done. + * Used to check if the meta loading is done. + *
* if not we throw PleaseHoldException since we are rebuilding the RegionStates * @param hri region to check if it is already rebuild - * @throws PleaseHoldException if the failover cleanup is not completed + * @throws PleaseHoldException if meta has not been loaded yet */ - private void checkFailoverCleanupCompleted(final RegionInfo hri) throws PleaseHoldException { + private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException { if (!isRunning()) { throw new PleaseHoldException("AssignmentManager not running"); } - - // TODO: can we avoid throwing an exception if hri is already loaded? - // at the moment we bypass only meta boolean meta = isMetaRegion(hri); - boolean cleanup = isFailoverCleanupDone(); - if (!isMetaRegion(hri) && !isFailoverCleanupDone()) { - String msg = "Master not fully online; hbase:meta=" + meta + ", failoverCleanup=" + cleanup; - throw new PleaseHoldException(msg); + boolean metaLoaded = isMetaLoaded(); + if (!meta && !metaLoaded) { + throw new PleaseHoldException( + "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded); } } @@ -1539,7 +1457,7 @@ public class AssignmentManager implements ServerListener { // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state // on table that contains state. - setMetaInitialized(hri, true); + setMetaAssigned(hri, true); } regionStates.addRegionToServer(regionNode); // TODO: OPENING Updates hbase:meta too... we need to do both here and there? @@ -1555,7 +1473,7 @@ public class AssignmentManager implements ServerListener { regionNode.transitionState(State.CLOSING, RegionStates.STATES_EXPECTED_ON_CLOSE); // Set meta has not initialized early. 
so people trying to create/edit tables will wait if (isMetaRegion(hri)) { - setMetaInitialized(hri, false); + setMetaAssigned(hri, false); } regionStates.addRegionToServer(regionNode); regionStateStore.updateRegionLocation(regionNode); @@ -1831,7 +1749,7 @@ public class AssignmentManager implements ServerListener { private void acceptPlan(final HashMap regions, final Map> plan) throws HBaseIOException { - final ProcedureEvent[] events = new ProcedureEvent[regions.size()]; + final ProcedureEvent[] events = new ProcedureEvent[regions.size()]; final long st = System.currentTimeMillis(); if (plan == null) { @@ -1883,7 +1801,7 @@ public class AssignmentManager implements ServerListener { .map((s)->new Pair<>(s, master.getRegionServerVersion(s))) .collect(Collectors.toList()); if (serverList.isEmpty()) { - return Collections.EMPTY_LIST; + return Collections.emptyList(); } String highestVersion = Collections.max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())).getSecond(); @@ -1909,11 +1827,6 @@ public class AssignmentManager implements ServerListener { wakeServerReportEvent(serverNode); } - private void killRegionServer(final ServerName serverName) { - final ServerStateNode serverNode = regionStates.getServerNode(serverName); - killRegionServer(serverNode); - } - private void killRegionServer(final ServerStateNode serverNode) { master.getServerManager().expireServer(serverNode.getServerName()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index 57e71f897a..4d454d7cc9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -553,7 +553,7 @@ public class MergeTableRegionsProcedure try { env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion); } catch (QuotaExceededException e) { - env.getAssignmentManager().getRegionNormalizer().planSkipped(this.mergedRegion, + env.getMasterServices().getRegionNormalizer().planSkipped(this.mergedRegion, NormalizationPlan.PlanType.MERGE); throw e; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index fa252316d6..2124d84378 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -128,23 +128,24 @@ public class RegionStateStore { public void updateRegionLocation(RegionStates.RegionStateNode regionStateNode) throws IOException { if (regionStateNode.getRegionInfo().isMetaRegion()) { - updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); + updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation(), + regionStateNode.getState()); } else { - long openSeqNum = regionStateNode.getState() == State.OPEN ? - regionStateNode.getOpenSeqNum() : HConstants.NO_SEQNUM; + long openSeqNum = regionStateNode.getState() == State.OPEN ? 
regionStateNode.getOpenSeqNum() + : HConstants.NO_SEQNUM; updateUserRegionLocation(regionStateNode.getRegionInfo(), regionStateNode.getState(), - regionStateNode.getRegionLocation(), regionStateNode.getLastHost(), openSeqNum, - // The regionStateNode may have no procedure in a test scenario; allow for this. - regionStateNode.getProcedure() != null? - regionStateNode.getProcedure().getProcId(): Procedure.NO_PROC_ID); + regionStateNode.getRegionLocation(), regionStateNode.getLastHost(), openSeqNum, + // The regionStateNode may have no procedure in a test scenario; allow for this. + regionStateNode.getProcedure() != null ? regionStateNode.getProcedure().getProcId() + : Procedure.NO_PROC_ID); } } - private void updateMetaLocation(final RegionInfo regionInfo, final ServerName serverName) + private void updateMetaLocation(RegionInfo regionInfo, ServerName serverName, State state) throws IOException { try { - MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, - regionInfo.getReplicaId(), State.OPEN); + MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, regionInfo.getReplicaId(), + state); } catch (KeeperException e) { throw new IOException(e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index b96fb20d2d..c3b2458ddc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -421,9 +421,11 @@ public abstract class RegionTransitionProcedure @Override protected LockState acquireLock(final MasterProcedureEnv env) { // Unless we are assigning meta, wait for meta to be available and loaded. 
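The hunk that follows implements this comment by composing the two meta events introduced in AssignmentManager earlier in this patch. As a minimal caller-side sketch (the wrapper class is hypothetical; the waitMetaLoaded/waitMetaAssigned signatures are the ones added above):

import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.procedure2.Procedure;

// A region procedure may only proceed once hbase:meta is assigned AND the AM
// has rebuilt region states from it; a true return means the procedure was
// suspended and will be woken when the corresponding ProcedureEvent fires.
final class MetaGateSketch {
  static boolean shouldWait(AssignmentManager am, Procedure<?> proc, RegionInfo region) {
    return am.waitMetaLoaded(proc) || am.waitMetaAssigned(proc, region);
  }
}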
- if (!isMeta() && (env.waitFailoverCleanup(this) || - env.getAssignmentManager().waitMetaInitialized(this, getRegionInfo()))) { - return LockState.LOCK_EVENT_WAIT; + if (!isMeta()) { + AssignmentManager am = env.getAssignmentManager(); + if (am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo)) { + return LockState.LOCK_EVENT_WAIT; + } } // TODO: Revisit this and move it to the executor @@ -432,8 +434,7 @@ public abstract class RegionTransitionProcedure LOG.debug(LockState.LOCK_EVENT_WAIT + " pid=" + getProcId() + " " + env.getProcedureScheduler().dumpLocks()); } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + // ignore, just for logging } return LockState.LOCK_EVENT_WAIT; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index 341affb10a..230603721f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -498,7 +498,7 @@ public class SplitTableRegionProcedure try { env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion()); } catch (QuotaExceededException e) { - env.getAssignmentManager().getRegionNormalizer().planSkipped(this.getParentRegion(), + env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(), NormalizationPlan.PlanType.SPLIT); throw e; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 5815255bb3..1af244590c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; @@ -31,7 +30,6 @@ import org.apache.hadoop.hbase.client.DoNotRetryRegionException; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionOfflineException; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.TableStateManager; @@ -92,7 +90,9 @@ public abstract class AbstractStateMachineTableProcedure @Override protected LockState acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; + if (env.waitInitialized(this)) { + return LockState.LOCK_EVENT_WAIT; + } if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) { return LockState.LOCK_EVENT_WAIT; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java new file mode 100644 index 0000000000..4736d6576f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.master.assignment.AssignProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.InitMetaState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.InitMetaStateData; + +/** + * This procedure is used to initialize meta table for a new hbase deploy. It will just schedule an + * {@link AssignProcedure} to assign meta. 
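For context, a hedged sketch of how this procedure is meant to be driven; the actual wiring lives in HMaster's startup path, which is outside this hunk, so the wrapper class and flow below are illustrative only:

import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;

final class InitMetaSketch {
  // On a fresh deploy the master submits InitMetaProcedure and then blocks its
  // startup thread until meta has been assigned once; completionCleanup()
  // releases the CountDownLatch behind await().
  static void bootstrapMeta(ProcedureExecutor<MasterProcedureEnv> procExec)
      throws InterruptedException {
    InitMetaProcedure initMeta = new InitMetaProcedure();
    procExec.submitProcedure(initMeta);
    initMeta.await();
  }
}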
+ */ +@InterfaceAudience.Private +public class InitMetaProcedure extends AbstractStateMachineTableProcedure { + + private CountDownLatch latch = new CountDownLatch(1); + + @Override + public TableName getTableName() { + return TableName.META_TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.CREATE; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, InitMetaState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + switch (state) { + case INIT_META_ASSIGN_META: + addChildProcedure(env.getAssignmentManager() + .createAssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO)); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + // we do not need to wait for master initialized, we are part of the initialization. + if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, InitMetaState state) + throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected InitMetaState getState(int stateId) { + return InitMetaState.forNumber(stateId); + } + + @Override + protected int getStateId(InitMetaState state) { + return state.getNumber(); + } + + @Override + protected InitMetaState getInitialState() { + return InitMetaState.INIT_META_ASSIGN_META; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(InitMetaStateData.getDefaultInstance()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + serializer.deserialize(InitMetaStateData.class); + } + + @Override + protected void completionCleanup(MasterProcedureEnv env) { + latch.countDown(); + } + + public void await() throws InterruptedException { + latch.await(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 7fb187fe05..0ec932ce47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; @@ -70,8 +68,7 @@ public class MasterProcedureEnv implements ConfigurationObserver { } private boolean isRunning() { - return master.isActiveMaster() && !master.isStopped() && - !master.isStopping() && !master.isAborted(); + return !master.isStopped() && !master.isStopping() && !master.isAborted(); } } @@ -155,17 
+152,6 @@ public class MasterProcedureEnv implements ConfigurationObserver { return master.getInitializedEvent().suspendIfNotReady(proc); } - public boolean waitServerCrashProcessingEnabled(Procedure proc) { - if (master instanceof HMaster) { - return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc); - } - return false; - } - - public boolean waitFailoverCleanup(Procedure proc) { - return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc); - } - public void setEventReady(ProcedureEvent event, boolean isReady) { if (isReady) { event.wake(procSched); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 373a957258..8389961572 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; @@ -480,7 +479,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param table Table to lock * @return true if the procedure has to wait for the table to be available */ - public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) { + public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); try { final String namespace = table.getNamespaceAsString(); @@ -509,7 +508,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param table the name of the table that has the exclusive lock */ - public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) { + public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); try { final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); @@ -537,7 +536,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param table Table to lock * @return true if the procedure has to wait for the table to be available */ - public boolean waitTableSharedLock(final Procedure procedure, final TableName table) { + public boolean waitTableSharedLock(final Procedure procedure, final TableName table) { return waitTableQueueSharedLock(procedure, table) == null; } @@ -568,7 +567,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param table the name of the table that has the shared lock */ - public void wakeTableSharedLock(final Procedure procedure, final TableName table) { + public void wakeTableSharedLock(final Procedure procedure, final TableName table) { schedLock(); try { final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); @@ -629,7 +628,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param regionInfo the region we are trying to lock * @return true if the procedure has to wait for the regions to be available */ - public boolean waitRegion(final Procedure procedure, final RegionInfo regionInfo) { + public boolean waitRegion(final Procedure procedure, final 
RegionInfo regionInfo) { return waitRegions(procedure, regionInfo.getTable(), regionInfo); } @@ -640,7 +639,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param regionInfo the list of regions we are trying to lock * @return true if the procedure has to wait for the regions to be available */ - public boolean waitRegions(final Procedure procedure, final TableName table, + public boolean waitRegions(final Procedure procedure, final TableName table, final RegionInfo... regionInfo) { Arrays.sort(regionInfo, RegionInfo.COMPARATOR); schedLock(); @@ -688,7 +687,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure that was holding the region * @param regionInfo the region the procedure was holding */ - public void wakeRegion(final Procedure procedure, final RegionInfo regionInfo) { + public void wakeRegion(final Procedure procedure, final RegionInfo regionInfo) { wakeRegions(procedure, regionInfo.getTable(), regionInfo); } @@ -697,7 +696,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure that was holding the regions * @param regionInfo the list of regions the procedure was holding */ - public void wakeRegions(final Procedure procedure,final TableName table, + public void wakeRegions(final Procedure procedure,final TableName table, final RegionInfo... regionInfo) { Arrays.sort(regionInfo, RegionInfo.COMPARATOR); schedLock(); @@ -744,7 +743,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param namespace Namespace to lock * @return true if the procedure has to wait for the namespace to be available */ - public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) { + public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) { schedLock(); try { final LockAndQueue systemNamespaceTableLock = @@ -775,7 +774,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param namespace the namespace that has the exclusive lock */ - public void wakeNamespaceExclusiveLock(final Procedure procedure, final String namespace) { + public void wakeNamespaceExclusiveLock(final Procedure procedure, final String namespace) { schedLock(); try { final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); @@ -893,7 +892,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @see #wakeMetaExclusiveLock(Procedure) * @param procedure the procedure trying to acquire the lock * @return true if the procedure has to wait for meta to be available + * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with + * {@link RecoverMetaProcedure}. */ + @Deprecated public boolean waitMetaExclusiveLock(Procedure procedure) { schedLock(); try { @@ -914,7 +916,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * Wake the procedures waiting for meta. * @see #waitMetaExclusiveLock(Procedure) * @param procedure the procedure releasing the lock + * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with + * {@link RecoverMetaProcedure}. 
*/ + @Deprecated public void wakeMetaExclusiveLock(Procedure procedure) { schedLock(); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaProcedureInterface.java index 39c271b9bf..39892acada 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaProcedureInterface.java @@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.master.procedure; import org.apache.yetus.audience.InterfaceAudience; +/** + * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with + * {@link RecoverMetaProcedure}. + */ +@Deprecated @InterfaceAudience.Private public interface MetaProcedureInterface { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaQueue.java index 190c95654c..f4121e8ae3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MetaQueue.java @@ -22,6 +22,11 @@ import org.apache.hadoop.hbase.procedure2.LockStatus; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.yetus.audience.InterfaceAudience; +/** + * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with + * {@link RecoverMetaProcedure}. + */ +@Deprecated @InterfaceAudience.Private class MetaQueue extends Queue { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RecoverMetaProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RecoverMetaProcedure.java index 97035f14fb..3b848fa33c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RecoverMetaProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RecoverMetaProcedure.java @@ -48,7 +48,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R * This procedure recovers meta from prior shutdown/ crash of a server, and brings meta online by * assigning meta region/s. Any place where meta is accessed and requires meta to be online, need to * submit this procedure instead of duplicating steps to recover meta in the code. + *
+ * @deprecated Do not use any more, leave it here only for compatibility. The recovery work will be + * done in {@link ServerCrashProcedure} directly, and the initial work for the meta table + * will be done by {@link InitMetaProcedure}. + * @see ServerCrashProcedure + * @see InitMetaProcedure */ +@Deprecated @InterfaceAudience.Private public class RecoverMetaProcedure extends StateMachineProcedure @@ -281,7 +288,7 @@ public class RecoverMetaProcedure * already initialized */ private boolean isRunRequired() { - return failedMetaServer != null || !master.getAssignmentManager().isMetaInitialized(); + return failedMetaServer != null || !master.getAssignmentManager().isMetaAssigned(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index 4bd35f3f25..9a5eb7e64d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -78,6 +78,11 @@ class SchemaLocking { return getLock(regionLocks, encodedRegionName); } + /** + * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with + * {@link RecoverMetaProcedure}. + */ + @Deprecated LockAndQueue getMetaLock() { return metaLock; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 5a4c10fabc..775c8c2bea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -19,12 +19,14 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterWalManager; @@ -83,11 +85,8 @@ public class ServerCrashProcedure * @param shouldSplitWal True if we should split WALs as part of crashed server processing. * @param carryingMeta True if carrying hbase:meta table region. */ - public ServerCrashProcedure( - final MasterProcedureEnv env, - final ServerName serverName, - final boolean shouldSplitWal, - final boolean carryingMeta) { + public ServerCrashProcedure(final MasterProcedureEnv env, final ServerName serverName, + final boolean shouldSplitWal, final boolean carryingMeta) { this.serverName = serverName; this.shouldSplitWal = shouldSplitWal; this.carryingMeta = carryingMeta; @@ -119,18 +118,32 @@ LOG.info("Start " + this); // If carrying meta, process it first. Else, get list of regions on crashed server. 
if (this.carryingMeta) { - setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META); + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS); } else { setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); } break; - + case SERVER_CRASH_SPLIT_META_LOGS: + splitMetaLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + break; + case SERVER_CRASH_ASSIGN_META: + handleRIT(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO)); + addChildProcedure(env.getAssignmentManager() + .createAssignProcedure(RegionInfoBuilder.FIRST_META_REGIONINFO)); + setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); + break; + case SERVER_CRASH_PROCESS_META: + // not used any more, but leave it here for compatibility, as there may be old SCPs + // persisted in the ProcedureStore which are still in this state. + processMeta(env); + setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); + break; case SERVER_CRASH_GET_REGIONS: // If hbase:meta is not loaded, yield. if (env.getAssignmentManager().waitMetaLoaded(this)) { throw new ProcedureSuspendedException(); } - this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates() .getServerRegionInfoSet(serverName); // Where to go next? Depends on whether we should split logs at all or @@ -141,17 +154,10 @@ setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); } break; - case SERVER_CRASH_SPLIT_LOGS: splitLogs(env); setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); break; - case SERVER_CRASH_ASSIGN: // If no regions to assign, skip assign and skip to the finish. // Filter out meta regions. Those are handled elsewhere in this procedure. @@ -177,18 +183,15 @@ setNextState(ServerCrashState.SERVER_CRASH_FINISH); } break; - case SERVER_CRASH_HANDLE_RIT2: // Noop. Left in place because we used to call handleRIT here for a second time // but no longer necessary since HBASE-20634. 
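To summarize the control flow added above: for a server that was carrying hbase:meta, the reworked SCP now splits the meta WALs and re-assigns meta itself, instead of delegating to RecoverMetaProcedure. Illustrative only (state names as used by this patch; assumes WAL splitting is enabled):

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;

final class MetaCrashPathSketch {
  // Happy-path state sequence for a meta-carrying crashed server.
  static final ServerCrashState[] PATH = {
    ServerCrashState.SERVER_CRASH_START,
    ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS, // split only the meta WALs
    ServerCrashState.SERVER_CRASH_ASSIGN_META,     // re-assign hbase:meta
    ServerCrashState.SERVER_CRASH_GET_REGIONS,     // waits until meta is loaded
    ServerCrashState.SERVER_CRASH_SPLIT_LOGS,
    ServerCrashState.SERVER_CRASH_ASSIGN,
    ServerCrashState.SERVER_CRASH_FINISH,
  };
}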
setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; - case SERVER_CRASH_FINISH: services.getAssignmentManager().getRegionStates().removeServer(serverName); services.getServerManager().getDeadServers().finish(serverName); return Flow.NO_MORE_STATE; - default: throw new UnsupportedOperationException("unhandled state=" + state); } @@ -198,11 +201,6 @@ public class ServerCrashProcedure return Flow.HAS_MORE_STATE; } - - /** - * @param env - * @throws IOException - */ private void processMeta(final MasterProcedureEnv env) throws IOException { LOG.debug("{}; processing hbase:meta", this); @@ -227,10 +225,18 @@ public class ServerCrashProcedure RegionReplicaUtil.isDefaultReplica(hri); } + private void splitMetaLogs(MasterProcedureEnv env) throws IOException { + LOG.debug("Splitting meta WALs {}", this); + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + am.getRegionStates().metaLogSplitting(serverName); + mwm.splitMetaLog(serverName); + am.getRegionStates().metaLogSplit(serverName); + LOG.debug("Done splitting meta WALs {}", this); + } + private void splitLogs(final MasterProcedureEnv env) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Splitting WALs " + this); - } + LOG.debug("Splitting WALs {}", this); MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); AssignmentManager am = env.getMasterServices().getAssignmentManager(); // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running. @@ -271,9 +277,6 @@ public class ServerCrashProcedure @Override protected LockState acquireLock(final MasterProcedureEnv env) { - // TODO: Put this BACK AFTER AMv2 goes in!!!! - // if (env.waitFailoverCleanup(this)) return LockState.LOCK_EVENT_WAIT; - if (env.waitServerCrashProcessingEnabled(this)) return LockState.LOCK_EVENT_WAIT; if (env.getProcedureScheduler().waitServerExclusiveLock(this, getServerName())) { return LockState.LOCK_EVENT_WAIT; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index 9161e255bb..f7865ee330 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -108,8 +108,6 @@ public class TestMetaTableAccessor { * Does {@link MetaTableAccessor#getRegion(Connection, byte[])} and a write * against hbase:meta while its hosted server is restarted to prove our retrying * works. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 9161e255bb..f7865ee330 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -108,8 +108,6 @@ public class TestMetaTableAccessor {
    * Does {@link MetaTableAccessor#getRegion(Connection, byte[])} and a write
    * against hbase:meta while its hosted server is restarted to prove our retrying
    * works.
-   * @throws IOException
-   * @throws InterruptedException
    */
   @Test
   public void testRetrying() throws IOException, InterruptedException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 02566605a0..34570d37b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -20,10 +20,8 @@ package org.apache.hadoop.hbase.master;
 import static org.mockito.Mockito.mock;
 
 import com.google.protobuf.Service;
-
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ChoreService;
@@ -57,8 +55,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 
-import com.google.protobuf.Service;
-
 public class MockNoopMasterServices implements MasterServices {
   private final Configuration conf;
   private final MetricsMaster metricsMaster;
@@ -214,16 +210,6 @@ public class MockNoopMasterServices implements MasterServices {
     return null;
   }
 
-  private boolean serverCrashProcessingEnabled = true;
-
-  public void setServerCrashProcessingEnabled(boolean b) {
-    serverCrashProcessingEnabled = b;
-  }
-  @Override
-  public boolean isServerCrashProcessingEnabled() {
-    return serverCrashProcessingEnabled;
-  }
-
   @Override
   public boolean registerService(Service instance) {
     return false;
@@ -453,11 +439,6 @@ public class MockNoopMasterServices implements MasterServices {
   }
 
   @Override
-  public boolean recoverMeta() throws IOException {
-    return false;
-  }
-
-  @Override
   public String getClientIdAuditPrefix() {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index b8a53b640c..27eec65274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Triple;
+import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -90,7 +91,7 @@ public class TestCatalogJanitor {
   }
 
   @Before
-  public void setup() throws IOException {
+  public void setup() throws IOException, KeeperException {
     setRootDirAndCleanIt(HTU, this.name.getMethodName());
     NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
       new ConcurrentSkipListMap<ServerName, SortedSet<byte[]>>();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
index 2272bec8b5..c4a2f0389e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.SortedSet;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
@@ -56,11 +55,18 @@ import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.security.Superusers;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.zookeeper.KeeperException;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -70,10 +76,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 /**
@@ -174,7 +176,7 @@ public class MockMasterServices extends MockNoopMasterServices {
   }
 
   public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher)
-      throws IOException {
+      throws IOException, KeeperException {
     startProcedureExecutor(remoteDispatcher);
     this.assignmentManager.start();
     for (int i = 0; i < numServes; ++i) {
@@ -217,17 +219,14 @@ public class MockMasterServices extends MockNoopMasterServices {
   private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher)
       throws IOException {
     final Configuration conf = getConfiguration();
-    final Path logDir = new Path(fileSystemManager.getRootDir(),
-        WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
-
     this.procedureStore = new NoopProcedureStore();
     this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     this.procedureEnv = new MasterProcedureEnv(this, remoteDispatcher != null ?
      remoteDispatcher : new RSProcedureDispatcher(this));
-    this.procedureExecutor = new ProcedureExecutor(conf, procedureEnv, procedureStore,
-        procedureEnv.getProcedureScheduler());
+    this.procedureExecutor = new ProcedureExecutor<>(conf, procedureEnv, procedureStore,
+      procedureEnv.getProcedureScheduler());
 
     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
       Math.max(Runtime.getRuntime().availableProcessors(),
@@ -236,7 +235,7 @@ public class MockMasterServices extends MockNoopMasterServices {
       MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
       MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
     this.procedureStore.start(numThreads);
-    this.procedureExecutor.start(numThreads, abortOnCorruption);
+    ProcedureTestingUtility.initAndStartWorkers(procedureExecutor, numThreads, abortOnCorruption);
     this.procedureEnv.getRemoteDispatcher().start();
   }
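The mock now starts its executor via ProcedureTestingUtility.initAndStartWorkers rather than ProcedureExecutor.start, matching the two-phase startup this patch gives the master: load the store and re-queue persisted procedures first, and only start worker threads once boot-time procedures (such as InitMetaProcedure) have been submitted. A minimal sketch of why the split matters; the class below is a stand-in, not the HBase executor:

    // Stand-in illustrating init-then-startWorkers. Nothing can race the caller
    // between the two phases, so boot work is always queued before execution.
    final class TwoPhaseExecutor {
      private boolean initialized;

      // Phase 1: recover state and re-queue stored procedures; run nothing yet.
      void init() {
        initialized = true;
        System.out.println("store loaded, procedures re-queued, no workers running");
      }

      // Phase 2: only now do worker threads begin executing.
      void startWorkers() {
        if (!initialized) {
          throw new IllegalStateException("init() must run before startWorkers()");
        }
        System.out.println("workers running");
      }

      public static void main(String[] args) {
        TwoPhaseExecutor exec = new TwoPhaseExecutor();
        exec.init();
        // e.g. the master submits InitMetaProcedure here, before any worker runs
        exec.startWorkers();
      }
    }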
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 0aac4c7dac..08ecb815ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -150,7 +150,6 @@ public class TestAssignmentManager {
     rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
     am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
     am.wakeMetaLoadedEvent();
-    am.setFailoverCleanupDone(true);
   }
 
   @After
@@ -427,18 +426,15 @@ public class TestAssignmentManager {
     am = master.getAssignmentManager();
 
     // Assign meta
-    master.setServerCrashProcessingEnabled(false);
     rsDispatcher.setMockRsExecutor(new HangThenRSRestartExecutor());
     am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
-    assertEquals(true, am.isMetaInitialized());
+    assertEquals(true, am.isMetaAssigned());
 
     // set it back as default, see setUpMeta()
-    master.setServerCrashProcessingEnabled(true);
     am.wakeMetaLoadedEvent();
-    am.setFailoverCleanupDone(true);
   }
 
-  private Future<byte[]> submitProcedure(final Procedure proc) {
+  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
     return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
   }
 
@@ -449,7 +445,7 @@ public class TestAssignmentManager {
       LOG.info("ExecutionException", e);
       Exception ee = (Exception)e.getCause();
       if (ee instanceof InterruptedIOException) {
-        for (Procedure p: this.master.getMasterProcedureExecutor().getProcedures()) {
+        for (Procedure<?> p: this.master.getMasterProcedureExecutor().getProcedures()) {
           LOG.info(p.toStringDetails());
         }
       }
@@ -493,13 +489,6 @@ public class TestAssignmentManager {
     return proc;
   }
 
-  private UnassignProcedure createAndSubmitUnassign(TableName tableName, int regionId) {
-    RegionInfo hri = createRegionInfo(tableName, regionId);
-    UnassignProcedure proc = am.createUnassignProcedure(hri, null, false);
-    master.getMasterProcedureExecutor().submitProcedure(proc);
-    return proc;
-  }
-
   private RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
     return RegionInfoBuilder.newBuilder(tableName)
       .setStartKey(Bytes.toBytes(regionId))
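The assertion above flips from isMetaInitialized to isMetaAssigned: the assignment manager now distinguishes two gates, meta assigned (hbase:meta is online somewhere) and meta loaded (its content has been scanned into the master's state), where loaded can only fire after assigned. A toy model of the two gates; the class below is a stand-in, not the AssignmentManager API:

    // Stand-in model of the two meta gates the patch distinguishes.
    final class MetaGates {
      private boolean assigned;
      private boolean loaded;

      void markAssigned() {
        assigned = true;
      }

      void markLoaded() {
        if (!assigned) {
          throw new IllegalStateException("meta cannot be loaded before it is assigned");
        }
        loaded = true;
      }

      boolean isMetaAssigned() { return assigned; }
      boolean isMetaLoaded() { return loaded; }

      public static void main(String[] args) {
        MetaGates gates = new MetaGates();
        gates.markAssigned();  // what the test asserts right after assign()
        System.out.println(gates.isMetaAssigned() + " " + gates.isMetaLoaded()); // true false
        gates.markLoaded();    // what wakeMetaLoadedEvent() models
        System.out.println(gates.isMetaLoaded()); // true
      }
    }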
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
index 3f61de462f..1c1a36ea5b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.master.MasterMetaBootstrap;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -85,7 +84,6 @@ public class MasterProcedureTestingUtility {
         env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
       }
       am.stop();
-      master.setServerCrashProcessingEnabled(false);
      master.setInitialized(false);
       return null;
     }
@@ -96,9 +94,6 @@ public class MasterProcedureTestingUtility {
     public Void call() throws Exception {
       final AssignmentManager am = env.getAssignmentManager();
       am.start();
-      MasterMetaBootstrap metaBootstrap = new MasterMetaBootstrap(master);
-      metaBootstrap.recoverMeta();
-      metaBootstrap.processDeadServers();
       am.joinCluster();
       master.setInitialized(true);
       return null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
index 96bdbde83b..9a0e2f61ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -22,18 +22,18 @@ import static org.junit.Assert.assertEquals;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.After;
@@ -84,7 +84,7 @@ public class TestMasterProcedureEvents {
 
   @After
   public void tearDown() throws Exception {
-    for (HTableDescriptor htd: UTIL.getAdmin().listTables()) {
+    for (TableDescriptor htd: UTIL.getAdmin().listTableDescriptors()) {
       LOG.info("Tear down, remove table=" + htd.getTableName());
       UTIL.deleteTable(htd.getTableName());
     }
@@ -96,58 +96,22 @@ public class TestMasterProcedureEvents {
     HMaster master = UTIL.getMiniHBaseCluster().getMaster();
     ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
 
-    HRegionInfo hri = new HRegionInfo(tableName);
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    htd.addFamily(new HColumnDescriptor("f"));
+    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
 
-    while (!master.isInitialized()) Thread.sleep(250);
+    while (!master.isInitialized()) {
+      Thread.sleep(250);
+    }
     master.setInitialized(false); // fake it, set back later
 
     // check event wait/wake
     testProcedureEventWaitWake(master, master.getInitializedEvent(),
-      new CreateTableProcedure(procExec.getEnvironment(), htd, new HRegionInfo[] { hri }));
-  }
-
-  @Test
-  public void testServerCrashProcedureEvent() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
-    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
-
-    while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() ||
-        master.getAssignmentManager().getRegionStates().hasRegionsInTransition()) {
-      Thread.sleep(25);
-    }
-
-    UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]);
-    try (Table t = UTIL.getConnection().getTable(tableName)) {
-      // Load the table with a bit of data so some logs to split and some edits in each region.
-      UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
-    }
-
-    master.setServerCrashProcessingEnabled(false); // fake it, set back later
-
-    // Kill a server. Master will notice but do nothing other than add it to list of dead servers.
-    HRegionServer hrs = getServerWithRegions();
-    boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(hrs.getServerName());
-    UTIL.getHBaseCluster().killRegionServer(hrs.getServerName());
-    hrs.join();
-
-    // Wait until the expiration of the server has arrived at the master. We won't process it
-    // by queuing a ServerCrashProcedure because we have disabled crash processing... but wait
-    // here so ServerManager gets notice and adds expired server to appropriate queues.
-    while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10);
-
-    // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
-    master.getServerManager().moveFromOnlineToDeadServers(hrs.getServerName());
-
-    // check event wait/wake
-    testProcedureEventWaitWake(master, master.getServerCrashProcessingEnabledEvent(),
-      new ServerCrashProcedure(procExec.getEnvironment(), hrs.getServerName(), true, carryingMeta));
+      new CreateTableProcedure(procExec.getEnvironment(), htd, new RegionInfo[] { hri }));
   }
 
-  private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent event,
-      final Procedure proc) throws Exception {
+  private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent<?> event,
+      final Procedure<MasterProcedureEnv> proc) throws Exception {
     final ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
     final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureScheduler();
 
@@ -188,14 +152,4 @@ public class TestMasterProcedureEvents {
         " pollCalls=" + (procSched.getPollCalls() - startPollCalls) +
         " nullPollCalls=" + (procSched.getNullPollCalls() - startNullPollCalls));
   }
-
-  private HRegionServer getServerWithRegions() {
-    for (int i = 0; i < 3; ++i) {
-      HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i);
-      if (hrs.getNumberOfOnlineRegions() > 0) {
-        return hrs;
-      }
-    }
-    return null;
-  }
 }
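What testProcedureEventWaitWake still exercises is the suspend/wake contract: a procedure submitted while its gating event is down must stay parked until the event is woken. The same contract in a plain-Java sketch, with a CountDownLatch standing in for ProcedureEvent:

    import java.util.concurrent.CountDownLatch;

    // Stand-in for the wait/wake contract the test checks: work submitted while
    // the "event" is down parks; waking the event releases that work.
    final class EventWaitWakeSketch {
      public static void main(String[] args) throws InterruptedException {
        CountDownLatch event = new CountDownLatch(1); // 1 = "event not ready"
        Thread proc = new Thread(() -> {
          try {
            event.await();                            // the procedure parks on the event
            System.out.println("procedure ran after wake");
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        proc.start();
        Thread.sleep(100);                            // give it time to park
        System.out.println("procedure parked: " + (proc.getState() == Thread.State.WAITING));
        event.countDown();                            // wake; the procedure proceeds
        proc.join();
      }
    }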
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
index 58d5d46d23..9f7fafeca6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
@@ -23,10 +23,10 @@ import static org.junit.Assert.assertTrue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
@@ -102,13 +102,17 @@ public class TestServerCrashProcedure {
     testRecoveryAndDoubleExecution(false, true);
   }
 
+  private long getSCPProcId(ProcedureExecutor<?> procExec) {
+    util.waitFor(30000, () -> !procExec.getProcedures().isEmpty());
+    return procExec.getActiveProcIds().stream().mapToLong(Long::longValue).min().getAsLong();
+  }
+
   /**
    * Run server crash procedure steps twice to test idempotency and that we are persisting all
    * needed state.
-   * @throws Exception
    */
-  private void testRecoveryAndDoubleExecution(final boolean carryingMeta,
-      final boolean doubleExecution) throws Exception {
+  private void testRecoveryAndDoubleExecution(boolean carryingMeta, boolean doubleExecution)
+      throws Exception {
     final TableName tableName = TableName.valueOf(
       "testRecoveryAndDoubleExecution-carryingMeta-" + carryingMeta);
     final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
@@ -123,33 +127,29 @@ public class TestServerCrashProcedure {
     // Master's running of the server crash processing.
     final HMaster master = this.util.getHBaseCluster().getMaster();
     final ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
-    master.setServerCrashProcessingEnabled(false);
     // find the first server that match the request and executes the test
     ServerName rsToKill = null;
-    for (HRegionInfo hri : util.getHBaseAdmin().getTableRegions(tableName)) {
+    for (RegionInfo hri : util.getAdmin().getRegions(tableName)) {
       final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri);
       if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName) == carryingMeta) {
         rsToKill = serverName;
         break;
       }
     }
-    // kill the RS
-    AssignmentTestingUtil.killRs(util, rsToKill);
-    // Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
-    master.setServerCrashProcessingEnabled(true);
-    // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
-    master.getServerManager().moveFromOnlineToDeadServers(rsToKill);
     // Enable test flags and then queue the crash procedure.
     ProcedureTestingUtility.waitNoProcedureRunning(procExec);
-    ServerCrashProcedure scp = new ServerCrashProcedure(procExec.getEnvironment(), rsToKill,
-        true, carryingMeta);
     if (doubleExecution) {
       ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
-      long procId = procExec.submitProcedure(scp);
+      // kill the RS
+      AssignmentTestingUtil.killRs(util, rsToKill);
+      long procId = getSCPProcId(procExec);
       // Now run through the procedure twice crashing the executor on each step...
       MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
     } else {
-      ProcedureTestingUtility.submitAndWait(procExec, scp);
+      // kill the RS
+      AssignmentTestingUtil.killRs(util, rsToKill);
+      long procId = getSCPProcId(procExec);
+      ProcedureTestingUtility.waitProcedure(procExec, procId);
     }
     // Assert all data came back.
     assertEquals(count, util.countRows(t));
--
2.11.4.GIT
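Because the master now schedules the ServerCrashProcedure itself when the region server dies, the test above can no longer construct and submit the SCP; getSCPProcId instead polls until a procedure shows up and takes the smallest active proc id. The same idiom as a stand-alone sketch, with a plain list standing in for ProcedureExecutor.getActiveProcIds:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Stand-in sketch of getSCPProcId: poll until the framework has scheduled
    // at least one procedure, then take the smallest (earliest) proc id.
    final class ScpIdLookupSketch {
      static long waitForFirstProcId(List<Long> activeProcIds, long timeoutMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (activeProcIds.isEmpty()) {
          if (System.currentTimeMillis() > deadline) {
            throw new IllegalStateException("no procedure scheduled in time");
          }
          Thread.sleep(100); // poll, like util.waitFor in the patch
        }
        return activeProcIds.stream().mapToLong(Long::longValue).min().getAsLong();
      }

      public static void main(String[] args) throws InterruptedException {
        List<Long> ids = new CopyOnWriteArrayList<>();
        // Simulate the master scheduling an SCP shortly after the RS is killed.
        new Thread(() -> {
          try {
            Thread.sleep(300);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          ids.add(42L);
        }).start();
        System.out.println("scp proc id = " + waitForFirstProcId(ids, 30000));
      }
    }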