HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / procedure / SimpleRSProcedureManager.java
blobd90dbde3534a73f9fb4aa9533229cfc7342467fa
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.procedure;
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorCompletionService;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Abortable;
34 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.Threads;
37 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
38 import org.apache.hadoop.hbase.errorhandling.ForeignException;
39 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
40 import org.apache.zookeeper.KeeperException;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 public class SimpleRSProcedureManager extends RegionServerProcedureManager {
46 private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
48 private RegionServerServices rss;
49 private ProcedureMemberRpcs memberRpcs;
50 private ProcedureMember member;
52 @Override
53 public void initialize(RegionServerServices rss) throws KeeperException {
54 this.rss = rss;
55 ZKWatcher zkw = rss.getZooKeeper();
56 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
58 ThreadPoolExecutor pool =
59 ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
60 this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
61 LOG.info("Initialized: " + rss.getServerName().toString());
64 @Override
65 public void start() {
66 this.memberRpcs.start(rss.getServerName().toString(), member);
67 LOG.info("Started.");
70 @Override
71 public void stop(boolean force) throws IOException {
72 LOG.info("stop: " + force);
73 try {
74 this.member.close();
75 } finally {
76 this.memberRpcs.close();
80 @Override
81 public String getProcedureSignature() {
82 return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
85 /**
86 * If in a running state, creates the specified subprocedure for handling a procedure.
87 * @return Subprocedure to submit to the ProcedureMemeber.
89 public Subprocedure buildSubprocedure(String name) {
91 // don't run a procedure if the parent is stop(ping)
92 if (rss.isStopping() || rss.isStopped()) {
93 throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
94 + ", because stopping/stopped!");
97 LOG.info("Attempting to run a procedure.");
98 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
99 Configuration conf = rss.getConfiguration();
101 SimpleSubprocedurePool taskManager =
102 new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
103 return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
107 * Build the actual procedure runner that will do all the 'hard' work
109 public class SimleSubprocedureBuilder implements SubprocedureFactory {
111 @Override
112 public Subprocedure buildSubprocedure(String name, byte[] data) {
113 LOG.info("Building procedure: " + name);
114 return SimpleRSProcedureManager.this.buildSubprocedure(name);
118 public class SimpleSubprocedurePool implements Closeable, Abortable {
120 private final ExecutorCompletionService<Void> taskPool;
121 private final ThreadPoolExecutor executor;
122 private volatile boolean aborted;
123 private final List<Future<Void>> futures = new ArrayList<>();
124 private final String name;
126 public SimpleSubprocedurePool(String name, Configuration conf) {
127 this.name = name;
128 executor = new ThreadPoolExecutor(1, 1, 500,
129 TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
130 Threads.newDaemonThreadFactory("rs(" + name + ")-procedure"));
131 taskPool = new ExecutorCompletionService<>(executor);
135 * Submit a task to the pool.
137 public void submitTask(final Callable<Void> task) {
138 Future<Void> f = this.taskPool.submit(task);
139 futures.add(f);
143 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
145 * @return <tt>true</tt> on success, <tt>false</tt> otherwise
146 * @throws ForeignException
148 public boolean waitForOutstandingTasks() throws ForeignException {
149 LOG.debug("Waiting for procedure to finish.");
151 try {
152 for (Future<Void> f: futures) {
153 f.get();
155 return true;
156 } catch (InterruptedException e) {
157 if (aborted) throw new ForeignException(
158 "Interrupted and found to be aborted while waiting for tasks!", e);
159 Thread.currentThread().interrupt();
160 } catch (ExecutionException e) {
161 if (e.getCause() instanceof ForeignException) {
162 throw (ForeignException) e.getCause();
164 throw new ForeignException(name, e.getCause());
165 } finally {
166 // close off remaining tasks
167 for (Future<Void> f: futures) {
168 if (!f.isDone()) {
169 f.cancel(true);
173 return false;
177 * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
178 * finish
180 @Override
181 public void close() {
182 executor.shutdown();
185 @Override
186 public void abort(String why, Throwable e) {
187 if (this.aborted) return;
189 this.aborted = true;
190 LOG.warn("Aborting because: " + why, e);
191 this.executor.shutdownNow();
194 @Override
195 public boolean isAborted() {
196 return this.aborted;
200 public class SimpleSubprocedure extends Subprocedure {
201 private final RegionServerServices rss;
202 private final SimpleSubprocedurePool taskManager;
204 public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
205 ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
206 super(member, name, errorListener, 500, 60000);
207 LOG.info("Constructing a SimpleSubprocedure.");
208 this.rss = rss;
209 this.taskManager = taskManager;
213 * Callable task.
214 * TODO. We don't need a thread pool to execute roll log. This can be simplified
215 * with no use of subprocedurepool.
217 class RSSimpleTask implements Callable<Void> {
218 RSSimpleTask() {}
220 @Override
221 public Void call() throws Exception {
222 LOG.info("Execute subprocedure on " + rss.getServerName().toString());
223 return null;
228 private void execute() throws ForeignException {
230 monitor.rethrowException();
232 // running a task (e.g., roll log, flush table) on region server
233 taskManager.submitTask(new RSSimpleTask());
234 monitor.rethrowException();
236 // wait for everything to complete.
237 taskManager.waitForOutstandingTasks();
238 monitor.rethrowException();
242 @Override
243 public void acquireBarrier() throws ForeignException {
244 // do nothing, executing in inside barrier step.
248 * do a log roll.
250 @Override
251 public byte[] insideBarrier() throws ForeignException {
252 execute();
253 return Bytes.toBytes(SimpleMasterProcedureManager.SIMPLE_DATA);
257 * Cancel threads if they haven't finished.
259 @Override
260 public void cleanup(Exception e) {
261 taskManager.abort("Aborting simple subprocedure tasks due to error", e);