HBASE-23949 refactor loadBalancer implements for rsgroup balance by table to achieve...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / MultithreadedTestUtil.java
blob99aef6441291f2ff9ef3733b64094a6c2383152c
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase;
21 import java.io.IOException;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Future;
28 import org.apache.hadoop.conf.Configuration;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 public abstract class MultithreadedTestUtil {
34 private static final Logger LOG =
35 LoggerFactory.getLogger(MultithreadedTestUtil.class);
37 public static class TestContext {
38 private final Configuration conf;
39 private Throwable err = null;
40 private boolean stopped = false;
41 private int threadDoneCount = 0;
42 private Set<TestThread> testThreads = new HashSet<>();
44 public TestContext(Configuration configuration) {
45 this.conf = configuration;
48 protected Configuration getConf() {
49 return conf;
52 public synchronized boolean shouldRun() {
53 return !stopped && err == null;
56 public void addThread(TestThread t) {
57 testThreads.add(t);
60 public void startThreads() {
61 for (TestThread t : testThreads) {
62 t.start();
66 public void waitFor(long millis) throws Exception {
67 long endTime = System.currentTimeMillis() + millis;
68 while (!stopped) {
69 long left = endTime - System.currentTimeMillis();
70 if (left <= 0) break;
71 synchronized (this) {
72 checkException();
73 wait(left);
77 private synchronized void checkException() throws Exception {
78 if (err != null) {
79 throw new RuntimeException("Deferred", err);
83 public synchronized void threadFailed(Throwable t) {
84 if (err == null) err = t;
85 LOG.error("Failed!", err);
86 notify();
89 public synchronized void threadDone() {
90 threadDoneCount++;
93 public void setStopFlag(boolean s) throws Exception {
94 synchronized (this) {
95 stopped = s;
99 public void stop() throws Exception {
100 synchronized (this) {
101 stopped = true;
103 for (TestThread t : testThreads) {
104 t.join();
106 checkException();
111 * A thread that can be added to a test context, and properly
112 * passes exceptions through.
114 public static abstract class TestThread extends Thread {
115 protected final TestContext ctx;
116 protected boolean stopped;
118 public TestThread(TestContext ctx) {
119 this.ctx = ctx;
122 @Override
123 public void run() {
124 try {
125 doWork();
126 } catch (Throwable t) {
127 ctx.threadFailed(t);
129 ctx.threadDone();
132 public abstract void doWork() throws Exception;
134 protected void stopTestThread() {
135 this.stopped = true;
140 * A test thread that performs a repeating operation.
142 public static abstract class RepeatingTestThread extends TestThread {
143 public RepeatingTestThread(TestContext ctx) {
144 super(ctx);
147 @Override
148 public final void doWork() throws Exception {
149 try {
150 while (ctx.shouldRun() && !stopped) {
151 doAnAction();
153 } finally {
154 workDone();
158 public abstract void doAnAction() throws Exception;
159 public void workDone() throws IOException {}
163 * Verify that no assertions have failed inside a future.
164 * Used for unit tests that spawn threads. E.g.,
165 * <p>
166 * <pre>
167 * List&lt;Future&lt;Void>> results = Lists.newArrayList();
168 * Future&lt;Void> f = executor.submit(new Callable&lt;Void> {
169 * public Void call() {
170 * assertTrue(someMethod());
172 * });
173 * results.add(f);
174 * assertOnFutures(results);
175 * </pre>
176 * @param threadResults A list of futures
177 * @throws InterruptedException If interrupted when waiting for a result
178 * from one of the futures
179 * @throws ExecutionException If an exception other than AssertionError
180 * occurs inside any of the futures
182 public static void assertOnFutures(List<Future<?>> threadResults)
183 throws InterruptedException, ExecutionException {
184 for (Future<?> threadResult : threadResults) {
185 try {
186 threadResult.get();
187 } catch (ExecutionException e) {
188 if (e.getCause() instanceof AssertionError) {
189 throw (AssertionError) e.getCause();
191 throw e;