3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.util
;
21 import java
.io
.InterruptedIOException
;
22 import java
.io
.IOException
;
23 import java
.lang
.reflect
.Constructor
;
24 import java
.lang
.reflect
.InvocationTargetException
;
25 import java
.util
.List
;
26 import java
.util
.concurrent
.TimeUnit
;
27 import java
.util
.function
.Supplier
;
29 import org
.apache
.yetus
.audience
.InterfaceAudience
;
30 import org
.slf4j
.Logger
;
31 import org
.slf4j
.LoggerFactory
;
32 import org
.apache
.hadoop
.conf
.Configuration
;
33 import org
.apache
.hadoop
.hbase
.master
.HMaster
;
34 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
/**
 * Utility used for running a whole cluster inside a single JVM.
 */
39 @InterfaceAudience.Private
40 public class JVMClusterUtil
{
41 private static final Logger LOG
= LoggerFactory
.getLogger(JVMClusterUtil
.class);
44 * Datastructure to hold RegionServer Thread and RegionServer instance
46 public static class RegionServerThread
extends Thread
{
47 private final HRegionServer regionServer
;
49 public RegionServerThread(final HRegionServer r
, final int index
) {
50 super(r
, "RS:" + index
+ ";" + r
.getServerName().toShortString());
51 this.regionServer
= r
;
54 /** @return the region server */
55 public HRegionServer
getRegionServer() {
56 return this.regionServer
;
60 * Block until the region server has come online, indicating it is ready
63 public void waitForServerOnline() {
64 // The server is marked online after the init method completes inside of
65 // the HRS#run method. HRS#init can fail for whatever region. In those
66 // cases, we'll jump out of the run without setting online flag. Check
67 // stopRequested so we don't wait here a flag that will never be flipped.
68 regionServer
.waitForServerOnline();
73 * Creates a {@link RegionServerThread}.
74 * Call 'start' on the returned thread to make it run.
75 * @param c Configuration to use.
76 * @param hrsc Class to create.
77 * @param index Used distinguishing the object returned.
79 * @return Region server added.
81 public static JVMClusterUtil
.RegionServerThread
createRegionServerThread(final Configuration c
,
82 final Class
<?
extends HRegionServer
> hrsc
, final int index
) throws IOException
{
85 Constructor
<?
extends HRegionServer
> ctor
= hrsc
.getConstructor(Configuration
.class);
86 ctor
.setAccessible(true);
87 server
= ctor
.newInstance(c
);
88 } catch (InvocationTargetException ite
) {
89 Throwable target
= ite
.getTargetException();
90 throw new RuntimeException("Failed construction of RegionServer: " +
91 hrsc
.toString() + ((target
.getCause() != null)?
92 target
.getCause().getMessage(): ""), target
);
93 } catch (Exception e
) {
94 IOException ioe
= new IOException();
98 return new JVMClusterUtil
.RegionServerThread(server
, index
);
103 * Datastructure to hold Master Thread and Master instance
105 public static class MasterThread
extends Thread
{
106 private final HMaster master
;
108 public MasterThread(final HMaster m
, final int index
) {
109 super(m
, "M:" + index
+ ";" + m
.getServerName().toShortString());
113 /** @return the master */
114 public HMaster
getMaster() {
120 * Creates a {@link MasterThread}.
121 * Call 'start' on the returned thread to make it run.
122 * @param c Configuration to use.
123 * @param hmc Class to create.
124 * @param index Used distinguishing the object returned.
125 * @throws IOException
126 * @return Master added.
128 public static JVMClusterUtil
.MasterThread
createMasterThread(final Configuration c
,
129 final Class
<?
extends HMaster
> hmc
, final int index
) throws IOException
{
132 server
= hmc
.getConstructor(Configuration
.class).newInstance(c
);
133 } catch (InvocationTargetException ite
) {
134 Throwable target
= ite
.getTargetException();
135 throw new RuntimeException("Failed construction of Master: " +
136 hmc
.toString() + ((target
.getCause() != null)?
137 target
.getCause().getMessage(): ""), target
);
138 } catch (Exception e
) {
139 IOException ioe
= new IOException();
143 return new JVMClusterUtil
.MasterThread(server
, index
);
146 private static JVMClusterUtil
.MasterThread
findActiveMaster(
147 List
<JVMClusterUtil
.MasterThread
> masters
) {
148 for (JVMClusterUtil
.MasterThread t
: masters
) {
149 if (t
.master
.isActiveMaster()) {
// Starts the cluster described by the given master and region server thread lists and returns
// the active master's server name as a string (ServerName#toString()).
// Visible flow:
//   1. Take a Configuration from the master threads.
//   2. Wait for some master to become active.
//   3. Handle the region server threads.
//   4. Wait for the active master to report isInitialized().
// Timeouts are read from hbase.master.start.timeout.localHBaseCluster (default 30000 ms) and
// hbase.master.init.timeout.localHBaseCluster (default 200000 ms). waitForEvent throws a
// RuntimeException when a timeout elapses and an InterruptedIOException when interrupted.
158 * Start the cluster. Waits until there is a primary master initialized
159 * and returns its address.
161 * @param regionservers
162 * @return Address to use contacting primary master.
164 public static String
startup(final List
<JVMClusterUtil
.MasterThread
> masters
,
165 final List
<JVMClusterUtil
.RegionServerThread
> regionservers
) throws IOException
{
166 // Implementation note: This method relies on timed sleeps in a loop. It's not great, and
167 // should probably be re-written to use actual synchronization objects, but it's ok for now
169 Configuration configuration
= null;
// Null or empty master list is special-cased. NOTE(review): the loop below dereferences
// masters, so this branch presumably exits early — confirm.
171 if (masters
== null || masters
.isEmpty()) {
// After this loop, configuration holds the Configuration of the last master in the list; it is
// only used to read the two timeouts below.
175 for (JVMClusterUtil
.MasterThread t
: masters
) {
176 configuration
= t
.getMaster().getConfiguration();
180 // Wait for an active master
181 // having an active master before starting the region threads allows
182 // then to succeed on their connection to master
// NOTE(review): Integer.parseInt throws NumberFormatException on a malformed value;
// Configuration#getInt(name, default) would be the idiomatic way to read this setting.
183 final int startTimeout
= configuration
!= null ? Integer
.parseInt(
184 configuration
.get("hbase.master.start.timeout.localHBaseCluster", "30000")) : 30000;
185 waitForEvent(startTimeout
, "active", () -> findActiveMaster(masters
) != null);
// Region server threads are only dealt with once a master is active (see comment above).
187 if (regionservers
!= null) {
188 for (JVMClusterUtil
.RegionServerThread t
: regionservers
) {
193 // Wait for an active master to be initialized (implies being master)
194 // with this, when we return the cluster is complete
195 final int initTimeout
= configuration
!= null ? Integer
.parseInt(
196 configuration
.get("hbase.master.init.timeout.localHBaseCluster", "200000")) : 200000;
197 waitForEvent(initTimeout
, "initialized", () -> {
198 JVMClusterUtil
.MasterThread t
= findActiveMaster(masters
);
199 // master thread should never be null at this point, but let's keep the check anyway
200 return t
!= null && t
.master
.isInitialized();
// NOTE(review): findActiveMaster is evaluated again here without a null check. If no master
// is active at this instant (for example, it lost mastership right after the wait), this
// dereference throws NullPointerException.
204 return findActiveMaster(masters
).master
.getServerName().toString();
// Waits up to `millis` for `check` to report true.
// On timeout it prints a thread dump to System.out and then throws
// RuntimeException("Master not <action> after <millis>ms").
// If interrupted, it throws InterruptedIOException with the InterruptedException as its cause.
208 * Utility method to wait some time for an event to occur, and then return control to the caller.
209 * @param millis How long to wait, in milliseconds.
210 * @param action The action that we are waiting for. Will be used in log message if the event
212 * @param check A Supplier that will be checked periodically to produce an updated true/false
213 * result indicating if the expected event has happened or not.
214 * @throws InterruptedIOException If we are interrupted while waiting for the event.
215 * @throws RuntimeException If we reach the specified timeout while waiting for the event.
217 private static void waitForEvent(long millis
, String action
, Supplier
<Boolean
> check
)
218 throws InterruptedIOException
{
// Deadline on the monotonic System.nanoTime() clock, so wall-clock adjustments do not move it.
219 long end
= System
.nanoTime() + TimeUnit
.MILLISECONDS
.toNanos(millis
);
// NOTE(review): the System.nanoTime javadoc recommends comparing nanoTime values by
// subtraction (System.nanoTime() - end > 0) to stay correct under numeric overflow; the direct
// comparison below does not.
226 if (System
.nanoTime() > end
) {
227 String msg
= "Master not " + action
+ " after " + millis
+ "ms";
// Dump all threads before failing so a hung startup can be diagnosed from the output.
228 Threads
.printThreadInfo(System
.out
, "Thread dump because: " + msg
);
229 throw new RuntimeException(msg
);
// NOTE(review): the handler rethrows without calling Thread.currentThread().interrupt(), so
// callers receive the interruption only through the exception and its cause chain.
234 } catch (InterruptedException e
) {
235 throw (InterruptedIOException
)new InterruptedIOException().initCause(e
);
// Stops a cluster built from these thread lists; either list may be null. Order:
//   1. Backup masters (not stopped, not active) are stopped via stopMaster().
//   2. The active master is shut down via shutdown().
//   3. Every region server is asked to stop, then joined against one shared 30 s deadline.
//   4. Servers still alive get up to 100 further rounds of warnings and, per the log messages,
//      interrupts.
//   5. Each master thread is waited on.
// IOExceptions from stopping masters are logged, not thrown. InterruptedException handlers only
// set wasInterrupted, which cuts the remaining waits short. A summary line is logged at the end.
243 * @param regionservers
245 public static void shutdown(final List
<MasterThread
> masters
,
246 final List
<RegionServerThread
> regionservers
) {
247 LOG
.debug("Shutting down HBase Cluster");
248 if (masters
!= null) {
// Backup masters are stopped inside this loop; the active master is only remembered in
// activeMaster here and is shut down after the loop.
250 JVMClusterUtil
.MasterThread activeMaster
= null;
251 for (JVMClusterUtil
.MasterThread t
: masters
) {
252 // Master was killed but could be still considered as active. Check first if it is stopped.
253 if (!t
.master
.isStopped()) {
254 if (!t
.master
.isActiveMaster()) {
256 t
.master
.stopMaster();
257 } catch (IOException e
) {
258 LOG
.error("Exception occurred while stopping master", e
);
260 LOG
.info("Stopped backup Master {} is stopped: {}",
261 t
.master
.hashCode(), t
.master
.isStopped());
// More than one master claiming to be active is unexpected; it is logged and shutdown goes on.
263 if (activeMaster
!= null) {
264 LOG
.warn("Found more than 1 active master, hash {}", activeMaster
.master
.hashCode());
267 LOG
.debug("Found active master hash={}, stopped={}",
268 t
.master
.hashCode(), t
.master
.isStopped());
// The active master goes last, after every backup has been asked to stop.
273 if (activeMaster
!= null) {
275 activeMaster
.master
.shutdown();
276 } catch (IOException e
) {
277 LOG
.error("Exception occurred in HMaster.shutdown()", e
);
281 boolean wasInterrupted
= false;
// Wall-clock deadline (30 s from now) shared by all region server joins below.
// NOTE(review): System.currentTimeMillis can jump with clock changes; waitForEvent in this
// class uses System.nanoTime for the same kind of deadline.
282 final long maxTime
= System
.currentTimeMillis() + 30 * 1000;
283 if (regionservers
!= null) {
// First ask every region server to stop, without waiting.
285 for (RegionServerThread t
: regionservers
) {
286 t
.getRegionServer().stop("Shutdown requested");
// Then join each live server with whatever remains of the shared deadline; once interrupted,
// or once the deadline has passed, stop joining.
288 for (RegionServerThread t
: regionservers
) {
289 long now
= System
.currentTimeMillis();
290 if (t
.isAlive() && !wasInterrupted
&& now
< maxTime
) {
292 t
.join(maxTime
- now
);
293 } catch (InterruptedException e
) {
294 LOG
.info("Got InterruptedException on shutdown - " +
295 "not waiting anymore on region server ends", e
);
296 wasInterrupted
= true; // someone wants us to speed up.
301 // Let's try to interrupt the remaining threads if any.
// Bounded escalation: at most 100 rounds; a round that finds no live server ends it early.
302 for (int i
= 0; i
< 100; ++i
) {
303 boolean atLeastOneLiveServer
= false;
304 for (RegionServerThread t
: regionservers
) {
306 atLeastOneLiveServer
= true;
// Per this message, each remaining server gets one more bounded chance before being
// interrupted in the loop further down.
308 LOG
.warn("RegionServerThreads remaining, give one more chance before interrupting");
310 } catch (InterruptedException e
) {
311 wasInterrupted
= true;
315 if (!atLeastOneLiveServer
) break;
// Servers still alive at this point are interrupted (per the log message). Thread dumps go
// to System.out; the message says this only happens after more than 3 attempts.
316 for (RegionServerThread t
: regionservers
) {
318 LOG
.warn("RegionServerThreads taking too long to stop, interrupting; thread dump " +
319 "if > 3 attempts: i=" + i
);
321 Threads
.printThreadInfo(System
.out
, "Thread dump " + t
.getName());
// Wait for every master thread to finish, unless an interrupt already asked us to hurry up.
329 if (masters
!= null) {
330 for (JVMClusterUtil
.MasterThread t
: masters
) {
331 while (t
.master
.isAlive() && !wasInterrupted
) {
333 // The below has been replaced to debug sometime hangs on end of
335 // this.master.join():
336 Threads
.threadDumpingIsAlive(t
.master
.getThread());
337 } catch(InterruptedException e
) {
338 LOG
.info("Got InterruptedException on shutdown - " +
339 "not waiting anymore on master ends", e
);
340 wasInterrupted
= true;
// Summary line; reports "interrupted" if any wait above was cut short by an interrupt.
345 LOG
.info("Shutdown of " +
346 ((masters
!= null) ? masters
.size() : "0") + " master(s) and " +
347 ((regionservers
!= null) ? regionservers
.size() : "0") +
348 " regionserver(s) " + (wasInterrupted ?
"interrupted" : "complete"));
// Re-assert the interrupt status swallowed by the InterruptedException handlers above.
// NOTE(review): this is expected to run only when wasInterrupted is true — confirm the guard.
351 Thread
.currentThread().interrupt();