3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
;
21 import java
.io
.IOException
;
22 import java
.util
.HashSet
;
23 import java
.util
.List
;
25 import java
.util
.concurrent
.ExecutionException
;
26 import java
.util
.concurrent
.Future
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
30 import org
.slf4j
.Logger
;
31 import org
.slf4j
.LoggerFactory
;
33 public abstract class MultithreadedTestUtil
{
// Class-wide SLF4J logger, keyed on MultithreadedTestUtil itself. Within the
// visible code it is used by TestContext.threadFailed to log the retained failure.
35 private static final Logger LOG
=
36 LoggerFactory
.getLogger(MultithreadedTestUtil
.class);
/**
 * Shared state for a group of TestThreads: the test Configuration, the set of
 * threads, a stop flag, and the first failure reported by any thread.
 *
 * NOTE(review): several member bodies are not visible in this copy of the
 * class; the member comments below state only what the visible lines show and
 * mark everything else as an assumption to confirm.
 */
38 public static class TestContext
{
// Configuration handed to the constructor; final, so never reassigned.
39 private final Configuration conf
;
// First Throwable passed to threadFailed(); stays null until a failure is reported.
40 private Throwable err
= null;
// Stop flag; shouldRun() returns false once it is true.
41 private boolean stopped
= false;
// Presumably counts threads that have called threadDone() -- no read or
// write of this field is visible here; TODO confirm.
42 private int threadDoneCount
= 0;
// Threads belonging to this context; iterated by startThreads() and stop().
// NOTE(review): no java.util.Set import is visible alongside the HashSet and
// List imports -- confirm the full file has it.
43 private Set
<TestThread
> testThreads
= new HashSet
<>();
/**
 * Creates a context bound to the given configuration.
 *
 * @param configuration value stored in the conf field
 */
45 public TestContext(Configuration configuration
) {
46 this.conf
= configuration
;
/**
 * Accessor for the configuration.
 * NOTE(review): the body is not visible here; presumably returns the conf
 * field -- confirm.
 *
 * @return a Configuration
 */
49 protected Configuration
getConf() {
/**
 * Tells callers whether they should keep working.
 *
 * @return true only while the stop flag is unset and no failure has been
 *         recorded in err
 */
53 public synchronized boolean shouldRun() {
54 return !stopped
&& err
== null;
/**
 * Accepts a thread for this context.
 * NOTE(review): the body is not visible here; presumably adds t to
 * testThreads -- confirm.
 *
 * @param t the thread being handed over
 */
57 public void addThread(TestThread t
) {
/**
 * Walks every thread in testThreads.
 * NOTE(review): the loop body is not visible here; going by the method name
 * it presumably starts each thread -- confirm.
 */
61 public void startThreads() {
62 for (TestThread t
: testThreads
) {
/**
 * Works against a deadline of "current time + millis", with the current time
 * taken from EnvironmentEdgeManager.currentTime().
 * NOTE(review): only the deadline and remaining-time computations are visible
 * here; the looping/waiting logic and what triggers the declared exception
 * are not -- confirm against the full source.
 *
 * @param millis span added to the current time to form the deadline
 *        (presumably milliseconds -- confirm the unit of currentTime())
 * @throws Exception declared by the signature; the throwing path is not visible
 */
67 public void waitFor(long millis
) throws Exception
{
// Deadline, computed once on entry.
68 long endTime
= EnvironmentEdgeManager
.currentTime() + millis
;
// Time still remaining before the deadline, using a fresh clock reading.
70 long left
= endTime
- EnvironmentEdgeManager
.currentTime();
/**
 * Surfaces the recorded failure by throwing a RuntimeException whose message
 * is "Deferred" and whose cause is err.
 * NOTE(review): the condition guarding the throw is not visible here;
 * presumably it fires only when err is non-null -- confirm.
 *
 * @throws Exception the RuntimeException wrapping err
 */
78 private synchronized void checkException() throws Exception
{
80 throw new RuntimeException("Deferred", err
);
/**
 * Records a failure. Only the first one is kept: if err is already set, t
 * does not replace it. The value logged is the retained err, which is not
 * necessarily t.
 *
 * @param t the failure being reported
 */
84 public synchronized void threadFailed(Throwable t
) {
85 if (err
== null) err
= t
;
86 LOG
.error("Failed!", err
);
/**
 * NOTE(review): the body is not visible here; going by the name it is
 * presumably invoked when a thread finishes and updates threadDoneCount --
 * confirm.
 */
90 public synchronized void threadDone() {
/**
 * NOTE(review): the body is not visible here; going by the name and
 * parameter it presumably assigns s to the stopped field -- confirm.
 *
 * @param s the flag value supplied by the caller
 * @throws Exception declared by the signature; no throwing statement is visible
 */
94 public void setStopFlag(boolean s
) throws Exception
{
/**
 * Runs a section under this object's monitor and then walks every thread in
 * testThreads.
 * NOTE(review): the contents of the synchronized block and of the loop are
 * not visible here; presumably the flag is set and each thread is then waited
 * on -- confirm.
 *
 * @throws Exception declared by the signature; the throwing path is not visible
 */
100 public void stop() throws Exception
{
101 synchronized (this) {
104 for (TestThread t
: testThreads
) {
112 * A thread that can be added to a test context, and properly
113 * passes exceptions through.
// Abstract Thread subclass tied to a TestContext; subclasses supply doWork().
115 public static abstract class TestThread
extends Thread
{
// Context this thread is associated with; final, so assigned exactly once
// (presumably from the constructor argument -- the assignment is not visible here).
116 protected final TestContext ctx
;
// Per-thread stop flag; RepeatingTestThread.doWork() tests it in its loop condition.
// NOTE(review): the field is not volatile and no lock around its accesses is
// visible; if it is written by one thread and read by another, confirm that
// visibility is guaranteed some other way.
117 protected boolean stopped
;
/**
 * @param ctx the context this thread is associated with
 */
119 public TestThread(TestContext ctx
) {
// Catches any Throwable, not just Exception.
// NOTE(review): neither the try body nor this handler's body is visible here;
// presumably this belongs to run() and forwards t to the context -- confirm.
127 } catch (Throwable t
) {
/**
 * The work this thread performs; implemented by subclasses.
 *
 * @throws Exception whatever the implementation throws
 */
133 public abstract void doWork() throws Exception
;
/**
 * NOTE(review): the body is not visible here; going by the name it presumably
 * sets the stopped field -- confirm.
 */
135 protected void stopTestThread() {
141 * A test thread that performs a repeating operation.
// TestThread whose doWork() is final and built around a while loop; subclasses
// plug in doAnAction() and may override workDone().
143 public static abstract class RepeatingTestThread
extends TestThread
{
/**
 * @param ctx the context this thread is associated with
 */
144 public RepeatingTestThread(TestContext ctx
) {
/**
 * Loop driver; final, so subclasses cannot replace it.
 * NOTE(review): the loop body is not visible here; presumably each iteration
 * calls doAnAction() and workDone() runs afterwards -- confirm.
 *
 * @throws Exception declared by the signature
 */
149 public final void doWork() throws Exception
{
// Continue only while the context allows running AND this thread's own
// stopped flag is still false.
151 while (ctx
.shouldRun() && !stopped
) {
/**
 * The repeated operation; implemented by subclasses.
 *
 * @throws Exception whatever the implementation throws
 */
159 public abstract void doAnAction() throws Exception
;
/**
 * Hook with an empty default body; subclasses may override it.
 *
 * @throws IOException never by this default implementation; declared so that
 *         overrides may throw it
 */
160 public void workDone() throws IOException
{}
164 * Verify that no assertions have failed inside a future.
165 * Used for unit tests that spawn threads. E.g.,
168 * List<Future<Void>> results = Lists.newArrayList();
169 * Future<Void> f = executor.submit(new Callable<Void>() {
170 * public Void call() {
171 * assertTrue(someMethod());
175 * assertOnFutures(results);
177 * @param threadResults A list of futures
178 * @throws InterruptedException If interrupted when waiting for a result
179 * from one of the futures
180 * @throws ExecutionException If an exception other than AssertionError
181 * occurs inside any of the futures
183 public static void assertOnFutures(List
<Future
<?
>> threadResults
)
184 throws InterruptedException
, ExecutionException
{
185 for (Future
<?
> threadResult
: threadResults
) {
188 } catch (ExecutionException e
) {
189 if (e
.getCause() instanceof AssertionError
) {
190 throw (AssertionError
) e
.getCause();