HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / util / JVMClusterUtil.java
blob1e2ac3ebb973bb00a022283fd11300aed05b326f
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
38 /**
 * Utility for running a cluster entirely within one JVM.
41 @InterfaceAudience.Private
42 public class JVMClusterUtil {
43 private static final Logger LOG = LoggerFactory.getLogger(JVMClusterUtil.class);
45 /**
46 * Datastructure to hold RegionServer Thread and RegionServer instance
48 public static class RegionServerThread extends Thread {
49 private final HRegionServer regionServer;
51 public RegionServerThread(final HRegionServer r, final int index) {
52 super(r, "RS:" + index + ";" + r.getServerName().toShortString());
53 this.regionServer = r;
56 /** @return the region server */
57 public HRegionServer getRegionServer() {
58 return this.regionServer;
61 /**
62 * Block until the region server has come online, indicating it is ready
63 * to be used.
65 public void waitForServerOnline() {
66 // The server is marked online after the init method completes inside of
67 // the HRS#run method. HRS#init can fail for whatever region. In those
68 // cases, we'll jump out of the run without setting online flag. Check
69 // stopRequested so we don't wait here a flag that will never be flipped.
70 regionServer.waitForServerOnline();
74 /**
75 * Creates a {@link RegionServerThread}.
76 * Call 'start' on the returned thread to make it run.
77 * @param c Configuration to use.
78 * @param hrsc Class to create.
79 * @param index Used distinguishing the object returned.
80 * @throws IOException
81 * @return Region server added.
83 public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
84 final Class<? extends HRegionServer> hrsc, final int index) throws IOException {
85 HRegionServer server;
86 try {
87 Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
88 ctor.setAccessible(true);
89 server = ctor.newInstance(c);
90 } catch (InvocationTargetException ite) {
91 Throwable target = ite.getTargetException();
92 throw new RuntimeException("Failed construction of RegionServer: " +
93 hrsc.toString() + ((target.getCause() != null)?
94 target.getCause().getMessage(): ""), target);
95 } catch (Exception e) {
96 throw new IOException(e);
98 return new JVMClusterUtil.RegionServerThread(server, index);
103 * Datastructure to hold Master Thread and Master instance
105 public static class MasterThread extends Thread {
106 private final HMaster master;
108 public MasterThread(final HMaster m, final int index) {
109 super(m, "M:" + index + ";" + m.getServerName().toShortString());
110 this.master = m;
113 /** @return the master */
114 public HMaster getMaster() {
115 return this.master;
120 * Creates a {@link MasterThread}.
121 * Call 'start' on the returned thread to make it run.
122 * @param c Configuration to use.
123 * @param hmc Class to create.
124 * @param index Used distinguishing the object returned.
125 * @throws IOException
126 * @return Master added.
128 public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
129 final Class<? extends HMaster> hmc, final int index) throws IOException {
130 HMaster server;
131 try {
132 server = hmc.getConstructor(Configuration.class).newInstance(c);
133 } catch (InvocationTargetException ite) {
134 Throwable target = ite.getTargetException();
135 throw new RuntimeException("Failed construction of Master: " +
136 hmc.toString() + ((target.getCause() != null)?
137 target.getCause().getMessage(): ""), target);
138 } catch (Exception e) {
139 throw new IOException(e);
141 // Needed if a master based registry is configured for internal cluster connections. Here, we
142 // just add the current master host port since we do not know other master addresses up front
143 // in mini cluster tests.
144 c.set(HConstants.MASTER_ADDRS_KEY,
145 Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
146 return new JVMClusterUtil.MasterThread(server, index);
149 private static JVMClusterUtil.MasterThread findActiveMaster(
150 List<JVMClusterUtil.MasterThread> masters) {
151 for (JVMClusterUtil.MasterThread t : masters) {
152 if (t.master.isActiveMaster()) {
153 return t;
157 return null;
161 * Start the cluster. Waits until there is a primary master initialized
162 * and returns its address.
163 * @param masters
164 * @param regionservers
165 * @return Address to use contacting primary master.
167 public static String startup(final List<JVMClusterUtil.MasterThread> masters,
168 final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
169 // Implementation note: This method relies on timed sleeps in a loop. It's not great, and
170 // should probably be re-written to use actual synchronization objects, but it's ok for now
172 Configuration configuration = null;
174 if (masters == null || masters.isEmpty()) {
175 return null;
178 for (JVMClusterUtil.MasterThread t : masters) {
179 configuration = t.getMaster().getConfiguration();
180 t.start();
183 // Wait for an active master
184 // having an active master before starting the region threads allows
185 // then to succeed on their connection to master
186 final int startTimeout = configuration != null ? Integer.parseInt(
187 configuration.get("hbase.master.start.timeout.localHBaseCluster", "30000")) : 30000;
188 waitForEvent(startTimeout, "active", () -> findActiveMaster(masters) != null);
190 if (regionservers != null) {
191 for (JVMClusterUtil.RegionServerThread t: regionservers) {
192 t.start();
196 // Wait for an active master to be initialized (implies being master)
197 // with this, when we return the cluster is complete
198 final int initTimeout = configuration != null ? Integer.parseInt(
199 configuration.get("hbase.master.init.timeout.localHBaseCluster", "200000")) : 200000;
200 waitForEvent(initTimeout, "initialized", () -> {
201 JVMClusterUtil.MasterThread t = findActiveMaster(masters);
202 // master thread should never be null at this point, but let's keep the check anyway
203 return t != null && t.master.isInitialized();
207 return findActiveMaster(masters).master.getServerName().toString();
211 * Utility method to wait some time for an event to occur, and then return control to the caller.
212 * @param millis How long to wait, in milliseconds.
213 * @param action The action that we are waiting for. Will be used in log message if the event
214 * does not occur.
215 * @param check A Supplier that will be checked periodically to produce an updated true/false
216 * result indicating if the expected event has happened or not.
217 * @throws InterruptedIOException If we are interrupted while waiting for the event.
218 * @throws RuntimeException If we reach the specified timeout while waiting for the event.
220 private static void waitForEvent(long millis, String action, Supplier<Boolean> check)
221 throws InterruptedIOException {
222 long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
224 while (true) {
225 if (check.get()) {
226 return;
229 if (System.nanoTime() > end) {
230 String msg = "Master not " + action + " after " + millis + "ms";
231 Threads.printThreadInfo(System.out, "Thread dump because: " + msg);
232 throw new RuntimeException(msg);
235 try {
236 Thread.sleep(100);
237 } catch (InterruptedException e) {
238 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
245 * @param masters
246 * @param regionservers
248 public static void shutdown(final List<MasterThread> masters,
249 final List<RegionServerThread> regionservers) {
250 LOG.debug("Shutting down HBase Cluster");
251 if (masters != null) {
252 // Do backups first.
253 JVMClusterUtil.MasterThread activeMaster = null;
254 for (JVMClusterUtil.MasterThread t : masters) {
255 // Master was killed but could be still considered as active. Check first if it is stopped.
256 if (!t.master.isStopped()) {
257 if (!t.master.isActiveMaster()) {
258 try {
259 t.master.stopMaster();
260 } catch (IOException e) {
261 LOG.error("Exception occurred while stopping master", e);
263 LOG.info("Stopped backup Master {} is stopped: {}",
264 t.master.hashCode(), t.master.isStopped());
265 } else {
266 if (activeMaster != null) {
267 LOG.warn("Found more than 1 active master, hash {}", activeMaster.master.hashCode());
269 activeMaster = t;
270 LOG.debug("Found active master hash={}, stopped={}",
271 t.master.hashCode(), t.master.isStopped());
275 // Do active after.
276 if (activeMaster != null) {
277 try {
278 activeMaster.master.shutdown();
279 } catch (IOException e) {
280 LOG.error("Exception occurred in HMaster.shutdown()", e);
284 boolean wasInterrupted = false;
285 final long maxTime = EnvironmentEdgeManager.currentTime() + 30 * 1000;
286 if (regionservers != null) {
287 // first try nicely.
288 for (RegionServerThread t : regionservers) {
289 t.getRegionServer().stop("Shutdown requested");
291 for (RegionServerThread t : regionservers) {
292 long now = EnvironmentEdgeManager.currentTime();
293 if (t.isAlive() && !wasInterrupted && now < maxTime) {
294 try {
295 t.join(maxTime - now);
296 } catch (InterruptedException e) {
297 LOG.info("Got InterruptedException on shutdown - " +
298 "not waiting anymore on region server ends", e);
299 wasInterrupted = true; // someone wants us to speed up.
304 // Let's try to interrupt the remaining threads if any.
305 for (int i = 0; i < 100; ++i) {
306 boolean atLeastOneLiveServer = false;
307 for (RegionServerThread t : regionservers) {
308 if (t.isAlive()) {
309 atLeastOneLiveServer = true;
310 try {
311 LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
312 t.join(1000);
313 } catch (InterruptedException e) {
314 wasInterrupted = true;
318 if (!atLeastOneLiveServer) break;
319 for (RegionServerThread t : regionservers) {
320 if (t.isAlive()) {
321 LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump " +
322 "if > 3 attempts: i=" + i);
323 if (i > 3) {
324 Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
326 t.interrupt();
332 if (masters != null) {
333 for (JVMClusterUtil.MasterThread t : masters) {
334 while (t.master.isAlive() && !wasInterrupted) {
335 try {
336 // The below has been replaced to debug sometime hangs on end of
337 // tests.
338 // this.master.join():
339 Threads.threadDumpingIsAlive(t.master);
340 } catch(InterruptedException e) {
341 LOG.info("Got InterruptedException on shutdown - " +
342 "not waiting anymore on master ends", e);
343 wasInterrupted = true;
348 LOG.info("Shutdown of " +
349 ((masters != null) ? masters.size() : "0") + " master(s) and " +
350 ((regionservers != null) ? regionservers.size() : "0") +
351 " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
353 if (wasInterrupted){
354 Thread.currentThread().interrupt();