HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / procedure / SimpleRSProcedureManager.java
blob9ccee661586a4ceac9767aab0aa0f42433992071
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.procedure;
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorCompletionService;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.ThreadPoolExecutor;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Abortable;
34 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.Threads;
37 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
38 import org.apache.hadoop.hbase.errorhandling.ForeignException;
39 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
40 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
41 import org.apache.zookeeper.KeeperException;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 public class SimpleRSProcedureManager extends RegionServerProcedureManager {
47 private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
49 private RegionServerServices rss;
50 private ProcedureMemberRpcs memberRpcs;
51 private ProcedureMember member;
53 @Override
54 public void initialize(RegionServerServices rss) throws KeeperException {
55 this.rss = rss;
56 ZKWatcher zkw = rss.getZooKeeper();
57 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());
59 ThreadPoolExecutor pool =
60 ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
61 this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
62 LOG.info("Initialized: " + rss.getServerName().toString());
65 @Override
66 public void start() {
67 this.memberRpcs.start(rss.getServerName().toString(), member);
68 LOG.info("Started.");
71 @Override
72 public void stop(boolean force) throws IOException {
73 LOG.info("stop: " + force);
74 try {
75 this.member.close();
76 } finally {
77 this.memberRpcs.close();
81 @Override
82 public String getProcedureSignature() {
83 return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
86 /**
87 * If in a running state, creates the specified subprocedure for handling a procedure.
88 * @return Subprocedure to submit to the ProcedureMember.
90 public Subprocedure buildSubprocedure(String name) {
92 // don't run a procedure if the parent is stop(ping)
93 if (rss.isStopping() || rss.isStopped()) {
94 throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
95 + ", because stopping/stopped!");
98 LOG.info("Attempting to run a procedure.");
99 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
100 Configuration conf = rss.getConfiguration();
102 SimpleSubprocedurePool taskManager =
103 new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
104 return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
108 * Build the actual procedure runner that will do all the 'hard' work
110 public class SimleSubprocedureBuilder implements SubprocedureFactory {
112 @Override
113 public Subprocedure buildSubprocedure(String name, byte[] data) {
114 LOG.info("Building procedure: " + name);
115 return SimpleRSProcedureManager.this.buildSubprocedure(name);
119 public static class SimpleSubprocedurePool implements Closeable, Abortable {
121 private final ExecutorCompletionService<Void> taskPool;
122 private final ExecutorService executor;
123 private volatile boolean aborted;
124 private final List<Future<Void>> futures = new ArrayList<>();
125 private final String name;
127 public SimpleSubprocedurePool(String name, Configuration conf) {
128 this.name = name;
129 executor = Executors.newSingleThreadExecutor(
130 new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d")
131 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
132 taskPool = new ExecutorCompletionService<>(executor);
136 * Submit a task to the pool.
138 public void submitTask(final Callable<Void> task) {
139 Future<Void> f = this.taskPool.submit(task);
140 futures.add(f);
144 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
146 * @return <tt>true</tt> on success, <tt>false</tt> otherwise
147 * @throws ForeignException
149 public boolean waitForOutstandingTasks() throws ForeignException {
150 LOG.debug("Waiting for procedure to finish.");
152 try {
153 for (Future<Void> f: futures) {
154 f.get();
156 return true;
157 } catch (InterruptedException e) {
158 if (aborted) throw new ForeignException(
159 "Interrupted and found to be aborted while waiting for tasks!", e);
160 Thread.currentThread().interrupt();
161 } catch (ExecutionException e) {
162 if (e.getCause() instanceof ForeignException) {
163 throw (ForeignException) e.getCause();
165 throw new ForeignException(name, e.getCause());
166 } finally {
167 // close off remaining tasks
168 for (Future<Void> f: futures) {
169 if (!f.isDone()) {
170 f.cancel(true);
174 return false;
178 * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
179 * finish
181 @Override
182 public void close() {
183 executor.shutdown();
186 @Override
187 public void abort(String why, Throwable e) {
188 if (this.aborted) return;
190 this.aborted = true;
191 LOG.warn("Aborting because: " + why, e);
192 this.executor.shutdownNow();
195 @Override
196 public boolean isAborted() {
197 return this.aborted;
201 public class SimpleSubprocedure extends Subprocedure {
202 private final RegionServerServices rss;
203 private final SimpleSubprocedurePool taskManager;
205 public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
206 ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
207 super(member, name, errorListener, 500, 60000);
208 LOG.info("Constructing a SimpleSubprocedure.");
209 this.rss = rss;
210 this.taskManager = taskManager;
214 * Callable task.
215 * TODO. We don't need a thread pool to execute roll log. This can be simplified
216 * with no use of subprocedurepool.
218 class RSSimpleTask implements Callable<Void> {
219 RSSimpleTask() {}
221 @Override
222 public Void call() throws Exception {
223 LOG.info("Execute subprocedure on " + rss.getServerName().toString());
224 return null;
229 private void execute() throws ForeignException {
231 monitor.rethrowException();
233 // running a task (e.g., roll log, flush table) on region server
234 taskManager.submitTask(new RSSimpleTask());
235 monitor.rethrowException();
237 // wait for everything to complete.
238 taskManager.waitForOutstandingTasks();
239 monitor.rethrowException();
243 @Override
244 public void acquireBarrier() throws ForeignException {
245 // do nothing, executing in inside barrier step.
249 * do a log roll.
251 @Override
252 public byte[] insideBarrier() throws ForeignException {
253 execute();
254 return Bytes.toBytes(SimpleMasterProcedureManager.SIMPLE_DATA);
258 * Cancel threads if they haven't finished.
260 @Override
261 public void cleanup(Exception e) {
262 taskManager.abort("Aborting simple subprocedure tasks due to error", e);